// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RUNTIME_TMP_FILE_MGR_H
#define IMPALA_RUNTIME_TMP_FILE_MGR_H

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>

#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
#include "util/condition-variable.h"
#include "util/mem-range.h"
#include "util/metrics-fwd.h"
#include "util/openssl-util.h"
#include "util/runtime-profile.h"
#include "util/spinlock.h"

namespace impala {
/// Forward declarations of the I/O types that this header only refers to through
/// pointers and smart pointers, so their full definitions are not needed here.
namespace io {
  class DiskIoMgr;
  class RequestContext;
  class ScanRange;
  class WriteRange;
} // namespace io

/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
/// directories across multiple devices, configured via the --scratch_dirs option.
/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which
/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr.
/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption
/// command-line flag.
///
/// FileGroups manage scratch space across multiple devices. To write to scratch space,
/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously
/// write a memory buffer to one of the scratch files. FileGroup::Write() returns a
/// WriteHandle, which is used by the caller to identify that write operation. The
/// caller is notified when the asynchronous write completes via a callback, after which
/// the caller can use the WriteHandle to read back the data.
///
/// Each WriteHandle is backed by a range of data in a scratch file. The first call to
/// Write() will create files for the FileGroup with unique filenames on the configured
/// temporary devices. At most one directory per device is used (unless overridden for
/// testing). The file range of a WriteHandle can be replaced with a different one if
/// a write error is encountered and the data instead needs to be written to a different
/// disk.
///
/// Free Space Management:
/// Free space is managed within a FileGroup: once a WriteHandle is destroyed, the file
/// range backing it can be recycled for a different WriteHandle. Scratch file ranges
/// are grouped into size classes, each for a power-of-two number of bytes. Free file
/// ranges of each size class are managed separately (i.e. there is no splitting or
/// coalescing of ranges).
///
/// Resource Management:
/// TmpFileMgr provides some basic support for managing local disk space consumption.
/// A FileGroup can be created with a limit on the total number of bytes allocated across
/// all files. Writes that would exceed the limit fail with an error status.
///
/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
/// temporarily blacklist devices that show I/O errors.
class TmpFileMgr {
 public:
  /// Nested types declared up front so that the declarations below can refer to them.
  /// File is declared in the public section so that TmpFileMgrTest can use it.
  class File;
  class WriteHandle;

  /// Internal identifier that uniquely names one temporary device managed by
  /// TmpFileMgr. Ids are handed out arbitrarily from the range [0, num tmp devices).
  /// Public only so that TmpFileMgrTest can use it.
  using DeviceId = int;

  /// Completion callback for a scratch write; it is passed the Status of the write.
  /// Same type as io::WriteRange::WriteDoneCallback.
  using WriteDoneCallback = std::function<void(const Status&)>;

  /// Describes one configured scratch directory in which TmpFileMgr allocates
  /// temporary files. Every field is const and therefore fixed at construction.
  struct TmpDir {
    /// Records 'path', 'bytes_limit' and 'bytes_used_metric' exactly as given. The
    /// gauge is held as a raw pointer and is never freed by TmpDir.
    TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric)
      : path(path),
        bytes_limit(bytes_limit),
        bytes_used_metric(bytes_used_metric) {}

    /// Filesystem path of this scratch directory.
    const std::string path;

    /// Maximum number of bytes that should be written under 'path'. Holds the largest
    /// int64_t value when the directory has no limit.
    const int64_t bytes_limit;

    /// Gauge reporting the bytes of scratch currently used in this directory.
    IntGauge* const bytes_used_metric;
  };

  /// Represents a group of temporary files - one per disk with a scratch directory. The
  /// total allocated bytes of the group can be bounded by setting the space allocation
  /// limit. The owner of the FileGroup object is responsible for calling the Close()
  /// method to delete all the files in the group.
  ///
  /// Public methods of FileGroup and WriteHandle are safe to call concurrently from
  /// multiple threads as long as different WriteHandle arguments are provided.
  class FileGroup {
   public:
    /// Initialize a new file group, which will create files using 'tmp_file_mgr'
    /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
    /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
    /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
    /// 'bytes_limit' is the limit on the total file space to allocate; the default of
    /// -1 means that there is no limit.
    FileGroup(TmpFileMgr* tmp_file_mgr, io::DiskIoMgr* io_mgr, RuntimeProfile* profile,
        const TUniqueId& unique_id, int64_t bytes_limit = -1);

    ~FileGroup();

