/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

/**
 * An {@link OutputFormat} that sends the reduce output to a SQL table.
 * <p>
 * {@link DBOutputFormat} accepts &lt;key, value&gt; pairs, where the
 * key has a type extending {@link DBWritable}. The returned
 * {@link RecordWriter} writes <b>only the key</b> to the database with
 * a batch SQL query.
 *
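 * <p>
 * A minimal usage sketch; {@code MyRecord} is a hypothetical key class
 * implementing {@link DBWritable}, and the JDBC driver, URL, table and
 * field names are placeholders:
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf);
 * job.setOutputKeyClass(MyRecord.class);
 * job.setOutputValueClass(NullWritable.class);
 * DBConfiguration.configureDB(job.getConfiguration(),
 *     "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb");
 * DBOutputFormat.setOutput(job, "my_table", "id", "name");
 * }</pre>
 *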
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBOutputFormat<K extends DBWritable, V>
    extends OutputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);

  /** {@inheritDoc} */
  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {}

  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
        context);
  }

  /**
   * A RecordWriter that writes the reduce output to a SQL table.
   * Records are buffered via {@link PreparedStatement#addBatch()} and
   * committed in a single transaction when the writer is closed.
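   * <p>
   * A minimal sketch of a key type this writer can persist; {@code MyRecord}
   * and its fields are hypothetical, and only its
   * {@code write(PreparedStatement)} method is invoked by this writer:
   *
   * <pre>{@code
   * public class MyRecord implements Writable, DBWritable {
   *   private int id;
   *   private String name;
   *
   *   public void write(PreparedStatement statement) throws SQLException {
   *     statement.setInt(1, id);
   *     statement.setString(2, name);
   *   }
   *   // readFields(ResultSet), write(DataOutput) and readFields(DataInput)
   *   // are omitted for brevity.
   * }
   * }</pre>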
   */
  @InterfaceStability.Evolving
  public class DBRecordWriter
      extends RecordWriter<K, V> {

    private Connection connection;
    private PreparedStatement statement;

    public DBRecordWriter() throws SQLException {
    }

    public DBRecordWriter(Connection connection,
        PreparedStatement statement) throws SQLException {
      this.connection = connection;
      this.statement = statement;
      this.connection.setAutoCommit(false);
    }

    public Connection getConnection() {
      return connection;
    }

    public PreparedStatement getStatement() {
      return statement;
    }

    /** {@inheritDoc} */
    public void close(TaskAttemptContext context) throws IOException {
      try {
        // Flush every buffered insert and commit them in one transaction.
        statement.executeBatch();
        connection.commit();
      } catch (SQLException e) {
        // Roll back the failed batch before reporting the error.
        try {
          connection.rollback();
        } catch (SQLException ex) {
          LOG.warn(StringUtils.stringifyException(ex));
        }
        throw new IOException(e);
      } finally {
        try {
          statement.close();
          connection.close();
        } catch (SQLException ex) {
          throw new IOException(ex);
        }
      }
    }

    /** {@inheritDoc} */
    public void write(K key, V value) throws IOException {
      try {
        // Only the key is written; it binds its fields to the prepared
        // statement and joins the current batch.
        key.write(statement);
        statement.addBatch();
      } catch (SQLException e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Constructs the query used as the prepared statement to insert data.
   *
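   * <p>
   * For example (table and field names here are illustrative),
   * {@code constructQuery("employees", new String[] {"id", "name"})}
   * returns {@code INSERT INTO employees (id,name) VALUES (?,?);}.
   *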
   * @param table
   *          the table to insert into
   * @param fieldNames
   *          the fields to insert into. If field names are unknown, supply
   *          an array of nulls of the appropriate length.
   * @return the INSERT statement, with one {@code ?} placeholder per field
   */
  public String constructQuery(String table, String[] fieldNames) {
    if (fieldNames == null) {
      throw new IllegalArgumentException("Field names may not be null");
    }

    StringBuilder query = new StringBuilder();
    query.append("INSERT INTO ").append(table);

    if (fieldNames.length > 0 && fieldNames[0] != null) {
      query.append(" (");
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(",");
        }
      }
      query.append(")");
    }
    query.append(" VALUES (");

    for (int i = 0; i < fieldNames.length; i++) {
      query.append("?");
      if (i != fieldNames.length - 1) {
        query.append(",");
      }
    }
    query.append(");");

    return query.toString();
  }

  /** {@inheritDoc} */
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
    String tableName = dbConf.getOutputTableName();
    String[] fieldNames = dbConf.getOutputFieldNames();

    if (fieldNames == null) {
      // Field names unknown: use an array of nulls of the configured
      // length so constructQuery emits the right number of placeholders.
      fieldNames = new String[dbConf.getOutputFieldCount()];
    }

    try {
      Connection connection = dbConf.getConnection();
      PreparedStatement statement = connection.prepareStatement(
          constructQuery(tableName, fieldNames));
      return new DBRecordWriter(connection, statement);
    } catch (Exception ex) {
      throw new IOException(ex);
    }
  }

  /**
   * Initializes the reduce-part of the job with the appropriate output
   * settings.
   *
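   * <p>
   * A typical call (table and field names are illustrative):
   *
   * <pre>{@code
   * DBOutputFormat.setOutput(job, "employees", "id", "name");
   * }</pre>
   *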
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldNames The field names in the table. If the names are
   *          unknown, pass an array of nulls of the correct length instead.
   */
  public static void setOutput(Job job, String tableName,
      String... fieldNames) throws IOException {
    if (fieldNames.length > 0 && fieldNames[0] != null) {
      DBConfiguration dbConf = setOutput(job, tableName);
      dbConf.setOutputFieldNames(fieldNames);
    } else if (fieldNames.length > 0) {
      // Field names unknown; record only the field count.
      setOutput(job, tableName, fieldNames.length);
    } else {
      throw new IllegalArgumentException(
          "At least one field name must be supplied");
    }
  }

  /**
   * Initializes the reduce-part of the job with the appropriate output
   * settings, for use when only the number of fields is known.
   *
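   * <p>
   * For example, for a three-column table (the table name is illustrative):
   *
   * <pre>{@code
   * DBOutputFormat.setOutput(job, "events", 3);
   * }</pre>
   *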
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldCount The number of fields in the table.
   */
  public static void setOutput(Job job, String tableName,
      int fieldCount) throws IOException {
    DBConfiguration dbConf = setOutput(job, tableName);
    dbConf.setOutputFieldCount(fieldCount);
  }

  private static DBConfiguration setOutput(Job job,
      String tableName) throws IOException {
    job.setOutputFormatClass(DBOutputFormat.class);
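    // Speculative execution is disabled because a duplicate task attempt
    // would insert the same rows into the external table twice.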
    job.setReduceSpeculativeExecution(false);

    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());

    dbConf.setOutputTableName(tableName);
    return dbConf;
  }
}