/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.Progressable;

/**
 * This abstract class extends {@link FileOutputFormat}, allowing the output
 * data to be written to different output files. There are three basic use
 * cases for this class.
 *
 * Case one: This class is used for a map reduce job with at least one
 * reducer. The reducer wants to write data to different files depending on
 * the actual keys. It is assumed that a key (or value) encodes both the
 * actual key (value) and the desired location for that key (value).
 *
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name, or some
 * derivation of it.
 *
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name.
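 *
 * <p>As an illustration of case one, here is a minimal sketch (the subclass
 * name {@code KeyBasedTextOutput} is hypothetical, not part of this library)
 * that routes each record to a file named after its key by building on
 * {@link MultipleTextOutputFormat}:
 *
 * <pre>{@code
 * public class KeyBasedTextOutput extends MultipleTextOutputFormat<Text, Text> {
 *   protected String generateFileNameForKeyValue(Text key, Text value, String name) {
 *     // e.g. key "2009-02-15" and leaf name "part-00000" -> "2009-02-15/part-00000"
 *     return key.toString() + "/" + name;
 *   }
 * }
 * }</pre>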
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
    extends FileOutputFormat<K, V> {

  /**
   * Create a composite record writer that can write key/value data to
   * different output files.
   *
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as "part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files.
      TreeMap<String, RecordWriter<K, V>> recordWriters =
          new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key and value to write
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have a record writer yet for the final path,
          // create one and add it to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      }

      public void close(Reporter reporter) throws IOException {
        Iterator<String> keys = this.recordWriters.keySet().iterator();
        while (keys.hasNext()) {
          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
          rw.close(reporter);
        }
        this.recordWriters.clear();
      }
    };
  }

  /**
   * Generate the leaf name for the output file name. The default behavior
   * does not change the leaf file name (such as part-00000).
   *
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
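   *
   * <p>For illustration only (the suffix below is an assumption, not something
   * this class prescribes), an override could attach an extension to every
   * leaf file name:
   *
   * <pre>{@code
   * protected String generateLeafFileName(String name) {
   *   return name + ".txt";   // e.g. "part-00000" becomes "part-00000.txt"
   * }
   * }</pre>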
   */
  protected String generateLeafFileName(String name) {
    return name;
  }

  /**
   * Generate the output file name based on the given key/value and the leaf
   * file name. The default behavior is that the file name does not depend on
   * the key.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
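   *
   * <p>A hedged sketch of an override (the concrete types and the "ERROR"
   * prefix are illustrative assumptions) that splits records into two files
   * based on the value:
   *
   * <pre>{@code
   * protected String generateFileNameForKeyValue(LongWritable key, Text value, String name) {
   *   return value.toString().startsWith("ERROR") ? "errors-" + name : name;
   * }
   * }</pre>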
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

  /**
   * Generate the actual key from the given key/value. The default behavior is
   * that the actual key is equal to the given key.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
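   *
   * <p>As a sketch (assuming a {@code Text}-keyed subclass and text output;
   * neither is required by this class), an override can drop the key entirely
   * once it has served to pick the output file:
   *
   * <pre>{@code
   * protected Text generateActualKey(Text key, Text value) {
   *   return null; // TextOutputFormat writes only the value for a null key
   * }
   * }</pre>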
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }

  /**
   * Generate the actual value from the given key and value. The default
   * behavior is that the actual value is equal to the given value.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }

  /**
   * Generate the output file name based on a given name and the input file
   * name. If {@link JobContext#MAP_INPUT_FILE} does not exist (i.e. this is
   * not for a map only job), the given name is returned unchanged. If the
   * config value for "mapred.outputformat.numOfTrailingLegs" is not set, or is
   * set to 0 or a negative value, the given name is returned unchanged.
   * Otherwise, a file name consisting of the N trailing legs of the input file
   * name is returned, where N is the config value for
   * "mapred.outputformat.numOfTrailingLegs".
   *
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the output file name based on the given name and the input file
   *         name
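   *
   * <p>For example (the path below is illustrative), with
   * {@code mapred.outputformat.numOfTrailingLegs} set to 2 and an input file
   * of {@code /data/year=2009/month=02/part-00000}, the two trailing legs are
   * kept and the returned name is {@code month=02/part-00000}.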
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the {@link JobContext#MAP_INPUT_FILE} does not exist,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse =
        job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) break;
      midName = parent.getName();
      if (midName.length() == 0) break;
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }

  /**
   * Create a record writer for the given file name.
   *
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return a RecordWriter object over the given file
   * @throws IOException
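   *
   * <p>A minimal sketch of an implementation, assuming plain text output via
   * {@code TextOutputFormat} (the field name {@code theTextOutputFormat} is
   * illustrative):
   *
   * <pre>{@code
   * private TextOutputFormat<K, V> theTextOutputFormat = new TextOutputFormat<K, V>();
   *
   * protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
   *     String name, Progressable arg3) throws IOException {
   *   return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
   * }
   * }</pre>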
   */
  protected abstract RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}