    /// Asynchronously writes 'buffer' to a temporary file of this file group. If there
    /// are multiple scratch files, this can write to any of them, and will attempt to
    /// recover from I/O errors on one file by writing to a different file. The memory
    /// referenced by 'buffer' must remain valid until the write completes. The callee
    /// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or
    /// compression). The caller should not modify the data in 'buffer' until the write
    /// completes or is cancelled, otherwise invalid data may be written to disk.
    ///
    /// Returns an error if the scratch space cannot be allocated or the write cannot
    /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
    /// a different thread when the write completes successfully or unsuccessfully or is
    /// cancelled.
    ///
    /// 'handle' must be destroyed by passing it to DestroyWriteHandle() or
    /// RestoreData().
    Status Write(MemRange buffer, WriteDoneCallback cb,
        std::unique_ptr<WriteHandle>* handle) WARN_UNUSED_RESULT;

    /// Synchronously read the data referenced by 'handle' from the temporary file into
    /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
    /// after a write successfully completes. Should not be called while an async read
    /// is in flight. Equivalent to calling ReadAsync() then WaitForAsyncRead().
    Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;

    /// Asynchronously read the data referenced by 'handle' from the temporary file into
    /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
    /// after a write successfully completes. WaitForAsyncRead() must be called before the
    /// data in the buffer is valid. Should not be called while an async read
    /// is already in flight.
    Status ReadAsync(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;

    /// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer'
    /// should be the same buffer passed into ReadAsync(). Returns an error if the
    /// read fails. Retrying a failed read by calling ReadAsync() again is allowed.
    Status WaitForAsyncRead(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;

    /// Restore the original data in the 'buffer' passed to Write(), decrypting or
    /// decompressing as necessary. Returns an error if restoring the data fails.
    /// The write must not be in-flight - the caller is responsible for waiting for
    /// the write to complete. Takes ownership of 'handle', which is destroyed by this
    /// call.
    Status RestoreData(
        std::unique_ptr<WriteHandle> handle, MemRange buffer) WARN_UNUSED_RESULT;

    /// Wait for the in-flight I/Os to complete and destroy resources associated with
    /// 'handle'. Takes ownership of 'handle'.
    void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle);

    /// Calls Remove() on all the files in the group and deletes them.
    void Close();

    /// Returns a string describing this file group, intended for debugging output.
    /// The exact contents are defined by the implementation.
    std::string DebugString();

    /// The unique ID that this group was created with (used to prefix file names).
    const TUniqueId& unique_id() const { return unique_id_; }

    /// The TmpFileMgr that this group is associated with.
    TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }

   private:
    /// File and the unit test need access to the private members below.
    friend class File;
    friend class TmpFileMgrTest;

    /// Initializes the file group with one temporary file per disk with a scratch
    /// directory. Returns OK if at least one temporary file could be created.
    /// Returns an error if no temporary files were successfully created. Must only be
    /// called once. Must be called with 'lock_' held.
    Status CreateFiles() WARN_UNUSED_RESULT;

    /// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error
    /// occurs. Returns an error only if no temporary files are usable or the scratch
    /// limit is exceeded. On success, 'tmp_file' and 'file_offset' identify the
    /// allocated range. Must be called without 'lock_' held.
    Status AllocateSpace(
        int64_t num_bytes, File** tmp_file, int64_t* file_offset) WARN_UNUSED_RESULT;

    /// Add the scratch range from 'handle' to 'free_ranges_' and destroy handle. Must be
    /// called without 'lock_' held.
    void RecycleFileRange(std::unique_ptr<WriteHandle> handle);

    /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
    /// to retry the write. On success or if the write can't be retried, calls
    /// handle->WriteComplete().
    void WriteComplete(WriteHandle* handle, const Status& write_status);

    /// Handles a write error. Logs the write error and blacklists the device for this
    /// file group if the cause was an I/O error. Blacklisting limits the number of times
    /// a write is retried because each device will only be tried once. Returns OK if it
    /// successfully reissued the write. Returns an error status if the original error
    /// was unrecoverable or an unrecoverable error is encountered when reissuing the
    /// write. The error status will include all previous I/O errors in its details.
    Status RecoverWriteError(
        WriteHandle* handle, const Status& write_status) WARN_UNUSED_RESULT;

    /// Return a SCRATCH_ALLOCATION_FAILED error with the appropriate information,
    /// including scratch directories, the amount of scratch allocated and previous
    /// errors that caused this failure. If some directories were at capacity,
    /// but had not encountered an error, the indices of these directories in
    /// tmp_file_mgr_->tmp_dirs_ should be included in 'at_capacity_dirs'.
    /// 'lock_' must be held by caller.
    Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);

    /// The TmpFileMgr it is associated with.
    TmpFileMgr* const tmp_file_mgr_;

    /// DiskIoMgr used for all I/O to temporary files.
    io::DiskIoMgr* const io_mgr_;

    /// I/O context used for all reads and writes. Registered in constructor.
    std::unique_ptr<io::RequestContext> io_ctx_;

