001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.azure;
020
021import java.io.DataInputStream;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.text.SimpleDateFormat;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.EnumSet;
032import java.util.Iterator;
033import java.util.Set;
034import java.util.TimeZone;
035import java.util.TreeSet;
036import java.util.UUID;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.regex.Matcher;
039import java.util.regex.Pattern;
040
041import org.apache.commons.lang.StringUtils;
042import org.apache.commons.lang.exception.ExceptionUtils;
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.BlockLocation;
049import org.apache.hadoop.fs.BufferedFSInputStream;
050import org.apache.hadoop.fs.CreateFlag;
051import org.apache.hadoop.fs.FSDataInputStream;
052import org.apache.hadoop.fs.FSDataOutputStream;
053import org.apache.hadoop.fs.FSInputStream;
054import org.apache.hadoop.fs.FileStatus;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.Path;
057import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.fs.permission.PermissionStatus;
061import org.apache.hadoop.fs.azure.AzureException;
062import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
063import org.apache.hadoop.io.IOUtils;
064import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
065import org.apache.hadoop.security.UserGroupInformation;
066import org.apache.hadoop.util.Progressable;
067
068
069import org.codehaus.jackson.JsonNode;
070import org.codehaus.jackson.JsonParseException;
071import org.codehaus.jackson.JsonParser;
072import org.codehaus.jackson.map.JsonMappingException;
073import org.codehaus.jackson.map.ObjectMapper;
074
075import com.google.common.annotations.VisibleForTesting;
076import com.microsoft.windowsazure.storage.AccessCondition;
077import com.microsoft.windowsazure.storage.OperationContext;
078import com.microsoft.windowsazure.storage.StorageException;
079import com.microsoft.windowsazure.storage.blob.CloudBlob;
080import com.microsoft.windowsazure.storage.core.*;
081
082/**
083 * A {@link FileSystem} for reading and writing files stored on <a
084 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
085 * blob-based and stores files on Azure in their native form so they can be read
086 * by other Azure tools.
087 */
088@InterfaceAudience.Public
089@InterfaceStability.Stable
090public class NativeAzureFileSystem extends FileSystem {
  /**
   * Octal permission bits 0300: write and execute for the owning user only
   * (no read bit, and no bits at all for group or others).
   */
  private static final int USER_WX_PERMISION = 0300;
092
093  /**
094   * A description of a folder rename operation, including the source and
095   * destination keys, and descriptions of the files in the source folder.
096   */
097  public static class FolderRenamePending {
098    private SelfRenewingLease folderLease;
099    private String srcKey;
100    private String dstKey;
101    private FileMetadata[] fileMetadata = null;    // descriptions of source files
102    private ArrayList<String> fileStrings = null;
103    private NativeAzureFileSystem fs;
104    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
105    private static final int FORMATTING_BUFFER = 10000;
106    private boolean committed;
107    public static final String SUFFIX = "-RenamePending.json";
108
109    // Prepare in-memory information needed to do or redo a folder rename.
110    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
111        NativeAzureFileSystem fs) throws IOException {
112      this.srcKey = srcKey;
113      this.dstKey = dstKey;
114      this.folderLease = lease;
115      this.fs = fs;
116      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
117
118      // List all the files in the folder.
119      String priorLastKey = null;
120      do {
121        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
122          AZURE_UNBOUNDED_DEPTH, priorLastKey);
123        for(FileMetadata file : listing.getFiles()) {
124          fileMetadataList.add(file);
125        }
126        priorLastKey = listing.getPriorLastKey();
127      } while (priorLastKey != null);
128      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
129      this.committed = true;
130    }
131
132    // Prepare in-memory information needed to do or redo folder rename from
133    // a -RenamePending.json file read from storage. This constructor is to use during
134    // redo processing.
135    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
136        throws IllegalArgumentException, IOException {
137
138      this.fs = fs;
139
140      // open redo file
141      Path f = redoFile;
142      FSDataInputStream input = fs.open(f);
143      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
144      int l = input.read(bytes);
145      if (l < 0) {
146        throw new IOException(
147            "Error reading pending rename file contents -- no data available");
148      }
149      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
150        throw new IOException(
151            "Error reading pending rename file contents -- "
152                + "maximum file size exceeded");
153      }
154      String contents = new String(bytes, 0, l);
155
156      // parse the JSON
157      ObjectMapper objMapper = new ObjectMapper();
158      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
159      JsonNode json = null;
160      try {
161        json = objMapper.readValue(contents, JsonNode.class);
162        this.committed = true;
163      } catch (JsonMappingException e) {
164
165        // The -RedoPending.json file is corrupted, so we assume it was
166        // not completely written
167        // and the redo operation did not commit.
168        this.committed = false;
169      } catch (JsonParseException e) {
170        this.committed = false;
171      } catch (IOException e) {
172        this.committed = false;  
173      }
174      
175      if (!this.committed) {
176        LOG.error("Deleting corruped rename pending file "
177            + redoFile + "\n" + contents);
178
179        // delete the -RenamePending.json file
180        fs.delete(redoFile, false);
181        return;
182      }
183
184      // initialize this object's fields
185      ArrayList<String> fileStrList = new ArrayList<String>();
186      JsonNode oldFolderName = json.get("OldFolderName");
187      JsonNode newFolderName = json.get("NewFolderName");
188      if (oldFolderName == null || newFolderName == null) {
189          this.committed = false;
190      } else {
191        this.srcKey = oldFolderName.getTextValue();
192        this.dstKey = newFolderName.getTextValue();
193        if (this.srcKey == null || this.dstKey == null) {
194          this.committed = false;         
195        } else {
196          JsonNode fileList = json.get("FileList");
197          if (fileList == null) {
198            this.committed = false;     
199          } else {
200            for (int i = 0; i < fileList.size(); i++) {
201              fileStrList.add(fileList.get(i).getTextValue());
202            }
203          }
204        }
205      }
206      this.fileStrings = fileStrList;
207    }
208
209    public FileMetadata[] getFiles() {
210      return fileMetadata;
211    }
212
213    public SelfRenewingLease getFolderLease() {
214      return folderLease;
215    }
216
217    /**
218     * Write to disk the information needed to redo folder rename,
219     * in JSON format. The file name will be
220     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}
221     * The file format will be:
222     * <pre>{@code
223     * {
224     *   FormatVersion: "1.0",
225     *   OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
226     *   OldFolderName: "<key>",
227     *   NewFolderName: "<key>",
228     *   FileList: [ <string> , <string> , ... ]
229     * }
230     *
231     * Here's a sample:
232     * {
233     *  FormatVersion: "1.0",
234     *  OperationUTCTime: "2014-07-01 23:50:35.572",
235     *  OldFolderName: "user/ehans/folderToRename",
236     *  NewFolderName: "user/ehans/renamedFolder",
237     *  FileList: [
238     *    "innerFile",
239     *    "innerFile2"
240     *  ]
241     * } }</pre>
242     * @throws IOException
243     */
244    public void writeFile(FileSystem fs) throws IOException {
245      Path path = getRenamePendingFilePath();
246      if (LOG.isDebugEnabled()){
247        LOG.debug("Preparing to write atomic rename state to " + path.toString());
248      }
249      OutputStream output = null;
250
251      String contents = makeRenamePendingFileContents();
252
253      // Write file.
254      try {
255        output = fs.create(path);
256        output.write(contents.getBytes());
257      } catch (IOException e) {
258        throw new IOException("Unable to write RenamePending file for folder rename from "
259            + srcKey + " to " + dstKey, e);
260      } finally {
261        IOUtils.cleanup(LOG, output);
262      }
263    }
264
265    /**
266     * Return the contents of the JSON file to represent the operations
267     * to be performed for a folder rename.
268     */
269    public String makeRenamePendingFileContents() {
270      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
271      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
272      String time = sdf.format(new Date());
273
274      // Make file list string
275      StringBuilder builder = new StringBuilder();
276      builder.append("[\n");
277      for (int i = 0; i != fileMetadata.length; i++) {
278        if (i > 0) {
279          builder.append(",\n");
280        }
281        builder.append("    ");
282        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
283
284        // Quote string file names, escaping any possible " characters or other
285        // necessary characters in the name.
286        builder.append(quote(noPrefix));
287        if (builder.length() >=
288            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
289
290          // Give up now to avoid using too much memory.
291          LOG.error("Internal error: Exceeded maximum rename pending file size of "
292              + MAX_RENAME_PENDING_FILE_SIZE + " bytes.");
293
294          // return some bad JSON with an error message to make it human readable
295          return "exceeded maximum rename pending file size";
296        }
297      }
298      builder.append("\n  ]");
299      String fileList = builder.toString();
300
301      // Make file contents as a string. Again, quote file names, escaping
302      // characters as appropriate.
303      String contents = "{\n"
304          + "  FormatVersion: \"1.0\",\n"
305          + "  OperationUTCTime: \"" + time + "\",\n"
306          + "  OldFolderName: " + quote(srcKey) + ",\n"
307          + "  NewFolderName: " + quote(dstKey) + ",\n"
308          + "  FileList: " + fileList + "\n"
309          + "}\n";
310
311      return contents;
312    }
313    
314    /**
315     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
316     * method.
317     * 
318     * Produce a string in double quotes with backslash sequences in all the
319     * right places. A backslash will be inserted within </, allowing JSON
320     * text to be delivered in HTML. In JSON text, a string cannot contain a
321     * control character or an unescaped quote or backslash.
322     * @param string A String
323     * @return  A String correctly formatted for insertion in a JSON text.
324     */
325    private String quote(String string) {
326        if (string == null || string.length() == 0) {
327            return "\"\"";
328        }
329
330        char c = 0;
331        int  i;
332        int  len = string.length();
333        StringBuilder sb = new StringBuilder(len + 4);
334        String t;
335
336        sb.append('"');
337        for (i = 0; i < len; i += 1) {
338            c = string.charAt(i);
339            switch (c) {
340            case '\\':
341            case '"':
342                sb.append('\\');
343                sb.append(c);
344                break;
345            case '/':
346                sb.append('\\');
347                sb.append(c);
348                break;
349            case '\b':
350                sb.append("\\b");
351                break;
352            case '\t':
353                sb.append("\\t");
354                break;
355            case '\n':
356                sb.append("\\n");
357                break;
358            case '\f':
359                sb.append("\\f");
360                break;
361            case '\r':
362                sb.append("\\r");
363                break;
364            default:
365                if (c < ' ') {
366                    t = "000" + Integer.toHexString(c);
367                    sb.append("\\u" + t.substring(t.length() - 4));
368                } else {
369                    sb.append(c);
370                }
371            }
372        }
373        sb.append('"');
374        return sb.toString();
375    }
376
377    public String getSrcKey() {
378      return srcKey;
379    }
380
381    public String getDstKey() {
382      return dstKey;
383    }
384
385    public FileMetadata getSourceMetadata() throws IOException {
386      return fs.getStoreInterface().retrieveMetadata(srcKey);
387    }
388
389    /**
390     * Execute a folder rename. This is the execution path followed
391     * when everything is working normally. See redo() for the alternate
392     * execution path for the case where we're recovering from a folder rename
393     * failure.
394     * @throws IOException
395     */
396    public void execute() throws IOException {
397
398      for (FileMetadata file : this.getFiles()) {
399
400        // Rename all materialized entries under the folder to point to the
401        // final destination.
402        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
403          String srcName = file.getKey();
404          String suffix  = srcName.substring((this.getSrcKey()).length());
405          String dstName = this.getDstKey() + suffix;
406
407          // Rename gets exclusive access (via a lease) for files
408          // designated for atomic rename.
409          // The main use case is for HBase write-ahead log (WAL) and data
410          // folder processing correctness.  See the rename code for details.
411          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
412          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
413        }
414      }
415
416      // Rename the source folder 0-byte root file itself.
417      FileMetadata srcMetadata2 = this.getSourceMetadata();
418      if (srcMetadata2.getBlobMaterialization() ==
419          BlobMaterialization.Explicit) {
420
421        // It already has a lease on it from the "prepare" phase so there's no
422        // need to get one now. Pass in existing lease to allow file delete.
423        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
424            false, folderLease);
425      }
426
427      // Update the last-modified time of the parent folders of both source and
428      // destination.
429      fs.updateParentFolderLastModifiedTime(srcKey);
430      fs.updateParentFolderLastModifiedTime(dstKey);
431    }
432
433    /** Clean up after execution of rename.
434     * @throws IOException */
435    public void cleanup() throws IOException {
436
437      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
438
439        // Remove RenamePending file
440        fs.delete(getRenamePendingFilePath(), false);
441
442        // Freeing source folder lease is not necessary since the source
443        // folder file was deleted.
444      }
445    }
446
447    private Path getRenamePendingFilePath() {
448      String fileName = srcKey + SUFFIX;
449      Path fileNamePath = keyToPath(fileName);
450      Path path = fs.makeAbsolute(fileNamePath);
451      return path;
452    }
453
454    /**
455     * Recover from a folder rename failure by redoing the intended work,
456     * as recorded in the -RenamePending.json file.
457     * 
458     * @throws IOException
459     */
460    public void redo() throws IOException {
461
462      if (!committed) {
463
464        // Nothing to do. The -RedoPending.json file should have already been
465        // deleted.
466        return;
467      }
468
469      // Try to get a lease on source folder to block concurrent access to it.
470      // It may fail if the folder is already gone. We don't check if the
471      // source exists explicitly because that could recursively trigger redo
472      // and give an infinite recursion.
473      SelfRenewingLease lease = null;
474      boolean sourceFolderGone = false;
475      try {
476        lease = fs.leaseSourceFolder(srcKey);
477      } catch (AzureException e) {
478
479        // If the source folder was not found then somebody probably
480        // raced with us and finished the rename first, or the
481        // first rename failed right before deleting the rename pending
482        // file.
483        String errorCode = "";
484        try {
485          StorageException se = (StorageException) e.getCause();
486          errorCode = se.getErrorCode();
487        } catch (Exception e2) {
488          ; // do nothing -- could not get errorCode
489        }
490        if (errorCode.equals("BlobNotFound")) {
491          sourceFolderGone = true;
492        } else {
493          throw new IOException(
494              "Unexpected error when trying to lease source folder name during "
495              + "folder rename redo",
496              e);
497        }
498      }
499
500      if (!sourceFolderGone) {
501        // Make sure the target folder exists.
502        Path dst = fullPath(dstKey);
503        if (!fs.exists(dst)) {
504          fs.mkdirs(dst);
505        }
506
507        // For each file inside the folder to be renamed,
508        // make sure it has been renamed.
509        for(String fileName : fileStrings) {
510          finishSingleFileRename(fileName);
511        }
512
513        // Remove the source folder. Don't check explicitly if it exists,
514        // to avoid triggering redo recursively.
515        try {
516          fs.getStoreInterface().delete(srcKey, lease);
517        } catch (Exception e) {
518          LOG.info("Unable to delete source folder during folder rename redo. "
519              + "If the source folder is already gone, this is not an error "
520              + "condition. Continuing with redo.", e);
521        }
522
523        // Update the last-modified time of the parent folders of both source
524        // and destination.
525        fs.updateParentFolderLastModifiedTime(srcKey);
526        fs.updateParentFolderLastModifiedTime(dstKey);
527      }
528
529      // Remove the -RenamePending.json file.
530      fs.delete(getRenamePendingFilePath(), false);
531    }
532
533    // See if the source file is still there, and if it is, rename it.
534    private void finishSingleFileRename(String fileName)
535        throws IOException {
536      Path srcFile = fullPath(srcKey, fileName);
537      Path dstFile = fullPath(dstKey, fileName);
538      boolean srcExists = fs.exists(srcFile);
539      boolean dstExists = fs.exists(dstFile);
540      if (srcExists && !dstExists) {
541
542        // Rename gets exclusive access (via a lease) for HBase write-ahead log
543        // (WAL) file processing correctness.  See the rename code for details.
544        String srcName = fs.pathToKey(srcFile);
545        String dstName = fs.pathToKey(dstFile);
546        fs.getStoreInterface().rename(srcName, dstName, true, null);
547      } else if (srcExists && dstExists) {
548
549        // Get a lease on source to block write access.
550        String srcName = fs.pathToKey(srcFile);
551        SelfRenewingLease lease = fs.acquireLease(srcFile);
552
553        // Delete the file. This will free the lease too.
554        fs.getStoreInterface().delete(srcName, lease);
555      } else if (!srcExists && dstExists) {
556
557        // The rename already finished, so do nothing.
558        ;
559      } else {
560        throw new IOException(
561            "Attempting to complete rename of file " + srcKey + "/" + fileName
562            + " during folder rename redo, and file was not found in source "
563            + "or destination.");
564      }
565    }
566
567    // Return an absolute path for the specific fileName within the folder
568    // specified by folderKey.
569    private Path fullPath(String folderKey, String fileName) {
570      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
571    }
572
573    private Path fullPath(String fileKey) {
574      return new Path(new Path(fs.getUri()), "/" + fileKey);
575    }
576  }
577
  // The literal text "[[.]]". Judging by its name it stands in for a period
  // that ends a path component. NOTE(review): confirm against its users,
  // which are outside this view.
  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
  // Matches the literal "[[.]]" only when it is followed by end of input or
  // a '/'. The lookahead means the '/' itself is not consumed.
  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
  // Matches a single '.' followed by end of input or a '/', i.e. a period
  // at the end of a path component. The lookahead does not consume the '/'.
  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
