001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.IdentityHashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceConfigurationError;
039import java.util.ServiceLoader;
040import java.util.Set;
041import java.util.Stack;
042import java.util.TreeSet;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.apache.hadoop.classification.InterfaceAudience;
048import org.apache.hadoop.classification.InterfaceStability;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
052import org.apache.hadoop.fs.Options.ChecksumOpt;
053import org.apache.hadoop.fs.Options.Rename;
054import org.apache.hadoop.fs.permission.AclEntry;
055import org.apache.hadoop.fs.permission.AclStatus;
056import org.apache.hadoop.fs.permission.FsAction;
057import org.apache.hadoop.fs.permission.FsCreateModes;
058import org.apache.hadoop.fs.permission.FsPermission;
059import org.apache.hadoop.io.MultipleIOException;
060import org.apache.hadoop.io.Text;
061import org.apache.hadoop.net.NetUtils;
062import org.apache.hadoop.security.AccessControlException;
063import org.apache.hadoop.security.Credentials;
064import org.apache.hadoop.security.SecurityUtil;
065import org.apache.hadoop.security.UserGroupInformation;
066import org.apache.hadoop.security.token.Token;
067import org.apache.hadoop.util.ClassUtil;
068import org.apache.hadoop.util.DataChecksum;
069import org.apache.hadoop.util.Progressable;
070import org.apache.hadoop.util.ReflectionUtils;
071import org.apache.hadoop.util.ShutdownHookManager;
072import org.apache.htrace.core.Tracer;
073import org.apache.htrace.core.TraceScope;
074
075import com.google.common.base.Preconditions;
076import com.google.common.annotations.VisibleForTesting;
077
078/****************************************************************
079 * An abstract base class for a fairly generic filesystem.  It
080 * may be implemented as a distributed filesystem, or as a "local"
081 * one that reflects the locally-connected disk.  The local version
082 * exists for small Hadoop instances and for testing.
083 *
084 * <p>
085 *
086 * All user code that may potentially use the Hadoop Distributed
087 * File System should be written to use a FileSystem object.  The
088 * Hadoop DFS is a multi-machine system that appears as a single
089 * disk.  It's useful because of its fault tolerance and potentially
090 * very large capacity.
091 * 
092 * <p>
 * The local implementation is {@link LocalFileSystem} and the distributed
 * implementation is DistributedFileSystem.
095 *****************************************************************/
096@InterfaceAudience.Public
097@InterfaceStability.Stable
098public abstract class FileSystem extends Configured implements Closeable {
  /** Configuration key whose value names the default filesystem URI. */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /**
   * Fallback used by {@link #getDefaultUri(Configuration)} when
   * {@link #FS_DEFAULT_NAME_KEY} is not set in the configuration.
   */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger for this class (e.g. warnings about deprecated FS names). */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  // NOTE(review): presumably the trash directory name; not used in this
  // section of the class — confirm against its users.
  public static final String TRASH_PREFIX = ".Trash";
  // NOTE(review): presumably the parent directory of user home directories;
  // not used in this section of the class — confirm against its users.
  public static final String USER_HOME_PREFIX = "/user";

  /**
   * Process-wide FileSystem cache. Used by {@link #get(URI, Configuration)}
   * and {@link #newInstance(URI, Configuration)}, and closed through
   * {@link #closeAll()} / {@link #closeAllForUGI(UserGroupInformation)}.
   */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /**
   * Statistics recorded per FileSystem class. Keys are Class objects and,
   * because this is an IdentityHashMap, are compared by identity.
   */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system; assigned in
   * {@link #initialize(URI, Configuration)}.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when the filesystem is closed
   * or the JVM exits. Kept in sorted order (TreeSet).
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
  
  /**
   * Whether remote symlinks are resolved; read in
   * {@link #initialize(URI, Configuration)} from
   * FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY.
   */
  boolean resolveSymlinks;

  /** Tracer returned by {@link #getTracer()}. */
  private Tracer tracer;
139
140  protected final Tracer getTracer() {
141    return tracer;
142  }
143
144  /**
145   * This method adds a file system for testing so that we can find it later. It
146   * is only for testing.
147   * @param uri the uri to store it under
148   * @param conf the configuration to store it under
149   * @param fs the file system to store
150   * @throws IOException
151   */
152  static void addFileSystemForTesting(URI uri, Configuration conf,
153      FileSystem fs) throws IOException {
154    CACHE.map.put(new Cache.Key(uri, conf), fs);
155  }
156
157  /**
158   * Get a filesystem instance based on the uri, the passed
159   * configuration and the user
160   * @param uri of the filesystem
161   * @param conf the configuration to use
162   * @param user to perform the get as
163   * @return the filesystem instance
164   * @throws IOException
165   * @throws InterruptedException
166   */
167  public static FileSystem get(final URI uri, final Configuration conf,
168        final String user) throws IOException, InterruptedException {
169    String ticketCachePath =
170      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
171    UserGroupInformation ugi =
172        UserGroupInformation.getBestUGI(ticketCachePath, user);
173    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
174      @Override
175      public FileSystem run() throws IOException {
176        return get(uri, conf);
177      }
178    });
179  }
180
181  /**
182   * Returns the configured filesystem implementation.
183   * @param conf the configuration to use
184   */
185  public static FileSystem get(Configuration conf) throws IOException {
186    return get(getDefaultUri(conf), conf);
187  }
188  
189  /** Get the default filesystem URI from a configuration.
190   * @param conf the configuration to use
191   * @return the uri of the default filesystem
192   */
193  public static URI getDefaultUri(Configuration conf) {
194    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
195  }
196
197  /** Set the default filesystem URI in a configuration.
198   * @param conf the configuration to alter
199   * @param uri the new default filesystem uri
200   */
201  public static void setDefaultUri(Configuration conf, URI uri) {
202    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
203  }
204
205  /** Set the default filesystem URI in a configuration.
206   * @param conf the configuration to alter
207   * @param uri the new default filesystem uri
208   */
209  public static void setDefaultUri(Configuration conf, String uri) {
210    setDefaultUri(conf, URI.create(fixName(uri)));
211  }
212
213  /** Called after a new FileSystem instance is constructed.
214   * @param name a uri whose authority section names the host, port, etc.
215   *   for this FileSystem
216   * @param conf the configuration
217   */
218  public void initialize(URI name, Configuration conf) throws IOException {
219    statistics = getStatistics(name.getScheme(), getClass());    
220    resolveSymlinks = conf.getBoolean(
221        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
222        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
223  }
224
225  /**
226   * Return the protocol scheme for the FileSystem.
227   * <p/>
228   * This implementation throws an <code>UnsupportedOperationException</code>.
229   *
230   * @return the protocol scheme for the FileSystem.
231   */
232  public String getScheme() {
233    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
234  }
235
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   * Every subclass must implement this. Within this class it backs
   * {@link #getCanonicalUri()}, {@link #makeQualified(Path)},
   * {@link #getName()} and the "Wrong FS" message in
   * {@link #checkPath(Path)}.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
238  
239  /**
240   * Return a canonicalized form of this FileSystem's URI.
241   * 
242   * The default implementation simply calls {@link #canonicalizeUri(URI)}
243   * on the filesystem's own URI, so subclasses typically only need to
244   * implement that method.
245   *
246   * @see #canonicalizeUri(URI)
247   */
248  protected URI getCanonicalUri() {
249    return canonicalizeUri(getUri());
250  }
251  
252  /**
253   * Canonicalize the given URI.
254   * 
255   * This is filesystem-dependent, but may for example consist of
256   * canonicalizing the hostname using DNS and adding the default
257   * port if not specified.
258   * 
259   * The default implementation simply fills in the default port if
260   * not specified and if the filesystem has a default port.
261   *
262   * @return URI
263   * @see NetUtils#getCanonicalUri(URI, int)
264   */
265  protected URI canonicalizeUri(URI uri) {
266    if (uri.getPort() == -1 && getDefaultPort() > 0) {
267      // reconstruct the uri with the default port set
268      try {
269        uri = new URI(uri.getScheme(), uri.getUserInfo(),
270            uri.getHost(), getDefaultPort(),
271            uri.getPath(), uri.getQuery(), uri.getFragment());
272      } catch (URISyntaxException e) {
273        // Should never happen!
274        throw new AssertionError("Valid URI became unparseable: " +
275            uri);
276      }
277    }
278    
279    return uri;
280  }
281  
282  /**
283   * Get the default port for this file system.
284   * @return the default port or 0 if there isn't one
285   */
286  protected int getDefaultPort() {
287    return 0;
288  }
289
290  protected static FileSystem getFSofPath(final Path absOrFqPath,
291      final Configuration conf)
292      throws UnsupportedFileSystemException, IOException {
293    absOrFqPath.checkNotSchemeWithRelative();
294    absOrFqPath.checkNotRelative();
295
296    // Uses the default file system if not fully qualified
297    return get(absOrFqPath.toUri(), conf);
298  }
299
300  /**
301   * Get a canonical service name for this file system.  The token cache is
302   * the only user of the canonical service name, and uses it to lookup this
303   * filesystem's service tokens.
304   * If file system provides a token of its own then it must have a canonical
305   * name, otherwise canonical name can be null.
306   * 
307   * Default Impl: If the file system has child file systems 
308   * (such as an embedded file system) then it is assumed that the fs has no
309   * tokens of its own and hence returns a null name; otherwise a service
310   * name is built using Uri and port.
311   * 
312   * @return a service string that uniquely identifies this file system, null
313   *         if the filesystem does not implement tokens
314   * @see SecurityUtil#buildDTServiceName(URI, int) 
315   */
316  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
317  public String getCanonicalServiceName() {
318    return (getChildFileSystems() == null)
319      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
320      : null;
321  }
322
323  /** @deprecated call #getUri() instead.*/
324  @Deprecated
325  public String getName() { return getUri().toString(); }
326
327  /** @deprecated call #get(URI,Configuration) instead. */
328  @Deprecated
329  public static FileSystem getNamed(String name, Configuration conf)
330    throws IOException {
331    return get(URI.create(fixName(name)), conf);
332  }
333  
334  /** Update old-format filesystem names, for back-compatibility.  This should
335   * eventually be replaced with a checkName() method that throws an exception
336   * for old-format names. */ 
337  private static String fixName(String name) {
338    // convert old-format name to new-format name
339    if (name.equals("local")) {         // "local" is now "file:///".
340      LOG.warn("\"local\" is a deprecated filesystem name."
341               +" Use \"file:///\" instead.");
342      name = "file:///";
343    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
344      LOG.warn("\""+name+"\" is a deprecated filesystem name."
345               +" Use \"hdfs://"+name+"/\" instead.");
346      name = "hdfs://"+name;
347    }
348    return name;
349  }
350
351  /**
352   * Get the local file system.
353   * @param conf the configuration to configure the file system with
354   * @return a LocalFileSystem
355   */
356  public static LocalFileSystem getLocal(Configuration conf)
357    throws IOException {
358    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
359  }
360
361  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
362   * of the URI determines a configuration property name,
363   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
364   * The entire URI is passed to the FileSystem instance's initialize method.
365   */
366  public static FileSystem get(URI uri, Configuration conf) throws IOException {
367    String scheme = uri.getScheme();
368    String authority = uri.getAuthority();
369
370    if (scheme == null && authority == null) {     // use default FS
371      return get(conf);
372    }
373
374    if (scheme != null && authority == null) {     // no authority
375      URI defaultUri = getDefaultUri(conf);
376      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
377          && defaultUri.getAuthority() != null) {  // & default has authority
378        return get(defaultUri, conf);              // return default
379      }
380    }
381    
382    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
383    if (conf.getBoolean(disableCacheName, false)) {
384      return createFileSystem(uri, conf);
385    }
386
387    return CACHE.get(uri, conf);
388  }
389
390  /**
391   * Returns the FileSystem for this URI's scheme and authority and the 
392   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
393   * @param uri of the filesystem
394   * @param conf the configuration to use
395   * @param user to perform the get as
396   * @return filesystem instance
397   * @throws IOException
398   * @throws InterruptedException
399   */
400  public static FileSystem newInstance(final URI uri, final Configuration conf,
401      final String user) throws IOException, InterruptedException {
402    String ticketCachePath =
403      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
404    UserGroupInformation ugi =
405        UserGroupInformation.getBestUGI(ticketCachePath, user);
406    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
407      @Override
408      public FileSystem run() throws IOException {
409        return newInstance(uri,conf); 
410      }
411    });
412  }
413  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
414   * of the URI determines a configuration property name,
415   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
416   * The entire URI is passed to the FileSystem instance's initialize method.
417   * This always returns a new FileSystem object.
418   */
419  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
420    String scheme = uri.getScheme();
421    String authority = uri.getAuthority();
422
423    if (scheme == null) {                       // no scheme: use default FS
424      return newInstance(conf);
425    }
426
427    if (authority == null) {                       // no authority
428      URI defaultUri = getDefaultUri(conf);
429      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
430          && defaultUri.getAuthority() != null) {  // & default has authority
431        return newInstance(defaultUri, conf);              // return default
432      }
433    }
434    return CACHE.getUnique(uri, conf);
435  }
436
437  /** Returns a unique configured filesystem implementation.
438   * This always returns a new FileSystem object.
439   * @param conf the configuration to use
440   */
441  public static FileSystem newInstance(Configuration conf) throws IOException {
442    return newInstance(getDefaultUri(conf), conf);
443  }
444
445  /**
446   * Get a unique local file system object
447   * @param conf the configuration to configure the file system with
448   * @return a LocalFileSystem
449   * This always returns a new FileSystem object.
450   */
451  public static LocalFileSystem newInstanceLocal(Configuration conf)
452    throws IOException {
453    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
454  }
455
456  /**
457   * Close all cached filesystems. Be sure those filesystems are not
458   * used anymore.
459   * 
460   * @throws IOException
461   */
462  public static void closeAll() throws IOException {
463    CACHE.closeAll();
464  }
465
466  /**
467   * Close all cached filesystems for a given UGI. Be sure those filesystems 
468   * are not used anymore.
469   * @param ugi user group info to close
470   * @throws IOException
471   */
472  public static void closeAllForUGI(UserGroupInformation ugi) 
473  throws IOException {
474    CACHE.closeAll(ugi);
475  }
476
477  /** 
478   * Make sure that a path specifies a FileSystem.
479   * @param path to use
480   */
481  public Path makeQualified(Path path) {
482    checkPath(path);
483    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
484  }
485    
486  /**
487   * Get a new delegation token for this file system.
488   * This is an internal method that should have been declared protected
489   * but wasn't historically.
490   * Callers should use {@link #addDelegationTokens(String, Credentials)}
491   * 
492   * @param renewer the account name that is allowed to renew the token.
493   * @return a new delegation token
494   * @throws IOException
495   */
496  @InterfaceAudience.Private()
497  public Token<?> getDelegationToken(String renewer) throws IOException {
498    return null;
499  }
500  
501  /**
502   * Obtain all delegation tokens used by this FileSystem that are not
503   * already present in the given Credentials.  Existing tokens will neither
504   * be verified as valid nor having the given renewer.  Missing tokens will
505   * be acquired and added to the given Credentials.
506   * 
507   * Default Impl: works for simple fs with its own token
508   * and also for an embedded fs whose tokens are those of its
509   * children file system (i.e. the embedded fs has not tokens of its
510   * own).
511   * 
512   * @param renewer the user allowed to renew the delegation tokens
513   * @param credentials cache in which to add new delegation tokens
514   * @return list of new delegation tokens
515   * @throws IOException
516   */
517  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
518  public Token<?>[] addDelegationTokens(
519      final String renewer, Credentials credentials) throws IOException {
520    if (credentials == null) {
521      credentials = new Credentials();
522    }
523    final List<Token<?>> tokens = new ArrayList<Token<?>>();
524    collectDelegationTokens(renewer, credentials, tokens);
525    return tokens.toArray(new Token<?>[tokens.size()]);
526  }
527  
528  /**
529   * Recursively obtain the tokens for this FileSystem and all descended
530   * FileSystems as determined by getChildFileSystems().
531   * @param renewer the user allowed to renew the delegation tokens
532   * @param credentials cache in which to add the new delegation tokens
533   * @param tokens list in which to add acquired tokens
534   * @throws IOException
535   */
536  private void collectDelegationTokens(final String renewer,
537                                       final Credentials credentials,
538                                       final List<Token<?>> tokens)
539                                           throws IOException {
540    final String serviceName = getCanonicalServiceName();
541    // Collect token of the this filesystem and then of its embedded children
542    if (serviceName != null) { // fs has token, grab it
543      final Text service = new Text(serviceName);
544      Token<?> token = credentials.getToken(service);
545      if (token == null) {
546        token = getDelegationToken(renewer);
547        if (token != null) {
548          tokens.add(token);
549          credentials.addToken(service, token);
550        }
551      }
552    }
553    // Now collect the tokens from the children
554    final FileSystem[] children = getChildFileSystems();
555    if (children != null) {
556      for (final FileSystem fs : children) {
557        fs.collectDelegationTokens(renewer, credentials, tokens);
558      }
559    }
560  }
561
562  /**
563   * Get all the immediate child FileSystems embedded in this FileSystem.
564   * It does not recurse and get grand children.  If a FileSystem
565   * has multiple child FileSystems, then it should return a unique list
566   * of those FileSystems.  Default is to return null to signify no children.
567   * 
568   * @return FileSystems used by this FileSystem
569   */
570  @InterfaceAudience.LimitedPrivate({ "HDFS" })
571  @VisibleForTesting
572  public FileSystem[] getChildFileSystems() {
573    return null;
574  }
575  
576  /** create a file with the provided permission
577   * The permission of the file is set to be the provided permission as in
578   * setPermission, not permission&~umask
579   * 
580   * It is implemented using two RPCs. It is understood that it is inefficient,
581   * but the implementation is thread-safe. The other option is to change the
582   * value of umask in configuration to be 0, but it is not thread-safe.
583   * 
584   * @param fs file system handle
585   * @param file the name of the file to be created
586   * @param permission the permission of the file
587   * @return an output stream
588   * @throws IOException
589   */
590  public static FSDataOutputStream create(FileSystem fs,
591      Path file, FsPermission permission) throws IOException {
592    // create the file with default permission
593    FSDataOutputStream out = fs.create(file);
594    // set its permission to the supplied one
595    fs.setPermission(file, permission);
596    return out;
597  }
598
599  /** create a directory with the provided permission
600   * The permission of the directory is set to be the provided permission as in
601   * setPermission, not permission&~umask
602   * 
603   * @see #create(FileSystem, Path, FsPermission)
604   * 
605   * @param fs file system handle
606   * @param dir the name of the directory to be created
607   * @param permission the permission of the directory
608   * @return true if the directory creation succeeds; false otherwise
609   * @throws IOException
610   */
611  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
612  throws IOException {
613    // create the directory using the default permission
614    boolean result = fs.mkdirs(dir);
615    // set its permission to be the supplied one
616    fs.setPermission(dir, permission);
617    return result;
618  }
619
620  ///////////////////////////////////////////////////////////////
621  // FileSystem
622  ///////////////////////////////////////////////////////////////
623
624  protected FileSystem() {
625    super(null);
626  }
627
628  /** 
629   * Check that a Path belongs to this FileSystem.
630   * @param path to check
631   */
632  protected void checkPath(Path path) {
633    URI uri = path.toUri();
634    String thatScheme = uri.getScheme();
635    if (thatScheme == null)                // fs is relative
636      return;
637    URI thisUri = getCanonicalUri();
638    String thisScheme = thisUri.getScheme();
639    //authority and scheme are not case sensitive
640    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
641      String thisAuthority = thisUri.getAuthority();
642      String thatAuthority = uri.getAuthority();
643      if (thatAuthority == null &&                // path's authority is null
644          thisAuthority != null) {                // fs has an authority
645        URI defaultUri = getDefaultUri(getConf());
646        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
647          uri = defaultUri; // schemes match, so use this uri instead
648        } else {
649          uri = null; // can't determine auth of the path
650        }
651      }
652      if (uri != null) {
653        // canonicalize uri before comparing with this fs
654        uri = canonicalizeUri(uri);
655        thatAuthority = uri.getAuthority();
656        if (thisAuthority == thatAuthority ||       // authorities match
657            (thisAuthority != null &&
658             thisAuthority.equalsIgnoreCase(thatAuthority)))
659          return;
660      }
661    }
662    throw new IllegalArgumentException("Wrong FS: "+path+
663                                       ", expected: "+this.getUri());
664  }
665
666  /**
667   * Return an array containing hostnames, offset and size of 
668   * portions of the given file.  For a nonexistent 
669   * file or regions, null will be returned.
670   *
671   * This call is most helpful with DFS, where it returns 
672   * hostnames of machines that contain the given file.
673   *
674   * The FileSystem will simply return an elt containing 'localhost'.
675   *
676   * @param file FilesStatus to get data from
677   * @param start offset into the given file
678   * @param len length for which to get locations for
679   */
680  public BlockLocation[] getFileBlockLocations(FileStatus file, 
681      long start, long len) throws IOException {
682    if (file == null) {
683      return null;
684    }
685
686    if (start < 0 || len < 0) {
687      throw new IllegalArgumentException("Invalid start or len parameter");
688    }
689
690    if (file.getLen() <= start) {
691      return new BlockLocation[0];
692
693    }
694    String[] name = { "localhost:50010" };
695    String[] host = { "localhost" };
696    return new BlockLocation[] {
697      new BlockLocation(name, host, 0, file.getLen()) };
698  }
699 
700
701  /**
702   * Return an array containing hostnames, offset and size of 
703   * portions of the given file.  For a nonexistent 
704   * file or regions, null will be returned.
705   *
706   * This call is most helpful with DFS, where it returns 
707   * hostnames of machines that contain the given file.
708   *
709   * The FileSystem will simply return an elt containing 'localhost'.
710   *
711   * @param p path is used to identify an FS since an FS could have
712   *          another FS that it could be delegating the call to
713   * @param start offset into the given file
714   * @param len length for which to get locations for
715   */
716  public BlockLocation[] getFileBlockLocations(Path p, 
717      long start, long len) throws IOException {
718    if (p == null) {
719      throw new NullPointerException();
720    }
721    FileStatus file = getFileStatus(p);
722    return getFileBlockLocations(file, start, len);
723  }
724  
725  /**
726   * Return a set of server default configuration values
727   * @return server default configuration values
728   * @throws IOException
729   * @deprecated use {@link #getServerDefaults(Path)} instead
730   */
731  @Deprecated
732  public FsServerDefaults getServerDefaults() throws IOException {
733    Configuration conf = getConf();
734    // CRC32 is chosen as default as it is available in all 
735    // releases that support checksum.
736    // The client trash configuration is ignored.
737    return new FsServerDefaults(getDefaultBlockSize(), 
738        conf.getInt("io.bytes.per.checksum", 512), 
739        64 * 1024, 
740        getDefaultReplication(),
741        conf.getInt("io.file.buffer.size", 4096),
742        false,
743        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
744        DataChecksum.Type.CRC32,
745        "");
746  }
747
748  /**
749   * Return a set of server default configuration values
750   * @param p path is used to identify an FS since an FS could have
751   *          another FS that it could be delegating the call to
752   * @return server default configuration values
753   * @throws IOException
754   */
755  public FsServerDefaults getServerDefaults(Path p) throws IOException {
756    return getServerDefaults();
757  }
758
759  /**
760   * Return the fully-qualified path of path f resolving the path
761   * through any symlinks or mount point
762   * @param p path to be resolved
763   * @return fully qualified path 
764   * @throws FileNotFoundException
765   */
766   public Path resolvePath(final Path p) throws IOException {
767     checkPath(p);
768     return getFileStatus(p).getPath();
769   }
770
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * Subclasses implement this. {@link #open(Path)} delegates here with the
   * buffer size from "io.file.buffer.size" (default 4096).
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return a stream for reading {@code f}
   * @throws IOException if the file cannot be opened
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
778    
779  /**
780   * Opens an FSDataInputStream at the indicated Path.
781   * @param f the file to open
782   */
783  public FSDataInputStream open(Path f) throws IOException {
784    return open(f, getConf().getInt("io.file.buffer.size", 4096));
785  }
786
787  /**
788   * Create an FSDataOutputStream at the indicated Path.
789   * Files are overwritten by default.
790   * @param f the file to create
791   */
792  public FSDataOutputStream create(Path f) throws IOException {
793    return create(f, true);
794  }
795
796  /**
797   * Create an FSDataOutputStream at the indicated Path.
798   * @param f the file to create
799   * @param overwrite if a file with this name already exists, then if true,
800   *   the file will be overwritten, and if false an exception will be thrown.
801   */
802  public FSDataOutputStream create(Path f, boolean overwrite)
803      throws IOException {
804    return create(f, overwrite, 
805                  getConf().getInt("io.file.buffer.size", 4096),
806                  getDefaultReplication(f),
807                  getDefaultBlockSize(f));
808  }
809
810  /**
811   * Create an FSDataOutputStream at the indicated Path with write-progress
812   * reporting.
813   * Files are overwritten by default.
814   * @param f the file to create
815   * @param progress to report progress
816   */
817  public FSDataOutputStream create(Path f, Progressable progress) 
818      throws IOException {
819    return create(f, true, 
820                  getConf().getInt("io.file.buffer.size", 4096),
821                  getDefaultReplication(f),
822                  getDefaultBlockSize(f), progress);
823  }
824
825  /**
826   * Create an FSDataOutputStream at the indicated Path.
827   * Files are overwritten by default.
828   * @param f the file to create
829   * @param replication the replication factor
830   */
831  public FSDataOutputStream create(Path f, short replication)
832      throws IOException {
833    return create(f, true, 
834                  getConf().getInt("io.file.buffer.size", 4096),
835                  replication,
836                  getDefaultBlockSize(f));
837  }
838
839  /**
840   * Create an FSDataOutputStream at the indicated Path with write-progress
841   * reporting.
842   * Files are overwritten by default.
843   * @param f the file to create
844   * @param replication the replication factor
845   * @param progress to report progress
846   */
847  public FSDataOutputStream create(Path f, short replication, 
848      Progressable progress) throws IOException {
849    return create(f, true, 
850                  getConf().getInt(
851                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
852                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
853                  replication,
854                  getDefaultBlockSize(f), progress);
855  }
856
857    
858  /**
859   * Create an FSDataOutputStream at the indicated Path.
860   * @param f the file name to create
861   * @param overwrite if a file with this name already exists, then if true,
862   *   the file will be overwritten, and if false an error will be thrown.
863   * @param bufferSize the size of the buffer to be used.
864   */
865  public FSDataOutputStream create(Path f, 
866                                   boolean overwrite,
867                                   int bufferSize
868                                   ) throws IOException {
869    return create(f, overwrite, bufferSize, 
870                  getDefaultReplication(f),
871                  getDefaultBlockSize(f));
872  }
873    
874  /**
875   * Create an FSDataOutputStream at the indicated Path with write-progress
876   * reporting.
877   * @param f the path of the file to open
878   * @param overwrite if a file with this name already exists, then if true,
879   *   the file will be overwritten, and if false an error will be thrown.
880   * @param bufferSize the size of the buffer to be used.
881   */
882  public FSDataOutputStream create(Path f, 
883                                   boolean overwrite,
884                                   int bufferSize,
885                                   Progressable progress
886                                   ) throws IOException {
887    return create(f, overwrite, bufferSize, 
888                  getDefaultReplication(f),
889                  getDefaultBlockSize(f), progress);
890  }
891    
892    
893  /**
894   * Create an FSDataOutputStream at the indicated Path.
895   * @param f the file name to open
896   * @param overwrite if a file with this name already exists, then if true,
897   *   the file will be overwritten, and if false an error will be thrown.
898   * @param bufferSize the size of the buffer to be used.
899   * @param replication required block replication for the file. 
900   */
901  public FSDataOutputStream create(Path f, 
902                                   boolean overwrite,
903                                   int bufferSize,
904                                   short replication,
905                                   long blockSize
906                                   ) throws IOException {
907    return create(f, overwrite, bufferSize, replication, blockSize, null);
908  }
909
910  /**
911   * Create an FSDataOutputStream at the indicated Path with write-progress
912   * reporting.
913   * @param f the file name to open
914   * @param overwrite if a file with this name already exists, then if true,
915   *   the file will be overwritten, and if false an error will be thrown.
916   * @param bufferSize the size of the buffer to be used.
917   * @param replication required block replication for the file. 
918   */
919  public FSDataOutputStream create(Path f,
920                                            boolean overwrite,
921                                            int bufferSize,
922                                            short replication,
923                                            long blockSize,
924                                            Progressable progress
925                                            ) throws IOException {
926    return this.create(f, FsCreateModes.applyUMask(
927        FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
928        overwrite, bufferSize, replication, blockSize, progress);
929  }
930
  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * <p>
   * Concrete file systems must implement this. The other {@code create}
   * overloads in this class delegate to it, directly or indirectly.
   * @param f the file name to open
   * @param permission permission for the new file (the overloads in this
   *   class that build one apply the configured umask before calling here)
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize block size for the new file
   * @param progress progress reporter; may be null (the five-argument
   *   overload passes null)
   * @return the output stream
   * @throws IOException IO failure
   * @see #setPermission(Path, FsPermission)
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;
952  
953  /**
954   * Create an FSDataOutputStream at the indicated Path with write-progress
955   * reporting.
956   * @param f the file name to open
957   * @param permission
958   * @param flags {@link CreateFlag}s to use for this stream.
959   * @param bufferSize the size of the buffer to be used.
960   * @param replication required block replication for the file.
961   * @param blockSize
962   * @param progress
963   * @throws IOException
964   * @see #setPermission(Path, FsPermission)
965   */
966  public FSDataOutputStream create(Path f,
967      FsPermission permission,
968      EnumSet<CreateFlag> flags,
969      int bufferSize,
970      short replication,
971      long blockSize,
972      Progressable progress) throws IOException {
973    return create(f, permission, flags, bufferSize, replication,
974        blockSize, progress, null);
975  }
976  
977  /**
978   * Create an FSDataOutputStream at the indicated Path with a custom
979   * checksum option
980   * @param f the file name to open
981   * @param permission
982   * @param flags {@link CreateFlag}s to use for this stream.
983   * @param bufferSize the size of the buffer to be used.
984   * @param replication required block replication for the file.
985   * @param blockSize
986   * @param progress
987   * @param checksumOpt checksum parameter. If null, the values
988   *        found in conf will be used.
989   * @throws IOException
990   * @see #setPermission(Path, FsPermission)
991   */
992  public FSDataOutputStream create(Path f,
993      FsPermission permission,
994      EnumSet<CreateFlag> flags,
995      int bufferSize,
996      short replication,
997      long blockSize,
998      Progressable progress,
999      ChecksumOpt checksumOpt) throws IOException {
1000    // Checksum options are ignored by default. The file systems that
1001    // implement checksum need to override this method. The full
1002    // support is currently only available in DFS.
1003    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
1004        bufferSize, replication, blockSize, progress);
1005  }
1006
1007  /*.
1008   * This create has been added to support the FileContext that processes
1009   * the permission
1010   * with umask before calling this method.
1011   * This a temporary method added to support the transition from FileSystem
1012   * to FileContext for user applications.
1013   */
1014  @Deprecated
1015  protected FSDataOutputStream primitiveCreate(Path f,
1016     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1017     short replication, long blockSize, Progressable progress,
1018     ChecksumOpt checksumOpt) throws IOException {
1019
1020    boolean pathExists = exists(f);
1021    CreateFlag.validate(f, pathExists, flag);
1022    
1023    // Default impl  assumes that permissions do not matter and 
1024    // nor does the bytesPerChecksum  hence
1025    // calling the regular create is good enough.
1026    // FSs that implement permissions should override this.
1027
1028    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1029      return append(f, bufferSize, progress);
1030    }
1031    
1032    return this.create(f, absolutePermission,
1033        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1034        blockSize, progress);
1035  }
1036  
1037  /**
1038   * This version of the mkdirs method assumes that the permission is absolute.
1039   * It has been added to support the FileContext that processes the permission
1040   * with umask before calling this method.
1041   * This a temporary method added to support the transition from FileSystem
1042   * to FileContext for user applications.
1043   */
1044  @Deprecated
1045  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1046    throws IOException {
1047    // Default impl is to assume that permissions do not matter and hence
1048    // calling the regular mkdirs is good enough.
1049    // FSs that implement permissions should override this.
1050   return this.mkdirs(f, absolutePermission);
1051  }
1052
1053
1054  /**
1055   * This version of the mkdirs method assumes that the permission is absolute.
1056   * It has been added to support the FileContext that processes the permission
1057   * with umask before calling this method.
1058   * This a temporary method added to support the transition from FileSystem
1059   * to FileContext for user applications.
1060   */
1061  @Deprecated
1062  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1063                    boolean createParent)
1064    throws IOException {
1065    
1066    if (!createParent) { // parent must exist.
1067      // since the this.mkdirs makes parent dirs automatically
1068      // we must throw exception if parent does not exist.
1069      final FileStatus stat = getFileStatus(f.getParent());
1070      if (stat == null) {
1071        throw new FileNotFoundException("Missing parent:" + f);
1072      }
1073      if (!stat.isDirectory()) {
1074        throw new ParentNotDirectoryException("parent is not a dir");
1075      }
1076      // parent does exist - go ahead with mkdir of leaf
1077    }
1078    // Default impl is to assume that permissions do not matter and hence
1079    // calling the regular mkdirs is good enough.
1080    // FSs that implement permissions should override this.
1081    if (!this.mkdirs(f, absolutePermission)) {
1082      throw new IOException("mkdir of "+ f + " failed");
1083    }
1084  }
1085
1086  /**
1087   * Opens an FSDataOutputStream at the indicated Path with write-progress
1088   * reporting. Same as create(), except fails if parent directory doesn't
1089   * already exist.
1090   * @param f the file name to open
1091   * @param overwrite if a file with this name already exists, then if true,
1092   * the file will be overwritten, and if false an error will be thrown.
1093   * @param bufferSize the size of the buffer to be used.
1094   * @param replication required block replication for the file.
1095   * @param blockSize
1096   * @param progress
1097   * @throws IOException
1098   * @see #setPermission(Path, FsPermission)
1099   * @deprecated API only for 0.20-append
1100   */
1101  @Deprecated
1102  public FSDataOutputStream createNonRecursive(Path f,
1103      boolean overwrite,
1104      int bufferSize, short replication, long blockSize,
1105      Progressable progress) throws IOException {
1106    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1107        overwrite, bufferSize, replication, blockSize, progress);
1108  }
1109
1110  /**
1111   * Opens an FSDataOutputStream at the indicated Path with write-progress
1112   * reporting. Same as create(), except fails if parent directory doesn't
1113   * already exist.
1114   * @param f the file name to open
1115   * @param permission
1116   * @param overwrite if a file with this name already exists, then if true,
1117   * the file will be overwritten, and if false an error will be thrown.
1118   * @param bufferSize the size of the buffer to be used.
1119   * @param replication required block replication for the file.
1120   * @param blockSize
1121   * @param progress
1122   * @throws IOException
1123   * @see #setPermission(Path, FsPermission)
1124   * @deprecated API only for 0.20-append
1125   */
1126   @Deprecated
1127   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1128       boolean overwrite, int bufferSize, short replication, long blockSize,
1129       Progressable progress) throws IOException {
1130     return createNonRecursive(f, permission,
1131         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1132             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1133             replication, blockSize, progress);
1134   }
1135
1136   /**
1137    * Opens an FSDataOutputStream at the indicated Path with write-progress
1138    * reporting. Same as create(), except fails if parent directory doesn't
1139    * already exist.
1140    * @param f the file name to open
1141    * @param permission
1142    * @param flags {@link CreateFlag}s to use for this stream.
1143    * @param bufferSize the size of the buffer to be used.
1144    * @param replication required block replication for the file.
1145    * @param blockSize
1146    * @param progress
1147    * @throws IOException
1148    * @see #setPermission(Path, FsPermission)
1149    * @deprecated API only for 0.20-append
1150    */
1151    @Deprecated
1152    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1153        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1154        Progressable progress) throws IOException {
1155      throw new IOException("createNonRecursive unsupported for this filesystem "
1156          + this.getClass());
1157    }
1158
1159  /**
1160   * Creates the given Path as a brand-new zero-length file.  If
1161   * create fails, or if it already existed, return false.
1162   *
1163   * @param f path to use for create
1164   */
1165  public boolean createNewFile(Path f) throws IOException {
1166    if (exists(f)) {
1167      return false;
1168    } else {
1169      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1170      return true;
1171    }
1172  }
1173
1174  /**
1175   * Append to an existing file (optional operation).
1176   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1177   * @param f the existing file to be appended.
1178   * @throws IOException
1179   */
1180  public FSDataOutputStream append(Path f) throws IOException {
1181    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1182  }
1183  /**
1184   * Append to an existing file (optional operation).
1185   * Same as append(f, bufferSize, null).
1186   * @param f the existing file to be appended.
1187   * @param bufferSize the size of the buffer to be used.
1188   * @throws IOException
1189   */
1190  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1191    return append(f, bufferSize, null);
1192  }
1193
  /**
   * Append to an existing file (optional operation).
   * Concrete file systems must implement this; the shorter {@code append}
   * overloads delegate here.
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @return the output stream
   * @throws IOException IO failure
   */
  public abstract FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException;