    /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
    /// touched by DiskIoMgr even after the scan is finished.
    /// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined.
    ObjectPool scan_range_pool_;

    /// Unique across all FileGroups. Used to prefix file names.
    const TUniqueId unique_id_;

    /// Max write space allowed (-1 means no limit).
    const int64_t bytes_limit_;

    /// Number of write operations (includes writes started but not yet complete).
    RuntimeProfile::Counter* const write_counter_;

    /// Number of bytes written to disk (includes writes started but not yet complete).
    RuntimeProfile::Counter* const bytes_written_counter_;

    /// Number of read operations (includes reads started but not yet complete).
    RuntimeProfile::Counter* const read_counter_;

    /// Number of bytes read from disk (includes reads started but not yet complete).
    RuntimeProfile::Counter* const bytes_read_counter_;

    /// Amount of scratch space allocated in bytes.
    RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;

    /// Time spent waiting for disk reads.
    RuntimeProfile::Counter* const disk_read_timer_;

    /// Time spent in disk spill encryption, decryption, and integrity checking.
    /// NOTE(review): unlike the other counter pointers this one is not declared const -
    /// confirm whether that is intentional.
    RuntimeProfile::Counter* encryption_timer_;

    /// Protects below members.
    SpinLock lock_;

    /// List of files representing the FileGroup.
    std::vector<std::unique_ptr<File>> tmp_files_;

    /// Total space allocated in this group's files.
    int64_t current_bytes_allocated_;

    /// Index into 'tmp_files_' denoting the file from which the next temporary file
    /// range should be allocated. Used to implement round-robin allocation from
    /// temporary files.
    int next_allocation_index_;

    /// free_ranges_[i] is a vector of File/offset pairs for the free scratch ranges of
    /// length 2^i bytes. Has 64 entries so that every int64_t length has a valid list
    /// associated with it.
    std::vector<std::vector<std::pair<File*, int64_t>>> free_ranges_;

