001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.IOException;
024 import java.sql.Connection;
025 import java.sql.DatabaseMetaData;
026 import java.sql.PreparedStatement;
027 import java.sql.ResultSet;
028 import java.sql.SQLException;
029 import java.sql.Statement;
030 import java.util.ArrayList;
031 import java.util.List;
032
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.hadoop.io.LongWritable;
036 import org.apache.hadoop.io.Writable;
037 import org.apache.hadoop.mapreduce.InputFormat;
038 import org.apache.hadoop.mapreduce.InputSplit;
039 import org.apache.hadoop.mapreduce.Job;
040 import org.apache.hadoop.mapreduce.JobContext;
041 import org.apache.hadoop.mapreduce.RecordReader;
042 import org.apache.hadoop.mapreduce.TaskAttemptContext;
043 import org.apache.hadoop.util.ReflectionUtils;
044 import org.apache.hadoop.classification.InterfaceAudience;
045 import org.apache.hadoop.classification.InterfaceStability;
046 import org.apache.hadoop.conf.Configurable;
047 import org.apache.hadoop.conf.Configuration;
048
049 /**
050 * A RecordReader that reads records from a SQL table.
051 * Emits LongWritables containing the record number as
052 * key and DBWritables as value.
053 */
054 @InterfaceAudience.Public
055 @InterfaceStability.Evolving
056 public class DBRecordReader<T extends DBWritable> extends
057 RecordReader<LongWritable, T> {
058
059 private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
060
061 private ResultSet results = null;
062
063 private Class<T> inputClass;
064
065 private Configuration conf;
066
067 private DBInputFormat.DBInputSplit split;
068
069 private long pos = 0;
070
071 private LongWritable key = null;
072
073 private T value = null;
074
075 private Connection connection;
076
077 protected PreparedStatement statement;
078
079 private DBConfiguration dbConf;
080
081 private String conditions;
082
083 private String [] fieldNames;
084
085 private String tableName;
086
087 /**
088 * @param split The InputSplit to read data for
089 * @throws SQLException
090 */
091 public DBRecordReader(DBInputFormat.DBInputSplit split,
092 Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
093 String cond, String [] fields, String table)
094 throws SQLException {
095 this.inputClass = inputClass;
096 this.split = split;
097 this.conf = conf;
098 this.connection = conn;
099 this.dbConf = dbConfig;
100 this.conditions = cond;
101 this.fieldNames = fields;
102 this.tableName = table;
103 }
104
105 protected ResultSet executeQuery(String query) throws SQLException {
106 this.statement = connection.prepareStatement(query,
107 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
108 return statement.executeQuery();
109 }
110
111 /** Returns the query for selecting the records,
112 * subclasses can override this for custom behaviour.*/
113 protected String getSelectQuery() {
114 StringBuilder query = new StringBuilder();
115
116 // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
117 if(dbConf.getInputQuery() == null) {
118 query.append("SELECT ");
119
120 for (int i = 0; i < fieldNames.length; i++) {
121 query.append(fieldNames[i]);
122 if (i != fieldNames.length -1) {
123 query.append(", ");
124 }
125 }
126
127 query.append(" FROM ").append(tableName);
128 query.append(" AS ").append(tableName); //in hsqldb this is necessary
129 if (conditions != null && conditions.length() > 0) {
130 query.append(" WHERE (").append(conditions).append(")");
131 }
132
133 String orderBy = dbConf.getInputOrderBy();
134 if (orderBy != null && orderBy.length() > 0) {
135 query.append(" ORDER BY ").append(orderBy);
136 }
137 } else {
138 //PREBUILT QUERY
139 query.append(dbConf.getInputQuery());
140 }
141
142 try {
143 query.append(" LIMIT ").append(split.getLength());
144 query.append(" OFFSET ").append(split.getStart());
145 } catch (IOException ex) {
146 // Ignore, will not throw.
147 }
148
149 return query.toString();
150 }
151
152 /** {@inheritDoc} */
153 public void close() throws IOException {
154 try {
155 if (null != results) {
156 results.close();
157 }
158 if (null != statement) {
159 statement.close();
160 }
161 if (null != connection) {
162 connection.commit();
163 connection.close();
164 }
165 } catch (SQLException e) {
166 throw new IOException(e.getMessage());
167 }
168 }
169
170 public void initialize(InputSplit split, TaskAttemptContext context)
171 throws IOException, InterruptedException {
172 //do nothing
173 }
174
175 /** {@inheritDoc} */
176 public LongWritable getCurrentKey() {
177 return key;
178 }
179
180 /** {@inheritDoc} */
181 public T getCurrentValue() {
182 return value;
183 }
184
185 /**
186 * @deprecated
187 */
188 @Deprecated
189 public T createValue() {
190 return ReflectionUtils.newInstance(inputClass, conf);
191 }
192
193 /**
194 * @deprecated
195 */
196 @Deprecated
197 public long getPos() throws IOException {
198 return pos;
199 }
200
201 /**
202 * @deprecated Use {@link #nextKeyValue()}
203 */
204 @Deprecated
205 public boolean next(LongWritable key, T value) throws IOException {
206 this.key = key;
207 this.value = value;
208 return nextKeyValue();
209 }
210
211 /** {@inheritDoc} */
212 public float getProgress() throws IOException {
213 return pos / (float)split.getLength();
214 }
215
216 /** {@inheritDoc} */
217 public boolean nextKeyValue() throws IOException {
218 try {
219 if (key == null) {
220 key = new LongWritable();
221 }
222 if (value == null) {
223 value = createValue();
224 }
225 if (null == this.results) {
226 // First time into this method, run the query.
227 this.results = executeQuery(getSelectQuery());
228 }
229 if (!results.next())
230 return false;
231
232 // Set the key field value as the output key value
233 key.set(pos + split.getStart());
234
235 value.readFields(results);
236
237 pos ++;
238 } catch (SQLException e) {
239 throw new IOException("SQLException in nextKeyValue", e);
240 }
241 return true;
242 }
243
244 protected DBInputFormat.DBInputSplit getSplit() {
245 return split;
246 }
247
248 protected String [] getFieldNames() {
249 return fieldNames;
250 }
251
252 protected String getTableName() {
253 return tableName;
254 }
255
256 protected String getConditions() {
257 return conditions;
258 }
259
260 protected DBConfiguration getDBConf() {
261 return dbConf;
262 }
263
264 protected Connection getConnection() {
265 return connection;
266 }
267
268 protected PreparedStatement getStatement() {
269 return statement;
270 }
271
272 protected void setStatement(PreparedStatement stmt) {
273 this.statement = stmt;
274 }
275 }