001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.channels.ClosedChannelException;
026import java.util.Arrays;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.permission.AclEntry;
033import org.apache.hadoop.fs.permission.FsPermission;
034import org.apache.hadoop.util.DataChecksum;
035import org.apache.hadoop.util.Progressable;
036
/****************************************************************
 * Abstract Checksummed FileSystem.
 * It provides a basic implementation of a Checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public abstract class ChecksumFileSystem extends FilterFileSystem {
  /**
   * Marker bytes at the start of every checksum file. They are written by
   * ChecksumFSOutputSummer and compared by ChecksumFSInputChecker when a
   * checksum file is opened.
   */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  /** Data bytes covered by one checksum; 512 until setConf() reads a configured value. */
  private int bytesPerChecksum = 512;
  /** When true, open() returns a stream backed by ChecksumFSInputChecker. */
  private boolean verifyChecksum = true;
  /** When true, create() writes through ChecksumFSOutputSummer, which also writes a checksum file. */
  private boolean writeChecksum = true;
051
052  public static double getApproxChkSumLength(long size) {
053    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
054  }
055  
  /**
   * Creates a checksummed view over the given file system.
   *
   * @param fs the file system passed to the superclass constructor; it is
   *           the one returned by getRawFileSystem()
   */
  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }
059
060  @Override
061  public void setConf(Configuration conf) {
062    super.setConf(conf);
063    if (conf != null) {
064      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
065                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
066    }
067  }
068  
  /**
   * Set whether to verify checksum.
   *
   * @param verifyChecksum true to have open() return a checksum-verifying
   *                       stream, false to have it read from the raw file
   *                       system directly
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }
076
  /**
   * Set whether to write a checksum file when creating files.
   *
   * @param writeChecksum true to have create() write a checksum file next to
   *                      the data file, false to write only the data file
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }
081  
  /**
   * Get the raw file system.
   *
   * @return the wrapped file system that data and checksum files are
   *         read from and written to
   */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }
087
088  /** Return the name of the checksum file associated with a file.*/
089  public Path getChecksumFile(Path file) {
090    return new Path(file.getParent(), "." + file.getName() + ".crc");
091  }
092
093  /** Return true iff file is a checksum file name.*/
094  public static boolean isChecksumFile(Path file) {
095    String name = file.getName();
096    return name.startsWith(".") && name.endsWith(".crc");
097  }
098
099  /** Return the length of the checksum file given the size of the 
100   * actual file.
101   **/
102  public long getChecksumFileLength(Path file, long fileSize) {
103    return getChecksumLength(fileSize, getBytesPerSum());
104  }
105
  /**
   * Return the bytes per checksum.
   *
   * @return the number of data bytes covered by each checksum
   */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }
110
111  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
112    int defaultBufferSize = getConf().getInt(
113                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
114                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
115    int proportionalBufferSize = bufferSize / bytesPerSum;
116    return Math.max(bytesPerSum,
117                    Math.max(proportionalBufferSize, defaultBufferSize));
118  }
119
120  /*******************************************************
121   * For open()'s FSInputStream
122   * It verifies that data matches checksums.
123   *******************************************************/
124  private static class ChecksumFSInputChecker extends FSInputChecker {
125    private ChecksumFileSystem fs;
126    private FSDataInputStream datas;
127    private FSDataInputStream sums;
128    
129    private static final int HEADER_LENGTH = 8;
130    
131    private int bytesPerSum = 1;
132    
133    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
134      throws IOException {
135      this(fs, file, fs.getConf().getInt(
136                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
137                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
138    }
139    
140    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
141      throws IOException {
142      super( file, fs.getFileStatus(file).getReplication() );
143      this.datas = fs.getRawFileSystem().open(file, bufferSize);
144      this.fs = fs;
145      Path sumFile = fs.getChecksumFile(file);
146      try {
147        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
148        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
149
150        byte[] version = new byte[CHECKSUM_VERSION.length];
151        sums.readFully(version);
152        if (!Arrays.equals(version, CHECKSUM_VERSION))
153          throw new IOException("Not a checksum file: "+sumFile);
154        this.bytesPerSum = sums.readInt();
155        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
156      } catch (IOException e) {
157        // mincing the message is terrible, but java throws permission
158        // exceptions as FNF because that's all the method signatures allow!
159        if (!(e instanceof FileNotFoundException) ||
160            e.getMessage().endsWith(" (Permission denied)")) {
161          LOG.warn("Problem opening checksum file: "+ file +
162              ".  Ignoring exception: " , e);
163        }
164        set(fs.verifyChecksum, null, 1, 0);
165      }
166    }
167    
168    private long getChecksumFilePos( long dataPos ) {
169      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
170    }
171    
172    @Override
173    protected long getChunkPosition( long dataPos ) {
174      return dataPos/bytesPerSum*bytesPerSum;
175    }
176    
177    @Override
178    public int available() throws IOException {
179      return datas.available() + super.available();
180    }
181    
182    @Override
183    public int read(long position, byte[] b, int off, int len)
184      throws IOException {
185      // parameter check
186      validatePositionedReadArgs(position, b, off, len);
187      if (len == 0) {
188        return 0;
189      }
190
191      int nread;
192      try (ChecksumFSInputChecker checker =
193               new ChecksumFSInputChecker(fs, file)) {
194        checker.seek(position);
195        nread = checker.read(b, off, len);
196        checker.close();
197      }
198      return nread;
199    }
200    
201    @Override
202    public void close() throws IOException {
203      datas.close();
204      if( sums != null ) {
205        sums.close();
206      }
207      set(fs.verifyChecksum, null, 1, 0);
208    }
209    
210
211    @Override
212    public boolean seekToNewSource(long targetPos) throws IOException {
213      long sumsPos = getChecksumFilePos(targetPos);
214      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
215      boolean newDataSource = datas.seekToNewSource(targetPos);
216      return sums.seekToNewSource(sumsPos) || newDataSource;
217    }
218
219    @Override
220    protected int readChunk(long pos, byte[] buf, int offset, int len,
221        byte[] checksum) throws IOException {
222
223      boolean eof = false;
224      if (needChecksum()) {
225        assert checksum != null; // we have a checksum buffer
226        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
227        assert len >= bytesPerSum; // we must read at least one chunk
228
229        final int checksumsToRead = Math.min(
230          len/bytesPerSum, // number of checksums based on len to read
231          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
232        long checksumPos = getChecksumFilePos(pos); 
233        if(checksumPos != sums.getPos()) {
234          sums.seek(checksumPos);
235        }
236
237        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
238        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
239          throw new ChecksumException(
240            "Checksum file not a length multiple of checksum size " +
241            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
242            " sumLenread: " + sumLenRead,
243            pos);
244        }
245        if (sumLenRead <= 0) { // we're at the end of the file
246          eof = true;
247        } else {
248          // Adjust amount of data to read based on how many checksum chunks we read
249          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
250        }
251      }
252      if(pos != datas.getPos()) {
253        datas.seek(pos);
254      }
255      int nread = readFully(datas, buf, offset, len);
256      if (eof && nread > 0) {
257        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
258      }
259      return nread;
260    }
261  }
262  
263  private static class FSDataBoundedInputStream extends FSDataInputStream {
264    private FileSystem fs;
265    private Path file;
266    private long fileLen = -1L;
267
268    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
269      super(in);
270      this.fs = fs;
271      this.file = file;
272    }
273    
274    @Override
275    public boolean markSupported() {
276      return false;
277    }
278    
279    /* Return the file length */
280    private long getFileLength() throws IOException {
281      if( fileLen==-1L ) {
282        fileLen = fs.getContentSummary(file).getLength();
283      }
284      return fileLen;
285    }
286    
287    /**
288     * Skips over and discards <code>n</code> bytes of data from the
289     * input stream.
290     *
291     *The <code>skip</code> method skips over some smaller number of bytes
292     * when reaching end of file before <code>n</code> bytes have been skipped.
293     * The actual number of bytes skipped is returned.  If <code>n</code> is
294     * negative, no bytes are skipped.
295     *
296     * @param      n   the number of bytes to be skipped.
297     * @return     the actual number of bytes skipped.
298     * @exception  IOException  if an I/O error occurs.
299     *             ChecksumException if the chunk to skip to is corrupted
300     */
301    @Override
302    public synchronized long skip(long n) throws IOException {
303      long curPos = getPos();
304      long fileLength = getFileLength();
305      if( n+curPos > fileLength ) {
306        n = fileLength - curPos;
307      }
308      return super.skip(n);
309    }
310    
311    /**
312     * Seek to the given position in the stream.
313     * The next read() will be from that position.
314     * 
315     * <p>This method does not allow seek past the end of the file.
316     * This produces IOException.
317     *
318     * @param      pos   the postion to seek to.
319     * @exception  IOException  if an I/O error occurs or seeks after EOF
320     *             ChecksumException if the chunk to seek to is corrupted
321     */
322
323    @Override
324    public synchronized void seek(long pos) throws IOException {
325      if (pos > getFileLength()) {
326        throw new EOFException("Cannot seek after EOF");
327      }
328      super.seek(pos);
329    }
330
331  }
332
333  /**
334   * Opens an FSDataInputStream at the indicated Path.
335   * @param f the file name to open
336   * @param bufferSize the size of the buffer to be used.
337   */
338  @Override
339  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
340    FileSystem fs;
341    InputStream in;
342    if (verifyChecksum) {
343      fs = this;
344      in = new ChecksumFSInputChecker(this, f, bufferSize);
345    } else {
346      fs = getRawFileSystem();
347      in = fs.open(f, bufferSize);
348    }
349    return new FSDataBoundedInputStream(fs, f, in);
350  }
351
  /**
   * Append is not supported by this file system.
   *
   * @throws IOException always, with the message "Not supported"
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }
357
358  /**
359   * Calculated the length of the checksum file in bytes.
360   * @param size the length of the data file in bytes
361   * @param bytesPerSum the number of bytes in a checksum block
362   * @return the number of bytes in the checksum file
363   */
364  public static long getChecksumLength(long size, int bytesPerSum) {
365    //the checksum length is equal to size passed divided by bytesPerSum +
366    //bytes written in the beginning of the checksum file.  
367    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
368             CHECKSUM_VERSION.length + 4;  
369  }
370
371  /** This class provides an output stream for a checksummed file.
372   * It generates checksums for data. */
373  private static class ChecksumFSOutputSummer extends FSOutputSummer {
374    private FSDataOutputStream datas;    
375    private FSDataOutputStream sums;
376    private static final float CHKSUM_AS_FRACTION = 0.01f;
377    private boolean isClosed = false;
378    
379    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
380                          Path file, 
381                          boolean overwrite,
382                          int bufferSize,
383                          short replication,
384                          long blockSize,
385                          Progressable progress,
386                          FsPermission permission)
387      throws IOException {
388      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
389          fs.getBytesPerSum()));
390      int bytesPerSum = fs.getBytesPerSum();
391      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
392                                         bufferSize, replication, blockSize,
393                                         progress);
394      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
395      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
396                                               permission, true, sumBufferSize,
397                                               replication, blockSize, null);
398      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
399      sums.writeInt(bytesPerSum);
400    }
401    
402    @Override
403    public void close() throws IOException {
404      try {
405        flushBuffer();
406        sums.close();
407        datas.close();
408      } finally {
409        isClosed = true;
410      }
411    }
412    
413    @Override
414    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
415        int ckoff, int cklen)
416    throws IOException {
417      datas.write(b, offset, len);
418      sums.write(checksum, ckoff, cklen);
419    }
420
421    @Override
422    protected void checkClosed() throws IOException {
423      if (isClosed) {
424        throw new ClosedChannelException();
425      }
426    }
427  }
428
  /**
   * Creates a file, creating missing parent directories as needed.
   * Delegates to the private create overload with createParent set to true.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }
436
437  private FSDataOutputStream create(Path f, FsPermission permission,
438      boolean overwrite, boolean createParent, int bufferSize,
439      short replication, long blockSize,
440      Progressable progress) throws IOException {
441    Path parent = f.getParent();
442    if (parent != null) {
443      if (!createParent && !exists(parent)) {
444        throw new FileNotFoundException("Parent directory doesn't exist: "
445            + parent);
446      } else if (!mkdirs(parent)) {
447        throw new IOException("Mkdirs failed to create " + parent
448            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
449            + ")");
450      }
451    }
452    final FSDataOutputStream out;
453    if (writeChecksum) {
454      out = new FSDataOutputStream(
455          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
456              blockSize, progress, permission), null);
457    } else {
458      out = fs.create(f, permission, overwrite, bufferSize, replication,
459          blockSize, progress);
460      // remove the checksum file since we aren't writing one
461      Path checkFile = getChecksumFile(f);
462      if (fs.exists(checkFile)) {
463        fs.delete(checkFile, true);
464      }
465    }
466    return out;
467  }
468
  /**
   * Creates a file without creating a missing parent directory.
   * Delegates to the private create overload with createParent set to false.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }
476
477  abstract class FsOperation {
478    boolean run(Path p) throws IOException {
479      boolean status = apply(p);
480      if (status) {
481        Path checkFile = getChecksumFile(p);
482        if (fs.exists(checkFile)) {
483          apply(checkFile);
484        }
485      }
486      return status;
487    }
488    abstract boolean apply(Path p) throws IOException;
489  }
490
491
492  @Override
493  public void setPermission(Path src, final FsPermission permission)
494      throws IOException {
495    new FsOperation(){
496      @Override
497      boolean apply(Path p) throws IOException {
498        fs.setPermission(p, permission);
499        return true;
500      }
501    }.run(src);
502  }
503
504  @Override
505  public void setOwner(Path src, final String username, final String groupname)
506      throws IOException {
507    new FsOperation(){
508      @Override
509      boolean apply(Path p) throws IOException {
510        fs.setOwner(p, username, groupname);
511        return true;
512      }
513    }.run(src);
514  }
515
516  @Override
517  public void setAcl(Path src, final List<AclEntry> aclSpec)
518      throws IOException {
519    new FsOperation(){
520      @Override
521      boolean apply(Path p) throws IOException {
522        fs.setAcl(p, aclSpec);
523        return true;
524      }
525    }.run(src);
526  }
527
528  @Override
529  public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
530      throws IOException {
531    new FsOperation(){
532      @Override
533      boolean apply(Path p) throws IOException {
534        fs.modifyAclEntries(p, aclSpec);
535        return true;
536      }
537    }.run(src);
538  }
539
540  @Override
541  public void removeAcl(Path src) throws IOException {
542    new FsOperation(){
543      @Override
544      boolean apply(Path p) throws IOException {
545        fs.removeAcl(p);
546        return true;
547      }
548    }.run(src);
549  }
550
551  @Override
552  public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
553      throws IOException {
554    new FsOperation(){
555      @Override
556      boolean apply(Path p) throws IOException {
557        fs.removeAclEntries(p, aclSpec);
558        return true;
559      }
560    }.run(src);
561  }
562
563  @Override
564  public void removeDefaultAcl(Path src) throws IOException {
565    new FsOperation(){
566      @Override
567      boolean apply(Path p) throws IOException {
568        fs.removeDefaultAcl(p);
569        return true;
570      }
571    }.run(src);
572  }
573
574  /**
575   * Set replication for an existing file.
576   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
577   * @param src file name
578   * @param replication new replication
579   * @throws IOException
580   * @return true if successful;
581   *         false if file does not exist or is a directory
582   */
583  @Override
584  public boolean setReplication(Path src, final short replication)
585      throws IOException {
586    return new FsOperation(){
587      @Override
588      boolean apply(Path p) throws IOException {
589        return fs.setReplication(p, replication);
590      }
591    }.run(src);
592  }
593
594  /**
595   * Rename files/dirs
596   */
597  @Override
598  public boolean rename(Path src, Path dst) throws IOException {
599    if (fs.isDirectory(src)) {
600      return fs.rename(src, dst);
601    } else {
602      if (fs.isDirectory(dst)) {
603        dst = new Path(dst, src.getName());
604      }
605
606      boolean value = fs.rename(src, dst);
607      if (!value)
608        return false;
609
610      Path srcCheckFile = getChecksumFile(src);
611      Path dstCheckFile = getChecksumFile(dst);
612      if (fs.exists(srcCheckFile)) { //try to rename checksum
613        value = fs.rename(srcCheckFile, dstCheckFile);
614      } else if (fs.exists(dstCheckFile)) {
615        // no src checksum, so remove dst checksum
616        value = fs.delete(dstCheckFile, true); 
617      }
618
619      return value;
620    }
621  }
622
623  /**
624   * Implement the delete(Path, boolean) in checksum
625   * file system.
626   */
627  @Override
628  public boolean delete(Path f, boolean recursive) throws IOException{
629    FileStatus fstatus = null;
630    try {
631      fstatus = fs.getFileStatus(f);
632    } catch(FileNotFoundException e) {
633      return false;
634    }
635    if (fstatus.isDirectory()) {
636      //this works since the crcs are in the same
637      //directories and the files. so we just delete
638      //everything in the underlying filesystem
639      return fs.delete(f, recursive);
640    } else {
641      Path checkFile = getChecksumFile(f);
642      if (fs.exists(checkFile)) {
643        fs.delete(checkFile, true);
644      }
645      return fs.delete(f, true);
646    }
647  }
648    
649  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
650    @Override
651    public boolean accept(Path file) {
652      return !isChecksumFile(file);
653    }
654  };
655
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out of the result.
   * 
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }
669  
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out of the result.
   * 
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
  throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }
684  
  /**
   * Creates the directory on the raw file system.
   *
   * @param f the directory to create
   * @return the result reported by the raw file system
   */
  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }
689
690  @Override
691  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
692    throws IOException {
693    Configuration conf = getConf();
694    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
695  }
696
697  /**
698   * The src file is under FS, and the dst is on the local disk.
699   * Copy it from FS control to the local dst name.
700   */
701  @Override
702  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
703    throws IOException {
704    Configuration conf = getConf();
705    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
706  }
707
708  /**
709   * The src file is under FS, and the dst is on the local disk.
710   * Copy it from FS control to the local dst name.
711   * If src and dst are directories, the copyCrc parameter
712   * determines whether to copy CRC files.
713   */
714  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
715    throws IOException {
716    if (!fs.isDirectory(src)) { // source is a file
717      fs.copyToLocalFile(src, dst);
718      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
719      if (localFs.isDirectory(dst)) {
720        dst = new Path(dst, src.getName());
721      }
722      dst = getChecksumFile(dst);
723      if (localFs.exists(dst)) { //remove old local checksum file
724        localFs.delete(dst, true);
725      }
726      Path checksumFile = getChecksumFile(src);
727      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
728        fs.copyToLocalFile(checksumFile, dst);
729      }
730    } else {
731      FileStatus[] srcs = listStatus(src);
732      for (FileStatus srcFile : srcs) {
733        copyToLocalFile(srcFile.getPath(), 
734                        new Path(dst, srcFile.getPath().getName()), copyCrc);
735      }
736    }
737  }
738
  /**
   * Returns the local file to write output to.
   *
   * @param fsOutputFile the eventual output path; not used here
   * @param tmpLocalFile the local temporary file
   * @return {@code tmpLocalFile}, unchanged
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
744
  /**
   * Moves the local temporary file to its output path in this file system.
   *
   * @param fsOutputFile the destination path
   * @param tmpLocalFile the local file to move
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
750
  /**
   * Report a checksum error to the file system.
   * This implementation does nothing and always returns false.
   *
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
                                       long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
764}