/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_
#define QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_

#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "catalog/CatalogTypedefs.hpp"
#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
#include "storage/CountedReference.hpp"

#ifdef QUICKSTEP_DISTRIBUTED
#include "storage/DataExchange.grpc.pb.h"
#endif

#include "storage/EvictionPolicy.hpp"
#include "storage/FileManager.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageConstants.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Macros.hpp"
#include "utility/ShardedLockManager.hpp"

#include "gflags/gflags.h"
#include "gtest/gtest_prod.h"

#include "tmb/id_typedefs.h"

#ifdef QUICKSTEP_DISTRIBUTED
namespace grpc { class Channel; }
#endif

namespace tmb { class MessageBus; }

namespace quickstep {

DECLARE_int32(block_domain);
DECLARE_uint64(buffer_pool_slots);

class CatalogRelationSchema;

#ifdef QUICKSTEP_DISTRIBUTED
class PullResponse;
#endif

class StorageBlockLayout;

/** \addtogroup Storage
 *  @{
 */

/**
 * @brief A class which manages block storage in memory and is responsible for
 *        creating, saving, and loading StorageBlock and StorageBlob instances.
 **/
class StorageManager {
 public:
  /**
   * @brief Constructor which uses the default block domain, buffer pool size,
   *        and eviction policy.
   * @note Delegates to the general constructor, passing FLAGS_block_domain as
   *       the block domain, FLAGS_buffer_pool_slots as the memory limit (in
   *       slots), and a newly constructed LRU-K eviction policy.
   *
   * @param storage_path The filesystem directory where blocks have persistent
   *        storage. This should end with a path-separator character.
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   */
  explicit StorageManager(const std::string &storage_path)
      : StorageManager(storage_path,
                       FLAGS_block_domain,
                       FLAGS_buffer_pool_slots,
                       // NOTE(review): the two arguments are presumably K = 2
                       // and a 200 ms correlated-reference period -- confirm
                       // against storage/EvictionPolicy.hpp.
                       LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
                           2,
                           std::chrono::milliseconds(200))) {
  }

  /**
   * @brief Constructor which uses the default block domain and eviction
   *        policy, but an explicit memory limit.
   * @note Delegates to the general constructor, passing FLAGS_block_domain as
   *       the block domain and a newly constructed LRU-K eviction policy.
   *
   * @param storage_path The filesystem directory where blocks have persistent
   *        storage.
   * @param max_memory_usage The maximum amount of memory that the storage
   *                         manager should use for cached blocks in slots. If
   *                         a block is requested that is not currently in
   *                         memory and there are already max_memory_usage slots
   *                         in use in memory, then the storage manager will
   *                         attempt to evict enough blocks to make room for the
   *                         requested block; if it cannot evict enough blocks,
   *                         it will fetch the requested block anyway,
   *                         temporarily going over the memory limit.
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   **/
  StorageManager(const std::string &storage_path,
                 const size_t max_memory_usage)
      : StorageManager(storage_path,
                       FLAGS_block_domain,
                       max_memory_usage,
                       // NOTE(review): the two arguments are presumably K = 2
                       // and a 200 ms correlated-reference period -- confirm
                       // against storage/EvictionPolicy.hpp.
                       LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
                           2,
                           std::chrono::milliseconds(200))) {
  }

#ifdef QUICKSTEP_DISTRIBUTED
  /**
   * @brief Constructor for the distributed version, which uses the default
   *        buffer pool size and eviction policy.
   * @note Delegates to the general constructor, passing
   *       FLAGS_buffer_pool_slots as the memory limit (in slots) and a newly
   *       constructed LRU-K eviction policy.
   *
   * @param storage_path The filesystem directory where blocks have persistent
   *        storage.
   * @param block_domain The unique block domain.
   * @param block_locator_client_id The TMB client ID of the block locator.
   * @param bus A pointer to the TMB.
   *
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   **/
  StorageManager(const std::string &storage_path,
                 const block_id_domain block_domain,
                 const tmb::client_id block_locator_client_id,
                 tmb::MessageBus *bus)
      : StorageManager(storage_path,
                       block_domain,
                       FLAGS_buffer_pool_slots,
                       // NOTE(review): the two arguments are presumably K = 2
                       // and a 200 ms correlated-reference period -- confirm
                       // against storage/EvictionPolicy.hpp.
                       LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
                           2,
                           std::chrono::milliseconds(200)),
                       block_locator_client_id,
                       bus) {
  }
