﻿// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
// 
//   http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Azure.DataLake.Store;
using Microsoft.Azure.DataLake.Store.FileTransfer;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.IO.FileSystem.AzureDataLake
{
    /// <summary>
    /// An IFileSystem implementation for Azure Data Lake Store.
    /// </summary>
    internal sealed class AzureDataLakeFileSystem : IFileSystem
    {
        private readonly IDataLakeStoreClient _client;
        private readonly AdlsClient _adlsClient;

        [Inject]
        private AzureDataLakeFileSystem(IDataLakeStoreClient client)
        {
            _client = client;
            _adlsClient = _client.GetAdlsClient();
        }

        /// <summary>
        /// Opens the given URI for reading
        /// </summary>
        /// <exception cref="AdlsException">If the URI couldn't be opened.</exception>
        public Stream Open(Uri fileUri)
        {
            return _adlsClient.GetReadStream(fileUri.AbsolutePath);
        }

        /// <summary>
        /// Creates a new file under the given URI.
        /// </summary>
        /// <exception cref="AdlsException">If the URI couldn't be created.</exception>
        public Stream Create(Uri fileUri)
        {
            return _adlsClient.CreateFile(fileUri.AbsolutePath, IfExists.Overwrite);
        }

        /// <summary>
        /// Deletes the file under the given URI.
        /// </summary>
        /// <exception cref="IOException">If the specified file cannot be deleted</exception>
        public void Delete(Uri fileUri)
        {
            bool deleteStatus = _adlsClient.Delete(fileUri.AbsolutePath);
            if (!deleteStatus)
            {
                throw new IOException($"Cannot delete directory/file specified by {fileUri}");
            }
        }

        /// <summary>
        /// Determines whether a file exists under the given URI.
        /// </summary>
        public bool Exists(Uri fileUri)
        {
            return _adlsClient.CheckExists(fileUri.AbsolutePath);
        }

        /// <summary>
        /// Copies the file referenced by sourceUri to destinationUri.
        /// Note : This method reads from the input stream of sourceUri locally and
        /// writes to the output stream of destinationUri.
        /// This is time consuming and not recommended for large file transfers.
        /// </summary>
        /// <exception cref="IOException">If copy process encounters any exceptions</exception>
        public void Copy(Uri sourceUri, Uri destinationUri)
        {
            try
            {
                using (var readStream = Open(sourceUri))
                {
                    readStream.Position = 0;
                    using (var writeStream = Create(destinationUri))
                    {
                        readStream.CopyTo(writeStream);
                    }
                }
            }
            catch (Exception ex)
            {
                throw new IOException($"Error copying {sourceUri} to {destinationUri}", ex);
            }
        }

        /// <summary>
        /// Copies the remote file to a local file.
        /// </summary>
        /// <exception cref="IOException">If copy process encounters any exceptions</exception>
        public void CopyToLocal(Uri remoteFileUri, string localFileName)
        {
            TransferStatus status;
            try
            {
                status = _adlsClient.BulkDownload(remoteFileUri.AbsolutePath, localFileName); // throws KeyNotFoundException
            }
            catch (Exception ex)
            {
                throw new IOException($"Error in bulk download from {remoteFileUri} to {localFileName}", ex);
            }
            if (status.EntriesFailed.Count != 0)
            {
                var failedEntriesBuilder = new StringBuilder();
                for (int i = 0; i < status.EntriesFailed.Count && i < 50; i++)
                {
                    var entry = status.EntriesFailed[i];
                    failedEntriesBuilder.Append($"Entry {entry.EntryName} failed with error message {entry.Errors}. ");
                }
                throw new IOException($"{status.EntriesFailed.Count} entries failed to download. {failedEntriesBuilder.ToString()}");
            }
        }

        /// <summary>
        /// Copies the specified file to the remote location.
        /// </summary>
        /// <exception cref="IOException">If copy process encounters any exception</exception>
        public void CopyFromLocal(string localFileName, Uri remoteFileUri)
        {
            TransferStatus status;
            try
            {
                status = _adlsClient.BulkUpload(localFileName, remoteFileUri.AbsolutePath);
            }
            catch (Exception ex)
            {
                throw new IOException($"Error in bulk upload from {localFileName} to {remoteFileUri}", ex);
            }
            if (status.EntriesFailed.Count != 0)
            {
                var failedEntriesBuilder = new StringBuilder();
                for (int i = 0; i < status.EntriesFailed.Count && i < 50; i++)
                {
                    var entry = status.EntriesFailed[i];
                    failedEntriesBuilder.Append($"Entry {entry.EntryName} failed with error message {entry.Errors}. ");
                }
                throw new IOException($"{status.EntriesFailed.Count} entries failed to upload. {failedEntriesBuilder.ToString()}");
            }
        }

        /// <summary>
        /// Creates a new directory.
        /// </summary>
        /// <exception cref="IOException">If directory cannot be created</exception>
        public void CreateDirectory(Uri directoryUri)
        {
            bool createDirStatus = _adlsClient.CreateDirectory(directoryUri.AbsolutePath);
            if (!createDirStatus)
            {
                throw new IOException($"Cannot create directory specified by {directoryUri}");
            }
        }

        /// <summary>
        /// Deletes a directory.
        /// </summary>
        /// <exception cref="IOException">If directory cannot be deleted</exception>
        public void DeleteDirectory(Uri directoryUri)
        {
            bool deleteStatus = Exists(directoryUri) && 
                _adlsClient.GetDirectoryEntry(directoryUri.AbsolutePath).Type == DirectoryEntryType.DIRECTORY &&
                _adlsClient.DeleteRecursive(directoryUri.AbsolutePath);
            if (!deleteStatus)
            {
                throw new IOException($"Cannot delete directory specified by {directoryUri}");
            }
        }

        /// <summary>
        /// Get the children on the given URI, if that refers to a directory.
        /// </summary>
        /// <exception cref="IOException">If directory does not exist</exception>
        public IEnumerable<Uri> GetChildren(Uri directoryUri)
        {
            if (!Exists(directoryUri) || _adlsClient.GetDirectoryEntry(directoryUri.AbsolutePath).Type != DirectoryEntryType.DIRECTORY)
            {
                throw new IOException($"Cannot find directory specified by {directoryUri}");
            }

            return _adlsClient.EnumerateDirectory(directoryUri.AbsolutePath).Select(entry => CreateUriForPath(entry.FullName));
        }

        /// <summary>
        /// Create Uri from a given file path.
        /// The file path can be full with prefix or relative without prefix.
        /// If null is passed as the path, ArgumentException will be thrown.
        /// </summary>
        /// <exception cref="ArgumentNullException">If specified path is null</exception>
        public Uri CreateUriForPath(string path)
        {
            if (path == null)
            {
                throw new ArgumentNullException(nameof(path), "Specified path is null");
            }
            return new Uri($"{GetUriPrefix()}/{path.TrimStart('/')}");
        }

        /// <summary>
        /// Gets the FileStatus for remote file.
        /// </summary>
        /// <exception cref="ArgumentNullException">If remote file URI is null</exception>
        /// <returns>FileStatus</returns>
        public FileStatus GetFileStatus(Uri remoteFileUri)
        {
            if (remoteFileUri == null)
            {
                throw new ArgumentNullException(nameof(remoteFileUri), "Specified uri is null");
            }
            var entrySummary = _adlsClient.GetDirectoryEntry(remoteFileUri.AbsolutePath);
            if (!entrySummary.LastModifiedTime.HasValue)
            {
                throw new IOException($"File/Directory at {remoteFileUri} does not have a last modified time. It may have been deleted.");
            }

            return new FileStatus(entrySummary.LastModifiedTime.Value, entrySummary.Length);
        }

        private string GetUriPrefix()
        {
            return $"adl://{_client.AccountFQDN}";
        }
    }
}
