001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.sql.Connection;
022 import java.sql.DriverManager;
023 import java.sql.SQLException;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.mapreduce.Job;
029 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
030
031 /**
032 * A container for configuration property names for jobs with DB input/output.
033 *
034 * The job can be configured using the static methods in this class,
035 * {@link DBInputFormat}, and {@link DBOutputFormat}.
036 * Alternatively, the properties can be set in the configuration with proper
037 * values.
038 *
039 * @see DBConfiguration#configureDB(Configuration, String, String, String, String)
040 * @see DBInputFormat#setInput(Job, Class, String, String)
041 * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
042 * @see DBOutputFormat#setOutput(Job, String, String...)
043 */
044 @InterfaceAudience.Public
045 @InterfaceStability.Stable
046 public class DBConfiguration {
047
048 /** The JDBC Driver class name */
049 public static final String DRIVER_CLASS_PROPERTY =
050 "mapreduce.jdbc.driver.class";
051
052 /** JDBC Database access URL */
053 public static final String URL_PROPERTY = "mapreduce.jdbc.url";
054
055 /** User name to access the database */
056 public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
057
058 /** Password to access the database */
059 public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
060
061 /** Input table name */
062 public static final String INPUT_TABLE_NAME_PROPERTY =
063 "mapreduce.jdbc.input.table.name";
064
065 /** Field names in the Input table */
066 public static final String INPUT_FIELD_NAMES_PROPERTY =
067 "mapreduce.jdbc.input.field.names";
068
069 /** WHERE clause in the input SELECT statement */
070 public static final String INPUT_CONDITIONS_PROPERTY =
071 "mapreduce.jdbc.input.conditions";
072
073 /** ORDER BY clause in the input SELECT statement */
074 public static final String INPUT_ORDER_BY_PROPERTY =
075 "mapreduce.jdbc.input.orderby";
076
077 /** Whole input query, exluding LIMIT...OFFSET */
078 public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
079
080 /** Input query to get the count of records */
081 public static final String INPUT_COUNT_QUERY =
082 "mapreduce.jdbc.input.count.query";
083
084 /** Input query to get the max and min values of the jdbc.input.query */
085 public static final String INPUT_BOUNDING_QUERY =
086 "mapred.jdbc.input.bounding.query";
087
088 /** Class name implementing DBWritable which will hold input tuples */
089 public static final String INPUT_CLASS_PROPERTY =
090 "mapreduce.jdbc.input.class";
091
092 /** Output table name */
093 public static final String OUTPUT_TABLE_NAME_PROPERTY =
094 "mapreduce.jdbc.output.table.name";
095
096 /** Field names in the Output table */
097 public static final String OUTPUT_FIELD_NAMES_PROPERTY =
098 "mapreduce.jdbc.output.field.names";
099
100 /** Number of fields in the Output table */
101 public static final String OUTPUT_FIELD_COUNT_PROPERTY =
102 "mapreduce.jdbc.output.field.count";
103
104 /**
105 * Sets the DB access related fields in the {@link Configuration}.
106 * @param conf the configuration
107 * @param driverClass JDBC Driver class name
108 * @param dbUrl JDBC DB access URL.
109 * @param userName DB access username
110 * @param passwd DB access passwd
111 */
112 public static void configureDB(Configuration conf, String driverClass,
113 String dbUrl, String userName, String passwd) {
114
115 conf.set(DRIVER_CLASS_PROPERTY, driverClass);
116 conf.set(URL_PROPERTY, dbUrl);
117 if (userName != null) {
118 conf.set(USERNAME_PROPERTY, userName);
119 }
120 if (passwd != null) {
121 conf.set(PASSWORD_PROPERTY, passwd);
122 }
123 }
124
125 /**
126 * Sets the DB access related fields in the JobConf.
127 * @param job the job
128 * @param driverClass JDBC Driver class name
129 * @param dbUrl JDBC DB access URL.
130 */
131 public static void configureDB(Configuration job, String driverClass,
132 String dbUrl) {
133 configureDB(job, driverClass, dbUrl, null, null);
134 }
135
136 private Configuration conf;
137
138 public DBConfiguration(Configuration job) {
139 this.conf = job;
140 }
141
142 /** Returns a connection object o the DB
143 * @throws ClassNotFoundException
144 * @throws SQLException */
145 public Connection getConnection()
146 throws ClassNotFoundException, SQLException {
147
148 Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
149
150 if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
151 return DriverManager.getConnection(
152 conf.get(DBConfiguration.URL_PROPERTY));
153 } else {
154 return DriverManager.getConnection(
155 conf.get(DBConfiguration.URL_PROPERTY),
156 conf.get(DBConfiguration.USERNAME_PROPERTY),
157 conf.get(DBConfiguration.PASSWORD_PROPERTY));
158 }
159 }
160
161 public Configuration getConf() {
162 return conf;
163 }
164
165 public String getInputTableName() {
166 return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
167 }
168
169 public void setInputTableName(String tableName) {
170 conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
171 }
172
173 public String[] getInputFieldNames() {
174 return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
175 }
176
177 public void setInputFieldNames(String... fieldNames) {
178 conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
179 }
180
181 public String getInputConditions() {
182 return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
183 }
184
185 public void setInputConditions(String conditions) {
186 if (conditions != null && conditions.length() > 0)
187 conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
188 }
189
190 public String getInputOrderBy() {
191 return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
192 }
193
194 public void setInputOrderBy(String orderby) {
195 if(orderby != null && orderby.length() >0) {
196 conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
197 }
198 }
199
200 public String getInputQuery() {
201 return conf.get(DBConfiguration.INPUT_QUERY);
202 }
203
204 public void setInputQuery(String query) {
205 if(query != null && query.length() >0) {
206 conf.set(DBConfiguration.INPUT_QUERY, query);
207 }
208 }
209
210 public String getInputCountQuery() {
211 return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
212 }
213
214 public void setInputCountQuery(String query) {
215 if(query != null && query.length() > 0) {
216 conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
217 }
218 }
219
220 public void setInputBoundingQuery(String query) {
221 if (query != null && query.length() > 0) {
222 conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
223 }
224 }
225
226 public String getInputBoundingQuery() {
227 return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
228 }
229
230 public Class<?> getInputClass() {
231 return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
232 NullDBWritable.class);
233 }
234
235 public void setInputClass(Class<? extends DBWritable> inputClass) {
236 conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
237 DBWritable.class);
238 }
239
240 public String getOutputTableName() {
241 return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
242 }
243
244 public void setOutputTableName(String tableName) {
245 conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
246 }
247
248 public String[] getOutputFieldNames() {
249 return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
250 }
251
252 public void setOutputFieldNames(String... fieldNames) {
253 conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
254 }
255
256 public void setOutputFieldCount(int fieldCount) {
257 conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
258 }
259
260 public int getOutputFieldCount() {
261 return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
262 }
263
264 }
265