001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.lib.db;
020
021 import java.io.IOException;
022 import java.sql.Connection;
023 import java.sql.PreparedStatement;
024 import java.sql.SQLException;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.fs.FileSystem;
029 import org.apache.hadoop.mapred.JobConf;
030 import org.apache.hadoop.mapred.OutputFormat;
031 import org.apache.hadoop.mapred.RecordWriter;
032 import org.apache.hadoop.mapred.Reporter;
033 import org.apache.hadoop.mapreduce.MRJobConfig;
034 import org.apache.hadoop.mapreduce.TaskAttemptID;
035 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
036 import org.apache.hadoop.util.Progressable;
037
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public class DBOutputFormat<K extends DBWritable, V>
041 extends org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>
042 implements OutputFormat<K, V> {
043
044 /**
045 * A RecordWriter that writes the reduce output to a SQL table
046 */
047 protected class DBRecordWriter extends
048 org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>.DBRecordWriter
049 implements RecordWriter<K, V> {
050
051 protected DBRecordWriter(Connection connection,
052 PreparedStatement statement) throws SQLException {
053 super(connection, statement);
054 }
055
056 /** {@inheritDoc} */
057 public void close(Reporter reporter) throws IOException {
058 super.close(null);
059 }
060 }
061
062 /** {@inheritDoc} */
063 public void checkOutputSpecs(FileSystem filesystem, JobConf job)
064 throws IOException {
065 }
066
067
068 /** {@inheritDoc} */
069 public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
070 JobConf job, String name, Progressable progress) throws IOException {
071 org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
072 new TaskAttemptContextImpl(job,
073 TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID))));
074 org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer =
075 (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
076 try {
077 return new DBRecordWriter(writer.getConnection(), writer.getStatement());
078 } catch(SQLException se) {
079 throw new IOException(se);
080 }
081 }
082
083 /**
084 * Initializes the reduce-part of the job with the appropriate output settings
085 *
086 * @param job The job
087 * @param tableName The table to insert data into
088 * @param fieldNames The field names in the table.
089 */
090 public static void setOutput(JobConf job, String tableName, String... fieldNames) {
091 if(fieldNames.length > 0 && fieldNames[0] != null) {
092 DBConfiguration dbConf = setOutput(job, tableName);
093 dbConf.setOutputFieldNames(fieldNames);
094 } else {
095 if(fieldNames.length > 0)
096 setOutput(job, tableName, fieldNames.length);
097 else
098 throw new IllegalArgumentException("Field names must be greater than 0");
099 }
100 }
101
102 /**
103 * Initializes the reduce-part of the job with the appropriate output settings
104 *
105 * @param job The job
106 * @param tableName The table to insert data into
107 * @param fieldCount the number of fields in the table.
108 */
109 public static void setOutput(JobConf job, String tableName, int fieldCount) {
110 DBConfiguration dbConf = setOutput(job, tableName);
111 dbConf.setOutputFieldCount(fieldCount);
112 }
113
114 private static DBConfiguration setOutput(JobConf job, String tableName) {
115 job.setOutputFormat(DBOutputFormat.class);
116 job.setReduceSpeculativeExecution(false);
117
118 DBConfiguration dbConf = new DBConfiguration(job);
119
120 dbConf.setOutputTableName(tableName);
121 return dbConf;
122 }
123
124 }