001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022 import java.util.regex.PatternSyntaxException;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.util.ReflectionUtils;
028
029 /**
030 * A class that allows a map/red job to work on a sample of sequence files.
031 * The sample is decided by the filter class set by the job.
032 */
033 @InterfaceAudience.Public
034 @InterfaceStability.Stable
035 public class SequenceFileInputFilter<K, V>
036 extends SequenceFileInputFormat<K, V> {
037
038 final private static String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
039 input.SequenceFileInputFilter.FILTER_CLASS;
040
041 public SequenceFileInputFilter() {
042 }
043
044 /** Create a record reader for the given split
045 * @param split file split
046 * @param job job configuration
047 * @param reporter reporter who sends report to task tracker
048 * @return RecordReader
049 */
050 public RecordReader<K, V> getRecordReader(InputSplit split,
051 JobConf job, Reporter reporter)
052 throws IOException {
053
054 reporter.setStatus(split.toString());
055
056 return new FilterRecordReader<K, V>(job, (FileSplit) split);
057 }
058
059
060 /** set the filter class
061 *
062 * @param conf application configuration
063 * @param filterClass filter class
064 */
065 public static void setFilterClass(Configuration conf, Class filterClass) {
066 conf.set(FILTER_CLASS, filterClass.getName());
067 }
068
069
070 /**
071 * filter interface
072 */
073 public interface Filter extends
074 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
075 }
076
077 /**
078 * base class for Filters
079 */
080 public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
081 lib.input.SequenceFileInputFilter.FilterBase
082 implements Filter {
083 }
084
085 /** Records filter by matching key to regex
086 */
087 public static class RegexFilter extends FilterBase {
088 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
089 RegexFilter rf;
090 public static void setPattern(Configuration conf, String regex)
091 throws PatternSyntaxException {
092 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
093 RegexFilter.setPattern(conf, regex);
094 }
095
096 public RegexFilter() {
097 rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
098 RegexFilter();
099 }
100
101 /** configure the Filter by checking the configuration
102 */
103 public void setConf(Configuration conf) {
104 rf.setConf(conf);
105 }
106
107
108 /** Filtering method
109 * If key matches the regex, return true; otherwise return false
110 * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
111 */
112 public boolean accept(Object key) {
113 return rf.accept(key);
114 }
115 }
116
117 /** This class returns a percentage of records
118 * The percentage is determined by a filtering frequency <i>f</i> using
119 * the criteria record# % f == 0.
120 * For example, if the frequency is 10, one out of 10 records is returned.
121 */
122 public static class PercentFilter extends FilterBase {
123 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
124 PercentFilter pf;
125 /** set the frequency and stores it in conf
126 * @param conf configuration
127 * @param frequency filtering frequencey
128 */
129 public static void setFrequency(Configuration conf, int frequency) {
130 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
131 PercentFilter.setFrequency(conf, frequency);
132 }
133
134 public PercentFilter() {
135 pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
136 PercentFilter();
137 }
138
139 /** configure the filter by checking the configuration
140 *
141 * @param conf configuration
142 */
143 public void setConf(Configuration conf) {
144 pf.setConf(conf);
145 }
146
147 /** Filtering method
148 * If record# % frequency==0, return true; otherwise return false
149 * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
150 */
151 public boolean accept(Object key) {
152 return pf.accept(key);
153 }
154 }
155
156 /** This class returns a set of records by examing the MD5 digest of its
157 * key against a filtering frequency <i>f</i>. The filtering criteria is
158 * MD5(key) % f == 0.
159 */
160 public static class MD5Filter extends FilterBase {
161 public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
162 input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
163 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;
164 /** set the filtering frequency in configuration
165 *
166 * @param conf configuration
167 * @param frequency filtering frequency
168 */
169 public static void setFrequency(Configuration conf, int frequency) {
170 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
171 setFrequency(conf, frequency);
172 }
173
174 public MD5Filter() {
175 mf = new org.apache.hadoop.mapreduce.lib.input.
176 SequenceFileInputFilter.MD5Filter();
177 }
178
179 /** configure the filter according to configuration
180 *
181 * @param conf configuration
182 */
183 public void setConf(Configuration conf) {
184 mf.setConf(conf);
185 }
186
187 /** Filtering method
188 * If MD5(key) % frequency==0, return true; otherwise return false
189 * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
190 */
191 public boolean accept(Object key) {
192 return mf.accept(key);
193 }
194 }
195
196 private static class FilterRecordReader<K, V>
197 extends SequenceFileRecordReader<K, V> {
198
199 private Filter filter;
200
201 public FilterRecordReader(Configuration conf, FileSplit split)
202 throws IOException {
203 super(conf, split);
204 // instantiate filter
205 filter = (Filter)ReflectionUtils.newInstance(
206 conf.getClass(FILTER_CLASS, PercentFilter.class),
207 conf);
208 }
209
210 public synchronized boolean next(K key, V value) throws IOException {
211 while (next(key)) {
212 if (filter.accept(key)) {
213 getCurrentValue(value);
214 return true;
215 }
216 }
217
218 return false;
219 }
220 }
221 }