001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.tools;
019
020import java.io.BufferedOutputStream;
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.OutputStreamWriter;
025import java.io.PrintStream;
026import java.io.PrintWriter;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Set;
032
033import com.google.common.annotations.VisibleForTesting;
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.classification.InterfaceAudience.Private;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.ipc.RemoteException;
045import org.apache.hadoop.mapred.JobConf;
046import org.apache.hadoop.mapred.TIPStatus;
047import org.apache.hadoop.mapreduce.Cluster;
048import org.apache.hadoop.mapreduce.Counters;
049import org.apache.hadoop.mapreduce.Job;
050import org.apache.hadoop.mapreduce.JobID;
051import org.apache.hadoop.mapreduce.JobPriority;
052import org.apache.hadoop.mapreduce.JobStatus;
053import org.apache.hadoop.mapreduce.MRJobConfig;
054import org.apache.hadoop.mapreduce.TaskAttemptID;
055import org.apache.hadoop.mapreduce.TaskCompletionEvent;
056import org.apache.hadoop.mapreduce.TaskReport;
057import org.apache.hadoop.mapreduce.TaskTrackerInfo;
058import org.apache.hadoop.mapreduce.TaskType;
059import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
060import org.apache.hadoop.mapreduce.v2.LogParams;
061import org.apache.hadoop.security.AccessControlException;
062import org.apache.hadoop.util.ExitUtil;
063import org.apache.hadoop.util.Tool;
064import org.apache.hadoop.util.ToolRunner;
065import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
066
067import com.google.common.base.Charsets;
068
069/**
070 * Interprets the map reduce cli options 
071 */
072@InterfaceAudience.Public
073@InterfaceStability.Stable
074public class CLI extends Configured implements Tool {
075  private static final Log LOG = LogFactory.getLog(CLI.class);
076  protected Cluster cluster;
077  private static final Set<String> taskTypes = new HashSet<String>(
078      Arrays.asList("MAP", "REDUCE"));
079  private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
080      "running", "completed", "pending", "failed", "killed"));
081 
082  public CLI() {
083  }
084  
085  public CLI(Configuration conf) {
086    setConf(conf);
087  }
088  
089  public int run(String[] argv) throws Exception {
090    int exitCode = -1;
091    if (argv.length < 1) {
092      displayUsage("");
093      return exitCode;
094    }    
095    // process arguments
096    String cmd = argv[0];
097    String submitJobFile = null;
098    String jobid = null;
099    String taskid = null;
100    String historyFileOrJobId = null;
101    String historyOutFile = null;
102    String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
103    String counterGroupName = null;
104    String counterName = null;
105    JobPriority jp = null;
106    String taskType = null;
107    String taskState = null;
108    int fromEvent = 0;
109    int nEvents = 0;
110    String configOutFile = null;
111    boolean getStatus = false;
112    boolean getCounter = false;
113    boolean killJob = false;
114    boolean listEvents = false;
115    boolean viewHistory = false;
116    boolean viewAllHistory = false;
117    boolean listJobs = false;
118    boolean listAllJobs = false;
119    boolean listActiveTrackers = false;
120    boolean listBlacklistedTrackers = false;
121    boolean displayTasks = false;
122    boolean killTask = false;
123    boolean failTask = false;
124    boolean setJobPriority = false;
125    boolean logs = false;
126    boolean downloadConfig = false;
127
128    if ("-submit".equals(cmd)) {
129      if (argv.length != 2) {
130        displayUsage(cmd);
131        return exitCode;
132      }
133      submitJobFile = argv[1];
134    } else if ("-status".equals(cmd)) {
135      if (argv.length != 2) {
136        displayUsage(cmd);
137        return exitCode;
138      }
139      jobid = argv[1];
140      getStatus = true;
141    } else if("-counter".equals(cmd)) {
142      if (argv.length != 4) {
143        displayUsage(cmd);
144        return exitCode;
145      }
146      getCounter = true;
147      jobid = argv[1];
148      counterGroupName = argv[2];
149      counterName = argv[3];
150    } else if ("-kill".equals(cmd)) {
151      if (argv.length != 2) {
152        displayUsage(cmd);
153        return exitCode;
154      }
155      jobid = argv[1];
156      killJob = true;
157    } else if ("-set-priority".equals(cmd)) {
158      if (argv.length != 3) {
159        displayUsage(cmd);
160        return exitCode;
161      }
162      jobid = argv[1];
163      try {
164        jp = JobPriority.valueOf(argv[2]); 
165      } catch (IllegalArgumentException iae) {
166        LOG.info(iae);
167        displayUsage(cmd);
168        return exitCode;
169      }
170      setJobPriority = true; 
171    } else if ("-events".equals(cmd)) {
172      if (argv.length != 4) {
173        displayUsage(cmd);
174        return exitCode;
175      }
176      jobid = argv[1];
177      fromEvent = Integer.parseInt(argv[2]);
178      nEvents = Integer.parseInt(argv[3]);
179      listEvents = true;
180    } else if ("-history".equals(cmd)) {
181      viewHistory = true;
182      if (argv.length < 2 || argv.length > 7) {
183        displayUsage(cmd);
184        return exitCode;
185      }
186
187      // Some arguments are optional while others are not, and some require
188      // second arguments.  Due to this, the indexing can vary depending on
189      // what's specified and what's left out, as summarized in the below table:
190      // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
191      //   1                  2            3       4         5         6
192      //   1                  2            3       4
193      //   1                  2                              3         4
194      //   1                  2
195      //                      1            2       3         4         5
196      //                      1            2       3
197      //                      1                              2         3
198      //                      1
199
200      // "all" is optional, but comes first if specified
201      int index = 1;
202      if ("all".equals(argv[index])) {
203        index++;
204        viewAllHistory = true;
205        if (argv.length == 2) {
206          displayUsage(cmd);
207          return exitCode;
208        }
209      }
210      // Get the job history file or job id argument
211      historyFileOrJobId = argv[index++];
212      // "-outfile" is optional, but if specified requires a second argument
213      if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
214        index++;
215        historyOutFile = argv[index++];
216      }
217      // "-format" is optional, but if specified required a second argument
218      if (argv.length > index + 1 && "-format".equals(argv[index])) {
219        index++;
220        historyOutFormat = argv[index++];
221      }
222      // Check for any extra arguments that don't belong here
223      if (argv.length > index) {
224        displayUsage(cmd);
225        return exitCode;
226      }
227    } else if ("-list".equals(cmd)) {
228      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
229        displayUsage(cmd);
230        return exitCode;
231      }
232      if (argv.length == 2 && "all".equals(argv[1])) {
233        listAllJobs = true;
234      } else {
235        listJobs = true;
236      }
237    } else if("-kill-task".equals(cmd)) {
238      if (argv.length != 2) {
239        displayUsage(cmd);
240        return exitCode;
241      }
242      killTask = true;
243      taskid = argv[1];
244    } else if("-fail-task".equals(cmd)) {
245      if (argv.length != 2) {
246        displayUsage(cmd);
247        return exitCode;
248      }
249      failTask = true;
250      taskid = argv[1];
251    } else if ("-list-active-trackers".equals(cmd)) {
252      if (argv.length != 1) {
253        displayUsage(cmd);
254        return exitCode;
255      }
256      listActiveTrackers = true;
257    } else if ("-list-blacklisted-trackers".equals(cmd)) {
258      if (argv.length != 1) {
259        displayUsage(cmd);
260        return exitCode;
261      }
262      listBlacklistedTrackers = true;
263    } else if ("-list-attempt-ids".equals(cmd)) {
264      if (argv.length != 4) {
265        displayUsage(cmd);
266        return exitCode;
267      }
268      jobid = argv[1];
269      taskType = argv[2];
270      taskState = argv[3];
271      displayTasks = true;
272      if (!taskTypes.contains(taskType.toUpperCase())) {
273        System.out.println("Error: Invalid task-type: " + taskType);
274        displayUsage(cmd);
275        return exitCode;
276      }
277      if (!taskStates.contains(taskState.toLowerCase())) {
278        System.out.println("Error: Invalid task-state: " + taskState);
279        displayUsage(cmd);
280        return exitCode;
281      }
282    } else if ("-logs".equals(cmd)) {
283      if (argv.length == 2 || argv.length ==3) {
284        logs = true;
285        jobid = argv[1];
286        if (argv.length == 3) {
287          taskid = argv[2];
288        }  else {
289          taskid = null;
290        }
291      } else {
292        displayUsage(cmd);
293        return exitCode;
294      }
295    } else if ("-config".equals(cmd)) {
296      downloadConfig = true;
297      if (argv.length != 3) {
298        displayUsage(cmd);
299        return exitCode;
300      }
301      jobid = argv[1];
302      configOutFile = argv[2];
303    } else {
304      displayUsage(cmd);
305      return exitCode;
306    }
307
308    // initialize cluster
309    cluster = createCluster();
310        
311    // Submit the request
312    try {
313      if (submitJobFile != null) {
314        Job job = Job.getInstance(new JobConf(submitJobFile));
315        job.submit();
316        System.out.println("Created job " + job.getJobID());
317        exitCode = 0;
318      } else if (getStatus) {
319        Job job = getJob(JobID.forName(jobid));
320        if (job == null) {
321          System.out.println("Could not find job " + jobid);
322        } else {
323          Counters counters = job.getCounters();
324          System.out.println();
325          System.out.println(job);
326          if (counters != null) {
327            System.out.println(counters);
328          } else {
329            System.out.println("Counters not available. Job is retired.");
330          }
331          exitCode = 0;
332        }
333      } else if (getCounter) {
334        Job job = getJob(JobID.forName(jobid));
335        if (job == null) {
336          System.out.println("Could not find job " + jobid);
337        } else {
338          Counters counters = job.getCounters();
339          if (counters == null) {
340            System.out.println("Counters not available for retired job " + 
341            jobid);
342            exitCode = -1;
343          } else {
344            System.out.println(getCounter(counters,
345              counterGroupName, counterName));
346            exitCode = 0;
347          }
348        }
349      } else if (killJob) {
350        Job job = getJob(JobID.forName(jobid));
351        if (job == null) {
352          System.out.println("Could not find job " + jobid);
353        } else {
354          job.killJob();
355          System.out.println("Killed job " + jobid);
356          exitCode = 0;
357        }
358      } else if (setJobPriority) {
359        Job job = getJob(JobID.forName(jobid));
360        if (job == null) {
361          System.out.println("Could not find job " + jobid);
362        } else {
363          job.setPriority(jp);
364          System.out.println("Changed job priority.");
365          exitCode = 0;
366        } 
367      } else if (viewHistory) {
368        // If it ends with .jhist, assume it's a jhist file; otherwise, assume
369        // it's a Job ID
370        if (historyFileOrJobId.endsWith(".jhist")) {
371          viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
372              historyOutFormat);
373          exitCode = 0;
374        } else {
375          Job job = getJob(JobID.forName(historyFileOrJobId));
376          if (job == null) {
377            System.out.println("Could not find job " + jobid);
378          } else {
379            String historyUrl = job.getHistoryUrl();
380            if (historyUrl == null || historyUrl.isEmpty()) {
381              System.out.println("History file for job " + historyFileOrJobId +
382                  " is currently unavailable.");
383            } else {
384              viewHistory(historyUrl, viewAllHistory, historyOutFile,
385                  historyOutFormat);
386              exitCode = 0;
387            }
388          }
389        }
390      } else if (listEvents) {
391        listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
392        exitCode = 0;
393      } else if (listJobs) {
394        listJobs(cluster);
395        exitCode = 0;
396      } else if (listAllJobs) {
397        listAllJobs(cluster);
398        exitCode = 0;
399      } else if (listActiveTrackers) {
400        listActiveTrackers(cluster);
401        exitCode = 0;
402      } else if (listBlacklistedTrackers) {
403        listBlacklistedTrackers(cluster);
404        exitCode = 0;
405      } else if (displayTasks) {
406        displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
407        exitCode = 0;
408      } else if(killTask) {
409        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
410        Job job = getJob(taskID.getJobID());
411        if (job == null) {
412          System.out.println("Could not find job " + jobid);
413        } else if (job.killTask(taskID, false)) {
414          System.out.println("Killed task " + taskid);
415          exitCode = 0;
416        } else {
417          System.out.println("Could not kill task " + taskid);
418          exitCode = -1;
419        }
420      } else if(failTask) {
421        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
422        Job job = getJob(taskID.getJobID());
423        if (job == null) {
424            System.out.println("Could not find job " + jobid);
425        } else if(job.killTask(taskID, true)) {
426          System.out.println("Killed task " + taskID + " by failing it");
427          exitCode = 0;
428        } else {
429          System.out.println("Could not fail task " + taskid);
430          exitCode = -1;
431        }
432      } else if (logs) {
433        try {
434        JobID jobID = JobID.forName(jobid);
435        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
436        LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
437        LogCLIHelpers logDumper = new LogCLIHelpers();
438        logDumper.setConf(getConf());
439        exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
440            logParams.getContainerId(), logParams.getNodeId(),
441            logParams.getOwner());
442        } catch (IOException e) {
443          if (e instanceof RemoteException) {
444            throw e;
445          } 
446          System.out.println(e.getMessage());
447        }
448      } else if (downloadConfig) {
449        Job job = getJob(JobID.forName(jobid));
450        if (job == null) {
451          System.out.println("Could not find job " + jobid);
452        } else {
453          String jobFile = job.getJobFile();
454          if (jobFile == null || jobFile.isEmpty()) {
455            System.out.println("Config file for job " + jobFile +
456                " could not be found.");
457          } else {
458            Path configPath = new Path(jobFile);
459            FileSystem fs = FileSystem.get(getConf());
460            fs.copyToLocalFile(configPath, new Path(configOutFile));
461            exitCode = 0;
462          }
463        }
464      }
465    } catch (RemoteException re) {
466      IOException unwrappedException = re.unwrapRemoteException();
467      if (unwrappedException instanceof AccessControlException) {
468        System.out.println(unwrappedException.getMessage());
469      } else {
470        throw re;
471      }
472    } finally {
473      cluster.close();
474    }
475    return exitCode;
476  }
477
478  Cluster createCluster() throws IOException {
479    return new Cluster(getConf());
480  }
481
482  private String getJobPriorityNames() {
483    StringBuffer sb = new StringBuffer();
484    for (JobPriority p : JobPriority.values()) {
485      sb.append(p.name()).append(" ");
486    }
487    return sb.substring(0, sb.length()-1);
488  }
489
490  private String getTaskTypes() {
491    return StringUtils.join(taskTypes, " ");
492  }
493
494  /**
495   * Display usage of the command-line tool and terminate execution.
496   */
497  private void displayUsage(String cmd) {
498    String prefix = "Usage: CLI ";
499    String jobPriorityValues = getJobPriorityNames();
500    String taskStates = "running, completed";
501
502    if ("-submit".equals(cmd)) {
503      System.err.println(prefix + "[" + cmd + " <job-file>]");
504    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
505      System.err.println(prefix + "[" + cmd + " <job-id>]");
506    } else if ("-counter".equals(cmd)) {
507      System.err.println(prefix + "[" + cmd + 
508        " <job-id> <group-name> <counter-name>]");
509    } else if ("-events".equals(cmd)) {
510      System.err.println(prefix + "[" + cmd + 
511        " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
512    } else if ("-history".equals(cmd)) {
513      System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
514          "[-outfile <file>] [-format <human|json>]]");
515    } else if ("-list".equals(cmd)) {
516      System.err.println(prefix + "[" + cmd + " [all]]");
517    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
518      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
519    } else if ("-set-priority".equals(cmd)) {
520      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
521          "Valid values for priorities are: " 
522          + jobPriorityValues); 
523    } else if ("-list-active-trackers".equals(cmd)) {
524      System.err.println(prefix + "[" + cmd + "]");
525    } else if ("-list-blacklisted-trackers".equals(cmd)) {
526      System.err.println(prefix + "[" + cmd + "]");
527    } else if ("-list-attempt-ids".equals(cmd)) {
528      System.err.println(prefix + "[" + cmd + 
529          " <job-id> <task-type> <task-state>]. " +
530          "Valid values for <task-type> are " + getTaskTypes() + ". " +
531          "Valid values for <task-state> are " + taskStates);
532    } else if ("-logs".equals(cmd)) {
533      System.err.println(prefix + "[" + cmd +
534          " <job-id> <task-attempt-id>]. " +
535          " <task-attempt-id> is optional to get task attempt logs.");
536    } else if ("-config".equals(cmd)) {
537      System.err.println(prefix + "[" + cmd + " <job-id> <file>]");
538    } else {
539      System.err.printf(prefix + "<command> <args>%n");
540      System.err.printf("\t[-submit <job-file>]%n");
541      System.err.printf("\t[-status <job-id>]%n");
542      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
543      System.err.printf("\t[-kill <job-id>]%n");
544      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
545        "Valid values for priorities are: " + jobPriorityValues + "%n");
546      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
547      System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
548          "[-outfile <file>] [-format <human|json>]]%n");
549      System.err.printf("\t[-list [all]]%n");
550      System.err.printf("\t[-list-active-trackers]%n");
551      System.err.printf("\t[-list-blacklisted-trackers]%n");
552      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
553        "<task-state>]. " +
554        "Valid values for <task-type> are " + getTaskTypes() + ". " +
555        "Valid values for <task-state> are " + taskStates);
556      System.err.printf("\t[-kill-task <task-attempt-id>]%n");
557      System.err.printf("\t[-fail-task <task-attempt-id>]%n");
558      System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n");
559      System.err.printf("\t[-config <job-id> <file>%n%n");
560      ToolRunner.printGenericCommandUsage(System.out);
561    }
562  }
563    
564  private void viewHistory(String historyFile, boolean all,
565      String historyOutFile, String format) throws IOException {
566    HistoryViewer historyViewer = new HistoryViewer(historyFile,
567        getConf(), all, format);
568    PrintStream ps = System.out;
569    if (historyOutFile != null) {
570      ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
571          new File(historyOutFile))), true, "UTF-8");
572    }
573    historyViewer.print(ps);
574  }
575
576  protected long getCounter(Counters counters, String counterGroupName,
577      String counterName) throws IOException {
578    return counters.findCounter(counterGroupName, counterName).getValue();
579  }
580  
581  /**
582   * List the events for the given job
583   * @param jobId the job id for the job's events to list
584   * @throws IOException
585   */
586  private void listEvents(Job job, int fromEventId, int numEvents)
587      throws IOException, InterruptedException {
588    TaskCompletionEvent[] events = job.
589      getTaskCompletionEvents(fromEventId, numEvents);
590    System.out.println("Task completion events for " + job.getJobID());
591    System.out.println("Number of events (from " + fromEventId + ") are: " 
592      + events.length);
593    for(TaskCompletionEvent event: events) {
594      System.out.println(event.getStatus() + " " + 
595        event.getTaskAttemptId() + " " + 
596        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
597    }
598  }
599
600  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
601    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
602  }
603
604  @VisibleForTesting
605  Job getJob(JobID jobid) throws IOException, InterruptedException {
606
607    int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
608        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
609    long retryInterval = getConf()
610        .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
611            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
612    Job job = cluster.getJob(jobid);
613
614    for (int i = 0; i < maxRetry; ++i) {
615      if (job != null) {
616        return job;
617      }
618      LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
619          + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
620          + " seconds and retrying.");
621      Thread.sleep(retryInterval);
622      job = cluster.getJob(jobid);
623    }
624    return job;
625  }
626  
627
628  /**
629   * Dump a list of currently running jobs
630   * @throws IOException
631   */
632  private void listJobs(Cluster cluster) 
633      throws IOException, InterruptedException {
634    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
635    for (JobStatus job : cluster.getAllJobStatuses()) {
636      if (!job.isJobComplete()) {
637        runningJobs.add(job);
638      }
639    }
640    displayJobList(runningJobs.toArray(new JobStatus[0]));
641  }
642    
643  /**
644   * Dump a list of all jobs submitted.
645   * @throws IOException
646   */
647  private void listAllJobs(Cluster cluster) 
648      throws IOException, InterruptedException {
649    displayJobList(cluster.getAllJobStatuses());
650  }
651  
652  /**
653   * Display the list of active trackers
654   */
655  private void listActiveTrackers(Cluster cluster) 
656      throws IOException, InterruptedException {
657    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
658    for (TaskTrackerInfo tracker : trackers) {
659      System.out.println(tracker.getTaskTrackerName());
660    }
661  }
662
663  /**
664   * Display the list of blacklisted trackers
665   */
666  private void listBlacklistedTrackers(Cluster cluster) 
667      throws IOException, InterruptedException {
668    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
669    if (trackers.length > 0) {
670      System.out.println("BlackListedNode \t Reason");
671    }
672    for (TaskTrackerInfo tracker : trackers) {
673      System.out.println(tracker.getTaskTrackerName() + "\t" + 
674        tracker.getReasonForBlacklist());
675    }
676  }
677
678  private void printTaskAttempts(TaskReport report) {
679    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
680      System.out.println(report.getSuccessfulTaskAttemptId());
681    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
682      for (TaskAttemptID t : 
683        report.getRunningTaskAttemptIds()) {
684        System.out.println(t);
685      }
686    }
687  }
688
689  /**
690   * Display the information about a job's tasks, of a particular type and
691   * in a particular state
692   * 
693   * @param job the job
694   * @param type the type of the task (map/reduce/setup/cleanup)
695   * @param state the state of the task 
696   * (pending/running/completed/failed/killed)
697   */
698  protected void displayTasks(Job job, String type, String state) 
699  throws IOException, InterruptedException {
700    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
701    for (TaskReport report : reports) {
702      TIPStatus status = report.getCurrentStatus();
703      if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
704          (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
705          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
706          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
707          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
708        printTaskAttempts(report);
709      }
710    }
711  }
712
713  public void displayJobList(JobStatus[] jobs) 
714      throws IOException, InterruptedException {
715    displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
716        Charsets.UTF_8)));
717  }
718
719  @Private
720  public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
721  @Private
722  public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
723  private static String memPattern   = "%dM";
724  private static String UNAVAILABLE  = "N/A";
725
726  @Private
727  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
728    writer.println("Total jobs:" + jobs.length);
729    writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
730      "Queue", "Priority", "UsedContainers",
731      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
732    for (JobStatus job : jobs) {
733      int numUsedSlots = job.getNumUsedSlots();
734      int numReservedSlots = job.getNumReservedSlots();
735      int usedMem = job.getUsedMem();
736      int rsvdMem = job.getReservedMem();
737      int neededMem = job.getNeededMem();
738      writer.printf(dataPattern,
739          job.getJobID().toString(), job.getState(), job.getStartTime(),
740          job.getUsername(), job.getQueue(), 
741          job.getPriority().name(),
742          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
743          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
744          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
745          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
746          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
747          job.getSchedulingInfo());
748    }
749    writer.flush();
750  }
751  
752  public static void main(String[] argv) throws Exception {
753    int res = ToolRunner.run(new CLI(), argv);
754    ExitUtil.terminate(res);
755  }
756}