001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 */ 019 020package org.apache.hadoop.fs.adl; 021 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.EnumSet; 026import java.util.List; 027 028import com.google.common.annotations.VisibleForTesting; 029import com.microsoft.azure.datalake.store.ADLStoreClient; 030import com.microsoft.azure.datalake.store.ADLStoreOptions; 031import com.microsoft.azure.datalake.store.DirectoryEntry; 032import com.microsoft.azure.datalake.store.DirectoryEntryType; 033import com.microsoft.azure.datalake.store.IfExists; 034import com.microsoft.azure.datalake.store.LatencyTracker; 035import com.microsoft.azure.datalake.store.UserGroupRepresentation; 036import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; 037import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider; 038import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider; 039 040import org.apache.commons.lang.StringUtils; 041import org.apache.hadoop.classification.InterfaceAudience; 042import org.apache.hadoop.classification.InterfaceStability; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.BlockLocation; 045import org.apache.hadoop.fs.ContentSummary; 046import org.apache.hadoop.fs.CreateFlag; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FSDataOutputStream; 049import org.apache.hadoop.fs.FileStatus; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.InvalidPathException; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Options.Rename; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider; 056import org.apache.hadoop.fs.permission.AclEntry; 057import org.apache.hadoop.fs.permission.AclStatus; 058import org.apache.hadoop.fs.permission.FsAction; 059import org.apache.hadoop.fs.permission.FsPermission; 060import org.apache.hadoop.security.AccessControlException; 061import org.apache.hadoop.security.ProviderUtils; 062import org.apache.hadoop.security.UserGroupInformation; 063import org.apache.hadoop.util.Progressable; 064import org.apache.hadoop.util.ReflectionUtils; 065import org.apache.hadoop.util.VersionInfo; 066 067import static org.apache.hadoop.fs.adl.AdlConfKeys.*; 068 069/** 070 * A FileSystem to access Azure Data Lake Store. 071 */ 072@InterfaceAudience.Public 073@InterfaceStability.Evolving 074public class AdlFileSystem extends FileSystem { 075 public static final String SCHEME = "adl"; 076 static final int DEFAULT_PORT = 443; 077 private URI uri; 078 private String userName; 079 private boolean overrideOwner; 080 private ADLStoreClient adlClient; 081 private Path workingDirectory; 082 private boolean aclBitStatus; 083 private UserGroupRepresentation oidOrUpn; 084 085 086 // retained for tests 087 private AccessTokenProvider tokenProvider; 088 private AzureADTokenProvider azureTokenProvider; 089 090 @Override 091 public String getScheme() { 092 return SCHEME; 093 } 094 095 public URI getUri() { 096 return uri; 097 } 098 099 @Override 100 public int getDefaultPort() { 101 return DEFAULT_PORT; 102 } 103 104 @Override 105 public boolean supportsSymlinks() { 106 return false; 107 } 108 109 /** 110 * Called after a new FileSystem instance is constructed. 111 * 112 * @param storeUri a uri whose authority section names the host, port, etc. 113 * for this FileSystem 114 * @param conf the configuration 115 */ 116 @Override 117 public void initialize(URI storeUri, Configuration conf) throws IOException { 118 super.initialize(storeUri, conf); 119 this.setConf(conf); 120 this.uri = URI 121 .create(storeUri.getScheme() + "://" + storeUri.getAuthority()); 122 123 try { 124 userName = UserGroupInformation.getCurrentUser().getShortUserName(); 125 } catch (IOException e) { 126 userName = "hadoop"; 127 } 128 129 this.setWorkingDirectory(getHomeDirectory()); 130 131 overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER, 132 ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT); 133 134 aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION, 135 ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT); 136 137 String accountFQDN = null; 138 String mountPoint = null; 139 String hostname = storeUri.getHost(); 140 if (!hostname.contains(".") && !hostname.equalsIgnoreCase( 141 "localhost")) { // this is a symbolic name. Resolve it. 142 String hostNameProperty = "dfs.adls." + hostname + ".hostname"; 143 String mountPointProperty = "dfs.adls." + hostname + ".mountpoint"; 144 accountFQDN = getNonEmptyVal(conf, hostNameProperty); 145 mountPoint = getNonEmptyVal(conf, mountPointProperty); 146 } else { 147 accountFQDN = hostname; 148 } 149 150 if (storeUri.getPort() > 0) { 151 accountFQDN = accountFQDN + ":" + storeUri.getPort(); 152 } 153 154 adlClient = ADLStoreClient 155 .createClient(accountFQDN, getAccessTokenProvider(conf)); 156 157 ADLStoreOptions options = new ADLStoreOptions(); 158 options.enableThrowingRemoteExceptions(); 159 160 if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) { 161 options.setInsecureTransport(); 162 } 163 164 if (mountPoint != null) { 165 options.setFilePathPrefix(mountPoint); 166 } 167 168 String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN"); 169 String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN"); 170 171 String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils 172 .isEmpty(VersionInfo.getVersion().trim()) ? 173 ADL_HADOOP_CLIENT_VERSION.trim() : 174 VersionInfo.getVersion().trim()); 175 options.setUserAgentSuffix(clientVersion + "/" + 176 VersionInfo.getVersion().trim() + "/" + clusterName + "/" 177 + clusterType); 178 179 adlClient.setOptions(options); 180 181 boolean trackLatency = conf 182 .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT); 183 if (!trackLatency) { 184 LatencyTracker.disable(); 185 } 186 187 boolean enableUPN = conf.getBoolean(ADL_ENABLEUPN_FOR_OWNERGROUP_KEY, 188 ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT); 189 oidOrUpn = enableUPN ? UserGroupRepresentation.UPN : 190 UserGroupRepresentation.OID; 191 } 192 193 /** 194 * This method is provided for convenience for derived classes to define 195 * custom {@link AzureADTokenProvider} instance. 196 * 197 * In order to ensure secure hadoop infrastructure and user context for which 198 * respective {@link AdlFileSystem} instance is initialized, 199 * Loading {@link AzureADTokenProvider} is not sufficient. 200 * 201 * The order of loading {@link AzureADTokenProvider} is to first invoke 202 * {@link #getCustomAccessTokenProvider(Configuration)}, If method return null 203 * which means no implementation provided by derived classes, then 204 * configuration object is loaded to retrieve token configuration as specified 205 * is documentation. 206 * 207 * Custom token management takes the higher precedence during initialization. 208 * 209 * @param conf Configuration object 210 * @return null if the no custom {@link AzureADTokenProvider} token management 211 * is specified. 212 * @throws IOException if failed to initialize token provider. 213 */ 214 protected synchronized AzureADTokenProvider getCustomAccessTokenProvider( 215 Configuration conf) throws IOException { 216 String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY); 217 218 Class<? extends AzureADTokenProvider> azureADTokenProviderClass = 219 conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null, 220 AzureADTokenProvider.class); 221 if (azureADTokenProviderClass == null) { 222 throw new IllegalArgumentException( 223 "Configuration " + className + " " + "not defined/accessible."); 224 } 225 226 azureTokenProvider = ReflectionUtils 227 .newInstance(azureADTokenProviderClass, conf); 228 if (azureTokenProvider == null) { 229 throw new IllegalArgumentException("Failed to initialize " + className); 230 } 231 232 azureTokenProvider.initialize(conf); 233 return azureTokenProvider; 234 } 235 236 private AccessTokenProvider getAccessTokenProvider(Configuration config) 237 throws IOException { 238 Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders( 239 config, AdlFileSystem.class); 240 TokenProviderType type = conf.getEnum( 241 AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom); 242 243 switch (type) { 244 case RefreshToken: 245 tokenProvider = getConfRefreshTokenBasedTokenProvider(conf); 246 break; 247 case ClientCredential: 248 tokenProvider = getConfCredentialBasedTokenProvider(conf); 249 break; 250 case Custom: 251 default: 252 AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider( 253 conf); 254 tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider); 255 break; 256 } 257 258 return tokenProvider; 259 } 260 261 private AccessTokenProvider getConfCredentialBasedTokenProvider( 262 Configuration conf) throws IOException { 263 String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY); 264 String refreshUrl = getPasswordString(conf, AZURE_AD_REFRESH_URL_KEY); 265 String clientSecret = getPasswordString(conf, AZURE_AD_CLIENT_SECRET_KEY); 266 return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret); 267 } 268 269 private AccessTokenProvider getConfRefreshTokenBasedTokenProvider( 270 Configuration conf) throws IOException { 271 String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY); 272 String refreshToken = getPasswordString(conf, AZURE_AD_REFRESH_TOKEN_KEY); 273 return new RefreshTokenBasedTokenProvider(clientId, refreshToken); 274 } 275 276 @VisibleForTesting 277 AccessTokenProvider getTokenProvider() { 278 return tokenProvider; 279 } 280 281 @VisibleForTesting 282 AzureADTokenProvider getAzureTokenProvider() { 283 return azureTokenProvider; 284 } 285 286 /** 287 * Constructing home directory locally is fine as long as Hadoop 288 * local user name and ADL user name relationship story is not fully baked 289 * yet. 290 * 291 * @return Hadoop local user home directory. 292 */ 293 @Override 294 public Path getHomeDirectory() { 295 return makeQualified(new Path("/user/" + userName)); 296 } 297 298 /** 299 * Create call semantic is handled differently in case of ADL. Create 300 * semantics is translated to Create/Append 301 * semantics. 302 * 1. No dedicated connection to server. 303 * 2. Buffering is locally done, Once buffer is full or flush is invoked on 304 * the by the caller. All the pending 305 * data is pushed to ADL as APPEND operation code. 306 * 3. On close - Additional call is send to server to close the stream, and 307 * release lock from the stream. 308 * 309 * Necessity of Create/Append semantics is 310 * 1. ADL backend server does not allow idle connection for longer duration 311 * . In case of slow writer scenario, 312 * observed connection timeout/Connection reset causing occasional job 313 * failures. 314 * 2. Performance boost to jobs which are slow writer, avoided network latency 315 * 3. ADL equally better performing with multiple of 4MB chunk as append 316 * calls. 317 * 318 * @param f File path 319 * @param permission Access permission for the newly created file 320 * @param overwrite Remove existing file and recreate new one if true 321 * otherwise throw error if file exist 322 * @param bufferSize Buffer size, ADL backend does not honour 323 * @param replication Replication count, ADL backend does not honour 324 * @param blockSize Block size, ADL backend does not honour 325 * @param progress Progress indicator 326 * @return FSDataOutputStream OutputStream on which application can push 327 * stream of bytes 328 * @throws IOException when system error, internal server error or user error 329 */ 330 @Override 331 public FSDataOutputStream create(Path f, FsPermission permission, 332 boolean overwrite, int bufferSize, short replication, long blockSize, 333 Progressable progress) throws IOException { 334 statistics.incrementWriteOps(1); 335 IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL; 336 return new FSDataOutputStream(new AdlFsOutputStream(adlClient 337 .createFile(toRelativeFilePath(f), overwriteRule, 338 Integer.toOctalString(applyUMask(permission).toShort()), true), 339 getConf()), this.statistics); 340 } 341 342 /** 343 * Opens an FSDataOutputStream at the indicated Path with write-progress 344 * reporting. Same as create(), except fails if parent directory doesn't 345 * already exist. 346 * 347 * @param f the file name to open 348 * @param permission Access permission for the newly created file 349 * @param flags {@link CreateFlag}s to use for this stream. 350 * @param bufferSize the size of the buffer to be used. ADL backend does 351 * not honour 352 * @param replication required block replication for the file. ADL backend 353 * does not honour 354 * @param blockSize Block size, ADL backend does not honour 355 * @param progress Progress indicator 356 * @throws IOException when system error, internal server error or user error 357 * @see #setPermission(Path, FsPermission) 358 * @deprecated API only for 0.20-append 359 */ 360 @Override 361 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 362 EnumSet<CreateFlag> flags, int bufferSize, short replication, 363 long blockSize, Progressable progress) throws IOException { 364 statistics.incrementWriteOps(1); 365 IfExists overwriteRule = IfExists.FAIL; 366 for (CreateFlag flag : flags) { 367 if (flag == CreateFlag.OVERWRITE) { 368 overwriteRule = IfExists.OVERWRITE; 369 break; 370 } 371 } 372 373 return new FSDataOutputStream(new AdlFsOutputStream(adlClient 374 .createFile(toRelativeFilePath(f), overwriteRule, 375 Integer.toOctalString(applyUMask(permission).toShort()), false), 376 getConf()), this.statistics); 377 } 378 379 /** 380 * Append to an existing file (optional operation). 381 * 382 * @param f the existing file to be appended. 383 * @param bufferSize the size of the buffer to be used. ADL backend does 384 * not honour 385 * @param progress Progress indicator 386 * @throws IOException when system error, internal server error or user error 387 */ 388 @Override 389 public FSDataOutputStream append(Path f, int bufferSize, 390 Progressable progress) throws IOException { 391 statistics.incrementWriteOps(1); 392 return new FSDataOutputStream( 393 new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)), 394 getConf()), this.statistics); 395 } 396 397 /** 398 * Azure data lake does not support user configuration for data replication 399 * hence not leaving system to query on 400 * azure data lake. 401 * 402 * Stub implementation 403 * 404 * @param p Not honoured 405 * @param replication Not honoured 406 * @return True hard coded since ADL file system does not support 407 * replication configuration 408 * @throws IOException No exception would not thrown in this case however 409 * aligning with parent api definition. 410 */ 411 @Override 412 public boolean setReplication(final Path p, final short replication) 413 throws IOException { 414 statistics.incrementWriteOps(1); 415 return true; 416 } 417 418 /** 419 * Open call semantic is handled differently in case of ADL. Instead of 420 * network stream is returned to the user, 421 * Overridden FsInputStream is returned. 422 * 423 * @param f File path 424 * @param buffersize Buffer size, Not honoured 425 * @return FSDataInputStream InputStream on which application can read 426 * stream of bytes 427 * @throws IOException when system error, internal server error or user error 428 */ 429 @Override 430 public FSDataInputStream open(final Path f, final int buffersize) 431 throws IOException { 432 statistics.incrementReadOps(1); 433 return new FSDataInputStream( 434 new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)), 435 statistics, getConf())); 436 } 437 438 /** 439 * Return a file status object that represents the path. 440 * 441 * @param f The path we want information from 442 * @return a FileStatus object 443 * @throws IOException when the path does not exist or any other error; 444 * IOException see specific implementation 445 */ 446 @Override 447 public FileStatus getFileStatus(final Path f) throws IOException { 448 statistics.incrementReadOps(1); 449 DirectoryEntry entry = 450 adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn); 451 return toFileStatus(entry, f); 452 } 453 454 /** 455 * List the statuses of the files/directories in the given path if the path is 456 * a directory. 457 * 458 * @param f given path 459 * @return the statuses of the files/directories in the given patch 460 * @throws IOException when the path does not exist or any other error; 461 * IOException see specific implementation 462 */ 463 @Override 464 public FileStatus[] listStatus(final Path f) throws IOException { 465 statistics.incrementReadOps(1); 466 List<DirectoryEntry> entries = 467 adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn); 468 return toFileStatuses(entries, f); 469 } 470 471 /** 472 * Renames Path src to Path dst. Can take place on local fs 473 * or remote DFS. 474 * 475 * ADLS support POSIX standard for rename operation. 476 * 477 * @param src path to be renamed 478 * @param dst new path after rename 479 * @return true if rename is successful 480 * @throws IOException on failure 481 */ 482 @Override 483 public boolean rename(final Path src, final Path dst) throws IOException { 484 statistics.incrementWriteOps(1); 485 if (toRelativeFilePath(src).equals("/")) { 486 return false; 487 } 488 489 return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst)); 490 } 491 492 @Override 493 @Deprecated 494 public void rename(final Path src, final Path dst, 495 final Options.Rename... options) throws IOException { 496 statistics.incrementWriteOps(1); 497 boolean overwrite = false; 498 for (Rename renameOption : options) { 499 if (renameOption == Rename.OVERWRITE) { 500 overwrite = true; 501 break; 502 } 503 } 504 adlClient 505 .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite); 506 } 507 508 /** 509 * Concat existing files together. 510 * 511 * @param trg the path to the target destination. 512 * @param srcs the paths to the sources to use for the concatenation. 513 * @throws IOException when system error, internal server error or user error 514 */ 515 @Override 516 public void concat(final Path trg, final Path[] srcs) throws IOException { 517 statistics.incrementWriteOps(1); 518 List<String> sourcesList = new ArrayList<String>(); 519 for (Path entry : srcs) { 520 sourcesList.add(toRelativeFilePath(entry)); 521 } 522 adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList); 523 } 524 525 /** 526 * Delete a file. 527 * 528 * @param path the path to delete. 529 * @param recursive if path is a directory and set to 530 * true, the directory is deleted else throws an exception. 531 * In case of a file the recursive can be set to either 532 * true or false. 533 * @return true if delete is successful else false. 534 * @throws IOException when system error, internal server error or user error 535 */ 536 @Override 537 public boolean delete(final Path path, final boolean recursive) 538 throws IOException { 539 statistics.incrementWriteOps(1); 540 String relativePath = toRelativeFilePath(path); 541 // Delete on root directory not supported. 542 if (relativePath.equals("/")) { 543 // This is important check after recent commit 544 // HADOOP-12977 and HADOOP-13716 validates on root for 545 // 1. if root is empty and non recursive delete then return false. 546 // 2. if root is non empty and non recursive delete then throw exception. 547 if (!recursive 548 && adlClient.enumerateDirectory(toRelativeFilePath(path), 1).size() 549 > 0) { 550 throw new IOException("Delete on root is not supported."); 551 } 552 return false; 553 } 554 555 return recursive ? 556 adlClient.deleteRecursive(relativePath) : 557 adlClient.delete(relativePath); 558 } 559 560 /** 561 * Make the given file and all non-existent parents into 562 * directories. Has the semantics of Unix 'mkdir -p'. 563 * Existence of the directory hierarchy is not an error. 564 * 565 * @param path path to create 566 * @param permission to apply to path 567 */ 568 @Override 569 public boolean mkdirs(final Path path, final FsPermission permission) 570 throws IOException { 571 statistics.incrementWriteOps(1); 572 return adlClient.createDirectory(toRelativeFilePath(path), 573 Integer.toOctalString(applyUMask(permission).toShort())); 574 } 575 576 private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries, 577 final Path parent) { 578 FileStatus[] fileStatuses = new FileStatus[entries.size()]; 579 int index = 0; 580 for (DirectoryEntry entry : entries) { 581 FileStatus status = toFileStatus(entry, parent); 582 if (!(entry.name == null || entry.name == "")) { 583 status.setPath( 584 new Path(parent.makeQualified(uri, workingDirectory), entry.name)); 585 } 586 587 fileStatuses[index++] = status; 588 } 589 590 return fileStatuses; 591 } 592 593 private FsPermission applyUMask(FsPermission permission) { 594 if (permission == null) { 595 permission = FsPermission.getDefault(); 596 } 597 return permission.applyUMask(FsPermission.getUMask(getConf())); 598 } 599 600 private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) { 601 boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY; 602 long lastModificationData = entry.lastModifiedTime.getTime(); 603 long lastAccessTime = entry.lastAccessTime.getTime(); 604 // set aclBit from ADLS backend response if 605 // ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION is true. 606 final boolean aclBit = aclBitStatus ? entry.aclBit : false; 607 608 FsPermission permission = new AdlPermission(aclBit, 609 Short.valueOf(entry.permission, 8)); 610 String user = entry.user; 611 String group = entry.group; 612 613 FileStatus status; 614 if (overrideOwner) { 615 status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, 616 ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, 617 userName, "hdfs", this.makeQualified(f)); 618 } else { 619 status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, 620 ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, 621 user, group, this.makeQualified(f)); 622 } 623 624 return status; 625 } 626 627 /** 628 * Set owner of a path (i.e. a file or a directory). 629 * The parameters owner and group cannot both be null. 630 * 631 * @param path The path 632 * @param owner If it is null, the original username remains unchanged. 633 * @param group If it is null, the original groupname remains unchanged. 634 */ 635 @Override 636 public void setOwner(final Path path, final String owner, final String group) 637 throws IOException { 638 statistics.incrementWriteOps(1); 639 adlClient.setOwner(toRelativeFilePath(path), owner, group); 640 } 641 642 /** 643 * Set permission of a path. 644 * 645 * @param path The path 646 * @param permission Access permission 647 */ 648 @Override 649 public void setPermission(final Path path, final FsPermission permission) 650 throws IOException { 651 statistics.incrementWriteOps(1); 652 adlClient.setPermission(toRelativeFilePath(path), 653 Integer.toOctalString(permission.toShort())); 654 } 655 656 /** 657 * Modifies ACL entries of files and directories. This method can add new ACL 658 * entries or modify the permissions on existing ACL entries. All existing 659 * ACL entries that are not specified in this call are retained without 660 * changes. (Modifications are merged into the current ACL.) 661 * 662 * @param path Path to modify 663 * @param aclSpec List of AclEntry describing modifications 664 * @throws IOException if an ACL could not be modified 665 */ 666 @Override 667 public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) 668 throws IOException { 669 statistics.incrementWriteOps(1); 670 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 671 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 672 for (AclEntry aclEntry : aclSpec) { 673 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 674 .parseAclEntry(aclEntry.toString())); 675 } 676 adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries); 677 } 678 679 /** 680 * Removes ACL entries from files and directories. Other ACL entries are 681 * retained. 682 * 683 * @param path Path to modify 684 * @param aclSpec List of AclEntry describing entries to remove 685 * @throws IOException if an ACL could not be modified 686 */ 687 @Override 688 public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) 689 throws IOException { 690 statistics.incrementWriteOps(1); 691 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 692 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 693 for (AclEntry aclEntry : aclSpec) { 694 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 695 .parseAclEntry(aclEntry.toString(), true)); 696 } 697 adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries); 698 } 699 700 /** 701 * Removes all default ACL entries from files and directories. 702 * 703 * @param path Path to modify 704 * @throws IOException if an ACL could not be modified 705 */ 706 @Override 707 public void removeDefaultAcl(final Path path) throws IOException { 708 statistics.incrementWriteOps(1); 709 adlClient.removeDefaultAcls(toRelativeFilePath(path)); 710 } 711 712 /** 713 * Removes all but the base ACL entries of files and directories. The entries 714 * for user, group, and others are retained for compatibility with permission 715 * bits. 716 * 717 * @param path Path to modify 718 * @throws IOException if an ACL could not be removed 719 */ 720 @Override 721 public void removeAcl(final Path path) throws IOException { 722 statistics.incrementWriteOps(1); 723 adlClient.removeAllAcls(toRelativeFilePath(path)); 724 } 725 726 /** 727 * Fully replaces ACL of files and directories, discarding all existing 728 * entries. 729 * 730 * @param path Path to modify 731 * @param aclSpec List of AclEntry describing modifications, must include 732 * entries for user, group, and others for compatibility with 733 * permission bits. 734 * @throws IOException if an ACL could not be modified 735 */ 736 @Override 737 public void setAcl(final Path path, final List<AclEntry> aclSpec) 738 throws IOException { 739 statistics.incrementWriteOps(1); 740 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 741 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 742 for (AclEntry aclEntry : aclSpec) { 743 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 744 .parseAclEntry(aclEntry.toString())); 745 } 746 747 adlClient.setAcl(toRelativeFilePath(path), msAclEntries); 748 } 749 750 /** 751 * Gets the ACL of a file or directory. 752 * 753 * @param path Path to get 754 * @return AclStatus describing the ACL of the file or directory 755 * @throws IOException if an ACL could not be read 756 */ 757 @Override 758 public AclStatus getAclStatus(final Path path) throws IOException { 759 statistics.incrementReadOps(1); 760 com.microsoft.azure.datalake.store.acl.AclStatus adlStatus = 761 adlClient.getAclStatus(toRelativeFilePath(path), oidOrUpn); 762 AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); 763 aclStatusBuilder.owner(adlStatus.owner); 764 aclStatusBuilder.group(adlStatus.group); 765 aclStatusBuilder.setPermission( 766 new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8))); 767 aclStatusBuilder.stickyBit(adlStatus.stickyBit); 768 String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry 769 .aclListToString(adlStatus.aclSpec); 770 List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true); 771 aclStatusBuilder.addEntries(aclEntries); 772 return aclStatusBuilder.build(); 773 } 774 775 /** 776 * Checks if the user can access a path. The mode specifies which access 777 * checks to perform. If the requested permissions are granted, then the 778 * method returns normally. If access is denied, then the method throws an 779 * {@link AccessControlException}. 780 * 781 * @param path Path to check 782 * @param mode type of access to check 783 * @throws AccessControlException if access is denied 784 * @throws java.io.FileNotFoundException if the path does not exist 785 * @throws IOException see specific implementation 786 */ 787 @Override 788 public void access(final Path path, FsAction mode) throws IOException { 789 statistics.incrementReadOps(1); 790 if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) { 791 throw new AccessControlException("Access Denied : " + path.toString()); 792 } 793 } 794 795 /** 796 * Return the {@link ContentSummary} of a given {@link Path}. 797 * 798 * @param f path to use 799 */ 800 @Override 801 public ContentSummary getContentSummary(Path f) throws IOException { 802 statistics.incrementReadOps(1); 803 com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient 804 .getContentSummary(toRelativeFilePath(f)); 805 return new ContentSummary(msSummary.length, msSummary.fileCount, msSummary.directoryCount, -1L, 806 msSummary.spaceConsumed, -1L); 807 } 808 809 @VisibleForTesting 810 protected String getTransportScheme() { 811 return SECURE_TRANSPORT_SCHEME; 812 } 813 814 @VisibleForTesting 815 String toRelativeFilePath(Path path) { 816 return path.makeQualified(uri, workingDirectory).toUri().getPath(); 817 } 818 819 /** 820 * Get the current working directory for the given file system. 821 * 822 * @return the directory pathname 823 */ 824 @Override 825 public Path getWorkingDirectory() { 826 return workingDirectory; 827 } 828 829 /** 830 * Set the current working directory for the given file system. All relative 831 * paths will be resolved relative to it. 832 * 833 * @param dir Working directory path. 834 */ 835 @Override 836 public void setWorkingDirectory(final Path dir) { 837 if (dir == null) { 838 throw new InvalidPathException("Working directory cannot be set to NULL"); 839 } 840 841 /** 842 * Do not validate the scheme and URI of the passsed parameter. When Adls 843 * runs as additional file system, working directory set has the default 844 * file system scheme and uri. 845 * 846 * Found a problem during PIG execution in 847 * https://github.com/apache/pig/blob/branch-0 848 * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer 849 * /PigInputFormat.java#L235 850 * However similar problem would be present in other application so 851 * defaulting to build working directory using relative path only. 852 */ 853 this.workingDirectory = this.makeAbsolute(dir); 854 } 855 856 /** 857 * Return the number of bytes that large input files should be optimally 858 * be split into to minimize i/o time. 859 * 860 * @deprecated use {@link #getDefaultBlockSize(Path)} instead 861 */ 862 @Deprecated 863 public long getDefaultBlockSize() { 864 return ADL_BLOCK_SIZE; 865 } 866 867 /** 868 * Return the number of bytes that large input files should be optimally 869 * be split into to minimize i/o time. The given path will be used to 870 * locate the actual filesystem. The full path does not have to exist. 871 * 872 * @param f path of file 873 * @return the default block size for the path's filesystem 874 */ 875 public long getDefaultBlockSize(Path f) { 876 return getDefaultBlockSize(); 877 } 878 879 /** 880 * Get the block size. 881 * @param f the filename 882 * @return the number of bytes in a block 883 */ 884 /** 885 * @deprecated Use getFileStatus() instead 886 */ 887 @Deprecated 888 public long getBlockSize(Path f) throws IOException { 889 return ADL_BLOCK_SIZE; 890 } 891 892 @Override 893 public BlockLocation[] getFileBlockLocations(final FileStatus status, 894 final long offset, final long length) throws IOException { 895 if (status == null) { 896 return null; 897 } 898 899 if ((offset < 0) || (length < 0)) { 900 throw new IllegalArgumentException("Invalid start or len parameter"); 901 } 902 903 if (status.getLen() < offset) { 904 return new BlockLocation[0]; 905 } 906 907 final String[] name = {"localhost"}; 908 final String[] host = {"localhost"}; 909 long blockSize = ADL_BLOCK_SIZE; 910 int numberOfLocations = 911 (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1); 912 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 913 for (int i = 0; i < locations.length; i++) { 914 long currentOffset = offset + (i * blockSize); 915 long currentLength = Math.min(blockSize, offset + length - currentOffset); 916 locations[i] = new BlockLocation(name, host, currentOffset, 917 currentLength); 918 } 919 920 return locations; 921 } 922 923 @Override 924 public BlockLocation[] getFileBlockLocations(final Path p, final long offset, 925 final long length) throws IOException { 926 // read ops incremented in getFileStatus 927 FileStatus fileStatus = getFileStatus(p); 928 return getFileBlockLocations(fileStatus, offset, length); 929 } 930 931 /** 932 * Get replication. 933 * 934 * @param src file name 935 * @return file replication 936 * @deprecated Use getFileStatus() instead 937 */ 938 @Deprecated 939 public short getReplication(Path src) { 940 return ADL_REPLICATION_FACTOR; 941 } 942 943 private Path makeAbsolute(Path path) { 944 return path.isAbsolute() ? path : new Path(this.workingDirectory, path); 945 } 946 947 private static String getNonEmptyVal(Configuration conf, String key) { 948 String value = conf.get(key); 949 if (StringUtils.isEmpty(value)) { 950 throw new IllegalArgumentException( 951 "No value for " + key + " found in conf file."); 952 } 953 return value; 954 } 955 956 /** 957 * A wrapper of {@link Configuration#getPassword(String)}. It returns 958 * <code>String</code> instead of <code>char[]</code>. 959 * 960 * @param conf the configuration 961 * @param key the property key 962 * @return the password string 963 * @throws IOException if the password was not found 964 */ 965 private static String getPasswordString(Configuration conf, String key) 966 throws IOException { 967 char[] passchars = conf.getPassword(key); 968 if (passchars == null) { 969 throw new IOException("Password " + key + " not found"); 970 } 971 return new String(passchars); 972 } 973 974 @VisibleForTesting 975 public void setUserGroupRepresentationAsUPN(boolean enableUPN) { 976 oidOrUpn = enableUPN ? UserGroupRepresentation.UPN : 977 UserGroupRepresentation.OID; 978 } 979}