001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.azure; 020 021import java.io.DataInputStream; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.text.SimpleDateFormat; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.EnumSet; 032import java.util.Iterator; 033import java.util.Set; 034import java.util.TimeZone; 035import java.util.TreeSet; 036import java.util.UUID; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.regex.Matcher; 039import java.util.regex.Pattern; 040 041import org.apache.commons.lang.StringUtils; 042import org.apache.commons.lang.exception.ExceptionUtils; 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.BlockLocation; 049import org.apache.hadoop.fs.BufferedFSInputStream; 050import org.apache.hadoop.fs.CreateFlag; 051import org.apache.hadoop.fs.FSDataInputStream; 052import org.apache.hadoop.fs.FSDataOutputStream; 053import org.apache.hadoop.fs.FSInputStream; 054import org.apache.hadoop.fs.FileStatus; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; 058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; 059import org.apache.hadoop.fs.permission.FsPermission; 060import org.apache.hadoop.fs.permission.PermissionStatus; 061import org.apache.hadoop.fs.azure.AzureException; 062import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; 063import org.apache.hadoop.io.IOUtils; 064import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 065import org.apache.hadoop.security.UserGroupInformation; 066import org.apache.hadoop.util.Progressable; 067 068 069import org.codehaus.jackson.JsonNode; 070import org.codehaus.jackson.JsonParseException; 071import org.codehaus.jackson.JsonParser; 072import org.codehaus.jackson.map.JsonMappingException; 073import org.codehaus.jackson.map.ObjectMapper; 074 075import com.google.common.annotations.VisibleForTesting; 076import com.microsoft.windowsazure.storage.AccessCondition; 077import com.microsoft.windowsazure.storage.OperationContext; 078import com.microsoft.windowsazure.storage.StorageException; 079import com.microsoft.windowsazure.storage.blob.CloudBlob; 080import com.microsoft.windowsazure.storage.core.*; 081 082/** 083 * A {@link FileSystem} for reading and writing files stored on <a 084 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 085 * blob-based and stores files on Azure in their native form so they can be read 086 * by other Azure tools. 087 */ 088@InterfaceAudience.Public 089@InterfaceStability.Stable 090public class NativeAzureFileSystem extends FileSystem { 091 private static final int USER_WX_PERMISION = 0300; 092 093 /** 094 * A description of a folder rename operation, including the source and 095 * destination keys, and descriptions of the files in the source folder. 096 */ 097 public static class FolderRenamePending { 098 private SelfRenewingLease folderLease; 099 private String srcKey; 100 private String dstKey; 101 private FileMetadata[] fileMetadata = null; // descriptions of source files 102 private ArrayList<String> fileStrings = null; 103 private NativeAzureFileSystem fs; 104 private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; 105 private static final int FORMATTING_BUFFER = 10000; 106 private boolean committed; 107 public static final String SUFFIX = "-RenamePending.json"; 108 109 // Prepare in-memory information needed to do or redo a folder rename. 110 public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, 111 NativeAzureFileSystem fs) throws IOException { 112 this.srcKey = srcKey; 113 this.dstKey = dstKey; 114 this.folderLease = lease; 115 this.fs = fs; 116 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 117 118 // List all the files in the folder. 119 String priorLastKey = null; 120 do { 121 PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, 122 AZURE_UNBOUNDED_DEPTH, priorLastKey); 123 for(FileMetadata file : listing.getFiles()) { 124 fileMetadataList.add(file); 125 } 126 priorLastKey = listing.getPriorLastKey(); 127 } while (priorLastKey != null); 128 fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 129 this.committed = true; 130 } 131 132 // Prepare in-memory information needed to do or redo folder rename from 133 // a -RenamePending.json file read from storage. This constructor is to use during 134 // redo processing. 135 public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) 136 throws IllegalArgumentException, IOException { 137 138 this.fs = fs; 139 140 // open redo file 141 Path f = redoFile; 142 FSDataInputStream input = fs.open(f); 143 byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; 144 int l = input.read(bytes); 145 if (l < 0) { 146 throw new IOException( 147 "Error reading pending rename file contents -- no data available"); 148 } 149 if (l == MAX_RENAME_PENDING_FILE_SIZE) { 150 throw new IOException( 151 "Error reading pending rename file contents -- " 152 + "maximum file size exceeded"); 153 } 154 String contents = new String(bytes, 0, l); 155 156 // parse the JSON 157 ObjectMapper objMapper = new ObjectMapper(); 158 objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 159 JsonNode json = null; 160 try { 161 json = objMapper.readValue(contents, JsonNode.class); 162 this.committed = true; 163 } catch (JsonMappingException e) { 164 165 // The -RedoPending.json file is corrupted, so we assume it was 166 // not completely written 167 // and the redo operation did not commit. 168 this.committed = false; 169 } catch (JsonParseException e) { 170 this.committed = false; 171 } catch (IOException e) { 172 this.committed = false; 173 } 174 175 if (!this.committed) { 176 LOG.error("Deleting corruped rename pending file " 177 + redoFile + "\n" + contents); 178 179 // delete the -RenamePending.json file 180 fs.delete(redoFile, false); 181 return; 182 } 183 184 // initialize this object's fields 185 ArrayList<String> fileStrList = new ArrayList<String>(); 186 JsonNode oldFolderName = json.get("OldFolderName"); 187 JsonNode newFolderName = json.get("NewFolderName"); 188 if (oldFolderName == null || newFolderName == null) { 189 this.committed = false; 190 } else { 191 this.srcKey = oldFolderName.getTextValue(); 192 this.dstKey = newFolderName.getTextValue(); 193 if (this.srcKey == null || this.dstKey == null) { 194 this.committed = false; 195 } else { 196 JsonNode fileList = json.get("FileList"); 197 if (fileList == null) { 198 this.committed = false; 199 } else { 200 for (int i = 0; i < fileList.size(); i++) { 201 fileStrList.add(fileList.get(i).getTextValue()); 202 } 203 } 204 } 205 } 206 this.fileStrings = fileStrList; 207 } 208 209 public FileMetadata[] getFiles() { 210 return fileMetadata; 211 } 212 213 public SelfRenewingLease getFolderLease() { 214 return folderLease; 215 } 216 217 /** 218 * Write to disk the information needed to redo folder rename, 219 * in JSON format. The file name will be 220 * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json} 221 * The file format will be: 222 * <pre>{@code 223 * { 224 * FormatVersion: "1.0", 225 * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", 226 * OldFolderName: "<key>", 227 * NewFolderName: "<key>", 228 * FileList: [ <string> , <string> , ... ] 229 * } 230 * 231 * Here's a sample: 232 * { 233 * FormatVersion: "1.0", 234 * OperationUTCTime: "2014-07-01 23:50:35.572", 235 * OldFolderName: "user/ehans/folderToRename", 236 * NewFolderName: "user/ehans/renamedFolder", 237 * FileList: [ 238 * "innerFile", 239 * "innerFile2" 240 * ] 241 * } }</pre> 242 * @throws IOException 243 */ 244 public void writeFile(FileSystem fs) throws IOException { 245 Path path = getRenamePendingFilePath(); 246 if (LOG.isDebugEnabled()){ 247 LOG.debug("Preparing to write atomic rename state to " + path.toString()); 248 } 249 OutputStream output = null; 250 251 String contents = makeRenamePendingFileContents(); 252 253 // Write file. 254 try { 255 output = fs.create(path); 256 output.write(contents.getBytes()); 257 } catch (IOException e) { 258 throw new IOException("Unable to write RenamePending file for folder rename from " 259 + srcKey + " to " + dstKey, e); 260 } finally { 261 IOUtils.cleanup(LOG, output); 262 } 263 } 264 265 /** 266 * Return the contents of the JSON file to represent the operations 267 * to be performed for a folder rename. 268 */ 269 public String makeRenamePendingFileContents() { 270 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 271 sdf.setTimeZone(TimeZone.getTimeZone("UTC")); 272 String time = sdf.format(new Date()); 273 274 // Make file list string 275 StringBuilder builder = new StringBuilder(); 276 builder.append("[\n"); 277 for (int i = 0; i != fileMetadata.length; i++) { 278 if (i > 0) { 279 builder.append(",\n"); 280 } 281 builder.append(" "); 282 String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); 283 284 // Quote string file names, escaping any possible " characters or other 285 // necessary characters in the name. 286 builder.append(quote(noPrefix)); 287 if (builder.length() >= 288 MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { 289 290 // Give up now to avoid using too much memory. 291 LOG.error("Internal error: Exceeded maximum rename pending file size of " 292 + MAX_RENAME_PENDING_FILE_SIZE + " bytes."); 293 294 // return some bad JSON with an error message to make it human readable 295 return "exceeded maximum rename pending file size"; 296 } 297 } 298 builder.append("\n ]"); 299 String fileList = builder.toString(); 300 301 // Make file contents as a string. Again, quote file names, escaping 302 // characters as appropriate. 303 String contents = "{\n" 304 + " FormatVersion: \"1.0\",\n" 305 + " OperationUTCTime: \"" + time + "\",\n" 306 + " OldFolderName: " + quote(srcKey) + ",\n" 307 + " NewFolderName: " + quote(dstKey) + ",\n" 308 + " FileList: " + fileList + "\n" 309 + "}\n"; 310 311 return contents; 312 } 313 314 /** 315 * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 316 * method. 317 * 318 * Produce a string in double quotes with backslash sequences in all the 319 * right places. A backslash will be inserted within </, allowing JSON 320 * text to be delivered in HTML. In JSON text, a string cannot contain a 321 * control character or an unescaped quote or backslash. 322 * @param string A String 323 * @return A String correctly formatted for insertion in a JSON text. 324 */ 325 private String quote(String string) { 326 if (string == null || string.length() == 0) { 327 return "\"\""; 328 } 329 330 char c = 0; 331 int i; 332 int len = string.length(); 333 StringBuilder sb = new StringBuilder(len + 4); 334 String t; 335 336 sb.append('"'); 337 for (i = 0; i < len; i += 1) { 338 c = string.charAt(i); 339 switch (c) { 340 case '\\': 341 case '"': 342 sb.append('\\'); 343 sb.append(c); 344 break; 345 case '/': 346 sb.append('\\'); 347 sb.append(c); 348 break; 349 case '\b': 350 sb.append("\\b"); 351 break; 352 case '\t': 353 sb.append("\\t"); 354 break; 355 case '\n': 356 sb.append("\\n"); 357 break; 358 case '\f': 359 sb.append("\\f"); 360 break; 361 case '\r': 362 sb.append("\\r"); 363 break; 364 default: 365 if (c < ' ') { 366 t = "000" + Integer.toHexString(c); 367 sb.append("\\u" + t.substring(t.length() - 4)); 368 } else { 369 sb.append(c); 370 } 371 } 372 } 373 sb.append('"'); 374 return sb.toString(); 375 } 376 377 public String getSrcKey() { 378 return srcKey; 379 } 380 381 public String getDstKey() { 382 return dstKey; 383 } 384 385 public FileMetadata getSourceMetadata() throws IOException { 386 return fs.getStoreInterface().retrieveMetadata(srcKey); 387 } 388 389 /** 390 * Execute a folder rename. This is the execution path followed 391 * when everything is working normally. See redo() for the alternate 392 * execution path for the case where we're recovering from a folder rename 393 * failure. 394 * @throws IOException 395 */ 396 public void execute() throws IOException { 397 398 for (FileMetadata file : this.getFiles()) { 399 400 // Rename all materialized entries under the folder to point to the 401 // final destination. 402 if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { 403 String srcName = file.getKey(); 404 String suffix = srcName.substring((this.getSrcKey()).length()); 405 String dstName = this.getDstKey() + suffix; 406 407 // Rename gets exclusive access (via a lease) for files 408 // designated for atomic rename. 409 // The main use case is for HBase write-ahead log (WAL) and data 410 // folder processing correctness. See the rename code for details. 411 boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); 412 fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); 413 } 414 } 415 416 // Rename the source folder 0-byte root file itself. 417 FileMetadata srcMetadata2 = this.getSourceMetadata(); 418 if (srcMetadata2.getBlobMaterialization() == 419 BlobMaterialization.Explicit) { 420 421 // It already has a lease on it from the "prepare" phase so there's no 422 // need to get one now. Pass in existing lease to allow file delete. 423 fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), 424 false, folderLease); 425 } 426 427 // Update the last-modified time of the parent folders of both source and 428 // destination. 429 fs.updateParentFolderLastModifiedTime(srcKey); 430 fs.updateParentFolderLastModifiedTime(dstKey); 431 } 432 433 /** Clean up after execution of rename. 434 * @throws IOException */ 435 public void cleanup() throws IOException { 436 437 if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { 438 439 // Remove RenamePending file 440 fs.delete(getRenamePendingFilePath(), false); 441 442 // Freeing source folder lease is not necessary since the source 443 // folder file was deleted. 444 } 445 } 446 447 private Path getRenamePendingFilePath() { 448 String fileName = srcKey + SUFFIX; 449 Path fileNamePath = keyToPath(fileName); 450 Path path = fs.makeAbsolute(fileNamePath); 451 return path; 452 } 453 454 /** 455 * Recover from a folder rename failure by redoing the intended work, 456 * as recorded in the -RenamePending.json file. 457 * 458 * @throws IOException 459 */ 460 public void redo() throws IOException { 461 462 if (!committed) { 463 464 // Nothing to do. The -RedoPending.json file should have already been 465 // deleted. 466 return; 467 } 468 469 // Try to get a lease on source folder to block concurrent access to it. 470 // It may fail if the folder is already gone. We don't check if the 471 // source exists explicitly because that could recursively trigger redo 472 // and give an infinite recursion. 473 SelfRenewingLease lease = null; 474 boolean sourceFolderGone = false; 475 try { 476 lease = fs.leaseSourceFolder(srcKey); 477 } catch (AzureException e) { 478 479 // If the source folder was not found then somebody probably 480 // raced with us and finished the rename first, or the 481 // first rename failed right before deleting the rename pending 482 // file. 483 String errorCode = ""; 484 try { 485 StorageException se = (StorageException) e.getCause(); 486 errorCode = se.getErrorCode(); 487 } catch (Exception e2) { 488 ; // do nothing -- could not get errorCode 489 } 490 if (errorCode.equals("BlobNotFound")) { 491 sourceFolderGone = true; 492 } else { 493 throw new IOException( 494 "Unexpected error when trying to lease source folder name during " 495 + "folder rename redo", 496 e); 497 } 498 } 499 500 if (!sourceFolderGone) { 501 // Make sure the target folder exists. 502 Path dst = fullPath(dstKey); 503 if (!fs.exists(dst)) { 504 fs.mkdirs(dst); 505 } 506 507 // For each file inside the folder to be renamed, 508 // make sure it has been renamed. 509 for(String fileName : fileStrings) { 510 finishSingleFileRename(fileName); 511 } 512 513 // Remove the source folder. Don't check explicitly if it exists, 514 // to avoid triggering redo recursively. 515 try { 516 fs.getStoreInterface().delete(srcKey, lease); 517 } catch (Exception e) { 518 LOG.info("Unable to delete source folder during folder rename redo. " 519 + "If the source folder is already gone, this is not an error " 520 + "condition. Continuing with redo.", e); 521 } 522 523 // Update the last-modified time of the parent folders of both source 524 // and destination. 525 fs.updateParentFolderLastModifiedTime(srcKey); 526 fs.updateParentFolderLastModifiedTime(dstKey); 527 } 528 529 // Remove the -RenamePending.json file. 530 fs.delete(getRenamePendingFilePath(), false); 531 } 532 533 // See if the source file is still there, and if it is, rename it. 534 private void finishSingleFileRename(String fileName) 535 throws IOException { 536 Path srcFile = fullPath(srcKey, fileName); 537 Path dstFile = fullPath(dstKey, fileName); 538 boolean srcExists = fs.exists(srcFile); 539 boolean dstExists = fs.exists(dstFile); 540 if (srcExists && !dstExists) { 541 542 // Rename gets exclusive access (via a lease) for HBase write-ahead log 543 // (WAL) file processing correctness. See the rename code for details. 544 String srcName = fs.pathToKey(srcFile); 545 String dstName = fs.pathToKey(dstFile); 546 fs.getStoreInterface().rename(srcName, dstName, true, null); 547 } else if (srcExists && dstExists) { 548 549 // Get a lease on source to block write access. 550 String srcName = fs.pathToKey(srcFile); 551 SelfRenewingLease lease = fs.acquireLease(srcFile); 552 553 // Delete the file. This will free the lease too. 554 fs.getStoreInterface().delete(srcName, lease); 555 } else if (!srcExists && dstExists) { 556 557 // The rename already finished, so do nothing. 558 ; 559 } else { 560 throw new IOException( 561 "Attempting to complete rename of file " + srcKey + "/" + fileName 562 + " during folder rename redo, and file was not found in source " 563 + "or destination."); 564 } 565 } 566 567 // Return an absolute path for the specific fileName within the folder 568 // specified by folderKey. 569 private Path fullPath(String folderKey, String fileName) { 570 return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); 571 } 572 573 private Path fullPath(String fileKey) { 574 return new Path(new Path(fs.getUri()), "/" + fileKey); 575 } 576 } 577 578 private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; 579 private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = 580 Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); 581 private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); 582 583 @Override 584 public String getScheme() { 585 return "wasb"; 586 } 587 588 589 /** 590 * <p> 591 * A {@link FileSystem} for reading and writing files stored on <a 592 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 593 * blob-based and stores files on Azure in their native form so they can be read 594 * by other Azure tools. This implementation uses HTTPS for secure network communication. 595 * </p> 596 */ 597 public static class Secure extends NativeAzureFileSystem { 598 @Override 599 public String getScheme() { 600 return "wasbs"; 601 } 602 } 603 604 public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class); 605 606 static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; 607 /** 608 * The time span in seconds before which we consider a temp blob to be 609 * dangling (not being actively uploaded to) and up for reclamation. 610 * 611 * So e.g. if this is 60, then any temporary blobs more than a minute old 612 * would be considered dangling. 613 */ 614 static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds"; 615 private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; 616 static final String PATH_DELIMITER = Path.SEPARATOR; 617 static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$"; 618 619 private static final int AZURE_LIST_ALL = -1; 620 private static final int AZURE_UNBOUNDED_DEPTH = -1; 621 622 private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; 623 624 /** 625 * The configuration property that determines which group owns files created 626 * in WASB. 627 */ 628 private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup"; 629 /** 630 * The default value for fs.azure.permissions.supergroup. Chosen as the same 631 * default as DFS. 632 */ 633 static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; 634 635 static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = 636 "fs.azure.block.location.impersonatedhost"; 637 private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = 638 "localhost"; 639 static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME = 640 "fs.azure.ring.buffer.capacity"; 641 static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME = 642 "fs.azure.output.stream.buffer.size"; 643 644 private class NativeAzureFsInputStream extends FSInputStream { 645 private InputStream in; 646 private final String key; 647 private long pos = 0; 648 private boolean closed = false; 649 private boolean isPageBlob; 650 651 // File length, valid only for streams over block blobs. 652 private long fileLength; 653 654 public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { 655 this.in = in; 656 this.key = key; 657 this.isPageBlob = store.isPageBlobKey(key); 658 this.fileLength = fileLength; 659 } 660 661 /** 662 * Return the size of the remaining available bytes 663 * if the size is less than or equal to {@link Integer#MAX_VALUE}, 664 * otherwise, return {@link Integer#MAX_VALUE}. 665 * 666 * This is to match the behavior of DFSInputStream.available(), 667 * which some clients may rely on (HBase write-ahead log reading in 668 * particular). 669 */ 670 @Override 671 public synchronized int available() throws IOException { 672 if (isPageBlob) { 673 return in.available(); 674 } else { 675 if (closed) { 676 throw new IOException("Stream closed"); 677 } 678 final long remaining = this.fileLength - pos; 679 return remaining <= Integer.MAX_VALUE ? 680 (int) remaining : Integer.MAX_VALUE; 681 } 682 } 683 684 /* 685 * Reads the next byte of data from the input stream. The value byte is 686 * returned as an integer in the range 0 to 255. If no byte is available 687 * because the end of the stream has been reached, the value -1 is returned. 688 * This method blocks until input data is available, the end of the stream 689 * is detected, or an exception is thrown. 690 * 691 * @returns int An integer corresponding to the byte read. 692 */ 693 @Override 694 public synchronized int read() throws IOException { 695 int result = 0; 696 result = in.read(); 697 if (result != -1) { 698 pos++; 699 if (statistics != null) { 700 statistics.incrementBytesRead(1); 701 } 702 } 703 704 // Return to the caller with the result. 705 // 706 return result; 707 } 708 709 /* 710 * Reads up to len bytes of data from the input stream into an array of 711 * bytes. An attempt is made to read as many as len bytes, but a smaller 712 * number may be read. The number of bytes actually read is returned as an 713 * integer. This method blocks until input data is available, end of file is 714 * detected, or an exception is thrown. If len is zero, then no bytes are 715 * read and 0 is returned; otherwise, there is an attempt to read at least 716 * one byte. If no byte is available because the stream is at end of file, 717 * the value -1 is returned; otherwise, at least one byte is read and stored 718 * into b. 719 * 720 * @param b -- the buffer into which data is read 721 * 722 * @param off -- the start offset in the array b at which data is written 723 * 724 * @param len -- the maximum number of bytes read 725 * 726 * @ returns int The total number of byes read into the buffer, or -1 if 727 * there is no more data because the end of stream is reached. 728 */ 729 @Override 730 public synchronized int read(byte[] b, int off, int len) throws IOException { 731 int result = 0; 732 result = in.read(b, off, len); 733 if (result > 0) { 734 pos += result; 735 } 736 737 if (null != statistics) { 738 statistics.incrementBytesRead(result); 739 } 740 741 // Return to the caller with the result. 742 return result; 743 } 744 745 @Override 746 public void close() throws IOException { 747 in.close(); 748 closed = true; 749 } 750 751 @Override 752 public synchronized void seek(long pos) throws IOException { 753 in.close(); 754 in = store.retrieve(key); 755 this.pos = in.skip(pos); 756 if (LOG.isDebugEnabled()) { 757 LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos, 758 this.pos)); 759 } 760 } 761 762 @Override 763 public synchronized long getPos() throws IOException { 764 return pos; 765 } 766 767 @Override 768 public boolean seekToNewSource(long targetPos) throws IOException { 769 return false; 770 } 771 } 772 773 private class NativeAzureFsOutputStream extends OutputStream { 774 // We should not override flush() to actually close current block and flush 775 // to DFS, this will break applications that assume flush() is a no-op. 776 // Applications are advised to use Syncable.hflush() for that purpose. 777 // NativeAzureFsOutputStream needs to implement Syncable if needed. 778 private String key; 779 private String keyEncoded; 780 private OutputStream out; 781 782 public NativeAzureFsOutputStream(OutputStream out, String aKey, 783 String anEncodedKey) throws IOException { 784 // Check input arguments. The output stream should be non-null and the 785 // keys 786 // should be valid strings. 787 if (null == out) { 788 throw new IllegalArgumentException( 789 "Illegal argument: the output stream is null."); 790 } 791 792 if (null == aKey || 0 == aKey.length()) { 793 throw new IllegalArgumentException( 794 "Illegal argument the key string is null or empty"); 795 } 796 797 if (null == anEncodedKey || 0 == anEncodedKey.length()) { 798 throw new IllegalArgumentException( 799 "Illegal argument the encoded key string is null or empty"); 800 } 801 802 // Initialize the member variables with the incoming parameters. 803 this.out = out; 804 805 setKey(aKey); 806 setEncodedKey(anEncodedKey); 807 } 808 809 @Override 810 public synchronized void close() throws IOException { 811 if (out != null) { 812 // Close the output stream and decode the key for the output stream 813 // before returning to the caller. 814 // 815 out.close(); 816 restoreKey(); 817 out = null; 818 } 819 } 820 821 /** 822 * Writes the specified byte to this output stream. The general contract for 823 * write is that one byte is written to the output stream. The byte to be 824 * written is the eight low-order bits of the argument b. The 24 high-order 825 * bits of b are ignored. 826 * 827 * @param b 828 * 32-bit integer of block of 4 bytes 829 */ 830 @Override 831 public void write(int b) throws IOException { 832 out.write(b); 833 } 834 835 /** 836 * Writes b.length bytes from the specified byte array to this output 837 * stream. The general contract for write(b) is that it should have exactly 838 * the same effect as the call write(b, 0, b.length). 839 * 840 * @param b 841 * Block of bytes to be written to the output stream. 842 */ 843 @Override 844 public void write(byte[] b) throws IOException { 845 out.write(b); 846 } 847 848 /** 849 * Writes <code>len</code> from the specified byte array starting at offset 850 * <code>off</code> to the output stream. The general contract for write(b, 851 * off, len) is that some of the bytes in the array <code> 852 * b</code b> are written to the output stream in order; element 853 * <code>b[off]</code> is the first byte written and 854 * <code>b[off+len-1]</code> is the last byte written by this operation. 855 * 856 * @param b 857 * Byte array to be written. 858 * @param off 859 * Write this offset in stream. 860 * @param len 861 * Number of bytes to be written. 862 */ 863 @Override 864 public void write(byte[] b, int off, int len) throws IOException { 865 out.write(b, off, len); 866 } 867 868 /** 869 * Get the blob name. 870 * 871 * @return String Blob name. 872 */ 873 public String getKey() { 874 return key; 875 } 876 877 /** 878 * Set the blob name. 879 * 880 * @param key 881 * Blob name. 882 */ 883 public void setKey(String key) { 884 this.key = key; 885 } 886 887 /** 888 * Get the blob name. 889 * 890 * @return String Blob name. 891 */ 892 public String getEncodedKey() { 893 return keyEncoded; 894 } 895 896 /** 897 * Set the blob name. 898 * 899 * @param anEncodedKey 900 * Blob name. 901 */ 902 public void setEncodedKey(String anEncodedKey) { 903 this.keyEncoded = anEncodedKey; 904 } 905 906 /** 907 * Restore the original key name from the m_key member variable. Note: The 908 * output file stream is created with an encoded blob store key to guarantee 909 * load balancing on the front end of the Azure storage partition servers. 910 * The create also includes the name of the original key value which is 911 * stored in the m_key member variable. This method should only be called 912 * when the stream is closed. 913 */ 914 private void restoreKey() throws IOException { 915 store.rename(getEncodedKey(), getKey()); 916 } 917 } 918 919 private URI uri; 920 private NativeFileSystemStore store; 921 private AzureNativeFileSystemStore actualStore; 922 private Path workingDir; 923 private long blockSize = MAX_AZURE_BLOCK_SIZE; 924 private AzureFileSystemInstrumentation instrumentation; 925 private String metricsSourceName; 926 private boolean isClosed = false; 927 private static boolean suppressRetryPolicy = false; 928 // A counter to create unique (within-process) names for my metrics sources. 929 private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); 930 931 932 public NativeAzureFileSystem() { 933 // set store in initialize() 934 } 935 936 public NativeAzureFileSystem(NativeFileSystemStore store) { 937 this.store = store; 938 } 939 940 /** 941 * Suppress the default retry policy for the Storage, useful in unit tests to 942 * test negative cases without waiting forever. 943 */ 944 @VisibleForTesting 945 static void suppressRetryPolicy() { 946 suppressRetryPolicy = true; 947 } 948 949 /** 950 * Undo the effect of suppressRetryPolicy. 951 */ 952 @VisibleForTesting 953 static void resumeRetryPolicy() { 954 suppressRetryPolicy = false; 955 } 956 957 /** 958 * Creates a new metrics source name that's unique within this process. 959 */ 960 @VisibleForTesting 961 public static String newMetricsSourceName() { 962 int number = metricsSourceNameCounter.incrementAndGet(); 963 final String baseName = "AzureFileSystemMetrics"; 964 if (number == 1) { // No need for a suffix for the first one 965 return baseName; 966 } else { 967 return baseName + number; 968 } 969 } 970 971 /** 972 * Checks if the given URI scheme is a scheme that's affiliated with the Azure 973 * File System. 974 * 975 * @param scheme 976 * The URI scheme. 977 * @return true iff it's an Azure File System URI scheme. 978 */ 979 private static boolean isWasbScheme(String scheme) { 980 // The valid schemes are: asv (old name), asvs (old name over HTTPS), 981 // wasb (new name), wasbs (new name over HTTPS). 982 return scheme != null 983 && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") 984 || scheme.equalsIgnoreCase("wasb") || scheme 985 .equalsIgnoreCase("wasbs")); 986 } 987 988 /** 989 * Puts in the authority of the default file system if it is a WASB file 990 * system and the given URI's authority is null. 991 * 992 * @return The URI with reconstructed authority if necessary and possible. 993 */ 994 private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { 995 if (null == uri.getAuthority()) { 996 // If WASB is the default file system, get the authority from there 997 URI defaultUri = FileSystem.getDefaultUri(conf); 998 if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { 999 try { 1000 // Reconstruct the URI with the authority from the default URI. 1001 return new URI(uri.getScheme(), defaultUri.getAuthority(), 1002 uri.getPath(), uri.getQuery(), uri.getFragment()); 1003 } catch (URISyntaxException e) { 1004 // This should never happen. 1005 throw new Error("Bad URI construction", e); 1006 } 1007 } 1008 } 1009 return uri; 1010 } 1011 1012 @Override 1013 protected void checkPath(Path path) { 1014 // Make sure to reconstruct the path's authority if needed 1015 super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), 1016 getConf()))); 1017 } 1018 1019 @Override 1020 public void initialize(URI uri, Configuration conf) 1021 throws IOException, IllegalArgumentException { 1022 // Check authority for the URI to guarantee that it is non-null. 1023 uri = reconstructAuthorityIfNeeded(uri, conf); 1024 if (null == uri.getAuthority()) { 1025 final String errMsg = String 1026 .format("Cannot initialize WASB file system, URI authority not recognized."); 1027 throw new IllegalArgumentException(errMsg); 1028 } 1029 super.initialize(uri, conf); 1030 1031 if (store == null) { 1032 store = createDefaultStore(conf); 1033 } 1034 1035 // Make sure the metrics system is available before interacting with Azure 1036 AzureFileSystemMetricsSystem.fileSystemStarted(); 1037 metricsSourceName = newMetricsSourceName(); 1038 String sourceDesc = "Azure Storage Volume File System metrics"; 1039 instrumentation = new AzureFileSystemInstrumentation(conf); 1040 AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, 1041 instrumentation); 1042 1043 store.initialize(uri, conf, instrumentation); 1044 setConf(conf); 1045 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 1046 this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() 1047 .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); 1048 this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, 1049 MAX_AZURE_BLOCK_SIZE); 1050 1051 if (LOG.isDebugEnabled()) { 1052 LOG.debug("NativeAzureFileSystem. Initializing."); 1053 LOG.debug(" blockSize = " 1054 + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); 1055 } 1056 } 1057 1058 private NativeFileSystemStore createDefaultStore(Configuration conf) { 1059 actualStore = new AzureNativeFileSystemStore(); 1060 1061 if (suppressRetryPolicy) { 1062 actualStore.suppressRetryPolicy(); 1063 } 1064 return actualStore; 1065 } 1066 1067 /** 1068 * Azure Storage doesn't allow the blob names to end in a period, 1069 * so encode this here to work around that limitation. 1070 */ 1071 private static String encodeTrailingPeriod(String toEncode) { 1072 Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); 1073 return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); 1074 } 1075 1076 /** 1077 * Reverse the encoding done by encodeTrailingPeriod(). 1078 */ 1079 private static String decodeTrailingPeriod(String toDecode) { 1080 Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); 1081 return matcher.replaceAll("."); 1082 } 1083 1084 /** 1085 * Convert the path to a key. By convention, any leading or trailing slash is 1086 * removed, except for the special case of a single slash. 1087 */ 1088 @VisibleForTesting 1089 public String pathToKey(Path path) { 1090 // Convert the path to a URI to parse the scheme, the authority, and the 1091 // path from the path object. 1092 URI tmpUri = path.toUri(); 1093 String pathUri = tmpUri.getPath(); 1094 1095 // The scheme and authority is valid. If the path does not exist add a "/" 1096 // separator to list the root of the container. 1097 Path newPath = path; 1098 if ("".equals(pathUri)) { 1099 newPath = new Path(tmpUri.toString() + Path.SEPARATOR); 1100 } 1101 1102 // Verify path is absolute if the path refers to a windows drive scheme. 1103 if (!newPath.isAbsolute()) { 1104 throw new IllegalArgumentException("Path must be absolute: " + path); 1105 } 1106 1107 String key = null; 1108 key = newPath.toUri().getPath(); 1109 key = removeTrailingSlash(key); 1110 key = encodeTrailingPeriod(key); 1111 if (key.length() == 1) { 1112 return key; 1113 } else { 1114 return key.substring(1); // remove initial slash 1115 } 1116 } 1117 1118 // Remove any trailing slash except for the case of a single slash. 1119 private static String removeTrailingSlash(String key) { 1120 if (key.length() == 0 || key.length() == 1) { 1121 return key; 1122 } 1123 if (key.charAt(key.length() - 1) == '/') { 1124 return key.substring(0, key.length() - 1); 1125 } else { 1126 return key; 1127 } 1128 } 1129 1130 private static Path keyToPath(String key) { 1131 if (key.equals("/")) { 1132 return new Path("/"); // container 1133 } 1134 return new Path("/" + decodeTrailingPeriod(key)); 1135 } 1136 1137 /** 1138 * Get the absolute version of the path (fully qualified). 1139 * This is public for testing purposes. 1140 * 1141 * @param path 1142 * @return fully qualified path 1143 */ 1144 @VisibleForTesting 1145 public Path makeAbsolute(Path path) { 1146 if (path.isAbsolute()) { 1147 return path; 1148 } 1149 return new Path(workingDir, path); 1150 } 1151 1152 /** 1153 * For unit test purposes, retrieves the AzureNativeFileSystemStore store 1154 * backing this file system. 1155 * 1156 * @return The store object. 1157 */ 1158 @VisibleForTesting 1159 public AzureNativeFileSystemStore getStore() { 1160 return actualStore; 1161 } 1162 1163 NativeFileSystemStore getStoreInterface() { 1164 return store; 1165 } 1166 1167 /** 1168 * Gets the metrics source for this file system. 1169 * This is mainly here for unit testing purposes. 1170 * 1171 * @return the metrics source. 1172 */ 1173 public AzureFileSystemInstrumentation getInstrumentation() { 1174 return instrumentation; 1175 } 1176 1177 /** This optional operation is not yet supported. */ 1178 @Override 1179 public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) 1180 throws IOException { 1181 throw new IOException("Not supported"); 1182 } 1183 1184 @Override 1185 public FSDataOutputStream create(Path f, FsPermission permission, 1186 boolean overwrite, int bufferSize, short replication, long blockSize, 1187 Progressable progress) throws IOException { 1188 return create(f, permission, overwrite, true, 1189 bufferSize, replication, blockSize, progress, 1190 (SelfRenewingLease) null); 1191 } 1192 1193 /** 1194 * Get a self-renewing lease on the specified file. 1195 */ 1196 public SelfRenewingLease acquireLease(Path path) throws AzureException { 1197 String fullKey = pathToKey(makeAbsolute(path)); 1198 return getStore().acquireLease(fullKey); 1199 } 1200 1201 @Override 1202 @SuppressWarnings("deprecation") 1203 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1204 boolean overwrite, int bufferSize, short replication, long blockSize, 1205 Progressable progress) throws IOException { 1206 1207 Path parent = f.getParent(); 1208 1209 // Get exclusive access to folder if this is a directory designated 1210 // for atomic rename. The primary use case of for HBase write-ahead 1211 // log file management. 1212 SelfRenewingLease lease = null; 1213 if (store.isAtomicRenameKey(pathToKey(f))) { 1214 try { 1215 lease = acquireLease(parent); 1216 } catch (AzureException e) { 1217 1218 String errorCode = ""; 1219 try { 1220 StorageException e2 = (StorageException) e.getCause(); 1221 errorCode = e2.getErrorCode(); 1222 } catch (Exception e3) { 1223 // do nothing if cast fails 1224 } 1225 if (errorCode.equals("BlobNotFound")) { 1226 throw new FileNotFoundException("Cannot create file " + 1227 f.getName() + " because parent folder does not exist."); 1228 } 1229 1230 LOG.warn("Got unexpected exception trying to get lease on " 1231 + pathToKey(parent) + ". " + e.getMessage()); 1232 throw e; 1233 } 1234 } 1235 1236 // See if the parent folder exists. If not, throw error. 1237 // The exists() check will push any pending rename operation forward, 1238 // if there is one, and return false. 1239 // 1240 // At this point, we have exclusive access to the source folder 1241 // via the lease, so we will not conflict with an active folder 1242 // rename operation. 1243 if (!exists(parent)) { 1244 try { 1245 1246 // This'll let the keep-alive thread exit as soon as it wakes up. 1247 lease.free(); 1248 } catch (Exception e) { 1249 LOG.warn("Unable to free lease because: " + e.getMessage()); 1250 } 1251 throw new FileNotFoundException("Cannot create file " + 1252 f.getName() + " because parent folder does not exist."); 1253 } 1254 1255 // Create file inside folder. 1256 FSDataOutputStream out = null; 1257 try { 1258 out = create(f, permission, overwrite, false, 1259 bufferSize, replication, blockSize, progress, lease); 1260 } finally { 1261 // Release exclusive access to folder. 1262 try { 1263 if (lease != null) { 1264 lease.free(); 1265 } 1266 } catch (Exception e) { 1267 IOUtils.cleanup(LOG, out); 1268 String msg = "Unable to free lease on " + parent.toUri(); 1269 LOG.error(msg); 1270 throw new IOException(msg, e); 1271 } 1272 } 1273 return out; 1274 } 1275 1276 @Override 1277 @SuppressWarnings("deprecation") 1278 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1279 EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, 1280 Progressable progress) throws IOException { 1281 1282 // Check if file should be appended or overwritten. Assume that the file 1283 // is overwritten on if the CREATE and OVERWRITE create flags are set. Note 1284 // that any other combinations of create flags will result in an open new or 1285 // open with append. 1286 final EnumSet<CreateFlag> createflags = 1287 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); 1288 boolean overwrite = flags.containsAll(createflags); 1289 1290 // Delegate the create non-recursive call. 1291 return this.createNonRecursive(f, permission, overwrite, 1292 bufferSize, replication, blockSize, progress); 1293 } 1294 1295 @Override 1296 @SuppressWarnings("deprecation") 1297 public FSDataOutputStream createNonRecursive(Path f, 1298 boolean overwrite, int bufferSize, short replication, long blockSize, 1299 Progressable progress) throws IOException { 1300 return this.createNonRecursive(f, FsPermission.getFileDefault(), 1301 overwrite, bufferSize, replication, blockSize, progress); 1302 } 1303 1304 1305 /** 1306 * Create an Azure blob and return an output stream to use 1307 * to write data to it. 1308 * 1309 * @param f 1310 * @param permission 1311 * @param overwrite 1312 * @param createParent 1313 * @param bufferSize 1314 * @param replication 1315 * @param blockSize 1316 * @param progress 1317 * @param parentFolderLease Lease on parent folder (or null if 1318 * no lease). 1319 * @return 1320 * @throws IOException 1321 */ 1322 private FSDataOutputStream create(Path f, FsPermission permission, 1323 boolean overwrite, boolean createParent, int bufferSize, 1324 short replication, long blockSize, Progressable progress, 1325 SelfRenewingLease parentFolderLease) 1326 throws IOException { 1327 1328 if (LOG.isDebugEnabled()) { 1329 LOG.debug("Creating file: " + f.toString()); 1330 } 1331 1332 if (containsColon(f)) { 1333 throw new IOException("Cannot create file " + f 1334 + " through WASB that has colons in the name"); 1335 } 1336 1337 Path absolutePath = makeAbsolute(f); 1338 String key = pathToKey(absolutePath); 1339 1340 FileMetadata existingMetadata = store.retrieveMetadata(key); 1341 if (existingMetadata != null) { 1342 if (existingMetadata.isDir()) { 1343 throw new IOException("Cannot create file " + f 1344 + "; already exists as a directory."); 1345 } 1346 if (!overwrite) { 1347 throw new IOException("File already exists:" + f); 1348 } 1349 } 1350 1351 Path parentFolder = absolutePath.getParent(); 1352 if (parentFolder != null && parentFolder.getParent() != null) { // skip root 1353 // Update the parent folder last modified time if the parent folder 1354 // already exists. 1355 String parentKey = pathToKey(parentFolder); 1356 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1357 if (parentMetadata != null && parentMetadata.isDir() && 1358 parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { 1359 store.updateFolderLastModifiedTime(parentKey, parentFolderLease); 1360 } else { 1361 // Make sure that the parent folder exists. 1362 // Create it using inherited permissions from the first existing directory going up the path 1363 Path firstExisting = parentFolder.getParent(); 1364 FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1365 while(metadata == null) { 1366 // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata 1367 firstExisting = firstExisting.getParent(); 1368 metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1369 } 1370 mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); 1371 } 1372 } 1373 1374 // Mask the permission first (with the default permission mask as well). 1375 FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); 1376 PermissionStatus permissionStatus = createPermissionStatus(masked); 1377 1378 OutputStream bufOutStream; 1379 if (store.isPageBlobKey(key)) { 1380 // Store page blobs directly in-place without renames. 1381 bufOutStream = store.storefile(key, permissionStatus); 1382 } else { 1383 // This is a block blob, so open the output blob stream based on the 1384 // encoded key. 1385 // 1386 String keyEncoded = encodeKey(key); 1387 1388 1389 // First create a blob at the real key, pointing back to the temporary file 1390 // This accomplishes a few things: 1391 // 1. Makes sure we can create a file there. 1392 // 2. Makes it visible to other concurrent threads/processes/nodes what 1393 // we're 1394 // doing. 1395 // 3. Makes it easier to restore/cleanup data in the event of us crashing. 1396 store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); 1397 1398 // The key is encoded to point to a common container at the storage server. 1399 // This reduces the number of splits on the server side when load balancing. 1400 // Ingress to Azure storage can take advantage of earlier splits. We remove 1401 // the root path to the key and prefix a random GUID to the tail (or leaf 1402 // filename) of the key. Keys are thus broadly and randomly distributed over 1403 // a single container to ease load balancing on the storage server. When the 1404 // blob is committed it is renamed to its earlier key. Uncommitted blocks 1405 // are not cleaned up and we leave it to Azure storage to garbage collect 1406 // these 1407 // blocks. 1408 bufOutStream = new NativeAzureFsOutputStream(store.storefile( 1409 keyEncoded, permissionStatus), key, keyEncoded); 1410 } 1411 // Construct the data output stream from the buffered output stream. 1412 FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); 1413 1414 1415 // Increment the counter 1416 instrumentation.fileCreated(); 1417 1418 // Return data output stream to caller. 1419 return fsOut; 1420 } 1421 1422 @Override 1423 @Deprecated 1424 public boolean delete(Path path) throws IOException { 1425 return delete(path, true); 1426 } 1427 1428 @Override 1429 public boolean delete(Path f, boolean recursive) throws IOException { 1430 return delete(f, recursive, false); 1431 } 1432 1433 /** 1434 * Delete the specified file or folder. The parameter 1435 * skipParentFolderLastModifidedTimeUpdate 1436 * is used in the case of atomic folder rename redo. In that case, there is 1437 * a lease on the parent folder, so (without reworking the code) modifying 1438 * the parent folder update time will fail because of a conflict with the 1439 * lease. Since we are going to delete the folder soon anyway so accurate 1440 * modified time is not necessary, it's easier to just skip 1441 * the modified time update. 1442 * 1443 * @param f 1444 * @param recursive 1445 * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last 1446 * modified time. 1447 * @return true if and only if the file is deleted 1448 * @throws IOException 1449 */ 1450 public boolean delete(Path f, boolean recursive, 1451 boolean skipParentFolderLastModifidedTimeUpdate) throws IOException { 1452 1453 if (LOG.isDebugEnabled()) { 1454 LOG.debug("Deleting file: " + f.toString()); 1455 } 1456 1457 Path absolutePath = makeAbsolute(f); 1458 String key = pathToKey(absolutePath); 1459 1460 // Capture the metadata for the path. 1461 // 1462 FileMetadata metaFile = store.retrieveMetadata(key); 1463 1464 if (null == metaFile) { 1465 // The path to be deleted does not exist. 1466 return false; 1467 } 1468 1469 // The path exists, determine if it is a folder containing objects, 1470 // an empty folder, or a simple file and take the appropriate actions. 1471 if (!metaFile.isDir()) { 1472 // The path specifies a file. We need to check the parent path 1473 // to make sure it's a proper materialized directory before we 1474 // delete the file. Otherwise we may get into a situation where 1475 // the file we were deleting was the last one in an implicit directory 1476 // (e.g. the blob store only contains the blob a/b and there's no 1477 // corresponding directory blob a) and that would implicitly delete 1478 // the directory as well, which is not correct. 1479 Path parentPath = absolutePath.getParent(); 1480 if (parentPath.getParent() != null) {// Not root 1481 String parentKey = pathToKey(parentPath); 1482 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1483 if (!parentMetadata.isDir()) { 1484 // Invalid state: the parent path is actually a file. Throw. 1485 throw new AzureException("File " + f + " has a parent directory " 1486 + parentPath + " which is also a file. Can't resolve."); 1487 } 1488 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1489 if (LOG.isDebugEnabled()) { 1490 LOG.debug("Found an implicit parent directory while trying to" 1491 + " delete the file " + f + ". Creating the directory blob for" 1492 + " it in " + parentKey + "."); 1493 } 1494 store.storeEmptyFolder(parentKey, 1495 createPermissionStatus(FsPermission.getDefault())); 1496 } else { 1497 if (!skipParentFolderLastModifidedTimeUpdate) { 1498 store.updateFolderLastModifiedTime(parentKey, null); 1499 } 1500 } 1501 } 1502 store.delete(key); 1503 instrumentation.fileDeleted(); 1504 } else { 1505 // The path specifies a folder. Recursively delete all entries under the 1506 // folder. 1507 Path parentPath = absolutePath.getParent(); 1508 if (parentPath.getParent() != null) { 1509 String parentKey = pathToKey(parentPath); 1510 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1511 1512 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1513 if (LOG.isDebugEnabled()) { 1514 LOG.debug("Found an implicit parent directory while trying to" 1515 + " delete the directory " + f 1516 + ". Creating the directory blob for" + " it in " + parentKey 1517 + "."); 1518 } 1519 store.storeEmptyFolder(parentKey, 1520 createPermissionStatus(FsPermission.getDefault())); 1521 } 1522 } 1523 1524 // List all the blobs in the current folder. 1525 String priorLastKey = null; 1526 PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1, 1527 priorLastKey); 1528 FileMetadata[] contents = listing.getFiles(); 1529 if (!recursive && contents.length > 0) { 1530 // The folder is non-empty and recursive delete was not specified. 1531 // Throw an exception indicating that a non-recursive delete was 1532 // specified for a non-empty folder. 1533 throw new IOException("Non-recursive delete of non-empty directory " 1534 + f.toString()); 1535 } 1536 1537 // Delete all the files in the folder. 1538 for (FileMetadata p : contents) { 1539 // Tag on the directory name found as the suffix of the suffix of the 1540 // parent directory to get the new absolute path. 1541 String suffix = p.getKey().substring( 1542 p.getKey().lastIndexOf(PATH_DELIMITER)); 1543 if (!p.isDir()) { 1544 store.delete(key + suffix); 1545 instrumentation.fileDeleted(); 1546 } else { 1547 // Recursively delete contents of the sub-folders. Notice this also 1548 // deletes the blob for the directory. 1549 if (!delete(new Path(f.toString() + suffix), true)) { 1550 return false; 1551 } 1552 } 1553 } 1554 store.delete(key); 1555 1556 // Update parent directory last modified time 1557 Path parent = absolutePath.getParent(); 1558 if (parent != null && parent.getParent() != null) { // not root 1559 String parentKey = pathToKey(parent); 1560 if (!skipParentFolderLastModifidedTimeUpdate) { 1561 store.updateFolderLastModifiedTime(parentKey, null); 1562 } 1563 } 1564 instrumentation.directoryDeleted(); 1565 } 1566 1567 // File or directory was successfully deleted. 1568 return true; 1569 } 1570 1571 @Override 1572 public FileStatus getFileStatus(Path f) throws IOException { 1573 1574 if (LOG.isDebugEnabled()) { 1575 LOG.debug("Getting the file status for " + f.toString()); 1576 } 1577 1578 // Capture the absolute path and the path to key. 1579 Path absolutePath = makeAbsolute(f); 1580 String key = pathToKey(absolutePath); 1581 if (key.length() == 0) { // root always exists 1582 return newDirectory(null, absolutePath); 1583 } 1584 1585 // The path is either a folder or a file. Retrieve metadata to 1586 // determine if it is a directory or file. 1587 FileMetadata meta = store.retrieveMetadata(key); 1588 if (meta != null) { 1589 if (meta.isDir()) { 1590 // The path is a folder with files in it. 1591 // 1592 if (LOG.isDebugEnabled()) { 1593 LOG.debug("Path " + f.toString() + "is a folder."); 1594 } 1595 1596 // If a rename operation for the folder was pending, redo it. 1597 // Then the file does not exist, so signal that. 1598 if (conditionalRedoFolderRename(f)) { 1599 throw new FileNotFoundException( 1600 absolutePath + ": No such file or directory."); 1601 } 1602 1603 // Return reference to the directory object. 1604 return newDirectory(meta, absolutePath); 1605 } 1606 1607 // The path is a file. 1608 if (LOG.isDebugEnabled()) { 1609 LOG.debug("Found the path: " + f.toString() + " as a file."); 1610 } 1611 1612 // Return with reference to a file object. 1613 return newFile(meta, absolutePath); 1614 } 1615 1616 // File not found. Throw exception no such file or directory. 1617 // 1618 throw new FileNotFoundException( 1619 absolutePath + ": No such file or directory."); 1620 } 1621 1622 // Return true if there is a rename pending and we redo it, otherwise false. 1623 private boolean conditionalRedoFolderRename(Path f) throws IOException { 1624 1625 // Can't rename /, so return immediately in that case. 1626 if (f.getName().equals("")) { 1627 return false; 1628 } 1629 1630 // Check if there is a -RenamePending.json file for this folder, and if so, 1631 // redo the rename. 1632 Path absoluteRenamePendingFile = renamePendingFilePath(f); 1633 if (exists(absoluteRenamePendingFile)) { 1634 FolderRenamePending pending = 1635 new FolderRenamePending(absoluteRenamePendingFile, this); 1636 pending.redo(); 1637 return true; 1638 } else { 1639 return false; 1640 } 1641 } 1642 1643 // Return the path name that would be used for rename of folder with path f. 1644 private Path renamePendingFilePath(Path f) { 1645 Path absPath = makeAbsolute(f); 1646 String key = pathToKey(absPath); 1647 key += "-RenamePending.json"; 1648 return keyToPath(key); 1649 } 1650 1651 @Override 1652 public URI getUri() { 1653 return uri; 1654 } 1655 1656 /** 1657 * Retrieve the status of a given path if it is a file, or of all the 1658 * contained files if it is a directory. 1659 */ 1660 @Override 1661 public FileStatus[] listStatus(Path f) throws IOException { 1662 1663 if (LOG.isDebugEnabled()) { 1664 LOG.debug("Listing status for " + f.toString()); 1665 } 1666 1667 Path absolutePath = makeAbsolute(f); 1668 String key = pathToKey(absolutePath); 1669 Set<FileStatus> status = new TreeSet<FileStatus>(); 1670 FileMetadata meta = store.retrieveMetadata(key); 1671 1672 if (meta != null) { 1673 if (!meta.isDir()) { 1674 if (LOG.isDebugEnabled()) { 1675 LOG.debug("Found path as a file"); 1676 } 1677 return new FileStatus[] { newFile(meta, absolutePath) }; 1678 } 1679 String partialKey = null; 1680 PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 1681 1682 // For any -RenamePending.json files in the listing, 1683 // push the rename forward. 1684 boolean renamed = conditionalRedoFolderRenames(listing); 1685 1686 // If any renames were redone, get another listing, 1687 // since the current one may have changed due to the redo. 1688 if (renamed) { 1689 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 1690 } 1691 1692 for (FileMetadata fileMetadata : listing.getFiles()) { 1693 Path subpath = keyToPath(fileMetadata.getKey()); 1694 1695 // Test whether the metadata represents a file or directory and 1696 // add the appropriate metadata object. 1697 // 1698 // Note: There was a very old bug here where directories were added 1699 // to the status set as files flattening out recursive listings 1700 // using "-lsr" down the file system hierarchy. 1701 if (fileMetadata.isDir()) { 1702 // Make sure we hide the temp upload folder 1703 if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { 1704 // Don't expose that. 1705 continue; 1706 } 1707 status.add(newDirectory(fileMetadata, subpath)); 1708 } else { 1709 status.add(newFile(fileMetadata, subpath)); 1710 } 1711 } 1712 if (LOG.isDebugEnabled()) { 1713 LOG.debug("Found path as a directory with " + status.size() 1714 + " files in it."); 1715 } 1716 } else { 1717 // There is no metadata found for the path. 1718 if (LOG.isDebugEnabled()) { 1719 LOG.debug("Did not find any metadata for path: " + key); 1720 } 1721 1722 throw new FileNotFoundException("File" + f + " does not exist."); 1723 } 1724 1725 return status.toArray(new FileStatus[0]); 1726 } 1727 1728 // Redo any folder renames needed if there are rename pending files in the 1729 // directory listing. Return true if one or more redo operations were done. 1730 private boolean conditionalRedoFolderRenames(PartialListing listing) 1731 throws IllegalArgumentException, IOException { 1732 boolean renamed = false; 1733 for (FileMetadata fileMetadata : listing.getFiles()) { 1734 Path subpath = keyToPath(fileMetadata.getKey()); 1735 if (isRenamePendingFile(subpath)) { 1736 FolderRenamePending pending = 1737 new FolderRenamePending(subpath, this); 1738 pending.redo(); 1739 renamed = true; 1740 } 1741 } 1742 return renamed; 1743 } 1744 1745 // True if this is a folder rename pending file, else false. 1746 private boolean isRenamePendingFile(Path path) { 1747 return path.toString().endsWith(FolderRenamePending.SUFFIX); 1748 } 1749 1750 private FileStatus newFile(FileMetadata meta, Path path) { 1751 return new FileStatus ( 1752 meta.getLength(), 1753 false, 1754 1, 1755 blockSize, 1756 meta.getLastModified(), 1757 0, 1758 meta.getPermissionStatus().getPermission(), 1759 meta.getPermissionStatus().getUserName(), 1760 meta.getPermissionStatus().getGroupName(), 1761 path.makeQualified(getUri(), getWorkingDirectory())); 1762 } 1763 1764 private FileStatus newDirectory(FileMetadata meta, Path path) { 1765 return new FileStatus ( 1766 0, 1767 true, 1768 1, 1769 blockSize, 1770 meta == null ? 0 : meta.getLastModified(), 1771 0, 1772 meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), 1773 meta == null ? "" : meta.getPermissionStatus().getUserName(), 1774 meta == null ? "" : meta.getPermissionStatus().getGroupName(), 1775 path.makeQualified(getUri(), getWorkingDirectory())); 1776 } 1777 1778 private static enum UMaskApplyMode { 1779 NewFile, 1780 NewDirectory, 1781 NewDirectoryNoUmask, 1782 ChangeExistingFile, 1783 ChangeExistingDirectory, 1784 } 1785 1786 /** 1787 * Applies the applicable UMASK's on the given permission. 1788 * 1789 * @param permission 1790 * The permission to mask. 1791 * @param applyMode 1792 * Whether to also apply the default umask. 1793 * @return The masked persmission. 1794 */ 1795 private FsPermission applyUMask(final FsPermission permission, 1796 final UMaskApplyMode applyMode) { 1797 FsPermission newPermission = new FsPermission(permission); 1798 // Apply the default umask - this applies for new files or directories. 1799 if (applyMode == UMaskApplyMode.NewFile 1800 || applyMode == UMaskApplyMode.NewDirectory) { 1801 newPermission = newPermission 1802 .applyUMask(FsPermission.getUMask(getConf())); 1803 } 1804 return newPermission; 1805 } 1806 1807 /** 1808 * Creates the PermissionStatus object to use for the given permission, based 1809 * on the current user in context. 1810 * 1811 * @param permission 1812 * The permission for the file. 1813 * @return The permission status object to use. 1814 * @throws IOException 1815 * If login fails in getCurrentUser 1816 */ 1817 private PermissionStatus createPermissionStatus(FsPermission permission) 1818 throws IOException { 1819 // Create the permission status for this file based on current user 1820 return new PermissionStatus( 1821 UserGroupInformation.getCurrentUser().getShortUserName(), 1822 getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, 1823 AZURE_DEFAULT_GROUP_DEFAULT), 1824 permission); 1825 } 1826 1827 @Override 1828 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 1829 return mkdirs(f, permission, false); 1830 } 1831 1832 public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { 1833 if (LOG.isDebugEnabled()) { 1834 LOG.debug("Creating directory: " + f.toString()); 1835 } 1836 1837 if (containsColon(f)) { 1838 throw new IOException("Cannot create directory " + f 1839 + " through WASB that has colons in the name"); 1840 } 1841 1842 Path absolutePath = makeAbsolute(f); 1843 PermissionStatus permissionStatus = null; 1844 if(noUmask) { 1845 // ensure owner still has wx permissions at the minimum 1846 permissionStatus = createPermissionStatus( 1847 applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)), 1848 UMaskApplyMode.NewDirectoryNoUmask)); 1849 } else { 1850 permissionStatus = createPermissionStatus( 1851 applyUMask(permission, UMaskApplyMode.NewDirectory)); 1852 } 1853 1854 1855 ArrayList<String> keysToCreateAsFolder = new ArrayList<String>(); 1856 ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>(); 1857 boolean childCreated = false; 1858 // Check that there is no file in the parent chain of the given path. 1859 for (Path current = absolutePath, parent = current.getParent(); 1860 parent != null; // Stop when you get to the root 1861 current = parent, parent = current.getParent()) { 1862 String currentKey = pathToKey(current); 1863 FileMetadata currentMetadata = store.retrieveMetadata(currentKey); 1864 if (currentMetadata != null && !currentMetadata.isDir()) { 1865 throw new IOException("Cannot create directory " + f + " because " + 1866 current + " is an existing file."); 1867 } else if (currentMetadata == null) { 1868 keysToCreateAsFolder.add(currentKey); 1869 childCreated = true; 1870 } else { 1871 // The directory already exists. Its last modified time need to be 1872 // updated if there is a child directory created under it. 1873 if (childCreated) { 1874 keysToUpdateAsFolder.add(currentKey); 1875 } 1876 childCreated = false; 1877 } 1878 } 1879 1880 for (String currentKey : keysToCreateAsFolder) { 1881 store.storeEmptyFolder(currentKey, permissionStatus); 1882 } 1883 1884 instrumentation.directoryCreated(); 1885 1886 // otherwise throws exception 1887 return true; 1888 } 1889 1890 @Override 1891 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 1892 if (LOG.isDebugEnabled()) { 1893 LOG.debug("Opening file: " + f.toString()); 1894 } 1895 1896 Path absolutePath = makeAbsolute(f); 1897 String key = pathToKey(absolutePath); 1898 FileMetadata meta = store.retrieveMetadata(key); 1899 if (meta == null) { 1900 throw new FileNotFoundException(f.toString()); 1901 } 1902 if (meta.isDir()) { 1903 throw new FileNotFoundException(f.toString() 1904 + " is a directory not a file."); 1905 } 1906 1907 return new FSDataInputStream(new BufferedFSInputStream( 1908 new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize)); 1909 } 1910 1911 @Override 1912 public boolean rename(Path src, Path dst) throws IOException { 1913 1914 FolderRenamePending renamePending = null; 1915 1916 if (LOG.isDebugEnabled()) { 1917 LOG.debug("Moving " + src + " to " + dst); 1918 } 1919 1920 if (containsColon(dst)) { 1921 throw new IOException("Cannot rename to file " + dst 1922 + " through WASB that has colons in the name"); 1923 } 1924 1925 String srcKey = pathToKey(makeAbsolute(src)); 1926 1927 if (srcKey.length() == 0) { 1928 // Cannot rename root of file system 1929 return false; 1930 } 1931 1932 // Figure out the final destination 1933 Path absoluteDst = makeAbsolute(dst); 1934 String dstKey = pathToKey(absoluteDst); 1935 FileMetadata dstMetadata = store.retrieveMetadata(dstKey); 1936 if (dstMetadata != null && dstMetadata.isDir()) { 1937 // It's an existing directory. 1938 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 1939 if (LOG.isDebugEnabled()) { 1940 LOG.debug("Destination " + dst 1941 + " is a directory, adjusted the destination to be " + dstKey); 1942 } 1943 } else if (dstMetadata != null) { 1944 // Attempting to overwrite a file using rename() 1945 if (LOG.isDebugEnabled()) { 1946 LOG.debug("Destination " + dst 1947 + " is an already existing file, failing the rename."); 1948 } 1949 return false; 1950 } else { 1951 // Check that the parent directory exists. 1952 FileMetadata parentOfDestMetadata = 1953 store.retrieveMetadata(pathToKey(absoluteDst.getParent())); 1954 if (parentOfDestMetadata == null) { 1955 if (LOG.isDebugEnabled()) { 1956 LOG.debug("Parent of the destination " + dst 1957 + " doesn't exist, failing the rename."); 1958 } 1959 return false; 1960 } else if (!parentOfDestMetadata.isDir()) { 1961 if (LOG.isDebugEnabled()) { 1962 LOG.debug("Parent of the destination " + dst 1963 + " is a file, failing the rename."); 1964 } 1965 return false; 1966 } 1967 } 1968 FileMetadata srcMetadata = store.retrieveMetadata(srcKey); 1969 if (srcMetadata == null) { 1970 // Source doesn't exist 1971 if (LOG.isDebugEnabled()) { 1972 LOG.debug("Source " + src + " doesn't exist, failing the rename."); 1973 } 1974 return false; 1975 } else if (!srcMetadata.isDir()) { 1976 if (LOG.isDebugEnabled()) { 1977 LOG.debug("Source " + src + " found as a file, renaming."); 1978 } 1979 store.rename(srcKey, dstKey); 1980 } else { 1981 1982 // Prepare for, execute and clean up after of all files in folder, and 1983 // the root file, and update the last modified time of the source and 1984 // target parent folders. The operation can be redone if it fails part 1985 // way through, by applying the "Rename Pending" file. 1986 1987 // The following code (internally) only does atomic rename preparation 1988 // and lease management for page blob folders, limiting the scope of the 1989 // operation to HBase log file folders, where atomic rename is required. 1990 // In the future, we could generalize it easily to all folders. 1991 renamePending = prepareAtomicFolderRename(srcKey, dstKey); 1992 renamePending.execute(); 1993 if (LOG.isDebugEnabled()) { 1994 LOG.debug("Renamed " + src + " to " + dst + " successfully."); 1995 } 1996 renamePending.cleanup(); 1997 return true; 1998 } 1999 2000 // Update the last-modified time of the parent folders of both source 2001 // and destination. 2002 updateParentFolderLastModifiedTime(srcKey); 2003 updateParentFolderLastModifiedTime(dstKey); 2004 2005 if (LOG.isDebugEnabled()) { 2006 LOG.debug("Renamed " + src + " to " + dst + " successfully."); 2007 } 2008 return true; 2009 } 2010 2011 /** 2012 * Update the last-modified time of the parent folder of the file 2013 * identified by key. 2014 * @param key 2015 * @throws IOException 2016 */ 2017 private void updateParentFolderLastModifiedTime(String key) 2018 throws IOException { 2019 Path parent = makeAbsolute(keyToPath(key)).getParent(); 2020 if (parent != null && parent.getParent() != null) { // not root 2021 String parentKey = pathToKey(parent); 2022 2023 // ensure the parent is a materialized folder 2024 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 2025 // The metadata could be null if the implicit folder only contains a 2026 // single file. In this case, the parent folder no longer exists if the 2027 // file is renamed; so we can safely ignore the null pointer case. 2028 if (parentMetadata != null) { 2029 if (parentMetadata.isDir() 2030 && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2031 store.storeEmptyFolder(parentKey, 2032 createPermissionStatus(FsPermission.getDefault())); 2033 } 2034 2035 store.updateFolderLastModifiedTime(parentKey, null); 2036 } 2037 } 2038 } 2039 2040 /** 2041 * If the source is a page blob folder, 2042 * prepare to rename this folder atomically. This means to get exclusive 2043 * access to the source folder, and record the actions to be performed for 2044 * this rename in a "Rename Pending" file. This code was designed to 2045 * meet the needs of HBase, which requires atomic rename of write-ahead log 2046 * (WAL) folders for correctness. 2047 * 2048 * Before calling this method, the caller must ensure that the source is a 2049 * folder. 2050 * 2051 * For non-page-blob directories, prepare the in-memory information needed, 2052 * but don't take the lease or write the redo file. This is done to limit the 2053 * scope of atomic folder rename to HBase, at least at the time of writing 2054 * this code. 2055 * 2056 * @param srcKey Source folder name. 2057 * @param dstKey Destination folder name. 2058 * @throws IOException 2059 */ 2060 private FolderRenamePending prepareAtomicFolderRename( 2061 String srcKey, String dstKey) throws IOException { 2062 2063 if (store.isAtomicRenameKey(srcKey)) { 2064 2065 // Block unwanted concurrent access to source folder. 2066 SelfRenewingLease lease = leaseSourceFolder(srcKey); 2067 2068 // Prepare in-memory information needed to do or redo a folder rename. 2069 FolderRenamePending renamePending = 2070 new FolderRenamePending(srcKey, dstKey, lease, this); 2071 2072 // Save it to persistent storage to help recover if the operation fails. 2073 renamePending.writeFile(this); 2074 return renamePending; 2075 } else { 2076 FolderRenamePending renamePending = 2077 new FolderRenamePending(srcKey, dstKey, null, this); 2078 return renamePending; 2079 } 2080 } 2081 2082 /** 2083 * Get a self-renewing Azure blob lease on the source folder zero-byte file. 2084 */ 2085 private SelfRenewingLease leaseSourceFolder(String srcKey) 2086 throws AzureException { 2087 return store.acquireLease(srcKey); 2088 } 2089 2090 /** 2091 * Return an array containing hostnames, offset and size of 2092 * portions of the given file. For WASB we'll just lie and give 2093 * fake hosts to make sure we get many splits in MR jobs. 2094 */ 2095 @Override 2096 public BlockLocation[] getFileBlockLocations(FileStatus file, 2097 long start, long len) throws IOException { 2098 if (file == null) { 2099 return null; 2100 } 2101 2102 if ((start < 0) || (len < 0)) { 2103 throw new IllegalArgumentException("Invalid start or len parameter"); 2104 } 2105 2106 if (file.getLen() < start) { 2107 return new BlockLocation[0]; 2108 } 2109 final String blobLocationHost = getConf().get( 2110 AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, 2111 AZURE_BLOCK_LOCATION_HOST_DEFAULT); 2112 final String[] name = { blobLocationHost }; 2113 final String[] host = { blobLocationHost }; 2114 long blockSize = file.getBlockSize(); 2115 if (blockSize <= 0) { 2116 throw new IllegalArgumentException( 2117 "The block size for the given file is not a positive number: " 2118 + blockSize); 2119 } 2120 int numberOfLocations = (int) (len / blockSize) 2121 + ((len % blockSize == 0) ? 0 : 1); 2122 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 2123 for (int i = 0; i < locations.length; i++) { 2124 long currentOffset = start + (i * blockSize); 2125 long currentLength = Math.min(blockSize, start + len - currentOffset); 2126 locations[i] = new BlockLocation(name, host, currentOffset, currentLength); 2127 } 2128 return locations; 2129 } 2130 2131 /** 2132 * Set the working directory to the given directory. 2133 */ 2134 @Override 2135 public void setWorkingDirectory(Path newDir) { 2136 workingDir = makeAbsolute(newDir); 2137 } 2138 2139 @Override 2140 public Path getWorkingDirectory() { 2141 return workingDir; 2142 } 2143 2144 @Override 2145 public void setPermission(Path p, FsPermission permission) throws IOException { 2146 Path absolutePath = makeAbsolute(p); 2147 String key = pathToKey(absolutePath); 2148 FileMetadata metadata = store.retrieveMetadata(key); 2149 if (metadata == null) { 2150 throw new FileNotFoundException("File doesn't exist: " + p); 2151 } 2152 permission = applyUMask(permission, 2153 metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory 2154 : UMaskApplyMode.ChangeExistingFile); 2155 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2156 // It's an implicit folder, need to materialize it. 2157 store.storeEmptyFolder(key, createPermissionStatus(permission)); 2158 } else if (!metadata.getPermissionStatus().getPermission(). 2159 equals(permission)) { 2160 store.changePermissionStatus(key, new PermissionStatus( 2161 metadata.getPermissionStatus().getUserName(), 2162 metadata.getPermissionStatus().getGroupName(), 2163 permission)); 2164 } 2165 } 2166 2167 @Override 2168 public void setOwner(Path p, String username, String groupname) 2169 throws IOException { 2170 Path absolutePath = makeAbsolute(p); 2171 String key = pathToKey(absolutePath); 2172 FileMetadata metadata = store.retrieveMetadata(key); 2173 if (metadata == null) { 2174 throw new FileNotFoundException("File doesn't exist: " + p); 2175 } 2176 PermissionStatus newPermissionStatus = new PermissionStatus( 2177 username == null ? 2178 metadata.getPermissionStatus().getUserName() : username, 2179 groupname == null ? 2180 metadata.getPermissionStatus().getGroupName() : groupname, 2181 metadata.getPermissionStatus().getPermission()); 2182 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2183 // It's an implicit folder, need to materialize it. 2184 store.storeEmptyFolder(key, newPermissionStatus); 2185 } else { 2186 store.changePermissionStatus(key, newPermissionStatus); 2187 } 2188 } 2189 2190 @Override 2191 public synchronized void close() throws IOException { 2192 if (isClosed) { 2193 return; 2194 } 2195 2196 // Call the base close() to close any resources there. 2197 super.close(); 2198 // Close the store to close any resources there - e.g. the bandwidth 2199 // updater thread would be stopped at this time. 2200 store.close(); 2201 // Notify the metrics system that this file system is closed, which may 2202 // trigger one final metrics push to get the accurate final file system 2203 // metrics out. 2204 2205 long startTime = System.currentTimeMillis(); 2206 2207 AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); 2208 AzureFileSystemMetricsSystem.fileSystemClosed(); 2209 2210 if (LOG.isDebugEnabled()) { 2211 LOG.debug("Submitting metrics when file system closed took " 2212 + (System.currentTimeMillis() - startTime) + " ms."); 2213 } 2214 isClosed = true; 2215 } 2216 2217 /** 2218 * A handler that defines what to do with blobs whose upload was 2219 * interrupted. 2220 */ 2221 private abstract class DanglingFileHandler { 2222 abstract void handleFile(FileMetadata file, FileMetadata tempFile) 2223 throws IOException; 2224 } 2225 2226 /** 2227 * Handler implementation for just deleting dangling files and cleaning 2228 * them up. 2229 */ 2230 private class DanglingFileDeleter extends DanglingFileHandler { 2231 @Override 2232 void handleFile(FileMetadata file, FileMetadata tempFile) 2233 throws IOException { 2234 if (LOG.isDebugEnabled()) { 2235 LOG.debug("Deleting dangling file " + file.getKey()); 2236 } 2237 store.delete(file.getKey()); 2238 store.delete(tempFile.getKey()); 2239 } 2240 } 2241 2242 /** 2243 * Handler implementation for just moving dangling files to recovery 2244 * location (/lost+found). 2245 */ 2246 private class DanglingFileRecoverer extends DanglingFileHandler { 2247 private final Path destination; 2248 2249 DanglingFileRecoverer(Path destination) { 2250 this.destination = destination; 2251 } 2252 2253 @Override 2254 void handleFile(FileMetadata file, FileMetadata tempFile) 2255 throws IOException { 2256 if (LOG.isDebugEnabled()) { 2257 LOG.debug("Recovering " + file.getKey()); 2258 } 2259 // Move to the final destination 2260 String finalDestinationKey = 2261 pathToKey(new Path(destination, file.getKey())); 2262 store.rename(tempFile.getKey(), finalDestinationKey); 2263 if (!finalDestinationKey.equals(file.getKey())) { 2264 // Delete the empty link file now that we've restored it. 2265 store.delete(file.getKey()); 2266 } 2267 } 2268 } 2269 2270 /** 2271 * Check if a path has colons in its name 2272 */ 2273 private boolean containsColon(Path p) { 2274 return p.toUri().getPath().toString().contains(":"); 2275 } 2276 2277 /** 2278 * Implements recover and delete (-move and -delete) behaviors for handling 2279 * dangling files (blobs whose upload was interrupted). 2280 * 2281 * @param root 2282 * The root path to check from. 2283 * @param handler 2284 * The handler that deals with dangling files. 2285 */ 2286 private void handleFilesWithDanglingTempData(Path root, 2287 DanglingFileHandler handler) throws IOException { 2288 // Calculate the cut-off for when to consider a blob to be dangling. 2289 long cutoffForDangling = new Date().getTime() 2290 - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, 2291 AZURE_TEMP_EXPIRY_DEFAULT) * 1000; 2292 // Go over all the blobs under the given root and look for blobs to 2293 // recover. 2294 String priorLastKey = null; 2295 do { 2296 PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, 2297 AZURE_UNBOUNDED_DEPTH, priorLastKey); 2298 2299 for (FileMetadata file : listing.getFiles()) { 2300 if (!file.isDir()) { // We don't recover directory blobs 2301 // See if this blob has a link in it (meaning it's a place-holder 2302 // blob for when the upload to the temp blob is complete). 2303 String link = store.getLinkInFileMetadata(file.getKey()); 2304 if (link != null) { 2305 // It has a link, see if the temp blob it is pointing to is 2306 // existent and old enough to be considered dangling. 2307 FileMetadata linkMetadata = store.retrieveMetadata(link); 2308 if (linkMetadata != null 2309 && linkMetadata.getLastModified() >= cutoffForDangling) { 2310 // Found one! 2311 handler.handleFile(file, linkMetadata); 2312 } 2313 } 2314 } 2315 } 2316 priorLastKey = listing.getPriorLastKey(); 2317 } while (priorLastKey != null); 2318 } 2319 2320 /** 2321 * Looks under the given root path for any blob that are left "dangling", 2322 * meaning that they are place-holder blobs that we created while we upload 2323 * the data to a temporary blob, but for some reason we crashed in the middle 2324 * of the upload and left them there. If any are found, we move them to the 2325 * destination given. 2326 * 2327 * @param root 2328 * The root path to consider. 2329 * @param destination 2330 * The destination path to move any recovered files to. 2331 * @throws IOException 2332 */ 2333 public void recoverFilesWithDanglingTempData(Path root, Path destination) 2334 throws IOException { 2335 if (LOG.isDebugEnabled()) { 2336 LOG.debug("Recovering files with dangling temp data in " + root); 2337 } 2338 handleFilesWithDanglingTempData(root, 2339 new DanglingFileRecoverer(destination)); 2340 } 2341 2342 /** 2343 * Looks under the given root path for any blob that are left "dangling", 2344 * meaning that they are place-holder blobs that we created while we upload 2345 * the data to a temporary blob, but for some reason we crashed in the middle 2346 * of the upload and left them there. If any are found, we delete them. 2347 * 2348 * @param root 2349 * The root path to consider. 2350 * @throws IOException 2351 */ 2352 public void deleteFilesWithDanglingTempData(Path root) throws IOException { 2353 if (LOG.isDebugEnabled()) { 2354 LOG.debug("Deleting files with dangling temp data in " + root); 2355 } 2356 handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); 2357 } 2358 2359 @Override 2360 protected void finalize() throws Throwable { 2361 LOG.debug("finalize() called."); 2362 close(); 2363 super.finalize(); 2364 } 2365 2366 /** 2367 * Encode the key with a random prefix for load balancing in Azure storage. 2368 * Upload data to a random temporary file then do storage side renaming to 2369 * recover the original key. 2370 * 2371 * @param aKey 2372 * @return Encoded version of the original key. 2373 */ 2374 private static String encodeKey(String aKey) { 2375 // Get the tail end of the key name. 2376 // 2377 String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, 2378 aKey.length()); 2379 2380 // Construct the randomized prefix of the file name. The prefix ensures the 2381 // file always drops into the same folder but with a varying tail key name. 2382 String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR 2383 + UUID.randomUUID().toString(); 2384 2385 // Concatenate the randomized prefix with the tail of the key name. 2386 String randomizedKey = filePrefix + fileName; 2387 2388 // Return to the caller with the randomized key. 2389 return randomizedKey; 2390 } 2391}