001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;

import com.google.common.annotations.VisibleForTesting;
103
104 /**
105 * An ApplicationMaster for executing shell commands on a set of launched
106 * containers using the YARN framework.
107 *
108 * <p>
109 * This class is meant to act as an example on how to write yarn-based
110 * application masters.
111 * </p>
112 *
113 * <p>
114 * The ApplicationMaster is started on a container by the
115 * <code>ResourceManager</code>'s launcher. The first thing that the
116 * <code>ApplicationMaster</code> needs to do is to connect and register itself
117 * with the <code>ResourceManager</code>. The registration sets up information
118 * within the <code>ResourceManager</code> regarding what host:port the
119 * ApplicationMaster is listening on to provide any form of functionality to a
120 * client as well as a tracking url that a client can use to keep track of
121 * status/job history if needed. However, in the distributedshell, trackingurl
122 * and appMasterHost:appMasterRpcPort are not supported.
123 * </p>
124 *
125 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call to the
 * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts
 * as a heartbeat.
 * </p>
 *
132 * <p>
133 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
134 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
136 * resource specifications such as node location, computational
137 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
138 * responds with an {@link AllocateResponse} that informs the
139 * <code>ApplicationMaster</code> of the set of newly allocated containers,
140 * completed containers as well as current state of available resources.
141 * </p>
142 *
143 * <p>
144 * For each allocated container, the <code>ApplicationMaster</code> can then set
145 * up the necessary launch context via {@link ContainerLaunchContext} to specify
146 * the allocated container id, local resources required by the executable, the
147 * environment to be setup for the executable, commands to execute, etc. and
148 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
149 * launch and execute the defined commands on the given allocated container.
150 * </p>
151 *
152 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched containers by
 * either querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed
 * containers or via the {@link ContainerManagementProtocol} by querying for
 * the status of the allocated container's {@link ContainerId}.
 * </p>
 *
159 * <p>
160 * After the job has been completed, the <code>ApplicationMaster</code> has to
161 * send a {@link FinishApplicationMasterRequest} to the
162 * <code>ResourceManager</code> to inform it that the
163 * <code>ApplicationMaster</code> has been completed.
164 */
165 @InterfaceAudience.Public
166 @InterfaceStability.Unstable
167 public class ApplicationMaster {
168
169 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
170
171 @VisibleForTesting
172 @Private
173 public static enum DSEvent {
174 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
175 }
176
177 @VisibleForTesting
178 @Private
179 public static enum DSEntity {
180 DS_APP_ATTEMPT, DS_CONTAINER
181 }
182
183 // Configuration
184 private Configuration conf;
185
186 // Handle to communicate with the Resource Manager
187 @SuppressWarnings("rawtypes")
188 private AMRMClientAsync amRMClient;
189
190 // In both secure and non-secure modes, this points to the job-submitter.
191 private UserGroupInformation appSubmitterUgi;
192
193 // Handle to communicate with the Node Manager
194 private NMClientAsync nmClientAsync;
195 // Listen to process the response from the Node Manager
196 private NMCallbackHandler containerListener;
197
198 // Application Attempt Id ( combination of attemptId and fail count )
199 @VisibleForTesting
200 protected ApplicationAttemptId appAttemptID;
201
202 // TODO
203 // For status update for clients - yet to be implemented
204 // Hostname of the container
205 private String appMasterHostname = "";
206 // Port on which the app master listens for status updates from clients
207 private int appMasterRpcPort = -1;
208 // Tracking url to which app master publishes info for clients to monitor
209 private String appMasterTrackingUrl = "";
210
211 // App Master configuration
212 // No. of containers to run shell command on
213 @VisibleForTesting
214 protected int numTotalContainers = 1;
215 // Memory to request for the container on which the shell command will run
216 private int containerMemory = 10;
217 // VirtualCores to request for the container on which the shell command will run
218 private int containerVirtualCores = 1;
219 // Priority of the request
220 private int requestPriority;
221
222 // Counter for completed containers ( complete denotes successful or failed )
223 private AtomicInteger numCompletedContainers = new AtomicInteger();
224 // Allocated container count so that we know how many containers has the RM
225 // allocated to us
226 @VisibleForTesting
227 protected AtomicInteger numAllocatedContainers = new AtomicInteger();
228 // Count of failed containers
229 private AtomicInteger numFailedContainers = new AtomicInteger();
230 // Count of containers already requested from the RM
231 // Needed as once requested, we should not request for containers again.
232 // Only request for more if the original requirement changes.
233 @VisibleForTesting
234 protected AtomicInteger numRequestedContainers = new AtomicInteger();
235
236 // Shell command to be executed
237 private String shellCommand = "";
238 // Args to be passed to the shell command
239 private String shellArgs = "";
240 // Env variables to be setup for the shell command
241 private Map<String, String> shellEnv = new HashMap<String, String>();
242
243 // Location of shell script ( obtained from info set in env )
244 // Shell script path in fs
245 private String scriptPath = "";
246 // Timestamp needed for creating a local resource
247 private long shellScriptPathTimestamp = 0;
248 // File length needed for local resource
249 private long shellScriptPathLen = 0;
250
251 // Timeline domain ID
252 private String domainId = null;
253
254 // Hardcoded path to shell script in launch container's local env
255 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
256 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
257 + ".bat";
258
259 // Hardcoded path to custom log_properties
260 private static final String log4jPath = "log4j.properties";
261
262 private static final String shellCommandPath = "shellCommands";
263 private static final String shellArgsPath = "shellArgs";
264
265 private volatile boolean done;
266
267 private ByteBuffer allTokens;
268
269 // Launch threads
270 private List<Thread> launchThreads = new ArrayList<Thread>();
271
272 // Timeline Client
273 private TimelineClient timelineClient;
274
275 private final String linux_bash_command = "bash";
276 private final String windows_command = "cmd /c";
277
278 /**
279 * @param args Command line args
280 */
281 public static void main(String[] args) {
282 boolean result = false;
283 try {
284 ApplicationMaster appMaster = new ApplicationMaster();
285 LOG.info("Initializing ApplicationMaster");
286 boolean doRun = appMaster.init(args);
287 if (!doRun) {
288 System.exit(0);
289 }
290 appMaster.run();
291 result = appMaster.finish();
292 } catch (Throwable t) {
293 LOG.fatal("Error running ApplicationMaster", t);
294 LogManager.shutdown();
295 ExitUtil.terminate(1, t);
296 }
297 if (result) {
298 LOG.info("Application Master completed successfully. exiting");
299 System.exit(0);
300 } else {
301 LOG.info("Application Master failed. exiting");
302 System.exit(2);
303 }
304 }
305
306 /**
307 * Dump out contents of $CWD and the environment to stdout for debugging
308 */
309 private void dumpOutDebugInfo() {
310
311 LOG.info("Dump debug output");
312 Map<String, String> envs = System.getenv();
313 for (Map.Entry<String, String> env : envs.entrySet()) {
314 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
315 System.out.println("System env: key=" + env.getKey() + ", val="
316 + env.getValue());
317 }
318
319 BufferedReader buf = null;
320 try {
321 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
322 Shell.execCommand("ls", "-al");
323 buf = new BufferedReader(new StringReader(lines));
324 String line = "";
325 while ((line = buf.readLine()) != null) {
326 LOG.info("System CWD content: " + line);
327 System.out.println("System CWD content: " + line);
328 }
329 } catch (IOException e) {
330 e.printStackTrace();
331 } finally {
332 IOUtils.cleanup(LOG, buf);
333 }
334 }
335
/**
 * Creates an ApplicationMaster whose configuration is a freshly constructed
 * {@link YarnConfiguration}.
 */
public ApplicationMaster() {
  this.conf = new YarnConfiguration();
}
340
341 /**
342 * Parse command line options
343 *
344 * @param args Command line args
345 * @return Whether init successful and run should be invoked
346 * @throws ParseException
347 * @throws IOException
348 */
349 public boolean init(String[] args) throws ParseException, IOException {
350 Options opts = new Options();
351 opts.addOption("app_attempt_id", true,
352 "App Attempt ID. Not to be used unless for testing purposes");
353 opts.addOption("shell_env", true,
354 "Environment for shell script. Specified as env_key=env_val pairs");
355 opts.addOption("container_memory", true,
356 "Amount of memory in MB to be requested to run the shell command");
357 opts.addOption("container_vcores", true,
358 "Amount of virtual cores to be requested to run the shell command");
359 opts.addOption("num_containers", true,
360 "No. of containers on which the shell command needs to be executed");
361 opts.addOption("priority", true, "Application Priority. Default 0");
362 opts.addOption("debug", false, "Dump out debug information");
363
364 opts.addOption("help", false, "Print usage");
365 CommandLine cliParser = new GnuParser().parse(opts, args);
366
367 if (args.length == 0) {
368 printUsage(opts);
369 throw new IllegalArgumentException(
370 "No args specified for application master to initialize");
371 }
372
373 //Check whether customer log4j.properties file exists
374 if (fileExist(log4jPath)) {
375 try {
376 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
377 log4jPath);
378 } catch (Exception e) {
379 LOG.warn("Can not set up custom log4j properties. " + e);
380 }
381 }
382
383 if (cliParser.hasOption("help")) {
384 printUsage(opts);
385 return false;
386 }
387
388 if (cliParser.hasOption("debug")) {
389 dumpOutDebugInfo();
390 }
391
392 Map<String, String> envs = System.getenv();
393
394 if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
395 if (cliParser.hasOption("app_attempt_id")) {
396 String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
397 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
398 } else {
399 throw new IllegalArgumentException(
400 "Application Attempt Id not set in the environment");
401 }
402 } else {
403 ContainerId containerId = ConverterUtils.toContainerId(envs
404 .get(Environment.CONTAINER_ID.name()));
405 appAttemptID = containerId.getApplicationAttemptId();
406 }
407
408 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
409 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
410 + " not set in the environment");
411 }
412 if (!envs.containsKey(Environment.NM_HOST.name())) {
413 throw new RuntimeException(Environment.NM_HOST.name()
414 + " not set in the environment");
415 }
416 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
417 throw new RuntimeException(Environment.NM_HTTP_PORT
418 + " not set in the environment");
419 }
420 if (!envs.containsKey(Environment.NM_PORT.name())) {
421 throw new RuntimeException(Environment.NM_PORT.name()
422 + " not set in the environment");
423 }
424
425 LOG.info("Application master for app" + ", appId="
426 + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
427 + appAttemptID.getApplicationId().getClusterTimestamp()
428 + ", attemptId=" + appAttemptID.getAttemptId());
429
430 if (!fileExist(shellCommandPath)
431 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
432 throw new IllegalArgumentException(
433 "No shell command or shell script specified to be executed by application master");
434 }
435
436 if (fileExist(shellCommandPath)) {
437 shellCommand = readContent(shellCommandPath);
438 }
439
440 if (fileExist(shellArgsPath)) {
441 shellArgs = readContent(shellArgsPath);
442 }
443
444 if (cliParser.hasOption("shell_env")) {
445 String shellEnvs[] = cliParser.getOptionValues("shell_env");
446 for (String env : shellEnvs) {
447 env = env.trim();
448 int index = env.indexOf('=');
449 if (index == -1) {
450 shellEnv.put(env, "");
451 continue;
452 }
453 String key = env.substring(0, index);
454 String val = "";
455 if (index < (env.length() - 1)) {
456 val = env.substring(index + 1);
457 }
458 shellEnv.put(key, val);
459 }
460 }
461
462 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
463 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
464
465 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
466 shellScriptPathTimestamp = Long.valueOf(envs
467 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
468 }
469 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
470 shellScriptPathLen = Long.valueOf(envs
471 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
472 }
473 if (!scriptPath.isEmpty()
474 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
475 LOG.error("Illegal values in env for shell script path" + ", path="
476 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
477 + shellScriptPathTimestamp);
478 throw new IllegalArgumentException(
479 "Illegal values in env for shell script path");
480 }
481 }
482
483 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
484 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
485 }
486
487 containerMemory = Integer.parseInt(cliParser.getOptionValue(
488 "container_memory", "10"));
489 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
490 "container_vcores", "1"));
491 numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
492 "num_containers", "1"));
493 if (numTotalContainers == 0) {
494 throw new IllegalArgumentException(
495 "Cannot run distributed shell with no containers");
496 }
497 requestPriority = Integer.parseInt(cliParser
498 .getOptionValue("priority", "0"));
499
500 // Creating the Timeline Client
501 timelineClient = TimelineClient.createTimelineClient();
502 timelineClient.init(conf);
503 timelineClient.start();
504
505 return true;
506 }
507
508 /**
509 * Helper function to print usage
510 *
511 * @param opts Parsed command line options
512 */
513 private void printUsage(Options opts) {
514 new HelpFormatter().printHelp("ApplicationMaster", opts);
515 }
516
517 /**
518 * Main run function for the application master
519 *
520 * @throws YarnException
521 * @throws IOException
522 */
523 @SuppressWarnings({ "unchecked" })
524 public void run() throws YarnException, IOException {
525 LOG.info("Starting ApplicationMaster");
526
527 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
528 // are marked as LimitedPrivate
529 Credentials credentials =
530 UserGroupInformation.getCurrentUser().getCredentials();
531 DataOutputBuffer dob = new DataOutputBuffer();
532 credentials.writeTokenStorageToStream(dob);
533 // Now remove the AM->RM token so that containers cannot access it.
534 Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
535 LOG.info("Executing with tokens:");
536 while (iter.hasNext()) {
537 Token<?> token = iter.next();
538 LOG.info(token);
539 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
540 iter.remove();
541 }
542 }
543 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
544
545 // Create appSubmitterUgi and add original tokens to it
546 String appSubmitterUserName =
547 System.getenv(ApplicationConstants.Environment.USER.name());
548 appSubmitterUgi =
549 UserGroupInformation.createRemoteUser(appSubmitterUserName);
550 appSubmitterUgi.addCredentials(credentials);
551
552 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
553 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
554
555 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
556 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
557 amRMClient.init(conf);
558 amRMClient.start();
559
560 containerListener = createNMCallbackHandler();
561 nmClientAsync = new NMClientAsyncImpl(containerListener);
562 nmClientAsync.init(conf);
563 nmClientAsync.start();
564
565 // Setup local RPC Server to accept status requests directly from clients
566 // TODO need to setup a protocol for client to be able to communicate to
567 // the RPC server
568 // TODO use the rpc port info to register with the RM for the client to
569 // send requests to this app master
570
571 // Register self with ResourceManager
572 // This will start heartbeating to the RM
573 appMasterHostname = NetUtils.getHostname();
574 RegisterApplicationMasterResponse response = amRMClient
575 .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
576 appMasterTrackingUrl);
577 // Dump out information about cluster capability as seen by the
578 // resource manager
579 int maxMem = response.getMaximumResourceCapability().getMemory();
580 LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
581
582 int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
583 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
584
585 // A resource ask cannot exceed the max.
586 if (containerMemory > maxMem) {
587 LOG.info("Container memory specified above max threshold of cluster."
588 + " Using max value." + ", specified=" + containerMemory + ", max="
589 + maxMem);
590 containerMemory = maxMem;
591 }
592
593 if (containerVirtualCores > maxVCores) {
594 LOG.info("Container virtual cores specified above max threshold of cluster."
595 + " Using max value." + ", specified=" + containerVirtualCores + ", max="
596 + maxVCores);
597 containerVirtualCores = maxVCores;
598 }
599
600 List<Container> previousAMRunningContainers =
601 response.getContainersFromPreviousAttempts();
602 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
603 + " previous attempts' running containers on AM registration.");
604 numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
605
606 int numTotalContainersToRequest =
607 numTotalContainers - previousAMRunningContainers.size();
608 // Setup ask for containers from RM
609 // Send request for containers to RM
610 // Until we get our fully allocated quota, we keep on polling RM for
611 // containers
612 // Keep looping until all the containers are launched and shell script
613 // executed on them ( regardless of success/failure).
614 for (int i = 0; i < numTotalContainersToRequest; ++i) {
615 ContainerRequest containerAsk = setupContainerAskForRM();
616 amRMClient.addContainerRequest(containerAsk);
617 }
618 numRequestedContainers.set(numTotalContainers);
619
620 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
621 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
622 }
623
624 @VisibleForTesting
625 NMCallbackHandler createNMCallbackHandler() {
626 return new NMCallbackHandler(this);
627 }
628
629 @VisibleForTesting
630 protected boolean finish() {
631 // wait for completion.
632 while (!done
633 && (numCompletedContainers.get() != numTotalContainers)) {
634 try {
635 Thread.sleep(200);
636 } catch (InterruptedException ex) {}
637 }
638
639 // Join all launched threads
640 // needed for when we time out
641 // and we need to release containers
642 for (Thread launchThread : launchThreads) {
643 try {
644 launchThread.join(10000);
645 } catch (InterruptedException e) {
646 LOG.info("Exception thrown in thread join: " + e.getMessage());
647 e.printStackTrace();
648 }
649 }
650
651 // When the application completes, it should stop all running containers
652 LOG.info("Application completed. Stopping running containers");
653 nmClientAsync.stop();
654
655 // When the application completes, it should send a finish application
656 // signal to the RM
657 LOG.info("Application completed. Signalling finish to RM");
658
659 FinalApplicationStatus appStatus;
660 String appMessage = null;
661 boolean success = true;
662 if (numFailedContainers.get() == 0 &&
663 numCompletedContainers.get() == numTotalContainers) {
664 appStatus = FinalApplicationStatus.SUCCEEDED;
665 } else {
666 appStatus = FinalApplicationStatus.FAILED;
667 appMessage = "Diagnostics." + ", total=" + numTotalContainers
668 + ", completed=" + numCompletedContainers.get() + ", allocated="
669 + numAllocatedContainers.get() + ", failed="
670 + numFailedContainers.get();
671 LOG.info(appMessage);
672 success = false;
673 }
674 try {
675 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
676 } catch (YarnException ex) {
677 LOG.error("Failed to unregister application", ex);
678 } catch (IOException e) {
679 LOG.error("Failed to unregister application", e);
680 }
681
682 amRMClient.stop();
683
684 return success;
685 }
686
687 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
688 @SuppressWarnings("unchecked")
689 @Override
690 public void onContainersCompleted(List<ContainerStatus> completedContainers) {
691 LOG.info("Got response from RM for container ask, completedCnt="
692 + completedContainers.size());
693 for (ContainerStatus containerStatus : completedContainers) {
694 LOG.info(appAttemptID + " got container status for containerID="
695 + containerStatus.getContainerId() + ", state="
696 + containerStatus.getState() + ", exitStatus="
697 + containerStatus.getExitStatus() + ", diagnostics="
698 + containerStatus.getDiagnostics());
699
700 // non complete containers should not be here
701 assert (containerStatus.getState() == ContainerState.COMPLETE);
702
703 // increment counters for completed/failed containers
704 int exitStatus = containerStatus.getExitStatus();
705 if (0 != exitStatus) {
706 // container failed
707 if (ContainerExitStatus.ABORTED != exitStatus) {
708 // shell script failed
709 // counts as completed
710 numCompletedContainers.incrementAndGet();
711 numFailedContainers.incrementAndGet();
712 } else {
713 // container was killed by framework, possibly preempted
714 // we should re-try as the container was lost for some reason
715 numAllocatedContainers.decrementAndGet();
716 numRequestedContainers.decrementAndGet();
717 // we do not need to release the container as it would be done
718 // by the RM
719 }
720 } else {
721 // nothing to do
722 // container completed successfully
723 numCompletedContainers.incrementAndGet();
724 LOG.info("Container completed successfully." + ", containerId="
725 + containerStatus.getContainerId());
726 }
727 publishContainerEndEvent(
728 timelineClient, containerStatus, domainId, appSubmitterUgi);
729 }
730
731 // ask for more containers if any failed
732 int askCount = numTotalContainers - numRequestedContainers.get();
733 numRequestedContainers.addAndGet(askCount);
734
735 if (askCount > 0) {
736 for (int i = 0; i < askCount; ++i) {
737 ContainerRequest containerAsk = setupContainerAskForRM();
738 amRMClient.addContainerRequest(containerAsk);
739 }
740 }
741
742 if (numCompletedContainers.get() == numTotalContainers) {
743 done = true;
744 }
745 }
746
747 @Override
748 public void onContainersAllocated(List<Container> allocatedContainers) {
749 LOG.info("Got response from RM for container ask, allocatedCnt="
750 + allocatedContainers.size());
751 numAllocatedContainers.addAndGet(allocatedContainers.size());
752 for (Container allocatedContainer : allocatedContainers) {
753 LOG.info("Launching shell command on a new container."
754 + ", containerId=" + allocatedContainer.getId()
755 + ", containerNode=" + allocatedContainer.getNodeId().getHost()
756 + ":" + allocatedContainer.getNodeId().getPort()
757 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
758 + ", containerResourceMemory"
759 + allocatedContainer.getResource().getMemory()
760 + ", containerResourceVirtualCores"
761 + allocatedContainer.getResource().getVirtualCores());
762 // + ", containerToken"
763 // +allocatedContainer.getContainerToken().getIdentifier().toString());
764
765 LaunchContainerRunnable runnableLaunchContainer =
766 new LaunchContainerRunnable(allocatedContainer, containerListener);
767 Thread launchThread = new Thread(runnableLaunchContainer);
768
769 // launch and start the container on a separate thread to keep
770 // the main thread unblocked
771 // as all containers may not be allocated at one go.
772 launchThreads.add(launchThread);
773 launchThread.start();
774 }
775 }
776
777 @Override
778 public void onShutdownRequest() {
779 done = true;
780 }
781
782 @Override
783 public void onNodesUpdated(List<NodeReport> updatedNodes) {}
784
785 @Override
786 public float getProgress() {
787 // set progress to deliver to RM on next heartbeat
788 float progress = (float) numCompletedContainers.get()
789 / numTotalContainers;
790 return progress;
791 }
792
793 @Override
794 public void onError(Throwable e) {
795 done = true;
796 amRMClient.stop();
797 }
798 }
799
800 @VisibleForTesting
801 static class NMCallbackHandler
802 implements NMClientAsync.CallbackHandler {
803
804 private ConcurrentMap<ContainerId, Container> containers =
805 new ConcurrentHashMap<ContainerId, Container>();
806 private final ApplicationMaster applicationMaster;
807
808 public NMCallbackHandler(ApplicationMaster applicationMaster) {
809 this.applicationMaster = applicationMaster;
810 }
811
812 public void addContainer(ContainerId containerId, Container container) {
813 containers.putIfAbsent(containerId, container);
814 }
815
816 @Override
817 public void onContainerStopped(ContainerId containerId) {
818 if (LOG.isDebugEnabled()) {
819 LOG.debug("Succeeded to stop Container " + containerId);
820 }
821 containers.remove(containerId);
822 }
823
824 @Override
825 public void onContainerStatusReceived(ContainerId containerId,
826 ContainerStatus containerStatus) {
827 if (LOG.isDebugEnabled()) {
828 LOG.debug("Container Status: id=" + containerId + ", status=" +
829 containerStatus);
830 }
831 }
832
833 @Override
834 public void onContainerStarted(ContainerId containerId,
835 Map<String, ByteBuffer> allServiceResponse) {
836 if (LOG.isDebugEnabled()) {
837 LOG.debug("Succeeded to start Container " + containerId);
838 }
839 Container container = containers.get(containerId);
840 if (container != null) {
841 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
842 }
843 ApplicationMaster.publishContainerStartEvent(
844 applicationMaster.timelineClient, container,
845 applicationMaster.domainId, applicationMaster.appSubmitterUgi);
846 }
847
848 @Override
849 public void onStartContainerError(ContainerId containerId, Throwable t) {
850 LOG.error("Failed to start Container " + containerId);
851 containers.remove(containerId);
852 applicationMaster.numCompletedContainers.incrementAndGet();
853 applicationMaster.numFailedContainers.incrementAndGet();
854 }
855
856 @Override
857 public void onGetContainerStatusError(
858 ContainerId containerId, Throwable t) {
859 LOG.error("Failed to query the status of Container " + containerId);
860 }
861
862 @Override
863 public void onStopContainerError(ContainerId containerId, Throwable t) {
864 LOG.error("Failed to stop Container " + containerId);
865 containers.remove(containerId);
866 }
867 }
868
869 /**
870 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
871 * that will execute the shell command.
872 */
873 private class LaunchContainerRunnable implements Runnable {
874
875 // Allocated container
876 Container container;
877
878 NMCallbackHandler containerListener;
879
880 /**
881 * @param lcontainer Allocated container
882 * @param containerListener Callback handler of the container
883 */
884 public LaunchContainerRunnable(
885 Container lcontainer, NMCallbackHandler containerListener) {
886 this.container = lcontainer;
887 this.containerListener = containerListener;
888 }
889
890 @Override
891 /**
892 * Connects to CM, sets up container launch context
893 * for shell command and eventually dispatches the container
894 * start request to the CM.
895 */
896 public void run() {
897 LOG.info("Setting up container launch container for containerid="
898 + container.getId());
899
900 // Set the local resources
901 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
902
903 // The container for the eventual shell commands needs its own local
904 // resources too.
905 // In this scenario, if a shell script is specified, we need to have it
906 // copied and made available to the container.
907 if (!scriptPath.isEmpty()) {
908 Path renamedScriptPath = null;
909 if (Shell.WINDOWS) {
910 renamedScriptPath = new Path(scriptPath + ".bat");
911 } else {
912 renamedScriptPath = new Path(scriptPath + ".sh");
913 }
914
915 try {
916 // rename the script file based on the underlying OS syntax.
917 renameScriptFile(renamedScriptPath);
918 } catch (Exception e) {
919 LOG.error(
920 "Not able to add suffix (.bat/.sh) to the shell script filename",
921 e);
922 // We know we cannot continue launching the container
923 // so we should release it.
924 numCompletedContainers.incrementAndGet();
925 numFailedContainers.incrementAndGet();
926 return;
927 }
928
929 URL yarnUrl = null;
930 try {
931 yarnUrl = ConverterUtils.getYarnUrlFromURI(
932 new URI(renamedScriptPath.toString()));
933 } catch (URISyntaxException e) {
934 LOG.error("Error when trying to use shell script path specified"
935 + " in env, path=" + renamedScriptPath, e);
936 // A failure scenario on bad input such as invalid shell script path
937 // We know we cannot continue launching the container
938 // so we should release it.
939 // TODO
940 numCompletedContainers.incrementAndGet();
941 numFailedContainers.incrementAndGet();
942 return;
943 }
944 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
945 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
946 shellScriptPathLen, shellScriptPathTimestamp);
947 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
948 ExecShellStringPath, shellRsrc);
949 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
950 }
951
952 // Set the necessary command to execute on the allocated container
953 Vector<CharSequence> vargs = new Vector<CharSequence>(5);
954
955 // Set executable command
956 vargs.add(shellCommand);
957 // Set shell script path
958 if (!scriptPath.isEmpty()) {
959 vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
960 : ExecShellStringPath);
961 }
962
963 // Set args for the shell command if any
964 vargs.add(shellArgs);
965 // Add log redirect params
966 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
967 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
968
969 // Get final commmand
970 StringBuilder command = new StringBuilder();
971 for (CharSequence str : vargs) {
972 command.append(str).append(" ");
973 }
974
975 List<String> commands = new ArrayList<String>();
976 commands.add(command.toString());
977
978 // Set up ContainerLaunchContext, setting local resource, environment,
979 // command and token for constructor.
980
981 // Note for tokens: Set up tokens for the container too. Today, for normal
982 // shell commands, the container in distribute-shell doesn't need any
983 // tokens. We are populating them mainly for NodeManagers to be able to
984 // download anyfiles in the distributed file-system. The tokens are
985 // otherwise also useful in cases, for e.g., when one is running a
986 // "hadoop dfs" command inside the distributed shell.
987 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
988 localResources, shellEnv, commands, null, allTokens.duplicate(), null);
989 containerListener.addContainer(container.getId(), container);
990 nmClientAsync.startContainerAsync(container, ctx);
991 }
992 }
993
994 private void renameScriptFile(final Path renamedScriptPath)
995 throws IOException, InterruptedException {
996 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
997 @Override
998 public Void run() throws IOException {
999 FileSystem fs = renamedScriptPath.getFileSystem(conf);
1000 fs.rename(new Path(scriptPath), renamedScriptPath);
1001 return null;
1002 }
1003 });
1004 LOG.info("User " + appSubmitterUgi.getUserName()
1005 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1006 }
1007
1008 /**
1009 * Setup the request that will be sent to the RM for the container ask.
1010 *
1011 * @return the setup ResourceRequest to be sent to RM
1012 */
1013 private ContainerRequest setupContainerAskForRM() {
1014 // setup requirements for hosts
1015 // using * as any host will do for the distributed shell app
1016 // set the priority for the request
1017 // TODO - what is the range for priority? how to decide?
1018 Priority pri = Priority.newInstance(requestPriority);
1019
1020 // Set up resource type requirements
1021 // For now, memory and CPU are supported so we set memory and cpu requirements
1022 Resource capability = Resource.newInstance(containerMemory,
1023 containerVirtualCores);
1024
1025 ContainerRequest request = new ContainerRequest(capability, null, null,
1026 pri);
1027 LOG.info("Requested container ask: " + request.toString());
1028 return request;
1029 }
1030
1031 private boolean fileExist(String filePath) {
1032 return new File(filePath).exists();
1033 }
1034
1035 private String readContent(String filePath) throws IOException {
1036 DataInputStream ds = null;
1037 try {
1038 ds = new DataInputStream(new FileInputStream(filePath));
1039 return ds.readUTF();
1040 } finally {
1041 org.apache.commons.io.IOUtils.closeQuietly(ds);
1042 }
1043 }
1044
1045 private static void publishContainerStartEvent(
1046 final TimelineClient timelineClient, Container container, String domainId,
1047 UserGroupInformation ugi) {
1048 final TimelineEntity entity = new TimelineEntity();
1049 entity.setEntityId(container.getId().toString());
1050 entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1051 entity.setDomainId(domainId);
1052 entity.addPrimaryFilter("user", ugi.getShortUserName());
1053 TimelineEvent event = new TimelineEvent();
1054 event.setTimestamp(System.currentTimeMillis());
1055 event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1056 event.addEventInfo("Node", container.getNodeId().toString());
1057 event.addEventInfo("Resources", container.getResource().toString());
1058 entity.addEvent(event);
1059
1060 try {
1061 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1062 @Override
1063 public TimelinePutResponse run() throws Exception {
1064 return timelineClient.putEntities(entity);
1065 }
1066 });
1067 } catch (Exception e) {
1068 LOG.error("Container start event could not be published for "
1069 + container.getId().toString(),
1070 e instanceof UndeclaredThrowableException ? e.getCause() : e);
1071 }
1072 }
1073
1074 private static void publishContainerEndEvent(
1075 final TimelineClient timelineClient, ContainerStatus container,
1076 String domainId, UserGroupInformation ugi) {
1077 final TimelineEntity entity = new TimelineEntity();
1078 entity.setEntityId(container.getContainerId().toString());
1079 entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1080 entity.setDomainId(domainId);
1081 entity.addPrimaryFilter("user", ugi.getShortUserName());
1082 TimelineEvent event = new TimelineEvent();
1083 event.setTimestamp(System.currentTimeMillis());
1084 event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1085 event.addEventInfo("State", container.getState().name());
1086 event.addEventInfo("Exit Status", container.getExitStatus());
1087 entity.addEvent(event);
1088
1089 try {
1090 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1091 @Override
1092 public TimelinePutResponse run() throws Exception {
1093 return timelineClient.putEntities(entity);
1094 }
1095 });
1096 } catch (Exception e) {
1097 LOG.error("Container end event could not be published for "
1098 + container.getContainerId().toString(),
1099 e instanceof UndeclaredThrowableException ? e.getCause() : e);
1100 }
1101 }
1102
1103 private static void publishApplicationAttemptEvent(
1104 final TimelineClient timelineClient, String appAttemptId,
1105 DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1106 final TimelineEntity entity = new TimelineEntity();
1107 entity.setEntityId(appAttemptId);
1108 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1109 entity.setDomainId(domainId);
1110 entity.addPrimaryFilter("user", ugi.getShortUserName());
1111 TimelineEvent event = new TimelineEvent();
1112 event.setEventType(appEvent.toString());
1113 event.setTimestamp(System.currentTimeMillis());
1114 entity.addEvent(event);
1115
1116 try {
1117 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1118 @Override
1119 public TimelinePutResponse run() throws Exception {
1120 return timelineClient.putEntities(entity);
1121 }
1122 });
1123 } catch (Exception e) {
1124 LOG.error("App Attempt "
1125 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1126 + " event could not be published for "
1127 + appAttemptId.toString(),
1128 e instanceof UndeclaredThrowableException ? e.getCause() : e);
1129 }
1130 }
1131 }