001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.io.IOException;
022 import java.sql.Connection;
023 import java.sql.SQLException;
024 import java.lang.reflect.Method;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031
032 /**
033 * A RecordReader that reads records from an Oracle SQL table.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Evolving
037 public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
038
039 /** Configuration key to set to a timezone string. */
040 public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
041
042 private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
043
044 public OracleDBRecordReader(DBInputFormat.DBInputSplit split,
045 Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
046 String cond, String [] fields, String table) throws SQLException {
047 super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
048 setSessionTimeZone(conf, conn);
049 }
050
051 /** Returns the query for selecting the records from an Oracle DB. */
052 protected String getSelectQuery() {
053 StringBuilder query = new StringBuilder();
054 DBConfiguration dbConf = getDBConf();
055 String conditions = getConditions();
056 String tableName = getTableName();
057 String [] fieldNames = getFieldNames();
058
059 // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
060 if(dbConf.getInputQuery() == null) {
061 query.append("SELECT ");
062
063 for (int i = 0; i < fieldNames.length; i++) {
064 query.append(fieldNames[i]);
065 if (i != fieldNames.length -1) {
066 query.append(", ");
067 }
068 }
069
070 query.append(" FROM ").append(tableName);
071 if (conditions != null && conditions.length() > 0)
072 query.append(" WHERE ").append(conditions);
073 String orderBy = dbConf.getInputOrderBy();
074 if (orderBy != null && orderBy.length() > 0) {
075 query.append(" ORDER BY ").append(orderBy);
076 }
077 } else {
078 //PREBUILT QUERY
079 query.append(dbConf.getInputQuery());
080 }
081
082 try {
083 DBInputFormat.DBInputSplit split = getSplit();
084 if (split.getLength() > 0){
085 String querystring = query.toString();
086
087 query = new StringBuilder();
088 query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
089 query.append(querystring);
090 query.append(" ) a WHERE rownum <= ").append(split.getEnd());
091 query.append(" ) WHERE dbif_rno > ").append(split.getStart());
092 }
093 } catch (IOException ex) {
094 // ignore, will not throw.
095 }
096
097 return query.toString();
098 }
099
100 /**
101 * Set session time zone
102 * @param conf The current configuration.
103 * We read the 'oracle.sessionTimeZone' property from here.
104 * @param conn The connection to alter the timezone properties of.
105 */
106 public static void setSessionTimeZone(Configuration conf,
107 Connection conn) throws SQLException {
108 // need to use reflection to call the method setSessionTimeZone on
109 // the OracleConnection class because oracle specific java libraries are
110 // not accessible in this context.
111 Method method;
112 try {
113 method = conn.getClass().getMethod(
114 "setSessionTimeZone", new Class [] {String.class});
115 } catch (Exception ex) {
116 LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
117 // rethrow SQLException
118 throw new SQLException(ex);
119 }
120
121 // Need to set the time zone in order for Java
122 // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
123 // We can't easily get the correct Oracle-specific timezone string
124 // from Java; just let the user set the timezone in a property.
125 String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
126 try {
127 method.setAccessible(true);
128 method.invoke(conn, clientTimeZone);
129 LOG.info("Time zone has been set to " + clientTimeZone);
130 } catch (Exception ex) {
131 LOG.warn("Time zone " + clientTimeZone +
132 " could not be set on Oracle database.");
133 LOG.warn("Setting default time zone: GMT");
134 try {
135 // "GMT" timezone is guaranteed to exist.
136 method.invoke(conn, "GMT");
137 } catch (Exception ex2) {
138 LOG.error("Could not set time zone for oracle connection", ex2);
139 // rethrow SQLException
140 throw new SQLException(ex);
141 }
142 }
143 }
144 }