#endif

  /**
   * @brief Constructor.
   * @param storage_path The filesystem directory where blocks have persistent
   *        storage.
   * @param block_domain The unique block domain.
   * @param max_memory_usage The maximum amount of memory that the storage
   *                         manager should use for cached blocks in slots. If
   *                         a block is requested that is not currently in
   *                         memory and there are already max_memory_usage slots
   *                         in use in memory, then the storage manager will
   *                         attempt to evict enough blocks to make room for the
   *                         requested block; if it cannot evict enough blocks,
   *                         it will fetch the requested block anyway,
   *                         temporarily going over the memory limit.
   * @param eviction_policy The eviction policy that the storage manager should
   *                        use to manage the cache. The storage manager takes
   *                        ownership of *eviction_policy.
   * @param block_locator_client_id The TMB client ID of the block locator.
   * @param bus A pointer to the TMB.
   *
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   **/
  StorageManager(const std::string &storage_path,
                 const block_id_domain block_domain,
                 const size_t max_memory_usage,
                 EvictionPolicy *eviction_policy,
                 const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
                 tmb::MessageBus *bus = nullptr);

  /**
   * @brief Destructor which also destroys all managed blocks.
   **/
  ~StorageManager();

  /**
   * @brief Determine the number of slots needed to store the specified number
   *        of bytes.
   * @note The result is rounded up to a whole number of slots, so the returned
   *       slots may hold more than the requested number of bytes (by less
   *       than one slot, i.e. fewer than kSlotSizeBytes extra bytes).
   * @note This is mainly intended to help create StorageBlobs of the
   *       appropriate size. StorageBlocks have internal structures and
   *       metadata which require additional storage beyond the "raw" bytes for
   *       tuples.
   *
   * @param bytes The desired number of bytes. Zero bytes need zero slots.
   * @return The number of slots needed to store bytes.
   **/
  static std::size_t SlotsNeededForBytes(const std::size_t bytes) {
    // Round up without forming 'bytes + kSlotSizeBytes - 1': that sum wraps
    // around when 'bytes' is within one slot of the largest std::size_t value
    // and would silently report too few slots.
    const std::size_t whole_slots = bytes / kSlotSizeBytes;
    const bool has_partial_slot = (bytes % kSlotSizeBytes) != 0;
    return whole_slots + (has_partial_slot ? 1 : 0);
  }

  /**
   * @brief Report how much memory is currently held by this StorageManager's
   *        memory pool.
   * @note The value is only a snapshot taken at the time of the call and is
   *       meant for informational use. The memory pool may grow as needed
   *       during a call to createBlock() or loadBlock().
   *
   * @return The amount of allocated memory managed by this StorageManager in
   *         bytes, i.e. the number of slots in use times kSlotSizeBytes.
   **/
  std::size_t getMemorySize() const {
    // Read the atomic slot counter exactly once, then scale slots to bytes.
    const std::size_t slots_in_use = total_memory_usage_.load();
    return kSlotSizeBytes * slots_in_use;
  }

  /**
   * @brief Return the upper limit on the number of buffer pool slots that the
   *        StorageManager can allocate. This number is specified during the
   *        initialization of the StorageManager. The size of each slot is
   *        kSlotSizeBytes.
   * @note This information is provided for informational purposes. The memory
   *       pool may grow larger than this upper limit temporarily, depending
   *       on the path that is followed in a call to createBlock() or
   *       loadBlock().
   *
   * @return The maximum number of buffer pool slots, as given at construction
   *         time.
   **/
  std::size_t getMaxBufferPoolSlots() const {
    return max_memory_usage_;
  }

  /**
   * @brief Create a new empty block.
   *
   * @param relation The relation which the new block will belong to (you must
   *                 also call addBlock() on the relation).
   * @param layout The StorageBlockLayout to use for the new block.
   * @param numa_node The NUMA node on which the block should be created. The
   *                  default value is -1 and it means that the Catalog
   *                  Relation has no NUMAPlacementScheme associated with it
   *                  and hence the block will be created as per the OS policy.
   * @return The id of the newly-created block.
   **/
  block_id createBlock(const CatalogRelationSchema &relation,
                       const StorageBlockLayout &layout,
                       const int numa_node = -1);

  /**
   * @brief Create a new StorageBlob. The blob memory will initially be
   *        zero-filled.
   *
   * @param num_slots The size of the StorageBlob in slots. Must not exceed
   *        kAllocationChunkSizeSlots.
   * @param numa_node The NUMA node on which the blob should be created. The
   *                  default value is -1 and it means that the blob will be
   *                  created as per the default OS policy.
   * @return The id of the newly-created blob.
   **/
  block_id createBlob(const std::size_t num_slots, const int numa_node = -1);

  /**
   * @brief Check whether a StorageBlock or StorageBlob is loaded into memory.
   * @note This is provided for informational purposes and determines if the
   *       specified block is loaded at the moment the method is called. It is
   *       possible for the block to be loaded by a concurrent thread
   *       immediately after this method returns.
   *
   * @param block The id of the block.
   * @return Whether the block with the specified id is in memory.
   **/
  bool blockOrBlobIsLoaded(const block_id block) const;

  /**
   * @brief Save a block or blob in memory to the persistent storage.
   *
   * @param block The id of the block or blob to save.
   * @param force Force the block to the persistent storage, even if it is not
   *        dirty (by default, only actually write dirty blocks to the
   *        persistent storage).
   * @exception UnableToOpenFile The block's persistent storage file couldn't
   *            be opened for writing.
   * @exception FileWriteError An IO error occurred while writing the block's
   *            persistent storage file.
   *
   * @return False if the block is not found in the memory. True if the block is
   *         successfully saved to the persistent storage OR the block is clean
   *         and force is false.
   **/
  bool saveBlockOrBlob(const block_id block, const bool force = false);

  /**
   * @brief Delete a block or blob's file in the persistent storage. The block
   *        is automatically evicted.
   *
   * @param block The id of the block whose file will be deleted.
   **/
  void deleteBlockOrBlobFile(const block_id block);

  /**
   * @brief Get a block. If the block is not in memory it will be loaded.
   * @note Delegates to getBlockInternal() and converts the resulting
   *       MutableBlockReference into a BlockReference.
   *       NOTE(review): BlockReference is presumably the read-only
   *       counterpart of MutableBlockReference -- confirm in
   *       storage/CountedReference.hpp.
   *
   * @param block The id of the block to get.
   * @param relation The Catalog Relation this block belongs to.
   * @param numa_node The NUMA node for placing this block. If the block is
   *                  already present in the buffer pool, this is ignored.
   *                  If set to -1, the default OS memory-allocation policy
   *                  will be used.
   * @return The block with the given id.
   * @exception OutOfMemory The system has run out of memory.
   **/
  BlockReference getBlock(const block_id block,
                          const CatalogRelationSchema &relation,
                          const int numa_node = -1) {
    return BlockReference(getBlockInternal(block, relation, numa_node));
  }

  /**
   * @brief Get a mutable reference to a block. If the block is not in memory
   *        it will be loaded.
   * @note Returns the MutableBlockReference produced by getBlockInternal()
   *       unchanged.
   *
   * @param block The id of the block to get.
   * @param relation The Catalog Relation this block belongs to.
   * @param numa_node The NUMA node for placing this block. If the block is
   *                  already present in the buffer pool, this is ignored.
   *                  If set to -1, the default OS memory-allocation policy
   *                  will be used.
   * @return The block with the given id.
   * @exception OutOfMemory The system has run out of memory.
   **/
  MutableBlockReference getBlockMutable(const block_id block,
                                        const CatalogRelationSchema &relation,
                                        const int numa_node = -1) {
    return getBlockInternal(block, relation, numa_node);
  }

  /**
   * @brief Get a blob. If the blob is not in memory it will be loaded.
   * @note Delegates to getBlobInternal() and converts the resulting
   *       MutableBlobReference into a BlobReference.
   *
   * @param blob The id of the blob to get.
   * @param numa_node The NUMA node for placing this blob. If the blob is
   *                  already present in the buffer pool, this is ignored.
   *                  If set to -1, the default OS memory-allocation policy
   *                  will be used.
   * @return The blob with the given id.
   * @exception OutOfMemory The system has run out of memory.
   **/
  BlobReference getBlob(const block_id blob, const int numa_node = -1) {
    return BlobReference(getBlobInternal(blob, numa_node));
  }

  /**
   * @brief Get a mutable reference to a blob. If the blob is not in memory it
   *        will be loaded.
   * @note Returns the MutableBlobReference produced by getBlobInternal()
   *       unchanged.
   *
   * @param blob The id of the blob to get.
   * @param numa_node The NUMA node for placing this blob. If the blob is
   *                  already present in the buffer pool, this is ignored.
   *                  If set to -1, the default OS memory-allocation policy
   *                  will be used.
   * @return The blob with the given id.
   * @exception OutOfMemory The system has run out of memory.
   **/
  MutableBlobReference getBlobMutable(const block_id blob,
                                      const int numa_node = -1) {
    return getBlobInternal(blob, numa_node);
  }

  /**
   * @brief Check if a block or blob is loaded in memory AND is dirty.
   *
   * @param block The id of the block or blob.
   * @return True if it's both loaded and dirty, false otherwise.
   **/
  bool blockOrBlobIsLoadedAndDirty(const block_id block);

