001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
021 import java.io.IOException;
022 import java.nio.ByteBuffer;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Vector;
028
029 import org.apache.commons.cli.CommandLine;
030 import org.apache.commons.cli.GnuParser;
031 import org.apache.commons.cli.HelpFormatter;
032 import org.apache.commons.cli.Option;
033 import org.apache.commons.cli.Options;
034 import org.apache.commons.cli.ParseException;
035 import org.apache.commons.io.IOUtils;
036 import org.apache.commons.lang.StringUtils;
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039 import org.apache.hadoop.classification.InterfaceAudience;
040 import org.apache.hadoop.classification.InterfaceStability;
041 import org.apache.hadoop.conf.Configuration;
042 import org.apache.hadoop.fs.FSDataOutputStream;
043 import org.apache.hadoop.fs.FileStatus;
044 import org.apache.hadoop.fs.FileSystem;
045 import org.apache.hadoop.fs.Path;
046 import org.apache.hadoop.fs.permission.FsPermission;
047 import org.apache.hadoop.io.DataOutputBuffer;
048 import org.apache.hadoop.security.Credentials;
049 import org.apache.hadoop.security.UserGroupInformation;
050 import org.apache.hadoop.security.token.Token;
051 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052 import org.apache.hadoop.yarn.api.ApplicationConstants;
053 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056 import org.apache.hadoop.yarn.api.records.ApplicationId;
057 import org.apache.hadoop.yarn.api.records.ApplicationReport;
058 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061 import org.apache.hadoop.yarn.api.records.LocalResource;
062 import org.apache.hadoop.yarn.api.records.LocalResourceType;
063 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064 import org.apache.hadoop.yarn.api.records.NodeReport;
065 import org.apache.hadoop.yarn.api.records.NodeState;
066 import org.apache.hadoop.yarn.api.records.Priority;
067 import org.apache.hadoop.yarn.api.records.QueueACL;
068 import org.apache.hadoop.yarn.api.records.QueueInfo;
069 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070 import org.apache.hadoop.yarn.api.records.Resource;
071 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
072 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
073 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
074 import org.apache.hadoop.yarn.client.api.TimelineClient;
075 import org.apache.hadoop.yarn.client.api.YarnClient;
076 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
077 import org.apache.hadoop.yarn.conf.YarnConfiguration;
078 import org.apache.hadoop.yarn.exceptions.YarnException;
079 import org.apache.hadoop.yarn.util.ConverterUtils;
080 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
081
082 /**
083 * Client for Distributed Shell application submission to YARN.
084 *
085 * <p> The distributed shell client allows an application master to be launched that in turn would run
086 * the provided shell command on a set of containers. </p>
087 *
088 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
089 *
090 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
091 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
092 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
094 *
095 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
096 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
097 * and application name, the priority assigned to the application and the queue
098 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
099 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
100 * the {@link ApplicationMaster} is launched. </p>
101 *
102 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
103 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
104 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
106 *
107 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
108 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
109 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
110 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
111 *
112 */
113 @InterfaceAudience.Public
114 @InterfaceStability.Unstable
115 public class Client {
116
  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration
  private Configuration conf;
  // Client-side handle used for all communication with the ResourceManager
  private YarnClient yarnClient;
  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // Amt. of memory resource to request for to run the App Master
  private int amMemory = 10;
  // Amt. of virtual core resource to request for to run the App Master
  private int amVCores = 1;

  // Application master jar file
  private String appMasterJar = "";
  // Main class to invoke application master
  private final String appMasterMainClass;

  // Shell command to be executed
  private String shellCommand = "";
  // Location of shell script
  private String shellScriptPath = "";
  // Args to be passed to the shell command
  private String[] shellArgs = new String[] {};
  // Env variables to be setup for the shell command
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority
  private int shellCmdPriority = 0;

  // Amt of memory to request for container in which shell script will be executed
  private int containerMemory = 10;
  // Amt. of virtual cores to request for container in which shell script will be executed
  private int containerVirtualCores = 1;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;
  // Node label expression restricting where containers may be allocated;
  // null means the option was not given on the command line
  private String nodeLabelExpression = null;

  // log4j.properties file
  // if available, add to local resources and set into classpath
  private String log4jPropFile = "";

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // flag to indicate whether to keep containers across application attempts.
  private boolean keepContainers = false;

  // Sliding window in ms for counting AM attempt failures; -1 means disabled
  // (see the attempt_failures_validity_interval command-line option)
  private long attemptFailuresValidityInterval = -1;

  // Debug flag
  boolean debugFlag = false;

  // Timeline domain ID
  private String domainId = null;

  // Flag to indicate whether to create the domain of the given ID
  private boolean toCreateDomain = false;

  // Timeline domain reader access control
  private String viewACLs = null;

  // Timeline domain writer access control
  private String modifyACLs = null;

  // Command line options
  private Options opts;

  // Resource names under which the shell command / args are localized
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";
  // Resource name under which the AM jar is localized in the AM container
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  // Name under which a user-supplied shell script is copied to HDFS
  public static final String SCRIPT_PATH = "ExecScript";
196
197 /**
198 * @param args Command line arguments
199 */
200 public static void main(String[] args) {
201 boolean result = false;
202 try {
203 Client client = new Client();
204 LOG.info("Initializing Client");
205 try {
206 boolean doRun = client.init(args);
207 if (!doRun) {
208 System.exit(0);
209 }
210 } catch (IllegalArgumentException e) {
211 System.err.println(e.getLocalizedMessage());
212 client.printUsage();
213 System.exit(-1);
214 }
215 result = client.run();
216 } catch (Throwable t) {
217 LOG.fatal("Error running Client", t);
218 System.exit(1);
219 }
220 if (result) {
221 LOG.info("Application completed successfully");
222 System.exit(0);
223 }
224 LOG.error("Application failed to complete successfully");
225 System.exit(2);
226 }
227
  /**
   * Creates a client that launches the stock distributed-shell
   * ApplicationMaster implementation shipped with this example.
   *
   * @param conf configuration used to talk to YARN
   * @throws Exception if the client cannot be constructed
   */
  public Client(Configuration conf) throws Exception {
    this(
      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
      conf);
  }
235
236 Client(String appMasterMainClass, Configuration conf) {
237 this.conf = conf;
238 this.appMasterMainClass = appMasterMainClass;
239 yarnClient = YarnClient.createYarnClient();
240 yarnClient.init(conf);
241 opts = new Options();
242 opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
243 opts.addOption("priority", true, "Application Priority. Default 0");
244 opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
245 opts.addOption("timeout", true, "Application timeout in milliseconds");
246 opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
247 opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
248 opts.addOption("jar", true, "Jar file containing the application master");
249 opts.addOption("shell_command", true, "Shell command to be executed by " +
250 "the Application Master. Can only specify either --shell_command " +
251 "or --shell_script");
252 opts.addOption("shell_script", true, "Location of the shell script to be " +
253 "executed. Can only specify either --shell_command or --shell_script");
254 opts.addOption("shell_args", true, "Command line args for the shell script." +
255 "Multiple args can be separated by empty space.");
256 opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
257 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
258 opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
259 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
260 opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
261 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
262 opts.addOption("log_properties", true, "log4j.properties file");
263 opts.addOption("keep_containers_across_application_attempts", false,
264 "Flag to indicate whether to keep containers across application attempts." +
265 " If the flag is true, running containers will not be killed when" +
266 " application attempt fails and these containers will be retrieved by" +
267 " the new application attempt ");
268 opts.addOption("attempt_failures_validity_interval", true,
269 "when attempt_failures_validity_interval in milliseconds is set to > 0," +
270 "the failure number will not take failures which happen out of " +
271 "the validityInterval into failure count. " +
272 "If failure count reaches to maxAppAttempts, " +
273 "the application will be failed.");
274 opts.addOption("debug", false, "Dump out debug information");
275 opts.addOption("domain", true, "ID of the timeline domain where the "
276 + "timeline entities will be put");
277 opts.addOption("view_acls", true, "Users and groups that allowed to "
278 + "view the timeline entities in the given domain");
279 opts.addOption("modify_acls", true, "Users and groups that allowed to "
280 + "modify the timeline entities in the given domain");
281 opts.addOption("create", false, "Flag to indicate whether to create the "
282 + "domain specified with -domain.");
283 opts.addOption("help", false, "Print usage");
284 opts.addOption("node_label_expression", true,
285 "Node label expression to determine the nodes"
286 + " where all the containers of this application"
287 + " will be allocated, \"\" means containers"
288 + " can be allocated anywhere, if you don't specify the option,"
289 + " default node_label_expression of queue will be used.");
290 }
291
  /**
   * Creates a client backed by a fresh {@link YarnConfiguration}, launching
   * the stock distributed-shell ApplicationMaster.
   *
   * @throws Exception if the client cannot be constructed
   */
  public Client() throws Exception {
    this(new YarnConfiguration());
  }
297
298 /**
299 * Helper function to print out usage
300 */
301 private void printUsage() {
302 new HelpFormatter().printHelp("Client", opts);
303 }
304
305 /**
306 * Parse command line options
307 * @param args Parsed command line options
308 * @return Whether the init was successful to run the client
309 * @throws ParseException
310 */
311 public boolean init(String[] args) throws ParseException {
312
313 CommandLine cliParser = new GnuParser().parse(opts, args);
314
315 if (args.length == 0) {
316 throw new IllegalArgumentException("No args specified for client to initialize");
317 }
318
319 if (cliParser.hasOption("log_properties")) {
320 String log4jPath = cliParser.getOptionValue("log_properties");
321 try {
322 Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
323 } catch (Exception e) {
324 LOG.warn("Can not set up custom log4j properties. " + e);
325 }
326 }
327
328 if (cliParser.hasOption("help")) {
329 printUsage();
330 return false;
331 }
332
333 if (cliParser.hasOption("debug")) {
334 debugFlag = true;
335
336 }
337
338 if (cliParser.hasOption("keep_containers_across_application_attempts")) {
339 LOG.info("keep_containers_across_application_attempts");
340 keepContainers = true;
341 }
342
343 appName = cliParser.getOptionValue("appname", "DistributedShell");
344 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
345 amQueue = cliParser.getOptionValue("queue", "default");
346 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
347 amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
348
349 if (amMemory < 0) {
350 throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
351 + " Specified memory=" + amMemory);
352 }
353 if (amVCores < 0) {
354 throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
355 + " Specified virtual cores=" + amVCores);
356 }
357
358 if (!cliParser.hasOption("jar")) {
359 throw new IllegalArgumentException("No jar file specified for application master");
360 }
361
362 appMasterJar = cliParser.getOptionValue("jar");
363
364 if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
365 throw new IllegalArgumentException(
366 "No shell command or shell script specified to be executed by application master");
367 } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
368 throw new IllegalArgumentException("Can not specify shell_command option " +
369 "and shell_script option at the same time");
370 } else if (cliParser.hasOption("shell_command")) {
371 shellCommand = cliParser.getOptionValue("shell_command");
372 } else {
373 shellScriptPath = cliParser.getOptionValue("shell_script");
374 }
375 if (cliParser.hasOption("shell_args")) {
376 shellArgs = cliParser.getOptionValues("shell_args");
377 }
378 if (cliParser.hasOption("shell_env")) {
379 String envs[] = cliParser.getOptionValues("shell_env");
380 for (String env : envs) {
381 env = env.trim();
382 int index = env.indexOf('=');
383 if (index == -1) {
384 shellEnv.put(env, "");
385 continue;
386 }
387 String key = env.substring(0, index);
388 String val = "";
389 if (index < (env.length()-1)) {
390 val = env.substring(index+1);
391 }
392 shellEnv.put(key, val);
393 }
394 }
395 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
396
397 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
398 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
399 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
400
401
402 if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
403 throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
404 + " exiting."
405 + " Specified containerMemory=" + containerMemory
406 + ", containerVirtualCores=" + containerVirtualCores
407 + ", numContainer=" + numContainers);
408 }
409
410 nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
411
412 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
413
414 attemptFailuresValidityInterval =
415 Long.parseLong(cliParser.getOptionValue(
416 "attempt_failures_validity_interval", "-1"));
417
418 log4jPropFile = cliParser.getOptionValue("log_properties", "");
419
420 // Get timeline domain options
421 if (cliParser.hasOption("domain")) {
422 domainId = cliParser.getOptionValue("domain");
423 toCreateDomain = cliParser.hasOption("create");
424 if (cliParser.hasOption("view_acls")) {
425 viewACLs = cliParser.getOptionValue("view_acls");
426 }
427 if (cliParser.hasOption("modify_acls")) {
428 modifyACLs = cliParser.getOptionValue("modify_acls");
429 }
430 }
431
432 return true;
433 }
434
  /**
   * Main run function for the client: connects to the RM, dumps cluster
   * information, requests a new application id, builds the
   * {@link ApplicationSubmissionContext} (local resources, environment and
   * AM launch command), submits the application, and then blocks monitoring
   * it until completion or timeout.
   *
   * @return true if application completed successfully
   * @throws IOException if HDFS interaction (jar/script upload, tokens) fails
   * @throws YarnException if an RM call fails
   */
  public boolean run() throws IOException, YarnException {

    LOG.info("Running Client");
    yarnClient.start();

    // The cluster/queue/ACL dumps below are informational only; they do not
    // affect submission.
    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
    LOG.info("Got Cluster metric info from ASM"
        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
        NodeState.RUNNING);
    LOG.info("Got Cluster node info from ASM");
    for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress" + node.getHttpAddress()
          + ", nodeRackName" + node.getRackName()
          + ", nodeNumContainers" + node.getNumContainers());
    }

    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
    LOG.info("Queue info"
        + ", queueName=" + queueInfo.getQueueName()
        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
        + ", queueApplicationCount=" + queueInfo.getApplications().size()
        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
    for (QueueUserACLInfo aclInfo : listAclInfo) {
      for (QueueACL userAcl : aclInfo.getUserAcls()) {
        LOG.info("User ACL Info for Queue"
            + ", queueName=" + aclInfo.getQueueName()
            + ", userAcl=" + userAcl.name());
      }
    }

    // Optionally create the timeline domain before submission so the AM can
    // publish entities into it.
    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
      prepareTimelineDomain();
    }

    // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    // TODO get min/max resource capabilities from RM and change memory ask if needed
    // If we do not have min/max, we may not be able to correctly request
    // the required resources from the RM for the app master
    // Memory ask has to be a multiple of min and less than max.
    // Dump out information about cluster capability as seen by the resource manager
    int maxMem = appResponse.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    // A resource ask cannot exceed the max.
    if (amMemory > maxMem) {
      LOG.info("AM memory specified above max threshold of cluster. Using max value."
          + ", specified=" + amMemory
          + ", max=" + maxMem);
      amMemory = maxMem;
    }

    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);

    if (amVCores > maxVCores) {
      LOG.info("AM virtual cores specified above max threshold of cluster. "
          + "Using max value." + ", specified=" + amVCores
          + ", max=" + maxVCores);
      amVCores = maxVCores;
    }

    // set the application name
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();

    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);

    if (attemptFailuresValidityInterval >= 0) {
      appContext
        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
    }

    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Copy the application master jar to the filesystem
    // Create a local resource to point to the destination jar path
    FileSystem fs = FileSystem.get(conf);
    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
        localResources, null);

    // Set the log4j properties if needed
    if (!log4jPropFile.isEmpty()) {
      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
          localResources, null);
    }

    // The shell script has to be made available on the final container(s)
    // where it will be executed.
    // To do this, we need to first copy into the filesystem that is visible
    // to the yarn framework.
    // We do not need to set this as a local resource for the application
    // master as the application master does not need it.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
      Path shellSrc = new Path(shellScriptPath);
      String shellPathSuffix =
          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
      Path shellDst =
          new Path(fs.getHomeDirectory(), shellPathSuffix);
      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
      hdfsShellScriptLocation = shellDst.toUri().toString();
      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
      hdfsShellScriptLen = shellFileStatus.getLen();
      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }

    // The shell command / args are shipped to the AM as small local
    // resource files rather than on the AM command line.
    if (!shellCommand.isEmpty()) {
      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
          localResources, shellCommand);
    }

    if (shellArgs.length > 0) {
      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
          localResources, StringUtils.join(shellArgs, " "));
    }

    // Set the necessary security tokens as needed
    //amContainer.setContainerTokens(containerToken);

    // Set the env variables to be setup in the env where the application master will be run
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();

    // put location of shell script into env
    // using the env info, the application master will create the correct local resource for the
    // eventual containers that will be launched to execute the shell scripts
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
    if (domainId != null && domainId.length() > 0) {
      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
    }

    // Add AppMaster.jar location to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
        .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
      classPathEnv.append(c.trim());
    }
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
        "./log4j.properties");

    // add the runtime classpath needed for tests to work
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      classPathEnv.append(':');
      classPathEnv.append(System.getProperty("java.class.path"));
    }

    env.put("CLASSPATH", classPathEnv.toString());

    // Set the necessary command to execute the application master
    Vector<CharSequence> vargs = new Vector<CharSequence>(30);

    // Set java executable command
    LOG.info("Setting up app master command");
    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
    // Set Xmx based on am memory size
    vargs.add("-Xmx" + amMemory + "m");
    // Set class name
    vargs.add(appMasterMainClass);
    // Set params for Application Master
    vargs.add("--container_memory " + String.valueOf(containerMemory));
    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
    vargs.add("--num_containers " + String.valueOf(numContainers));
    // Note: node label expression is set on the submission context (not a
    // command-line arg to the AM).
    if (null != nodeLabelExpression) {
      appContext.setNodeLabelExpression(nodeLabelExpression);
    }
    vargs.add("--priority " + String.valueOf(shellCmdPriority));

    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
      vargs.add("--debug");
    }

    // Redirect AM stdout/stderr into the container's log directory.
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

    // Get final commmand
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, env, commands, null, null, null);

    // Set up resource type requirements
    // For now, both memory and vcores are supported, so we set memory and
    // vcores requirements
    Resource capability = Resource.newInstance(amMemory, amVCores);
    appContext.setResource(capability);

    // Service data is a binary blob that can be passed to the application
    // Not needed in this scenario
    // amContainer.setServiceData(serviceData);

    // Setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
      Credentials credentials = new Credentials();
      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
      if (tokenRenewer == null || tokenRenewer.length() == 0) {
        throw new IOException(
            "Can't get Master Kerberos principal for the RM to use as renewer");
      }

      // For now, only getting tokens for the default file-system.
      final Token<?> tokens[] =
          fs.addDelegationTokens(tokenRenewer, credentials);
      if (tokens != null) {
        for (Token<?> token : tokens) {
          LOG.info("Got dt for " + fs.getUri() + "; " + token);
        }
      }
      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
      amContainer.setTokens(fsTokens);
    }

    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
    // TODO - what is the range for priority? how to decide?
    Priority pri = Priority.newInstance(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue(amQueue);

    // Submit the application to the applications manager
    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
    // Ignore the response as either a valid response object is returned on success
    // or an exception thrown to denote some form of a failure
    LOG.info("Submitting application to ASM");

    yarnClient.submitApplication(appContext);

    // TODO
    // Try submitting the same request again
    // app submission failure?

    // Monitor the application
    return monitorApplication(appId);

  }