1203
1204  /**
1205   * Concat existing files together.
1206   * @param trg the path to the target destination.
1207   * @param psrcs the paths to the sources to use for the concatenation.
1208   * @throws IOException
1209   */
1210  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1211    throw new UnsupportedOperationException("Not implemented by the " + 
1212        getClass().getSimpleName() + " FileSystem implementation");
1213  }
1214
1215 /**
1216   * Get replication.
1217   * 
1218   * @deprecated Use getFileStatus() instead
1219   * @param src file name
1220   * @return file replication
1221   * @throws IOException
1222   */ 
1223  @Deprecated
1224  public short getReplication(Path src) throws IOException {
1225    return getFileStatus(src).getReplication();
1226  }
1227
1228  /**
1229   * Set replication for an existing file.
1230   * 
1231   * @param src file name
1232   * @param replication new replication
1233   * @throws IOException
1234   * @return true if successful;
1235   *         false if file does not exist or is a directory
1236   */
1237  public boolean setReplication(Path src, short replication)
1238    throws IOException {
1239    return true;
1240  }
1241
  /**
   * Renames Path src to Path dst.  Can take place on local fs
   * or remote DFS.
   * Concrete file systems must implement this; the deprecated
   * {@code rename(Path, Path, Rename...)} default implementation calls it
   * after its own precondition checks.
   * @param src path to be renamed
   * @param dst new path after rename
   * @throws IOException on failure
   * @return true if rename is successful
   */
  public abstract boolean rename(Path src, Path dst) throws IOException;
