001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021
022 import java.io.IOException;
023 import java.util.regex.Pattern;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceAudience.Private;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileStatus;
032 import org.apache.hadoop.fs.FileSystem;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.io.LongWritable;
035 import org.apache.hadoop.io.RawComparator;
036 import org.apache.hadoop.io.Text;
037 import org.apache.hadoop.io.WritableComparable;
038 import org.apache.hadoop.io.WritableComparator;
039 import org.apache.hadoop.io.compress.CompressionCodec;
040 import org.apache.hadoop.mapred.lib.HashPartitioner;
041 import org.apache.hadoop.mapred.lib.IdentityMapper;
042 import org.apache.hadoop.mapred.lib.IdentityReducer;
043 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045 import org.apache.hadoop.mapreduce.MRConfig;
046 import org.apache.hadoop.mapreduce.MRJobConfig;
047 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048 import org.apache.hadoop.mapreduce.util.ConfigUtil;
049 import org.apache.hadoop.security.Credentials;
050 import org.apache.hadoop.util.ClassUtil;
051 import org.apache.hadoop.util.ReflectionUtils;
052 import org.apache.hadoop.util.Tool;
053 import org.apache.log4j.Level;
054
055 /**
056 * A map/reduce job configuration.
057 *
058 * <p><code>JobConf</code> is the primary interface for a user to describe a
059 * map-reduce job to the Hadoop framework for execution. The framework tries to
060 * faithfully execute the job as-is described by <code>JobConf</code>, however:
061 * <ol>
062 * <li>
063 * Some configuration parameters might have been marked as
064 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065 * final</a> by administrators and hence cannot be altered.
066 * </li>
067 * <li>
068 * While some job parameters are straight-forward to set
069 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
070 * with the rest of the framework and/or job-configuration and is relatively
071 * more complex for the user to control finely
072 * (e.g. {@link #setNumMapTasks(int)}).
073 * </li>
074 * </ol></p>
075 *
076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
078 * {@link OutputFormat} implementations to be used etc.
079 *
080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
081 * of the job such as <code>Comparator</code>s to be used, files to be put in
082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
083 * are to be compressed (and how), debugability via user-provided scripts
084 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
085 * for doing post-processing on task logs, task's stdout, stderr, syslog.
086 * and etc.</p>
087 *
088 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
089 * <p><blockquote><pre>
090 * // Create a new JobConf
091 * JobConf job = new JobConf(new Configuration(), MyJob.class);
092 *
093 * // Specify various job-specific parameters
094 * job.setJobName("myjob");
095 *
096 * FileInputFormat.setInputPaths(job, new Path("in"));
097 * FileOutputFormat.setOutputPath(job, new Path("out"));
098 *
099 * job.setMapperClass(MyJob.MyMapper.class);
100 * job.setCombinerClass(MyJob.MyReducer.class);
101 * job.setReducerClass(MyJob.MyReducer.class);
102 *
103 * job.setInputFormat(SequenceFileInputFormat.class);
104 * job.setOutputFormat(SequenceFileOutputFormat.class);
105 * </pre></blockquote></p>
106 *
107 * @see JobClient
108 * @see ClusterStatus
109 * @see Tool
110 * @see DistributedCache
111 */
112 @InterfaceAudience.Public
113 @InterfaceStability.Stable
114 public class JobConf extends Configuration {
115
116 private static final Log LOG = LogFactory.getLog(JobConf.class);
117
118 static{
119 ConfigUtil.loadResources();
120 }
121
122 /**
123 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
124 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
125 */
126 @Deprecated
127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
128 "mapred.task.maxvmem";
129
130 /**
131 * @deprecated
132 */
133 @Deprecated
134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
135 "mapred.task.limit.maxvmem";
136
137 /**
138 * @deprecated
139 */
140 @Deprecated
141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
142 "mapred.task.default.maxvmem";
143
144 /**
145 * @deprecated
146 */
147 @Deprecated
148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
149 "mapred.task.maxpmem";
150
151 /**
152 * A value which if set for memory related configuration options,
153 * indicates that the options are turned off.
154 * Deprecated because it makes no sense in the context of MR2.
155 */
156 @Deprecated
157 public static final long DISABLED_MEMORY_LIMIT = -1L;
158
159 /**
160 * Property name for the configuration property mapreduce.cluster.local.dir
161 */
162 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
163
164 /**
165 * Name of the queue to which jobs will be submitted, if no queue
166 * name is mentioned.
167 */
168 public static final String DEFAULT_QUEUE_NAME = "default";
169
170 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
171 JobContext.MAP_MEMORY_MB;
172
173 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
174 JobContext.REDUCE_MEMORY_MB;
175
176 /**
177 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
178 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
179 */
180 @Deprecated
181 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
182 "mapred.job.map.memory.mb";
183
184 /**
185 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
186 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
187 */
188 @Deprecated
189 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
190 "mapred.job.reduce.memory.mb";
191
192 /** Pattern for the default unpacking behavior for job jars */
193 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
194 Pattern.compile("(?:classes/|lib/).*");
195
196 /**
197 * Configuration key to set the java command line options for the child
198 * map and reduce tasks.
199 *
200 * Java opts for the task tracker child processes.
201 * The following symbol, if present, will be interpolated: @taskid@.
202 * It is replaced by current TaskID. Any other occurrences of '@' will go
203 * unchanged.
204 * For example, to enable verbose gc logging to a file named for the taskid in
205 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
206 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
207 *
208 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
209 * other environment variables to the child processes.
210 *
211 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
212 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
213 */
214 @Deprecated
215 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
216
217 /**
218 * Configuration key to set the java command line options for the map tasks.
219 *
220 * Java opts for the task tracker child map processes.
221 * The following symbol, if present, will be interpolated: @taskid@.
222 * It is replaced by current TaskID. Any other occurrences of '@' will go
223 * unchanged.
224 * For example, to enable verbose gc logging to a file named for the taskid in
225 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
226 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
227 *
228 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
229 * other environment variables to the map processes.
230 */
231 public static final String MAPRED_MAP_TASK_JAVA_OPTS =
232 JobContext.MAP_JAVA_OPTS;
233
234 /**
235 * Configuration key to set the java command line options for the reduce tasks.
236 *
237 * Java opts for the task tracker child reduce processes.
238 * The following symbol, if present, will be interpolated: @taskid@.
239 * It is replaced by current TaskID. Any other occurrences of '@' will go
240 * unchanged.
241 * For example, to enable verbose gc logging to a file named for the taskid in
242 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
243 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
244 *
245 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
246 * pass process environment variables to the reduce processes.
247 */
248 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
249 JobContext.REDUCE_JAVA_OPTS;
250
251 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
252
253 /**
254 * @deprecated
255 * Configuration key to set the maximum virtual memory available to the child
256 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
257 * longer have any effect.
258 */
259 @Deprecated
260 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
261
262 /**
263 * @deprecated
264 * Configuration key to set the maximum virtual memory available to the
265 * map tasks (in kilo-bytes). This has been deprecated and will no
266 * longer have any effect.
267 */
268 @Deprecated
269 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
270
271 /**
272 * @deprecated
273 * Configuration key to set the maximum virtual memory available to the
274 * reduce tasks (in kilo-bytes). This has been deprecated and will no
275 * longer have any effect.
276 */
277 @Deprecated
278 public static final String MAPRED_REDUCE_TASK_ULIMIT =
279 "mapreduce.reduce.ulimit";
280
281
282 /**
283 * Configuration key to set the environment of the child map/reduce tasks.
284 *
285 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
286 * reference existing environment variables via <code>$key</code> on
287 * Linux or <code>%key%</code> on Windows.
288 *
289 * Example:
290 * <ul>
291 * <li> A=foo - This will set the env variable A to foo. </li>
292 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
293 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
294 * </ul>
295 *
296 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
297 * {@link #MAPRED_REDUCE_TASK_ENV}
298 */
299 @Deprecated
300 public static final String MAPRED_TASK_ENV = "mapred.child.env";
301
302 /**
303 * Configuration key to set the environment of the child map tasks.
304 *
305 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
306 * reference existing environment variables via <code>$key</code> on
307 * Linux or <code>%key%</code> on Windows.
308 *
309 * Example:
310 * <ul>
311 * <li> A=foo - This will set the env variable A to foo. </li>
312 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
313 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
314 * </ul>
315 */
316 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
317
318 /**
319 * Configuration key to set the environment of the child reduce tasks.
320 *
321 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
322 * reference existing environment variables via <code>$key</code> on
323 * Linux or <code>%key%</code> on Windows.
324 *
325 * Example:
326 * <ul>
327 * <li> A=foo - This will set the env variable A to foo. </li>
328 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
329 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
330 * </ul>
331 */
332 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
333
334 private Credentials credentials = new Credentials();
335
336 /**
337 * Configuration key to set the logging {@link Level} for the map task.
338 *
339 * The allowed logging levels are:
340 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
341 */
342 public static final String MAPRED_MAP_TASK_LOG_LEVEL =
343 JobContext.MAP_LOG_LEVEL;
344
345 /**
346 * Configuration key to set the logging {@link Level} for the reduce task.
347 *
348 * The allowed logging levels are:
349 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
350 */
351 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
352 JobContext.REDUCE_LOG_LEVEL;
353
354 /**
355 * Default logging level for map/reduce tasks.
356 */
357 public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
358
359 /**
360 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
361 * use {@link MRJobConfig#WORKFLOW_ID} instead
362 */
363 @Deprecated
364 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
365
366 /**
367 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
368 * use {@link MRJobConfig#WORKFLOW_NAME} instead
369 */
370 @Deprecated
371 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
372
373 /**
374 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
375 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
376 */
377 @Deprecated
378 public static final String WORKFLOW_NODE_NAME =
379 MRJobConfig.WORKFLOW_NODE_NAME;
380
381 /**
382 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
383 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
384 */
385 @Deprecated
386 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
387 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
388
389 /**
390 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
391 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
392 */
393 @Deprecated
394 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
395 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
396
397 /**
398 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
399 * use {@link MRJobConfig#WORKFLOW_TAGS} instead
400 */
401 @Deprecated
402 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
403
404 /**
405 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
406 * not use it
407 */
408 @Deprecated
409 public static final String MAPREDUCE_RECOVER_JOB =
410 "mapreduce.job.restart.recover";
411
412 /**
413 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
414 * not use it
415 */
416 @Deprecated
417 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
418
419 /**
420 * Construct a map/reduce job configuration.
421 */
422 public JobConf() {
423 checkAndWarnDeprecation();
424 }
425
426 /**
427 * Construct a map/reduce job configuration.
428 *
429 * @param exampleClass a class whose containing jar is used as the job's jar.
430 */
431 public JobConf(Class exampleClass) {
432 setJarByClass(exampleClass);
433 checkAndWarnDeprecation();
434 }
435
436 /**
437 * Construct a map/reduce job configuration.
438 *
439 * @param conf a Configuration whose settings will be inherited.
440 */
441 public JobConf(Configuration conf) {
442 super(conf);
443
444 if (conf instanceof JobConf) {
445 JobConf that = (JobConf)conf;
446 credentials = that.credentials;
447 }
448
449 checkAndWarnDeprecation();
450 }
451
452
453 /** Construct a map/reduce job configuration.
454 *
455 * @param conf a Configuration whose settings will be inherited.
456 * @param exampleClass a class whose containing jar is used as the job's jar.
457 */
458 public JobConf(Configuration conf, Class exampleClass) {
459 this(conf);
460 setJarByClass(exampleClass);
461 }
462
463
464 /** Construct a map/reduce configuration.
465 *
466 * @param config a Configuration-format XML job description file.
467 */
468 public JobConf(String config) {
469 this(new Path(config));
470 }
471
472 /** Construct a map/reduce configuration.
473 *
474 * @param config a Configuration-format XML job description file.
475 */
476 public JobConf(Path config) {
477 super();
478 addResource(config);
479 checkAndWarnDeprecation();
480 }
481
482 /** A new map/reduce configuration where the behavior of reading from the
483 * default resources can be turned off.
484 * <p/>
485 * If the parameter {@code loadDefaults} is false, the new instance
486 * will not load resources from the default files.
487 *
488 * @param loadDefaults specifies whether to load from the default files
489 */
490 public JobConf(boolean loadDefaults) {
491 super(loadDefaults);
492 checkAndWarnDeprecation();
493 }
494
495 /**
496 * Get credentials for the job.
497 * @return credentials for the job
498 */
499 public Credentials getCredentials() {
500 return credentials;
501 }
502
503 @Private
504 public void setCredentials(Credentials credentials) {
505 this.credentials = credentials;
506 }
507
508 /**
509 * Get the user jar for the map-reduce job.
510 *
511 * @return the user jar for the map-reduce job.
512 */
513 public String getJar() { return get(JobContext.JAR); }
514
515 /**
516 * Set the user jar for the map-reduce job.
517 *
518 * @param jar the user jar for the map-reduce job.
519 */
520 public void setJar(String jar) { set(JobContext.JAR, jar); }
521
522 /**
523 * Get the pattern for jar contents to unpack on the tasktracker
524 */
525 public Pattern getJarUnpackPattern() {
526 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
527 }
528
529
530 /**
531 * Set the job's jar file by finding an example class location.
532 *
533 * @param cls the example class.
534 */
535 public void setJarByClass(Class cls) {
536 String jar = ClassUtil.findContainingJar(cls);
537 if (jar != null) {
538 setJar(jar);
539 }
540 }
541
542 public String[] getLocalDirs() throws IOException {
543 return getTrimmedStrings(MRConfig.LOCAL_DIR);
544 }
545
546 /**
547 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
548 */
549 @Deprecated
550 public void deleteLocalFiles() throws IOException {
551 String[] localDirs = getLocalDirs();
552 for (int i = 0; i < localDirs.length; i++) {
553 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
554 }
555 }
556
557 public void deleteLocalFiles(String subdir) throws IOException {
558 String[] localDirs = getLocalDirs();
559 for (int i = 0; i < localDirs.length; i++) {
560 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
561 }
562 }
563
564 /**
565 * Constructs a local file name. Files are distributed among configured
566 * local directories.
567 */
568 public Path getLocalPath(String pathString) throws IOException {
569 return getLocalPath(MRConfig.LOCAL_DIR, pathString);
570 }
571
572 /**
573 * Get the reported username for this job.
574 *
575 * @return the username
576 */
577 public String getUser() {
578 return get(JobContext.USER_NAME);
579 }
580
581 /**
582 * Set the reported username for this job.
583 *
584 * @param user the username for this job.
585 */
586 public void setUser(String user) {
587 set(JobContext.USER_NAME, user);
588 }
589
590
591
592 /**
593 * Set whether the framework should keep the intermediate files for
594 * failed tasks.
595 *
596 * @param keep <code>true</code> if framework should keep the intermediate files
597 * for failed tasks, <code>false</code> otherwise.
598 *
599 */
600 public void setKeepFailedTaskFiles(boolean keep) {
601 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
602 }
603
604 /**
605 * Should the temporary files for failed tasks be kept?
606 *
607 * @return should the files be kept?
608 */
609 public boolean getKeepFailedTaskFiles() {
610 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
611 }
612
613 /**
614 * Set a regular expression for task names that should be kept.
615 * The regular expression ".*_m_000123_0" would keep the files
616 * for the first instance of map 123 that ran.
617 *
618 * @param pattern the java.util.regex.Pattern to match against the
619 * task names.
620 */
621 public void setKeepTaskFilesPattern(String pattern) {
622 set(JobContext.PRESERVE_FILES_PATTERN, pattern);
623 }
624
625 /**
626 * Get the regular expression that is matched against the task names
627 * to see if we need to keep the files.
628 *
629 * @return the pattern as a string, if it was set, othewise null.
630 */
631 public String getKeepTaskFilesPattern() {
632 return get(JobContext.PRESERVE_FILES_PATTERN);
633 }
634
635 /**
636 * Set the current working directory for the default file system.
637 *
638 * @param dir the new current working directory.
639 */
640 public void setWorkingDirectory(Path dir) {
641 dir = new Path(getWorkingDirectory(), dir);
642 set(JobContext.WORKING_DIR, dir.toString());
643 }
644
645 /**
646 * Get the current working directory for the default file system.
647 *
648 * @return the directory name.
649 */
650 public Path getWorkingDirectory() {
651 String name = get(JobContext.WORKING_DIR);
652 if (name != null) {
653 return new Path(name);
654 } else {
655 try {
656 Path dir = FileSystem.get(this).getWorkingDirectory();
657 set(JobContext.WORKING_DIR, dir.toString());
658 return dir;
659 } catch (IOException e) {
660 throw new RuntimeException(e);
661 }
662 }
663 }
664
665 /**
666 * Sets the number of tasks that a spawned task JVM should run
667 * before it exits
668 * @param numTasks the number of tasks to execute; defaults to 1;
669 * -1 signifies no limit
670 */
671 public void setNumTasksToExecutePerJvm(int numTasks) {
672 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
673 }
674
675 /**
676 * Get the number of tasks that a spawned JVM should execute
677 */
678 public int getNumTasksToExecutePerJvm() {
679 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
680 }
681
682 /**
683 * Get the {@link InputFormat} implementation for the map-reduce job,
684 * defaults to {@link TextInputFormat} if not specified explicity.
685 *
686 * @return the {@link InputFormat} implementation for the map-reduce job.
687 */
688 public InputFormat getInputFormat() {
689 return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
690 TextInputFormat.class,
691 InputFormat.class),
692 this);
693 }
694
695 /**
696 * Set the {@link InputFormat} implementation for the map-reduce job.
697 *
698 * @param theClass the {@link InputFormat} implementation for the map-reduce
699 * job.
700 */
701 public void setInputFormat(Class<? extends InputFormat> theClass) {
702 setClass("mapred.input.format.class", theClass, InputFormat.class);
703 }
704
705 /**
706 * Get the {@link OutputFormat} implementation for the map-reduce job,
707 * defaults to {@link TextOutputFormat} if not specified explicity.
708 *
709 * @return the {@link OutputFormat} implementation for the map-reduce job.
710 */
711 public OutputFormat getOutputFormat() {
712 return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
713 TextOutputFormat.class,
714 OutputFormat.class),
715 this);
716 }
717
718 /**
719 * Get the {@link OutputCommitter} implementation for the map-reduce job,
720 * defaults to {@link FileOutputCommitter} if not specified explicitly.
721 *
722 * @return the {@link OutputCommitter} implementation for the map-reduce job.
723 */
724 public OutputCommitter getOutputCommitter() {
725 return (OutputCommitter)ReflectionUtils.newInstance(
726 getClass("mapred.output.committer.class", FileOutputCommitter.class,
727 OutputCommitter.class), this);
728 }
729
730 /**
731 * Set the {@link OutputCommitter} implementation for the map-reduce job.
732 *
733 * @param theClass the {@link OutputCommitter} implementation for the map-reduce
734 * job.
735 */
736 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
737 setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
738 }
739
740 /**
741 * Set the {@link OutputFormat} implementation for the map-reduce job.
742 *
743 * @param theClass the {@link OutputFormat} implementation for the map-reduce
744 * job.
745 */
746 public void setOutputFormat(Class<? extends OutputFormat> theClass) {
747 setClass("mapred.output.format.class", theClass, OutputFormat.class);
748 }
749
750 /**
751 * Should the map outputs be compressed before transfer?
752 *
753 * @param compress should the map outputs be compressed?
754 */
755 public void setCompressMapOutput(boolean compress) {
756 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
757 }
758
759 /**
760 * Are the outputs of the maps be compressed?
761 *
762 * @return <code>true</code> if the outputs of the maps are to be compressed,
763 * <code>false</code> otherwise.
764 */
765 public boolean getCompressMapOutput() {
766 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
767 }
768
769 /**
770 * Set the given class as the {@link CompressionCodec} for the map outputs.
771 *
772 * @param codecClass the {@link CompressionCodec} class that will compress
773 * the map outputs.
774 */
775 public void
776 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
777 setCompressMapOutput(true);
778 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
779 CompressionCodec.class);
780 }
781
782 /**
783 * Get the {@link CompressionCodec} for compressing the map outputs.
784 *
785 * @param defaultValue the {@link CompressionCodec} to return if not set
786 * @return the {@link CompressionCodec} class that should be used to compress the
787 * map outputs.
788 * @throws IllegalArgumentException if the class was specified, but not found
789 */
790 public Class<? extends CompressionCodec>
791 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
792 Class<? extends CompressionCodec> codecClass = defaultValue;
793 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
794 if (name != null) {
795 try {
796 codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
797 } catch (ClassNotFoundException e) {
798 throw new IllegalArgumentException("Compression codec " + name +
799 " was not found.", e);
800 }
801 }
802 return codecClass;
803 }
804
805 /**
806 * Get the key class for the map output data. If it is not set, use the
807 * (final) output key class. This allows the map output key class to be
808 * different than the final output key class.
809 *
810 * @return the map output key class.
811 */
812 public Class<?> getMapOutputKeyClass() {
813 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
814 if (retv == null) {
815 retv = getOutputKeyClass();
816 }
817 return retv;
818 }
819
820 /**
821 * Set the key class for the map output data. This allows the user to
822 * specify the map output key class to be different than the final output
823 * value class.
824 *
825 * @param theClass the map output key class.
826 */
827 public void setMapOutputKeyClass(Class<?> theClass) {
828 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
829 }
830
831 /**
832 * Get the value class for the map output data. If it is not set, use the
833 * (final) output value class This allows the map output value class to be
834 * different than the final output value class.
835 *
836 * @return the map output value class.
837 */
838 public Class<?> getMapOutputValueClass() {
839 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
840 Object.class);
841 if (retv == null) {
842 retv = getOutputValueClass();
843 }
844 return retv;
845 }
846
847 /**
848 * Set the value class for the map output data. This allows the user to
849 * specify the map output value class to be different than the final output
850 * value class.
851 *
852 * @param theClass the map output value class.
853 */
854 public void setMapOutputValueClass(Class<?> theClass) {
855 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
856 }
857
858 /**
859 * Get the key class for the job output data.
860 *
861 * @return the key class for the job output data.
862 */
863 public Class<?> getOutputKeyClass() {
864 return getClass(JobContext.OUTPUT_KEY_CLASS,
865 LongWritable.class, Object.class);
866 }
867
868 /**
869 * Set the key class for the job output data.
870 *
871 * @param theClass the key class for the job output data.
872 */
873 public void setOutputKeyClass(Class<?> theClass) {
874 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
875 }
876
877 /**
878 * Get the {@link RawComparator} comparator used to compare keys.
879 *
880 * @return the {@link RawComparator} comparator used to compare keys.
881 */
882 public RawComparator getOutputKeyComparator() {
883 Class<? extends RawComparator> theClass = getClass(
884 JobContext.KEY_COMPARATOR, null, RawComparator.class);
885 if (theClass != null)
886 return ReflectionUtils.newInstance(theClass, this);
887 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
888 }
889
890 /**
891 * Set the {@link RawComparator} comparator used to compare keys.
892 *
893 * @param theClass the {@link RawComparator} comparator used to
894 * compare keys.
895 * @see #setOutputValueGroupingComparator(Class)
896 */
897 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
898 setClass(JobContext.KEY_COMPARATOR,
899 theClass, RawComparator.class);
900 }
901
902 /**
903 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
904 *
905 * @param keySpec the key specification of the form -k pos1[,pos2], where,
906 * pos is of the form f[.c][opts], where f is the number
907 * of the key field to use, and c is the number of the first character from
908 * the beginning of the field. Fields and character posns are numbered
909 * starting with 1; a character position of zero in pos2 indicates the
910 * field's last character. If '.c' is omitted from pos1, it defaults to 1
911 * (the beginning of the field); if omitted from pos2, it defaults to 0
912 * (the end of the field). opts are ordering options. The supported options
913 * are:
914 * -n, (Sort numerically)
915 * -r, (Reverse the result of comparison)
916 */
917 public void setKeyFieldComparatorOptions(String keySpec) {
918 setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
919 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
920 }
921
922 /**
923 * Get the {@link KeyFieldBasedComparator} options
924 */
925 public String getKeyFieldComparatorOption() {
926 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
927 }
928
929 /**
930 * Set the {@link KeyFieldBasedPartitioner} options used for
931 * {@link Partitioner}
932 *
933 * @param keySpec the key specification of the form -k pos1[,pos2], where,
934 * pos is of the form f[.c][opts], where f is the number
935 * of the key field to use, and c is the number of the first character from
936 * the beginning of the field. Fields and character posns are numbered
937 * starting with 1; a character position of zero in pos2 indicates the
938 * field's last character. If '.c' is omitted from pos1, it defaults to 1
939 * (the beginning of the field); if omitted from pos2, it defaults to 0
940 * (the end of the field).
941 */
942 public void setKeyFieldPartitionerOptions(String keySpec) {
943 setPartitionerClass(KeyFieldBasedPartitioner.class);
944 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
945 }
946
947 /**
948 * Get the {@link KeyFieldBasedPartitioner} options
949 */
950 public String getKeyFieldPartitionerOption() {
951 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
952 }
953
954 /**
955 * Get the user defined {@link WritableComparable} comparator for
956 * grouping keys of inputs to the combiner.
957 *
958 * @return comparator set by the user for grouping values.
959 * @see #setCombinerKeyGroupingComparator(Class) for details.
960 */
961 public RawComparator getCombinerKeyGroupingComparator() {
962 Class<? extends RawComparator> theClass = getClass(
963 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
964 if (theClass == null) {
965 return getOutputKeyComparator();
966 }
967
968 return ReflectionUtils.newInstance(theClass, this);
969 }
970
971 /**
972 * Get the user defined {@link WritableComparable} comparator for
973 * grouping keys of inputs to the reduce.
974 *
975 * @return comparator set by the user for grouping values.
976 * @see #setOutputValueGroupingComparator(Class) for details.
977 */
978 public RawComparator getOutputValueGroupingComparator() {
979 Class<? extends RawComparator> theClass = getClass(
980 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
981 if (theClass == null) {
982 return getOutputKeyComparator();
983 }
984
985 return ReflectionUtils.newInstance(theClass, this);
986 }
987
988 /**
989 * Set the user defined {@link RawComparator} comparator for
990 * grouping keys in the input to the combiner.
991 * <p/>
992 * <p>This comparator should be provided if the equivalence rules for keys
993 * for sorting the intermediates are different from those for grouping keys
994 * before each call to
995 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
996 * <p/>
997 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
998 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
999 * <p/>
1000 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1001 * how keys are sorted, this can be used in conjunction to simulate
1002 * <i>secondary sort on values</i>.</p>
1003 * <p/>
1004 * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1005 * <i>stable</i> in any sense. (In any case, with the order of available
1006 * map-outputs to the combiner being non-deterministic, it wouldn't make
1007 * that much sense.)</p>
1008 *
1009 * @param theClass the comparator class to be used for grouping keys for the
1010 * combiner. It should implement <code>RawComparator</code>.
1011 * @see #setOutputKeyComparatorClass(Class)
1012 */
1013 public void setCombinerKeyGroupingComparator(
1014 Class<? extends RawComparator> theClass) {
1015 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1016 theClass, RawComparator.class);
1017 }
1018
1019 /**
1020 * Set the user defined {@link RawComparator} comparator for
1021 * grouping keys in the input to the reduce.
1022 *
1023 * <p>This comparator should be provided if the equivalence rules for keys
1024 * for sorting the intermediates are different from those for grouping keys
1025 * before each call to
1026 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1027 *
1028 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1029 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1030 *
1031 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1032 * how keys are sorted, this can be used in conjunction to simulate
1033 * <i>secondary sort on values</i>.</p>
1034 *
1035 * <p><i>Note</i>: This is not a guarantee of the reduce sort being
1036 * <i>stable</i> in any sense. (In any case, with the order of available
1037 * map-outputs to the reduce being non-deterministic, it wouldn't make
1038 * that much sense.)</p>
1039 *
1040 * @param theClass the comparator class to be used for grouping keys.
1041 * It should implement <code>RawComparator</code>.
1042 * @see #setOutputKeyComparatorClass(Class)
1043 * @see #setCombinerKeyGroupingComparator(Class)
1044 */
1045 public void setOutputValueGroupingComparator(
1046 Class<? extends RawComparator> theClass) {
1047 setClass(JobContext.GROUP_COMPARATOR_CLASS,
1048 theClass, RawComparator.class);
1049 }
1050
1051 /**
1052 * Should the framework use the new context-object code for running
1053 * the mapper?
1054 * @return true, if the new api should be used
1055 */
1056 public boolean getUseNewMapper() {
1057 return getBoolean("mapred.mapper.new-api", false);
1058 }
1059 /**
1060 * Set whether the framework should use the new api for the mapper.
1061 * This is the default for jobs submitted with the new Job api.
1062 * @param flag true, if the new api should be used
1063 */
1064 public void setUseNewMapper(boolean flag) {
1065 setBoolean("mapred.mapper.new-api", flag);
1066 }
1067
1068 /**
1069 * Should the framework use the new context-object code for running
1070 * the reducer?
1071 * @return true, if the new api should be used
1072 */
1073 public boolean getUseNewReducer() {
1074 return getBoolean("mapred.reducer.new-api", false);
1075 }
1076 /**
1077 * Set whether the framework should use the new api for the reducer.
1078 * This is the default for jobs submitted with the new Job api.
1079 * @param flag true, if the new api should be used
1080 */
1081 public void setUseNewReducer(boolean flag) {
1082 setBoolean("mapred.reducer.new-api", flag);
1083 }
1084
1085 /**
1086 * Get the value class for job outputs.
1087 *
1088 * @return the value class for job outputs.
1089 */
1090 public Class<?> getOutputValueClass() {
1091 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1092 }
1093
1094 /**
1095 * Set the value class for job outputs.
1096 *
1097 * @param theClass the value class for job outputs.
1098 */
1099 public void setOutputValueClass(Class<?> theClass) {
1100 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1101 }
1102
1103 /**
1104 * Get the {@link Mapper} class for the job.
1105 *
1106 * @return the {@link Mapper} class for the job.
1107 */
1108 public Class<? extends Mapper> getMapperClass() {
1109 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1110 }
1111
1112 /**
1113 * Set the {@link Mapper} class for the job.
1114 *
1115 * @param theClass the {@link Mapper} class for the job.
1116 */
1117 public void setMapperClass(Class<? extends Mapper> theClass) {
1118 setClass("mapred.mapper.class", theClass, Mapper.class);
1119 }
1120
1121 /**
1122 * Get the {@link MapRunnable} class for the job.
1123 *
1124 * @return the {@link MapRunnable} class for the job.
1125 */
1126 public Class<? extends MapRunnable> getMapRunnerClass() {
1127 return getClass("mapred.map.runner.class",
1128 MapRunner.class, MapRunnable.class);
1129 }
1130
1131 /**
1132 * Expert: Set the {@link MapRunnable} class for the job.
1133 *
1134 * Typically used to exert greater control on {@link Mapper}s.
1135 *
1136 * @param theClass the {@link MapRunnable} class for the job.
1137 */
1138 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1139 setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1140 }
1141
1142 /**
1143 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
1144 * to be sent to the {@link Reducer}s.
1145 *
1146 * @return the {@link Partitioner} used to partition map-outputs.
1147 */
1148 public Class<? extends Partitioner> getPartitionerClass() {
1149 return getClass("mapred.partitioner.class",
1150 HashPartitioner.class, Partitioner.class);
1151 }
1152
1153 /**
1154 * Set the {@link Partitioner} class used to partition
1155 * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1156 *
1157 * @param theClass the {@link Partitioner} used to partition map-outputs.
1158 */
1159 public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1160 setClass("mapred.partitioner.class", theClass, Partitioner.class);
1161 }
1162
1163 /**
1164 * Get the {@link Reducer} class for the job.
1165 *
1166 * @return the {@link Reducer} class for the job.
1167 */
1168 public Class<? extends Reducer> getReducerClass() {
1169 return getClass("mapred.reducer.class",
1170 IdentityReducer.class, Reducer.class);
1171 }
1172
1173 /**
1174 * Set the {@link Reducer} class for the job.
1175 *
1176 * @param theClass the {@link Reducer} class for the job.
1177 */
1178 public void setReducerClass(Class<? extends Reducer> theClass) {
1179 setClass("mapred.reducer.class", theClass, Reducer.class);
1180 }
1181
1182 /**
1183 * Get the user-defined <i>combiner</i> class used to combine map-outputs
1184 * before being sent to the reducers. Typically the combiner is same as the
1185 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1186 *
1187 * @return the user-defined combiner class used to combine map-outputs.
1188 */
1189 public Class<? extends Reducer> getCombinerClass() {
1190 return getClass("mapred.combiner.class", null, Reducer.class);
1191 }
1192
1193 /**
1194 * Set the user-defined <i>combiner</i> class used to combine map-outputs
1195 * before being sent to the reducers.
1196 *
1197 * <p>The combiner is an application-specified aggregation operation, which
1198 * can help cut down the amount of data transferred between the
1199 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1200 *
1201 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1202 * the mapper and reducer tasks. In general, the combiner is called as the
1203 * sort/merge result is written to disk. The combiner must:
1204 * <ul>
1205 * <li> be side-effect free</li>
1206 * <li> have the same input and output key types and the same input and
1207 * output value types</li>
1208 * </ul></p>
1209 *
1210 * <p>Typically the combiner is same as the <code>Reducer</code> for the
1211 * job i.e. {@link #setReducerClass(Class)}.</p>
1212 *
1213 * @param theClass the user-defined combiner class used to combine
1214 * map-outputs.
1215 */
1216 public void setCombinerClass(Class<? extends Reducer> theClass) {
1217 setClass("mapred.combiner.class", theClass, Reducer.class);
1218 }
1219
1220 /**
1221 * Should speculative execution be used for this job?
1222 * Defaults to <code>true</code>.
1223 *
1224 * @return <code>true</code> if speculative execution be used for this job,
1225 * <code>false</code> otherwise.
1226 */
1227 public boolean getSpeculativeExecution() {
1228 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1229 }
1230
1231 /**
1232 * Turn speculative execution on or off for this job.
1233 *
1234 * @param speculativeExecution <code>true</code> if speculative execution
1235 * should be turned on, else <code>false</code>.
1236 */
1237 public void setSpeculativeExecution(boolean speculativeExecution) {
1238 setMapSpeculativeExecution(speculativeExecution);
1239 setReduceSpeculativeExecution(speculativeExecution);
1240 }
1241
1242 /**
1243 * Should speculative execution be used for this job for map tasks?
1244 * Defaults to <code>true</code>.
1245 *
1246 * @return <code>true</code> if speculative execution be
1247 * used for this job for map tasks,
1248 * <code>false</code> otherwise.
1249 */
1250 public boolean getMapSpeculativeExecution() {
1251 return getBoolean(JobContext.MAP_SPECULATIVE, true);
1252 }
1253
1254 /**
1255 * Turn speculative execution on or off for this job for map tasks.
1256 *
1257 * @param speculativeExecution <code>true</code> if speculative execution
1258 * should be turned on for map tasks,
1259 * else <code>false</code>.
1260 */
1261 public void setMapSpeculativeExecution(boolean speculativeExecution) {
1262 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1263 }
1264
1265 /**
1266 * Should speculative execution be used for this job for reduce tasks?
1267 * Defaults to <code>true</code>.
1268 *
1269 * @return <code>true</code> if speculative execution be used
1270 * for reduce tasks for this job,
1271 * <code>false</code> otherwise.
1272 */
1273 public boolean getReduceSpeculativeExecution() {
1274 return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1275 }
1276
1277 /**
1278 * Turn speculative execution on or off for this job for reduce tasks.
1279 *
1280 * @param speculativeExecution <code>true</code> if speculative execution
1281 * should be turned on for reduce tasks,
1282 * else <code>false</code>.
1283 */
1284 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1285 setBoolean(JobContext.REDUCE_SPECULATIVE,
1286 speculativeExecution);
1287 }
1288
1289 /**
1290 * Get configured the number of reduce tasks for this job.
1291 * Defaults to <code>1</code>.
1292 *
1293 * @return the number of reduce tasks for this job.
1294 */
1295 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1296
1297 /**
1298 * Set the number of map tasks for this job.
1299 *
1300 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
1301 * number of spawned map tasks depends on the number of {@link InputSplit}s
1302 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1303 *
1304 * A custom {@link InputFormat} is typically used to accurately control
1305 * the number of map tasks for the job.</p>
1306 *
1307 * <h4 id="NoOfMaps">How many maps?</h4>
1308 *
1309 * <p>The number of maps is usually driven by the total size of the inputs
1310 * i.e. total number of blocks of the input files.</p>
1311 *
1312 * <p>The right level of parallelism for maps seems to be around 10-100 maps
1313 * per-node, although it has been set up to 300 or so for very cpu-light map
1314 * tasks. Task setup takes awhile, so it is best if the maps take at least a
1315 * minute to execute.</p>
1316 *
1317 * <p>The default behavior of file-based {@link InputFormat}s is to split the
1318 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
1319 * bytes, of input files. However, the {@link FileSystem} blocksize of the
1320 * input files is treated as an upper bound for input splits. A lower bound
1321 * on the split size can be set via
1322 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1323 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1324 *
1325 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
1326 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
1327 * used to set it even higher.</p>
1328 *
1329 * @param n the number of map tasks for this job.
1330 * @see InputFormat#getSplits(JobConf, int)
1331 * @see FileInputFormat
1332 * @see FileSystem#getDefaultBlockSize()
1333 * @see FileStatus#getBlockSize()
1334 */
1335 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1336
1337 /**
1338 * Get configured the number of reduce tasks for this job. Defaults to
1339 * <code>1</code>.
1340 *
1341 * @return the number of reduce tasks for this job.
1342 */
1343 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1344
1345 /**
1346 * Set the requisite number of reduce tasks for this job.
1347 *
1348 * <h4 id="NoOfReduces">How many reduces?</h4>
1349 *
1350 * <p>The right number of reduces seems to be <code>0.95</code> or
1351 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
1352 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1353 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1354 * </p>
1355 *
1356 * <p>With <code>0.95</code> all of the reduces can launch immediately and
1357 * start transfering map outputs as the maps finish. With <code>1.75</code>
1358 * the faster nodes will finish their first round of reduces and launch a
1359 * second wave of reduces doing a much better job of load balancing.</p>
1360 *
1361 * <p>Increasing the number of reduces increases the framework overhead, but
1362 * increases load balancing and lowers the cost of failures.</p>
1363 *
1364 * <p>The scaling factors above are slightly less than whole numbers to
1365 * reserve a few reduce slots in the framework for speculative-tasks, failures
1366 * etc.</p>
1367 *
1368 * <h4 id="ReducerNone">Reducer NONE</h4>
1369 *
1370 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1371 *
1372 * <p>In this case the output of the map-tasks directly go to distributed
1373 * file-system, to the path set by
1374 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
1375 * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1376 *
1377 * @param n the number of reduce tasks for this job.
1378 */
1379 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1380
1381 /**
1382 * Get the configured number of maximum attempts that will be made to run a
1383 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1384 * property. If this property is not already set, the default is 4 attempts.
1385 *
1386 * @return the max number of attempts per map task.
1387 */
1388 public int getMaxMapAttempts() {
1389 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1390 }
1391
1392 /**
1393 * Expert: Set the number of maximum attempts that will be made to run a
1394 * map task.
1395 *
1396 * @param n the number of attempts per map task.
1397 */
1398 public void setMaxMapAttempts(int n) {
1399 setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1400 }
1401
1402 /**
1403 * Get the configured number of maximum attempts that will be made to run a
1404 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1405 * property. If this property is not already set, the default is 4 attempts.
1406 *
1407 * @return the max number of attempts per reduce task.
1408 */
1409 public int getMaxReduceAttempts() {
1410 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1411 }
1412 /**
1413 * Expert: Set the number of maximum attempts that will be made to run a
1414 * reduce task.
1415 *
1416 * @param n the number of attempts per reduce task.
1417 */
1418 public void setMaxReduceAttempts(int n) {
1419 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1420 }
1421
1422 /**
1423 * Get the user-specified job name. This is only used to identify the
1424 * job to the user.
1425 *
1426 * @return the job's name, defaulting to "".
1427 */
1428 public String getJobName() {
1429 return get(JobContext.JOB_NAME, "");
1430 }
1431
1432 /**
1433 * Set the user-specified job name.
1434 *
1435 * @param name the job's new name.
1436 */
1437 public void setJobName(String name) {
1438 set(JobContext.JOB_NAME, name);
1439 }
1440
1441 /**
1442 * Get the user-specified session identifier. The default is the empty string.
1443 *
1444 * The session identifier is used to tag metric data that is reported to some
1445 * performance metrics system via the org.apache.hadoop.metrics API. The
1446 * session identifier is intended, in particular, for use by Hadoop-On-Demand
1447 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
1448 * HOD will set the session identifier by modifying the mapred-site.xml file
1449 * before starting the cluster.
1450 *
1451 * When not running under HOD, this identifer is expected to remain set to
1452 * the empty string.
1453 *
1454 * @return the session identifier, defaulting to "".
1455 */
1456 @Deprecated
1457 public String getSessionId() {
1458 return get("session.id", "");
1459 }
1460
1461 /**
1462 * Set the user-specified session identifier.
1463 *
1464 * @param sessionId the new session id.
1465 */
1466 @Deprecated
1467 public void setSessionId(String sessionId) {
1468 set("session.id", sessionId);
1469 }
1470
1471 /**
1472 * Set the maximum no. of failures of a given job per tasktracker.
1473 * If the no. of task failures exceeds <code>noFailures</code>, the
1474 * tasktracker is <i>blacklisted</i> for this job.
1475 *
1476 * @param noFailures maximum no. of failures of a given job per tasktracker.
1477 */
1478 public void setMaxTaskFailuresPerTracker(int noFailures) {
1479 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1480 }
1481
1482 /**
1483 * Expert: Get the maximum no. of failures of a given job per tasktracker.
1484 * If the no. of task failures exceeds this, the tasktracker is
1485 * <i>blacklisted</i> for this job.
1486 *
1487 * @return the maximum no. of failures of a given job per tasktracker.
1488 */
1489 public int getMaxTaskFailuresPerTracker() {
1490 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1491 }
1492
1493 /**
1494 * Get the maximum percentage of map tasks that can fail without
1495 * the job being aborted.
1496 *
1497 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
1498 * attempts before being declared as <i>failed</i>.
1499 *
1500 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1501 * the job being declared as {@link JobStatus#FAILED}.
1502 *
1503 * @return the maximum percentage of map tasks that can fail without
1504 * the job being aborted.
1505 */
1506 public int getMaxMapTaskFailuresPercent() {
1507 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1508 }
1509
1510 /**
1511 * Expert: Set the maximum percentage of map tasks that can fail without the
1512 * job being aborted.
1513 *
1514 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
1515 * before being declared as <i>failed</i>.
1516 *
1517 * @param percent the maximum percentage of map tasks that can fail without
1518 * the job being aborted.
1519 */
1520 public void setMaxMapTaskFailuresPercent(int percent) {
1521 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1522 }
1523
1524 /**
1525 * Get the maximum percentage of reduce tasks that can fail without
1526 * the job being aborted.
1527 *
1528 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1529 * attempts before being declared as <i>failed</i>.
1530 *
1531 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
1532 * in the job being declared as {@link JobStatus#FAILED}.
1533 *
1534 * @return the maximum percentage of reduce tasks that can fail without
1535 * the job being aborted.
1536 */
1537 public int getMaxReduceTaskFailuresPercent() {
1538 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1539 }
1540
1541 /**
1542 * Set the maximum percentage of reduce tasks that can fail without the job
1543 * being aborted.
1544 *
1545 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1546 * attempts before being declared as <i>failed</i>.
1547 *
1548 * @param percent the maximum percentage of reduce tasks that can fail without
1549 * the job being aborted.
1550 */
1551 public void setMaxReduceTaskFailuresPercent(int percent) {
1552 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1553 }
1554
1555 /**
1556 * Set {@link JobPriority} for this job.
1557 *
1558 * @param prio the {@link JobPriority} for this job.
1559 */
1560 public void setJobPriority(JobPriority prio) {
1561 set(JobContext.PRIORITY, prio.toString());
1562 }
1563
1564 /**
1565 * Get the {@link JobPriority} for this job.
1566 *
1567 * @return the {@link JobPriority} for this job.
1568 */
1569 public JobPriority getJobPriority() {
1570 String prio = get(JobContext.PRIORITY);
1571 if(prio == null) {
1572 return JobPriority.NORMAL;
1573 }
1574
1575 return JobPriority.valueOf(prio);
1576 }
1577
1578 /**
1579 * Set JobSubmitHostName for this job.
1580 *
1581 * @param hostname the JobSubmitHostName for this job.
1582 */
1583 void setJobSubmitHostName(String hostname) {
1584 set(MRJobConfig.JOB_SUBMITHOST, hostname);
1585 }
1586
1587 /**
1588 * Get the JobSubmitHostName for this job.
1589 *
1590 * @return the JobSubmitHostName for this job.
1591 */
1592 String getJobSubmitHostName() {
1593 String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1594
1595 return hostname;
1596 }
1597
1598 /**
1599 * Set JobSubmitHostAddress for this job.
1600 *
1601 * @param hostadd the JobSubmitHostAddress for this job.
1602 */
1603 void setJobSubmitHostAddress(String hostadd) {
1604 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1605 }
1606
1607 /**
1608 * Get JobSubmitHostAddress for this job.
1609 *
1610 * @return JobSubmitHostAddress for this job.
1611 */
1612 String getJobSubmitHostAddress() {
1613 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1614
1615 return hostadd;
1616 }
1617
1618 /**
1619 * Get whether the task profiling is enabled.
1620 * @return true if some tasks will be profiled
1621 */
1622 public boolean getProfileEnabled() {
1623 return getBoolean(JobContext.TASK_PROFILE, false);
1624 }
1625
1626 /**
1627 * Set whether the system should collect profiler information for some of
1628 * the tasks in this job? The information is stored in the user log
1629 * directory.
1630 * @param newValue true means it should be gathered
1631 */
1632 public void setProfileEnabled(boolean newValue) {
1633 setBoolean(JobContext.TASK_PROFILE, newValue);
1634 }
1635
1636 /**
1637 * Get the profiler configuration arguments.
1638 *
1639 * The default value for this property is
1640 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1641 *
1642 * @return the parameters to pass to the task child to configure profiling
1643 */
1644 public String getProfileParams() {
1645 return get(JobContext.TASK_PROFILE_PARAMS,
1646 MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
1647 }
1648
1649 /**
1650 * Set the profiler configuration arguments. If the string contains a '%s' it
1651 * will be replaced with the name of the profiling output file when the task
1652 * runs.
1653 *
1654 * This value is passed to the task child JVM on the command line.
1655 *
1656 * @param value the configuration string
1657 */
1658 public void setProfileParams(String value) {
1659 set(JobContext.TASK_PROFILE_PARAMS, value);
1660 }
1661
1662 /**
1663 * Get the range of maps or reduces to profile.
1664 * @param isMap is the task a map?
1665 * @return the task ranges
1666 */
1667 public IntegerRanges getProfileTaskRange(boolean isMap) {
1668 return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
1669 JobContext.NUM_REDUCE_PROFILES), "0-2");
1670 }
1671
1672 /**
1673 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
1674 * must also be called.
1675 * @param newValue a set of integer ranges of the map ids
1676 */
1677 public void setProfileTaskRange(boolean isMap, String newValue) {
1678 // parse the value to make sure it is legal
1679 new Configuration.IntegerRanges(newValue);
1680 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
1681 newValue);
1682 }
1683
1684 /**
1685 * Set the debug script to run when the map tasks fail.
1686 *
1687 * <p>The debug script can aid debugging of failed map tasks. The script is
1688 * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1689 *
1690 * <p>The debug command, run on the node where the map failed, is:</p>
1691 * <p><pre><blockquote>
1692 * $script $stdout $stderr $syslog $jobconf.
1693 * </blockquote></pre></p>
1694 *
1695 * <p> The script file is distributed through {@link DistributedCache}
1696 * APIs. The script needs to be symlinked. </p>
1697 *
1698 * <p>Here is an example on how to submit a script
1699 * <p><blockquote><pre>
1700 * job.setMapDebugScript("./myscript");
1701 * DistributedCache.createSymlink(job);
1702 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1703 * </pre></blockquote></p>
1704 *
1705 * @param mDbgScript the script name
1706 */
1707 public void setMapDebugScript(String mDbgScript) {
1708 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1709 }
1710
1711 /**
1712 * Get the map task's debug script.
1713 *
1714 * @return the debug Script for the mapred job for failed map tasks.
1715 * @see #setMapDebugScript(String)
1716 */
1717 public String getMapDebugScript() {
1718 return get(JobContext.MAP_DEBUG_SCRIPT);
1719 }
1720
1721 /**
1722 * Set the debug script to run when the reduce tasks fail.
1723 *
1724 * <p>The debug script can aid debugging of failed reduce tasks. The script
1725 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1726 *
1727 * <p>The debug command, run on the node where the map failed, is:</p>
1728 * <p><pre><blockquote>
1729 * $script $stdout $stderr $syslog $jobconf.
1730 * </blockquote></pre></p>
1731 *
1732 * <p> The script file is distributed through {@link DistributedCache}
1733 * APIs. The script file needs to be symlinked </p>
1734 *
1735 * <p>Here is an example on how to submit a script
1736 * <p><blockquote><pre>
1737 * job.setReduceDebugScript("./myscript");
1738 * DistributedCache.createSymlink(job);
1739 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1740 * </pre></blockquote></p>
1741 *
1742 * @param rDbgScript the script name
1743 */
1744 public void setReduceDebugScript(String rDbgScript) {
1745 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1746 }
1747
1748 /**
1749 * Get the reduce task's debug Script
1750 *
1751 * @return the debug script for the mapred job for failed reduce tasks.
1752 * @see #setReduceDebugScript(String)
1753 */
1754 public String getReduceDebugScript() {
1755 return get(JobContext.REDUCE_DEBUG_SCRIPT);
1756 }
1757
1758 /**
1759 * Get the uri to be invoked in-order to send a notification after the job
1760 * has completed (success/failure).
1761 *
1762 * @return the job end notification uri, <code>null</code> if it hasn't
1763 * been set.
1764 * @see #setJobEndNotificationURI(String)
1765 */
1766 public String getJobEndNotificationURI() {
1767 return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1768 }
1769
1770 /**
1771 * Set the uri to be invoked in-order to send a notification after the job
1772 * has completed (success/failure).
1773 *
1774 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
1775 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
1776 * identifier and completion-status respectively.</p>
1777 *
1778 * <p>This is typically used by application-writers to implement chaining of
1779 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1780 *
1781 * @param uri the job end notification uri
1782 * @see JobStatus
1783 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1784 * JobCompletionAndChaining">Job Completion and Chaining</a>
1785 */
1786 public void setJobEndNotificationURI(String uri) {
1787 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1788 }
1789
1790 /**
1791 * Get job-specific shared directory for use as scratch space
1792 *
1793 * <p>
1794 * When a job starts, a shared directory is created at location
1795 * <code>
1796 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1797 * This directory is exposed to the users through
1798 * <code>mapreduce.job.local.dir </code>.
1799 * So, the tasks can use this space
1800 * as scratch space and share files among them. </p>
1801 * This value is available as System property also.
1802 *
1803 * @return The localized job specific shared directory
1804 */
1805 public String getJobLocalDir() {
1806 return get(JobContext.JOB_LOCAL_DIR);
1807 }
1808
1809 /**
1810 * Get memory required to run a map task of the job, in MB.
1811 *
1812 * If a value is specified in the configuration, it is returned.
1813 * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
1814 * <p/>
1815 * For backward compatibility, if the job configuration sets the
1816 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1817 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1818 * after converting it from bytes to MB.
1819 * @return memory required to run a map task of the job, in MB,
1820 */
1821 public long getMemoryForMapTask() {
1822 long value = getDeprecatedMemoryValue();
1823 if (value < 0) {
1824 return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1825 JobContext.DEFAULT_MAP_MEMORY_MB);
1826 }
1827 return value;
1828 }
1829
1830 public void setMemoryForMapTask(long mem) {
1831 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1832 // In case that M/R 1.x applications use the old property name
1833 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1834 }
1835
1836 /**
1837 * Get memory required to run a reduce task of the job, in MB.
1838 *
1839 * If a value is specified in the configuration, it is returned.
1840 * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
1841 * <p/>
1842 * For backward compatibility, if the job configuration sets the
1843 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1844 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1845 * after converting it from bytes to MB.
1846 * @return memory required to run a reduce task of the job, in MB.
1847 */
1848 public long getMemoryForReduceTask() {
1849 long value = getDeprecatedMemoryValue();
1850 if (value < 0) {
1851 return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1852 JobContext.DEFAULT_REDUCE_MEMORY_MB);
1853 }
1854 return value;
1855 }
1856
1857 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1858 // converted into MBs.
1859 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1860 // value.
1861 private long getDeprecatedMemoryValue() {
1862 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1863 DISABLED_MEMORY_LIMIT);
1864 if (oldValue > 0) {
1865 oldValue /= (1024*1024);
1866 }
1867 return oldValue;
1868 }
1869
1870 public void setMemoryForReduceTask(long mem) {
1871 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1872 // In case that M/R 1.x applications use the old property name
1873 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1874 }
1875
1876 /**
1877 * Return the name of the queue to which this job is submitted.
1878 * Defaults to 'default'.
1879 *
1880 * @return name of the queue
1881 */
1882 public String getQueueName() {
1883 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1884 }
1885
1886 /**
1887 * Set the name of the queue to which this job should be submitted.
1888 *
1889 * @param queueName Name of the queue
1890 */
1891 public void setQueueName(String queueName) {
1892 set(JobContext.QUEUE_NAME, queueName);
1893 }
1894
1895 /**
1896 * Normalize the negative values in configuration
1897 *
1898 * @param val
1899 * @return normalized value
1900 */
1901 public static long normalizeMemoryConfigValue(long val) {
1902 if (val < 0) {
1903 val = DISABLED_MEMORY_LIMIT;
1904 }
1905 return val;
1906 }
1907
1908 /**
1909 * Find a jar that contains a class of the same name, if any.
1910 * It will return a jar file, even if that is not the first thing
1911 * on the class path that has a class with the same name.
1912 *
1913 * @param my_class the class to find.
1914 * @return a jar file that contains the class, or null.
1915 * @throws IOException
1916 */
1917 public static String findContainingJar(Class my_class) {
1918 return ClassUtil.findContainingJar(my_class);
1919 }
1920
1921 /**
1922 * Get the memory required to run a task of this job, in bytes. See
1923 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1924 * <p/>
1925 * This method is deprecated. Now, different memory limits can be
1926 * set for map and reduce tasks of a job, in MB.
1927 * <p/>
1928 * For backward compatibility, if the job configuration sets the
1929 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned.
1930 * Otherwise, this method will return the larger of the values returned by
1931 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1932 * after converting them into bytes.
1933 *
1934 * @return Memory required to run a task of this job, in bytes.
1935 * @see #setMaxVirtualMemoryForTask(long)
1936 * @deprecated Use {@link #getMemoryForMapTask()} and
1937 * {@link #getMemoryForReduceTask()}
1938 */
1939 @Deprecated
1940 public long getMaxVirtualMemoryForTask() {
1941 LOG.warn(
1942 "getMaxVirtualMemoryForTask() is deprecated. " +
1943 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1944
1945 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1946 Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
1947 return value;
1948 }
1949
1950 /**
1951 * Set the maximum amount of memory any task of this job can use. See
1952 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1953 * <p/>
1954 * mapred.task.maxvmem is split into
1955 * mapreduce.map.memory.mb
1956 * and mapreduce.map.memory.mb,mapred
1957 * each of the new key are set
1958 * as mapred.task.maxvmem / 1024
1959 * as new values are in MB
1960 *
1961 * @param vmem Maximum amount of virtual memory in bytes any task of this job
1962 * can use.
1963 * @see #getMaxVirtualMemoryForTask()
1964 * @deprecated
1965 * Use {@link #setMemoryForMapTask(long mem)} and
1966 * Use {@link #setMemoryForReduceTask(long mem)}
1967 */
1968 @Deprecated
1969 public void setMaxVirtualMemoryForTask(long vmem) {
1970 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
1971 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
1972 if (vmem < 0) {
1973 throw new IllegalArgumentException("Task memory allocation may not be < 0");
1974 }
1975
1976 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
1977 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
1978 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
1979 }else{
1980 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
1981 }
1982 }
1983
1984 /**
1985 * @deprecated this variable is deprecated and nolonger in use.
1986 */
1987 @Deprecated
1988 public long getMaxPhysicalMemoryForTask() {
1989 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1990 + " Refer to the APIs getMemoryForMapTask() and"
1991 + " getMemoryForReduceTask() for details.");
1992 return -1;
1993 }
1994
1995 /*
1996 * @deprecated this
1997 */
1998 @Deprecated
1999 public void setMaxPhysicalMemoryForTask(long mem) {
2000 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2001 + " The value set is ignored. Refer to "
2002 + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2003 }
2004
2005 static String deprecatedString(String key) {
2006 return "The variable " + key + " is no longer used.";
2007 }
2008
2009 private void checkAndWarnDeprecation() {
2010 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2011 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2012 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2013 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2014 }
2015 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
2016 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
2017 }
2018 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
2019 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
2020 }
2021 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
2022 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
2023 }
2024 }
2025
2026
2027 }
2028