001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022 import java.nio.ByteBuffer;
023 import java.security.DigestException;
024 import java.security.MessageDigest;
025 import java.security.NoSuchAlgorithmException;
026 import java.util.regex.Pattern;
027 import java.util.regex.PatternSyntaxException;
028
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031 import org.apache.hadoop.classification.InterfaceAudience;
032 import org.apache.hadoop.classification.InterfaceStability;
033 import org.apache.hadoop.conf.Configurable;
034 import org.apache.hadoop.conf.Configuration;
035 import org.apache.hadoop.io.BytesWritable;
036 import org.apache.hadoop.io.Text;
037 import org.apache.hadoop.mapreduce.InputSplit;
038 import org.apache.hadoop.mapreduce.Job;
039 import org.apache.hadoop.mapreduce.RecordReader;
040 import org.apache.hadoop.mapreduce.TaskAttemptContext;
041 import org.apache.hadoop.util.ReflectionUtils;
042
043 /**
 * An input format that lets a map/reduce job work on a sample of the records
 * in sequence files. Which records make up the sample is decided by the
 * filter class configured for the job.
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Stable
049 public class SequenceFileInputFilter<K, V>
050 extends SequenceFileInputFormat<K, V> {
051 public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
052
053 final public static String FILTER_CLASS =
054 "mapreduce.input.sequencefileinputfilter.class";
055 final public static String FILTER_FREQUENCY =
056 "mapreduce.input.sequencefileinputfilter.frequency";
057 final public static String FILTER_REGEX =
058 "mapreduce.input.sequencefileinputfilter.regex";
059
/** Creates the input format; no additional state is set up here. */
public SequenceFileInputFilter() {
  super();
}
062
063 /** Create a record reader for the given split
064 * @param split file split
065 * @param context the task-attempt context
066 * @return RecordReader
067 */
068 public RecordReader<K, V> createRecordReader(InputSplit split,
069 TaskAttemptContext context) throws IOException {
070 context.setStatus(split.toString());
071 return new FilterRecordReader<K, V>(context.getConfiguration());
072 }
073
074
075 /** set the filter class
076 *
077 * @param job The job
078 * @param filterClass filter class
079 */
080 public static void setFilterClass(Job job, Class<?> filterClass) {
081 job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
082 }
083
084
085 /**
086 * filter interface
087 */
088 public interface Filter extends Configurable {
089 /** filter function
090 * Decide if a record should be filtered or not
091 * @param key record key
092 * @return true if a record is accepted; return false otherwise
093 */
094 public abstract boolean accept(Object key);
095 }
096
097 /**
098 * base class for Filters
099 */
100 public static abstract class FilterBase implements Filter {
101 Configuration conf;
102
103 public Configuration getConf() {
104 return conf;
105 }
106 }
107
108 /** Records filter by matching key to regex
109 */
110 public static class RegexFilter extends FilterBase {
111 private Pattern p;
112 /** Define the filtering regex and stores it in conf
113 * @param conf where the regex is set
114 * @param regex regex used as a filter
115 */
116 public static void setPattern(Configuration conf, String regex)
117 throws PatternSyntaxException {
118 try {
119 Pattern.compile(regex);
120 } catch (PatternSyntaxException e) {
121 throw new IllegalArgumentException("Invalid pattern: "+regex);
122 }
123 conf.set(FILTER_REGEX, regex);
124 }
125
126 public RegexFilter() { }
127
128 /** configure the Filter by checking the configuration
129 */
130 public void setConf(Configuration conf) {
131 String regex = conf.get(FILTER_REGEX);
132 if (regex == null)
133 throw new RuntimeException(FILTER_REGEX + "not set");
134 this.p = Pattern.compile(regex);
135 this.conf = conf;
136 }
137
138
139 /** Filtering method
140 * If key matches the regex, return true; otherwise return false
141 * @see Filter#accept(Object)
142 */
143 public boolean accept(Object key) {
144 return p.matcher(key.toString()).matches();
145 }
146 }
147
148 /** This class returns a percentage of records
149 * The percentage is determined by a filtering frequency <i>f</i> using
150 * the criteria record# % f == 0.
151 * For example, if the frequency is 10, one out of 10 records is returned.
152 */
153 public static class PercentFilter extends FilterBase {
154 private int frequency;
155 private int count;
156
157 /** set the frequency and stores it in conf
158 * @param conf configuration
159 * @param frequency filtering frequencey
160 */
161 public static void setFrequency(Configuration conf, int frequency) {
162 if (frequency <= 0)
163 throw new IllegalArgumentException(
164 "Negative " + FILTER_FREQUENCY + ": " + frequency);
165 conf.setInt(FILTER_FREQUENCY, frequency);
166 }
167
168 public PercentFilter() { }
169
170 /** configure the filter by checking the configuration
171 *
172 * @param conf configuration
173 */
174 public void setConf(Configuration conf) {
175 this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
176 if (this.frequency <= 0) {
177 throw new RuntimeException(
178 "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
179 }
180 this.conf = conf;
181 }
182
183 /** Filtering method
184 * If record# % frequency==0, return true; otherwise return false
185 * @see Filter#accept(Object)
186 */
187 public boolean accept(Object key) {
188 boolean accepted = false;
189 if (count == 0)
190 accepted = true;
191 if (++count == frequency) {
192 count = 0;
193 }
194 return accepted;
195 }
196 }
197
198 /** This class returns a set of records by examing the MD5 digest of its
199 * key against a filtering frequency <i>f</i>. The filtering criteria is
200 * MD5(key) % f == 0.
201 */
202 public static class MD5Filter extends FilterBase {
203 private int frequency;
204 private static final MessageDigest DIGESTER;
205 public static final int MD5_LEN = 16;
206 private byte [] digest = new byte[MD5_LEN];
207
208 static {
209 try {
210 DIGESTER = MessageDigest.getInstance("MD5");
211 } catch (NoSuchAlgorithmException e) {
212 throw new RuntimeException(e);
213 }
214 }
215
216
217 /** set the filtering frequency in configuration
218 *
219 * @param conf configuration
220 * @param frequency filtering frequency
221 */
222 public static void setFrequency(Configuration conf, int frequency) {
223 if (frequency <= 0)
224 throw new IllegalArgumentException(
225 "Negative " + FILTER_FREQUENCY + ": " + frequency);
226 conf.setInt(FILTER_FREQUENCY, frequency);
227 }
228
229 public MD5Filter() { }
230
231 /** configure the filter according to configuration
232 *
233 * @param conf configuration
234 */
235 public void setConf(Configuration conf) {
236 this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
237 if (this.frequency <= 0) {
238 throw new RuntimeException(
239 "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
240 }
241 this.conf = conf;
242 }
243
244 /** Filtering method
245 * If MD5(key) % frequency==0, return true; otherwise return false
246 * @see Filter#accept(Object)
247 */
248 public boolean accept(Object key) {
249 try {
250 long hashcode;
251 if (key instanceof Text) {
252 hashcode = MD5Hashcode((Text)key);
253 } else if (key instanceof BytesWritable) {
254 hashcode = MD5Hashcode((BytesWritable)key);
255 } else {
256 ByteBuffer bb;
257 bb = Text.encode(key.toString());
258 hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
259 }
260 if (hashcode / frequency * frequency == hashcode)
261 return true;
262 } catch(Exception e) {
263 LOG.warn(e);
264 throw new RuntimeException(e);
265 }
266 return false;
267 }
268
269 private long MD5Hashcode(Text key) throws DigestException {
270 return MD5Hashcode(key.getBytes(), 0, key.getLength());
271 }
272
273 private long MD5Hashcode(BytesWritable key) throws DigestException {
274 return MD5Hashcode(key.getBytes(), 0, key.getLength());
275 }
276
277 synchronized private long MD5Hashcode(byte[] bytes,
278 int start, int length) throws DigestException {
279 DIGESTER.update(bytes, 0, length);
280 DIGESTER.digest(digest, 0, MD5_LEN);
281 long hashcode=0;
282 for (int i = 0; i < 8; i++)
283 hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
284 return hashcode;
285 }
286 }
287
288 private static class FilterRecordReader<K, V>
289 extends SequenceFileRecordReader<K, V> {
290
291 private Filter filter;
292 private K key;
293 private V value;
294
295 public FilterRecordReader(Configuration conf)
296 throws IOException {
297 super();
298 // instantiate filter
299 filter = (Filter)ReflectionUtils.newInstance(
300 conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
301 }
302
303 public synchronized boolean nextKeyValue()
304 throws IOException, InterruptedException {
305 while (super.nextKeyValue()) {
306 key = super.getCurrentKey();
307 if (filter.accept(key)) {
308 value = super.getCurrentValue();
309 return true;
310 }
311 }
312 return false;
313 }
314
315 @Override
316 public K getCurrentKey() {
317 return key;
318 }
319
320 @Override
321 public V getCurrentValue() {
322 return value;
323 }
324 }
325 }