582
  /**
   * Returns the URI scheme served by this file system, {@code "wasb"}.
   * The nested {@code Secure} subclass overrides this to return
   * {@code "wasbs"}.
   */
  @Override
  public String getScheme() {
    return "wasb";
  }
587
588  
  /**
   * <p>
   * A {@link FileSystem} for reading and writing files stored on <a
   * href="http://store.azure.com/">Windows Azure</a>. This implementation is
   * blob-based and stores files on Azure in their native form so they can be read
   * by other Azure tools. This implementation uses HTTPS for secure network communication.
   * </p>
   * <p>
   * In this class it differs from its parent only in the URI scheme it
   * reports.
   * </p>
   */
  public static class Secure extends NativeAzureFileSystem {
    /** Returns {@code "wasbs"}, the scheme of the secure variant. */
    @Override
    public String getScheme() {
      return "wasbs";
    }
  }
603
  public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class);

  // Name of the configuration property for the block size.
  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
  /**
   * The time span in seconds before which we consider a temp blob to be
   * dangling (not being actively uploaded to) and up for reclamation.
   * 
   * So e.g. if this is 60, then any temporary blobs more than a minute old
   * would be considered dangling.
   */
  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
  // Default for the temp-blob expiry above: 3600 seconds (one hour).
  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
  // Path component separator; identical to Hadoop's Path.SEPARATOR.
  static final String PATH_DELIMITER = Path.SEPARATOR;
  // Presumably the name of the folder holding temporary blobs. Confirm
  // against its users outside this view.
  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";

  // -1 sentinels passed to the store's listAll() (see FolderRenamePending).
  // Presumably they mean "no limit on result count" and "no limit on depth".
  // TODO confirm in the store implementation.
  private static final int AZURE_LIST_ALL = -1;
  private static final int AZURE_UNBOUNDED_DEPTH = -1;

  // 512 MiB. The trailing L makes the last multiplication long arithmetic.
  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;

  /**
   * The configuration property that determines which group owns files created
   * in WASB.
   */
  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
  /**
   * The default value for fs.azure.permissions.supergroup. Chosen as the same
   * default as DFS.
   */
  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";

  // Property naming the host to report for block locations (the name
  // suggests an impersonated host). Its default is "localhost".
  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
      "fs.azure.block.location.impersonatedhost";
  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
      "localhost";
  // Property name for a ring buffer capacity; used outside this view.
  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
      "fs.azure.ring.buffer.capacity";
  // Property name for the output stream buffer size; used outside this view.
  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
      "fs.azure.output.stream.buffer.size";
