/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.pipes;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.StringTokenizer;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Parser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;

/**
 * The main entry point and job submitter. It may be used either as a
 * command-line tool or programmatically, through its API, to launch
 * Pipes jobs.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Submitter extends Configured implements Tool {

  protected static final Log LOG = LogFactory.getLog(Submitter.class);
  public static final String PRESERVE_COMMANDFILE =
      "mapreduce.pipes.commandfile.preserve";
  public static final String EXECUTABLE = "mapreduce.pipes.executable";
  public static final String INTERPRETOR =
      "mapreduce.pipes.executable.interpretor";
  public static final String IS_JAVA_MAP = "mapreduce.pipes.isjavamapper";
  public static final String IS_JAVA_RR = "mapreduce.pipes.isjavarecordreader";
  public static final String IS_JAVA_RW = "mapreduce.pipes.isjavarecordwriter";
  public static final String IS_JAVA_REDUCE = "mapreduce.pipes.isjavareducer";
  public static final String PARTITIONER = "mapreduce.pipes.partitioner";
  public static final String INPUT_FORMAT = "mapreduce.pipes.inputformat";
  public static final String PORT = "mapreduce.pipes.command.port";

  public Submitter() {
    this(new Configuration());
  }

  public Submitter(Configuration conf) {
    setConf(conf);
  }
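
  /*
   * Usage sketch (illustrative, not part of the original file): submitting a
   * Pipes job through the API. The paths and executable URI below are
   * hypothetical placeholders.
   *
   *   JobConf conf = new JobConf();
   *   conf.setJobName("pipes-wordcount");
   *   FileInputFormat.setInputPaths(conf, new Path("/user/alice/in"));
   *   FileOutputFormat.setOutputPath(conf, new Path("/user/alice/out"));
   *   Submitter.setExecutable(conf, "hdfs:///apps/pipes/wordcount");
   *   Submitter.runJob(conf);   // blocks until the job completes
   */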

  /**
   * Get the URI of the application's executable.
   * @param conf the configuration to check
   * @return the URI where the application's executable is located
   */
  public static String getExecutable(JobConf conf) {
    return conf.get(Submitter.EXECUTABLE);
  }

  /**
   * Set the URI for the application's executable. Normally this is an hdfs:
   * location.
   * @param conf the configuration to modify
   * @param executable the URI of the application's executable
   */
  public static void setExecutable(JobConf conf, String executable) {
    conf.set(Submitter.EXECUTABLE, executable);
  }
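
  /*
   * Illustrative sketch (not in the original source): a "#" fragment in the
   * executable URI makes the framework symlink the localized binary under the
   * given name. The path and link name below are hypothetical.
   *
   *   Submitter.setExecutable(conf,
   *       "hdfs://namenode:8020/apps/pipes/wordcount#wordcount");
   */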

  /**
   * Set whether the job is using a Java RecordReader.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaRecordReader(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_RR, value);
  }

  /**
   * Check whether the job is using a Java RecordReader.
   * @param conf the configuration to check
   * @return is it a Java RecordReader?
   */
  public static boolean getIsJavaRecordReader(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_RR, false);
  }

  /**
   * Set whether the Mapper is written in Java.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaMapper(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_MAP, value);
  }

  /**
   * Check whether the job is using a Java Mapper.
   * @param conf the configuration to check
   * @return is it a Java Mapper?
   */
  public static boolean getIsJavaMapper(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_MAP, false);
  }

  /**
   * Set whether the Reducer is written in Java.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaReducer(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_REDUCE, value);
  }

  /**
   * Check whether the job is using a Java Reducer.
   * @param conf the configuration to check
   * @return is it a Java Reducer?
   */
  public static boolean getIsJavaReducer(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_REDUCE, false);
  }

  /**
   * Set whether the job will use a Java RecordWriter.
   * @param conf the configuration to modify
   * @param value the new value to set
   */
  public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_RW, value);
  }

  /**
   * Check whether the job will use a Java RecordWriter.
   * @param conf the configuration to check
   * @return true if the output of the job will be written by Java
   */
  public static boolean getIsJavaRecordWriter(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_RW, false);
  }
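
  /*
   * Usage sketch (illustrative): a mixed job that reads and writes through
   * Java classes but runs a C++ map and reduce. All four flags default to
   * false, i.e. the C++ side handles each stage unless told otherwise.
   *
   *   Submitter.setIsJavaRecordReader(conf, true);   // Java InputFormat reads
   *   Submitter.setIsJavaMapper(conf, false);        // C++ map
   *   Submitter.setIsJavaReducer(conf, false);       // C++ reduce
   *   Submitter.setIsJavaRecordWriter(conf, true);   // Java OutputFormat writes
   */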

  /**
   * Set the configuration, if it doesn't already have a value for the given
   * key.
   * @param conf the configuration to modify
   * @param key the key to set
   * @param value the new "default" value to set
   */
  private static void setIfUnset(JobConf conf, String key, String value) {
    if (conf.get(key) == null) {
      conf.set(key, value);
    }
  }

  /**
   * Save away the user's original partitioner before we override it.
   * @param conf the configuration to modify
   * @param cls the user's partitioner class
   */
  static void setJavaPartitioner(JobConf conf, Class<?> cls) {
    conf.set(Submitter.PARTITIONER, cls.getName());
  }

  /**
   * Get the user's original partitioner.
   * @param conf the configuration to look in
   * @return the class that the user submitted
   */
  static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
    return conf.getClass(Submitter.PARTITIONER,
                         HashPartitioner.class,
                         Partitioner.class);
  }

  /**
   * Does the user want to keep the command file for debugging? If this is
   * true, pipes will write a copy of the command data to a file in the
   * task directory named "downlink.data", which may be used to run the C++
   * program under the debugger. You probably also want to set
   * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
   * being deleted.
   * To run using the data file, set the environment variable
   * "mapreduce.pipes.commandfile" to point to the file.
   * @param conf the configuration to check
   * @return will the framework save the command file?
   */
  public static boolean getKeepCommandFile(JobConf conf) {
    return conf.getBoolean(Submitter.PRESERVE_COMMANDFILE, false);
  }

  /**
   * Set whether to keep the command file for debugging.
   * @param conf the configuration to modify
   * @param keep the new value
   */
  public static void setKeepCommandFile(JobConf conf, boolean keep) {
    conf.setBoolean(Submitter.PRESERVE_COMMANDFILE, keep);
  }
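
  /*
   * Debugging sketch (illustrative): preserve the downlink command file and
   * the failed task's directory so the C++ binary can be re-run under gdb.
   *
   *   JobConf conf = new JobConf();
   *   Submitter.setKeepCommandFile(conf, true);
   *   conf.setKeepFailedTaskFiles(true);   // keep the task directory around
   */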

  /**
   * Submit a job to the map/reduce cluster. All of the necessary modifications
   * to the job to run under pipes are made to the configuration.
   * @param conf the job to submit to the cluster (MODIFIED)
   * @return a handle to the running job
   * @throws IOException
   * @deprecated Use {@link Submitter#runJob(JobConf)}
   */
  @Deprecated
  public static RunningJob submitJob(JobConf conf) throws IOException {
    return runJob(conf);
  }

  /**
   * Submit a job to the map/reduce cluster and wait for it to complete. All of
   * the necessary modifications to the job to run under pipes are made to the
   * configuration.
   * @param conf the job to submit to the cluster (MODIFIED)
   * @return a handle to the completed job
   * @throws IOException
   */
  public static RunningJob runJob(JobConf conf) throws IOException {
    setupPipesJob(conf);
    return JobClient.runJob(conf);
  }

  /**
   * Submit a job to the Map-Reduce framework without waiting for it to
   * complete.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running job.
   * @throws IOException
   */
  public static RunningJob jobSubmit(JobConf conf) throws IOException {
    setupPipesJob(conf);
    return new JobClient(conf).submitJob(conf);
  }
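
  /*
   * Illustrative contrast between the two submission styles (sketch only):
   * runJob(conf) blocks until the job finishes, while jobSubmit(conf) returns
   * immediately with a RunningJob handle that the caller can poll.
   *
   *   RunningJob handle = Submitter.jobSubmit(conf);
   *   while (!handle.isComplete()) {   // poll for completion
   *     Thread.sleep(5000);
   *   }
   */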

  private static void setupPipesJob(JobConf conf) throws IOException {
    if (!getIsJavaMapper(conf)) {
      conf.setMapRunnerClass(PipesMapRunner.class);
      // Save the user's partitioner and hook in ours.
      setJavaPartitioner(conf, conf.getPartitionerClass());
      conf.setPartitionerClass(PipesPartitioner.class);
    }
    if (!getIsJavaReducer(conf)) {
      conf.setReducerClass(PipesReducer.class);
      if (!getIsJavaRecordWriter(conf)) {
        conf.setOutputFormat(NullOutputFormat.class);
      }
    }
    // default all key/value output types to Text
    String textClassname = Text.class.getName();
    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);

    // Use PipesNonJavaInputFormat if necessary to handle progress reporting
    // from C++ RecordReaders ...
    if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
      conf.setClass(Submitter.INPUT_FORMAT,
                    conf.getInputFormat().getClass(), InputFormat.class);
      conf.setInputFormat(PipesNonJavaInputFormat.class);
    }

    String exec = getExecutable(conf);
    if (exec == null) {
      throw new IllegalArgumentException("No application program defined.");
    }
    // add the default debug script only when the executable is expressed as
    // <path>#<executable>
    if (exec.contains("#")) {
      // set default gdb commands for map and reduce tasks
      String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
      setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
      setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
    }
    URI[] fileCache = DistributedCache.getCacheFiles(conf);
    if (fileCache == null) {
      fileCache = new URI[1];
    } else {
      // shift the existing entries right; slot 0 is reserved for the executable
      URI[] tmp = new URI[fileCache.length + 1];
      System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
      fileCache = tmp;
    }
    try {
      fileCache[0] = new URI(exec);
    } catch (URISyntaxException e) {
      IOException ie =
          new IOException("Problem parsing executable URI " + exec);
      ie.initCause(e);
      throw ie;
    }
    DistributedCache.setCacheFiles(fileCache, conf);
  }
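
  /*
   * Illustrative note (not in the original source): after setupPipesJob runs,
   * slot 0 of the distributed-cache file list holds the executable's URI (for
   * example, a hypothetical "hdfs:///apps/pipes/wordcount#wordcount"), so each
   * task localizes the binary before the C++ child process is launched.
   */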

  /**
   * A command line parser for the CLI-based Pipes job submitter.
   */
  static class CommandLineParser {
    private Options options = new Options();

    void addOption(String longName, boolean required, String description,
                   String paramName) {
      Option option = OptionBuilder.withArgName(paramName)
          .hasArgs(1)
          .withDescription(description)
          .isRequired(required)
          .create(longName);
      options.addOption(option);
    }

    void addArgument(String name, boolean required, String description) {
      Option option = OptionBuilder.withArgName(name)
          .hasArgs(1)
          .withDescription(description)
          .isRequired(required)
          .create();
      options.addOption(option);
    }

    Parser createParser() {
      return new BasicParser();
    }

    void printUsage() {
      // The CLI package should do this for us, but I can't figure out how
      // to make it print something reasonable.
      System.out.println("bin/hadoop pipes");
      System.out.println("  [-input <path>]            // Input directory");
      System.out.println("  [-output <path>]           // Output directory");
      System.out.println("  [-jar <jar file>]          // jar filename");
      System.out.println("  [-inputformat <class>]     // InputFormat class");
      System.out.println("  [-map <class>]             // Java Map class");
      System.out.println("  [-partitioner <class>]     // Java Partitioner");
      System.out.println("  [-reduce <class>]          // Java Reduce class");
      System.out.println("  [-writer <class>]          // Java RecordWriter");
      System.out.println("  [-program <executable>]    // executable URI");
      System.out.println("  [-reduces <num>]           // number of reduces");
      System.out.println("  [-lazyOutput <true/false>] // createOutputLazily");
      System.out.println();
      GenericOptionsParser.printGenericCommandUsage(System.out);
    }
  }

  private static <InterfaceType>
      Class<? extends InterfaceType> getClass(CommandLine cl, String key,
                                              JobConf conf,
                                              Class<InterfaceType> cls
                                             ) throws ClassNotFoundException {
    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
  }

  @Override
  public int run(String[] args) throws Exception {
    CommandLineParser cli = new CommandLineParser();
    if (args.length == 0) {
      cli.printUsage();
      return 1;
    }
    cli.addOption("input", false, "input path to the maps", "path");
    cli.addOption("output", false, "output path from the reduces", "path");

    cli.addOption("jar", false, "job jar file", "path");
    cli.addOption("inputformat", false, "java classname of InputFormat",
                  "class");
    //cli.addArgument("javareader", false, "is the RecordReader in Java");
    cli.addOption("map", false, "java classname of Mapper", "class");
    cli.addOption("partitioner", false, "java classname of Partitioner",
                  "class");
    cli.addOption("reduce", false, "java classname of Reducer", "class");
    cli.addOption("writer", false, "java classname of OutputFormat", "class");
    cli.addOption("program", false, "URI to application executable", "class");
    cli.addOption("reduces", false, "number of reduces", "num");
    cli.addOption("jobconf", false,
        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
        "key=val");
    cli.addOption("lazyOutput", false, "Optional. Create output lazily",
                  "boolean");
    Parser parser = cli.createParser();
    try {
      GenericOptionsParser genericParser =
          new GenericOptionsParser(getConf(), args);
      CommandLine results =
          parser.parse(cli.options, genericParser.getRemainingArgs());

      JobConf job = new JobConf(getConf());

      if (results.hasOption("input")) {
        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
      }
      if (results.hasOption("output")) {
        FileOutputFormat.setOutputPath(job,
            new Path(results.getOptionValue("output")));
      }
      if (results.hasOption("jar")) {
        job.setJar(results.getOptionValue("jar"));
      }
      if (results.hasOption("inputformat")) {
        setIsJavaRecordReader(job, true);
        job.setInputFormat(getClass(results, "inputformat", job,
                                    InputFormat.class));
      }
      if (results.hasOption("javareader")) {
        setIsJavaRecordReader(job, true);
      }
      if (results.hasOption("map")) {
        setIsJavaMapper(job, true);
        job.setMapperClass(getClass(results, "map", job, Mapper.class));
      }
      if (results.hasOption("partitioner")) {
        job.setPartitionerClass(getClass(results, "partitioner", job,
                                         Partitioner.class));
      }
      if (results.hasOption("reduce")) {
        setIsJavaReducer(job, true);
        job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
      }
      if (results.hasOption("reduces")) {
        job.setNumReduceTasks(Integer.parseInt(
            results.getOptionValue("reduces")));
      }
      if (results.hasOption("writer")) {
        setIsJavaRecordWriter(job, true);
        job.setOutputFormat(getClass(results, "writer", job,
                                     OutputFormat.class));
      }

      if (results.hasOption("lazyOutput")) {
        if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
          LazyOutputFormat.setOutputFormatClass(job,
              job.getOutputFormat().getClass());
        }
      }

      if (results.hasOption("program")) {
        setExecutable(job, results.getOptionValue("program"));
      }
      if (results.hasOption("jobconf")) {
        LOG.warn("-jobconf option is deprecated, please use -D instead.");
        String options = results.getOptionValue("jobconf");
        StringTokenizer tokenizer = new StringTokenizer(options, ",");
        while (tokenizer.hasMoreTokens()) {
          String keyVal = tokenizer.nextToken().trim();
          // split on the first '=' only, so values may themselves contain '='
          String[] keyValSplit = keyVal.split("=", 2);
          job.set(keyValSplit[0], keyValSplit[1]);
        }
      }
      // if they gave us a jar file, include it in the class path
      String jarFile = job.getJar();
      if (jarFile != null) {
        final URL[] urls = new URL[]{ FileSystem.getLocal(job).
            pathToFile(new Path(jarFile)).toURI().toURL() };
        // FindBugs complains that creating a URLClassLoader should be
        // in a doPrivileged() block.
        ClassLoader loader =
            AccessController.doPrivileged(
                new PrivilegedAction<ClassLoader>() {
                  public ClassLoader run() {
                    return new URLClassLoader(urls);
                  }
                });
        job.setClassLoader(loader);
      }

      runJob(job);
      return 0;
    } catch (ParseException pe) {
      LOG.info("Error: " + pe);
      cli.printUsage();
      return 1;
    }
  }
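
  /*
   * Command-line sketch (illustrative; the paths below are hypothetical):
   *
   *   bin/hadoop pipes -input /user/alice/in -output /user/alice/out \
   *       -program hdfs:///apps/pipes/wordcount
   *
   * Generic options such as -conf, -D, -fs and -jt are consumed by
   * GenericOptionsParser before the Pipes-specific options are parsed.
   */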

  /**
   * Submit a pipes job based on the command line arguments.
   * @param args the command line arguments to parse
   */
  public static void main(String[] args) throws Exception {
    int exitCode = new Submitter().run(args);
    ExitUtil.terminate(exitCode);
  }

}