001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceAudience.Private; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.fs.PathFilter; 034import org.apache.hadoop.mapreduce.JobContext; 035import org.apache.hadoop.mapreduce.JobStatus; 036import org.apache.hadoop.mapreduce.MRJobConfig; 037import org.apache.hadoop.mapreduce.OutputCommitter; 038import org.apache.hadoop.mapreduce.TaskAttemptContext; 039import org.apache.hadoop.mapreduce.TaskAttemptID; 040 041import com.google.common.annotations.VisibleForTesting; 042 043/** An {@link OutputCommitter} that commits files specified 044 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 045 **/ 046@InterfaceAudience.Public 047@InterfaceStability.Stable 048public class FileOutputCommitter extends OutputCommitter { 049 private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class); 050 051 /** 052 * Name of directory where pending data is placed. Data that has not been 053 * committed yet. 054 */ 055 public static final String PENDING_DIR_NAME = "_temporary"; 056 /** 057 * Temporary directory name 058 * 059 * The static variable to be compatible with M/R 1.x 060 */ 061 @Deprecated 062 protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME; 063 public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; 064 public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 065 "mapreduce.fileoutputcommitter.marksuccessfuljobs"; 066 public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = 067 "mapreduce.fileoutputcommitter.algorithm.version"; 068 public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1; 069 070 // Number of attempts when failure happens in commit job 071 public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS = 072 "mapreduce.fileoutputcommitter.failures.attempts"; 073 074 // default value to be 1 to keep consistent with previous behavior 075 public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1; 076 077 // Skip cleanup _temporary folders under job's output directory 078 public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED = 079 "mapreduce.fileoutputcommitter.cleanup.skipped"; 080 public static final boolean 081 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false; 082 083 // Ignore exceptions in cleanup _temporary folder under job's output directory 084 public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED = 085 "mapreduce.fileoutputcommitter.cleanup-failures.ignored"; 086 public static final boolean 087 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false; 088 089 private Path outputPath = null; 090 private Path workPath = null; 091 private final int algorithmVersion; 092 private final boolean skipCleanup; 093 private final boolean ignoreCleanupFailures; 094 095 /** 096 * Create a file output committer 097 * @param outputPath the job's output path, or null if you want the output 098 * committer to act as a noop. 099 * @param context the task's context 100 * @throws IOException 101 */ 102 public FileOutputCommitter(Path outputPath, 103 TaskAttemptContext context) throws IOException { 104 this(outputPath, (JobContext)context); 105 if (outputPath != null) { 106 workPath = getTaskAttemptPath(context, outputPath); 107 } 108 } 109 110 /** 111 * Create a file output committer 112 * @param outputPath the job's output path, or null if you want the output 113 * committer to act as a noop. 114 * @param context the task's context 115 * @throws IOException 116 */ 117 @Private 118 public FileOutputCommitter(Path outputPath, 119 JobContext context) throws IOException { 120 Configuration conf = context.getConfiguration(); 121 algorithmVersion = 122 conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 123 FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT); 124 LOG.info("File Output Committer Algorithm version is " + algorithmVersion); 125 if (algorithmVersion != 1 && algorithmVersion != 2) { 126 throw new IOException("Only 1 or 2 algorithm version is supported"); 127 } 128 129 // if skip cleanup 130 skipCleanup = conf.getBoolean( 131 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED, 132 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT); 133 134 // if ignore failures in cleanup 135 ignoreCleanupFailures = conf.getBoolean( 136 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED, 137 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT); 138 139 LOG.info("FileOutputCommitter skip cleanup _temporary folders under " + 140 "output directory:" + skipCleanup + ", ignore cleanup failures: " + 141 ignoreCleanupFailures); 142 143 if (outputPath != null) { 144 FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); 145 this.outputPath = fs.makeQualified(outputPath); 146 } 147 } 148 149 /** 150 * @return the path where final output of the job should be placed. This 151 * could also be considered the committed application attempt path. 152 */ 153 private Path getOutputPath() { 154 return this.outputPath; 155 } 156 157 /** 158 * @return true if we have an output path set, else false. 159 */ 160 private boolean hasOutputPath() { 161 return this.outputPath != null; 162 } 163 164 /** 165 * @return the path where the output of pending job attempts are 166 * stored. 167 */ 168 private Path getPendingJobAttemptsPath() { 169 return getPendingJobAttemptsPath(getOutputPath()); 170 } 171 172 /** 173 * Get the location of pending job attempts. 174 * @param out the base output directory. 175 * @return the location of pending job attempts. 176 */ 177 private static Path getPendingJobAttemptsPath(Path out) { 178 return new Path(out, PENDING_DIR_NAME); 179 } 180 181 /** 182 * Get the Application Attempt Id for this job 183 * @param context the context to look in 184 * @return the Application Attempt Id for a given job. 185 */ 186 private static int getAppAttemptId(JobContext context) { 187 return context.getConfiguration().getInt( 188 MRJobConfig.APPLICATION_ATTEMPT_ID, 0); 189 } 190 191 /** 192 * Compute the path where the output of a given job attempt will be placed. 193 * @param context the context of the job. This is used to get the 194 * application attempt id. 195 * @return the path to store job attempt data. 196 */ 197 public Path getJobAttemptPath(JobContext context) { 198 return getJobAttemptPath(context, getOutputPath()); 199 } 200 201 /** 202 * Compute the path where the output of a given job attempt will be placed. 203 * @param context the context of the job. This is used to get the 204 * application attempt id. 205 * @param out the output path to place these in. 206 * @return the path to store job attempt data. 207 */ 208 public static Path getJobAttemptPath(JobContext context, Path out) { 209 return getJobAttemptPath(getAppAttemptId(context), out); 210 } 211 212 /** 213 * Compute the path where the output of a given job attempt will be placed. 214 * @param appAttemptId the ID of the application attempt for this job. 215 * @return the path to store job attempt data. 216 */ 217 protected Path getJobAttemptPath(int appAttemptId) { 218 return getJobAttemptPath(appAttemptId, getOutputPath()); 219 } 220 221 /** 222 * Compute the path where the output of a given job attempt will be placed. 223 * @param appAttemptId the ID of the application attempt for this job. 224 * @return the path to store job attempt data. 225 */ 226 private static Path getJobAttemptPath(int appAttemptId, Path out) { 227 return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId)); 228 } 229 230 /** 231 * Compute the path where the output of pending task attempts are stored. 232 * @param context the context of the job with pending tasks. 233 * @return the path where the output of pending task attempts are stored. 234 */ 235 private Path getPendingTaskAttemptsPath(JobContext context) { 236 return getPendingTaskAttemptsPath(context, getOutputPath()); 237 } 238 239 /** 240 * Compute the path where the output of pending task attempts are stored. 241 * @param context the context of the job with pending tasks. 242 * @return the path where the output of pending task attempts are stored. 243 */ 244 private static Path getPendingTaskAttemptsPath(JobContext context, Path out) { 245 return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME); 246 } 247 248 /** 249 * Compute the path where the output of a task attempt is stored until 250 * that task is committed. 251 * 252 * @param context the context of the task attempt. 253 * @return the path where a task attempt should be stored. 254 */ 255 public Path getTaskAttemptPath(TaskAttemptContext context) { 256 return new Path(getPendingTaskAttemptsPath(context), 257 String.valueOf(context.getTaskAttemptID())); 258 } 259 260 /** 261 * Compute the path where the output of a task attempt is stored until 262 * that task is committed. 263 * 264 * @param context the context of the task attempt. 265 * @param out The output path to put things in. 266 * @return the path where a task attempt should be stored. 267 */ 268 public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) { 269 return new Path(getPendingTaskAttemptsPath(context, out), 270 String.valueOf(context.getTaskAttemptID())); 271 } 272 273 /** 274 * Compute the path where the output of a committed task is stored until 275 * the entire job is committed. 276 * @param context the context of the task attempt 277 * @return the path where the output of a committed task is stored until 278 * the entire job is committed. 279 */ 280 public Path getCommittedTaskPath(TaskAttemptContext context) { 281 return getCommittedTaskPath(getAppAttemptId(context), context); 282 } 283 284 public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) { 285 return getCommittedTaskPath(getAppAttemptId(context), context, out); 286 } 287 288 /** 289 * Compute the path where the output of a committed task is stored until the 290 * entire job is committed for a specific application attempt. 291 * @param appAttemptId the id of the application attempt to use 292 * @param context the context of any task. 293 * @return the path where the output of a committed task is stored. 294 */ 295 protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { 296 return new Path(getJobAttemptPath(appAttemptId), 297 String.valueOf(context.getTaskAttemptID().getTaskID())); 298 } 299 300 private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) { 301 return new Path(getJobAttemptPath(appAttemptId, out), 302 String.valueOf(context.getTaskAttemptID().getTaskID())); 303 } 304 305 private static class CommittedTaskFilter implements PathFilter { 306 @Override 307 public boolean accept(Path path) { 308 return !PENDING_DIR_NAME.equals(path.getName()); 309 } 310 } 311 312 /** 313 * Get a list of all paths where output from committed tasks are stored. 314 * @param context the context of the current job 315 * @return the list of these Paths/FileStatuses. 316 * @throws IOException 317 */ 318 private FileStatus[] getAllCommittedTaskPaths(JobContext context) 319 throws IOException { 320 Path jobAttemptPath = getJobAttemptPath(context); 321 FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration()); 322 return fs.listStatus(jobAttemptPath, new CommittedTaskFilter()); 323 } 324 325 /** 326 * Get the directory that the task should write results into. 327 * @return the work directory 328 * @throws IOException 329 */ 330 public Path getWorkPath() throws IOException { 331 return workPath; 332 } 333 334 /** 335 * Create the temporary directory that is the root of all of the task 336 * work directories. 337 * @param context the job's context 338 */ 339 public void setupJob(JobContext context) throws IOException { 340 if (hasOutputPath()) { 341 Path jobAttemptPath = getJobAttemptPath(context); 342 FileSystem fs = jobAttemptPath.getFileSystem( 343 context.getConfiguration()); 344 if (!fs.mkdirs(jobAttemptPath)) { 345 LOG.error("Mkdirs failed to create " + jobAttemptPath); 346 } 347 } else { 348 LOG.warn("Output Path is null in setupJob()"); 349 } 350 } 351 352 /** 353 * The job has completed, so do works in commitJobInternal(). 354 * Could retry on failure if using algorithm 2. 355 * @param context the job's context 356 */ 357 public void commitJob(JobContext context) throws IOException { 358 int maxAttemptsOnFailure = isCommitJobRepeatable(context) ? 359 context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, 360 FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1; 361 int attempt = 0; 362 boolean jobCommitNotFinished = true; 363 while (jobCommitNotFinished) { 364 try { 365 commitJobInternal(context); 366 jobCommitNotFinished = false; 367 } catch (Exception e) { 368 if (++attempt >= maxAttemptsOnFailure) { 369 throw e; 370 } else { 371 LOG.warn("Exception get thrown in job commit, retry (" + attempt + 372 ") time.", e); 373 } 374 } 375 } 376 } 377 378 /** 379 * The job has completed, so do following commit job, include: 380 * Move all committed tasks to the final output dir (algorithm 1 only). 381 * Delete the temporary directory, including all of the work directories. 382 * Create a _SUCCESS file to make it as successful. 383 * @param context the job's context 384 */ 385 @VisibleForTesting 386 protected void commitJobInternal(JobContext context) throws IOException { 387 if (hasOutputPath()) { 388 Path finalOutput = getOutputPath(); 389 FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); 390 391 if (algorithmVersion == 1) { 392 for (FileStatus stat: getAllCommittedTaskPaths(context)) { 393 mergePaths(fs, stat, finalOutput); 394 } 395 } 396 397 if (skipCleanup) { 398 LOG.info("Skip cleanup the _temporary folders under job's output " + 399 "directory in commitJob."); 400 } else { 401 // delete the _temporary folder and create a _done file in the o/p 402 // folder 403 try { 404 cleanupJob(context); 405 } catch (IOException e) { 406 if (ignoreCleanupFailures) { 407 // swallow exceptions in cleanup as user configure to make sure 408 // commitJob could be success even when cleanup get failure. 409 LOG.error("Error in cleanup job, manually cleanup is needed.", e); 410 } else { 411 // throw back exception to fail commitJob. 412 throw e; 413 } 414 } 415 } 416 // True if the job requires output.dir marked on successful job. 417 // Note that by default it is set to true. 418 if (context.getConfiguration().getBoolean( 419 SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { 420 Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); 421 // If job commit is repeatable and previous/another AM could write 422 // mark file already, we need to set overwritten to be true explicitly 423 // in case other FS implementations don't overwritten by default. 424 if (isCommitJobRepeatable(context)) { 425 fs.create(markerPath, true).close(); 426 } else { 427 fs.create(markerPath).close(); 428 } 429 } 430 } else { 431 LOG.warn("Output Path is null in commitJob()"); 432 } 433 } 434 435 /** 436 * Merge two paths together. Anything in from will be moved into to, if there 437 * are any name conflicts while merging the files or directories in from win. 438 * @param fs the File System to use 439 * @param from the path data is coming from. 440 * @param to the path data is going to. 441 * @throws IOException on any error 442 */ 443 private void mergePaths(FileSystem fs, final FileStatus from, 444 final Path to) throws IOException { 445 if (LOG.isDebugEnabled()) { 446 LOG.debug("Merging data from " + from + " to " + to); 447 } 448 FileStatus toStat; 449 try { 450 toStat = fs.getFileStatus(to); 451 } catch (FileNotFoundException fnfe) { 452 toStat = null; 453 } 454 455 if (from.isFile()) { 456 if (toStat != null) { 457 if (!fs.delete(to, true)) { 458 throw new IOException("Failed to delete " + to); 459 } 460 } 461 462 if (!fs.rename(from.getPath(), to)) { 463 throw new IOException("Failed to rename " + from + " to " + to); 464 } 465 } else if (from.isDirectory()) { 466 if (toStat != null) { 467 if (!toStat.isDirectory()) { 468 if (!fs.delete(to, true)) { 469 throw new IOException("Failed to delete " + to); 470 } 471 renameOrMerge(fs, from, to); 472 } else { 473 //It is a directory so merge everything in the directories 474 for (FileStatus subFrom : fs.listStatus(from.getPath())) { 475 Path subTo = new Path(to, subFrom.getPath().getName()); 476 mergePaths(fs, subFrom, subTo); 477 } 478 } 479 } else { 480 renameOrMerge(fs, from, to); 481 } 482 } 483 } 484 485 private void renameOrMerge(FileSystem fs, FileStatus from, Path to) 486 throws IOException { 487 if (algorithmVersion == 1) { 488 if (!fs.rename(from.getPath(), to)) { 489 throw new IOException("Failed to rename " + from + " to " + to); 490 } 491 } else { 492 fs.mkdirs(to); 493 for (FileStatus subFrom : fs.listStatus(from.getPath())) { 494 Path subTo = new Path(to, subFrom.getPath().getName()); 495 mergePaths(fs, subFrom, subTo); 496 } 497 } 498 } 499 500 @Override 501 @Deprecated 502 public void cleanupJob(JobContext context) throws IOException { 503 if (hasOutputPath()) { 504 Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); 505 FileSystem fs = pendingJobAttemptsPath 506 .getFileSystem(context.getConfiguration()); 507 // if job allow repeatable commit and pendingJobAttemptsPath could be 508 // deleted by previous AM, we should tolerate FileNotFoundException in 509 // this case. 510 try { 511 fs.delete(pendingJobAttemptsPath, true); 512 } catch (FileNotFoundException e) { 513 if (!isCommitJobRepeatable(context)) { 514 throw e; 515 } 516 } 517 } else { 518 LOG.warn("Output Path is null in cleanupJob()"); 519 } 520 } 521 522 /** 523 * Delete the temporary directory, including all of the work directories. 524 * @param context the job's context 525 */ 526 @Override 527 public void abortJob(JobContext context, JobStatus.State state) 528 throws IOException { 529 // delete the _temporary folder 530 cleanupJob(context); 531 } 532 533 /** 534 * No task setup required. 535 */ 536 @Override 537 public void setupTask(TaskAttemptContext context) throws IOException { 538 // FileOutputCommitter's setupTask doesn't do anything. Because the 539 // temporary task directory is created on demand when the 540 // task is writing. 541 } 542 543 /** 544 * Move the files from the work directory to the job output directory 545 * @param context the task context 546 */ 547 @Override 548 public void commitTask(TaskAttemptContext context) 549 throws IOException { 550 commitTask(context, null); 551 } 552 553 @Private 554 public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 555 throws IOException { 556 557 TaskAttemptID attemptId = context.getTaskAttemptID(); 558 if (hasOutputPath()) { 559 context.progress(); 560 if(taskAttemptPath == null) { 561 taskAttemptPath = getTaskAttemptPath(context); 562 } 563 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 564 FileStatus taskAttemptDirStatus; 565 try { 566 taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath); 567 } catch (FileNotFoundException e) { 568 taskAttemptDirStatus = null; 569 } 570 571 if (taskAttemptDirStatus != null) { 572 if (algorithmVersion == 1) { 573 Path committedTaskPath = getCommittedTaskPath(context); 574 if (fs.exists(committedTaskPath)) { 575 if (!fs.delete(committedTaskPath, true)) { 576 throw new IOException("Could not delete " + committedTaskPath); 577 } 578 } 579 if (!fs.rename(taskAttemptPath, committedTaskPath)) { 580 throw new IOException("Could not rename " + taskAttemptPath + " to " 581 + committedTaskPath); 582 } 583 LOG.info("Saved output of task '" + attemptId + "' to " + 584 committedTaskPath); 585 } else { 586 // directly merge everything from taskAttemptPath to output directory 587 mergePaths(fs, taskAttemptDirStatus, outputPath); 588 LOG.info("Saved output of task '" + attemptId + "' to " + 589 outputPath); 590 } 591 } else { 592 LOG.warn("No Output found for " + attemptId); 593 } 594 } else { 595 LOG.warn("Output Path is null in commitTask()"); 596 } 597 } 598 599 /** 600 * Delete the work directory 601 * @throws IOException 602 */ 603 @Override 604 public void abortTask(TaskAttemptContext context) throws IOException { 605 abortTask(context, null); 606 } 607 608 @Private 609 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException { 610 if (hasOutputPath()) { 611 context.progress(); 612 if(taskAttemptPath == null) { 613 taskAttemptPath = getTaskAttemptPath(context); 614 } 615 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 616 if(!fs.delete(taskAttemptPath, true)) { 617 LOG.warn("Could not delete "+taskAttemptPath); 618 } 619 } else { 620 LOG.warn("Output Path is null in abortTask()"); 621 } 622 } 623 624 /** 625 * Did this task write any files in the work directory? 626 * @param context the task's context 627 */ 628 @Override 629 public boolean needsTaskCommit(TaskAttemptContext context 630 ) throws IOException { 631 return needsTaskCommit(context, null); 632 } 633 634 @Private 635 public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath 636 ) throws IOException { 637 if(hasOutputPath()) { 638 if(taskAttemptPath == null) { 639 taskAttemptPath = getTaskAttemptPath(context); 640 } 641 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 642 return fs.exists(taskAttemptPath); 643 } 644 return false; 645 } 646 647 @Override 648 @Deprecated 649 public boolean isRecoverySupported() { 650 return true; 651 } 652 653 @Override 654 public boolean isCommitJobRepeatable(JobContext context) throws IOException { 655 return algorithmVersion == 2; 656 } 657 658 @Override 659 public void recoverTask(TaskAttemptContext context) 660 throws IOException { 661 if(hasOutputPath()) { 662 context.progress(); 663 TaskAttemptID attemptId = context.getTaskAttemptID(); 664 int previousAttempt = getAppAttemptId(context) - 1; 665 if (previousAttempt < 0) { 666 throw new IOException ("Cannot recover task output for first attempt..."); 667 } 668 669 Path previousCommittedTaskPath = getCommittedTaskPath( 670 previousAttempt, context); 671 FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration()); 672 if (LOG.isDebugEnabled()) { 673 LOG.debug("Trying to recover task from " + previousCommittedTaskPath); 674 } 675 if (algorithmVersion == 1) { 676 if (fs.exists(previousCommittedTaskPath)) { 677 Path committedTaskPath = getCommittedTaskPath(context); 678 if (fs.exists(committedTaskPath)) { 679 if (!fs.delete(committedTaskPath, true)) { 680 throw new IOException("Could not delete "+committedTaskPath); 681 } 682 } 683 //Rename can fail if the parent directory does not yet exist. 684 Path committedParent = committedTaskPath.getParent(); 685 fs.mkdirs(committedParent); 686 if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) { 687 throw new IOException("Could not rename " + previousCommittedTaskPath + 688 " to " + committedTaskPath); 689 } 690 } else { 691 LOG.warn(attemptId+" had no output to recover."); 692 } 693 } else { 694 // essentially a no-op, but for backwards compatibility 695 // after upgrade to the new fileOutputCommitter, 696 // check if there are any output left in committedTaskPath 697 if (fs.exists(previousCommittedTaskPath)) { 698 LOG.info("Recovering task for upgrading scenario, moving files from " 699 + previousCommittedTaskPath + " to " + outputPath); 700 FileStatus from = fs.getFileStatus(previousCommittedTaskPath); 701 mergePaths(fs, from, outputPath); 702 } 703 LOG.info("Done recovering task " + attemptId); 704 } 705 } else { 706 LOG.warn("Output Path is null in recoverTask()"); 707 } 708 } 709}