001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Vector; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.atomic.AtomicInteger; 043 044import org.apache.commons.cli.CommandLine; 045import org.apache.commons.cli.GnuParser; 046import org.apache.commons.cli.HelpFormatter; 047import org.apache.commons.cli.Options; 048import org.apache.commons.cli.ParseException; 049import org.apache.commons.logging.Log; 050import 
org.apache.commons.logging.LogFactory; 051import org.apache.hadoop.classification.InterfaceAudience; 052import org.apache.hadoop.classification.InterfaceAudience.Private; 053import org.apache.hadoop.classification.InterfaceStability; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.io.DataOutputBuffer; 058import org.apache.hadoop.io.IOUtils; 059import org.apache.hadoop.net.NetUtils; 060import org.apache.hadoop.security.Credentials; 061import org.apache.hadoop.security.UserGroupInformation; 062import org.apache.hadoop.security.token.Token; 063import org.apache.hadoop.util.ExitUtil; 064import org.apache.hadoop.util.Shell; 065import org.apache.hadoop.yarn.api.ApplicationConstants; 066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 068import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 075import org.apache.hadoop.yarn.api.records.Container; 076import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 077import org.apache.hadoop.yarn.api.records.ContainerId; 078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 079import org.apache.hadoop.yarn.api.records.ContainerState; 080import org.apache.hadoop.yarn.api.records.ContainerStatus; 081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 082import org.apache.hadoop.yarn.api.records.LocalResource; 083import 
org.apache.hadoop.yarn.api.records.LocalResourceType; 084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 085import org.apache.hadoop.yarn.api.records.NodeReport; 086import org.apache.hadoop.yarn.api.records.Priority; 087import org.apache.hadoop.yarn.api.records.Resource; 088import org.apache.hadoop.yarn.api.records.ResourceRequest; 089import org.apache.hadoop.yarn.api.records.URL; 090import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 091import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 092import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 093import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 094import org.apache.hadoop.yarn.client.api.TimelineClient; 095import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 096import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 097import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 098import org.apache.hadoop.yarn.conf.YarnConfiguration; 099import org.apache.hadoop.yarn.exceptions.YarnException; 100import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 101import org.apache.hadoop.yarn.util.ConverterUtils; 102import org.apache.log4j.LogManager; 103 104import com.google.common.annotations.VisibleForTesting; 105 106/** 107 * An ApplicationMaster for executing shell commands on a set of launched 108 * containers using the YARN framework. 109 * 110 * <p> 111 * This class is meant to act as an example on how to write yarn-based 112 * application masters. 113 * </p> 114 * 115 * <p> 116 * The ApplicationMaster is started on a container by the 117 * <code>ResourceManager</code>'s launcher. The first thing that the 118 * <code>ApplicationMaster</code> needs to do is to connect and register itself 119 * with the <code>ResourceManager</code>. 
The registration sets up information 120 * within the <code>ResourceManager</code> regarding what host:port the 121 * ApplicationMaster is listening on to provide any form of functionality to a 122 * client as well as a tracking url that a client can use to keep track of 123 * status/job history if needed. However, in the distributedshell, trackingurl 124 * and appMasterHost:appMasterRpcPort are not supported. 125 * </p> 126 * 127 * <p> 128 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 129 * <code>ResourceManager</code> at regular intervals to inform the 130 * <code>ResourceManager</code> that it is up and alive. The 131 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code> from the 132 * <code>ApplicationMaster</code> acts as a heartbeat. 133 * </p> 134 * <p> 135 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 136 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 137 * required no. of containers using {@link ResourceRequest} with the necessary 138 * resource specifications such as node location, computational 139 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 140 * responds with an {@link AllocateResponse} that informs the 141 * <code>ApplicationMaster</code> of the set of newly allocated containers, 142 * completed containers as well as current state of available resources. 143 * </p> 144 * 145 * <p> 146 * For each allocated container, the <code>ApplicationMaster</code> can then set 147 * up the necessary launch context via {@link ContainerLaunchContext} to specify 148 * the allocated container id, local resources required by the executable, the 149 * environment to be setup for the executable, commands to execute, etc. and 150 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 151 * launch and execute the defined commands on the given allocated container.
152 * </p> 153 * 154 * <p> 155 * The <code>ApplicationMaster</code> can monitor the launched container by 156 * either querying the <code>ResourceManager</code> using 157 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 158 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 159 * container's {@link ContainerId}. 160 * 161 * <p> 162 * After the job has been completed, the <code>ApplicationMaster</code> has to 163 * send a {@link FinishApplicationMasterRequest} to the 164 * <code>ResourceManager</code> to inform it that the 165 * <code>ApplicationMaster</code> has been completed. 166 */ 167@InterfaceAudience.Public 168@InterfaceStability.Unstable 169public class ApplicationMaster { 170 171 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 172 173 @VisibleForTesting 174 @Private 175 public static enum DSEvent { 176 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 177 } 178 179 @VisibleForTesting 180 @Private 181 public static enum DSEntity { 182 DS_APP_ATTEMPT, DS_CONTAINER 183 } 184 185 private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; 186 187 // Configuration 188 private Configuration conf; 189 190 // Handle to communicate with the Resource Manager 191 @SuppressWarnings("rawtypes") 192 private AMRMClientAsync amRMClient; 193 194 // In both secure and non-secure modes, this points to the job-submitter. 
195 @VisibleForTesting 196 UserGroupInformation appSubmitterUgi; 197 198 // Handle to communicate with the Node Manager 199 private NMClientAsync nmClientAsync; 200 // Listen to process the response from the Node Manager 201 private NMCallbackHandler containerListener; 202 203 // Application Attempt Id ( combination of attemptId and fail count ) 204 @VisibleForTesting 205 protected ApplicationAttemptId appAttemptID; 206 207 // TODO 208 // For status update for clients - yet to be implemented 209 // Hostname of the container 210 private String appMasterHostname = ""; 211 // Port on which the app master listens for status updates from clients 212 private int appMasterRpcPort = -1; 213 // Tracking url to which app master publishes info for clients to monitor 214 private String appMasterTrackingUrl = ""; 215 216 // App Master configuration 217 // No. of containers to run shell command on 218 @VisibleForTesting 219 protected int numTotalContainers = 1; 220 // Memory to request for the container on which the shell command will run 221 private int containerMemory = 10; 222 // VirtualCores to request for the container on which the shell command will run 223 private int containerVirtualCores = 1; 224 // Priority of the request 225 private int requestPriority; 226 227 // Counter for completed containers ( complete denotes successful or failed ) 228 private AtomicInteger numCompletedContainers = new AtomicInteger(); 229 // Allocated container count so that we know how many containers has the RM 230 // allocated to us 231 @VisibleForTesting 232 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 233 // Count of failed containers 234 private AtomicInteger numFailedContainers = new AtomicInteger(); 235 // Count of containers already requested from the RM 236 // Needed as once requested, we should not request for containers again. 237 // Only request for more if the original requirement changes. 
238 @VisibleForTesting 239 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 240 241 // Shell command to be executed 242 private String shellCommand = ""; 243 // Args to be passed to the shell command 244 private String shellArgs = ""; 245 // Env variables to be setup for the shell command 246 private Map<String, String> shellEnv = new HashMap<String, String>(); 247 248 // Location of shell script ( obtained from info set in env ) 249 // Shell script path in fs 250 private String scriptPath = ""; 251 // Timestamp needed for creating a local resource 252 private long shellScriptPathTimestamp = 0; 253 // File length needed for local resource 254 private long shellScriptPathLen = 0; 255 256 // Timeline domain ID 257 private String domainId = null; 258 259 // Hardcoded path to shell script in launch container's local env 260 private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH 261 + ".sh"; 262 private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH 263 + ".bat"; 264 265 // Hardcoded path to custom log_properties 266 private static final String log4jPath = "log4j.properties"; 267 268 private static final String shellCommandPath = "shellCommands"; 269 private static final String shellArgsPath = "shellArgs"; 270 271 private volatile boolean done; 272 273 private ByteBuffer allTokens; 274 275 // Launch threads 276 private List<Thread> launchThreads = new ArrayList<Thread>(); 277 278 // Timeline Client 279 private TimelineClient timelineClient; 280 281 private final String linux_bash_command = "bash"; 282 private final String windows_command = "cmd /c"; 283 284 private int yarnShellIdCounter = 1; 285 286 @VisibleForTesting 287 protected final Set<ContainerId> launchedContainers = 288 Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); 289 290 /** 291 * @param args Command line args 292 */ 293 public static void main(String[] args) { 294 boolean result = false; 295 try { 296 ApplicationMaster 
appMaster = new ApplicationMaster(); 297 LOG.info("Initializing ApplicationMaster"); 298 boolean doRun = appMaster.init(args); 299 if (!doRun) { 300 System.exit(0); 301 } 302 appMaster.run(); 303 result = appMaster.finish(); 304 } catch (Throwable t) { 305 LOG.fatal("Error running ApplicationMaster", t); 306 LogManager.shutdown(); 307 ExitUtil.terminate(1, t); 308 } 309 if (result) { 310 LOG.info("Application Master completed successfully. exiting"); 311 System.exit(0); 312 } else { 313 LOG.info("Application Master failed. exiting"); 314 System.exit(2); 315 } 316 } 317 318 /** 319 * Dump out contents of $CWD and the environment to stdout for debugging 320 */ 321 private void dumpOutDebugInfo() { 322 323 LOG.info("Dump debug output"); 324 Map<String, String> envs = System.getenv(); 325 for (Map.Entry<String, String> env : envs.entrySet()) { 326 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 327 System.out.println("System env: key=" + env.getKey() + ", val=" 328 + env.getValue()); 329 } 330 331 BufferedReader buf = null; 332 try { 333 String lines = Shell.WINDOWS ? 
Shell.execCommand("cmd", "/c", "dir") : 334 Shell.execCommand("ls", "-al"); 335 buf = new BufferedReader(new StringReader(lines)); 336 String line = ""; 337 while ((line = buf.readLine()) != null) { 338 LOG.info("System CWD content: " + line); 339 System.out.println("System CWD content: " + line); 340 } 341 } catch (IOException e) { 342 e.printStackTrace(); 343 } finally { 344 IOUtils.cleanup(LOG, buf); 345 } 346 } 347 348 public ApplicationMaster() { 349 // Set up the configuration 350 conf = new YarnConfiguration(); 351 } 352 353 /** 354 * Parse command line options 355 * 356 * @param args Command line args 357 * @return Whether init successful and run should be invoked 358 * @throws ParseException 359 * @throws IOException 360 */ 361 public boolean init(String[] args) throws ParseException, IOException { 362 Options opts = new Options(); 363 opts.addOption("app_attempt_id", true, 364 "App Attempt ID. Not to be used unless for testing purposes"); 365 opts.addOption("shell_env", true, 366 "Environment for shell script. Specified as env_key=env_val pairs"); 367 opts.addOption("container_memory", true, 368 "Amount of memory in MB to be requested to run the shell command"); 369 opts.addOption("container_vcores", true, 370 "Amount of virtual cores to be requested to run the shell command"); 371 opts.addOption("num_containers", true, 372 "No. of containers on which the shell command needs to be executed"); 373 opts.addOption("priority", true, "Application Priority. 
Default 0"); 374 opts.addOption("debug", false, "Dump out debug information"); 375 376 opts.addOption("help", false, "Print usage"); 377 CommandLine cliParser = new GnuParser().parse(opts, args); 378 379 if (args.length == 0) { 380 printUsage(opts); 381 throw new IllegalArgumentException( 382 "No args specified for application master to initialize"); 383 } 384 385 //Check whether customer log4j.properties file exists 386 if (fileExist(log4jPath)) { 387 try { 388 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 389 log4jPath); 390 } catch (Exception e) { 391 LOG.warn("Can not set up custom log4j properties. " + e); 392 } 393 } 394 395 if (cliParser.hasOption("help")) { 396 printUsage(opts); 397 return false; 398 } 399 400 if (cliParser.hasOption("debug")) { 401 dumpOutDebugInfo(); 402 } 403 404 Map<String, String> envs = System.getenv(); 405 406 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 407 if (cliParser.hasOption("app_attempt_id")) { 408 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 409 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 410 } else { 411 throw new IllegalArgumentException( 412 "Application Attempt Id not set in the environment"); 413 } 414 } else { 415 ContainerId containerId = ConverterUtils.toContainerId(envs 416 .get(Environment.CONTAINER_ID.name())); 417 appAttemptID = containerId.getApplicationAttemptId(); 418 } 419 420 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 421 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 422 + " not set in the environment"); 423 } 424 if (!envs.containsKey(Environment.NM_HOST.name())) { 425 throw new RuntimeException(Environment.NM_HOST.name() 426 + " not set in the environment"); 427 } 428 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 429 throw new RuntimeException(Environment.NM_HTTP_PORT 430 + " not set in the environment"); 431 } 432 if (!envs.containsKey(Environment.NM_PORT.name())) { 433 
throw new RuntimeException(Environment.NM_PORT.name() 434 + " not set in the environment"); 435 } 436 437 LOG.info("Application master for app" + ", appId=" 438 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 439 + appAttemptID.getApplicationId().getClusterTimestamp() 440 + ", attemptId=" + appAttemptID.getAttemptId()); 441 442 if (!fileExist(shellCommandPath) 443 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 444 throw new IllegalArgumentException( 445 "No shell command or shell script specified to be executed by application master"); 446 } 447 448 if (fileExist(shellCommandPath)) { 449 shellCommand = readContent(shellCommandPath); 450 } 451 452 if (fileExist(shellArgsPath)) { 453 shellArgs = readContent(shellArgsPath); 454 } 455 456 if (cliParser.hasOption("shell_env")) { 457 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 458 for (String env : shellEnvs) { 459 env = env.trim(); 460 int index = env.indexOf('='); 461 if (index == -1) { 462 shellEnv.put(env, ""); 463 continue; 464 } 465 String key = env.substring(0, index); 466 String val = ""; 467 if (index < (env.length() - 1)) { 468 val = env.substring(index + 1); 469 } 470 shellEnv.put(key, val); 471 } 472 } 473 474 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 475 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 476 477 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 478 shellScriptPathTimestamp = Long.parseLong(envs 479 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 480 } 481 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 482 shellScriptPathLen = Long.parseLong(envs 483 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 484 } 485 if (!scriptPath.isEmpty() 486 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 487 LOG.error("Illegal values in env for shell script path" + ", path=" 488 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 489 + 
shellScriptPathTimestamp); 490 throw new IllegalArgumentException( 491 "Illegal values in env for shell script path"); 492 } 493 } 494 495 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 496 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 497 } 498 499 containerMemory = Integer.parseInt(cliParser.getOptionValue( 500 "container_memory", "10")); 501 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 502 "container_vcores", "1")); 503 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 504 "num_containers", "1")); 505 if (numTotalContainers == 0) { 506 throw new IllegalArgumentException( 507 "Cannot run distributed shell with no containers"); 508 } 509 requestPriority = Integer.parseInt(cliParser 510 .getOptionValue("priority", "0")); 511 512 // Creating the Timeline Client 513 timelineClient = TimelineClient.createTimelineClient(); 514 timelineClient.init(conf); 515 timelineClient.start(); 516 517 return true; 518 } 519 520 /** 521 * Helper function to print usage 522 * 523 * @param opts Parsed command line options 524 */ 525 private void printUsage(Options opts) { 526 new HelpFormatter().printHelp("ApplicationMaster", opts); 527 } 528 529 /** 530 * Main run function for the application master 531 * 532 * @throws YarnException 533 * @throws IOException 534 */ 535 @SuppressWarnings({ "unchecked" }) 536 public void run() throws YarnException, IOException { 537 LOG.info("Starting ApplicationMaster"); 538 539 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 540 // are marked as LimitedPrivate 541 Credentials credentials = 542 UserGroupInformation.getCurrentUser().getCredentials(); 543 DataOutputBuffer dob = new DataOutputBuffer(); 544 credentials.writeTokenStorageToStream(dob); 545 // Now remove the AM->RM token so that containers cannot access it. 
546 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 547 LOG.info("Executing with tokens:"); 548 while (iter.hasNext()) { 549 Token<?> token = iter.next(); 550 LOG.info(token); 551 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 552 iter.remove(); 553 } 554 } 555 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 556 557 // Create appSubmitterUgi and add original tokens to it 558 String appSubmitterUserName = 559 System.getenv(ApplicationConstants.Environment.USER.name()); 560 appSubmitterUgi = 561 UserGroupInformation.createRemoteUser(appSubmitterUserName); 562 appSubmitterUgi.addCredentials(credentials); 563 564 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 565 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 566 567 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 568 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 569 amRMClient.init(conf); 570 amRMClient.start(); 571 572 containerListener = createNMCallbackHandler(); 573 nmClientAsync = new NMClientAsyncImpl(containerListener); 574 nmClientAsync.init(conf); 575 nmClientAsync.start(); 576 577 // Setup local RPC Server to accept status requests directly from clients 578 // TODO need to setup a protocol for client to be able to communicate to 579 // the RPC server 580 // TODO use the rpc port info to register with the RM for the client to 581 // send requests to this app master 582 583 // Register self with ResourceManager 584 // This will start heartbeating to the RM 585 appMasterHostname = NetUtils.getHostname(); 586 RegisterApplicationMasterResponse response = amRMClient 587 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 588 appMasterTrackingUrl); 589 // Dump out information about cluster capability as seen by the 590 // resource manager 591 int maxMem = response.getMaximumResourceCapability().getMemory(); 592 LOG.info("Max mem capabililty of resources in this cluster " 
+ maxMem); 593 594 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 595 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); 596 597 // A resource ask cannot exceed the max. 598 if (containerMemory > maxMem) { 599 LOG.info("Container memory specified above max threshold of cluster." 600 + " Using max value." + ", specified=" + containerMemory + ", max=" 601 + maxMem); 602 containerMemory = maxMem; 603 } 604 605 if (containerVirtualCores > maxVCores) { 606 LOG.info("Container virtual cores specified above max threshold of cluster." 607 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 608 + maxVCores); 609 containerVirtualCores = maxVCores; 610 } 611 612 List<Container> previousAMRunningContainers = 613 response.getContainersFromPreviousAttempts(); 614 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 615 + " previous attempts' running containers on AM registration."); 616 for(Container container: previousAMRunningContainers) { 617 launchedContainers.add(container.getId()); 618 } 619 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 620 621 622 int numTotalContainersToRequest = 623 numTotalContainers - previousAMRunningContainers.size(); 624 // Setup ask for containers from RM 625 // Send request for containers to RM 626 // Until we get our fully allocated quota, we keep on polling RM for 627 // containers 628 // Keep looping until all the containers are launched and shell script 629 // executed on them ( regardless of success/failure). 
630 for (int i = 0; i < numTotalContainersToRequest; ++i) { 631 ContainerRequest containerAsk = setupContainerAskForRM(); 632 amRMClient.addContainerRequest(containerAsk); 633 } 634 numRequestedContainers.set(numTotalContainers); 635 636 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 637 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 638 } 639 640 @VisibleForTesting 641 NMCallbackHandler createNMCallbackHandler() { 642 return new NMCallbackHandler(this); 643 } 644 645 @VisibleForTesting 646 protected boolean finish() { 647 // wait for completion. 648 while (!done 649 && (numCompletedContainers.get() != numTotalContainers)) { 650 try { 651 Thread.sleep(200); 652 } catch (InterruptedException ex) {} 653 } 654 655 // Join all launched threads 656 // needed for when we time out 657 // and we need to release containers 658 for (Thread launchThread : launchThreads) { 659 try { 660 launchThread.join(10000); 661 } catch (InterruptedException e) { 662 LOG.info("Exception thrown in thread join: " + e.getMessage()); 663 e.printStackTrace(); 664 } 665 } 666 667 // When the application completes, it should stop all running containers 668 LOG.info("Application completed. Stopping running containers"); 669 nmClientAsync.stop(); 670 671 // When the application completes, it should send a finish application 672 // signal to the RM 673 LOG.info("Application completed. Signalling finish to RM"); 674 675 FinalApplicationStatus appStatus; 676 String appMessage = null; 677 boolean success = true; 678 if (numFailedContainers.get() == 0 && 679 numCompletedContainers.get() == numTotalContainers) { 680 appStatus = FinalApplicationStatus.SUCCEEDED; 681 } else { 682 appStatus = FinalApplicationStatus.FAILED; 683 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 684 + ", completed=" + numCompletedContainers.get() + ", allocated=" 685 + numAllocatedContainers.get() + ", failed=" 686 + numFailedContainers.get(); 687 LOG.info(appMessage); 688 success = false; 689 } 690 try { 691 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 692 } catch (YarnException ex) { 693 LOG.error("Failed to unregister application", ex); 694 } catch (IOException e) { 695 LOG.error("Failed to unregister application", e); 696 } 697 698 amRMClient.stop(); 699 700 return success; 701 } 702 703 @VisibleForTesting 704 class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 705 @SuppressWarnings("unchecked") 706 @Override 707 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 708 LOG.info("Got response from RM for container ask, completedCnt=" 709 + completedContainers.size()); 710 for (ContainerStatus containerStatus : completedContainers) { 711 LOG.info(appAttemptID + " got container status for containerID=" 712 + containerStatus.getContainerId() + ", state=" 713 + containerStatus.getState() + ", exitStatus=" 714 + containerStatus.getExitStatus() + ", diagnostics=" 715 + containerStatus.getDiagnostics()); 716 717 // non complete containers should not be here 718 assert (containerStatus.getState() == ContainerState.COMPLETE); 719 // ignore containers we know nothing about - probably from a previous 720 // attempt 721 if (!launchedContainers.contains(containerStatus.getContainerId())) { 722 LOG.info("Ignoring completed status of " 723 + containerStatus.getContainerId() 724 + "; unknown container(probably launched by previous attempt)"); 725 continue; 726 } 727 728 // increment counters for completed/failed containers 729 int exitStatus = containerStatus.getExitStatus(); 730 if (0 != exitStatus) { 731 // container failed 732 if (ContainerExitStatus.ABORTED != exitStatus) { 733 // shell script failed 734 // counts as completed 735 
numCompletedContainers.incrementAndGet(); 736 numFailedContainers.incrementAndGet(); 737 } else { 738 // container was killed by framework, possibly preempted 739 // we should re-try as the container was lost for some reason 740 numAllocatedContainers.decrementAndGet(); 741 numRequestedContainers.decrementAndGet(); 742 // we do not need to release the container as it would be done 743 // by the RM 744 } 745 } else { 746 // nothing to do 747 // container completed successfully 748 numCompletedContainers.incrementAndGet(); 749 LOG.info("Container completed successfully." + ", containerId=" 750 + containerStatus.getContainerId()); 751 } 752 publishContainerEndEvent( 753 timelineClient, containerStatus, domainId, appSubmitterUgi); 754 } 755 756 // ask for more containers if any failed 757 int askCount = numTotalContainers - numRequestedContainers.get(); 758 numRequestedContainers.addAndGet(askCount); 759 760 if (askCount > 0) { 761 for (int i = 0; i < askCount; ++i) { 762 ContainerRequest containerAsk = setupContainerAskForRM(); 763 amRMClient.addContainerRequest(containerAsk); 764 } 765 } 766 767 if (numCompletedContainers.get() == numTotalContainers) { 768 done = true; 769 } 770 } 771 772 @Override 773 public void onContainersAllocated(List<Container> allocatedContainers) { 774 LOG.info("Got response from RM for container ask, allocatedCnt=" 775 + allocatedContainers.size()); 776 numAllocatedContainers.addAndGet(allocatedContainers.size()); 777 for (Container allocatedContainer : allocatedContainers) { 778 String yarnShellId = Integer.toString(yarnShellIdCounter); 779 yarnShellIdCounter++; 780 LOG.info("Launching shell command on a new container." 
781 + ", containerId=" + allocatedContainer.getId() 782 + ", yarnShellId=" + yarnShellId 783 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 784 + ":" + allocatedContainer.getNodeId().getPort() 785 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 786 + ", containerResourceMemory" 787 + allocatedContainer.getResource().getMemory() 788 + ", containerResourceVirtualCores" 789 + allocatedContainer.getResource().getVirtualCores()); 790 // + ", containerToken" 791 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 792 793 Thread launchThread = createLaunchContainerThread(allocatedContainer, 794 yarnShellId); 795 796 // launch and start the container on a separate thread to keep 797 // the main thread unblocked 798 // as all containers may not be allocated at one go. 799 launchThreads.add(launchThread); 800 launchedContainers.add(allocatedContainer.getId()); 801 launchThread.start(); 802 } 803 } 804 805 @Override 806 public void onShutdownRequest() { 807 done = true; 808 } 809 810 @Override 811 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 812 813 @Override 814 public float getProgress() { 815 // set progress to deliver to RM on next heartbeat 816 float progress = (float) numCompletedContainers.get() 817 / numTotalContainers; 818 return progress; 819 } 820 821 @Override 822 public void onError(Throwable e) { 823 done = true; 824 amRMClient.stop(); 825 } 826 } 827 828 @VisibleForTesting 829 static class NMCallbackHandler 830 implements NMClientAsync.CallbackHandler { 831 832 private ConcurrentMap<ContainerId, Container> containers = 833 new ConcurrentHashMap<ContainerId, Container>(); 834 private final ApplicationMaster applicationMaster; 835 836 public NMCallbackHandler(ApplicationMaster applicationMaster) { 837 this.applicationMaster = applicationMaster; 838 } 839 840 public void addContainer(ContainerId containerId, Container container) { 841 containers.putIfAbsent(containerId, container); 842 } 843 
844 @Override 845 public void onContainerStopped(ContainerId containerId) { 846 if (LOG.isDebugEnabled()) { 847 LOG.debug("Succeeded to stop Container " + containerId); 848 } 849 containers.remove(containerId); 850 } 851 852 @Override 853 public void onContainerStatusReceived(ContainerId containerId, 854 ContainerStatus containerStatus) { 855 if (LOG.isDebugEnabled()) { 856 LOG.debug("Container Status: id=" + containerId + ", status=" + 857 containerStatus); 858 } 859 } 860 861 @Override 862 public void onContainerStarted(ContainerId containerId, 863 Map<String, ByteBuffer> allServiceResponse) { 864 if (LOG.isDebugEnabled()) { 865 LOG.debug("Succeeded to start Container " + containerId); 866 } 867 Container container = containers.get(containerId); 868 if (container != null) { 869 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 870 } 871 ApplicationMaster.publishContainerStartEvent( 872 applicationMaster.timelineClient, container, 873 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 874 } 875 876 @Override 877 public void onStartContainerError(ContainerId containerId, Throwable t) { 878 LOG.error("Failed to start Container " + containerId); 879 containers.remove(containerId); 880 applicationMaster.numCompletedContainers.incrementAndGet(); 881 applicationMaster.numFailedContainers.incrementAndGet(); 882 } 883 884 @Override 885 public void onGetContainerStatusError( 886 ContainerId containerId, Throwable t) { 887 LOG.error("Failed to query the status of Container " + containerId); 888 } 889 890 @Override 891 public void onStopContainerError(ContainerId containerId, Throwable t) { 892 LOG.error("Failed to stop Container " + containerId); 893 containers.remove(containerId); 894 } 895 } 896 897 /** 898 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 899 * that will execute the shell command. 
900 */ 901 private class LaunchContainerRunnable implements Runnable { 902 903 // Allocated container 904 private Container container; 905 private String shellId; 906 907 NMCallbackHandler containerListener; 908 909 /** 910 * @param lcontainer Allocated container 911 * @param containerListener Callback handler of the container 912 */ 913 public LaunchContainerRunnable(Container lcontainer, 914 NMCallbackHandler containerListener, String shellId) { 915 this.container = lcontainer; 916 this.containerListener = containerListener; 917 this.shellId = shellId; 918 } 919 920 @Override 921 /** 922 * Connects to CM, sets up container launch context 923 * for shell command and eventually dispatches the container 924 * start request to the CM. 925 */ 926 public void run() { 927 LOG.info("Setting up container launch container for containerid=" 928 + container.getId() + " with shellid=" + shellId); 929 930 // Set the local resources 931 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 932 933 // The container for the eventual shell commands needs its own local 934 // resources too. 935 // In this scenario, if a shell script is specified, we need to have it 936 // copied and made available to the container. 937 if (!scriptPath.isEmpty()) { 938 Path renamedScriptPath = null; 939 if (Shell.WINDOWS) { 940 renamedScriptPath = new Path(scriptPath + ".bat"); 941 } else { 942 renamedScriptPath = new Path(scriptPath + ".sh"); 943 } 944 945 try { 946 // rename the script file based on the underlying OS syntax. 947 renameScriptFile(renamedScriptPath); 948 } catch (Exception e) { 949 LOG.error( 950 "Not able to add suffix (.bat/.sh) to the shell script filename", 951 e); 952 // We know we cannot continue launching the container 953 // so we should release it. 
954 numCompletedContainers.incrementAndGet(); 955 numFailedContainers.incrementAndGet(); 956 return; 957 } 958 959 URL yarnUrl = null; 960 try { 961 yarnUrl = ConverterUtils.getYarnUrlFromURI( 962 new URI(renamedScriptPath.toString())); 963 } catch (URISyntaxException e) { 964 LOG.error("Error when trying to use shell script path specified" 965 + " in env, path=" + renamedScriptPath, e); 966 // A failure scenario on bad input such as invalid shell script path 967 // We know we cannot continue launching the container 968 // so we should release it. 969 // TODO 970 numCompletedContainers.incrementAndGet(); 971 numFailedContainers.incrementAndGet(); 972 return; 973 } 974 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 975 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 976 shellScriptPathLen, shellScriptPathTimestamp); 977 localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH : 978 EXEC_SHELL_STRING_PATH, shellRsrc); 979 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 980 } 981 982 // Set the necessary command to execute on the allocated container 983 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 984 985 // Set executable command 986 vargs.add(shellCommand); 987 // Set shell script path 988 if (!scriptPath.isEmpty()) { 989 vargs.add(Shell.WINDOWS ? 
EXEC_BAT_SCRIPT_STRING_PATH 990 : EXEC_SHELL_STRING_PATH); 991 } 992 993 // Set args for the shell command if any 994 vargs.add(shellArgs); 995 // Add log redirect params 996 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 997 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 998 999 // Get final commmand 1000 StringBuilder command = new StringBuilder(); 1001 for (CharSequence str : vargs) { 1002 command.append(str).append(" "); 1003 } 1004 1005 List<String> commands = new ArrayList<String>(); 1006 commands.add(command.toString()); 1007 1008 // Set up ContainerLaunchContext, setting local resource, environment, 1009 // command and token for constructor. 1010 1011 // Note for tokens: Set up tokens for the container too. Today, for normal 1012 // shell commands, the container in distribute-shell doesn't need any 1013 // tokens. We are populating them mainly for NodeManagers to be able to 1014 // download anyfiles in the distributed file-system. The tokens are 1015 // otherwise also useful in cases, for e.g., when one is running a 1016 // "hadoop dfs" command inside the distributed shell. 
1017 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1018 myShellEnv.put(YARN_SHELL_ID, shellId); 1019 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1020 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1021 null); 1022 containerListener.addContainer(container.getId(), container); 1023 nmClientAsync.startContainerAsync(container, ctx); 1024 } 1025 } 1026 1027 private void renameScriptFile(final Path renamedScriptPath) 1028 throws IOException, InterruptedException { 1029 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1030 @Override 1031 public Void run() throws IOException { 1032 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1033 fs.rename(new Path(scriptPath), renamedScriptPath); 1034 return null; 1035 } 1036 }); 1037 LOG.info("User " + appSubmitterUgi.getUserName() 1038 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1039 } 1040 1041 /** 1042 * Setup the request that will be sent to the RM for the container ask. 1043 * 1044 * @return the setup ResourceRequest to be sent to RM 1045 */ 1046 private ContainerRequest setupContainerAskForRM() { 1047 // setup requirements for hosts 1048 // using * as any host will do for the distributed shell app 1049 // set the priority for the request 1050 // TODO - what is the range for priority? how to decide? 
1051 Priority pri = Priority.newInstance(requestPriority); 1052 1053 // Set up resource type requirements 1054 // For now, memory and CPU are supported so we set memory and cpu requirements 1055 Resource capability = Resource.newInstance(containerMemory, 1056 containerVirtualCores); 1057 1058 ContainerRequest request = new ContainerRequest(capability, null, null, 1059 pri); 1060 LOG.info("Requested container ask: " + request.toString()); 1061 return request; 1062 } 1063 1064 private boolean fileExist(String filePath) { 1065 return new File(filePath).exists(); 1066 } 1067 1068 private String readContent(String filePath) throws IOException { 1069 DataInputStream ds = null; 1070 try { 1071 ds = new DataInputStream(new FileInputStream(filePath)); 1072 return ds.readUTF(); 1073 } finally { 1074 org.apache.commons.io.IOUtils.closeQuietly(ds); 1075 } 1076 } 1077 1078 private static void publishContainerStartEvent( 1079 final TimelineClient timelineClient, Container container, String domainId, 1080 UserGroupInformation ugi) { 1081 final TimelineEntity entity = new TimelineEntity(); 1082 entity.setEntityId(container.getId().toString()); 1083 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1084 entity.setDomainId(domainId); 1085 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1086 TimelineEvent event = new TimelineEvent(); 1087 event.setTimestamp(System.currentTimeMillis()); 1088 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1089 event.addEventInfo("Node", container.getNodeId().toString()); 1090 event.addEventInfo("Resources", container.getResource().toString()); 1091 entity.addEvent(event); 1092 1093 try { 1094 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1095 @Override 1096 public TimelinePutResponse run() throws Exception { 1097 return timelineClient.putEntities(entity); 1098 } 1099 }); 1100 } catch (Exception e) { 1101 LOG.error("Container start event could not be published for " 1102 + container.getId().toString(), 1103 e 
instanceof UndeclaredThrowableException ? e.getCause() : e); 1104 } 1105 } 1106 1107 private static void publishContainerEndEvent( 1108 final TimelineClient timelineClient, ContainerStatus container, 1109 String domainId, UserGroupInformation ugi) { 1110 final TimelineEntity entity = new TimelineEntity(); 1111 entity.setEntityId(container.getContainerId().toString()); 1112 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1113 entity.setDomainId(domainId); 1114 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1115 TimelineEvent event = new TimelineEvent(); 1116 event.setTimestamp(System.currentTimeMillis()); 1117 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1118 event.addEventInfo("State", container.getState().name()); 1119 event.addEventInfo("Exit Status", container.getExitStatus()); 1120 entity.addEvent(event); 1121 1122 try { 1123 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1124 @Override 1125 public TimelinePutResponse run() throws Exception { 1126 return timelineClient.putEntities(entity); 1127 } 1128 }); 1129 } catch (Exception e) { 1130 LOG.error("Container end event could not be published for " 1131 + container.getContainerId().toString(), 1132 e instanceof UndeclaredThrowableException ? 
e.getCause() : e); 1133 } 1134 } 1135 1136 private static void publishApplicationAttemptEvent( 1137 final TimelineClient timelineClient, String appAttemptId, 1138 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1139 final TimelineEntity entity = new TimelineEntity(); 1140 entity.setEntityId(appAttemptId); 1141 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1142 entity.setDomainId(domainId); 1143 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1144 TimelineEvent event = new TimelineEvent(); 1145 event.setEventType(appEvent.toString()); 1146 event.setTimestamp(System.currentTimeMillis()); 1147 entity.addEvent(event); 1148 1149 try { 1150 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1151 @Override 1152 public TimelinePutResponse run() throws Exception { 1153 return timelineClient.putEntities(entity); 1154 } 1155 }); 1156 } catch (Exception e) { 1157 LOG.error("App Attempt " 1158 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1159 + " event could not be published for " 1160 + appAttemptId.toString(), 1161 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1162 } 1163 } 1164 1165 RMCallbackHandler getRMCallbackHandler() { 1166 return new RMCallbackHandler(); 1167 } 1168 1169 @VisibleForTesting 1170 void setAmRMClient(AMRMClientAsync client) { 1171 this.amRMClient = client; 1172 } 1173 1174 @VisibleForTesting 1175 int getNumCompletedContainers() { 1176 return numCompletedContainers.get(); 1177 } 1178 1179 @VisibleForTesting 1180 boolean getDone() { 1181 return done; 1182 } 1183 1184 @VisibleForTesting 1185 Thread createLaunchContainerThread(Container allocatedContainer, 1186 String shellId) { 1187 LaunchContainerRunnable runnableLaunchContainer = 1188 new LaunchContainerRunnable(allocatedContainer, containerListener, 1189 shellId); 1190 return new Thread(runnableLaunchContainer); 1191 } 1192}