#ifdef QUICKSTEP_DISTRIBUTED
  /**
   * @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
   *        ForemanDistributed can take advantage of block locality info
   *        for a better scheduling policy.
   *
   * @param shiftboss_index The Shiftboss index.
   **/
  void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);

  /**
   * @brief Pull a block or a blob. Used by DataExchangerAsync.
   *
   * @param block The id of the block or blob.
   * @param response Where to store the pulled block content.
   **/
  void pullBlockOrBlob(const block_id block, PullResponse *response) const;
#endif

  /**
   * @brief Get the HDFS connector via libhdfs3.
   *
   * @return The HDFS connector.
   **/
  void* hdfs();

 private:
  // Bookkeeping record for one block or blob held in memory; this is the
  // mapped type of the 'blocks_' directory.
  struct BlockHandle {
    // Start of the memory buffer backing the block or blob.
    // NOTE(review): presumably obtained from allocateSlots() and released
    // with deallocateSlots() -- confirm in StorageManager.cpp.
    void *block_memory;
    std::size_t block_memory_size;  // size of block_memory in slots
    // The StorageBlock or StorageBlob object, held through its common base
    // type. NOTE(review): presumably constructed on top of 'block_memory' and
    // possibly unset while a handle is only partially constructed -- confirm.
    StorageBlockBase *block;
  };

