| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Generic; |
| using System.IO; |
| using System.Linq; |
| using System.Text; |
| using Microsoft.Azure.DataLake.Store; |
| using Microsoft.Azure.DataLake.Store.FileTransfer; |
| using Org.Apache.REEF.Tang.Annotations; |
| |
| namespace Org.Apache.REEF.IO.FileSystem.AzureDataLake |
| { |
| /// <summary> |
| /// An IFileSystem implementation for Azure Data Lake Store. |
| /// </summary> |
| internal sealed class AzureDataLakeFileSystem : IFileSystem |
| { |
private readonly IDataLakeStoreClient _client;
private readonly AdlsClient _adlsClient;

/// <summary>
/// Creates the file system on top of the injected Data Lake Store client.
/// The <see cref="AdlsClient"/> used by every operation is obtained from that client once, here.
/// </summary>
/// <param name="client">Client wrapper that supplies the AdlsClient and the account FQDN.</param>
/// <exception cref="ArgumentNullException">If client is null</exception>
[Inject]
private AzureDataLakeFileSystem(IDataLakeStoreClient client)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the call below.
    if (client == null)
    {
        throw new ArgumentNullException(nameof(client));
    }

    _client = client;
    _adlsClient = _client.GetAdlsClient();
}
| |
/// <summary>
/// Opens a read stream over the remote file identified by the given URI.
/// Only the absolute path component of the URI is handed to the ADLS client.
/// </summary>
/// <param name="fileUri">URI of the file to read.</param>
/// <returns>The read stream returned by the ADLS client.</returns>
/// <exception cref="AdlsException">If the URI couldn't be opened.</exception>
public Stream Open(Uri fileUri)
{
    string remotePath = fileUri.AbsolutePath;
    return _adlsClient.GetReadStream(remotePath);
}
| |
/// <summary>
/// Creates a file at the absolute path of the given URI and returns a stream for writing to it.
/// The ADLS client is called with <c>IfExists.Overwrite</c>.
/// </summary>
/// <param name="fileUri">URI of the file to create.</param>
/// <returns>The write stream returned by the ADLS client.</returns>
/// <exception cref="AdlsException">If the URI couldn't be created.</exception>
public Stream Create(Uri fileUri)
{
    string remotePath = fileUri.AbsolutePath;
    return _adlsClient.CreateFile(remotePath, IfExists.Overwrite);
}
| |
/// <summary>
/// Deletes the entry found at the absolute path of the given URI.
/// </summary>
/// <param name="fileUri">URI of the file to delete.</param>
/// <exception cref="IOException">If the ADLS client reports that the delete did not succeed</exception>
public void Delete(Uri fileUri)
{
    if (_adlsClient.Delete(fileUri.AbsolutePath))
    {
        return;
    }

    throw new IOException($"Cannot delete directory/file specified by {fileUri}");
}
| |
/// <summary>
/// Checks whether an entry exists at the absolute path of the given URI.
/// </summary>
/// <param name="fileUri">URI to check.</param>
/// <returns>The result of the ADLS client's existence check for that path.</returns>
public bool Exists(Uri fileUri)
{
    string remotePath = fileUri.AbsolutePath;
    bool found = _adlsClient.CheckExists(remotePath);
    return found;
}
| |
/// <summary>
/// Copies the content of sourceUri into destinationUri.
/// Note : The data is streamed through this process: it is read from the stream
/// returned by Open and written to the stream returned by Create.
/// That is slow, so this method is not recommended for large file transfers.
/// </summary>
/// <param name="sourceUri">URI of the file to read from.</param>
/// <param name="destinationUri">URI of the file to write to.</param>
/// <exception cref="IOException">Wraps any exception raised while copying</exception>
public void Copy(Uri sourceUri, Uri destinationUri)
{
    try
    {
        using (Stream source = Open(sourceUri))
        {
            // Rewind before the destination is created, so a failure here leaves the destination untouched.
            source.Position = 0;

            using (Stream destination = Create(destinationUri))
            {
                source.CopyTo(destination);
            }
        }
    }
    catch (Exception copyFailure)
    {
        throw new IOException($"Error copying {sourceUri} to {destinationUri}", copyFailure);
    }
}
| |
/// <summary>
/// Maximum number of failed entries that are spelled out in an exception message.
/// </summary>
private const int MaxReportedFailures = 50;

/// <summary>
/// Copies the remote file to a local file using the ADLS client's bulk download.
/// </summary>
/// <param name="remoteFileUri">URI of the remote source; only its absolute path is used.</param>
/// <param name="localFileName">Local destination path.</param>
/// <exception cref="IOException">
/// If the bulk download throws, or if it completes with failed entries
/// </exception>
public void CopyToLocal(Uri remoteFileUri, string localFileName)
{
    TransferStatus status;
    try
    {
        // NOTE(review): the original author annotated this call as able to throw
        // KeyNotFoundException; it is wrapped like every other failure.
        status = _adlsClient.BulkDownload(remoteFileUri.AbsolutePath, localFileName);
    }
    catch (Exception ex)
    {
        throw new IOException($"Error in bulk download from {remoteFileUri} to {localFileName}", ex);
    }

    ThrowIfEntriesFailed(status, "download");
}

/// <summary>
/// Copies the specified local file to the remote location using the ADLS client's bulk upload.
/// </summary>
/// <param name="localFileName">Local source path.</param>
/// <param name="remoteFileUri">URI of the remote destination; only its absolute path is used.</param>
/// <exception cref="IOException">
/// If the bulk upload throws, or if it completes with failed entries
/// </exception>
public void CopyFromLocal(string localFileName, Uri remoteFileUri)
{
    TransferStatus status;
    try
    {
        status = _adlsClient.BulkUpload(localFileName, remoteFileUri.AbsolutePath);
    }
    catch (Exception ex)
    {
        throw new IOException($"Error in bulk upload from {localFileName} to {remoteFileUri}", ex);
    }

    ThrowIfEntriesFailed(status, "upload");
}

/// <summary>
/// Throws an IOException describing the failed entries of a bulk transfer, if there are any.
/// The message carries the total failure count and details for at most
/// <see cref="MaxReportedFailures"/> entries.
/// </summary>
/// <param name="status">Status returned by the bulk transfer.</param>
/// <param name="operation">Verb used in the message, e.g. "download" or "upload".</param>
/// <exception cref="IOException">If status contains at least one failed entry</exception>
private static void ThrowIfEntriesFailed(TransferStatus status, string operation)
{
    var failedEntries = status.EntriesFailed;
    if (failedEntries.Count == 0)
    {
        return;
    }

    // Cap the per-entry details so a large failed transfer does not produce an unbounded message.
    int reported = Math.Min(failedEntries.Count, MaxReportedFailures);
    var details = new StringBuilder();
    for (int i = 0; i < reported; i++)
    {
        var entry = failedEntries[i];
        details.Append($"Entry {entry.EntryName} failed with error message {entry.Errors}. ");
    }

    throw new IOException($"{failedEntries.Count} entries failed to {operation}. {details}");
}
| |
/// <summary>
/// Creates a directory at the absolute path of the given URI.
/// </summary>
/// <param name="directoryUri">URI of the directory to create.</param>
/// <exception cref="IOException">If the ADLS client reports that the directory was not created</exception>
public void CreateDirectory(Uri directoryUri)
{
    if (_adlsClient.CreateDirectory(directoryUri.AbsolutePath))
    {
        return;
    }

    throw new IOException($"Cannot create directory specified by {directoryUri}");
}
| |
/// <summary>
/// Deletes the directory at the absolute path of the given URI via the ADLS client's DeleteRecursive.
/// The delete is only attempted when the path exists and its entry type is DIRECTORY.
/// </summary>
/// <param name="directoryUri">URI of the directory to delete.</param>
/// <exception cref="IOException">
/// If the path does not exist, is not a directory, or the delete is reported as unsuccessful
/// </exception>
public void DeleteDirectory(Uri directoryUri)
{
    string remotePath = directoryUri.AbsolutePath;

    // Short-circuit keeps the original order: existence check, then type check, then delete.
    bool isDirectory = Exists(directoryUri) &&
        _adlsClient.GetDirectoryEntry(remotePath).Type == DirectoryEntryType.DIRECTORY;

    if (!isDirectory || !_adlsClient.DeleteRecursive(remotePath))
    {
        throw new IOException($"Cannot delete directory specified by {directoryUri}");
    }
}
| |
/// <summary>
/// Lists the entries of the directory referenced by the given URI.
/// Each entry's full name is converted to a URI with CreateUriForPath.
/// </summary>
/// <param name="directoryUri">URI of the directory to enumerate.</param>
/// <returns>The URIs of the directory's entries, projected from the ADLS enumeration.</returns>
/// <exception cref="IOException">If the path does not exist or is not a directory</exception>
public IEnumerable<Uri> GetChildren(Uri directoryUri)
{
    string remotePath = directoryUri.AbsolutePath;

    bool isDirectory = Exists(directoryUri) &&
        _adlsClient.GetDirectoryEntry(remotePath).Type == DirectoryEntryType.DIRECTORY;
    if (!isDirectory)
    {
        throw new IOException($"Cannot find directory specified by {directoryUri}");
    }

    var entries = _adlsClient.EnumerateDirectory(remotePath);
    return entries.Select(child => CreateUriForPath(child.FullName));
}
| |
/// <summary>
/// Create Uri from a given file path.
/// The file path can be full with prefix ("adl://{account FQDN}/...") or relative without prefix.
/// A path that already starts with this account's prefix is converted as-is;
/// any other path is appended to the prefix, with leading '/' characters removed.
/// </summary>
/// <param name="path">Full URI string for this account, or a path within the account.</param>
/// <returns>A URI carrying this account's prefix and the given path.</returns>
/// <exception cref="ArgumentNullException">If specified path is null</exception>
public Uri CreateUriForPath(string path)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path), "Specified path is null");
    }

    string prefix = GetUriPrefix();

    // Prepending the prefix to a path that already has it would yield
    // "adl://account/adl://account/...". Scheme and host are compared ignoring case.
    bool alreadyPrefixed =
        path.Equals(prefix, StringComparison.OrdinalIgnoreCase) ||
        path.StartsWith(prefix + "/", StringComparison.OrdinalIgnoreCase);
    if (alreadyPrefixed)
    {
        return new Uri(path);
    }

    return new Uri($"{prefix}/{path.TrimStart('/')}");
}
| |
/// <summary>
/// Builds a FileStatus for the remote entry from its last modified time and length,
/// both read from the directory entry returned by the ADLS client.
/// </summary>
/// <param name="remoteFileUri">URI of the remote file; only its absolute path is used.</param>
/// <exception cref="ArgumentNullException">If remote file URI is null</exception>
/// <exception cref="IOException">If the entry has no last modified time</exception>
/// <returns>FileStatus</returns>
public FileStatus GetFileStatus(Uri remoteFileUri)
{
    if (remoteFileUri == null)
    {
        throw new ArgumentNullException(nameof(remoteFileUri), "Specified uri is null");
    }

    var entry = _adlsClient.GetDirectoryEntry(remoteFileUri.AbsolutePath);
    var lastModified = entry.LastModifiedTime;
    if (lastModified.HasValue)
    {
        return new FileStatus(lastModified.Value, entry.Length);
    }

    throw new IOException($"File/Directory at {remoteFileUri} does not have a last modified time. It may have been deleted.");
}
| |
/// <summary>
/// Builds the URI prefix for this account: the "adl://" scheme followed by the client's account FQDN.
/// </summary>
/// <returns>The prefix, without a trailing slash.</returns>
private string GetUriPrefix() => $"adl://{_client.AccountFQDN}";
| } |
| } |