001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapreduce.tools; 019 020import java.io.BufferedOutputStream; 021import java.io.File; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.io.OutputStreamWriter; 025import java.io.PrintStream; 026import java.io.PrintWriter; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Set; 032 033import com.google.common.annotations.VisibleForTesting; 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.classification.InterfaceAudience.Private; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.conf.Configured; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.ipc.RemoteException; 045import org.apache.hadoop.mapred.JobConf; 046import org.apache.hadoop.mapred.TIPStatus; 047import org.apache.hadoop.mapreduce.Cluster; 048import org.apache.hadoop.mapreduce.Counters; 049import org.apache.hadoop.mapreduce.Job; 050import org.apache.hadoop.mapreduce.JobID; 051import org.apache.hadoop.mapreduce.JobPriority; 052import org.apache.hadoop.mapreduce.JobStatus; 053import org.apache.hadoop.mapreduce.MRJobConfig; 054import org.apache.hadoop.mapreduce.TaskAttemptID; 055import org.apache.hadoop.mapreduce.TaskCompletionEvent; 056import org.apache.hadoop.mapreduce.TaskReport; 057import org.apache.hadoop.mapreduce.TaskTrackerInfo; 058import org.apache.hadoop.mapreduce.TaskType; 059import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 060import org.apache.hadoop.mapreduce.v2.LogParams; 061import org.apache.hadoop.security.AccessControlException; 062import org.apache.hadoop.util.ExitUtil; 063import org.apache.hadoop.util.Tool; 064import org.apache.hadoop.util.ToolRunner; 065import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; 066 067import com.google.common.base.Charsets; 068 069/** 070 * Interprets the map reduce cli options 071 */ 072@InterfaceAudience.Public 073@InterfaceStability.Stable 074public class CLI extends Configured implements Tool { 075 private static final Log LOG = LogFactory.getLog(CLI.class); 076 protected Cluster cluster; 077 private static final Set<String> taskTypes = new HashSet<String>( 078 Arrays.asList("MAP", "REDUCE")); 079 private final Set<String> taskStates = new HashSet<String>(Arrays.asList( 080 "running", "completed", "pending", "failed", "killed")); 081 082 public CLI() { 083 } 084 085 public CLI(Configuration conf) { 086 setConf(conf); 087 } 088 089 public int run(String[] argv) throws Exception { 090 int exitCode = -1; 091 if (argv.length < 1) { 092 displayUsage(""); 093 return exitCode; 094 } 095 // process arguments 096 String cmd = argv[0]; 097 String submitJobFile = null; 098 String jobid = null; 099 String taskid = null; 100 String historyFileOrJobId = null; 101 String historyOutFile = null; 102 String historyOutFormat = HistoryViewer.HUMAN_FORMAT; 103 String counterGroupName = null; 104 String counterName = null; 105 JobPriority jp = null; 106 String taskType = null; 107 String taskState = null; 108 int fromEvent = 0; 109 int nEvents = 0; 110 String configOutFile = null; 111 boolean getStatus = false; 112 boolean getCounter = false; 113 boolean killJob = false; 114 boolean listEvents = false; 115 boolean viewHistory = false; 116 boolean viewAllHistory = false; 117 boolean listJobs = false; 118 boolean listAllJobs = false; 119 boolean listActiveTrackers = false; 120 boolean listBlacklistedTrackers = false; 121 boolean displayTasks = false; 122 boolean killTask = false; 123 boolean failTask = false; 124 boolean setJobPriority = false; 125 boolean logs = false; 126 boolean downloadConfig = false; 127 128 if ("-submit".equals(cmd)) { 129 if (argv.length != 2) { 130 displayUsage(cmd); 131 return exitCode; 132 } 133 submitJobFile = argv[1]; 134 } else if ("-status".equals(cmd)) { 135 if (argv.length != 2) { 136 displayUsage(cmd); 137 return exitCode; 138 } 139 jobid = argv[1]; 140 getStatus = true; 141 } else if("-counter".equals(cmd)) { 142 if (argv.length != 4) { 143 displayUsage(cmd); 144 return exitCode; 145 } 146 getCounter = true; 147 jobid = argv[1]; 148 counterGroupName = argv[2]; 149 counterName = argv[3]; 150 } else if ("-kill".equals(cmd)) { 151 if (argv.length != 2) { 152 displayUsage(cmd); 153 return exitCode; 154 } 155 jobid = argv[1]; 156 killJob = true; 157 } else if ("-set-priority".equals(cmd)) { 158 if (argv.length != 3) { 159 displayUsage(cmd); 160 return exitCode; 161 } 162 jobid = argv[1]; 163 try { 164 jp = JobPriority.valueOf(argv[2]); 165 } catch (IllegalArgumentException iae) { 166 LOG.info(iae); 167 displayUsage(cmd); 168 return exitCode; 169 } 170 setJobPriority = true; 171 } else if ("-events".equals(cmd)) { 172 if (argv.length != 4) { 173 displayUsage(cmd); 174 return exitCode; 175 } 176 jobid = argv[1]; 177 fromEvent = Integer.parseInt(argv[2]); 178 nEvents = Integer.parseInt(argv[3]); 179 listEvents = true; 180 } else if ("-history".equals(cmd)) { 181 viewHistory = true; 182 if (argv.length < 2 || argv.length > 7) { 183 displayUsage(cmd); 184 return exitCode; 185 } 186 187 // Some arguments are optional while others are not, and some require 188 // second arguments. Due to this, the indexing can vary depending on 189 // what's specified and what's left out, as summarized in the below table: 190 // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>] 191 // 1 2 3 4 5 6 192 // 1 2 3 4 193 // 1 2 3 4 194 // 1 2 195 // 1 2 3 4 5 196 // 1 2 3 197 // 1 2 3 198 // 1 199 200 // "all" is optional, but comes first if specified 201 int index = 1; 202 if ("all".equals(argv[index])) { 203 index++; 204 viewAllHistory = true; 205 if (argv.length == 2) { 206 displayUsage(cmd); 207 return exitCode; 208 } 209 } 210 // Get the job history file or job id argument 211 historyFileOrJobId = argv[index++]; 212 // "-outfile" is optional, but if specified requires a second argument 213 if (argv.length > index + 1 && "-outfile".equals(argv[index])) { 214 index++; 215 historyOutFile = argv[index++]; 216 } 217 // "-format" is optional, but if specified required a second argument 218 if (argv.length > index + 1 && "-format".equals(argv[index])) { 219 index++; 220 historyOutFormat = argv[index++]; 221 } 222 // Check for any extra arguments that don't belong here 223 if (argv.length > index) { 224 displayUsage(cmd); 225 return exitCode; 226 } 227 } else if ("-list".equals(cmd)) { 228 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { 229 displayUsage(cmd); 230 return exitCode; 231 } 232 if (argv.length == 2 && "all".equals(argv[1])) { 233 listAllJobs = true; 234 } else { 235 listJobs = true; 236 } 237 } else if("-kill-task".equals(cmd)) { 238 if (argv.length != 2) { 239 displayUsage(cmd); 240 return exitCode; 241 } 242 killTask = true; 243 taskid = argv[1]; 244 } else if("-fail-task".equals(cmd)) { 245 if (argv.length != 2) { 246 displayUsage(cmd); 247 return exitCode; 248 } 249 failTask = true; 250 taskid = argv[1]; 251 } else if ("-list-active-trackers".equals(cmd)) { 252 if (argv.length != 1) { 253 displayUsage(cmd); 254 return exitCode; 255 } 256 listActiveTrackers = true; 257 } else if ("-list-blacklisted-trackers".equals(cmd)) { 258 if (argv.length != 1) { 259 displayUsage(cmd); 260 return exitCode; 261 } 262 listBlacklistedTrackers = true; 263 } else if ("-list-attempt-ids".equals(cmd)) { 264 if (argv.length != 4) { 265 displayUsage(cmd); 266 return exitCode; 267 } 268 jobid = argv[1]; 269 taskType = argv[2]; 270 taskState = argv[3]; 271 displayTasks = true; 272 if (!taskTypes.contains(taskType.toUpperCase())) { 273 System.out.println("Error: Invalid task-type: " + taskType); 274 displayUsage(cmd); 275 return exitCode; 276 } 277 if (!taskStates.contains(taskState.toLowerCase())) { 278 System.out.println("Error: Invalid task-state: " + taskState); 279 displayUsage(cmd); 280 return exitCode; 281 } 282 } else if ("-logs".equals(cmd)) { 283 if (argv.length == 2 || argv.length ==3) { 284 logs = true; 285 jobid = argv[1]; 286 if (argv.length == 3) { 287 taskid = argv[2]; 288 } else { 289 taskid = null; 290 } 291 } else { 292 displayUsage(cmd); 293 return exitCode; 294 } 295 } else if ("-config".equals(cmd)) { 296 downloadConfig = true; 297 if (argv.length != 3) { 298 displayUsage(cmd); 299 return exitCode; 300 } 301 jobid = argv[1]; 302 configOutFile = argv[2]; 303 } else { 304 displayUsage(cmd); 305 return exitCode; 306 } 307 308 // initialize cluster 309 cluster = createCluster(); 310 311 // Submit the request 312 try { 313 if (submitJobFile != null) { 314 Job job = Job.getInstance(new JobConf(submitJobFile)); 315 job.submit(); 316 System.out.println("Created job " + job.getJobID()); 317 exitCode = 0; 318 } else if (getStatus) { 319 Job job = getJob(JobID.forName(jobid)); 320 if (job == null) { 321 System.out.println("Could not find job " + jobid); 322 } else { 323 Counters counters = job.getCounters(); 324 System.out.println(); 325 System.out.println(job); 326 if (counters != null) { 327 System.out.println(counters); 328 } else { 329 System.out.println("Counters not available. Job is retired."); 330 } 331 exitCode = 0; 332 } 333 } else if (getCounter) { 334 Job job = getJob(JobID.forName(jobid)); 335 if (job == null) { 336 System.out.println("Could not find job " + jobid); 337 } else { 338 Counters counters = job.getCounters(); 339 if (counters == null) { 340 System.out.println("Counters not available for retired job " + 341 jobid); 342 exitCode = -1; 343 } else { 344 System.out.println(getCounter(counters, 345 counterGroupName, counterName)); 346 exitCode = 0; 347 } 348 } 349 } else if (killJob) { 350 Job job = getJob(JobID.forName(jobid)); 351 if (job == null) { 352 System.out.println("Could not find job " + jobid); 353 } else { 354 job.killJob(); 355 System.out.println("Killed job " + jobid); 356 exitCode = 0; 357 } 358 } else if (setJobPriority) { 359 Job job = getJob(JobID.forName(jobid)); 360 if (job == null) { 361 System.out.println("Could not find job " + jobid); 362 } else { 363 job.setPriority(jp); 364 System.out.println("Changed job priority."); 365 exitCode = 0; 366 } 367 } else if (viewHistory) { 368 // If it ends with .jhist, assume it's a jhist file; otherwise, assume 369 // it's a Job ID 370 if (historyFileOrJobId.endsWith(".jhist")) { 371 viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile, 372 historyOutFormat); 373 exitCode = 0; 374 } else { 375 Job job = getJob(JobID.forName(historyFileOrJobId)); 376 if (job == null) { 377 System.out.println("Could not find job " + jobid); 378 } else { 379 String historyUrl = job.getHistoryUrl(); 380 if (historyUrl == null || historyUrl.isEmpty()) { 381 System.out.println("History file for job " + historyFileOrJobId + 382 " is currently unavailable."); 383 } else { 384 viewHistory(historyUrl, viewAllHistory, historyOutFile, 385 historyOutFormat); 386 exitCode = 0; 387 } 388 } 389 } 390 } else if (listEvents) { 391 listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents); 392 exitCode = 0; 393 } else if (listJobs) { 394 listJobs(cluster); 395 exitCode = 0; 396 } else if (listAllJobs) { 397 listAllJobs(cluster); 398 exitCode = 0; 399 } else if (listActiveTrackers) { 400 listActiveTrackers(cluster); 401 exitCode = 0; 402 } else if (listBlacklistedTrackers) { 403 listBlacklistedTrackers(cluster); 404 exitCode = 0; 405 } else if (displayTasks) { 406 displayTasks(getJob(JobID.forName(jobid)), taskType, taskState); 407 exitCode = 0; 408 } else if(killTask) { 409 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 410 Job job = getJob(taskID.getJobID()); 411 if (job == null) { 412 System.out.println("Could not find job " + jobid); 413 } else if (job.killTask(taskID, false)) { 414 System.out.println("Killed task " + taskid); 415 exitCode = 0; 416 } else { 417 System.out.println("Could not kill task " + taskid); 418 exitCode = -1; 419 } 420 } else if(failTask) { 421 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 422 Job job = getJob(taskID.getJobID()); 423 if (job == null) { 424 System.out.println("Could not find job " + jobid); 425 } else if(job.killTask(taskID, true)) { 426 System.out.println("Killed task " + taskID + " by failing it"); 427 exitCode = 0; 428 } else { 429 System.out.println("Could not fail task " + taskid); 430 exitCode = -1; 431 } 432 } else if (logs) { 433 try { 434 JobID jobID = JobID.forName(jobid); 435 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); 436 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); 437 LogCLIHelpers logDumper = new LogCLIHelpers(); 438 logDumper.setConf(getConf()); 439 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(), 440 logParams.getContainerId(), logParams.getNodeId(), 441 logParams.getOwner()); 442 } catch (IOException e) { 443 if (e instanceof RemoteException) { 444 throw e; 445 } 446 System.out.println(e.getMessage()); 447 } 448 } else if (downloadConfig) { 449 Job job = getJob(JobID.forName(jobid)); 450 if (job == null) { 451 System.out.println("Could not find job " + jobid); 452 } else { 453 String jobFile = job.getJobFile(); 454 if (jobFile == null || jobFile.isEmpty()) { 455 System.out.println("Config file for job " + jobFile + 456 " could not be found."); 457 } else { 458 Path configPath = new Path(jobFile); 459 FileSystem fs = FileSystem.get(getConf()); 460 fs.copyToLocalFile(configPath, new Path(configOutFile)); 461 exitCode = 0; 462 } 463 } 464 } 465 } catch (RemoteException re) { 466 IOException unwrappedException = re.unwrapRemoteException(); 467 if (unwrappedException instanceof AccessControlException) { 468 System.out.println(unwrappedException.getMessage()); 469 } else { 470 throw re; 471 } 472 } finally { 473 cluster.close(); 474 } 475 return exitCode; 476 } 477 478 Cluster createCluster() throws IOException { 479 return new Cluster(getConf()); 480 } 481 482 private String getJobPriorityNames() { 483 StringBuffer sb = new StringBuffer(); 484 for (JobPriority p : JobPriority.values()) { 485 sb.append(p.name()).append(" "); 486 } 487 return sb.substring(0, sb.length()-1); 488 } 489 490 private String getTaskTypes() { 491 return StringUtils.join(taskTypes, " "); 492 } 493 494 /** 495 * Display usage of the command-line tool and terminate execution. 496 */ 497 private void displayUsage(String cmd) { 498 String prefix = "Usage: CLI "; 499 String jobPriorityValues = getJobPriorityNames(); 500 String taskStates = "running, completed"; 501 502 if ("-submit".equals(cmd)) { 503 System.err.println(prefix + "[" + cmd + " <job-file>]"); 504 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { 505 System.err.println(prefix + "[" + cmd + " <job-id>]"); 506 } else if ("-counter".equals(cmd)) { 507 System.err.println(prefix + "[" + cmd + 508 " <job-id> <group-name> <counter-name>]"); 509 } else if ("-events".equals(cmd)) { 510 System.err.println(prefix + "[" + cmd + 511 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1."); 512 } else if ("-history".equals(cmd)) { 513 System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " + 514 "[-outfile <file>] [-format <human|json>]]"); 515 } else if ("-list".equals(cmd)) { 516 System.err.println(prefix + "[" + cmd + " [all]]"); 517 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { 518 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]"); 519 } else if ("-set-priority".equals(cmd)) { 520 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + 521 "Valid values for priorities are: " 522 + jobPriorityValues); 523 } else if ("-list-active-trackers".equals(cmd)) { 524 System.err.println(prefix + "[" + cmd + "]"); 525 } else if ("-list-blacklisted-trackers".equals(cmd)) { 526 System.err.println(prefix + "[" + cmd + "]"); 527 } else if ("-list-attempt-ids".equals(cmd)) { 528 System.err.println(prefix + "[" + cmd + 529 " <job-id> <task-type> <task-state>]. " + 530 "Valid values for <task-type> are " + getTaskTypes() + ". " + 531 "Valid values for <task-state> are " + taskStates); 532 } else if ("-logs".equals(cmd)) { 533 System.err.println(prefix + "[" + cmd + 534 " <job-id> <task-attempt-id>]. " + 535 " <task-attempt-id> is optional to get task attempt logs."); 536 } else if ("-config".equals(cmd)) { 537 System.err.println(prefix + "[" + cmd + " <job-id> <file>]"); 538 } else { 539 System.err.printf(prefix + "<command> <args>%n"); 540 System.err.printf("\t[-submit <job-file>]%n"); 541 System.err.printf("\t[-status <job-id>]%n"); 542 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n"); 543 System.err.printf("\t[-kill <job-id>]%n"); 544 System.err.printf("\t[-set-priority <job-id> <priority>]. " + 545 "Valid values for priorities are: " + jobPriorityValues + "%n"); 546 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n"); 547 System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " + 548 "[-outfile <file>] [-format <human|json>]]%n"); 549 System.err.printf("\t[-list [all]]%n"); 550 System.err.printf("\t[-list-active-trackers]%n"); 551 System.err.printf("\t[-list-blacklisted-trackers]%n"); 552 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " + 553 "<task-state>]. " + 554 "Valid values for <task-type> are " + getTaskTypes() + ". " + 555 "Valid values for <task-state> are " + taskStates); 556 System.err.printf("\t[-kill-task <task-attempt-id>]%n"); 557 System.err.printf("\t[-fail-task <task-attempt-id>]%n"); 558 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n"); 559 System.err.printf("\t[-config <job-id> <file>%n%n"); 560 ToolRunner.printGenericCommandUsage(System.out); 561 } 562 } 563 564 private void viewHistory(String historyFile, boolean all, 565 String historyOutFile, String format) throws IOException { 566 HistoryViewer historyViewer = new HistoryViewer(historyFile, 567 getConf(), all, format); 568 PrintStream ps = System.out; 569 if (historyOutFile != null) { 570 ps = new PrintStream(new BufferedOutputStream(new FileOutputStream( 571 new File(historyOutFile))), true, "UTF-8"); 572 } 573 historyViewer.print(ps); 574 } 575 576 protected long getCounter(Counters counters, String counterGroupName, 577 String counterName) throws IOException { 578 return counters.findCounter(counterGroupName, counterName).getValue(); 579 } 580 581 /** 582 * List the events for the given job 583 * @param jobId the job id for the job's events to list 584 * @throws IOException 585 */ 586 private void listEvents(Job job, int fromEventId, int numEvents) 587 throws IOException, InterruptedException { 588 TaskCompletionEvent[] events = job. 589 getTaskCompletionEvents(fromEventId, numEvents); 590 System.out.println("Task completion events for " + job.getJobID()); 591 System.out.println("Number of events (from " + fromEventId + ") are: " 592 + events.length); 593 for(TaskCompletionEvent event: events) { 594 System.out.println(event.getStatus() + " " + 595 event.getTaskAttemptId() + " " + 596 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp())); 597 } 598 } 599 600 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 601 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 602 } 603 604 @VisibleForTesting 605 Job getJob(JobID jobid) throws IOException, InterruptedException { 606 607 int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 608 MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES); 609 long retryInterval = getConf() 610 .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, 611 MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL); 612 Job job = cluster.getJob(jobid); 613 614 for (int i = 0; i < maxRetry; ++i) { 615 if (job != null) { 616 return job; 617 } 618 LOG.info("Could not obtain job info after " + String.valueOf(i + 1) 619 + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000) 620 + " seconds and retrying."); 621 Thread.sleep(retryInterval); 622 job = cluster.getJob(jobid); 623 } 624 return job; 625 } 626 627 628 /** 629 * Dump a list of currently running jobs 630 * @throws IOException 631 */ 632 private void listJobs(Cluster cluster) 633 throws IOException, InterruptedException { 634 List<JobStatus> runningJobs = new ArrayList<JobStatus>(); 635 for (JobStatus job : cluster.getAllJobStatuses()) { 636 if (!job.isJobComplete()) { 637 runningJobs.add(job); 638 } 639 } 640 displayJobList(runningJobs.toArray(new JobStatus[0])); 641 } 642 643 /** 644 * Dump a list of all jobs submitted. 645 * @throws IOException 646 */ 647 private void listAllJobs(Cluster cluster) 648 throws IOException, InterruptedException { 649 displayJobList(cluster.getAllJobStatuses()); 650 } 651 652 /** 653 * Display the list of active trackers 654 */ 655 private void listActiveTrackers(Cluster cluster) 656 throws IOException, InterruptedException { 657 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers(); 658 for (TaskTrackerInfo tracker : trackers) { 659 System.out.println(tracker.getTaskTrackerName()); 660 } 661 } 662 663 /** 664 * Display the list of blacklisted trackers 665 */ 666 private void listBlacklistedTrackers(Cluster cluster) 667 throws IOException, InterruptedException { 668 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers(); 669 if (trackers.length > 0) { 670 System.out.println("BlackListedNode \t Reason"); 671 } 672 for (TaskTrackerInfo tracker : trackers) { 673 System.out.println(tracker.getTaskTrackerName() + "\t" + 674 tracker.getReasonForBlacklist()); 675 } 676 } 677 678 private void printTaskAttempts(TaskReport report) { 679 if (report.getCurrentStatus() == TIPStatus.COMPLETE) { 680 System.out.println(report.getSuccessfulTaskAttemptId()); 681 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) { 682 for (TaskAttemptID t : 683 report.getRunningTaskAttemptIds()) { 684 System.out.println(t); 685 } 686 } 687 } 688 689 /** 690 * Display the information about a job's tasks, of a particular type and 691 * in a particular state 692 * 693 * @param job the job 694 * @param type the type of the task (map/reduce/setup/cleanup) 695 * @param state the state of the task 696 * (pending/running/completed/failed/killed) 697 */ 698 protected void displayTasks(Job job, String type, String state) 699 throws IOException, InterruptedException { 700 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase())); 701 for (TaskReport report : reports) { 702 TIPStatus status = report.getCurrentStatus(); 703 if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) || 704 (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) || 705 (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) || 706 (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) || 707 (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) { 708 printTaskAttempts(report); 709 } 710 } 711 } 712 713 public void displayJobList(JobStatus[] jobs) 714 throws IOException, InterruptedException { 715 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out, 716 Charsets.UTF_8))); 717 } 718 719 @Private 720 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 721 @Private 722 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 723 private static String memPattern = "%dM"; 724 private static String UNAVAILABLE = "N/A"; 725 726 @Private 727 public void displayJobList(JobStatus[] jobs, PrintWriter writer) { 728 writer.println("Total jobs:" + jobs.length); 729 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName", 730 "Queue", "Priority", "UsedContainers", 731 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); 732 for (JobStatus job : jobs) { 733 int numUsedSlots = job.getNumUsedSlots(); 734 int numReservedSlots = job.getNumReservedSlots(); 735 int usedMem = job.getUsedMem(); 736 int rsvdMem = job.getReservedMem(); 737 int neededMem = job.getNeededMem(); 738 writer.printf(dataPattern, 739 job.getJobID().toString(), job.getState(), job.getStartTime(), 740 job.getUsername(), job.getQueue(), 741 job.getPriority().name(), 742 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots, 743 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots, 744 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem), 745 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem), 746 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem), 747 job.getSchedulingInfo()); 748 } 749 writer.flush(); 750 } 751 752 public static void main(String[] argv) throws Exception { 753 int res = ToolRunner.run(new CLI(), argv); 754 ExitUtil.terminate(res); 755 } 756}