/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query its state. The set methods
 * only work until the job is submitted; afterwards they throw an
 * {@link IllegalStateException}.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState { DEFINE, RUNNING }
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
           "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
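   * <p>A minimal sketch of the copy semantics; the property name below is
   * only illustrative:</p>
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     conf.set("mapreduce.job.queuename", "default");
   *     Job job = Job.getInstance(conf);
   *     // the Job holds its own copy; later edits to conf
   *     // are not visible to the job
   *     conf.set("mapreduce.job.queuename", "other");
   * </pre></blockquote></p>
   *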
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the name of the job
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created only when it is needed.
   *
   * The <code>Job</code> uses a freshly created, generic
   * {@link Configuration}, as {@link #getInstance()} does.
   *
   * @param ignored cluster, which is not used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored cluster, which is not used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} attached to the given {@link Cluster}, with
   * the given {@link Configuration} and {@link JobStatus}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, attached to the given cluster
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need an up-to-date job status, so refresh it
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Set the boolean property that specifies whether the user's classes take
   * precedence over the system's classes when the tasks are launched.
   * @param value pass true if the user's classes should take precedence
   */
  public void setUserClassesTakesPrecedence(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setUserClassesTakesPrecedence(value);
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * Get the user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE || status == null) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump the job's status into a human-readable string.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore and report whatever status information is available
    } catch (InterruptedException ie) {
      // ignore and report whatever status information is available
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL: ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job: ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return a string describing the task failure that caused the job to fail
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times. " + "For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
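   * <p>A short sketch of printing per-task progress from the reports:</p>
   * <p><blockquote><pre>
   *     for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre></blockquote></p>
   *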
   * @param type Type of the task
   * @return the reports for the tasks of the given type
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
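   * <p>A sketch of a simple progress loop; the poll interval here is
   * illustrative (compare {@link #getProgressPollInterval(Configuration)}):</p>
   * <p><blockquote><pre>
   *     while (!job.isComplete()) {
   *       System.out.printf("map %.0f%% reduce %.0f%%%n",
   *           job.mapProgress() * 100, job.reduceProgress() * 100);
   *       Thread.sleep(1000);
   *     }
   * </pre></blockquote></p>
   *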
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
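   * <p>A sketch of paging through all events, mirroring the pattern used
   * internally by {@link #monitorAndPrintJob()}:</p>
   * <p><blockquote><pre>
   *     int from = 0;
   *     TaskCompletionEvent[] events;
   *     do {
   *       events = job.getTaskCompletionEvents(from, 10);
   *       for (TaskCompletionEvent event : events) {
   *         System.out.println(event);
   *       }
   *       from += events.length;
   *     } while (events.length > 0);
   * </pre></blockquote></p>
   *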
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to the failed tasks list, otherwise it is just killed,
   *                   without affecting the job's failure status.
   * @return <code>true</code> if the operation succeeded
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
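   * <p>A sketch of reading a built-in counter; the null check matters because
   * the job may have been retired:</p>
   * <p><blockquote><pre>
   *     Counters counters = job.getCounters();
   *     if (counters != null) {
   *       long inputRecords =
   *           counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   *     }
   * </pre></blockquote></p>
   *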
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
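   *
   * <p>Setting zero reduce tasks yields a map-only job, in which map output
   * is written directly to the job's output path:</p>
   * <p><blockquote><pre>
   *     job.setNumReduceTasks(0);   // map-only job
   * </pre></blockquote></p>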
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
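   * <p>A typical pairing of intermediate and final types, using the standard
   * Writable classes:</p>
   * <p><blockquote><pre>
   *     job.setMapOutputKeyClass(Text.class);
   *     job.setMapOutputValueClass(IntWritable.class);
   *     job.setOutputKeyClass(Text.class);
   *     job.setOutputValueClass(LongWritable.class);
   * </pre></blockquote></p>
   *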
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)} method.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
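   *
   * <p>A sketch of the secondary-sort pattern this enables; the comparator
   * class names here are hypothetical:</p>
   * <p><blockquote><pre>
   *     // sort on the full composite key, group on its natural part
   *     job.setSortComparatorClass(CompositeKeyComparator.class);
   *     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>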
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               performed through the {@link OutputCommitter};
   *               otherwise they are skipped.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized
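   *
   * <p>A sketch; the path is illustrative, and the URI fragment after
   * <code>#</code> names the symlink created in the task's working
   * directory:</p>
   * <p><blockquote><pre>
   *     job.addCacheFile(new URI("/shared/dictionary.txt#dict"));
   * </pre></blockquote></p>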
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
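   *
   * <p>A minimal sketch; the range string is illustrative and follows the
   * {@link Configuration.IntegerRanges} syntax:</p>
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     // profile map tasks 0, 1, 2 and 5
   *     job.setProfileTaskRange(true, "0-2,5");
   * </pre></blockquote></p>
   *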
   * @param isMap whether the ranges apply to map tasks (<code>true</code>)
   *              or to reduce tasks (<code>false</code>)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   * @param value whether the delegation tokens should be canceled upon
   *              job completion
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }
  /**
   * Submit the job to the cluster and return immediately.
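   *
   * <p>A sketch of asynchronous use; the poll interval is illustrative
   * (see also {@link #waitForCompletion(boolean)}):</p>
   * <p><blockquote><pre>
   *     job.submit();
   *     while (!job.isComplete()) {
   *       Thread.sleep(5000);
   *     }
   *     System.out.println("succeeded: " + job.isSuccessful());
   * </pre></blockquote></p>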
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
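   * <p>A sketch of making {@link #monitorAndPrintJob()} log all task
   * completion events:</p>
   * <p><blockquote><pre>
   *     Job.setTaskOutputFilter(job.getConfiguration(),
   *         Job.TaskStatusFilter.ALL);
   * </pre></blockquote></p>
   *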
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation, or null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}