001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceAudience.Private;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.PathFilter;
034import org.apache.hadoop.mapreduce.JobContext;
035import org.apache.hadoop.mapreduce.JobStatus;
036import org.apache.hadoop.mapreduce.MRJobConfig;
037import org.apache.hadoop.mapreduce.OutputCommitter;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.TaskAttemptID;
040
041import com.google.common.annotations.VisibleForTesting;
042
/**
 * An {@link OutputCommitter} that commits files into the job output
 * directory, i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 */
046@InterfaceAudience.Public
047@InterfaceStability.Stable
048public class FileOutputCommitter extends OutputCommitter {
049  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
050
  /**
   * Name of the directory where pending data, i.e. data that has not been
   * committed yet, is placed.
   */
055  public static final String PENDING_DIR_NAME = "_temporary";
  /**
   * Temporary directory name.
   *
   * This static variable is kept for compatibility with M/R 1.x.
   */
061  @Deprecated
062  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
063  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
064  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
065      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
066  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
067      "mapreduce.fileoutputcommitter.algorithm.version";
068  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
069
070  // Number of attempts when failure happens in commit job
071  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
072      "mapreduce.fileoutputcommitter.failures.attempts";
073
  // The default value is 1, which keeps the behavior consistent with previous releases.
075  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
076
077  // Skip cleanup _temporary folders under job's output directory
078  public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
079      "mapreduce.fileoutputcommitter.cleanup.skipped";
080  public static final boolean
081      FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false;
082
083  // Ignore exceptions in cleanup _temporary folder under job's output directory
084  public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED =
085      "mapreduce.fileoutputcommitter.cleanup-failures.ignored";
086  public static final boolean
087      FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
088
089  private Path outputPath = null;
090  private Path workPath = null;
091  private final int algorithmVersion;
092  private final boolean skipCleanup;
093  private final boolean ignoreCleanupFailures;
094
095  /**
096   * Create a file output committer
097   * @param outputPath the job's output path, or null if you want the output
098   * committer to act as a noop.
099   * @param context the task's context
100   * @throws IOException
101   */
102  public FileOutputCommitter(Path outputPath, 
103                             TaskAttemptContext context) throws IOException {
104    this(outputPath, (JobContext)context);
105    if (outputPath != null) {
106      workPath = getTaskAttemptPath(context, outputPath);
107    }
108  }
109  
110  /**
111   * Create a file output committer
112   * @param outputPath the job's output path, or null if you want the output
113   * committer to act as a noop.
114   * @param context the task's context
115   * @throws IOException
116   */
117  @Private
118  public FileOutputCommitter(Path outputPath, 
119                             JobContext context) throws IOException {
120    Configuration conf = context.getConfiguration();
121    algorithmVersion =
122        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
123                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
124    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
125    if (algorithmVersion != 1 && algorithmVersion != 2) {
126      throw new IOException("Only 1 or 2 algorithm version is supported");
127    }
128
129    // if skip cleanup
130    skipCleanup = conf.getBoolean(
131        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED,
132        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT);
133
134    // if ignore failures in cleanup
135    ignoreCleanupFailures = conf.getBoolean(
136        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED,
137        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT);
138
139    LOG.info("FileOutputCommitter skip cleanup _temporary folders under " +
140        "output directory:" + skipCleanup + ", ignore cleanup failures: " +
141        ignoreCleanupFailures);
142
143    if (outputPath != null) {
144      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
145      this.outputPath = fs.makeQualified(outputPath);
146    }
147  }
148  
149  /**
150   * @return the path where final output of the job should be placed.  This
151   * could also be considered the committed application attempt path.
152   */
153  private Path getOutputPath() {
154    return this.outputPath;
155  }
156  
157  /**
158   * @return true if we have an output path set, else false.
159   */
160  private boolean hasOutputPath() {
161    return this.outputPath != null;
162  }
163  
164  /**
165   * @return the path where the output of pending job attempts are
166   * stored.
167   */
168  private Path getPendingJobAttemptsPath() {
169    return getPendingJobAttemptsPath(getOutputPath());
170  }
171  
172  /**
173   * Get the location of pending job attempts.
174   * @param out the base output directory.
175   * @return the location of pending job attempts.
176   */
177  private static Path getPendingJobAttemptsPath(Path out) {
178    return new Path(out, PENDING_DIR_NAME);
179  }
180  
181  /**
182   * Get the Application Attempt Id for this job
183   * @param context the context to look in
184   * @return the Application Attempt Id for a given job.
185   */
186  private static int getAppAttemptId(JobContext context) {
187    return context.getConfiguration().getInt(
188        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
189  }
190  
191  /**
192   * Compute the path where the output of a given job attempt will be placed. 
193   * @param context the context of the job.  This is used to get the
194   * application attempt id.
195   * @return the path to store job attempt data.
196   */
197  public Path getJobAttemptPath(JobContext context) {
198    return getJobAttemptPath(context, getOutputPath());
199  }
200  
201  /**
202   * Compute the path where the output of a given job attempt will be placed. 
203   * @param context the context of the job.  This is used to get the
204   * application attempt id.
205   * @param out the output path to place these in.
206   * @return the path to store job attempt data.
207   */
208  public static Path getJobAttemptPath(JobContext context, Path out) {
209    return getJobAttemptPath(getAppAttemptId(context), out);
210  }
211  
212  /**
213   * Compute the path where the output of a given job attempt will be placed. 
214   * @param appAttemptId the ID of the application attempt for this job.
215   * @return the path to store job attempt data.
216   */
217  protected Path getJobAttemptPath(int appAttemptId) {
218    return getJobAttemptPath(appAttemptId, getOutputPath());
219  }
220  
221  /**
222   * Compute the path where the output of a given job attempt will be placed. 
223   * @param appAttemptId the ID of the application attempt for this job.
224   * @return the path to store job attempt data.
225   */
226  private static Path getJobAttemptPath(int appAttemptId, Path out) {
227    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
228  }
229  
230  /**
231   * Compute the path where the output of pending task attempts are stored.
232   * @param context the context of the job with pending tasks. 
233   * @return the path where the output of pending task attempts are stored.
234   */
235  private Path getPendingTaskAttemptsPath(JobContext context) {
236    return getPendingTaskAttemptsPath(context, getOutputPath());
237  }
238  
239  /**
240   * Compute the path where the output of pending task attempts are stored.
241   * @param context the context of the job with pending tasks. 
242   * @return the path where the output of pending task attempts are stored.
243   */
244  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
245    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
246  }
247  
248  /**
249   * Compute the path where the output of a task attempt is stored until
250   * that task is committed.
251   * 
252   * @param context the context of the task attempt.
253   * @return the path where a task attempt should be stored.
254   */
255  public Path getTaskAttemptPath(TaskAttemptContext context) {
256    return new Path(getPendingTaskAttemptsPath(context), 
257        String.valueOf(context.getTaskAttemptID()));
258  }
259  
260  /**
261   * Compute the path where the output of a task attempt is stored until
262   * that task is committed.
263   * 
264   * @param context the context of the task attempt.
265   * @param out The output path to put things in.
266   * @return the path where a task attempt should be stored.
267   */
268  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
269    return new Path(getPendingTaskAttemptsPath(context, out), 
270        String.valueOf(context.getTaskAttemptID()));
271  }
272  
273  /**
274   * Compute the path where the output of a committed task is stored until
275   * the entire job is committed.
276   * @param context the context of the task attempt
277   * @return the path where the output of a committed task is stored until
278   * the entire job is committed.
279   */
280  public Path getCommittedTaskPath(TaskAttemptContext context) {
281    return getCommittedTaskPath(getAppAttemptId(context), context);
282  }
283  
284  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
285    return getCommittedTaskPath(getAppAttemptId(context), context, out);
286  }
287  
288  /**
289   * Compute the path where the output of a committed task is stored until the
290   * entire job is committed for a specific application attempt.
291   * @param appAttemptId the id of the application attempt to use
292   * @param context the context of any task.
293   * @return the path where the output of a committed task is stored.
294   */
295  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
296    return new Path(getJobAttemptPath(appAttemptId),
297        String.valueOf(context.getTaskAttemptID().getTaskID()));
298  }
299  
300  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
301    return new Path(getJobAttemptPath(appAttemptId, out),
302        String.valueOf(context.getTaskAttemptID().getTaskID()));
303  }
304
305  private static class CommittedTaskFilter implements PathFilter {
306    @Override
307    public boolean accept(Path path) {
308      return !PENDING_DIR_NAME.equals(path.getName());
309    }
310  }
311
312  /**
313   * Get a list of all paths where output from committed tasks are stored.
314   * @param context the context of the current job
315   * @return the list of these Paths/FileStatuses. 
316   * @throws IOException
317   */
318  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
319    throws IOException {
320    Path jobAttemptPath = getJobAttemptPath(context);
321    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
322    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
323  }
324
325  /**
326   * Get the directory that the task should write results into.
327   * @return the work directory
328   * @throws IOException
329   */
330  public Path getWorkPath() throws IOException {
331    return workPath;
332  }
333
334  /**
335   * Create the temporary directory that is the root of all of the task 
336   * work directories.
337   * @param context the job's context
338   */
339  public void setupJob(JobContext context) throws IOException {
340    if (hasOutputPath()) {
341      Path jobAttemptPath = getJobAttemptPath(context);
342      FileSystem fs = jobAttemptPath.getFileSystem(
343          context.getConfiguration());
344      if (!fs.mkdirs(jobAttemptPath)) {
345        LOG.error("Mkdirs failed to create " + jobAttemptPath);
346      }
347    } else {
348      LOG.warn("Output Path is null in setupJob()");
349    }
350  }
351
352  /**
353   * The job has completed, so do works in commitJobInternal().
354   * Could retry on failure if using algorithm 2.
355   * @param context the job's context
356   */
357  public void commitJob(JobContext context) throws IOException {
358    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
359        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
360            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
361    int attempt = 0;
362    boolean jobCommitNotFinished = true;
363    while (jobCommitNotFinished) {
364      try {
365        commitJobInternal(context);
366        jobCommitNotFinished = false;
367      } catch (Exception e) {
368        if (++attempt >= maxAttemptsOnFailure) {
369          throw e;
370        } else {
371          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
372              ") time.", e);
373        }
374      }
375    }
376  }
377
378  /**
379   * The job has completed, so do following commit job, include:
380   * Move all committed tasks to the final output dir (algorithm 1 only).
381   * Delete the temporary directory, including all of the work directories.
382   * Create a _SUCCESS file to make it as successful.
383   * @param context the job's context
384   */
385  @VisibleForTesting
386  protected void commitJobInternal(JobContext context) throws IOException {
387    if (hasOutputPath()) {
388      Path finalOutput = getOutputPath();
389      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
390
391      if (algorithmVersion == 1) {
392        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
393          mergePaths(fs, stat, finalOutput);
394        }
395      }
396
397      if (skipCleanup) {
398        LOG.info("Skip cleanup the _temporary folders under job's output " +
399            "directory in commitJob.");
400      } else {
401        // delete the _temporary folder and create a _done file in the o/p
402        // folder
403        try {
404          cleanupJob(context);
405        } catch (IOException e) {
406          if (ignoreCleanupFailures) {
407            // swallow exceptions in cleanup as user configure to make sure
408            // commitJob could be success even when cleanup get failure.
409            LOG.error("Error in cleanup job, manually cleanup is needed.", e);
410          } else {
411            // throw back exception to fail commitJob.
412            throw e;
413          }
414        }
415      }
416      // True if the job requires output.dir marked on successful job.
417      // Note that by default it is set to true.
418      if (context.getConfiguration().getBoolean(
419          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
420        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
421        // If job commit is repeatable and previous/another AM could write
422        // mark file already, we need to set overwritten to be true explicitly
423        // in case other FS implementations don't overwritten by default.
424        if (isCommitJobRepeatable(context)) {
425          fs.create(markerPath, true).close();
426        } else {
427          fs.create(markerPath).close();
428        }
429      }
430    } else {
431      LOG.warn("Output Path is null in commitJob()");
432    }
433  }
434
435  /**
436   * Merge two paths together.  Anything in from will be moved into to, if there
437   * are any name conflicts while merging the files or directories in from win.
438   * @param fs the File System to use
439   * @param from the path data is coming from.
440   * @param to the path data is going to.
441   * @throws IOException on any error
442   */
443  private void mergePaths(FileSystem fs, final FileStatus from,
444      final Path to) throws IOException {
445    if (LOG.isDebugEnabled()) {
446      LOG.debug("Merging data from " + from + " to " + to);
447    }
448    FileStatus toStat;
449    try {
450      toStat = fs.getFileStatus(to);
451    } catch (FileNotFoundException fnfe) {
452      toStat = null;
453    }
454
455    if (from.isFile()) {
456      if (toStat != null) {
457        if (!fs.delete(to, true)) {
458          throw new IOException("Failed to delete " + to);
459        }
460      }
461
462      if (!fs.rename(from.getPath(), to)) {
463        throw new IOException("Failed to rename " + from + " to " + to);
464      }
465    } else if (from.isDirectory()) {
466      if (toStat != null) {
467        if (!toStat.isDirectory()) {
468          if (!fs.delete(to, true)) {
469            throw new IOException("Failed to delete " + to);
470          }
471          renameOrMerge(fs, from, to);
472        } else {
473          //It is a directory so merge everything in the directories
474          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
475            Path subTo = new Path(to, subFrom.getPath().getName());
476            mergePaths(fs, subFrom, subTo);
477          }
478        }
479      } else {
480        renameOrMerge(fs, from, to);
481      }
482    }
483  }
484
485  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
486      throws IOException {
487    if (algorithmVersion == 1) {
488      if (!fs.rename(from.getPath(), to)) {
489        throw new IOException("Failed to rename " + from + " to " + to);
490      }
491    } else {
492      fs.mkdirs(to);
493      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
494        Path subTo = new Path(to, subFrom.getPath().getName());
495        mergePaths(fs, subFrom, subTo);
496      }
497    }
498  }
499
500  @Override
501  @Deprecated
502  public void cleanupJob(JobContext context) throws IOException {
503    if (hasOutputPath()) {
504      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
505      FileSystem fs = pendingJobAttemptsPath
506          .getFileSystem(context.getConfiguration());
507      // if job allow repeatable commit and pendingJobAttemptsPath could be
508      // deleted by previous AM, we should tolerate FileNotFoundException in
509      // this case.
510      try {
511        fs.delete(pendingJobAttemptsPath, true);
512      } catch (FileNotFoundException e) {
513        if (!isCommitJobRepeatable(context)) {
514          throw e;
515        }
516      }
517    } else {
518      LOG.warn("Output Path is null in cleanupJob()");
519    }
520  }
521
522  /**
523   * Delete the temporary directory, including all of the work directories.
524   * @param context the job's context
525   */
526  @Override
527  public void abortJob(JobContext context, JobStatus.State state) 
528  throws IOException {
529    // delete the _temporary folder
530    cleanupJob(context);
531  }
532  
533  /**
534   * No task setup required.
535   */
536  @Override
537  public void setupTask(TaskAttemptContext context) throws IOException {
538    // FileOutputCommitter's setupTask doesn't do anything. Because the
539    // temporary task directory is created on demand when the 
540    // task is writing.
541  }
542
543  /**
544   * Move the files from the work directory to the job output directory
545   * @param context the task context
546   */
547  @Override
548  public void commitTask(TaskAttemptContext context) 
549  throws IOException {
550    commitTask(context, null);
551  }
552
553  @Private
554  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
555      throws IOException {
556
557    TaskAttemptID attemptId = context.getTaskAttemptID();
558    if (hasOutputPath()) {
559      context.progress();
560      if(taskAttemptPath == null) {
561        taskAttemptPath = getTaskAttemptPath(context);
562      }
563      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
564      FileStatus taskAttemptDirStatus;
565      try {
566        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
567      } catch (FileNotFoundException e) {
568        taskAttemptDirStatus = null;
569      }
570
571      if (taskAttemptDirStatus != null) {
572        if (algorithmVersion == 1) {
573          Path committedTaskPath = getCommittedTaskPath(context);
574          if (fs.exists(committedTaskPath)) {
575             if (!fs.delete(committedTaskPath, true)) {
576               throw new IOException("Could not delete " + committedTaskPath);
577             }
578          }
579          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
580            throw new IOException("Could not rename " + taskAttemptPath + " to "
581                + committedTaskPath);
582          }
583          LOG.info("Saved output of task '" + attemptId + "' to " +
584              committedTaskPath);
585        } else {
586          // directly merge everything from taskAttemptPath to output directory
587          mergePaths(fs, taskAttemptDirStatus, outputPath);
588          LOG.info("Saved output of task '" + attemptId + "' to " +
589              outputPath);
590        }
591      } else {
592        LOG.warn("No Output found for " + attemptId);
593      }
594    } else {
595      LOG.warn("Output Path is null in commitTask()");
596    }
597  }
598
599  /**
600   * Delete the work directory
601   * @throws IOException 
602   */
603  @Override
604  public void abortTask(TaskAttemptContext context) throws IOException {
605    abortTask(context, null);
606  }
607
608  @Private
609  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
610    if (hasOutputPath()) { 
611      context.progress();
612      if(taskAttemptPath == null) {
613        taskAttemptPath = getTaskAttemptPath(context);
614      }
615      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
616      if(!fs.delete(taskAttemptPath, true)) {
617        LOG.warn("Could not delete "+taskAttemptPath);
618      }
619    } else {
620      LOG.warn("Output Path is null in abortTask()");
621    }
622  }
623
624  /**
625   * Did this task write any files in the work directory?
626   * @param context the task's context
627   */
628  @Override
629  public boolean needsTaskCommit(TaskAttemptContext context
630                                 ) throws IOException {
631    return needsTaskCommit(context, null);
632  }
633
634  @Private
635  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
636    ) throws IOException {
637    if(hasOutputPath()) {
638      if(taskAttemptPath == null) {
639        taskAttemptPath = getTaskAttemptPath(context);
640      }
641      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
642      return fs.exists(taskAttemptPath);
643    }
644    return false;
645  }
646
647  @Override
648  @Deprecated
649  public boolean isRecoverySupported() {
650    return true;
651  }
652
653  @Override
654  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
655    return algorithmVersion == 2;
656  }
657
658  @Override
659  public void recoverTask(TaskAttemptContext context)
660      throws IOException {
661    if(hasOutputPath()) {
662      context.progress();
663      TaskAttemptID attemptId = context.getTaskAttemptID();
664      int previousAttempt = getAppAttemptId(context) - 1;
665      if (previousAttempt < 0) {
666        throw new IOException ("Cannot recover task output for first attempt...");
667      }
668
669      Path previousCommittedTaskPath = getCommittedTaskPath(
670          previousAttempt, context);
671      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
672      if (LOG.isDebugEnabled()) {
673        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
674      }
675      if (algorithmVersion == 1) {
676        if (fs.exists(previousCommittedTaskPath)) {
677          Path committedTaskPath = getCommittedTaskPath(context);
678          if (fs.exists(committedTaskPath)) {
679            if (!fs.delete(committedTaskPath, true)) {
680              throw new IOException("Could not delete "+committedTaskPath);
681            }
682          }
683          //Rename can fail if the parent directory does not yet exist.
684          Path committedParent = committedTaskPath.getParent();
685          fs.mkdirs(committedParent);
686          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
687            throw new IOException("Could not rename " + previousCommittedTaskPath +
688                " to " + committedTaskPath);
689          }
690        } else {
691            LOG.warn(attemptId+" had no output to recover.");
692        }
693      } else {
694        // essentially a no-op, but for backwards compatibility
695        // after upgrade to the new fileOutputCommitter,
696        // check if there are any output left in committedTaskPath
697        if (fs.exists(previousCommittedTaskPath)) {
698          LOG.info("Recovering task for upgrading scenario, moving files from "
699              + previousCommittedTaskPath + " to " + outputPath);
700          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
701          mergePaths(fs, from, outputPath);
702        }
703        LOG.info("Done recovering task " + attemptId);
704      }
705    } else {
706      LOG.warn("Output Path is null in recoverTask()");
707    }
708  }
709}