718
719 /**
720 * Monitor the submitted application for completion.
721 * Kill application if time expires.
722 * @param appId Application Id of application to be monitored
723 * @return true if application completed successfully
724 * @throws YarnException
725 * @throws IOException
726 */
727 private boolean monitorApplication(ApplicationId appId)
728 throws YarnException, IOException {
729
730 while (true) {
731
732 // Check app status every 1 second.
733 try {
734 Thread.sleep(1000);
735 } catch (InterruptedException e) {
736 LOG.debug("Thread sleep in monitoring loop interrupted");
737 }
738
739 // Get application report for the appId we are interested in
740 ApplicationReport report = yarnClient.getApplicationReport(appId);
741
742 LOG.info("Got application report from ASM for"
743 + ", appId=" + appId.getId()
744 + ", clientToAMToken=" + report.getClientToAMToken()
745 + ", appDiagnostics=" + report.getDiagnostics()
746 + ", appMasterHost=" + report.getHost()
747 + ", appQueue=" + report.getQueue()
748 + ", appMasterRpcPort=" + report.getRpcPort()
749 + ", appStartTime=" + report.getStartTime()
750 + ", yarnAppState=" + report.getYarnApplicationState().toString()
751 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
752 + ", appTrackingUrl=" + report.getTrackingUrl()
753 + ", appUser=" + report.getUser());
754
755 YarnApplicationState state = report.getYarnApplicationState();
756 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
757 if (YarnApplicationState.FINISHED == state) {
758 if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
759 LOG.info("Application has completed successfully. Breaking monitoring loop");
760 return true;
761 }
762 else {
763 LOG.info("Application did finished unsuccessfully."
764 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
765 + ". Breaking monitoring loop");
766 return false;
767 }
768 }
769 else if (YarnApplicationState.KILLED == state
770 || YarnApplicationState.FAILED == state) {
771 LOG.info("Application did not finish."
772 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
773 + ". Breaking monitoring loop");
774 return false;
775 }
776
777 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
778 LOG.info("Reached client specified timeout for application. Killing application");
779 forceKillApplication(appId);
780 return false;
781 }
782 }
783
784 }
785
786 /**
787 * Kill a submitted application by sending a call to the ASM
788 * @param appId Application Id to be killed.
789 * @throws YarnException
790 * @throws IOException
791 */
792 private void forceKillApplication(ApplicationId appId)
793 throws YarnException, IOException {
794 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
795 // the same time.
796 // If yes, can we kill a particular attempt only?
797
798 // Response can be ignored as it is non-null on success or
799 // throws an exception in case of failures
800 yarnClient.killApplication(appId);
801 }
802
803 private void addToLocalResources(FileSystem fs, String fileSrcPath,
804 String fileDstPath, String appId, Map<String, LocalResource> localResources,
805 String resources) throws IOException {
806 String suffix =
807 appName + "/" + appId + "/" + fileDstPath;
808 Path dst =
809 new Path(fs.getHomeDirectory(), suffix);
810 if (fileSrcPath == null) {
811 FSDataOutputStream ostream = null;
812 try {
813 ostream = FileSystem
814 .create(fs, dst, new FsPermission((short) 0710));
815 ostream.writeUTF(resources);
816 } finally {
817 IOUtils.closeQuietly(ostream);
818 }
819 } else {
820 fs.copyFromLocalFile(new Path(fileSrcPath), dst);
821 }
822 FileStatus scFileStatus = fs.getFileStatus(dst);
823 LocalResource scRsrc =
824 LocalResource.newInstance(
825 ConverterUtils.getYarnUrlFromURI(dst.toUri()),
826 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
827 scFileStatus.getLen(), scFileStatus.getModificationTime());
828 localResources.put(fileDstPath, scRsrc);
829 }
830
831 private void prepareTimelineDomain() {
832 TimelineClient timelineClient = null;
833 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
834 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
835 timelineClient = TimelineClient.createTimelineClient();
836 timelineClient.init(conf);
837 timelineClient.start();
838 } else {
839 LOG.warn("Cannot put the domain " + domainId +
840 " because the timeline service is not enabled");
841 return;
842 }
843 try {
844 //TODO: we need to check and combine the existing timeline domain ACLs,
845 //but let's do it once we have client java library to query domains.
846 TimelineDomain domain = new TimelineDomain();
847 domain.setId(domainId);
848 domain.setReaders(
849 viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
850 domain.setWriters(
851 modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
852 timelineClient.putDomain(domain);
853 LOG.info("Put the timeline domain: " +
854 TimelineUtils.dumpTimelineRecordtoJSON(domain));
855 } catch (Exception e) {
856 LOG.error("Error when putting the timeline domain", e);
857 } finally {
858 timelineClient.stop();
859 }
860 }
861 }