643
644  private class NativeAzureFsInputStream extends FSInputStream {
645    private InputStream in;
646    private final String key;
647    private long pos = 0;
648    private boolean closed = false;
649    private boolean isPageBlob;
650
651    // File length, valid only for streams over block blobs.
652    private long fileLength;
653
654    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
655      this.in = in;
656      this.key = key;
657      this.isPageBlob = store.isPageBlobKey(key);
658      this.fileLength = fileLength;
659    }
660
661    /**
662     * Return the size of the remaining available bytes
663     * if the size is less than or equal to {@link Integer#MAX_VALUE},
664     * otherwise, return {@link Integer#MAX_VALUE}.
665     *
666     * This is to match the behavior of DFSInputStream.available(),
667     * which some clients may rely on (HBase write-ahead log reading in
668     * particular).
669     */
670    @Override
671    public synchronized int available() throws IOException {
672      if (isPageBlob) {
673        return in.available();
674      } else {
675        if (closed) {
676          throw new IOException("Stream closed");
677        }
678        final long remaining = this.fileLength - pos;
679        return remaining <= Integer.MAX_VALUE ?
680            (int) remaining : Integer.MAX_VALUE;
681      }
682    }
683
684    /*
685     * Reads the next byte of data from the input stream. The value byte is
686     * returned as an integer in the range 0 to 255. If no byte is available
687     * because the end of the stream has been reached, the value -1 is returned.
688     * This method blocks until input data is available, the end of the stream
689     * is detected, or an exception is thrown.
690     *
691     * @returns int An integer corresponding to the byte read.
692     */
693    @Override
694    public synchronized int read() throws IOException {
695      int result = 0;
696      result = in.read();
697      if (result != -1) {
698        pos++;
699        if (statistics != null) {
700          statistics.incrementBytesRead(1);
701        }
702      }
703
704      // Return to the caller with the result.
705      //
706      return result;
707    }
708
709    /*
710     * Reads up to len bytes of data from the input stream into an array of
711     * bytes. An attempt is made to read as many as len bytes, but a smaller
712     * number may be read. The number of bytes actually read is returned as an
713     * integer. This method blocks until input data is available, end of file is
714     * detected, or an exception is thrown. If len is zero, then no bytes are
715     * read and 0 is returned; otherwise, there is an attempt to read at least
716     * one byte. If no byte is available because the stream is at end of file,
717     * the value -1 is returned; otherwise, at least one byte is read and stored
718     * into b.
719     *
720     * @param b -- the buffer into which data is read
721     *
722     * @param off -- the start offset in the array b at which data is written
723     *
724     * @param len -- the maximum number of bytes read
725     *
726     * @ returns int The total number of byes read into the buffer, or -1 if
727     * there is no more data because the end of stream is reached.
728     */
729    @Override
730    public synchronized int read(byte[] b, int off, int len) throws IOException {
731      int result = 0;
732      result = in.read(b, off, len);
733      if (result > 0) {
734        pos += result;
735      }
736
737      if (null != statistics) {
738        statistics.incrementBytesRead(result);
739      }
740
741      // Return to the caller with the result.
742      return result;
743    }
744
745    @Override
746    public void close() throws IOException {
747      in.close();
748      closed = true;
749    }
750
751    @Override
752    public synchronized void seek(long pos) throws IOException {
753     in.close();
754     in = store.retrieve(key);
755     this.pos = in.skip(pos);
756     if (LOG.isDebugEnabled()) {
757       LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos,
758         this.pos));
759     }
760    }
761
762    @Override
763    public synchronized long getPos() throws IOException {
764      return pos;
765    }
766
767    @Override
768    public boolean seekToNewSource(long targetPos) throws IOException {
769      return false;
770    }
771  }
772
773  private class NativeAzureFsOutputStream extends OutputStream {
    // We should not override flush() to actually close current block and flush
    // to DFS, this will break applications that assume flush() is a no-op.
    // Applications are advised to use Syncable.hflush() for that purpose.
    // NativeAzureFsOutputStream needs to implement Syncable if needed.

    // Blob key, set by the constructor through setKey().
    private String key;
    // Encoded form of the blob key, set by the constructor through
    // setEncodedKey().
    private String keyEncoded;
    // Wrapped stream that receives every write. close() sets it to null.
    private OutputStream out;
781
782    public NativeAzureFsOutputStream(OutputStream out, String aKey,
783        String anEncodedKey) throws IOException {
784      // Check input arguments. The output stream should be non-null and the
785      // keys
786      // should be valid strings.
787      if (null == out) {
788        throw new IllegalArgumentException(
789            "Illegal argument: the output stream is null.");
790      }
791
792      if (null == aKey || 0 == aKey.length()) {
793        throw new IllegalArgumentException(
794            "Illegal argument the key string is null or empty");
795      }
796
797      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
798        throw new IllegalArgumentException(
799            "Illegal argument the encoded key string is null or empty");
800      }
801
802      // Initialize the member variables with the incoming parameters.
803      this.out = out;
804
805      setKey(aKey);
806      setEncodedKey(anEncodedKey);
807    }
808
809    @Override
810    public synchronized void close() throws IOException {
811      if (out != null) {
812        // Close the output stream and decode the key for the output stream
813        // before returning to the caller.
814        //
815        out.close();
816        restoreKey();
817        out = null;
818      }
819    }
820
821    /**
822     * Writes the specified byte to this output stream. The general contract for
823     * write is that one byte is written to the output stream. The byte to be
824     * written is the eight low-order bits of the argument b. The 24 high-order
825     * bits of b are ignored.
826     * 
827     * @param b
828     *          32-bit integer of block of 4 bytes
829     */
830    @Override
831    public void write(int b) throws IOException {
832      out.write(b);
833    }
834
835    /**
836     * Writes b.length bytes from the specified byte array to this output
837     * stream. The general contract for write(b) is that it should have exactly
838     * the same effect as the call write(b, 0, b.length).
839     * 
840     * @param b
841     *          Block of bytes to be written to the output stream.
842     */
843    @Override
844    public void write(byte[] b) throws IOException {
845      out.write(b);
846    }
847
848    /**
849     * Writes <code>len</code> from the specified byte array starting at offset
850     * <code>off</code> to the output stream. The general contract for write(b,
851     * off, len) is that some of the bytes in the array <code>
852     * b</code b> are written to the output stream in order; element
853     * <code>b[off]</code> is the first byte written and
854     * <code>b[off+len-1]</code> is the last byte written by this operation.
855     * 
856     * @param b
857     *          Byte array to be written.
858     * @param off
859     *          Write this offset in stream.
860     * @param len
861     *          Number of bytes to be written.
862     */
863    @Override
864    public void write(byte[] b, int off, int len) throws IOException {
865      out.write(b, off, len);
866    }
867
868    /**
869     * Get the blob name.
870     * 
871     * @return String Blob name.
872     */
873    public String getKey() {
874      return key;
875    }
876
877    /**
878     * Set the blob name.
879     * 
880     * @param key
881     *          Blob name.
882     */
883    public void setKey(String key) {
884      this.key = key;
885    }
886
887    /**
888     * Get the blob name.
889     * 
890     * @return String Blob name.
891     */
892    public String getEncodedKey() {
893      return keyEncoded;
894    }
895
896    /**
897     * Set the blob name.
898     * 
899     * @param anEncodedKey
900     *          Blob name.
901     */
902    public void setEncodedKey(String anEncodedKey) {
903      this.keyEncoded = anEncodedKey;
904    }
905
906    /**
907     * Restore the original key name from the m_key member variable. Note: The
908     * output file stream is created with an encoded blob store key to guarantee
909     * load balancing on the front end of the Azure storage partition servers.
910     * The create also includes the name of the original key value which is
911     * stored in the m_key member variable. This method should only be called
912     * when the stream is closed.
913     */
914    private void restoreKey() throws IOException {
915      store.rename(getEncodedKey(), getKey());
916    }
917  }
918
919  private URI uri;
920  private NativeFileSystemStore store;
921  private AzureNativeFileSystemStore actualStore;
922  private Path workingDir;
923  private long blockSize = MAX_AZURE_BLOCK_SIZE;
924  private AzureFileSystemInstrumentation instrumentation;
925  private String metricsSourceName;
926  private boolean isClosed = false;
927  private static boolean suppressRetryPolicy = false;
928  // A counter to create unique (within-process) names for my metrics sources.
929  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
930
931  
932  public NativeAzureFileSystem() {
933    // set store in initialize()
934  }
935
936  public NativeAzureFileSystem(NativeFileSystemStore store) {
937    this.store = store;
938  }
939
940  /**
941   * Suppress the default retry policy for the Storage, useful in unit tests to
942   * test negative cases without waiting forever.
943   */
944  @VisibleForTesting
945  static void suppressRetryPolicy() {
946    suppressRetryPolicy = true;
947  }
948
949  /**
950   * Undo the effect of suppressRetryPolicy.
951   */
952  @VisibleForTesting
953  static void resumeRetryPolicy() {
954    suppressRetryPolicy = false;
955  }
956
957  /**
958   * Creates a new metrics source name that's unique within this process.
959   */
960  @VisibleForTesting
961  public static String newMetricsSourceName() {
962    int number = metricsSourceNameCounter.incrementAndGet();
963    final String baseName = "AzureFileSystemMetrics";
964    if (number == 1) { // No need for a suffix for the first one
965      return baseName;
966    } else {
967      return baseName + number;
968    }
969  }
970  
971  /**
972   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
973   * File System.
974   * 
975   * @param scheme
976   *          The URI scheme.
977   * @return true iff it's an Azure File System URI scheme.
978   */
979  private static boolean isWasbScheme(String scheme) {
980    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
981    // wasb (new name), wasbs (new name over HTTPS).
982    return scheme != null
983        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
984            || scheme.equalsIgnoreCase("wasb") || scheme
985              .equalsIgnoreCase("wasbs"));
986  }
987
988  /**
989   * Puts in the authority of the default file system if it is a WASB file
990   * system and the given URI's authority is null.
991   * 
992   * @return The URI with reconstructed authority if necessary and possible.
993   */
994  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
995    if (null == uri.getAuthority()) {
996      // If WASB is the default file system, get the authority from there
997      URI defaultUri = FileSystem.getDefaultUri(conf);
998      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
999        try {
1000          // Reconstruct the URI with the authority from the default URI.
1001          return new URI(uri.getScheme(), defaultUri.getAuthority(),
1002              uri.getPath(), uri.getQuery(), uri.getFragment());
1003        } catch (URISyntaxException e) {
1004          // This should never happen.
1005          throw new Error("Bad URI construction", e);
1006        }
1007      }
1008    }
1009    return uri;
1010  }
1011
1012  @Override
1013  protected void checkPath(Path path) {
1014    // Make sure to reconstruct the path's authority if needed
1015    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
1016        getConf())));
1017  }
1018
1019  @Override
1020  public void initialize(URI uri, Configuration conf)
1021      throws IOException, IllegalArgumentException {
1022    // Check authority for the URI to guarantee that it is non-null.
1023    uri = reconstructAuthorityIfNeeded(uri, conf);
1024    if (null == uri.getAuthority()) {
1025      final String errMsg = String
1026          .format("Cannot initialize WASB file system, URI authority not recognized.");
1027      throw new IllegalArgumentException(errMsg);
1028    }
1029    super.initialize(uri, conf);
1030
1031    if (store == null) {
1032      store = createDefaultStore(conf);
1033    }
1034
1035    // Make sure the metrics system is available before interacting with Azure
1036    AzureFileSystemMetricsSystem.fileSystemStarted();
1037    metricsSourceName = newMetricsSourceName();
1038    String sourceDesc = "Azure Storage Volume File System metrics";
1039    instrumentation = new AzureFileSystemInstrumentation(conf);
1040    AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
1041        instrumentation);
1042
1043    store.initialize(uri, conf, instrumentation);
1044    setConf(conf);
1045    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
1046    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
1047        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
1048    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
1049        MAX_AZURE_BLOCK_SIZE);
1050
1051    if (LOG.isDebugEnabled()) {
1052      LOG.debug("NativeAzureFileSystem. Initializing.");
1053      LOG.debug("  blockSize  = "
1054          + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
1055    }
1056  }
1057
1058  private NativeFileSystemStore createDefaultStore(Configuration conf) {
1059    actualStore = new AzureNativeFileSystemStore();
1060
1061    if (suppressRetryPolicy) {
1062      actualStore.suppressRetryPolicy();
1063    }
1064    return actualStore;
1065  }
1066
1067  /**
1068   * Azure Storage doesn't allow the blob names to end in a period,
1069   * so encode this here to work around that limitation.
1070   */
1071  private static String encodeTrailingPeriod(String toEncode) {
1072    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
1073    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
1074  }
1075
1076  /**
1077   * Reverse the encoding done by encodeTrailingPeriod().
1078   */
1079  private static String decodeTrailingPeriod(String toDecode) {
1080    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
1081    return matcher.replaceAll(".");
1082  }
1083
1084  /**
1085   * Convert the path to a key. By convention, any leading or trailing slash is
1086   * removed, except for the special case of a single slash.
1087   */
1088  @VisibleForTesting
1089  public String pathToKey(Path path) {
1090    // Convert the path to a URI to parse the scheme, the authority, and the
1091    // path from the path object.
1092    URI tmpUri = path.toUri();
1093    String pathUri = tmpUri.getPath();
1094
1095    // The scheme and authority is valid. If the path does not exist add a "/"
1096    // separator to list the root of the container.
1097    Path newPath = path;
1098    if ("".equals(pathUri)) {
1099      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
1100    }
1101
1102    // Verify path is absolute if the path refers to a windows drive scheme.
1103    if (!newPath.isAbsolute()) {
1104      throw new IllegalArgumentException("Path must be absolute: " + path);
1105    }
1106
1107    String key = null;
1108    key = newPath.toUri().getPath();
1109    key = removeTrailingSlash(key);
1110    key = encodeTrailingPeriod(key);
1111    if (key.length() == 1) {
1112      return key;
1113    } else {
1114      return key.substring(1); // remove initial slash
1115    }
1116  }
1117
1118  // Remove any trailing slash except for the case of a single slash.
1119  private static String removeTrailingSlash(String key) {
1120    if (key.length() == 0 || key.length() == 1) {
1121      return key;
1122    }
1123    if (key.charAt(key.length() - 1) == '/') {
1124      return key.substring(0, key.length() - 1);
1125    } else {
1126      return key;
1127    }
1128  }
1129
1130  private static Path keyToPath(String key) {
1131    if (key.equals("/")) {
1132      return new Path("/"); // container
1133    }
1134    return new Path("/" + decodeTrailingPeriod(key));
1135  }
1136
1137  /**
1138   * Get the absolute version of the path (fully qualified).
1139   * This is public for testing purposes.
1140   *
1141   * @param path
1142   * @return fully qualified path
1143   */
1144  @VisibleForTesting
1145  public Path makeAbsolute(Path path) {
1146    if (path.isAbsolute()) {
1147      return path;
1148    }
1149    return new Path(workingDir, path);
1150  }
1151
1152  /**
1153   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
1154   * backing this file system.
1155   * 
1156   * @return The store object.
1157   */
1158  @VisibleForTesting
1159  public AzureNativeFileSystemStore getStore() {
1160    return actualStore;
1161  }
1162  
1163  NativeFileSystemStore getStoreInterface() {
1164    return store;
1165  }
1166
1167  /**
1168   * Gets the metrics source for this file system.
1169   * This is mainly here for unit testing purposes.
1170   *
1171   * @return the metrics source.
1172   */
1173  public AzureFileSystemInstrumentation getInstrumentation() {
1174    return instrumentation;
1175  }
1176
1177  /** This optional operation is not yet supported. */
1178  @Override
1179  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
1180      throws IOException {
1181    throw new IOException("Not supported");
1182  }
1183
1184  @Override
1185  public FSDataOutputStream create(Path f, FsPermission permission,
1186      boolean overwrite, int bufferSize, short replication, long blockSize,
1187      Progressable progress) throws IOException {
1188    return create(f, permission, overwrite, true,
1189        bufferSize, replication, blockSize, progress,
1190        (SelfRenewingLease) null);
1191  }
1192
1193  /**
1194   * Get a self-renewing lease on the specified file.
1195   */
1196  public SelfRenewingLease acquireLease(Path path) throws AzureException {
1197    String fullKey = pathToKey(makeAbsolute(path));
1198    return getStore().acquireLease(fullKey);
1199  }
1200
1201  @Override
1202  @SuppressWarnings("deprecation")
1203  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1204      boolean overwrite, int bufferSize, short replication, long blockSize,
1205      Progressable progress) throws IOException {
1206
1207    Path parent = f.getParent();
1208
1209    // Get exclusive access to folder if this is a directory designated
1210    // for atomic rename. The primary use case of for HBase write-ahead
1211    // log file management.
1212    SelfRenewingLease lease = null;
1213    if (store.isAtomicRenameKey(pathToKey(f))) {
1214      try {
1215        lease = acquireLease(parent);
1216      } catch (AzureException e) {
1217
1218        String errorCode = "";
1219        try {
1220          StorageException e2 = (StorageException) e.getCause();
1221          errorCode = e2.getErrorCode();
1222        } catch (Exception e3) {
1223          // do nothing if cast fails
1224        }
1225        if (errorCode.equals("BlobNotFound")) {
1226          throw new FileNotFoundException("Cannot create file " +
1227              f.getName() + " because parent folder does not exist.");
1228        }
1229
1230        LOG.warn("Got unexpected exception trying to get lease on "
1231          + pathToKey(parent) + ". " + e.getMessage());
1232        throw e;
1233      }
1234    }
1235
1236    // See if the parent folder exists. If not, throw error.
1237    // The exists() check will push any pending rename operation forward,
1238    // if there is one, and return false.
1239    //
1240    // At this point, we have exclusive access to the source folder
1241    // via the lease, so we will not conflict with an active folder
1242    // rename operation.
1243    if (!exists(parent)) {
1244      try {
1245
1246        // This'll let the keep-alive thread exit as soon as it wakes up.
1247        lease.free();
1248      } catch (Exception e) {
1249        LOG.warn("Unable to free lease because: " + e.getMessage());
1250      }
1251      throw new FileNotFoundException("Cannot create file " +
1252          f.getName() + " because parent folder does not exist.");
1253    }
1254
1255    // Create file inside folder.
1256    FSDataOutputStream out = null;
1257    try {
1258      out = create(f, permission, overwrite, false,
1259          bufferSize, replication, blockSize, progress, lease);
1260    } finally {
1261      // Release exclusive access to folder.
1262      try {
1263        if (lease != null) {
1264          lease.free();
1265        }
1266      } catch (Exception e) {
1267        IOUtils.cleanup(LOG, out);
1268        String msg = "Unable to free lease on " + parent.toUri();
1269        LOG.error(msg);
1270        throw new IOException(msg, e);
1271      }
1272    }
1273    return out;
1274  }
1275
1276  @Override
1277  @SuppressWarnings("deprecation")
1278  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1279      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1280      Progressable progress) throws IOException {
1281
1282    // Check if file should be appended or overwritten. Assume that the file
1283    // is overwritten on if the CREATE and OVERWRITE create flags are set. Note
1284    // that any other combinations of create flags will result in an open new or
1285    // open with append.
1286    final EnumSet<CreateFlag> createflags =
1287        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
1288    boolean overwrite = flags.containsAll(createflags);
1289
1290    // Delegate the create non-recursive call.
1291    return this.createNonRecursive(f, permission, overwrite,
1292        bufferSize, replication, blockSize, progress);
1293  }
1294
1295  @Override
1296  @SuppressWarnings("deprecation")
1297  public FSDataOutputStream createNonRecursive(Path f,
1298      boolean overwrite, int bufferSize, short replication, long blockSize,
1299      Progressable progress) throws IOException {
1300    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1301        overwrite, bufferSize, replication, blockSize, progress);
1302  }
1303
1304
1305  /**
1306   * Create an Azure blob and return an output stream to use
1307   * to write data to it.
1308   *
1309   * @param f
1310   * @param permission
1311   * @param overwrite
1312   * @param createParent
1313   * @param bufferSize
1314   * @param replication
1315   * @param blockSize
1316   * @param progress
1317   * @param parentFolderLease Lease on parent folder (or null if
1318   * no lease).
1319   * @return
1320   * @throws IOException
1321   */
1322  private FSDataOutputStream create(Path f, FsPermission permission,
1323      boolean overwrite, boolean createParent, int bufferSize,
1324      short replication, long blockSize, Progressable progress,
1325      SelfRenewingLease parentFolderLease)
1326          throws IOException {
1327
1328    if (LOG.isDebugEnabled()) {
1329      LOG.debug("Creating file: " + f.toString());
1330    }
1331
1332    if (containsColon(f)) {
1333      throw new IOException("Cannot create file " + f
1334          + " through WASB that has colons in the name");
1335    }
1336
1337    Path absolutePath = makeAbsolute(f);
1338    String key = pathToKey(absolutePath);
1339
1340    FileMetadata existingMetadata = store.retrieveMetadata(key);
1341    if (existingMetadata != null) {
1342      if (existingMetadata.isDir()) {
1343        throw new IOException("Cannot create file " + f
1344            + "; already exists as a directory.");
1345      }
1346      if (!overwrite) {
1347        throw new IOException("File already exists:" + f);
1348      }
1349    }
1350
1351    Path parentFolder = absolutePath.getParent();
1352    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
1353      // Update the parent folder last modified time if the parent folder
1354      // already exists.
1355      String parentKey = pathToKey(parentFolder);
1356      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1357      if (parentMetadata != null && parentMetadata.isDir() &&
1358          parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
1359        store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
1360      } else {
1361        // Make sure that the parent folder exists.
1362        // Create it using inherited permissions from the first existing directory going up the path
1363        Path firstExisting = parentFolder.getParent();
1364        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
1365        while(metadata == null) {
1366          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
1367          firstExisting = firstExisting.getParent();
1368          metadata = store.retrieveMetadata(pathToKey(firstExisting));
1369        }
1370        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
1371      }
1372    }
1373
1374    // Mask the permission first (with the default permission mask as well).
1375    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
1376    PermissionStatus permissionStatus = createPermissionStatus(masked);
1377
1378    OutputStream bufOutStream;
1379    if (store.isPageBlobKey(key)) {
1380      // Store page blobs directly in-place without renames.
1381      bufOutStream = store.storefile(key, permissionStatus);
1382    } else {
1383      // This is a block blob, so open the output blob stream based on the
1384      // encoded key.
1385      //
1386      String keyEncoded = encodeKey(key);
1387
1388
1389      // First create a blob at the real key, pointing back to the temporary file
1390      // This accomplishes a few things:
1391      // 1. Makes sure we can create a file there.
1392      // 2. Makes it visible to other concurrent threads/processes/nodes what
1393      // we're
1394      // doing.
1395      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
1396      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
1397
1398      // The key is encoded to point to a common container at the storage server.
1399      // This reduces the number of splits on the server side when load balancing.
1400      // Ingress to Azure storage can take advantage of earlier splits. We remove
1401      // the root path to the key and prefix a random GUID to the tail (or leaf
1402      // filename) of the key. Keys are thus broadly and randomly distributed over
1403      // a single container to ease load balancing on the storage server. When the
1404      // blob is committed it is renamed to its earlier key. Uncommitted blocks
1405      // are not cleaned up and we leave it to Azure storage to garbage collect
1406      // these
1407      // blocks.
1408      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
1409          keyEncoded, permissionStatus), key, keyEncoded);
1410    }
1411    // Construct the data output stream from the buffered output stream.
1412    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
1413
1414    
1415    // Increment the counter
1416    instrumentation.fileCreated();
1417    
1418    // Return data output stream to caller.
1419    return fsOut;
1420  }
1421
1422  @Override
1423  @Deprecated
1424  public boolean delete(Path path) throws IOException {
1425    return delete(path, true);
1426  }
1427
1428  @Override
1429  public boolean delete(Path f, boolean recursive) throws IOException {
1430    return delete(f, recursive, false);
1431  }
1432
1433  /**
1434   * Delete the specified file or folder. The parameter
1435   * skipParentFolderLastModifidedTimeUpdate
1436   * is used in the case of atomic folder rename redo. In that case, there is
1437   * a lease on the parent folder, so (without reworking the code) modifying
1438   * the parent folder update time will fail because of a conflict with the
1439   * lease. Since we are going to delete the folder soon anyway so accurate
1440   * modified time is not necessary, it's easier to just skip
1441   * the modified time update.
1442   *
1443   * @param f
1444   * @param recursive
1445   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
1446   * modified time.
1447   * @return true if and only if the file is deleted
1448   * @throws IOException
1449   */
1450  public boolean delete(Path f, boolean recursive,
1451      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {
1452
1453    if (LOG.isDebugEnabled()) {
1454      LOG.debug("Deleting file: " + f.toString());
1455    }
1456
1457    Path absolutePath = makeAbsolute(f);
1458    String key = pathToKey(absolutePath);
1459
1460    // Capture the metadata for the path.
1461    //
1462    FileMetadata metaFile = store.retrieveMetadata(key);
1463
1464    if (null == metaFile) {
1465      // The path to be deleted does not exist.
1466      return false;
1467    }
1468
1469    // The path exists, determine if it is a folder containing objects,
1470    // an empty folder, or a simple file and take the appropriate actions.
1471    if (!metaFile.isDir()) {
1472      // The path specifies a file. We need to check the parent path
1473      // to make sure it's a proper materialized directory before we
1474      // delete the file. Otherwise we may get into a situation where
1475      // the file we were deleting was the last one in an implicit directory
1476      // (e.g. the blob store only contains the blob a/b and there's no
1477      // corresponding directory blob a) and that would implicitly delete
1478      // the directory as well, which is not correct.
1479      Path parentPath = absolutePath.getParent();
1480      if (parentPath.getParent() != null) {// Not root
1481        String parentKey = pathToKey(parentPath);
1482        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1483        if (!parentMetadata.isDir()) {
1484          // Invalid state: the parent path is actually a file. Throw.
1485          throw new AzureException("File " + f + " has a parent directory "
1486              + parentPath + " which is also a file. Can't resolve.");
1487        }
1488        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1489          if (LOG.isDebugEnabled()) {
1490            LOG.debug("Found an implicit parent directory while trying to"
1491                + " delete the file " + f + ". Creating the directory blob for"
1492                + " it in " + parentKey + ".");
1493          }
1494          store.storeEmptyFolder(parentKey,
1495              createPermissionStatus(FsPermission.getDefault()));
1496        } else {
1497          if (!skipParentFolderLastModifidedTimeUpdate) {
1498            store.updateFolderLastModifiedTime(parentKey, null);
1499          }
1500        }
1501      }
1502      store.delete(key);
1503      instrumentation.fileDeleted();
1504    } else {
1505      // The path specifies a folder. Recursively delete all entries under the
1506      // folder.
1507      Path parentPath = absolutePath.getParent();
1508      if (parentPath.getParent() != null) {
1509        String parentKey = pathToKey(parentPath);
1510        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1511
1512        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1513          if (LOG.isDebugEnabled()) {
1514            LOG.debug("Found an implicit parent directory while trying to"
1515                + " delete the directory " + f
1516                + ". Creating the directory blob for" + " it in " + parentKey
1517                + ".");
1518          }
1519          store.storeEmptyFolder(parentKey,
1520              createPermissionStatus(FsPermission.getDefault()));
1521        }
1522      }
1523
1524      // List all the blobs in the current folder.
1525      String priorLastKey = null;
1526      PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1,
1527          priorLastKey);
1528      FileMetadata[] contents = listing.getFiles();
1529      if (!recursive && contents.length > 0) {
1530        // The folder is non-empty and recursive delete was not specified.
1531        // Throw an exception indicating that a non-recursive delete was
1532        // specified for a non-empty folder.
1533        throw new IOException("Non-recursive delete of non-empty directory "
1534            + f.toString());
1535      }
1536
1537      // Delete all the files in the folder.
1538      for (FileMetadata p : contents) {
1539        // Tag on the directory name found as the suffix of the suffix of the
1540        // parent directory to get the new absolute path.
1541        String suffix = p.getKey().substring(
1542            p.getKey().lastIndexOf(PATH_DELIMITER));
1543        if (!p.isDir()) {
1544          store.delete(key + suffix);
1545          instrumentation.fileDeleted();
1546        } else {
1547          // Recursively delete contents of the sub-folders. Notice this also
1548          // deletes the blob for the directory.
1549          if (!delete(new Path(f.toString() + suffix), true)) {
1550            return false;
1551          }
1552        }
1553      }
1554      store.delete(key);
1555
1556      // Update parent directory last modified time
1557      Path parent = absolutePath.getParent();
1558      if (parent != null && parent.getParent() != null) { // not root
1559        String parentKey = pathToKey(parent);
1560        if (!skipParentFolderLastModifidedTimeUpdate) {
1561          store.updateFolderLastModifiedTime(parentKey, null);
1562        }
1563      }
1564      instrumentation.directoryDeleted();
1565    }
1566
1567    // File or directory was successfully deleted.
1568    return true;
1569  }
1570
1571  @Override
1572  public FileStatus getFileStatus(Path f) throws IOException {
1573
1574    if (LOG.isDebugEnabled()) {
1575      LOG.debug("Getting the file status for " + f.toString());
1576    }
1577
1578    // Capture the absolute path and the path to key.
1579    Path absolutePath = makeAbsolute(f);
1580    String key = pathToKey(absolutePath);
1581    if (key.length() == 0) { // root always exists
1582      return newDirectory(null, absolutePath);
1583    }
1584
1585    // The path is either a folder or a file. Retrieve metadata to
1586    // determine if it is a directory or file.
1587    FileMetadata meta = store.retrieveMetadata(key);
1588    if (meta != null) {
1589      if (meta.isDir()) {
1590        // The path is a folder with files in it.
1591        //
1592        if (LOG.isDebugEnabled()) {
1593          LOG.debug("Path " + f.toString() + "is a folder.");
1594        }
1595
1596        // If a rename operation for the folder was pending, redo it.
1597        // Then the file does not exist, so signal that.
1598        if (conditionalRedoFolderRename(f)) {
1599          throw new FileNotFoundException(
1600              absolutePath + ": No such file or directory.");
1601        }
1602
1603        // Return reference to the directory object.
1604        return newDirectory(meta, absolutePath);
1605      }
1606
1607      // The path is a file.
1608      if (LOG.isDebugEnabled()) {
1609        LOG.debug("Found the path: " + f.toString() + " as a file.");
1610      }
1611
1612      // Return with reference to a file object.
1613      return newFile(meta, absolutePath);
1614    }
1615
1616    // File not found. Throw exception no such file or directory.
1617    //
1618    throw new FileNotFoundException(
1619        absolutePath + ": No such file or directory.");
1620  }
1621
1622  // Return true if there is a rename pending and we redo it, otherwise false.
1623  private boolean conditionalRedoFolderRename(Path f) throws IOException {
1624
1625    // Can't rename /, so return immediately in that case.
1626    if (f.getName().equals("")) {
1627      return false;
1628    }
1629
1630    // Check if there is a -RenamePending.json file for this folder, and if so,
1631    // redo the rename.
1632    Path absoluteRenamePendingFile = renamePendingFilePath(f);
1633    if (exists(absoluteRenamePendingFile)) {
1634      FolderRenamePending pending =
1635          new FolderRenamePending(absoluteRenamePendingFile, this);
1636      pending.redo();
1637      return true;
1638    } else {
1639      return false;
1640    }
1641  }
1642
1643  // Return the path name that would be used for rename of folder with path f.
1644  private Path renamePendingFilePath(Path f) {
1645    Path absPath = makeAbsolute(f);
1646    String key = pathToKey(absPath);
1647    key += "-RenamePending.json";
1648    return keyToPath(key);
1649  }
1650
1651  @Override
1652  public URI getUri() {
1653    return uri;
1654  }
1655
1656  /**
1657   * Retrieve the status of a given path if it is a file, or of all the
1658   * contained files if it is a directory.
1659   */
1660  @Override
1661  public FileStatus[] listStatus(Path f) throws IOException {
1662
1663    if (LOG.isDebugEnabled()) {
1664      LOG.debug("Listing status for " + f.toString());
1665    }
1666
1667    Path absolutePath = makeAbsolute(f);
1668    String key = pathToKey(absolutePath);
1669    Set<FileStatus> status = new TreeSet<FileStatus>();
1670    FileMetadata meta = store.retrieveMetadata(key);
1671
1672    if (meta != null) {
1673      if (!meta.isDir()) {
1674        if (LOG.isDebugEnabled()) {
1675          LOG.debug("Found path as a file");
1676        }
1677        return new FileStatus[] { newFile(meta, absolutePath) };
1678      }
1679      String partialKey = null;
1680      PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
1681
1682      // For any -RenamePending.json files in the listing,
1683      // push the rename forward.
1684      boolean renamed = conditionalRedoFolderRenames(listing);
1685
1686      // If any renames were redone, get another listing,
1687      // since the current one may have changed due to the redo.
1688      if (renamed) {
1689        listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
1690      }
1691
1692      for (FileMetadata fileMetadata : listing.getFiles()) {
1693        Path subpath = keyToPath(fileMetadata.getKey());
1694
1695        // Test whether the metadata represents a file or directory and
1696        // add the appropriate metadata object.
1697        //
1698        // Note: There was a very old bug here where directories were added
1699        // to the status set as files flattening out recursive listings
1700        // using "-lsr" down the file system hierarchy.
1701        if (fileMetadata.isDir()) {
1702          // Make sure we hide the temp upload folder
1703          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
1704            // Don't expose that.
1705            continue;
1706          }
1707          status.add(newDirectory(fileMetadata, subpath));
1708        } else {
1709          status.add(newFile(fileMetadata, subpath));
1710        }
1711      }
1712      if (LOG.isDebugEnabled()) {
1713        LOG.debug("Found path as a directory with " + status.size()
1714            + " files in it.");
1715      }
1716    } else {
1717      // There is no metadata found for the path.
1718      if (LOG.isDebugEnabled()) {
1719        LOG.debug("Did not find any metadata for path: " + key);
1720      }
1721
1722      throw new FileNotFoundException("File" + f + " does not exist.");
1723    }
1724
1725    return status.toArray(new FileStatus[0]);
1726  }
1727
1728  // Redo any folder renames needed if there are rename pending files in the
1729  // directory listing. Return true if one or more redo operations were done.
1730  private boolean conditionalRedoFolderRenames(PartialListing listing)
1731      throws IllegalArgumentException, IOException {
1732    boolean renamed = false;
1733    for (FileMetadata fileMetadata : listing.getFiles()) {
1734      Path subpath = keyToPath(fileMetadata.getKey());
1735      if (isRenamePendingFile(subpath)) {
1736        FolderRenamePending pending =
1737            new FolderRenamePending(subpath, this);
1738        pending.redo();
1739        renamed = true;
1740      }
1741    }
1742    return renamed;
1743  }
1744
1745  // True if this is a folder rename pending file, else false.
1746  private boolean isRenamePendingFile(Path path) {
1747    return path.toString().endsWith(FolderRenamePending.SUFFIX);
1748  }
1749
1750  private FileStatus newFile(FileMetadata meta, Path path) {
1751    return new FileStatus (
1752        meta.getLength(),
1753        false,
1754        1,
1755        blockSize,
1756        meta.getLastModified(),
1757        0,
1758        meta.getPermissionStatus().getPermission(),
1759        meta.getPermissionStatus().getUserName(),
1760        meta.getPermissionStatus().getGroupName(),
1761        path.makeQualified(getUri(), getWorkingDirectory()));
1762  }
1763
1764  private FileStatus newDirectory(FileMetadata meta, Path path) {
1765    return new FileStatus (
1766        0,
1767        true,
1768        1,
1769        blockSize,
1770        meta == null ? 0 : meta.getLastModified(),
1771        0,
1772        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
1773        meta == null ? "" : meta.getPermissionStatus().getUserName(),
1774        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
1775        path.makeQualified(getUri(), getWorkingDirectory()));
1776  }
1777
1778  private static enum UMaskApplyMode {
1779    NewFile,
1780    NewDirectory,
1781    NewDirectoryNoUmask,
1782    ChangeExistingFile,
1783    ChangeExistingDirectory,
1784  }
1785
1786  /**
1787   * Applies the applicable UMASK's on the given permission.
1788   * 
1789   * @param permission
1790   *          The permission to mask.
1791   * @param applyMode
1792   *          Whether to also apply the default umask.
1793   * @return The masked persmission.
1794   */
1795  private FsPermission applyUMask(final FsPermission permission,
1796      final UMaskApplyMode applyMode) {
1797    FsPermission newPermission = new FsPermission(permission);
1798    // Apply the default umask - this applies for new files or directories.
1799    if (applyMode == UMaskApplyMode.NewFile
1800        || applyMode == UMaskApplyMode.NewDirectory) {
1801      newPermission = newPermission
1802          .applyUMask(FsPermission.getUMask(getConf()));
1803    }
1804    return newPermission;
1805  }
1806
1807  /**
1808   * Creates the PermissionStatus object to use for the given permission, based
1809   * on the current user in context.
1810   * 
1811   * @param permission
1812   *          The permission for the file.
1813   * @return The permission status object to use.
1814   * @throws IOException
1815   *           If login fails in getCurrentUser
1816   */
1817  private PermissionStatus createPermissionStatus(FsPermission permission)
1818      throws IOException {
1819    // Create the permission status for this file based on current user
1820    return new PermissionStatus(
1821        UserGroupInformation.getCurrentUser().getShortUserName(),
1822        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
1823            AZURE_DEFAULT_GROUP_DEFAULT),
1824        permission);
1825  }
1826
1827  @Override
1828  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
1829      return mkdirs(f, permission, false);
1830  }
1831
1832  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
1833    if (LOG.isDebugEnabled()) {
1834      LOG.debug("Creating directory: " + f.toString());
1835    }
1836
1837    if (containsColon(f)) {
1838      throw new IOException("Cannot create directory " + f
1839          + " through WASB that has colons in the name");
1840    }
1841
1842    Path absolutePath = makeAbsolute(f);
1843    PermissionStatus permissionStatus = null;
1844    if(noUmask) {
1845      // ensure owner still has wx permissions at the minimum
1846      permissionStatus = createPermissionStatus(
1847          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
1848              UMaskApplyMode.NewDirectoryNoUmask));
1849    } else {
1850      permissionStatus = createPermissionStatus(
1851          applyUMask(permission, UMaskApplyMode.NewDirectory));
1852    }
1853
1854
1855    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
1856    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
1857    boolean childCreated = false;
1858    // Check that there is no file in the parent chain of the given path.
1859    for (Path current = absolutePath, parent = current.getParent();
1860        parent != null; // Stop when you get to the root
1861        current = parent, parent = current.getParent()) {
1862      String currentKey = pathToKey(current);
1863      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
1864      if (currentMetadata != null && !currentMetadata.isDir()) {
1865        throw new IOException("Cannot create directory " + f + " because " +
1866            current + " is an existing file.");
1867      } else if (currentMetadata == null) {
1868        keysToCreateAsFolder.add(currentKey);
1869        childCreated = true;
1870      } else {
1871        // The directory already exists. Its last modified time need to be
1872        // updated if there is a child directory created under it.
1873        if (childCreated) {
1874          keysToUpdateAsFolder.add(currentKey);
1875        }
1876        childCreated = false;
1877      }
1878    }
1879
1880    for (String currentKey : keysToCreateAsFolder) {
1881      store.storeEmptyFolder(currentKey, permissionStatus);
1882    }
1883
1884    instrumentation.directoryCreated();
1885
1886    // otherwise throws exception
1887    return true;
1888  }
1889
1890  @Override
1891  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
1892    if (LOG.isDebugEnabled()) {
1893      LOG.debug("Opening file: " + f.toString());
1894    }
1895
1896    Path absolutePath = makeAbsolute(f);
1897    String key = pathToKey(absolutePath);
1898    FileMetadata meta = store.retrieveMetadata(key);
1899    if (meta == null) {
1900      throw new FileNotFoundException(f.toString());
1901    }
1902    if (meta.isDir()) {
1903      throw new FileNotFoundException(f.toString()
1904          + " is a directory not a file.");
1905    }
1906
1907    return new FSDataInputStream(new BufferedFSInputStream(
1908        new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
1909  }
1910
  /**
   * Renames {@code src} to {@code dst}.
   *
   * If {@code dst} is an existing directory, the source is moved inside it
   * under its own name. The rename returns false when the source is the root,
   * {@code dst} is an existing file, the parent of {@code dst} is missing or
   * is a file, or the source does not exist. A file source is moved with a
   * single store rename, after which the parent folders of both source and
   * destination get their last-modified times updated. A folder source is
   * moved through a FolderRenamePending (see prepareAtomicFolderRename), which
   * for atomic-rename keys leases the source folder and writes a redo file
   * before executing.
   *
   * @param src the path to rename
   * @param dst the destination path, or an existing directory to move into
   * @return true if the rename was performed, false for the cases above
   * @throws IOException if {@code dst} contains a colon, or on storage errors
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    FolderRenamePending renamePending = null;

    if (LOG.isDebugEnabled()) {
      LOG.debug("Moving " + src + " to " + dst);
    }

    if (containsColon(dst)) {
      throw new IOException("Cannot rename to file " + dst
          + " through WASB that has colons in the name");
    }

    String srcKey = pathToKey(makeAbsolute(src));

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    // Figure out the final destination
    Path absoluteDst = makeAbsolute(dst);
    String dstKey = pathToKey(absoluteDst);
    FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
    if (dstMetadata != null && dstMetadata.isDir()) {
      // It's an existing directory.
      // NOTE(review): the adjusted destination dst/src.getName() is not
      // checked for an existing entry before the rename proceeds; confirm the
      // store-level rename handles that case as intended.
      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is a directory, adjusted the destination to be " + dstKey);
      }
    } else if (dstMetadata != null) {
      // Attempting to overwrite a file using rename()
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is an already existing file, failing the rename.");
      }
      return false;
    } else {
      // Check that the parent directory exists.
      FileMetadata parentOfDestMetadata =
          store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
      if (parentOfDestMetadata == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " doesn't exist, failing the rename.");
        }
        return false;
      } else if (!parentOfDestMetadata.isDir()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " is a file, failing the rename.");
        }
        return false;
      }
    }
    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
    if (srcMetadata == null) {
      // Source doesn't exist
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
      }
      return false;
    } else if (!srcMetadata.isDir()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " found as a file, renaming.");
      }
      // Single-blob rename; parent timestamps are updated after the if-chain.
      store.rename(srcKey, dstKey);
    } else {

      // Prepare for, execute and clean up after of all files in folder, and
      // the root file, and update the last modified time of the source and
      // target parent folders. The operation can be redone if it fails part
      // way through, by applying the "Rename Pending" file.

      // The following code (internally) only does atomic rename preparation
      // and lease management for page blob folders, limiting the scope of the
      // operation to HBase log file folders, where atomic rename is required.
      // In the future, we could generalize it easily to all folders.
      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
      renamePending.execute();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Renamed " + src + " to " + dst + " successfully.");
      }
      renamePending.cleanup();
      // Returns early: updateParentFolderLastModifiedTime below is not called
      // for folder renames (presumably handled inside execute(); confirm).
      return true;
    }

    // Update the last-modified time of the parent folders of both source
    // and destination.
    updateParentFolderLastModifiedTime(srcKey);
    updateParentFolderLastModifiedTime(dstKey);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Renamed " + src + " to " + dst + " successfully.");
    }
    return true;
  }
2010
2011  /**
2012   * Update the last-modified time of the parent folder of the file
2013   * identified by key.
2014   * @param key
2015   * @throws IOException
2016   */
2017  private void updateParentFolderLastModifiedTime(String key)
2018      throws IOException {
2019    Path parent = makeAbsolute(keyToPath(key)).getParent();
2020    if (parent != null && parent.getParent() != null) { // not root
2021      String parentKey = pathToKey(parent);
2022
2023      // ensure the parent is a materialized folder
2024      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
2025      // The metadata could be null if the implicit folder only contains a
2026      // single file. In this case, the parent folder no longer exists if the
2027      // file is renamed; so we can safely ignore the null pointer case.
2028      if (parentMetadata != null) {
2029        if (parentMetadata.isDir()
2030            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2031          store.storeEmptyFolder(parentKey,
2032              createPermissionStatus(FsPermission.getDefault()));
2033        }
2034
2035        store.updateFolderLastModifiedTime(parentKey, null);
2036      }
2037    }
2038  }
2039
2040  /**
2041   * If the source is a page blob folder,
2042   * prepare to rename this folder atomically. This means to get exclusive
2043   * access to the source folder, and record the actions to be performed for
2044   * this rename in a "Rename Pending" file. This code was designed to
2045   * meet the needs of HBase, which requires atomic rename of write-ahead log
2046   * (WAL) folders for correctness.
2047   *
2048   * Before calling this method, the caller must ensure that the source is a
2049   * folder.
2050   *
2051   * For non-page-blob directories, prepare the in-memory information needed,
2052   * but don't take the lease or write the redo file. This is done to limit the
2053   * scope of atomic folder rename to HBase, at least at the time of writing
2054   * this code.
2055   *
2056   * @param srcKey Source folder name.
2057   * @param dstKey Destination folder name.
2058   * @throws IOException
2059   */
2060  private FolderRenamePending prepareAtomicFolderRename(
2061      String srcKey, String dstKey) throws IOException {
2062
2063    if (store.isAtomicRenameKey(srcKey)) {
2064
2065      // Block unwanted concurrent access to source folder.
2066      SelfRenewingLease lease = leaseSourceFolder(srcKey);
2067
2068      // Prepare in-memory information needed to do or redo a folder rename.
2069      FolderRenamePending renamePending =
2070          new FolderRenamePending(srcKey, dstKey, lease, this);
2071
2072      // Save it to persistent storage to help recover if the operation fails.
2073      renamePending.writeFile(this);
2074      return renamePending;
2075    } else {
2076      FolderRenamePending renamePending =
2077          new FolderRenamePending(srcKey, dstKey, null, this);
2078      return renamePending;
2079    }
2080  }
2081
2082  /**
2083   * Get a self-renewing Azure blob lease on the source folder zero-byte file.
2084   */
2085  private SelfRenewingLease leaseSourceFolder(String srcKey)
2086      throws AzureException {
2087    return store.acquireLease(srcKey);
2088  }
2089
2090  /**
2091   * Return an array containing hostnames, offset and size of
2092   * portions of the given file. For WASB we'll just lie and give
2093   * fake hosts to make sure we get many splits in MR jobs.
2094   */
2095  @Override
2096  public BlockLocation[] getFileBlockLocations(FileStatus file,
2097      long start, long len) throws IOException {
2098    if (file == null) {
2099      return null;
2100    }
2101
2102    if ((start < 0) || (len < 0)) {
2103      throw new IllegalArgumentException("Invalid start or len parameter");
2104    }
2105
2106    if (file.getLen() < start) {
2107      return new BlockLocation[0];
2108    }
2109    final String blobLocationHost = getConf().get(
2110        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
2111        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
2112    final String[] name = { blobLocationHost };
2113    final String[] host = { blobLocationHost };
2114    long blockSize = file.getBlockSize();
2115    if (blockSize <= 0) {
2116      throw new IllegalArgumentException(
2117          "The block size for the given file is not a positive number: "
2118              + blockSize);
2119    }
2120    int numberOfLocations = (int) (len / blockSize)
2121        + ((len % blockSize == 0) ? 0 : 1);
2122    BlockLocation[] locations = new BlockLocation[numberOfLocations];
2123    for (int i = 0; i < locations.length; i++) {
2124      long currentOffset = start + (i * blockSize);
2125      long currentLength = Math.min(blockSize, start + len - currentOffset);
2126      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
2127    }
2128    return locations;
2129  }
2130
2131  /**
2132   * Set the working directory to the given directory.
2133   */
2134  @Override
2135  public void setWorkingDirectory(Path newDir) {
2136    workingDir = makeAbsolute(newDir);
2137  }
2138
2139  @Override
2140  public Path getWorkingDirectory() {
2141    return workingDir;
2142  }
2143
2144  @Override
2145  public void setPermission(Path p, FsPermission permission) throws IOException {
2146    Path absolutePath = makeAbsolute(p);
2147    String key = pathToKey(absolutePath);
2148    FileMetadata metadata = store.retrieveMetadata(key);
2149    if (metadata == null) {
2150      throw new FileNotFoundException("File doesn't exist: " + p);
2151    }
2152    permission = applyUMask(permission,
2153        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
2154            : UMaskApplyMode.ChangeExistingFile);
2155    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2156      // It's an implicit folder, need to materialize it.
2157      store.storeEmptyFolder(key, createPermissionStatus(permission));
2158    } else if (!metadata.getPermissionStatus().getPermission().
2159        equals(permission)) {
2160      store.changePermissionStatus(key, new PermissionStatus(
2161          metadata.getPermissionStatus().getUserName(),
2162          metadata.getPermissionStatus().getGroupName(),
2163          permission));
2164    }
2165  }
2166
2167  @Override
2168  public void setOwner(Path p, String username, String groupname)
2169      throws IOException {
2170    Path absolutePath = makeAbsolute(p);
2171    String key = pathToKey(absolutePath);
2172    FileMetadata metadata = store.retrieveMetadata(key);
2173    if (metadata == null) {
2174      throw new FileNotFoundException("File doesn't exist: " + p);
2175    }
2176    PermissionStatus newPermissionStatus = new PermissionStatus(
2177        username == null ?
2178            metadata.getPermissionStatus().getUserName() : username,
2179        groupname == null ?
2180            metadata.getPermissionStatus().getGroupName() : groupname,
2181        metadata.getPermissionStatus().getPermission());
2182    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2183      // It's an implicit folder, need to materialize it.
2184      store.storeEmptyFolder(key, newPermissionStatus);
2185    } else {
2186      store.changePermissionStatus(key, newPermissionStatus);
2187    }
2188  }
2189
2190  @Override
2191  public synchronized void close() throws IOException {
2192    if (isClosed) {
2193      return;
2194    }
2195
2196    // Call the base close() to close any resources there.
2197    super.close();
2198    // Close the store to close any resources there - e.g. the bandwidth
2199    // updater thread would be stopped at this time.
2200    store.close();
2201    // Notify the metrics system that this file system is closed, which may
2202    // trigger one final metrics push to get the accurate final file system
2203    // metrics out.
2204
2205    long startTime = System.currentTimeMillis();
2206
2207    AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
2208    AzureFileSystemMetricsSystem.fileSystemClosed();
2209
2210    if (LOG.isDebugEnabled()) {
2211        LOG.debug("Submitting metrics when file system closed took "
2212                + (System.currentTimeMillis() - startTime) + " ms.");
2213    }
2214    isClosed = true;
2215  }
2216
2217  /**
2218   * A handler that defines what to do with blobs whose upload was
2219   * interrupted.
2220   */
2221  private abstract class DanglingFileHandler {
2222    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
2223      throws IOException;
2224  }
2225
2226  /**
2227   * Handler implementation for just deleting dangling files and cleaning
2228   * them up.
2229   */
2230  private class DanglingFileDeleter extends DanglingFileHandler {
2231    @Override
2232    void handleFile(FileMetadata file, FileMetadata tempFile)
2233        throws IOException {
2234      if (LOG.isDebugEnabled()) {
2235        LOG.debug("Deleting dangling file " + file.getKey());
2236      }
2237      store.delete(file.getKey());
2238      store.delete(tempFile.getKey());
2239    }
2240  }
2241
2242  /**
2243   * Handler implementation for just moving dangling files to recovery
2244   * location (/lost+found).
2245   */
2246  private class DanglingFileRecoverer extends DanglingFileHandler {
2247    private final Path destination;
2248
2249    DanglingFileRecoverer(Path destination) {
2250      this.destination = destination;
2251    }
2252
2253    @Override
2254    void handleFile(FileMetadata file, FileMetadata tempFile)
2255        throws IOException {
2256      if (LOG.isDebugEnabled()) {
2257        LOG.debug("Recovering " + file.getKey());
2258      }
2259      // Move to the final destination
2260      String finalDestinationKey =
2261          pathToKey(new Path(destination, file.getKey()));
2262      store.rename(tempFile.getKey(), finalDestinationKey);
2263      if (!finalDestinationKey.equals(file.getKey())) {
2264        // Delete the empty link file now that we've restored it.
2265        store.delete(file.getKey());
2266      }
2267    }
2268  }
2269
2270  /**
2271   * Check if a path has colons in its name
2272   */
2273  private boolean containsColon(Path p) {
2274    return p.toUri().getPath().toString().contains(":");
2275  }
2276
2277  /**
2278   * Implements recover and delete (-move and -delete) behaviors for handling
2279   * dangling files (blobs whose upload was interrupted).
2280   * 
2281   * @param root
2282   *          The root path to check from.
2283   * @param handler
2284   *          The handler that deals with dangling files.
2285   */
2286  private void handleFilesWithDanglingTempData(Path root,
2287      DanglingFileHandler handler) throws IOException {
2288    // Calculate the cut-off for when to consider a blob to be dangling.
2289    long cutoffForDangling = new Date().getTime()
2290        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
2291            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
2292    // Go over all the blobs under the given root and look for blobs to
2293    // recover.
2294    String priorLastKey = null;
2295    do {
2296      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
2297          AZURE_UNBOUNDED_DEPTH, priorLastKey);
2298
2299      for (FileMetadata file : listing.getFiles()) {
2300        if (!file.isDir()) { // We don't recover directory blobs
2301          // See if this blob has a link in it (meaning it's a place-holder
2302          // blob for when the upload to the temp blob is complete).
2303          String link = store.getLinkInFileMetadata(file.getKey());
2304          if (link != null) {
2305            // It has a link, see if the temp blob it is pointing to is
2306            // existent and old enough to be considered dangling.
2307            FileMetadata linkMetadata = store.retrieveMetadata(link);
2308            if (linkMetadata != null
2309                && linkMetadata.getLastModified() >= cutoffForDangling) {
2310              // Found one!
2311              handler.handleFile(file, linkMetadata);
2312            }
2313          }
2314        }
2315      }
2316      priorLastKey = listing.getPriorLastKey();
2317    } while (priorLastKey != null);
2318  }
2319
2320  /**
2321   * Looks under the given root path for any blob that are left "dangling",
2322   * meaning that they are place-holder blobs that we created while we upload
2323   * the data to a temporary blob, but for some reason we crashed in the middle
2324   * of the upload and left them there. If any are found, we move them to the
2325   * destination given.
2326   * 
2327   * @param root
2328   *          The root path to consider.
2329   * @param destination
2330   *          The destination path to move any recovered files to.
2331   * @throws IOException
2332   */
2333  public void recoverFilesWithDanglingTempData(Path root, Path destination)
2334      throws IOException {
2335    if (LOG.isDebugEnabled()) {
2336      LOG.debug("Recovering files with dangling temp data in " + root);
2337    }
2338    handleFilesWithDanglingTempData(root,
2339        new DanglingFileRecoverer(destination));
2340  }
2341
2342  /**
2343   * Looks under the given root path for any blob that are left "dangling",
2344   * meaning that they are place-holder blobs that we created while we upload
2345   * the data to a temporary blob, but for some reason we crashed in the middle
2346   * of the upload and left them there. If any are found, we delete them.
2347   * 
2348   * @param root
2349   *          The root path to consider.
2350   * @throws IOException
2351   */
2352  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
2353    if (LOG.isDebugEnabled()) {
2354      LOG.debug("Deleting files with dangling temp data in " + root);
2355    }
2356    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
2357  }
2358
2359  @Override
2360  protected void finalize() throws Throwable {
2361    LOG.debug("finalize() called.");
2362    close();
2363    super.finalize();
2364  }
2365
2366  /**
2367   * Encode the key with a random prefix for load balancing in Azure storage.
2368   * Upload data to a random temporary file then do storage side renaming to
2369   * recover the original key.
2370   * 
2371   * @param aKey
2372   * @return Encoded version of the original key.
2373   */
2374  private static String encodeKey(String aKey) {
2375    // Get the tail end of the key name.
2376    //
2377    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
2378        aKey.length());
2379
2380    // Construct the randomized prefix of the file name. The prefix ensures the
2381    // file always drops into the same folder but with a varying tail key name.
2382    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
2383        + UUID.randomUUID().toString();
2384
2385    // Concatenate the randomized prefix with the tail of the key name.
2386    String randomizedKey = filePrefix + fileName;
2387
2388    // Return to the caller with the randomized key.
2389    return randomizedKey;
2390  }
2391}