001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 */
019
020package org.apache.hadoop.fs.adl;
021
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.EnumSet;
026import java.util.List;
027
028import com.google.common.annotations.VisibleForTesting;
029import com.microsoft.azure.datalake.store.ADLStoreClient;
030import com.microsoft.azure.datalake.store.ADLStoreOptions;
031import com.microsoft.azure.datalake.store.DirectoryEntry;
032import com.microsoft.azure.datalake.store.DirectoryEntryType;
033import com.microsoft.azure.datalake.store.IfExists;
034import com.microsoft.azure.datalake.store.LatencyTracker;
035import com.microsoft.azure.datalake.store.UserGroupRepresentation;
036import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
037import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
038import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
039
040import org.apache.commons.lang.StringUtils;
041import org.apache.hadoop.classification.InterfaceAudience;
042import org.apache.hadoop.classification.InterfaceStability;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.BlockLocation;
045import org.apache.hadoop.fs.ContentSummary;
046import org.apache.hadoop.fs.CreateFlag;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FSDataOutputStream;
049import org.apache.hadoop.fs.FileStatus;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.InvalidPathException;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Options.Rename;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
056import org.apache.hadoop.fs.permission.AclEntry;
057import org.apache.hadoop.fs.permission.AclStatus;
058import org.apache.hadoop.fs.permission.FsAction;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.security.AccessControlException;
061import org.apache.hadoop.security.ProviderUtils;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.util.Progressable;
064import org.apache.hadoop.util.ReflectionUtils;
065import org.apache.hadoop.util.VersionInfo;
066
067import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
068
069/**
070 * A FileSystem to access Azure Data Lake Store.
071 */
072@InterfaceAudience.Public
073@InterfaceStability.Evolving
074public class AdlFileSystem extends FileSystem {
  /** URI scheme served by this file system. */
  public static final String SCHEME = "adl";
  /** Port returned by getDefaultPort(). */
  static final int DEFAULT_PORT = 443;
  /** Store URI reduced to scheme and authority by initialize(). */
  private URI uri;
  /** Short name of the current user, or "hadoop" if it cannot be found. */
  private String userName;
  /** When true, toFileStatus reports userName/"hdfs" as owner/group. */
  private boolean overrideOwner;
  /** ADL SDK client that the file system operations delegate to. */
  private ADLStoreClient adlClient;
  /** Base for resolving relative paths; see setWorkingDirectory(Path). */
  private Path workingDirectory;
  /** Whether the backend ACL bit is propagated into returned permissions. */
  private boolean aclBitStatus;
  /** Owner/group representation (UPN or OID) requested from the backend. */
  private UserGroupRepresentation oidOrUpn;


  // retained for tests
  /** Provider handed to the ADL client; see getTokenProvider(). */
  private AccessTokenProvider tokenProvider;
  /** Custom provider, if one was loaded; see getAzureTokenProvider(). */
  private AzureADTokenProvider azureTokenProvider;
089
090  @Override
091  public String getScheme() {
092    return SCHEME;
093  }
094
095  public URI getUri() {
096    return uri;
097  }
098
099  @Override
100  public int getDefaultPort() {
101    return DEFAULT_PORT;
102  }
103
104  @Override
105  public boolean supportsSymlinks() {
106    return false;
107  }
108
109  /**
110   * Called after a new FileSystem instance is constructed.
111   *
112   * @param storeUri a uri whose authority section names the host, port, etc.
113   *                 for this FileSystem
114   * @param conf     the configuration
115   */
116  @Override
117  public void initialize(URI storeUri, Configuration conf) throws IOException {
118    super.initialize(storeUri, conf);
119    this.setConf(conf);
120    this.uri = URI
121        .create(storeUri.getScheme() + "://" + storeUri.getAuthority());
122
123    try {
124      userName = UserGroupInformation.getCurrentUser().getShortUserName();
125    } catch (IOException e) {
126      userName = "hadoop";
127    }
128
129    this.setWorkingDirectory(getHomeDirectory());
130
131    overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
132        ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
133
134    aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION,
135        ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT);
136
137    String accountFQDN = null;
138    String mountPoint = null;
139    String hostname = storeUri.getHost();
140    if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
141        "localhost")) {  // this is a symbolic name. Resolve it.
142      String hostNameProperty = "dfs.adls." + hostname + ".hostname";
143      String mountPointProperty = "dfs.adls." + hostname + ".mountpoint";
144      accountFQDN = getNonEmptyVal(conf, hostNameProperty);
145      mountPoint = getNonEmptyVal(conf, mountPointProperty);
146    } else {
147      accountFQDN = hostname;
148    }
149
150    if (storeUri.getPort() > 0) {
151      accountFQDN = accountFQDN + ":" + storeUri.getPort();
152    }
153
154    adlClient = ADLStoreClient
155        .createClient(accountFQDN, getAccessTokenProvider(conf));
156
157    ADLStoreOptions options = new ADLStoreOptions();
158    options.enableThrowingRemoteExceptions();
159
160    if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) {
161      options.setInsecureTransport();
162    }
163
164    if (mountPoint != null) {
165      options.setFilePathPrefix(mountPoint);
166    }
167
168    String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN");
169    String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN");
170
171    String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils
172        .isEmpty(VersionInfo.getVersion().trim()) ?
173        ADL_HADOOP_CLIENT_VERSION.trim() :
174        VersionInfo.getVersion().trim());
175    options.setUserAgentSuffix(clientVersion + "/" +
176        VersionInfo.getVersion().trim() + "/" + clusterName + "/"
177        + clusterType);
178
179    adlClient.setOptions(options);
180
181    boolean trackLatency = conf
182        .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT);
183    if (!trackLatency) {
184      LatencyTracker.disable();
185    }
186
187    boolean enableUPN = conf.getBoolean(ADL_ENABLEUPN_FOR_OWNERGROUP_KEY,
188        ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
189    oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
190        UserGroupRepresentation.OID;
191  }
192
193  /**
194   * This method is provided for convenience for derived classes to define
195   * custom {@link AzureADTokenProvider} instance.
196   *
197   * In order to ensure secure hadoop infrastructure and user context for which
198   * respective {@link AdlFileSystem} instance is initialized,
199   * Loading {@link AzureADTokenProvider} is not sufficient.
200   *
201   * The order of loading {@link AzureADTokenProvider} is to first invoke
202   * {@link #getCustomAccessTokenProvider(Configuration)}, If method return null
203   * which means no implementation provided by derived classes, then
204   * configuration object is loaded to retrieve token configuration as specified
205   * is documentation.
206   *
207   * Custom token management takes the higher precedence during initialization.
208   *
209   * @param conf Configuration object
210   * @return null if the no custom {@link AzureADTokenProvider} token management
211   * is specified.
212   * @throws IOException if failed to initialize token provider.
213   */
214  protected synchronized AzureADTokenProvider getCustomAccessTokenProvider(
215      Configuration conf) throws IOException {
216    String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
217
218    Class<? extends AzureADTokenProvider> azureADTokenProviderClass =
219        conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null,
220            AzureADTokenProvider.class);
221    if (azureADTokenProviderClass == null) {
222      throw new IllegalArgumentException(
223          "Configuration  " + className + " " + "not defined/accessible.");
224    }
225
226    azureTokenProvider = ReflectionUtils
227        .newInstance(azureADTokenProviderClass, conf);
228    if (azureTokenProvider == null) {
229      throw new IllegalArgumentException("Failed to initialize " + className);
230    }
231
232    azureTokenProvider.initialize(conf);
233    return azureTokenProvider;
234  }
235
236  private AccessTokenProvider getAccessTokenProvider(Configuration config)
237      throws IOException {
238    Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders(
239        config, AdlFileSystem.class);
240    TokenProviderType type = conf.getEnum(
241        AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom);
242
243    switch (type) {
244    case RefreshToken:
245      tokenProvider = getConfRefreshTokenBasedTokenProvider(conf);
246      break;
247    case ClientCredential:
248      tokenProvider = getConfCredentialBasedTokenProvider(conf);
249      break;
250    case Custom:
251    default:
252      AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider(
253          conf);
254      tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider);
255      break;
256    }
257
258    return tokenProvider;
259  }
260
261  private AccessTokenProvider getConfCredentialBasedTokenProvider(
262      Configuration conf) throws IOException {
263    String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
264    String refreshUrl = getPasswordString(conf, AZURE_AD_REFRESH_URL_KEY);
265    String clientSecret = getPasswordString(conf, AZURE_AD_CLIENT_SECRET_KEY);
266    return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret);
267  }
268
269  private AccessTokenProvider getConfRefreshTokenBasedTokenProvider(
270      Configuration conf) throws IOException {
271    String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
272    String refreshToken = getPasswordString(conf, AZURE_AD_REFRESH_TOKEN_KEY);
273    return new RefreshTokenBasedTokenProvider(clientId, refreshToken);
274  }
275
276  @VisibleForTesting
277  AccessTokenProvider getTokenProvider() {
278    return tokenProvider;
279  }
280
281  @VisibleForTesting
282  AzureADTokenProvider getAzureTokenProvider() {
283    return azureTokenProvider;
284  }
285
286  /**
287   * Constructing home directory locally is fine as long as Hadoop
288   * local user name and ADL user name relationship story is not fully baked
289   * yet.
290   *
291   * @return Hadoop local user home directory.
292   */
293  @Override
294  public Path getHomeDirectory() {
295    return makeQualified(new Path("/user/" + userName));
296  }
297
298  /**
299   * Create call semantic is handled differently in case of ADL. Create
300   * semantics is translated to Create/Append
301   * semantics.
302   * 1. No dedicated connection to server.
303   * 2. Buffering is locally done, Once buffer is full or flush is invoked on
304   * the by the caller. All the pending
305   * data is pushed to ADL as APPEND operation code.
306   * 3. On close - Additional call is send to server to close the stream, and
307   * release lock from the stream.
308   *
309   * Necessity of Create/Append semantics is
310   * 1. ADL backend server does not allow idle connection for longer duration
311   * . In case of slow writer scenario,
312   * observed connection timeout/Connection reset causing occasional job
313   * failures.
314   * 2. Performance boost to jobs which are slow writer, avoided network latency
315   * 3. ADL equally better performing with multiple of 4MB chunk as append
316   * calls.
317   *
318   * @param f           File path
319   * @param permission  Access permission for the newly created file
320   * @param overwrite   Remove existing file and recreate new one if true
321   *                    otherwise throw error if file exist
322   * @param bufferSize  Buffer size, ADL backend does not honour
323   * @param replication Replication count, ADL backend does not honour
324   * @param blockSize   Block size, ADL backend does not honour
325   * @param progress    Progress indicator
326   * @return FSDataOutputStream OutputStream on which application can push
327   * stream of bytes
328   * @throws IOException when system error, internal server error or user error
329   */
330  @Override
331  public FSDataOutputStream create(Path f, FsPermission permission,
332      boolean overwrite, int bufferSize, short replication, long blockSize,
333      Progressable progress) throws IOException {
334    statistics.incrementWriteOps(1);
335    IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL;
336    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
337        .createFile(toRelativeFilePath(f), overwriteRule,
338            Integer.toOctalString(applyUMask(permission).toShort()), true),
339        getConf()), this.statistics);
340  }
341
342  /**
343   * Opens an FSDataOutputStream at the indicated Path with write-progress
344   * reporting. Same as create(), except fails if parent directory doesn't
345   * already exist.
346   *
347   * @param f           the file name to open
348   * @param permission  Access permission for the newly created file
349   * @param flags       {@link CreateFlag}s to use for this stream.
350   * @param bufferSize  the size of the buffer to be used. ADL backend does
351   *                    not honour
352   * @param replication required block replication for the file. ADL backend
353   *                    does not honour
354   * @param blockSize   Block size, ADL backend does not honour
355   * @param progress    Progress indicator
356   * @throws IOException when system error, internal server error or user error
357   * @see #setPermission(Path, FsPermission)
358   * @deprecated API only for 0.20-append
359   */
360  @Override
361  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
362      EnumSet<CreateFlag> flags, int bufferSize, short replication,
363      long blockSize, Progressable progress) throws IOException {
364    statistics.incrementWriteOps(1);
365    IfExists overwriteRule = IfExists.FAIL;
366    for (CreateFlag flag : flags) {
367      if (flag == CreateFlag.OVERWRITE) {
368        overwriteRule = IfExists.OVERWRITE;
369        break;
370      }
371    }
372
373    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
374        .createFile(toRelativeFilePath(f), overwriteRule,
375            Integer.toOctalString(applyUMask(permission).toShort()), false),
376        getConf()), this.statistics);
377  }
378
379  /**
380   * Append to an existing file (optional operation).
381   *
382   * @param f          the existing file to be appended.
383   * @param bufferSize the size of the buffer to be used. ADL backend does
384   *                   not honour
385   * @param progress   Progress indicator
386   * @throws IOException when system error, internal server error or user error
387   */
388  @Override
389  public FSDataOutputStream append(Path f, int bufferSize,
390      Progressable progress) throws IOException {
391    statistics.incrementWriteOps(1);
392    return new FSDataOutputStream(
393        new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)),
394            getConf()), this.statistics);
395  }
396
397  /**
398   * Azure data lake does not support user configuration for data replication
399   * hence not leaving system to query on
400   * azure data lake.
401   *
402   * Stub implementation
403   *
404   * @param p           Not honoured
405   * @param replication Not honoured
406   * @return True hard coded since ADL file system does not support
407   * replication configuration
408   * @throws IOException No exception would not thrown in this case however
409   *                     aligning with parent api definition.
410   */
411  @Override
412  public boolean setReplication(final Path p, final short replication)
413      throws IOException {
414    statistics.incrementWriteOps(1);
415    return true;
416  }
417
418  /**
419   * Open call semantic is handled differently in case of ADL. Instead of
420   * network stream is returned to the user,
421   * Overridden FsInputStream is returned.
422   *
423   * @param f          File path
424   * @param buffersize Buffer size, Not honoured
425   * @return FSDataInputStream InputStream on which application can read
426   * stream of bytes
427   * @throws IOException when system error, internal server error or user error
428   */
429  @Override
430  public FSDataInputStream open(final Path f, final int buffersize)
431      throws IOException {
432    statistics.incrementReadOps(1);
433    return new FSDataInputStream(
434        new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)),
435            statistics, getConf()));
436  }
437
438  /**
439   * Return a file status object that represents the path.
440   *
441   * @param f The path we want information from
442   * @return a FileStatus object
443   * @throws IOException when the path does not exist or any other error;
444   *                     IOException see specific implementation
445   */
446  @Override
447  public FileStatus getFileStatus(final Path f) throws IOException {
448    statistics.incrementReadOps(1);
449    DirectoryEntry entry =
450        adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn);
451    return toFileStatus(entry, f);
452  }
453
454  /**
455   * List the statuses of the files/directories in the given path if the path is
456   * a directory.
457   *
458   * @param f given path
459   * @return the statuses of the files/directories in the given patch
460   * @throws IOException when the path does not exist or any other error;
461   *                     IOException see specific implementation
462   */
463  @Override
464  public FileStatus[] listStatus(final Path f) throws IOException {
465    statistics.incrementReadOps(1);
466    List<DirectoryEntry> entries =
467        adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn);
468    return toFileStatuses(entries, f);
469  }
470
471  /**
472   * Renames Path src to Path dst.  Can take place on local fs
473   * or remote DFS.
474   *
475   * ADLS support POSIX standard for rename operation.
476   *
477   * @param src path to be renamed
478   * @param dst new path after rename
479   * @return true if rename is successful
480   * @throws IOException on failure
481   */
482  @Override
483  public boolean rename(final Path src, final Path dst) throws IOException {
484    statistics.incrementWriteOps(1);
485    if (toRelativeFilePath(src).equals("/")) {
486      return false;
487    }
488
489    return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst));
490  }
491
492  @Override
493  @Deprecated
494  public void rename(final Path src, final Path dst,
495      final Options.Rename... options) throws IOException {
496    statistics.incrementWriteOps(1);
497    boolean overwrite = false;
498    for (Rename renameOption : options) {
499      if (renameOption == Rename.OVERWRITE) {
500        overwrite = true;
501        break;
502      }
503    }
504    adlClient
505        .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite);
506  }
507
508  /**
509   * Concat existing files together.
510   *
511   * @param trg  the path to the target destination.
512   * @param srcs the paths to the sources to use for the concatenation.
513   * @throws IOException when system error, internal server error or user error
514   */
515  @Override
516  public void concat(final Path trg, final Path[] srcs) throws IOException {
517    statistics.incrementWriteOps(1);
518    List<String> sourcesList = new ArrayList<String>();
519    for (Path entry : srcs) {
520      sourcesList.add(toRelativeFilePath(entry));
521    }
522    adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList);
523  }
524
525  /**
526   * Delete a file.
527   *
528   * @param path      the path to delete.
529   * @param recursive if path is a directory and set to
530   *                  true, the directory is deleted else throws an exception.
531   *                  In case of a file the recursive can be set to either
532   *                  true or false.
533   * @return true if delete is successful else false.
534   * @throws IOException when system error, internal server error or user error
535   */
536  @Override
537  public boolean delete(final Path path, final boolean recursive)
538      throws IOException {
539    statistics.incrementWriteOps(1);
540    String relativePath = toRelativeFilePath(path);
541    // Delete on root directory not supported.
542    if (relativePath.equals("/")) {
543      // This is important check after recent commit
544      // HADOOP-12977 and HADOOP-13716 validates on root for
545      // 1. if root is empty and non recursive delete then return false.
546      // 2. if root is non empty and non recursive delete then throw exception.
547      if (!recursive
548          && adlClient.enumerateDirectory(toRelativeFilePath(path), 1).size()
549          > 0) {
550        throw new IOException("Delete on root is not supported.");
551      }
552      return false;
553    }
554
555    return recursive ?
556        adlClient.deleteRecursive(relativePath) :
557        adlClient.delete(relativePath);
558  }
559
560  /**
561   * Make the given file and all non-existent parents into
562   * directories. Has the semantics of Unix 'mkdir -p'.
563   * Existence of the directory hierarchy is not an error.
564   *
565   * @param path       path to create
566   * @param permission to apply to path
567   */
568  @Override
569  public boolean mkdirs(final Path path, final FsPermission permission)
570      throws IOException {
571    statistics.incrementWriteOps(1);
572    return adlClient.createDirectory(toRelativeFilePath(path),
573        Integer.toOctalString(applyUMask(permission).toShort()));
574  }
575
576  private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries,
577      final Path parent) {
578    FileStatus[] fileStatuses = new FileStatus[entries.size()];
579    int index = 0;
580    for (DirectoryEntry entry : entries) {
581      FileStatus status = toFileStatus(entry, parent);
582      if (!(entry.name == null || entry.name == "")) {
583        status.setPath(
584            new Path(parent.makeQualified(uri, workingDirectory), entry.name));
585      }
586
587      fileStatuses[index++] = status;
588    }
589
590    return fileStatuses;
591  }
592
593  private FsPermission applyUMask(FsPermission permission) {
594    if (permission == null) {
595      permission = FsPermission.getDefault();
596    }
597    return permission.applyUMask(FsPermission.getUMask(getConf()));
598  }
599
600  private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) {
601    boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY;
602    long lastModificationData = entry.lastModifiedTime.getTime();
603    long lastAccessTime = entry.lastAccessTime.getTime();
604    // set aclBit from ADLS backend response if
605    // ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION is true.
606    final boolean aclBit = aclBitStatus ? entry.aclBit : false;
607
608    FsPermission permission = new AdlPermission(aclBit,
609        Short.valueOf(entry.permission, 8));
610    String user = entry.user;
611    String group = entry.group;
612
613    FileStatus status;
614    if (overrideOwner) {
615      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
616          ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission,
617          userName, "hdfs", this.makeQualified(f));
618    } else {
619      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
620          ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission,
621          user, group, this.makeQualified(f));
622    }
623
624    return status;
625  }
626
627  /**
628   * Set owner of a path (i.e. a file or a directory).
629   * The parameters owner and group cannot both be null.
630   *
631   * @param path  The path
632   * @param owner If it is null, the original username remains unchanged.
633   * @param group If it is null, the original groupname remains unchanged.
634   */
635  @Override
636  public void setOwner(final Path path, final String owner, final String group)
637      throws IOException {
638    statistics.incrementWriteOps(1);
639    adlClient.setOwner(toRelativeFilePath(path), owner, group);
640  }
641
642  /**
643   * Set permission of a path.
644   *
645   * @param path       The path
646   * @param permission Access permission
647   */
648  @Override
649  public void setPermission(final Path path, final FsPermission permission)
650      throws IOException {
651    statistics.incrementWriteOps(1);
652    adlClient.setPermission(toRelativeFilePath(path),
653        Integer.toOctalString(permission.toShort()));
654  }
655
656  /**
657   * Modifies ACL entries of files and directories.  This method can add new ACL
658   * entries or modify the permissions on existing ACL entries.  All existing
659   * ACL entries that are not specified in this call are retained without
660   * changes.  (Modifications are merged into the current ACL.)
661   *
662   * @param path    Path to modify
663   * @param aclSpec List of AclEntry describing modifications
664   * @throws IOException if an ACL could not be modified
665   */
666  @Override
667  public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
668      throws IOException {
669    statistics.incrementWriteOps(1);
670    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
671        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
672    for (AclEntry aclEntry : aclSpec) {
673      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
674          .parseAclEntry(aclEntry.toString()));
675    }
676    adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries);
677  }
678
679  /**
680   * Removes ACL entries from files and directories.  Other ACL entries are
681   * retained.
682   *
683   * @param path    Path to modify
684   * @param aclSpec List of AclEntry describing entries to remove
685   * @throws IOException if an ACL could not be modified
686   */
687  @Override
688  public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
689      throws IOException {
690    statistics.incrementWriteOps(1);
691    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
692        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
693    for (AclEntry aclEntry : aclSpec) {
694      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
695          .parseAclEntry(aclEntry.toString(), true));
696    }
697    adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries);
698  }
699
700  /**
701   * Removes all default ACL entries from files and directories.
702   *
703   * @param path Path to modify
704   * @throws IOException if an ACL could not be modified
705   */
706  @Override
707  public void removeDefaultAcl(final Path path) throws IOException {
708    statistics.incrementWriteOps(1);
709    adlClient.removeDefaultAcls(toRelativeFilePath(path));
710  }
711
712  /**
713   * Removes all but the base ACL entries of files and directories.  The entries
714   * for user, group, and others are retained for compatibility with permission
715   * bits.
716   *
717   * @param path Path to modify
718   * @throws IOException if an ACL could not be removed
719   */
720  @Override
721  public void removeAcl(final Path path) throws IOException {
722    statistics.incrementWriteOps(1);
723    adlClient.removeAllAcls(toRelativeFilePath(path));
724  }
725
726  /**
727   * Fully replaces ACL of files and directories, discarding all existing
728   * entries.
729   *
730   * @param path    Path to modify
731   * @param aclSpec List of AclEntry describing modifications, must include
732   *                entries for user, group, and others for compatibility with
733   *                permission bits.
734   * @throws IOException if an ACL could not be modified
735   */
736  @Override
737  public void setAcl(final Path path, final List<AclEntry> aclSpec)
738      throws IOException {
739    statistics.incrementWriteOps(1);
740    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
741        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
742    for (AclEntry aclEntry : aclSpec) {
743      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
744          .parseAclEntry(aclEntry.toString()));
745    }
746
747    adlClient.setAcl(toRelativeFilePath(path), msAclEntries);
748  }
749
750  /**
751   * Gets the ACL of a file or directory.
752   *
753   * @param path Path to get
754   * @return AclStatus describing the ACL of the file or directory
755   * @throws IOException if an ACL could not be read
756   */
757  @Override
758  public AclStatus getAclStatus(final Path path) throws IOException {
759    statistics.incrementReadOps(1);
760    com.microsoft.azure.datalake.store.acl.AclStatus adlStatus =
761        adlClient.getAclStatus(toRelativeFilePath(path), oidOrUpn);
762    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
763    aclStatusBuilder.owner(adlStatus.owner);
764    aclStatusBuilder.group(adlStatus.group);
765    aclStatusBuilder.setPermission(
766        new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8)));
767    aclStatusBuilder.stickyBit(adlStatus.stickyBit);
768    String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry
769        .aclListToString(adlStatus.aclSpec);
770    List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true);
771    aclStatusBuilder.addEntries(aclEntries);
772    return aclStatusBuilder.build();
773  }
774
775  /**
776   * Checks if the user can access a path.  The mode specifies which access
777   * checks to perform.  If the requested permissions are granted, then the
778   * method returns normally.  If access is denied, then the method throws an
779   * {@link AccessControlException}.
780   *
781   * @param path Path to check
782   * @param mode type of access to check
783   * @throws AccessControlException        if access is denied
784   * @throws java.io.FileNotFoundException if the path does not exist
785   * @throws IOException                   see specific implementation
786   */
787  @Override
788  public void access(final Path path, FsAction mode) throws IOException {
789    statistics.incrementReadOps(1);
790    if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) {
791      throw new AccessControlException("Access Denied : " + path.toString());
792    }
793  }
794
795  /**
796   * Return the {@link ContentSummary} of a given {@link Path}.
797   *
798   * @param f path to use
799   */
800  @Override
801  public ContentSummary getContentSummary(Path f) throws IOException {
802    statistics.incrementReadOps(1);
803    com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient
804        .getContentSummary(toRelativeFilePath(f));
805    return new ContentSummary(msSummary.length, msSummary.fileCount, msSummary.directoryCount, -1L,
806        msSummary.spaceConsumed, -1L);
807  }
808
809  @VisibleForTesting
810  protected String getTransportScheme() {
811    return SECURE_TRANSPORT_SCHEME;
812  }
813
814  @VisibleForTesting
815  String toRelativeFilePath(Path path) {
816    return path.makeQualified(uri, workingDirectory).toUri().getPath();
817  }
818
819  /**
820   * Get the current working directory for the given file system.
821   *
822   * @return the directory pathname
823   */
824  @Override
825  public Path getWorkingDirectory() {
826    return workingDirectory;
827  }
828
829  /**
830   * Set the current working directory for the given file system. All relative
831   * paths will be resolved relative to it.
832   *
833   * @param dir Working directory path.
834   */
835  @Override
836  public void setWorkingDirectory(final Path dir) {
837    if (dir == null) {
838      throw new InvalidPathException("Working directory cannot be set to NULL");
839    }
840
841    /**
842     * Do not validate the scheme and URI of the passsed parameter. When Adls
843     * runs as additional file system, working directory set has the default
844     * file system scheme and uri.
845     *
846     * Found a problem during PIG execution in
847     * https://github.com/apache/pig/blob/branch-0
848     * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer
849     * /PigInputFormat.java#L235
850     * However similar problem would be present in other application so
851     * defaulting to build working directory using relative path only.
852     */
853    this.workingDirectory = this.makeAbsolute(dir);
854  }
855
856  /**
857   * Return the number of bytes that large input files should be optimally
858   * be split into to minimize i/o time.
859   *
860   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
861   */
862  @Deprecated
863  public long getDefaultBlockSize() {
864    return ADL_BLOCK_SIZE;
865  }
866
867  /**
868   * Return the number of bytes that large input files should be optimally
869   * be split into to minimize i/o time.  The given path will be used to
870   * locate the actual filesystem.  The full path does not have to exist.
871   *
872   * @param f path of file
873   * @return the default block size for the path's filesystem
874   */
875  public long getDefaultBlockSize(Path f) {
876    return getDefaultBlockSize();
877  }
878
879  /**
880   * Get the block size.
881   * @param f the filename
882   * @return the number of bytes in a block
883   */
884  /**
885   * @deprecated Use getFileStatus() instead
886   */
887  @Deprecated
888  public long getBlockSize(Path f) throws IOException {
889    return ADL_BLOCK_SIZE;
890  }
891
892  @Override
893  public BlockLocation[] getFileBlockLocations(final FileStatus status,
894      final long offset, final long length) throws IOException {
895    if (status == null) {
896      return null;
897    }
898
899    if ((offset < 0) || (length < 0)) {
900      throw new IllegalArgumentException("Invalid start or len parameter");
901    }
902
903    if (status.getLen() < offset) {
904      return new BlockLocation[0];
905    }
906
907    final String[] name = {"localhost"};
908    final String[] host = {"localhost"};
909    long blockSize = ADL_BLOCK_SIZE;
910    int numberOfLocations =
911        (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
912    BlockLocation[] locations = new BlockLocation[numberOfLocations];
913    for (int i = 0; i < locations.length; i++) {
914      long currentOffset = offset + (i * blockSize);
915      long currentLength = Math.min(blockSize, offset + length - currentOffset);
916      locations[i] = new BlockLocation(name, host, currentOffset,
917          currentLength);
918    }
919
920    return locations;
921  }
922
923  @Override
924  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
925      final long length) throws IOException {
926    // read ops incremented in getFileStatus
927    FileStatus fileStatus = getFileStatus(p);
928    return getFileBlockLocations(fileStatus, offset, length);
929  }
930
931  /**
932   * Get replication.
933   *
934   * @param src file name
935   * @return file replication
936   * @deprecated Use getFileStatus() instead
937   */
938  @Deprecated
939  public short getReplication(Path src) {
940    return ADL_REPLICATION_FACTOR;
941  }
942
943  private Path makeAbsolute(Path path) {
944    return path.isAbsolute() ? path : new Path(this.workingDirectory, path);
945  }
946
947  private static String getNonEmptyVal(Configuration conf, String key) {
948    String value = conf.get(key);
949    if (StringUtils.isEmpty(value)) {
950      throw new IllegalArgumentException(
951          "No value for " + key + " found in conf file.");
952    }
953    return value;
954  }
955
956  /**
957   * A wrapper of {@link Configuration#getPassword(String)}. It returns
958   * <code>String</code> instead of <code>char[]</code>.
959   *
960   * @param conf the configuration
961   * @param key the property key
962   * @return the password string
963   * @throws IOException if the password was not found
964   */
965  private static String getPasswordString(Configuration conf, String key)
966      throws IOException {
967    char[] passchars = conf.getPassword(key);
968    if (passchars == null) {
969      throw new IOException("Password " + key + " not found");
970    }
971    return new String(passchars);
972  }
973
974  @VisibleForTesting
975  public void setUserGroupRepresentationAsUPN(boolean enableUPN) {
976    oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
977        UserGroupRepresentation.OID;
978  }
979}