#ifdef QUICKSTEP_DISTRIBUTED
  /**
   * @brief A class which connects to DataExchangerAsync to exchange data with
   *        remote peers.
   **/
  class DataExchangerClientAsync {
   public:
    /**
     * @brief Constructor.
     *
     * @param channel The RPC channel used to connect to DataExchangerAsync.
     * @param storage_manager The StorageManager to use.
     */
    DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
                             StorageManager *storage_manager);

    /**
     * @brief Pull a block or blob from a remote StorageManager.
     *
     * @param block The block or blob to pull.
     * @param numa_node The NUMA node for placing this block.
     * @param block_handle Where the pulled block or blob is stored.
     *
     * @return Whether the pull operation is successful or not.
     */
    bool Pull(const block_id block,
              const numa_node_id numa_node,
              BlockHandle *block_handle);

   private:
    // Client stub of the DataExchange service (see DataExchange.grpc.pb.h).
    // NOTE(review): presumably created from the constructor's 'channel' --
    // confirm in StorageManager.cpp.
    std::unique_ptr<DataExchange::Stub> stub_;

    // The StorageManager passed to the constructor.
    // NOTE(review): raw pointer, presumably not owned by this class --
    // confirm.
    StorageManager *storage_manager_;

    DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
  };

  /**
   * @brief Get the network info of the given block domain.
   *
   * @param block_domain The domain of block or blob to pull.
   *
   * @return The network info of the given block domain.
   **/
  std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);

  /**
   * @brief Update the block location info in BlockLocator.
   *
   * @param block The given block or blob.
   * @param message_type Indicate whether to add or delete a block location.
   **/
  void sendBlockLocationMessage(const block_id block,
                                const tmb::message_type_id message_type);