1251
  /**
   * Renames Path src to Path dst
   * <ul>
   * <li>Fails if src does not exist.
   * <li>Fails if src is a file and dst is a directory.
   * <li>Fails if src is a directory and dst is a file.
   * <li>Fails if the parent of dst does not exist or is a file.
   * </ul>
   * <p>
   * If OVERWRITE option is not passed as an argument, rename fails
   * if the dst already exists.
   * <p>
   * If OVERWRITE option is passed as an argument, rename overwrites
   * the dst if it is a file or an empty directory. Rename fails if dst is
   * a non-empty directory.
   * <p>
   * Note that atomicity of rename is dependent on the file system
   * implementation. Please refer to the file system documentation for
   * details. This default implementation is non atomic: an existing
   * destination is deleted before {@link #rename(Path, Path)} is called.
   * <p>
   * This method is deprecated since it is a temporary method added to 
   * support the transition from FileSystem to FileContext for user 
   * applications.
   * 
   * @param src path to be renamed
   * @param dst new path after rename
   * @param options rename options; only {@link Rename#OVERWRITE} is examined
   * @throws FileNotFoundException if src, or the parent of a non-existent
   *         dst, is not found
   * @throws FileAlreadyExistsException if dst exists and OVERWRITE is not set
   * @throws ParentNotDirectoryException if the parent of dst is a file
   * @throws IOException on any other failure
   */
  @Deprecated
  protected void rename(final Path src, final Path dst,
      final Rename... options) throws IOException {
    // Default implementation
    final FileStatus srcStatus = getFileLinkStatus(src);
    if (srcStatus == null) {
      throw new FileNotFoundException("rename source " + src + " not found.");
    }

    // OVERWRITE is the only option the default implementation honours.
    boolean overwrite = false;
    if (null != options) {
      for (Rename option : options) {
        if (option == Rename.OVERWRITE) {
          overwrite = true;
        }
      }
    }

    // NOTE(review): any IOException here, not only FileNotFoundException,
    // is treated as "destination does not exist".
    FileStatus dstStatus;
    try {
      dstStatus = getFileLinkStatus(dst);
    } catch (IOException e) {
      dstStatus = null;
    }
    if (dstStatus != null) {
      // Destination exists: it must be the same kind as the source, and the
      // caller must have asked for OVERWRITE.
      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
        throw new IOException("Source " + src + " Destination " + dst
            + " both should be either file or directory");
      }
      if (!overwrite) {
        throw new FileAlreadyExistsException("rename destination " + dst
            + " already exists.");
      }
      // Delete the destination that is a file or an empty directory
      if (dstStatus.isDirectory()) {
        FileStatus[] list = listStatus(dst);
        if (list != null && list.length != 0) {
          throw new IOException(
              "rename cannot overwrite non empty destination directory " + dst);
        }
      }
      delete(dst, false);
    } else {
      // Destination absent: its parent must exist and be a directory.
      final Path parent = dst.getParent();
      final FileStatus parentStatus = getFileStatus(parent);
      if (parentStatus == null) {
        throw new FileNotFoundException("rename destination parent " + parent
            + " not found.");
      }
      if (!parentStatus.isDirectory()) {
        throw new ParentNotDirectoryException("rename destination parent " + parent
            + " is a file.");
      }
    }
    // Delegate the actual move to the two-argument rename; a false result
    // is converted into an exception.
    if (!rename(src, dst)) {
      throw new IOException("rename from " + src + " to " + dst + " failed.");
    }
  }
