/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
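
  /*
   * A minimal client-side sketch (values illustrative): both poll intervals
   * can be tightened through the Configuration before the job is created.
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 1000);       // waitForCompletion() polls every 1s
   *   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 500);  // progress reported every 0.5s
   *   Job job = Job.getInstance(conf);
   */
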
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }
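
  /*
   * A small sketch of the copy semantics described above: mutating the
   * Configuration after getInstance() does not affect the job.
   *
   *   Configuration conf = new Configuration();
   *   conf.set("my.example.key", "original");        // hypothetical key, for illustration
   *   Job job = Job.getInstance(conf);
   *   conf.set("my.example.key", "changed");
   *   job.getConfiguration().get("my.example.key");  // still "original"
   */
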
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job name to set
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the ignored cluster
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the ignored cluster
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }
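
  /*
   * Migration sketch for the deprecated factories above: the Cluster
   * argument is ignored, so both calls below are equivalent.
   *
   *   Job old = Job.getInstance(cluster, conf);  // deprecated
   *   Job job = Job.getInstance(conf);           // preferred
   */
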
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the job's current {@link JobStatus.State}
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Set the boolean property for specifying which classpath takes precedence
   * when tasks are launched - the user's classpath or the system one.
   * @param value pass true if user's classes should take precedence
   */
  public void setUserClassesTakesPrecedence(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setUserClassesTakesPrecedence(value);
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }
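
  /*
   * Querying a running job, as a short sketch: all of the getters below
   * require the job to be in RUNNING state.
   *
   *   job.submit();
   *   System.out.println("Tracking URL: " + job.getTrackingURL());
   *   System.out.println("Started at:   " + job.getStartTime());
   *   System.out.println("State:        " + job.getJobState());
   */
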
  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE || status == null) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return the taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed due to some other reason and the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times. " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the array of reports for tasks of the given type
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }
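
  /*
   * A small monitoring sketch built from the non-blocking calls above:
   * poll until completion, printing map/reduce progress as percentages.
   *
   *   while (!job.isComplete()) {
   *     System.out.printf("map %.0f%% reduce %.0f%%%n",
   *         job.mapProgress() * 100, job.reduceProgress() * 100);
   *     Thread.sleep(5000);
   *   }
   */
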
  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
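
  /*
   * Paging through completion events, sketched: fetch in batches of ten
   * until the server returns an empty array.
   *
   *   int from = 0;
   *   TaskCompletionEvent[] batch;
   *   while ((batch = job.getTaskCompletionEvents(from, 10)).length > 0) {
   *     for (TaskCompletionEvent e : batch) {
   *       System.out.println(e);
   *     }
   *     from += batch.length;
   *   }
   */
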
  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting the job's failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }
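
  /*
   * Typical wiring for the format and class setters in this section,
   * sketched with the stock text formats from
   * org.apache.hadoop.mapreduce.lib; MyMapper/MyReducer stand in for
   * application classes.
   *
   *   job.setInputFormatClass(TextInputFormat.class);
   *   job.setOutputFormatClass(TextOutputFormat.class);
   *   job.setMapperClass(MyMapper.class);
   *   job.setReducerClass(MyReducer.class);
   *   job.setNumReduceTasks(2);
   */
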
  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }
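
  /*
   * Sketch of the distinction documented above: intermediate (map output)
   * types may differ from the job's final output types.
   *
   *   job.setMapOutputKeyClass(Text.class);           // mapper emits <Text, IntWritable>
   *   job.setMapOutputValueClass(IntWritable.class);
   *   job.setOutputKeyClass(Text.class);              // reducer writes <Text, LongWritable>
   *   job.setOutputValueClass(LongWritable.class);
   */
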
  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }
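
  /*
   * Secondary-sort sketch for the two comparators above, assuming a
   * hypothetical composite key and comparators written by the application:
   *
   *   job.setSortComparatorClass(FullKeyComparator.class);        // orders by (natural key, secondary field)
   *   job.setGroupingComparatorClass(NaturalKeyComparator.class); // groups reduce() calls by natural key only
   *
   * With this split, values arrive at each reduce() call sorted by the
   * secondary field while still being grouped under one natural key.
   */
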
  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }
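
  /*
   * Distributed-cache sketch for the methods above; paths are illustrative
   * and must already exist on the cluster's file system.
   *
   *   job.addCacheFile(URI.create("/data/lookup.txt"));       // localized next to each task
   *   job.addFileToClassPath(new Path("/libs/helper.jar"));   // localized and put on the task classpath
   *   job.addArchiveToClassPath(new Path("/libs/deps.zip"));  // unpacked, then put on the classpath
   */
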
  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the ranges apply to map or reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }
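
  /*
   * Profiling sketch for the three setters above: profile the first two map
   * and reduce tasks with hprof (the '%s' expands to the output file name).
   *
   *   job.setProfileEnabled(true);
   *   job.setProfileParams(
   *       "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
   *   job.setProfileTaskRange(true, "0-1");   // map task ids 0 and 1
   *   job.setProfileTaskRange(false, "0-1");  // reduce task ids 0 and 1
   */
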
  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException,
                     ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
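
  /*
   * Non-blocking submission, sketched: submit() returns as soon as the job
   * is handed to the cluster, so the client can do other work and check
   * back later.
   *
   *   job.submit();
   *   System.out.println("Submitted; track at " + job.getTrackingURL());
   *   // ... other client work ...
   *   if (job.isComplete() && job.isSuccessful()) {
   *     System.out.println("done");
   *   }
   */
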
  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
         " reduce " +
         StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
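
  /*
   * Example of a profile-params string that the hprof check below accepts,
   * since it names the hprof agent and carries a non-empty file= component:
   *
   *   "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
   */
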
  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }
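
  /*
   * Filter sketch for the getter above and the setter below: have
   * monitorAndPrintJob() log every completion event rather than only the
   * failed ones (the default is FAILED).
   *
   *   Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
   */
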
  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId the identifier of the job's reservation, null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}