| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Generic; |
| using System.IO; |
| using System.Linq; |
| using System.Threading.Tasks; |
| using Microsoft.WindowsAzure.Storage; |
| using Microsoft.WindowsAzure.Storage.Blob; |
| using Org.Apache.REEF.Tang.Annotations; |
| |
| namespace Org.Apache.REEF.IO.FileSystem.AzureBlob |
| { |
| /// <summary> |
| /// An IFileSystem implementation for Azure Blob storage. |
| /// This particular File System implementation only supports |
| /// block blob operations |
| /// </summary> |
| internal sealed class AzureBlockBlobFileSystem : IFileSystem |
| { |
| private readonly ICloudBlobClient _client; |
| |
| [Inject] |
| private AzureBlockBlobFileSystem(ICloudBlobClient client) |
| { |
| _client = client; |
| } |
| |
| /// <summary> |
| /// Returns a Stream object to the blob specified by the fileUri. |
| /// </summary> |
| public Stream Open(Uri fileUri) |
| { |
| return _client.GetBlockBlobReference(fileUri).Open(); |
| } |
| |
| /// <summary> |
| /// Creates a blob for the specified fileUri and returns a write Stream object to it. |
| /// </summary> |
| public Stream Create(Uri fileUri) |
| { |
| var blockBlob = _client.GetBlockBlobReference(fileUri); |
| _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists(); |
| return blockBlob.Create(); |
| } |
| |
| /// <summary> |
| /// Deletes a CloudBlockBlob |
| /// <see cref="IFileSystem.Delete"/> |
| /// </summary> |
| public void Delete(Uri fileUri) |
| { |
| _client.GetBlockBlobReference(fileUri).Delete(); |
| } |
| |
| /// <summary> |
| /// Checks if a CloudBlockBlob exists. |
| /// <see cref="IFileSystem.Exists"/> |
| /// </summary> |
| public bool Exists(Uri fileUri) |
| { |
| return _client.GetBlockBlobReference(fileUri).Exists(); |
| } |
| |
| /// <summary> |
| /// Copies to a CloudBlockBlob from another CloudBlockBlob. |
| /// <see cref="IFileSystem.Copy"/> |
| /// </summary> |
| public void Copy(Uri sourceUri, Uri destinationUri) |
| { |
| var blockBlob = _client.GetBlockBlobReference(destinationUri); |
| _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists(); |
| blockBlob.StartCopy(sourceUri); |
| blockBlob.FetchAttributes(); |
| |
| while (blockBlob.CopyState.Status == CopyStatus.Pending) |
| { |
| Task.Delay(TimeSpan.FromMilliseconds(50)); |
| blockBlob.FetchAttributes(); |
| } |
| } |
| |
| /// <summary> |
| /// Copies from a CloudBlockBlob to a local file. |
| /// <see cref="IFileSystem.CopyToLocal"/> |
| /// </summary> |
| public void CopyToLocal(Uri remoteFileUri, string localName) |
| { |
| _client.GetBlockBlobReference(remoteFileUri).DownloadToFile(localName, FileMode.CreateNew); |
| } |
| |
| /// <summary> |
| // Copies to a CloudBlockBlob from a local file. |
| /// <see cref="IFileSystem.CopyFromLocal"/> |
| /// </summary> |
| public void CopyFromLocal(string localFileName, Uri remoteFileUri) |
| { |
| var blockBlob = _client.GetBlockBlobReference(remoteFileUri); |
| _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists(); |
| blockBlob.UploadFromFile(localFileName, FileMode.Open); |
| } |
| |
| /// <summary> |
| /// Azure blobs does not have the sense of the existence of a "directory." |
| /// We will not throw an error, but we will not do anything either. |
| /// </summary> |
| public void CreateDirectory(Uri directoryUri) |
| { |
| } |
| |
| /// <summary> |
| /// Recursively deletes blobs under a specified "directory URI." |
| /// If only the container is specified, the entire container is deleted. |
| /// </summary> |
| public void DeleteDirectory(Uri directoryUri) |
| { |
| var uriSplit = directoryUri.AbsolutePath.Split(new[] { "/" }, StringSplitOptions.RemoveEmptyEntries); |
| if (!uriSplit.Any()) |
| { |
| throw new StorageException(string.Format("URI {0} must contain at least the container.", directoryUri)); |
| } |
| |
| var containerName = uriSplit[0]; |
| var container = _client.GetContainerReference(containerName); |
| if (uriSplit.Length == 1) |
| { |
| container.DeleteIfExists(); |
| return; |
| } |
| |
| var directory = container.GetDirectoryReference(uriSplit[1]); |
| |
| for (var i = 2; i < uriSplit.Length; i++) |
| { |
| directory = directory.GetDirectoryReference(uriSplit[i]); |
| } |
| |
| foreach (var blob in directory.ListBlobs(true).OfType<ICloudBlob>()) |
| { |
| blob.DeleteIfExistsAsync().Wait(); |
| } |
| } |
| |
| /// <summary> |
| /// Gets the children of the blob "directory." |
| /// </summary> |
| public IEnumerable<Uri> GetChildren(Uri directoryUri) |
| { |
| BlobContinuationToken blobContinuationToken = null; |
| var path = directoryUri.AbsolutePath.Trim('/'); |
| |
| do |
| { |
| var listing = _client.ListBlobsSegmented(path, false, |
| BlobListingDetails.None, null, |
| blobContinuationToken, new BlobRequestOptions(), new OperationContext()); |
| |
| if (listing.Results != null) |
| { |
| foreach (var listBlobItem in listing.Results) |
| { |
| yield return listBlobItem.Uri; |
| } |
| } |
| |
| blobContinuationToken = listing.ContinuationToken; |
| } |
| while (blobContinuationToken != null); |
| } |
| |
| /// <summary> |
| /// Creates a Uri using the relative path to the remote file (including the container), |
| /// getting the absolute URI from the Blob client's base URI. |
| /// </summary> |
| /// <param name="path">The relative path to the remote file, including the container</param> |
| /// <returns>The URI to the remote file</returns> |
| public Uri CreateUriForPath(string path) |
| { |
| return new Uri(_client.BaseUri.AbsoluteUri.TrimEnd('/') + '/' + path.Trim('/')); |
| } |
| |
| /// <summary> |
| /// Gets the FileStatus based on reports from Blob. |
| /// </summary> |
| public FileStatus GetFileStatus(Uri remoteFileUri) |
| { |
| var blobReference = _client.GetBlockBlobReference(remoteFileUri); |
| if (!blobReference.Exists()) |
| { |
| throw new StorageException("Unable to find blob at " + remoteFileUri); |
| } |
| |
| blobReference.FetchAttributes(); |
| |
| var lastModifiedTime = blobReference.Properties.LastModified; |
| if (!lastModifiedTime.HasValue) |
| { |
| throw new StorageException("Blob at " + remoteFileUri + " does not have a last modified" + |
| "time. It may have been deleted."); |
| } |
| |
| return new FileStatus(lastModifiedTime.Value.DateTime, blobReference.Properties.Length); |
| } |
| } |
| } |