#endif

  // Helper for createBlock() and createBlob(). Allocates a block ID and memory
  // slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
  // records the allocated memory and its size (in slots) in
  // 'handle->block_memory' and 'handle->block_memory_size'.
  block_id allocateNewBlockOrBlob(const std::size_t num_slots,
                                  BlockHandle *handle,
                                  const int numa_node);

  // Helper for loadBlock() and loadBlob(). Allocates memory slots for the
  // loaded block or blob and reads raw data from the persistent storage into
  // memory. Returns a partially-constructed BlockHandle which has
  // 'block_memory' and 'block_memory_size' properly set to indicate the memory
  // which raw data has been read into, but for which a StorageBlob or
  // StorageBlock object has not yet been constructed.
  BlockHandle loadBlockOrBlob(const block_id block, const int numa_node);

  // Helper for loadBlock() and loadBlob(). Inserts an entry (block, handle)
  // into 'blocks_'. If there is already an entry in 'blocks_' for 'block',
  // deletes the blob or block in 'handle' and frees the slots which it
  // occupied.
  void insertBlockHandleAfterLoad(const block_id block,
                                  const BlockHandle &handle);

  // Allocate a buffer to hold the specified number of slots. The memory
  // returned will be zeroed-out, and mapped using large pages if the system
  // supports it.
  void* allocateSlots(const std::size_t num_slots,
                      const int numa_node);

  // Deallocate a buffer allocated by allocateSlots(). This must be used
  // instead of free(), because the underlying implementation of
  // allocateSlots() may use mmap instead of malloc.
  void deallocateSlots(void *slots,
                       const std::size_t num_slots);

  /**
   * @brief Evict a block or blob from memory.
   * @note The block is NOT automatically saved, so call saveBlockOrBlob()
   *       first if necessary.
   *
   * @param block The id of the block to evict.
   * @exception BlockNotFoundInMemory The block with the specified id is not
   *            in memory.
   **/
  void evictBlockOrBlob(const block_id block);

  /**
   * @brief Do the work that is common to getBlock and getBlockMutable, namely,
   *        actually loading the block if necessary.
   * @param block    The block to be acquired.
   * @param relation The relation the block is a part of.
   * @param numa_node The NUMA node where the block is to be loaded.
   * @return A CountedReference to the block.
   * @exception OutOfMemory The system has run out of memory.
   */
  MutableBlockReference getBlockInternal(const block_id block,
                                         const CatalogRelationSchema &relation,
                                         const int numa_node);

  /**
   * @brief Do the work that is common to getBlob and getBlobMutable, namely,
   *        actually loading the blob if necessary.
   * @param blob The blob to be acquired.
   * @param numa_node The NUMA node where the blob is to be loaded.
   * @return A CountedReference to the blob.
   * @exception OutOfMemory The system has run out of memory.
   */
  MutableBlobReference getBlobInternal(const block_id blob,
                                       const int numa_node);

  /**
   * @brief Evict blocks or blobs until there is enough space for a new block
   *        or blob of the requested size.
   *
   * @note This non-blocking method gives up evictions if there is a shard
   *       collision, and thus the buffer pool size may temporarily go beyond
   *       the memory limit.
   *
   * @param slots Number of slots to make room for.
   */
  void makeRoomForBlockOrBlob(const std::size_t slots);

  /**
   * @brief Load a block from the persistent storage into memory.
   *
   * @param block The id of the block to load.
   * @param relation The relation which the block belongs to.
   * @param numa_node The NUMA node where the block is to be loaded.
   * @return The StorageBlock that was loaded.
   * @exception BlockNotFoundInPersistentStorage The block with the specified
   *            id can't be found on the persistent storage.
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   * @exception OutOfMemory The system has run out of memory.
   * @exception UnableToOpenFile The block's persistent storage file couldn't be
   *            opened for reading.
   **/
  StorageBlock* loadBlock(const block_id block,
                          const CatalogRelationSchema &relation,
                          const int numa_node);

  /**
   * @brief Load a StorageBlob from the persistent storage into memory.
   *
   * @param blob The ID of the blob to load.
   * @param numa_node The NUMA node where the blob is to be loaded.
   * @return The StorageBlob that was loaded.
   *
   * @exception BlockNotFoundInPersistentStorage The block with the specified
   *            id can't be found on the persistent storage.
   * @exception CorruptPersistentStorage The storage directory layout is not
   *            in the expected format.
   * @exception OutOfMemory The system has run out of memory.
   * @exception UnableToOpenFile The block's persistent storage file couldn't be
   *            opened for reading.
   **/
  StorageBlob* loadBlob(const block_id blob, const int numa_node);

  // File system path where block files are stored. Fixed when StorageManager
  // is created.
  const std::string storage_path_;

  // The current memory usage of all storage blocks and blobs in slots.
  std::atomic<size_t> total_memory_usage_;
  // The maximum allowed memory usage of all storage blocks and blobs in slots.
  const size_t max_memory_usage_;

  std::unique_ptr<EvictionPolicy> eviction_policy_;