1338  
1339  /**
1340   * Delete a file 
1341   * @deprecated Use {@link #delete(Path, boolean)} instead.
1342   */
1343  @Deprecated
1344  public boolean delete(Path f) throws IOException {
1345    return delete(f, true);
1346  }
1347  
  /** Delete a file.
   *
   * Concrete file systems must implement this; the deprecated
   * {@code delete(Path)} delegates here with {@code recursive = true}.
   *
   * @param f the path to delete.
   * @param recursive if path is a directory and set to 
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false. 
   * @return  true if delete is successful else false. 
   * @throws IOException IO failure
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1358
1359  /**
1360   * Mark a path to be deleted when FileSystem is closed.
1361   * When the JVM shuts down,
1362   * all FileSystem objects will be closed automatically.
1363   * Then,
1364   * the marked path will be deleted as a result of closing the FileSystem.
1365   *
1366   * The path has to exist in the file system.
1367   * 
1368   * @param f the path to delete.
1369   * @return  true if deleteOnExit is successful, otherwise false.
1370   * @throws IOException
1371   */
1372  public boolean deleteOnExit(Path f) throws IOException {
1373    if (!exists(f)) {
1374      return false;
1375    }
1376    synchronized (deleteOnExit) {
1377      deleteOnExit.add(f);
1378    }
1379    return true;
1380  }
1381  
1382  /**
1383   * Cancel the deletion of the path when the FileSystem is closed
1384   * @param f the path to cancel deletion
1385   */
1386  public boolean cancelDeleteOnExit(Path f) {
1387    synchronized (deleteOnExit) {
1388      return deleteOnExit.remove(f);
1389    }
1390  }
1391
1392  /**
1393   * Delete all files that were marked as delete-on-exit. This recursively
1394   * deletes all files in the specified paths.
1395   */
1396  protected void processDeleteOnExit() {
1397    synchronized (deleteOnExit) {
1398      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1399        Path path = iter.next();
1400        try {
1401          if (exists(path)) {
1402            delete(path, true);
1403          }
1404        }
1405        catch (IOException e) {
1406          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1407        }
1408        iter.remove();
1409      }
1410    }
1411  }
1412  
1413  /** Check if exists.
1414   * @param f source file
1415   */
1416  public boolean exists(Path f) throws IOException {
1417    try {
1418      return getFileStatus(f) != null;
1419    } catch (FileNotFoundException e) {
1420      return false;
1421    }
1422  }
1423
1424  /** True iff the named path is a directory.
1425   * Note: Avoid using this method. Instead reuse the FileStatus 
1426   * returned by getFileStatus() or listStatus() methods.
1427   * @param f path to check
1428   */
1429  public boolean isDirectory(Path f) throws IOException {
1430    try {
1431      return getFileStatus(f).isDirectory();
1432    } catch (FileNotFoundException e) {
1433      return false;               // f does not exist
1434    }
1435  }
1436
1437  /** True iff the named path is a regular file.
1438   * Note: Avoid using this method. Instead reuse the FileStatus 
1439   * returned by getFileStatus() or listStatus() methods.
1440   * @param f path to check
1441   */
1442  public boolean isFile(Path f) throws IOException {
1443    try {
1444      return getFileStatus(f).isFile();
1445    } catch (FileNotFoundException e) {
1446      return false;               // f does not exist
1447    }
1448  }
1449  
1450  /** The number of bytes in a file. */
1451  /** @deprecated Use getFileStatus() instead */
1452  @Deprecated
1453  public long getLength(Path f) throws IOException {
1454    return getFileStatus(f).getLen();
1455  }
1456    
1457  /** Return the {@link ContentSummary} of a given {@link Path}.
1458  * @param f path to use
1459  */
1460  public ContentSummary getContentSummary(Path f) throws IOException {
1461    FileStatus status = getFileStatus(f);
1462    if (status.isFile()) {
1463      // f is a file
1464      return new ContentSummary(status.getLen(), 1, 0);
1465    }
1466    // f is a directory
1467    long[] summary = {0, 0, 1};
1468    for(FileStatus s : listStatus(f)) {
1469      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1470                                     new ContentSummary(s.getLen(), 1, 0);
1471      summary[0] += c.getLength();
1472      summary[1] += c.getFileCount();
1473      summary[2] += c.getDirectoryCount();
1474    }
1475    return new ContentSummary(summary[0], summary[1], summary[2]);
1476  }
1477
1478  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1479      @Override
1480      public boolean accept(Path file) {
1481        return true;
1482      }     
1483    };
1484    
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * <p>
   * Does not guarantee to return the List of files/directories status in a
   * sorted order.
   * @param f given path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                         IOException;