    /// Errors encountered when creating/writing scratch files. We store the history so
    /// that we can report the original cause of the scratch errors if we run out of
    /// devices to write to.
    std::vector<Status> scratch_errors_;
  };

  /// A handle to a write operation, backed by a range of a temporary file. The operation
  /// is either in-flight or has completed. If it completed with no error and wasn't
  /// cancelled then the data is in the file and can be read back.
  ///
  /// WriteHandle is returned from FileGroup::Write(). After the write completes, the
  /// handle can be passed to FileGroup::Read() to read back the data zero or more times.
  /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
  /// allow reuse of the scratch file range written to. Alternatively,
  /// FileGroup::RestoreData() can be called to reverse the effects of FileGroup::Write()
  /// by destroying the handle and restoring the original data to the buffer, so long as
  /// the data in the buffer was not modified by the caller.
  ///
  /// Public methods of WriteHandle are safe to call concurrently from multiple threads.
  class WriteHandle {
   public:
    /// The handle must be destroyed by passing it to FileGroup::DestroyWriteHandle() or
    /// FileGroup::RestoreData() - destroying it before the write completes is an error.
    ~WriteHandle();

    /// Cancel any in-flight read synchronously.
    void CancelRead();

    /// Path of temporary file backing this write. Intended for use in testing.
    /// Returns empty string if no backing file allocated.
    std::string TmpFilePath() const;

    /// The length of the write range in bytes.
    int64_t len() const;

    /// Returns a string describing this handle, intended for debugging output.
    /// The exact contents are defined by the implementation.
    std::string DebugString();

   private:
    /// FileGroup creates and drives handles; the unit test inspects their state.
    friend class FileGroup;
    friend class TmpFileMgrTest;

    /// Private: handles are created by FileGroup (see FileGroup::Write()). 'cb' is the
    /// callback to be called when the write completes. 'encryption_timer' is the
    /// counter for encryption time (presumably the FileGroup's 'encryption_timer_').
    WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb);

    /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false
    /// before calling. After returning, 'write_in_flight_' is true on success or false on
    /// failure and 'is_cancelled_' is set to true on failure.
    /// NOTE(review): 'callback' is presumably the completion callback handed to the
    /// DiskIoMgr write range, as distinct from 'cb_' which is set by the constructor -
    /// confirm in the implementation.
    Status Write(io::RequestContext* io_ctx, File* file,
        int64_t offset, MemRange buffer,
        WriteDoneCallback callback) WARN_UNUSED_RESULT;

    /// Retry the write after the initial write failed with an error, instead writing to
    /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
    /// After returning, 'write_in_flight_' is true on success or false on failure.
    Status RetryWrite(io::RequestContext* io_ctx, File* file,
        int64_t offset) WARN_UNUSED_RESULT;

    /// Called when the write has completed successfully or not. Sets 'write_in_flight_'
    /// then calls 'cb_'.
    void WriteComplete(const Status& write_status);

    /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and
    /// writes are cancelled asynchronously. After Cancel() is called, writes are not
    /// retried. The write callback may be called with a CANCELLED_INTERNALLY status
    /// (unless it succeeded or encountered a different error first).
    void Cancel();

    /// Blocks until the write completes either successfully or unsuccessfully.
    /// May return before the write callback has been called.
    void WaitForWrite();

    /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
    Status EncryptAndHash(MemRange buffer) WARN_UNUSED_RESULT;

    /// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
    Status CheckHashAndDecrypt(MemRange buffer) WARN_UNUSED_RESULT;

    /// Callback to be called when the write completes.
    WriteDoneCallback cb_;

    /// Reference to the FileGroup's 'encryption_timer_'.
    RuntimeProfile::Counter* encryption_timer_;

    /// The DiskIoMgr write range for this write.
    boost::scoped_ptr<io::WriteRange> write_range_;

    /// The temporary file being written to.
    File* file_;

    /// If --disk_spill_encryption is on, an AES 256-bit key and initialization vector.
    /// Regenerated for each write.
    EncryptionKey key_;

    /// If --disk_spill_encryption is on, our hash of the data being written. Filled in
    /// on writes; verified on reads. This is calculated _after_ encryption.
    IntegrityHash hash_;

    /// The scan range for the read that is currently in flight. NULL when no read is in
    /// flight.
    io::ScanRange* read_range_;

    /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
    /// invalid to call WriteHandle/FileGroup methods concurrently from multiple threads,
    /// so no locking is required. This is a terminal lock and should not be held while
    /// acquiring other locks or invoking 'cb_'.
    boost::mutex write_state_lock_;

    /// True if the write has been cancelled (but is not necessarily complete).
    bool is_cancelled_;

    /// True if a write is in flight.
    bool write_in_flight_;

    /// Signalled when the write completes and 'write_in_flight_' becomes false, before
    /// 'cb_' is invoked.
    ConditionVariable write_complete_cv_;
  };

  /// NOTE(review): presumably Init() or InitCustom() must be called before the manager
  /// is used ('initialized_' appears to track this) - confirm in the implementation.
  TmpFileMgr();

  /// Creates the configured tmp directories. If multiple directories are specified per
  /// disk, only one is created and used. Must be called after DiskInfo::Init().
  Status Init(MetricGroup* metrics) WARN_UNUSED_RESULT;

  /// Custom initialization - initializes with the provided list of directories.
  /// If one_dir_per_device is true, only use one temporary directory per device.
  /// This interface is intended for testing purposes. Each directory specifier uses
  /// the command-line syntax, i.e. <path>[:<limit>]. The first variant takes a
  /// comma-separated list of specifiers in 'tmp_dirs_spec', the second takes a vector
  /// of specifiers in 'tmp_dir_specifiers'.
  Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device,
      MetricGroup* metrics) WARN_UNUSED_RESULT;
  Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
      bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT;

  /// Return the scratch directory path for the device identified by 'device_id'.
  std::string GetTmpDirPath(DeviceId device_id) const;

  /// Total number of devices with tmp directories that are active. There is one tmp
  /// directory per device.
  /// NOTE(review): not declared const although it reads like a pure query - confirm.
  int NumActiveTmpDevices();

  /// Return vector with device ids of all tmp devices being actively used.
  /// I.e. those that haven't been blacklisted.
  std::vector<DeviceId> ActiveTmpDevices();

 private:
  /// The unit test needs access to the private members below.
  friend class TmpFileMgrTest;

  /// Return a new File handle with a path based on file_group->unique_id. The file is
  /// associated with the 'file_group' and the file path is within the (single) scratch
  /// directory on the specified device id. The new handle is returned in 'new_file',
  /// so the caller owns it through that unique_ptr. The file is not created - creation
  /// is deferred until the file is written.
  void NewFile(FileGroup* file_group, DeviceId device_id,
    std::unique_ptr<File>* new_file);

  /// NOTE(review): presumably set once Init()/InitCustom() has succeeded - confirm in
  /// the implementation.
  bool initialized_;

  /// The created tmp directories: path, byte limit and usage metric for each one.
  std::vector<TmpDir> tmp_dirs_;

  /// Metrics to track active scratch directories.
  IntGauge* num_active_scratch_dirs_metric_;
  SetMetric<std::string>* active_scratch_dirs_metric_;

  /// Metrics to track the scratch space HWM.
  AtomicHighWaterMarkGauge* scratch_bytes_used_metric_;
};

}

#endif