#ifdef QUICKSTEP_DISTRIBUTED
  const block_id_domain block_domain_;

  tmb::client_id storage_manager_client_id_;
#endif

  const tmb::client_id block_locator_client_id_;
  tmb::MessageBus *bus_;

  std::unique_ptr<FileManager> file_manager_;

  // Used to generate unique IDs in allocateNewBlockOrBlob().
  std::atomic<block_id> block_index_;

  // Directory of in-memory blocks. Read by blockOrBlobIsLoaded(),
  // saveBlockOrBlob(), getBlock()/getBlockMutable() and
  // blockOrBlobIsLoadedAndDirty().
  // Modified by createBlock(), loadBlock(), and evictBlockOrBlob().
  //
  // TODO(chasseur): Look into concurrent map implementations that allow
  // finer-grained locking.
  std::unordered_map<block_id, BlockHandle> blocks_;
  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;

  // Used to pull a remote block.
  std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;

  // This lock manager is used with the following contract:
  //   (1) A block cannot be evicted unless an exclusive lock is held on its
  //       lock shard.
  //   (2) If it is not safe to evict a block, then either that block's
  //       reference count is greater than 0 or a shared lock is held on the
  //       block's lock shard.
  // TODO(jmp): Would be good to set this more intelligently in the future
  //            based on the hardware concurrency, the amount of main memory
  //            and slot size. For now pick the largest prime that is less
  //            than 8K.
  static constexpr std::size_t kLockManagerNumShards = 0x2000-1;
  ShardedLockManager<block_id, kLockManagerNumShards, SpinSharedMutex<false>> lock_manager_;

  friend class BlockLocatorTest;
  FRIEND_TEST(BlockLocatorTest, BlockTest);
  FRIEND_TEST(BlockLocatorTest, BlobTest);

  FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
  FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);

  DISALLOW_COPY_AND_ASSIGN(StorageManager);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_