1498
1499  /**
1500   * Represents a batch of directory entries when iteratively listing a
1501   * directory. This is a private API not meant for use by end users.
1502   * <p>
1503   * For internal use by FileSystem subclasses that override
1504   * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
1505   * listing.
1506   */
1507  @InterfaceAudience.Private
1508  public static class DirectoryEntries {
1509    private final FileStatus[] entries;
1510    private final byte[] token;
1511    private final boolean hasMore;
1512
1513    public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
1514        hasMore) {
1515      this.entries = entries;
1516      if (token != null) {
1517        this.token = token.clone();
1518      } else {
1519        this.token = null;
1520      }
1521      this.hasMore = hasMore;
1522    }
1523
1524    public FileStatus[] getEntries() {
1525      return entries;
1526    }
1527
1528    public byte[] getToken() {
1529      return token;
1530    }
1531
1532    public boolean hasMore() {
1533      return hasMore;
1534    }
1535  }
1536
1537  /**
1538   * Given an opaque iteration token, return the next batch of entries in a
1539   * directory. This is a private API not meant for use by end users.
1540   * <p>
1541   * This method should be overridden by FileSystem subclasses that want to
1542   * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
1543   * @param f Path to list
1544   * @param token opaque iteration token returned by previous call, or null
1545   *              if this is the first call.
1546   * @return
1547   * @throws FileNotFoundException
1548   * @throws IOException
1549   */
1550  @InterfaceAudience.Private
1551  protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
1552      FileNotFoundException, IOException {
1553    // The default implementation returns the entire listing as a single batch.
1554    // Thus, there is never a second batch, and no need to respect the passed
1555    // token or set a token in the returned DirectoryEntries.
1556    FileStatus[] listing = listStatus(f);
1557    return new DirectoryEntries(listing, null, false);
1558  }
1559
1560  /*
1561   * Filter files/directories in the given path using the user-supplied path
1562   * filter. Results are added to the given array <code>results</code>.
1563   */
1564  private void listStatus(ArrayList<FileStatus> results, Path f,
1565      PathFilter filter) throws FileNotFoundException, IOException {
1566    FileStatus listing[] = listStatus(f);
1567    if (listing == null) {
1568      throw new IOException("Error accessing " + f);
1569    }
1570
1571    for (int i = 0; i < listing.length; i++) {
1572      if (filter.accept(listing[i].getPath())) {
1573        results.add(listing[i]);
1574      }
1575    }
1576  }
1577
1578  /**
1579   * @return an iterator over the corrupt files under the given path
1580   * (may contain duplicates if a file has more than one corrupt block)
1581   * @throws IOException
1582   */
1583  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1584    throws IOException {
1585    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1586                                            " does not support" +
1587                                            " listCorruptFileBlocks");
1588  }
1589
1590  /**
1591   * Filter files/directories in the given path using the user-supplied path
1592   * filter.
1593   * <p>
1594   * Does not guarantee to return the List of files/directories status in a
1595   * sorted order.
1596   * 
1597   * @param f
1598   *          a path name
1599   * @param filter
1600   *          the user-supplied path filter
1601   * @return an array of FileStatus objects for the files under the given path
1602   *         after applying the filter
1603   * @throws FileNotFoundException when the path does not exist;
1604   *         IOException see specific implementation   
1605   */
1606  public FileStatus[] listStatus(Path f, PathFilter filter) 
1607                                   throws FileNotFoundException, IOException {
1608    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1609    listStatus(results, f, filter);
1610    return results.toArray(new FileStatus[results.size()]);
1611  }
1612
1613  /**
1614   * Filter files/directories in the given list of paths using default
1615   * path filter.
1616   * <p>
1617   * Does not guarantee to return the List of files/directories status in a
1618   * sorted order.
1619   * 
1620   * @param files
1621   *          a list of paths
1622   * @return a list of statuses for the files under the given paths after
1623   *         applying the filter default Path filter
1624   * @throws FileNotFoundException when the path does not exist;
1625   *         IOException see specific implementation
1626   */
1627  public FileStatus[] listStatus(Path[] files)
1628      throws FileNotFoundException, IOException {
1629    return listStatus(files, DEFAULT_FILTER);
1630  }
1631
1632  /**
1633   * Filter files/directories in the given list of paths using user-supplied
1634   * path filter.
1635   * <p>
1636   * Does not guarantee to return the List of files/directories status in a
1637   * sorted order.
1638   * 
1639   * @param files
1640   *          a list of paths
1641   * @param filter
1642   *          the user-supplied path filter
1643   * @return a list of statuses for the files under the given paths after
1644   *         applying the filter
1645   * @throws FileNotFoundException when the path does not exist;
1646   *         IOException see specific implementation
1647   */
1648  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1649      throws FileNotFoundException, IOException {
1650    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1651    for (int i = 0; i < files.length; i++) {
1652      listStatus(results, files[i], filter);
1653    }
1654    return results.toArray(new FileStatus[results.size()]);
1655  }
1656
1657  /**
1658   * <p>Return all the files that match filePattern and are not checksum
1659   * files. Results are sorted by their names.
1660   * 
1661   * <p>
1662   * A filename pattern is composed of <i>regular</i> characters and
1663   * <i>special pattern matching</i> characters, which are:
1664   *
1665   * <dl>
1666   *  <dd>
1667   *   <dl>
1668   *    <p>
1669   *    <dt> <tt> ? </tt>
1670   *    <dd> Matches any single character.
1671   *
1672   *    <p>
1673   *    <dt> <tt> * </tt>
1674   *    <dd> Matches zero or more characters.
1675   *
1676   *    <p>
1677   *    <dt> <tt> [<i>abc</i>] </tt>
1678   *    <dd> Matches a single character from character set
1679   *     <tt>{<i>a,b,c</i>}</tt>.
1680   *
1681   *    <p>
1682   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1683   *    <dd> Matches a single character from the character range
1684   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1685   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1686   *
1687   *    <p>
1688   *    <dt> <tt> [^<i>a</i>] </tt>
1689   *    <dd> Matches a single character that is not from character set or range
1690   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1691   *     immediately to the right of the opening bracket.
1692   *
1693   *    <p>
1694   *    <dt> <tt> \<i>c</i> </tt>
1695   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1696   *
1697   *    <p>
1698   *    <dt> <tt> {ab,cd} </tt>
1699   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1700   *    
1701   *    <p>
1702   *    <dt> <tt> {ab,c{de,fh}} </tt>
1703   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1704   *
1705   *   </dl>
1706   *  </dd>
1707   * </dl>
1708   *
1709   * @param pathPattern a regular expression specifying a pth pattern
1710
1711   * @return an array of paths that match the path pattern
1712   * @throws IOException
1713   */
1714  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1715    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1716  }
1717  
1718  /**
1719   * Return an array of FileStatus objects whose path names match pathPattern
1720   * and is accepted by the user-supplied path filter. Results are sorted by
1721   * their path names.
1722   * Return null if pathPattern has no glob and the path does not exist.
1723   * Return an empty array if pathPattern has a glob and no path matches it. 
1724   * 
1725   * @param pathPattern
1726   *          a regular expression specifying the path pattern
1727   * @param filter
1728   *          a user-supplied path filter
1729   * @return an array of FileStatus objects
1730   * @throws IOException if any I/O error occurs when fetching file status
1731   */
1732  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1733      throws IOException {
1734    return new Globber(this, pathPattern, filter).glob();
1735  }
1736  
1737  /**
1738   * List the statuses of the files/directories in the given path if the path is
1739   * a directory. 
1740   * Return the file's status and block locations If the path is a file.
1741   * 
1742   * If a returned status is a file, it contains the file's block locations.
1743   * 
1744   * @param f is the path
1745   *
1746   * @return an iterator that traverses statuses of the files/directories 
1747   *         in the given path
1748   *
1749   * @throws FileNotFoundException If <code>f</code> does not exist
1750   * @throws IOException If an I/O error occurred
1751   */
1752  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1753  throws FileNotFoundException, IOException {
1754    return listLocatedStatus(f, DEFAULT_FILTER);
1755  }
1756
1757  /**
1758   * Listing a directory
1759   * The returned results include its block location if it is a file
1760   * The results are filtered by the given path filter
1761   * @param f a path
1762   * @param filter a path filter
1763   * @return an iterator that traverses statuses of the files/directories 
1764   *         in the given path
1765   * @throws FileNotFoundException if <code>f</code> does not exist
1766   * @throws IOException if any I/O error occurred
1767   */
1768  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1769      final PathFilter filter)
1770  throws FileNotFoundException, IOException {
1771    return new RemoteIterator<LocatedFileStatus>() {
1772      private final FileStatus[] stats = listStatus(f, filter);
1773      private int i = 0;
1774
1775      @Override
1776      public boolean hasNext() {
1777        return i<stats.length;
1778      }
1779
1780      @Override
1781      public LocatedFileStatus next() throws IOException {
1782        if (!hasNext()) {
1783          throw new NoSuchElementException("No more entry in " + f);
1784        }
1785        FileStatus result = stats[i++];
1786        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1787        // calling getFileStatus(Path) to load the FileStatus again
1788        BlockLocation[] locs = result.isFile() ?
1789            getFileBlockLocations(result, 0, result.getLen()) :
1790            null;
1791        return new LocatedFileStatus(result, locs);
1792      }
1793    };
1794  }
1795
1796  /**
1797   * Generic iterator for implementing {@link #listStatusIterator(Path)}.
1798   */
1799  private class DirListingIterator<T extends FileStatus> implements
1800      RemoteIterator<T> {
1801
1802    private final Path path;
1803    private DirectoryEntries entries;
1804    private int i = 0;
1805
1806    DirListingIterator(Path path) {
1807      this.path = path;
1808    }
1809
1810    @Override
1811    public boolean hasNext() throws IOException {
1812      if (entries == null) {
1813        fetchMore();
1814      }
1815      return i < entries.getEntries().length ||
1816          entries.hasMore();
1817    }
1818
1819    private void fetchMore() throws IOException {
1820      byte[] token = null;
1821      if (entries != null) {
1822        token = entries.getToken();
1823      }
1824      entries = listStatusBatch(path, token);
1825      i = 0;
1826    }
1827
1828    @Override
1829    @SuppressWarnings("unchecked")
1830    public T next() throws IOException {
1831      Preconditions.checkState(hasNext(), "No more items in iterator");
1832      if (i == entries.getEntries().length) {
1833        fetchMore();
1834      }
1835      return (T)entries.getEntries()[i++];
1836    }
1837  }
1838
1839  /**
1840   * Returns a remote iterator so that followup calls are made on demand
1841   * while consuming the entries. Each file system implementation should
1842   * override this method and provide a more efficient implementation, if
1843   * possible. 
1844   * Does not guarantee to return the iterator that traverses statuses
1845   * of the files in a sorted order.
1846   *
1847   * @param p target path
1848   * @return remote iterator
1849   */
1850  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1851  throws FileNotFoundException, IOException {
1852    return new DirListingIterator<>(p);
1853  }
1854
1855  /**
1856   * List the statuses and block locations of the files in the given path.
1857   * Does not guarantee to return the iterator that traverses statuses
1858   * of the files in a sorted order.
1859   * 
1860   * If the path is a directory, 
1861   *   if recursive is false, returns files in the directory;
1862   *   if recursive is true, return files in the subtree rooted at the path.
1863   * If the path is a file, return the file's status and block locations.
1864   * 
1865   * @param f is the path
1866   * @param recursive if the subdirectories need to be traversed recursively
1867   *
1868   * @return an iterator that traverses statuses of the files
1869   *
1870   * @throws FileNotFoundException when the path does not exist;
1871   *         IOException see specific implementation
1872   */
1873  public RemoteIterator<LocatedFileStatus> listFiles(
1874      final Path f, final boolean recursive)
1875  throws FileNotFoundException, IOException {
1876    return new RemoteIterator<LocatedFileStatus>() {
1877      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1878        new Stack<RemoteIterator<LocatedFileStatus>>();
1879      private RemoteIterator<LocatedFileStatus> curItor =
1880        listLocatedStatus(f);
1881      private LocatedFileStatus curFile;
1882     
1883      @Override
1884      public boolean hasNext() throws IOException {
1885        while (curFile == null) {
1886          if (curItor.hasNext()) {
1887            handleFileStat(curItor.next());
1888          } else if (!itors.empty()) {
1889            curItor = itors.pop();
1890          } else {
1891            return false;
1892          }
1893        }
1894        return true;
1895      }
1896
1897      /**
1898       * Process the input stat.
1899       * If it is a file, return the file stat.
1900       * If it is a directory, traverse the directory if recursive is true;
1901       * ignore it if recursive is false.
1902       * @param stat input status
1903       * @throws IOException if any IO error occurs
1904       */
1905      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1906        if (stat.isFile()) { // file
1907          curFile = stat;
1908        } else if (recursive) { // directory
1909          itors.push(curItor);
1910          curItor = listLocatedStatus(stat.getPath());
1911        }
1912      }
1913
1914      @Override
1915      public LocatedFileStatus next() throws IOException {
1916        if (hasNext()) {
1917          LocatedFileStatus result = curFile;
1918          curFile = null;
1919          return result;
1920        } 
1921        throw new java.util.NoSuchElementException("No more entry in " + f);
1922      }
1923    };
1924  }
1925  
1926  /** Return the current user's home directory in this filesystem.
1927   * The default implementation returns "/user/$USER/".
1928   */
1929  public Path getHomeDirectory() {
1930    return this.makeQualified(
1931        new Path(USER_HOME_PREFIX + "/" + System.getProperty("user.name")));
1932  }
1933
1934
1935  /**
1936   * Set the current working directory for the given file system. All relative
1937   * paths will be resolved relative to it.
1938   * 
1939   * @param new_dir
1940   */
1941  public abstract void setWorkingDirectory(Path new_dir);
1942    
1943  /**
1944   * Get the current working directory for the given file system
1945   * @return the directory pathname
1946   */
1947  public abstract Path getWorkingDirectory();
1948  
1949  
1950  /**
1951   * Note: with the new FilesContext class, getWorkingDirectory()
1952   * will be removed. 
1953   * The working directory is implemented in FilesContext.
1954   * 
1955   * Some file systems like LocalFileSystem have an initial workingDir
1956   * that we use as the starting workingDir. For other file systems
1957   * like HDFS there is no built in notion of an initial workingDir.
1958   * 
1959   * @return if there is built in notion of workingDir then it
1960   * is returned; else a null is returned.
1961   */
1962  protected Path getInitialWorkingDirectory() {
1963    return null;
1964  }
1965
1966  /**
1967   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1968   */
1969  public boolean mkdirs(Path f) throws IOException {
1970    return mkdirs(f, FsPermission.getDirDefault());
1971  }
1972
1973  /**
1974   * Make the given file and all non-existent parents into
1975   * directories. Has the semantics of Unix 'mkdir -p'.
1976   * Existence of the directory hierarchy is not an error.
1977   * @param f path to create
1978   * @param permission to apply to f
1979   */
1980  public abstract boolean mkdirs(Path f, FsPermission permission
1981      ) throws IOException;
1982
1983  /**
1984   * The src file is on the local disk.  Add it to FS at
1985   * the given dst name and the source is kept intact afterwards
1986   * @param src path
1987   * @param dst path
1988   */
1989  public void copyFromLocalFile(Path src, Path dst)
1990    throws IOException {
1991    copyFromLocalFile(false, src, dst);
1992  }
1993
1994  /**
1995   * The src files is on the local disk.  Add it to FS at
1996   * the given dst name, removing the source afterwards.
1997   * @param srcs path
1998   * @param dst path
1999   */
2000  public void moveFromLocalFile(Path[] srcs, Path dst)
2001    throws IOException {
2002    copyFromLocalFile(true, true, srcs, dst);
2003  }
2004
2005  /**
2006   * The src file is on the local disk.  Add it to FS at
2007   * the given dst name, removing the source afterwards.
2008   * @param src path
2009   * @param dst path
2010   */
2011  public void moveFromLocalFile(Path src, Path dst)
2012    throws IOException {
2013    copyFromLocalFile(true, src, dst);
2014  }
2015
2016  /**
2017   * The src file is on the local disk.  Add it to FS at
2018   * the given dst name.
2019   * delSrc indicates if the source should be removed
2020   * @param delSrc whether to delete the src
2021   * @param src path
2022   * @param dst path
2023   */
2024  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
2025    throws IOException {
2026    copyFromLocalFile(delSrc, true, src, dst);
2027  }
2028  
2029  /**
2030   * The src files are on the local disk.  Add it to FS at
2031   * the given dst name.
2032   * delSrc indicates if the source should be removed
2033   * @param delSrc whether to delete the src
2034   * @param overwrite whether to overwrite an existing file
2035   * @param srcs array of paths which are source
2036   * @param dst path
2037   */
2038  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
2039                                Path[] srcs, Path dst)
2040    throws IOException {
2041    Configuration conf = getConf();
2042    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
2043  }
2044  
2045  /**
2046   * The src file is on the local disk.  Add it to FS at
2047   * the given dst name.
2048   * delSrc indicates if the source should be removed
2049   * @param delSrc whether to delete the src
2050   * @param overwrite whether to overwrite an existing file
2051   * @param src path
2052   * @param dst path
2053   */
2054  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
2055                                Path src, Path dst)
2056    throws IOException {
2057    Configuration conf = getConf();
2058    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
2059  }
2060    
2061  /**
2062   * The src file is under FS, and the dst is on the local disk.
2063   * Copy it from FS control to the local dst name.
2064   * @param src path
2065   * @param dst path
2066   */
2067  public void copyToLocalFile(Path src, Path dst) throws IOException {
2068    copyToLocalFile(false, src, dst);
2069  }
2070    
2071  /**
2072   * The src file is under FS, and the dst is on the local disk.
2073   * Copy it from FS control to the local dst name.
2074   * Remove the source afterwards
2075   * @param src path
2076   * @param dst path
2077   */
2078  public void moveToLocalFile(Path src, Path dst) throws IOException {
2079    copyToLocalFile(true, src, dst);
2080  }
2081
2082  /**
2083   * The src file is under FS, and the dst is on the local disk.
2084   * Copy it from FS control to the local dst name.
2085   * delSrc indicates if the src will be removed or not.
2086   * @param delSrc whether to delete the src
2087   * @param src path
2088   * @param dst path
2089   */   
2090  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2091    throws IOException {
2092    copyToLocalFile(delSrc, src, dst, false);
2093  }
2094  
2095    /**
2096   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2097   * control to the local dst name. delSrc indicates if the src will be removed
2098   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2099   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2100   * It will not create any crc files at local.
2101   * 
2102   * @param delSrc
2103   *          whether to delete the src
2104   * @param src
2105   *          path
2106   * @param dst
2107   *          path
2108   * @param useRawLocalFileSystem
2109   *          whether to use RawLocalFileSystem as local file system or not.
2110   * 
2111   * @throws IOException
2112   *           - if any IO error
2113   */
2114  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2115      boolean useRawLocalFileSystem) throws IOException {
2116    Configuration conf = getConf();
2117    FileSystem local = null;
2118    if (useRawLocalFileSystem) {
2119      local = getLocal(conf).getRawFileSystem();
2120    } else {
2121      local = getLocal(conf);
2122    }
2123    FileUtil.copy(this, src, local, dst, delSrc, conf);
2124  }
2125
2126  /**
2127   * Returns a local File that the user can write output to.  The caller
2128   * provides both the eventual FS target name and the local working
2129   * file.  If the FS is local, we write directly into the target.  If
2130   * the FS is remote, we write into the tmp local area.
2131   * @param fsOutputFile path of output file
2132   * @param tmpLocalFile path of local tmp file
2133   */
2134  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2135    throws IOException {
2136    return tmpLocalFile;
2137  }
2138
2139  /**
2140   * Called when we're all done writing to the target.  A local FS will
2141   * do nothing, because we've written to exactly the right place.  A remote
2142   * FS will copy the contents of tmpLocalFile to the correct target at
2143   * fsOutputFile.
2144   * @param fsOutputFile path of output file
2145   * @param tmpLocalFile path to local tmp file
2146   */
2147  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2148    throws IOException {
2149    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2150  }
2151
2152  /**
2153   * No more filesystem operations are needed.  Will
2154   * release any held locks.
2155   */
2156  @Override
2157  public void close() throws IOException {
2158    // delete all files that were marked as delete-on-exit.
2159    processDeleteOnExit();
2160    CACHE.remove(this.key, this);
2161  }
2162
2163  /** Return the total size of all files in the filesystem.*/
2164  public long getUsed() throws IOException{
2165    long used = 0;
2166    FileStatus[] files = listStatus(new Path("/"));
2167    for(FileStatus file:files){
2168      used += file.getLen();
2169    }
2170    return used;
2171  }
2172  
2173  /**
2174   * Get the block size for a particular file.
2175   * @param f the filename
2176   * @return the number of bytes in a block
2177   */
2178  /** @deprecated Use getFileStatus() instead */
2179  @Deprecated
2180  public long getBlockSize(Path f) throws IOException {
2181    return getFileStatus(f).getBlockSize();
2182  }
2183
2184  /**
2185   * Return the number of bytes that large input files should be optimally
2186   * be split into to minimize i/o time.
2187   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2188   */
2189  @Deprecated
2190  public long getDefaultBlockSize() {
2191    // default to 32MB: large enough to minimize the impact of seeks
2192    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2193  }
2194    
2195  /** Return the number of bytes that large input files should be optimally
2196   * be split into to minimize i/o time.  The given path will be used to
2197   * locate the actual filesystem.  The full path does not have to exist.
2198   * @param f path of file
2199   * @return the default block size for the path's filesystem
2200   */
2201  public long getDefaultBlockSize(Path f) {
2202    return getDefaultBlockSize();
2203  }
2204
2205  /**
2206   * Get the default replication.
2207   * @deprecated use {@link #getDefaultReplication(Path)} instead
2208   */
2209  @Deprecated
2210  public short getDefaultReplication() { return 1; }
2211
2212  /**
2213   * Get the default replication for a path.   The given path will be used to
2214   * locate the actual filesystem.  The full path does not have to exist.
2215   * @param path of the file
2216   * @return default replication for the path's filesystem 
2217   */
2218  public short getDefaultReplication(Path path) {
2219    return getDefaultReplication();
2220  }
2221  
2222  /**
2223   * Return a file status object that represents the path.
2224   * @param f The path we want information from
2225   * @return a FileStatus object
2226   * @throws FileNotFoundException when the path does not exist;
2227   *         IOException see specific implementation
2228   */
2229  public abstract FileStatus getFileStatus(Path f) throws IOException;
2230
2231  /**
2232   * Checks if the user can access a path.  The mode specifies which access
2233   * checks to perform.  If the requested permissions are granted, then the
2234   * method returns normally.  If access is denied, then the method throws an
2235   * {@link AccessControlException}.
2236   * <p/>
2237   * The default implementation of this method calls {@link #getFileStatus(Path)}
2238   * and checks the returned permissions against the requested permissions.
2239   * Note that the getFileStatus call will be subject to authorization checks.
2240   * Typically, this requires search (execute) permissions on each directory in
2241   * the path's prefix, but this is implementation-defined.  Any file system
2242   * that provides a richer authorization model (such as ACLs) may override the
2243   * default implementation so that it checks against that model instead.
2244   * <p>
2245   * In general, applications should avoid using this method, due to the risk of
2246   * time-of-check/time-of-use race conditions.  The permissions on a file may
2247   * change immediately after the access call returns.  Most applications should
2248   * prefer running specific file system actions as the desired user represented
2249   * by a {@link UserGroupInformation}.
2250   *
2251   * @param path Path to check
2252   * @param mode type of access to check
2253   * @throws AccessControlException if access is denied
2254   * @throws FileNotFoundException if the path does not exist
2255   * @throws IOException see specific implementation
2256   */
2257  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2258  public void access(Path path, FsAction mode) throws AccessControlException,
2259      FileNotFoundException, IOException {
2260    checkAccessPermissions(this.getFileStatus(path), mode);
2261  }
2262
2263  /**
2264   * This method provides the default implementation of
2265   * {@link #access(Path, FsAction)}.
2266   *
2267   * @param stat FileStatus to check
2268   * @param mode type of access to check
2269   * @throws IOException for any error
2270   */
2271  @InterfaceAudience.Private
2272  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2273      throws IOException {
2274    FsPermission perm = stat.getPermission();
2275    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2276    String user = ugi.getShortUserName();
2277    if (user.equals(stat.getOwner())) {
2278      if (perm.getUserAction().implies(mode)) {
2279        return;
2280      }
2281    } else if (ugi.getGroups().contains(stat.getGroup())) {
2282      if (perm.getGroupAction().implies(mode)) {
2283        return;
2284      }
2285    } else {
2286      if (perm.getOtherAction().implies(mode)) {
2287        return;
2288      }
2289    }
2290    throw new AccessControlException(String.format(
2291      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2292      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2293  }
2294
2295  /**
2296   * See {@link FileContext#fixRelativePart}
2297   */
2298  protected Path fixRelativePart(Path p) {
2299    if (p.isUriPathAbsolute()) {
2300      return p;
2301    } else {
2302      return new Path(getWorkingDirectory(), p);
2303    }
2304  }
2305
2306  /**
2307   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2308   */
2309  public void createSymlink(final Path target, final Path link,
2310      final boolean createParent) throws AccessControlException,
2311      FileAlreadyExistsException, FileNotFoundException,
2312      ParentNotDirectoryException, UnsupportedFileSystemException, 
2313      IOException {
2314    // Supporting filesystems should override this method
2315    throw new UnsupportedOperationException(
2316        "Filesystem does not support symlinks!");
2317  }
2318
2319  /**
2320   * See {@link FileContext#getFileLinkStatus(Path)}
2321   */
2322  public FileStatus getFileLinkStatus(final Path f)
2323      throws AccessControlException, FileNotFoundException,
2324      UnsupportedFileSystemException, IOException {
2325    // Supporting filesystems should override this method
2326    return getFileStatus(f);
2327  }
2328
2329  /**
2330   * See {@link AbstractFileSystem#supportsSymlinks()}
2331   */
2332  public boolean supportsSymlinks() {
2333    return false;
2334  }
2335
2336  /**
2337   * See {@link FileContext#getLinkTarget(Path)}
2338   */
2339  public Path getLinkTarget(Path f) throws IOException {
2340    // Supporting filesystems should override this method
2341    throw new UnsupportedOperationException(
2342        "Filesystem does not support symlinks!");
2343  }
2344
2345  /**
2346   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2347   */
2348  protected Path resolveLink(Path f) throws IOException {
2349    // Supporting filesystems should override this method
2350    throw new UnsupportedOperationException(
2351        "Filesystem does not support symlinks!");
2352  }
2353
2354  /**
2355   * Get the checksum of a file.
2356   *
2357   * @param f The file path
2358   * @return The file checksum.  The default return value is null,
2359   *  which indicates that no checksum algorithm is implemented
2360   *  in the corresponding FileSystem.
2361   */
2362  public FileChecksum getFileChecksum(Path f) throws IOException {
2363    return getFileChecksum(f, Long.MAX_VALUE);
2364  }
2365
2366  /**
2367   * Get the checksum of a file, from the beginning of the file till the
2368   * specific length.
2369   * @param f The file path
2370   * @param length The length of the file range for checksum calculation
2371   * @return The file checksum.
2372   */
2373  public FileChecksum getFileChecksum(Path f, final long length)
2374      throws IOException {
2375    return null;
2376  }
2377
2378  /**
2379   * Set the verify checksum flag. This is only applicable if the 
2380   * corresponding FileSystem supports checksum. By default doesn't do anything.
2381   * @param verifyChecksum
2382   */
2383  public void setVerifyChecksum(boolean verifyChecksum) {
2384    //doesn't do anything
2385  }
2386
2387  /**
2388   * Set the write checksum flag. This is only applicable if the 
2389   * corresponding FileSystem supports checksum. By default doesn't do anything.
2390   * @param writeChecksum
2391   */
2392  public void setWriteChecksum(boolean writeChecksum) {
2393    //doesn't do anything
2394  }
2395
2396  /**
2397   * Returns a status object describing the use and capacity of the
2398   * file system. If the file system has multiple partitions, the
2399   * use and capacity of the root partition is reflected.
2400   * 
2401   * @return a FsStatus object
2402   * @throws IOException
2403   *           see specific implementation
2404   */
2405  public FsStatus getStatus() throws IOException {
2406    return getStatus(null);
2407  }
2408
2409  /**
2410   * Returns a status object describing the use and capacity of the
2411   * file system. If the file system has multiple partitions, the
2412   * use and capacity of the partition pointed to by the specified
2413   * path is reflected.
2414   * @param p Path for which status should be obtained. null means
2415   * the default partition. 
2416   * @return a FsStatus object
2417   * @throws IOException
2418   *           see specific implementation
2419   */
2420  public FsStatus getStatus(Path p) throws IOException {
2421    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2422  }
2423
2424  /**
2425   * Set permission of a path.
2426   * @param p
2427   * @param permission
2428   */
2429  public void setPermission(Path p, FsPermission permission
2430      ) throws IOException {
2431  }
2432
2433  /**
2434   * Set owner of a path (i.e. a file or a directory).
2435   * The parameters username and groupname cannot both be null.
2436   * @param p The path
2437   * @param username If it is null, the original username remains unchanged.
2438   * @param groupname If it is null, the original groupname remains unchanged.
2439   */
2440  public void setOwner(Path p, String username, String groupname
2441      ) throws IOException {
2442  }
2443
2444  /**
2445   * Set access time of a file
2446   * @param p The path
2447   * @param mtime Set the modification time of this file.
2448   *              The number of milliseconds since Jan 1, 1970. 
2449   *              A value of -1 means that this call should not set modification time.
2450   * @param atime Set the access time of this file.
2451   *              The number of milliseconds since Jan 1, 1970. 
2452   *              A value of -1 means that this call should not set access time.
2453   */
2454  public void setTimes(Path p, long mtime, long atime
2455      ) throws IOException {
2456  }
2457
2458  /**
2459   * Create a snapshot with a default name.
2460   * @param path The directory where snapshots will be taken.
2461   * @return the snapshot path.
2462   */
2463  public final Path createSnapshot(Path path) throws IOException {
2464    return createSnapshot(path, null);
2465  }
2466
2467  /**
2468   * Create a snapshot
2469   * @param path The directory where snapshots will be taken.
2470   * @param snapshotName The name of the snapshot
2471   * @return the snapshot path.
2472   */
2473  public Path createSnapshot(Path path, String snapshotName)
2474      throws IOException {
2475    throw new UnsupportedOperationException(getClass().getSimpleName()
2476        + " doesn't support createSnapshot");
2477  }
2478  
2479  /**
2480   * Rename a snapshot
2481   * @param path The directory path where the snapshot was taken
2482   * @param snapshotOldName Old name of the snapshot
2483   * @param snapshotNewName New name of the snapshot
2484   * @throws IOException
2485   */
2486  public void renameSnapshot(Path path, String snapshotOldName,
2487      String snapshotNewName) throws IOException {
2488    throw new UnsupportedOperationException(getClass().getSimpleName()
2489        + " doesn't support renameSnapshot");
2490  }
2491  
2492  /**
2493   * Delete a snapshot of a directory
2494   * @param path  The directory that the to-be-deleted snapshot belongs to
2495   * @param snapshotName The name of the snapshot
2496   */
2497  public void deleteSnapshot(Path path, String snapshotName)
2498      throws IOException {
2499    throw new UnsupportedOperationException(getClass().getSimpleName()
2500        + " doesn't support deleteSnapshot");
2501  }
2502  
2503  /**
2504   * Modifies ACL entries of files and directories.  This method can add new ACL
2505   * entries or modify the permissions on existing ACL entries.  All existing
2506   * ACL entries that are not specified in this call are retained without
2507   * changes.  (Modifications are merged into the current ACL.)
2508   *
2509   * @param path Path to modify
2510   * @param aclSpec List<AclEntry> describing modifications
2511   * @throws IOException if an ACL could not be modified
2512   */
2513  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2514      throws IOException {
2515    throw new UnsupportedOperationException(getClass().getSimpleName()
2516        + " doesn't support modifyAclEntries");
2517  }
2518
2519  /**
2520   * Removes ACL entries from files and directories.  Other ACL entries are
2521   * retained.
2522   *
2523   * @param path Path to modify
2524   * @param aclSpec List<AclEntry> describing entries to remove
2525   * @throws IOException if an ACL could not be modified
2526   */
2527  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2528      throws IOException {
2529    throw new UnsupportedOperationException(getClass().getSimpleName()
2530        + " doesn't support removeAclEntries");
2531  }
2532
2533  /**
2534   * Removes all default ACL entries from files and directories.
2535   *
2536   * @param path Path to modify
2537   * @throws IOException if an ACL could not be modified
2538   */
2539  public void removeDefaultAcl(Path path)
2540      throws IOException {
2541    throw new UnsupportedOperationException(getClass().getSimpleName()
2542        + " doesn't support removeDefaultAcl");
2543  }
2544
2545  /**
2546   * Removes all but the base ACL entries of files and directories.  The entries
2547   * for user, group, and others are retained for compatibility with permission
2548   * bits.
2549   *
2550   * @param path Path to modify
2551   * @throws IOException if an ACL could not be removed
2552   */
2553  public void removeAcl(Path path)
2554      throws IOException {
2555    throw new UnsupportedOperationException(getClass().getSimpleName()
2556        + " doesn't support removeAcl");
2557  }
2558
2559  /**
2560   * Fully replaces ACL of files and directories, discarding all existing
2561   * entries.
2562   *
2563   * @param path Path to modify
2564   * @param aclSpec List<AclEntry> describing modifications, must include entries
2565   *   for user, group, and others for compatibility with permission bits.
2566   * @throws IOException if an ACL could not be modified
2567   */
2568  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2569    throw new UnsupportedOperationException(getClass().getSimpleName()
2570        + " doesn't support setAcl");
2571  }
2572
2573  /**
2574   * Gets the ACL of a file or directory.
2575   *
2576   * @param path Path to get
2577   * @return AclStatus describing the ACL of the file or directory
2578   * @throws IOException if an ACL could not be read
2579   */
2580  public AclStatus getAclStatus(Path path) throws IOException {
2581    throw new UnsupportedOperationException(getClass().getSimpleName()
2582        + " doesn't support getAclStatus");
2583  }
2584
2585  /**
2586   * Set an xattr of a file or directory.
2587   * The name must be prefixed with the namespace followed by ".". For example,
2588   * "user.attr".
2589   * <p/>
2590   * Refer to the HDFS extended attributes user documentation for details.
2591   *
2592   * @param path Path to modify
2593   * @param name xattr name.
2594   * @param value xattr value.
2595   * @throws IOException
2596   */
2597  public void setXAttr(Path path, String name, byte[] value)
2598      throws IOException {
2599    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2600        XAttrSetFlag.REPLACE));
2601  }
2602
2603  /**
2604   * Set an xattr of a file or directory.
2605   * The name must be prefixed with the namespace followed by ".". For example,
2606   * "user.attr".
2607   * <p/>
2608   * Refer to the HDFS extended attributes user documentation for details.
2609   *
2610   * @param path Path to modify
2611   * @param name xattr name.
2612   * @param value xattr value.
2613   * @param flag xattr set flag
2614   * @throws IOException
2615   */
2616  public void setXAttr(Path path, String name, byte[] value,
2617      EnumSet<XAttrSetFlag> flag) throws IOException {
2618    throw new UnsupportedOperationException(getClass().getSimpleName()
2619        + " doesn't support setXAttr");
2620  }
2621
2622  /**
2623   * Get an xattr name and value for a file or directory.
2624   * The name must be prefixed with the namespace followed by ".". For example,
2625   * "user.attr".
2626   * <p/>
2627   * Refer to the HDFS extended attributes user documentation for details.
2628   *
2629   * @param path Path to get extended attribute
2630   * @param name xattr name.
2631   * @return byte[] xattr value.
2632   * @throws IOException
2633   */
2634  public byte[] getXAttr(Path path, String name) throws IOException {
2635    throw new UnsupportedOperationException(getClass().getSimpleName()
2636        + " doesn't support getXAttr");
2637  }
2638
2639  /**
2640   * Get all of the xattr name/value pairs for a file or directory.
2641   * Only those xattrs which the logged-in user has permissions to view
2642   * are returned.
2643   * <p/>
2644   * Refer to the HDFS extended attributes user documentation for details.
2645   *
2646   * @param path Path to get extended attributes
2647   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2648   * @throws IOException
2649   */
2650  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2651    throw new UnsupportedOperationException(getClass().getSimpleName()
2652        + " doesn't support getXAttrs");
2653  }
2654
2655  /**
2656   * Get all of the xattrs name/value pairs for a file or directory.
2657   * Only those xattrs which the logged-in user has permissions to view
2658   * are returned.
2659   * <p/>
2660   * Refer to the HDFS extended attributes user documentation for details.
2661   *
2662   * @param path Path to get extended attributes
2663   * @param names XAttr names.
2664   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2665   * @throws IOException
2666   */
2667  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2668      throws IOException {
2669    throw new UnsupportedOperationException(getClass().getSimpleName()
2670        + " doesn't support getXAttrs");
2671  }
2672
2673  /**
2674   * Get all of the xattr names for a file or directory.
2675   * Only those xattr names which the logged-in user has permissions to view
2676   * are returned.
2677   * <p/>
2678   * Refer to the HDFS extended attributes user documentation for details.
2679   *
2680   * @param path Path to get extended attributes
2681   * @return List<String> of the XAttr names of the file or directory
2682   * @throws IOException
2683   */
2684  public List<String> listXAttrs(Path path) throws IOException {
2685    throw new UnsupportedOperationException(getClass().getSimpleName()
2686            + " doesn't support listXAttrs");
2687  }
2688
2689  /**
2690   * Remove an xattr of a file or directory.
2691   * The name must be prefixed with the namespace followed by ".". For example,
2692   * "user.attr".
2693   * <p/>
2694   * Refer to the HDFS extended attributes user documentation for details.
2695   *
2696   * @param path Path to remove extended attribute
2697   * @param name xattr name
2698   * @throws IOException
2699   */
2700  public void removeXAttr(Path path, String name) throws IOException {
2701    throw new UnsupportedOperationException(getClass().getSimpleName()
2702        + " doesn't support removeXAttr");
2703  }
2704
2705  /**
2706   * Get the root directory of Trash for current user when the path specified
2707   * is deleted.
2708   *
2709   * @param path the trash root of the path to be determined.
2710   * @return the default implementation returns "/user/$USER/.Trash".
2711   */
2712  public Path getTrashRoot(Path path) {
2713    return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
2714        TRASH_PREFIX));
2715  }
2716
2717  /**
2718   * Get all the trash roots for current user or all users.
2719   *
2720   * @param allUsers return trash roots for all users if true.
2721   * @return all the trash root directories.
2722   *         Default FileSystem returns .Trash under users' home directories if
2723   *         /user/$USER/.Trash exists.
2724   */
2725  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
2726    Path userHome = new Path(getHomeDirectory().toUri().getPath());
2727    List<FileStatus> ret = new ArrayList<>();
2728    try {
2729      if (!allUsers) {
2730        Path userTrash = new Path(userHome, TRASH_PREFIX);
2731        if (exists(userTrash)) {
2732          ret.add(getFileStatus(userTrash));
2733        }
2734      } else {
2735        Path homeParent = userHome.getParent();
2736        if (exists(homeParent)) {
2737          FileStatus[] candidates = listStatus(homeParent);
2738          for (FileStatus candidate : candidates) {
2739            Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
2740            if (exists(userTrash)) {
2741              candidate.setPath(userTrash);
2742              ret.add(candidate);
2743            }
2744          }
2745        }
2746      }
2747    } catch (IOException e) {
2748      LOG.warn("Cannot get all trash roots", e);
2749    }
2750    return ret;
2751  }
2752
2753  // making it volatile to be able to do a double checked locking
2754  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2755
2756  private static final Map<String, Class<? extends FileSystem>>
2757    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2758
2759  private static void loadFileSystems() {
2760    synchronized (FileSystem.class) {
2761      if (!FILE_SYSTEMS_LOADED) {
2762        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2763        Iterator<FileSystem> it = serviceLoader.iterator();
2764        while (it.hasNext()) {
2765          FileSystem fs = null;
2766          try {
2767            fs = it.next();
2768            try {
2769              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2770            } catch (Exception e) {
2771              LOG.warn("Cannot load: " + fs + " from " +
2772                  ClassUtil.findContainingJar(fs.getClass()), e);
2773            }
2774          } catch (ServiceConfigurationError ee) {
2775            LOG.warn("Cannot load filesystem", ee);
2776          }
2777        }
2778        FILE_SYSTEMS_LOADED = true;
2779      }
2780    }
2781  }
2782
2783  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2784      Configuration conf) throws IOException {
2785    if (!FILE_SYSTEMS_LOADED) {
2786      loadFileSystems();
2787    }
2788    Class<? extends FileSystem> clazz = null;
2789    if (conf != null) {
2790      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2791    }
2792    if (clazz == null) {
2793      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2794    }
2795    if (clazz == null) {
2796      throw new IOException("No FileSystem for scheme: " + scheme);
2797    }
2798    return clazz;
2799  }
2800
2801  private static FileSystem createFileSystem(URI uri, Configuration conf
2802      ) throws IOException {
2803    Tracer tracer = FsTracer.get(conf);
2804    TraceScope scope = null;
2805    if (tracer != null) {
2806      scope = tracer.newScope("FileSystem#createFileSystem");
2807      scope.addKVAnnotation("scheme", uri.getScheme());
2808    }
2809    try {
2810      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2811      if (clazz == null) {
2812        throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2813      }
2814      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2815      fs.tracer = tracer;
2816      fs.initialize(uri, conf);
2817      return fs;
2818    } finally {
2819      if (scope != null) scope.close();
2820    }
2821  }
2822
2823  /** Caching FileSystem objects */
2824  static class Cache {
2825    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2826
2827    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2828    private final Set<Key> toAutoClose = new HashSet<Key>();
2829
2830    /** A variable that makes all objects in the cache unique */
2831    private static AtomicLong unique = new AtomicLong(1);
2832
2833    FileSystem get(URI uri, Configuration conf) throws IOException{
2834      Key key = new Key(uri, conf);
2835      return getInternal(uri, conf, key);
2836    }
2837
2838    /** The objects inserted into the cache using this method are all unique */
2839    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2840      Key key = new Key(uri, conf, unique.getAndIncrement());
2841      return getInternal(uri, conf, key);
2842    }
2843
2844    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2845      FileSystem fs;
2846      synchronized (this) {
2847        fs = map.get(key);
2848      }
2849      if (fs != null) {
2850        return fs;
2851      }
2852
2853      fs = createFileSystem(uri, conf);
2854      synchronized (this) { // refetch the lock again
2855        FileSystem oldfs = map.get(key);
2856        if (oldfs != null) { // a file system is created while lock is releasing
2857          fs.close(); // close the new file system
2858          return oldfs;  // return the old file system
2859        }
2860        
2861        // now insert the new file system into the map
2862        if (map.isEmpty()
2863                && !ShutdownHookManager.get().isShutdownInProgress()) {
2864          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2865        }
2866        fs.key = key;
2867        map.put(key, fs);
2868        if (conf.getBoolean("fs.automatic.close", true)) {
2869          toAutoClose.add(key);
2870        }
2871        return fs;
2872      }
2873    }
2874
2875    synchronized void remove(Key key, FileSystem fs) {
2876      FileSystem cachedFs = map.remove(key);
2877      if (fs == cachedFs) {
2878        toAutoClose.remove(key);
2879      } else if (cachedFs != null) {
2880        map.put(key, cachedFs);
2881      }
2882    }
2883
2884    synchronized void closeAll() throws IOException {
2885      closeAll(false);
2886    }
2887
2888    /**
2889     * Close all FileSystem instances in the Cache.
2890     * @param onlyAutomatic only close those that are marked for automatic closing
2891     */
2892    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2893      List<IOException> exceptions = new ArrayList<IOException>();
2894
2895      // Make a copy of the keys in the map since we'll be modifying
2896      // the map while iterating over it, which isn't safe.
2897      List<Key> keys = new ArrayList<Key>();
2898      keys.addAll(map.keySet());
2899
2900      for (Key key : keys) {
2901        final FileSystem fs = map.get(key);
2902
2903        if (onlyAutomatic && !toAutoClose.contains(key)) {
2904          continue;
2905        }
2906
2907        //remove from cache
2908        map.remove(key);
2909        toAutoClose.remove(key);
2910
2911        if (fs != null) {
2912          try {
2913            fs.close();
2914          }
2915          catch(IOException ioe) {
2916            exceptions.add(ioe);
2917          }
2918        }
2919      }
2920
2921      if (!exceptions.isEmpty()) {
2922        throw MultipleIOException.createIOException(exceptions);
2923      }
2924    }
2925
2926    private class ClientFinalizer implements Runnable {
2927      @Override
2928      public synchronized void run() {
2929        try {
2930          closeAll(true);
2931        } catch (IOException e) {
2932          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2933        }
2934      }
2935    }
2936
2937    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2938      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2939      //Make a pass over the list and collect the filesystems to close
2940      //we cannot close inline since close() removes the entry from the Map
2941      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2942        final Key key = entry.getKey();
2943        final FileSystem fs = entry.getValue();
2944        if (ugi.equals(key.ugi) && fs != null) {
2945          targetFSList.add(fs);   
2946        }
2947      }
2948      List<IOException> exceptions = new ArrayList<IOException>();
2949      //now make a pass over the target list and close each
2950      for (FileSystem fs : targetFSList) {
2951        try {
2952          fs.close();
2953        }
2954        catch(IOException ioe) {
2955          exceptions.add(ioe);
2956        }
2957      }
2958      if (!exceptions.isEmpty()) {
2959        throw MultipleIOException.createIOException(exceptions);
2960      }
2961    }
2962
2963    /** FileSystem.Cache.Key */
2964    static class Key {
2965      final String scheme;
2966      final String authority;
2967      final UserGroupInformation ugi;
2968      final long unique;   // an artificial way to make a key unique
2969
2970      Key(URI uri, Configuration conf) throws IOException {
2971        this(uri, conf, 0);
2972      }
2973
2974      Key(URI uri, Configuration conf, long unique) throws IOException {
2975        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2976        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2977        this.unique = unique;
2978        
2979        this.ugi = UserGroupInformation.getCurrentUser();
2980      }
2981
2982      @Override
2983      public int hashCode() {
2984        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2985      }
2986
2987      static boolean isEqual(Object a, Object b) {
2988        return a == b || (a != null && a.equals(b));        
2989      }
2990
2991      @Override
2992      public boolean equals(Object obj) {
2993        if (obj == this) {
2994          return true;
2995        }
2996        if (obj != null && obj instanceof Key) {
2997          Key that = (Key)obj;
2998          return isEqual(this.scheme, that.scheme)
2999                 && isEqual(this.authority, that.authority)
3000                 && isEqual(this.ugi, that.ugi)
3001                 && (this.unique == that.unique);
3002        }
3003        return false;        
3004      }
3005
3006      @Override
3007      public String toString() {
3008        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
3009      }
3010    }
3011  }
3012  
3013  /**
3014   * Tracks statistics about how many reads, writes, and so forth have been
3015   * done in a FileSystem.
3016   * 
3017   * Since there is only one of these objects per FileSystem, there will 
3018   * typically be many threads writing to this object.  Almost every operation
3019   * on an open file will involve a write to this object.  In contrast, reading
3020   * statistics is done infrequently by most programs, and not at all by others.
3021   * Hence, this is optimized for writes.
3022   * 
3023   * Each thread writes to its own thread-local area of memory.  This removes 
3024   * contention and allows us to scale up to many, many threads.  To read
3025   * statistics, the reader thread totals up the contents of all of the 
3026   * thread-local data areas.
3027   */
3028  public static final class Statistics {
3029    /**
3030     * Statistics data.
3031     * 
3032     * There is only a single writer to thread-local StatisticsData objects.
3033     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
3034     * to prevent lost updates.
3035     * The Java specification guarantees that updates to volatile longs will
3036     * be perceived as atomic with respect to other threads, which is all we
3037     * need.
3038     */
3039    public static class StatisticsData {
3040      volatile long bytesRead;
3041      volatile long bytesWritten;
3042      volatile int readOps;
3043      volatile int largeReadOps;
3044      volatile int writeOps;
3045
3046      /**
3047       * Add another StatisticsData object to this one.
3048       */
3049      void add(StatisticsData other) {
3050        this.bytesRead += other.bytesRead;
3051        this.bytesWritten += other.bytesWritten;
3052        this.readOps += other.readOps;
3053        this.largeReadOps += other.largeReadOps;
3054        this.writeOps += other.writeOps;
3055      }
3056
3057      /**
3058       * Negate the values of all statistics.
3059       */
3060      void negate() {
3061        this.bytesRead = -this.bytesRead;
3062        this.bytesWritten = -this.bytesWritten;
3063        this.readOps = -this.readOps;
3064        this.largeReadOps = -this.largeReadOps;
3065        this.writeOps = -this.writeOps;
3066      }
3067
3068      @Override
3069      public String toString() {
3070        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
3071            + readOps + " read ops, " + largeReadOps + " large read ops, "
3072            + writeOps + " write ops";
3073      }
3074      
3075      public long getBytesRead() {
3076        return bytesRead;
3077      }
3078      
3079      public long getBytesWritten() {
3080        return bytesWritten;
3081      }
3082      
3083      public int getReadOps() {
3084        return readOps;
3085      }
3086      
3087      public int getLargeReadOps() {
3088        return largeReadOps;
3089      }
3090      
3091      public int getWriteOps() {
3092        return writeOps;
3093      }
3094    }
3095
3096    private interface StatisticsAggregator<T> {
3097      void accept(StatisticsData data);
3098      T aggregate();
3099    }
3100
3101    private final String scheme;
3102
3103    /**
3104     * rootData is data that doesn't belong to any thread, but will be added
3105     * to the totals.  This is useful for making copies of Statistics objects,
3106     * and for storing data that pertains to threads that have been garbage
3107     * collected.  Protected by the Statistics lock.
3108     */
3109    private final StatisticsData rootData;
3110
3111    /**
3112     * Thread-local data.
3113     */
3114    private final ThreadLocal<StatisticsData> threadData;
3115
3116    /**
3117     * Set of all thread-local data areas.  Protected by the Statistics lock.
3118     * The references to the statistics data are kept using weak references
3119     * to the associated threads. Proper clean-up is performed by the cleaner
3120     * thread when the threads are garbage collected.
3121     */
3122    private final Set<StatisticsDataReference> allData;
3123
3124    /**
3125     * Global reference queue and a cleaner thread that manage statistics data
3126     * references from all filesystem instances.
3127     */
3128    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
3129    private static final Thread STATS_DATA_CLEANER;
3130
3131    static {
3132      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3133      // start a single daemon cleaner thread
3134      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3135      STATS_DATA_CLEANER.
3136          setName(StatisticsDataReferenceCleaner.class.getName());
3137      STATS_DATA_CLEANER.setDaemon(true);
3138      STATS_DATA_CLEANER.start();
3139    }
3140
3141    public Statistics(String scheme) {
3142      this.scheme = scheme;
3143      this.rootData = new StatisticsData();
3144      this.threadData = new ThreadLocal<StatisticsData>();
3145      this.allData = new HashSet<StatisticsDataReference>();
3146    }
3147
3148    /**
3149     * Copy constructor.
3150     * 
3151     * @param other    The input Statistics object which is cloned.
3152     */
3153    public Statistics(Statistics other) {
3154      this.scheme = other.scheme;
3155      this.rootData = new StatisticsData();
3156      other.visitAll(new StatisticsAggregator<Void>() {
3157        @Override
3158        public void accept(StatisticsData data) {
3159          rootData.add(data);
3160        }
3161
3162        public Void aggregate() {
3163          return null;
3164        }
3165      });
3166      this.threadData = new ThreadLocal<StatisticsData>();
3167      this.allData = new HashSet<StatisticsDataReference>();
3168    }
3169
3170    /**
3171     * A weak reference to a thread that also includes the data associated
3172     * with that thread. On the thread being garbage collected, it is enqueued
3173     * to the reference queue for clean-up.
3174     */
3175    private class StatisticsDataReference extends WeakReference<Thread> {
3176      private final StatisticsData data;
3177
3178      public StatisticsDataReference(StatisticsData data, Thread thread) {
3179        super(thread, STATS_DATA_REF_QUEUE);
3180        this.data = data;
3181      }
3182
3183      public StatisticsData getData() {
3184        return data;
3185      }
3186
3187      /**
3188       * Performs clean-up action when the associated thread is garbage
3189       * collected.
3190       */
3191      public void cleanUp() {
3192        // use the statistics lock for safety
3193        synchronized (Statistics.this) {
3194          /*
3195           * If the thread that created this thread-local data no longer exists,
3196           * remove the StatisticsData from our list and fold the values into
3197           * rootData.
3198           */
3199          rootData.add(data);
3200          allData.remove(this);
3201        }
3202      }
3203    }
3204
    /**
     * Background action to act on references being removed.
     *
     * Body of the single daemon cleaner thread: it repeatedly takes a
     * StatisticsDataReference from STATS_DATA_REF_QUEUE and calls cleanUp()
     * on it.
     */
    private static class StatisticsDataReferenceCleaner implements Runnable {
      @Override
      public void run() {
        // Thread.interrupted() also clears the flag; the catch below re-sets
        // it, so an interrupt during remove() ends the loop on the next check.
        while (!Thread.interrupted()) {
          try {
            // remove() blocks until a reference is enqueued.
            StatisticsDataReference ref =
                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
            ref.cleanUp();
          } catch (InterruptedException ie) {
            LOG.warn("Cleaner thread interrupted, will stop", ie);
            Thread.currentThread().interrupt();
          } catch (Throwable th) {
            // Any other failure is logged and the thread keeps running.
            LOG.warn("Exception in the cleaner thread but it will continue to "
                + "run", th);
          }
        }
      }
    }
3226
3227    /**
3228     * Get or create the thread-local data associated with the current thread.
3229     */
3230    public StatisticsData getThreadStatistics() {
3231      StatisticsData data = threadData.get();
3232      if (data == null) {
3233        data = new StatisticsData();
3234        threadData.set(data);
3235        StatisticsDataReference ref =
3236            new StatisticsDataReference(data, Thread.currentThread());
3237        synchronized(this) {
3238          allData.add(ref);
3239        }
3240      }
3241      return data;
3242    }
3243
3244    /**
3245     * Increment the bytes read in the statistics
3246     * @param newBytes the additional bytes read
3247     */
3248    public void incrementBytesRead(long newBytes) {
3249      getThreadStatistics().bytesRead += newBytes;
3250    }
3251    
3252    /**
3253     * Increment the bytes written in the statistics
3254     * @param newBytes the additional bytes written
3255     */
3256    public void incrementBytesWritten(long newBytes) {
3257      getThreadStatistics().bytesWritten += newBytes;
3258    }
3259    
3260    /**
3261     * Increment the number of read operations
3262     * @param count number of read operations
3263     */
3264    public void incrementReadOps(int count) {
3265      getThreadStatistics().readOps += count;
3266    }
3267
3268    /**
3269     * Increment the number of large read operations
3270     * @param count number of large read operations
3271     */
3272    public void incrementLargeReadOps(int count) {
3273      getThreadStatistics().largeReadOps += count;
3274    }
3275
3276    /**
3277     * Increment the number of write operations
3278     * @param count number of write operations
3279     */
3280    public void incrementWriteOps(int count) {
3281      getThreadStatistics().writeOps += count;
3282    }
3283
3284    /**
3285     * Apply the given aggregator to all StatisticsData objects associated with
3286     * this Statistics object.
3287     *
3288     * For each StatisticsData object, we will call accept on the visitor.
3289     * Finally, at the end, we will call aggregate to get the final total. 
3290     *
3291     * @param         visitor to use.
3292     * @return        The total.
3293     */
3294    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3295      visitor.accept(rootData);
3296      for (StatisticsDataReference ref: allData) {
3297        StatisticsData data = ref.getData();
3298        visitor.accept(data);
3299      }
3300      return visitor.aggregate();
3301    }
3302
3303    /**
3304     * Get the total number of bytes read
3305     * @return the number of bytes
3306     */
3307    public long getBytesRead() {
3308      return visitAll(new StatisticsAggregator<Long>() {
3309        private long bytesRead = 0;
3310
3311        @Override
3312        public void accept(StatisticsData data) {
3313          bytesRead += data.bytesRead;
3314        }
3315
3316        public Long aggregate() {
3317          return bytesRead;
3318        }
3319      });
3320    }
3321    
3322    /**
3323     * Get the total number of bytes written
3324     * @return the number of bytes
3325     */
3326    public long getBytesWritten() {
3327      return visitAll(new StatisticsAggregator<Long>() {
3328        private long bytesWritten = 0;
3329
3330        @Override
3331        public void accept(StatisticsData data) {
3332          bytesWritten += data.bytesWritten;
3333        }
3334
3335        public Long aggregate() {
3336          return bytesWritten;
3337        }
3338      });
3339    }
3340    
3341    /**
3342     * Get the number of file system read operations such as list files
3343     * @return number of read operations
3344     */
3345    public int getReadOps() {
3346      return visitAll(new StatisticsAggregator<Integer>() {
3347        private int readOps = 0;
3348
3349        @Override
3350        public void accept(StatisticsData data) {
3351          readOps += data.readOps;
3352          readOps += data.largeReadOps;
3353        }
3354
3355        public Integer aggregate() {
3356          return readOps;
3357        }
3358      });
3359    }
3360
3361    /**
3362     * Get the number of large file system read operations such as list files
3363     * under a large directory
3364     * @return number of large read operations
3365     */
3366    public int getLargeReadOps() {
3367      return visitAll(new StatisticsAggregator<Integer>() {
3368        private int largeReadOps = 0;
3369
3370        @Override
3371        public void accept(StatisticsData data) {
3372          largeReadOps += data.largeReadOps;
3373        }
3374
3375        public Integer aggregate() {
3376          return largeReadOps;
3377        }
3378      });
3379    }
3380
3381    /**
3382     * Get the number of file system write operations such as create, append 
3383     * rename etc.
3384     * @return number of write operations
3385     */
3386    public int getWriteOps() {
3387      return visitAll(new StatisticsAggregator<Integer>() {
3388        private int writeOps = 0;
3389
3390        @Override
3391        public void accept(StatisticsData data) {
3392          writeOps += data.writeOps;
3393        }
3394
3395        public Integer aggregate() {
3396          return writeOps;
3397        }
3398      });
3399    }
3400
3401    /**
3402     * Get all statistics data
3403     * MR or other frameworks can use the method to get all statistics at once.
3404     * @return the StatisticsData
3405     */
3406    public StatisticsData getData() {
3407      return visitAll(new StatisticsAggregator<StatisticsData>() {
3408        private StatisticsData all = new StatisticsData();
3409
3410        @Override
3411        public void accept(StatisticsData data) {
3412          all.add(data);
3413        }
3414
3415        public StatisticsData aggregate() {
3416          return all;
3417        }
3418      });
3419    }
3420
3421    @Override
3422    public String toString() {
3423      return visitAll(new StatisticsAggregator<String>() {
3424        private StatisticsData total = new StatisticsData();
3425
3426        @Override
3427        public void accept(StatisticsData data) {
3428          total.add(data);
3429        }
3430
3431        public String aggregate() {
3432          return total.toString();
3433        }
3434      });
3435    }
3436
3437    /**
3438     * Resets all statistics to 0.
3439     *
3440     * In order to reset, we add up all the thread-local statistics data, and
3441     * set rootData to the negative of that.
3442     *
3443     * This may seem like a counterintuitive way to reset the statsitics.  Why
3444     * can't we just zero out all the thread-local data?  Well, thread-local
3445     * data can only be modified by the thread that owns it.  If we tried to
3446     * modify the thread-local data from this thread, our modification might get
3447     * interleaved with a read-modify-write operation done by the thread that
3448     * owns the data.  That would result in our update getting lost.
3449     *
3450     * The approach used here avoids this problem because it only ever reads
3451     * (not writes) the thread-local data.  Both reads and writes to rootData
3452     * are done under the lock, so we're free to modify rootData from any thread
3453     * that holds the lock.
3454     */
3455    public void reset() {
3456      visitAll(new StatisticsAggregator<Void>() {
3457        private StatisticsData total = new StatisticsData();
3458
3459        @Override
3460        public void accept(StatisticsData data) {
3461          total.add(data);
3462        }
3463
3464        public Void aggregate() {
3465          total.negate();
3466          rootData.add(total);
3467          return null;
3468        }
3469      });
3470    }
3471    
3472    /**
3473     * Get the uri scheme associated with this statistics object.
3474     * @return the schema associated with this set of statistics
3475     */
3476    public String getScheme() {
3477      return scheme;
3478    }
3479
3480    @VisibleForTesting
3481    synchronized int getAllThreadLocalDataSize() {
3482      return allData.size();
3483    }
3484  }
3485  
3486  /**
3487   * Get the Map of Statistics object indexed by URI Scheme.
3488   * @return a Map having a key as URI scheme and value as Statistics object
3489   * @deprecated use {@link #getGlobalStorageStatistics()}
3490   */
3491  @Deprecated
3492  public static synchronized Map<String, Statistics> getStatistics() {
3493    Map<String, Statistics> result = new HashMap<String, Statistics>();
3494    for(Statistics stat: statisticsTable.values()) {
3495      result.put(stat.getScheme(), stat);
3496    }
3497    return result;
3498  }
3499
3500  /**
3501   * Return the FileSystem classes that have Statistics.
3502   * @deprecated use {@link #getGlobalStorageStatistics()}
3503   */
3504  @Deprecated
3505  public static synchronized List<Statistics> getAllStatistics() {
3506    return new ArrayList<Statistics>(statisticsTable.values());
3507  }
3508  
3509  /**
3510   * Get the statistics for a particular file system
3511   * @param cls the class to lookup
3512   * @return a statistics object
3513   * @deprecated use {@link #getGlobalStorageStatistics()}
3514   */
3515  @Deprecated
3516  public static synchronized Statistics getStatistics(final String scheme,
3517      Class<? extends FileSystem> cls) {
3518    Statistics result = statisticsTable.get(cls);
3519    if (result == null) {
3520      final Statistics newStats = new Statistics(scheme);
3521      statisticsTable.put(cls, newStats);
3522      result = newStats;
3523      GlobalStorageStatistics.INSTANCE.put(scheme,
3524          new StorageStatisticsProvider() {
3525            @Override
3526            public StorageStatistics provide() {
3527              return new FileSystemStorageStatistics(scheme, newStats);
3528            }
3529          });
3530    }
3531    return result;
3532  }
3533  
3534  /**
3535   * Reset all statistics for all file systems
3536   */
3537  public static synchronized void clearStatistics() {
3538    final Iterator<StorageStatistics> iterator =
3539        GlobalStorageStatistics.INSTANCE.iterator();
3540    while (iterator.hasNext()) {
3541      final StorageStatistics statistics = iterator.next();
3542      statistics.reset();
3543    }
3544  }
3545
3546  /**
3547   * Print all statistics for all file systems
3548   */
3549  public static synchronized
3550  void printStatistics() throws IOException {
3551    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3552            statisticsTable.entrySet()) {
3553      System.out.println("  FileSystem " + pair.getKey().getName() + 
3554                         ": " + pair.getValue());
3555    }
3556  }
3557
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052.
  // Static flag: set to true by enableSymlinks(), read by areSymlinksEnabled().
  private static boolean symlinksEnabled = false;

  // NOTE(review): not referenced in this part of the file; presumably used
  // elsewhere in FileSystem - confirm before changing or removing.
  private static Configuration conf = null;
3562
3563  @VisibleForTesting
3564  public static boolean areSymlinksEnabled() {
3565    return symlinksEnabled;
3566  }
3567
3568  @VisibleForTesting
3569  public static void enableSymlinks() {
3570    symlinksEnabled = true;
3571  }
3572
3573  /**
3574   * Get the StorageStatistics for this FileSystem object.  These statistics are
3575   * per-instance.  They are not shared with any other FileSystem object.
3576   *
3577   * <p>This is a default method which is intended to be overridden by
3578   * subclasses. The default implementation returns an empty storage statistics
3579   * object.</p>
3580   *
3581   * @return    The StorageStatistics for this FileSystem instance.
3582   *            Will never be null.
3583   */
3584  public StorageStatistics getStorageStatistics() {
3585    return new EmptyStorageStatistics(getUri().toString());
3586  }
3587
3588  /**
3589   * Get the global storage statistics.
3590   */
3591  public static GlobalStorageStatistics getGlobalStorageStatistics() {
3592    return GlobalStorageStatistics.INSTANCE;
3593  }
3594}