/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory
 *   on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
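 *
 * <p><code>JobClient</code> also implements {@link AutoCloseable}, so it can
 * be managed with try-with-resources. A minimal sketch, reusing the
 * <code>job</code> configured above:</p>
 * <p><blockquote><pre>
 *     try (JobClient client = new JobClient(job)) {
 *       RunningJob running = client.submitJob(job);
 *       System.out.println("Submitted job " + running.getID());
 *     }
 * </pre></blockquote></p>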
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job, typically, goes to the distributed file-system and
 * can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the returned handle
 *   to the {@link RunningJob} can then be polled to query status and make
 *   scheduling decisions (see the sketch below).
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   upon job-completion, thus avoiding polling.
 *   </li>
 * </ol></p>
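 *
 * <p>A minimal sketch of option (2) above, polling a submitted job (the
 * five-second interval is illustrative, not prescribed):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);
 *     }
 *     if (!running.isSuccessful()) {
 *       // react here: resubmit the job, or abort the rest of the chain
 *     }
 * </pre></blockquote></p>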
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI implements AutoCloseable {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
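  // The spec string below is read pairwise as t1,n1,t2,n2,...: wait t1 ms
  // between each of the first n1 retries, then t2 ms between each of the
  // next n2 retries, and so on (pairing assumed from the comment below).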
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...

  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
  private long retryInterval =
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile.  If the job is null, then we cannot
     * perform any of the tasks.  The job might be null if the cluster
     * has completely forgotten about the job.  (e.g., 24 hours after the
     * job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications
     * should rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should rather use
     * {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    Limits.init(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();

    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);

    retryInterval =
      conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
        MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  @Override
  public synchronized void close() throws IOException {
    // cluster may never have been initialized if the no-arg constructor
    // was used without a subsequent init(); guard against that.
    if (cluster != null) {
      cluster.close();
    }
  }

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });

      Cluster prev = cluster;
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();

      // It is important to close the previous cluster instance
      // to clean up resources.
      if (prev != null) {
        prev.close();
      }
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  protected RunningJob getJobInner(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    for (int i = 0; i <= maxRetry; i++) {
      if (i > 0) {
        try {
          Thread.sleep(retryInterval);
        } catch (InterruptedException e) {
          // restore the interrupt status and stop retrying
          Thread.currentThread().interrupt();
          break;
        }
      }
      RunningJob job = getJobInner(jobid);
      if (job != null) {
        return job;
      }
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use
   * {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use
   * {@link #getReduceTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
              .getBlackListedTaskTrackerCount(), cluster
              .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(), metrics
                  .getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param  detailed if true then get a detailed status including the
   *         tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
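   *
   * <p>A minimal usage sketch (<code>MyJob</code> and the paths are
   * hypothetical):</p>
   * <pre>
   *     JobConf conf = new JobConf(MyJob.class);
   *     FileInputFormat.setInputPaths(conf, new Path("in"));
   *     FileOutputFormat.setOutputPath(conf, new Path("out"));
   *     RunningJob running = JobClient.runJob(conf); // blocks until done
   * </pre>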
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track; must be a {@link RunningJob} handle that was
   *            produced by this client (a <code>NetworkedJob</code>)
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, RunningJob job)
      throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  /**
   * Build a {@link Configuration} for the given job tracker specification,
   * which is either a <code>host:port</code> pair or the name of a
   * <code>hadoop-&lt;spec&gt;.xml</code> resource on the classpath.
   */
  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
                                            "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
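   *
   * <p>A small usage sketch (the chosen filter value is illustrative):</p>
   * <pre>
   *     JobConf conf = new JobConf();
   *     JobClient.setTaskOutputFilter(conf, TaskStatusFilter.ALL);
   *     // later, e.g. when monitoring:
   *     TaskStatusFilter filter = JobClient.getTaskOutputFilter(conf);
   * </pre>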
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
   *
   * @param jobDirPath the path to the job directory to check.
   * @param fs the filesystem holding the job directory.
   * @return true if the directory contains both a job.xml and a job.split.
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate children
   * of queue <code>queueName</code>.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo object for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer)
      throws IOException, InterruptedException {
    return clientUgi.doAs(
        new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}