/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setting up the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system directory
 * on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the cluster and optionally monitoring
 * its status.
 * </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients chain Map-Reduce jobs to accomplish complex tasks that
 * cannot be done by a single Map-Reduce job. This is fairly easy since the
 * output of a job typically goes to the distributed file-system, where it can
 * be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring that jobs run to
 * completion (success/failure) lies squarely on the clients. In such
 * situations the various job-control options are:
 * <ol>
 * <li>
 * {@link #runJob(JobConf)} : submits the job and returns only after
 * the job has completed.
 * </li>
 * <li>
 * {@link #submitJob(JobConf)} : only submits the job; the client then polls
 * the returned handle to the {@link RunningJob} to query status and make
 * scheduling decisions, as sketched below.
 * </li>
 * <li>
 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 * upon job-completion, thus avoiding polling.
 * </li>
 * </ol></p>
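 *
 * <p>For instance, the polling in option (2) might look like this (a sketch
 * only; error handling and job-chaining logic are elided, and the poll
 * interval is the caller's choice):</p>
 * <p><blockquote><pre>
 *     RunningJob running = jobClient.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);
 *     }
 *     if (!running.isSuccessful()) {
 *       // react to the failure, e.g. skip downstream jobs
 *     }
 * </pre></blockquote></p>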
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI implements AutoCloseable {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...
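  // The spec above is read as (sleepMillis, numRetries) pairs: with the
  // default "10000,6,60000,10" the client retries up to 6 times with a
  // ~10-second sleep between attempts, then up to 10 more times with a
  // ~60-second sleep. (This mirrors the "multiple linear random retry"
  // policy format used elsewhere in Hadoop.)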

  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
  private long retryInterval =
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It wraps a
   * {@link Job} to provide job information, and interacts with the
   * remote service through it to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We hold a {@link Job} object and go through it for all interactions
     * with the cluster.  If the job is null we cannot perform any of the
     * tasks; the job might be null if the cluster has completely forgotten
     * about it (e.g., 24 hours after the job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications
     * should rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately, reporting whether the whole job is done yet.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Asks the service for the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }


    /** Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task attempt to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
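        // The cluster hands events back in batches: this fetches at most 10
        // events starting at offset startFrom, so callers that want the full
        // stream typically loop, advancing startFrom by the number of events
        // returned on each call.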
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A string representation of the job, suitable for display.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    Limits.init(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();

    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);

    retryInterval =
        conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
  }
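
  // Since JobClient implements AutoCloseable, the underlying cluster
  // connection can be scoped with try-with-resources, e.g. (a sketch):
  //
  //   try (JobClient jc = new JobClient(new JobConf(conf))) {
  //     RunningJob rj = jc.submitJob(new JobConf(conf));
  //     // ... poll rj for progress ...
  //   }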

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  @Override
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to
   * track the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to
   * track the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });

      Cluster prev = cluster;
      // Update our Cluster instance with the one created by Job for
      // submission.  (We can't pass our Cluster instance to Job, since Job
      // wraps the config instance, and the two configs would then diverge.)
      cluster = job.getCluster();

      // It is important to close the previous cluster instance
      // to clean up resources.
      if (prev != null) {
        prev.close();
      }
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  protected RunningJob getJobInner(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
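    // A job can be briefly invisible to the client, e.g. while ownership of
    // a finished job is handed from the application master to the job
    // history server, so retry the lookup a configurable number of times
    // before giving up.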
    for (int i = 0; i <= maxRetry; i++) {
      if (i > 0) {
        try {
          Thread.sleep(retryInterval);
        } catch (Exception e) { }
      }
      RunningJob job = getJobInner(jobid);
      if (job != null) {
        return job;
      }
    }
    return null;
  }

  /**@deprecated Applications should rather use {@link #getJob(JobID)}.
   */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }


  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
              .getBlackListedTaskTrackerCount(), cluster
              .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(), metrics
              .getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *                 tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
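
  // Sketch of typical use: the snapshot returned by getClusterStatus() can
  // drive simple capacity checks, e.g.
  //
  //   ClusterStatus status = jc.getClusterStatus();
  //   int freeMapSlots = status.getMaxMapTasks() - status.getMapTasks();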

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and
   * tasks fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks.  Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
      TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level
   * queues configured.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate
   * children of queue queueName.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
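
  // Sketch: getRootQueues() and getChildQueues(name) can be combined to walk
  // the queue hierarchy, e.g.
  //
  //   for (JobQueueInfo root : jc.getRootQueues()) {
  //     for (JobQueueInfo child : jc.getChildQueues(root.getQueueName())) {
  //       // inspect child ...
  //     }
  //   }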

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new
              PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer) throws IOException,
          InterruptedException {
    return clientUgi.doAs(new
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }
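
  // Sketch of token handling: obtain a token naming the principal that may
  // renew it, then renew or cancel through the Token API (the deprecated
  // wrappers below simply delegate to it).  "renewerPrincipal" is
  // illustrative:
  //
  //   Token<DelegationTokenIdentifier> t =
  //       jc.getDelegationToken(new Text("renewerPrincipal"));
  //   long expiryTime = t.renew(jc.getConf());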

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time if the renewal went well
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> command-line interface.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}