001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.math.BigDecimal;
024 import java.util.ArrayList;
025 import java.util.Iterator;
026 import java.util.List;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030
031 import org.apache.hadoop.classification.InterfaceAudience;
032 import org.apache.hadoop.classification.InterfaceStability;
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.hadoop.mapreduce.InputSplit;
035 import org.apache.hadoop.mapreduce.MRJobConfig;
036
037 /**
038 * Implement DBSplitter over text strings.
039 */
040 @InterfaceAudience.Public
041 @InterfaceStability.Evolving
042 public class TextSplitter extends BigDecimalSplitter {
043
044 private static final Log LOG = LogFactory.getLog(TextSplitter.class);
045
046 /**
047 * This method needs to determine the splits between two user-provided strings.
048 * In the case where the user's strings are 'A' and 'Z', this is not hard; we
049 * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
050 * beginning with each letter, etc.
051 *
052 * If a user has provided us with the strings "Ham" and "Haze", however, we need
053 * to create splits that differ in the third letter.
054 *
055 * The algorithm used is as follows:
056 * Since there are 2**16 unicode characters, we interpret characters as digits in
057 * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
058 * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
059 * low and high strings into floating-point values, we then use the BigDecimalSplitter
060 * to establish the even split points, then map the resulting floating point values
061 * back into strings.
062 */
063 public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
064 throws SQLException {
065
066 LOG.warn("Generating splits for a textual index column.");
067 LOG.warn("If your database sorts in a case-insensitive order, "
068 + "this may result in a partial import or duplicate records.");
069 LOG.warn("You are strongly encouraged to choose an integral split column.");
070
071 String minString = results.getString(1);
072 String maxString = results.getString(2);
073
074 boolean minIsNull = false;
075
076 // If the min value is null, switch it to an empty string instead for purposes
077 // of interpolation. Then add [null, null] as a special case split.
078 if (null == minString) {
079 minString = "";
080 minIsNull = true;
081 }
082
083 if (null == maxString) {
084 // If the max string is null, then the min string has to be null too.
085 // Just return a special split for this case.
086 List<InputSplit> splits = new ArrayList<InputSplit>();
087 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
088 colName + " IS NULL", colName + " IS NULL"));
089 return splits;
090 }
091
092 // Use this as a hint. May need an extra task if the size doesn't
093 // divide cleanly.
094 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
095
096 String lowClausePrefix = colName + " >= '";
097 String highClausePrefix = colName + " < '";
098
099 // If there is a common prefix between minString and maxString, establish it
100 // and pull it out of minString and maxString.
101 int maxPrefixLen = Math.min(minString.length(), maxString.length());
102 int sharedLen;
103 for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
104 char c1 = minString.charAt(sharedLen);
105 char c2 = maxString.charAt(sharedLen);
106 if (c1 != c2) {
107 break;
108 }
109 }
110
111 // The common prefix has length 'sharedLen'. Extract it from both.
112 String commonPrefix = minString.substring(0, sharedLen);
113 minString = minString.substring(sharedLen);
114 maxString = maxString.substring(sharedLen);
115
116 List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
117 List<InputSplit> splits = new ArrayList<InputSplit>();
118
119 // Convert the list of split point strings into an actual set of InputSplits.
120 String start = splitStrings.get(0);
121 for (int i = 1; i < splitStrings.size(); i++) {
122 String end = splitStrings.get(i);
123
124 if (i == splitStrings.size() - 1) {
125 // This is the last one; use a closed interval.
126 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
127 lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
128 } else {
129 // Normal open-interval case.
130 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
131 lowClausePrefix + start + "'", highClausePrefix + end + "'"));
132 }
133 }
134
135 if (minIsNull) {
136 // Add the special null split at the end.
137 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
138 colName + " IS NULL", colName + " IS NULL"));
139 }
140
141 return splits;
142 }
143
144 List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
145 throws SQLException {
146
147 BigDecimal minVal = stringToBigDecimal(minString);
148 BigDecimal maxVal = stringToBigDecimal(maxString);
149
150 List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
151 List<String> splitStrings = new ArrayList<String>();
152
153 // Convert the BigDecimal splitPoints into their string representations.
154 for (BigDecimal bd : splitPoints) {
155 splitStrings.add(commonPrefix + bigDecimalToString(bd));
156 }
157
158 // Make sure that our user-specified boundaries are the first and last entries
159 // in the array.
160 if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
161 splitStrings.add(0, commonPrefix + minString);
162 }
163 if (splitStrings.size() == 1
164 || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
165 splitStrings.add(commonPrefix + maxString);
166 }
167
168 return splitStrings;
169 }
170
171 private final static BigDecimal ONE_PLACE = new BigDecimal(65536);
172
173 // Maximum number of characters to convert. This is to prevent rounding errors
174 // or repeating fractions near the very bottom from getting out of control. Note
175 // that this still gives us a huge number of possible splits.
176 private final static int MAX_CHARS = 8;
177
178 /**
179 * Return a BigDecimal representation of string 'str' suitable for use
180 * in a numerically-sorting order.
181 */
182 BigDecimal stringToBigDecimal(String str) {
183 BigDecimal result = BigDecimal.ZERO;
184 BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.
185
186 int len = Math.min(str.length(), MAX_CHARS);
187
188 for (int i = 0; i < len; i++) {
189 int codePoint = str.codePointAt(i);
190 result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
191 // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
192 curPlace = curPlace.multiply(ONE_PLACE);
193 }
194
195 return result;
196 }
197
198 /**
199 * Return the string encoded in a BigDecimal.
200 * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
201 * represents a single character in base 65536. Convert that back into a char and create a
202 * string out of these until we have no data left.
203 */
204 String bigDecimalToString(BigDecimal bd) {
205 BigDecimal cur = bd.stripTrailingZeros();
206 StringBuilder sb = new StringBuilder();
207
208 for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
209 cur = cur.multiply(ONE_PLACE);
210 int curCodePoint = cur.intValue();
211 if (0 == curCodePoint) {
212 break;
213 }
214
215 cur = cur.subtract(new BigDecimal(curCodePoint));
216 sb.append(Character.toChars(curCodePoint));
217 }
218
219 return sb.toString();
220 }
221 }