/**
 *   Copyright 2011-2015 Quickstep Technologies LLC.
 *   Copyright 2015-2016 Pivotal Software, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 **/

#ifndef QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_
#define QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_

#include <cstddef>
#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>

#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageBlockLayout.hpp"
#include "threading/SpinMutex.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"

#include "gtest/gtest_prod.h"

#include "tmb/id_typedefs.h"
#include "tmb/tagged_message.h"

namespace tmb { class MessageBus; }

namespace quickstep {

class StorageManager;
class ValueAccessor;

namespace merge_run_operator {
class RunCreator;
}  // namespace merge_run_operator

namespace serialization { class InsertDestination; }

/** \addtogroup Storage
 *  @{
 */

/**
 * @brief Base class for different strategies for getting blocks to insert
 *        tuples into.
 **/
class InsertDestination : public InsertDestinationInterface {
 public:
  /**
   * @brief Constructor.
   *
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   **/
  InsertDestination(const CatalogRelationSchema &relation,
                    const StorageBlockLayout *layout,
                    StorageManager *storage_manager,
                    const std::size_t relational_op_index,
                    const tmb::client_id foreman_client_id,
                    const tmb::client_id agent_client_id,
                    tmb::MessageBus *bus);

  /**
   * @brief Virtual destructor.
   **/
  virtual ~InsertDestination() {
  }

  /**
   * @brief A factory method to generate the InsertDestination from the
   *        serialized Protocol Buffer representation.
   *
   * @param proto A serialized Protocol Buffer representation of an
   *        InsertDestination, originally generated by the optimizer.
   * @param relation The relation to insert tuples into.
   * @param storage_manager The StorageManager to use.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   *
   * @return The constructed InsertDestination.
   **/
  static InsertDestination* ReconstructFromProto(const serialization::InsertDestination &proto,
                                                 const CatalogRelationSchema &relation,
                                                 StorageManager *storage_manager,
                                                 const tmb::client_id foreman_client_id,
                                                 const tmb::client_id agent_client_id,
                                                 tmb::MessageBus *bus);

  /**
   * @brief Check whether a serialized InsertDestination is fully-formed and
   *        all parts are valid.
   *
   * @param proto A serialized Protocol Buffer representation of an
   *        InsertDestination, originally generated by the optimizer.
   * @param relation The relation to insert tuples into.
   *
   * @return Whether proto is fully-formed and valid.
   **/
  static bool ProtoIsValid(const serialization::InsertDestination &proto,
                           const CatalogRelationSchema &relation);

  /**
   * @brief Get the relation that this destination inserts tuples into.
   **/
  const CatalogRelationSchema& getRelation() const override {
    return relation_;
  }

  /**
   * @brief Get the partitioning attribute.
   *
   * @return -1 in this base implementation, i.e. no partitioning attribute.
   **/
  attribute_id getPartitioningAttribute() const override {
    return -1;
  }

  void insertTuple(const Tuple &tuple) override;

  void insertTupleInBatch(const Tuple &tuple) override;

  void bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full = false) override;

  void bulkInsertTuplesWithRemappedAttributes(
      const std::vector<attribute_id> &attribute_map,
      ValueAccessor *accessor,
      bool always_mark_full = false) override;

  void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                              std::vector<Tuple>::const_iterator end) override;

  /**
   * @brief Get the set of blocks that were used by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock(). The returned reference is read after mutex_ has
   *          been released.
   *
   * @return A reference to a vector of block_ids of blocks that were used for
   *         insertion.
   **/
  const std::vector<block_id>& getTouchedBlocks() {
    SpinMutexLock lock(mutex_);
    return getTouchedBlocksInternal();
  }

  /**
   * @brief Get the set of blocks that were partially filled by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
   *
   * @param partial_blocks A pointer to the vector of block references to which
   *                       the partially filled blocks will be added.
   **/
  virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) = 0;

 protected:
  /**
   * @brief Get a block to use for insertion.
   *
   * @return A block to use for inserting tuples.
   **/
  virtual MutableBlockReference getBlockForInsertion() = 0;

  /**
   * @brief Release a block after done using it for insertion.
   * @note This should ALWAYS be called when done inserting into a block.
   *
   * @param block A block, originally supplied by getBlockForInsertion(),
   *        which the client is finished using.
   * @param full If true, the client ran out of space when trying to insert
   *        into block. If false, all inserts were successful.
   **/
  virtual void returnBlock(MutableBlockReference &&block, const bool full) = 0;

  // TODO(chasseur): Once StorageManager is threadsafe, it will be safe to use
  // this without holding the mutex.
  virtual MutableBlockReference createNewBlock() = 0;

  /**
   * @brief Subclass hook backing getTouchedBlocks(); called with mutex_ held.
   **/
  virtual const std::vector<block_id>& getTouchedBlocksInternal() = 0;

  /**
   * @brief When a StorageBlock becomes full, pipeline the block id to Foreman.
   *
   * @param id The id of the StorageBlock to be pipelined.
   **/
  void sendBlockFilledMessage(const block_id id) const {
    serialization::DataPipelineMessage proto;
    proto.set_operator_index(relational_op_index_);
    proto.set_block_id(id);
    proto.set_relation_id(relation_.getID());

    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
    // The buffer is owned by a std::vector so that it is always released, and
    // an allocation failure throws instead of handing SerializeToArray a null
    // pointer (the previous std::malloc result was never checked).
    const std::size_t proto_length = proto.ByteSize();
    std::vector<char> proto_bytes(proto_length);
    CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));

    // proto_bytes stays alive until the end of this function, i.e. for at
    // least as long as the original malloc'd buffer did.
    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes.data()),
                                      proto_length,
                                      kDataPipelineMessage);

    const tmb::MessageBus::SendStatus send_status =
        QueryExecutionUtil::SendTMBMessage(bus_,
                                           agent_client_id_,
                                           foreman_client_id_,
                                           std::move(tagged_message));
    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
        << "Message could not be sent from thread with TMB client ID " << agent_client_id_
        << " to Foreman with TMB client ID " << foreman_client_id_;
  }

  StorageManager *storage_manager_;
  const CatalogRelationSchema &relation_;

  std::unique_ptr<const StorageBlockLayout> layout_;
  const std::size_t relational_op_index_;

  const tmb::client_id foreman_client_id_, agent_client_id_;
  tmb::MessageBus *bus_;

  // TODO(chasseur): If contention is high, finer-grained locking of internal
  // data members in subclasses is possible.
  SpinMutex mutex_;

 private:
  // TODO(shoban): Workaround to support sort. Sort needs finegrained control of
  // blocks being used to insert, since inserting in an arbitrary block could
  // lead to unsorted results. InsertDestination API changed while sort was
  // being implemented.
  friend class merge_run_operator::RunCreator;

  DISALLOW_COPY_AND_ASSIGN(InsertDestination);
};

/**
 * @brief Implementation of InsertDestination that always creates new blocks,
 *        leaving some blocks potentially very underfull.
 **/
class AlwaysCreateBlockInsertDestination : public InsertDestination {
 public:
  /**
   * @brief Constructor.
   *
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   **/
  AlwaysCreateBlockInsertDestination(const CatalogRelationSchema &relation,
                                     const StorageBlockLayout *layout,
                                     StorageManager *storage_manager,
                                     const std::size_t relational_op_index,
                                     const tmb::client_id foreman_client_id,
                                     const tmb::client_id agent_client_id,
                                     tmb::MessageBus *bus)
      : InsertDestination(relation, layout, storage_manager, relational_op_index,
                          foreman_client_id, agent_client_id, bus) {
  }

  ~AlwaysCreateBlockInsertDestination() override {
  }

  /**
   * @brief No-op. This destination keeps no references to partially filled
   *        blocks, so nothing is added to the output vector.
   *
   * @note Declared public to match the access level of the pure virtual in
   *       InsertDestination, so it is callable through a pointer to this type.
   **/
  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> * /* partial_blocks */) override {
  }

 protected:
  MutableBlockReference getBlockForInsertion() override;

  void returnBlock(MutableBlockReference &&block, const bool full) override;

  MutableBlockReference createNewBlock() override;

  /**
   * @brief Return the IDs recorded in returned_block_ids_.
   **/
  const std::vector<block_id>& getTouchedBlocksInternal() override {
    return returned_block_ids_;
  }

 private:
  // IDs of blocks backing getTouchedBlocksInternal(); presumably appended to
  // by returnBlock() (defined out of line) — confirm in the .cpp.
  std::vector<block_id> returned_block_ids_;

  DISALLOW_COPY_AND_ASSIGN(AlwaysCreateBlockInsertDestination);
};

/**
 * @brief Implementation of InsertDestination that keeps a pool of
 *        partially-full blocks. Creates new blocks as necessary when
 *        getBlockForInsertion() is called and there are no partially-full
 *        blocks from the pool which are not "checked out" by workers.
 **/
class BlockPoolInsertDestination : public InsertDestination {
 public:
  /**
   * @brief Constructor.
   *
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   **/
  BlockPoolInsertDestination(const CatalogRelationSchema &relation,
                             const StorageBlockLayout *layout,
                             StorageManager *storage_manager,
                             const std::size_t relational_op_index,
                             const tmb::client_id foreman_client_id,
                             const tmb::client_id agent_client_id,
                             tmb::MessageBus *bus)
      : InsertDestination(relation, layout, storage_manager, relational_op_index,
                          foreman_client_id, agent_client_id, bus) {
  }

  /**
   * @brief Constructor.
   *
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @blocks The existing blocks used for insertions.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   **/
  BlockPoolInsertDestination(const CatalogRelationSchema &relation,
                             const StorageBlockLayout *layout,
                             StorageManager *storage_manager,
                             std::vector<block_id> &&blocks,
                             const std::size_t relational_op_index,
                             const tmb::client_id foreman_client_id,
                             const tmb::client_id agent_client_id,
                             tmb::MessageBus *bus)
      : InsertDestination(relation, layout, storage_manager, relational_op_index,
                          foreman_client_id, agent_client_id, bus),
        available_block_ids_(std::move(blocks)) {
    // TODO(chasseur): Once block fill statistics are available, replace this
    // with something smarter.
  }

  ~BlockPoolInsertDestination() override {
  }

 protected:
  MutableBlockReference getBlockForInsertion() override;

  void returnBlock(MutableBlockReference &&block, const bool full) override;

  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override;

  const std::vector<block_id>& getTouchedBlocksInternal() override;

  MutableBlockReference createNewBlock() override;

 private:
  FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);

  // A vector of references to blocks which are loaded in memory.
  std::vector<MutableBlockReference> available_block_refs_;
  // A vector of blocks from the relation that are not loaded in memory yet.
  std::vector<block_id> available_block_ids_;
  // A vector of fully filled blocks.
  std::vector<block_id> done_block_ids_;

  DISALLOW_COPY_AND_ASSIGN(BlockPoolInsertDestination);
};


/**
 * @brief Implementation of InsertDestination for partitioned relations. It
 *        keeps separate pools of block references and block IDs for each
 *        partition, each guarded by its own SpinMutex.
 **/
class PartitionAwareInsertDestination : public InsertDestination {
 public:
  /**
   * @brief Constructor.
   *
   * @note PartitionAwareInsertDestination takes ownership of \c
   *       partition_scheme_header.
   *
   * @param partition_scheme_header The partition scheme header information.
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param partitions The existing blocks in each partition.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param foreman_client_id The TMB client ID of the Foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB.
   **/
  PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
                                  const CatalogRelationSchema &relation,
                                  const StorageBlockLayout *layout,
                                  StorageManager *storage_manager,
                                  std::vector<std::vector<block_id>> &&partitions,
                                  const std::size_t relational_op_index,
                                  const tmb::client_id foreman_client_id,
                                  const tmb::client_id agent_client_id,
                                  tmb::MessageBus *bus);

  /**
   * @brief Destructor. Releases the per-partition mutex array (allocated with
   *        new[]).
   **/
  ~PartitionAwareInsertDestination() override {
    delete[] mutexes_for_partition_;
  }

  /**
   * @brief Manually add a block to the pool.
   * @warning Call only ONCE for each block to add to the pool.
   *
   * @param bid The ID of the block to add to the pool.
   * @param part_id The partition to add the block to. Must be less than the
   *        number of partitions.
   **/
  void addBlockToPool(const block_id bid, const partition_id part_id) {
    DCHECK_LT(part_id, partition_scheme_header_->getNumPartitions());
    SpinMutexLock lock(mutexes_for_partition_[part_id]);
    available_block_ids_[part_id].push_back(bid);
  }

  /**
   * @brief Move the partially filled blocks of every partition into
   *        \c partial_blocks, partition by partition.
   *
   * @param partial_blocks A pointer to the vector of block references to which
   *                       the partially filled blocks will be added.
   **/
  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override {
    // Iterate through each partition and return the partially filled blocks
    // in each partition.
    for (partition_id part_id = 0; part_id < partition_scheme_header_->getNumPartitions(); ++part_id) {
      getPartiallyFilledBlocksInPartition(partial_blocks, part_id);
    }
  }

  /**
   * @brief Get the set of blocks that were partially filled by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
   *
   * @param partial_blocks A pointer to the vector of block references to which
   *                       the partially filled blocks will be added. The
   *                       partition's own list of references is left empty.
   * @param part_id The partition id for which we want the partially filled
   *        blocks. Must be less than the number of partitions.
   **/
  void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks,
                                           const partition_id part_id) {
    DCHECK(partial_blocks != nullptr);
    DCHECK_LT(part_id, partition_scheme_header_->getNumPartitions());
    SpinMutexLock lock(mutexes_for_partition_[part_id]);

    std::vector<MutableBlockReference> &partition_refs = available_block_refs_[part_id];
    partial_blocks->reserve(partial_blocks->size() + partition_refs.size());
    for (MutableBlockReference &ref : partition_refs) {
      partial_blocks->push_back(std::move(ref));
    }
    // The moved-from references are dropped so they are not handed out again.
    partition_refs.clear();
  }

  attribute_id getPartitioningAttribute() const override;

  void insertTuple(const Tuple &tuple) override;

  void insertTupleInBatch(const Tuple &tuple) override;

  void bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full = false) override;

  void bulkInsertTuplesWithRemappedAttributes(
      const std::vector<attribute_id> &attribute_map,
      ValueAccessor *accessor,
      bool always_mark_full = false) override;

  void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                              std::vector<Tuple>::const_iterator end) override;

 protected:
  MutableBlockReference getBlockForInsertion() override;

  /**
   * @brief Get a block to use for insertion from a partition.
   *
   * @param part_id The partition id for which the client requests a block from.
   *
   * @return A block to use for inserting tuples belonging to a particular partition.
   **/
  MutableBlockReference getBlockForInsertionInPartition(const partition_id part_id);

  void returnBlock(MutableBlockReference &&block, const bool full) override;

  /**
   * @brief Release a block after done using it for insertion.
   * @note This should ALWAYS be called when done inserting into a block.
   *
   * @param block A block, originally supplied by getBlockForInsertion(),
   *        which the client is finished using.
   * @param full If true, the client ran out of space when trying to insert
   *        into block. If false, all inserts were successful.
   * @param part_id The partition id into which we should return the block into.
   **/
  void returnBlockInPartition(MutableBlockReference &&block, const bool full, const partition_id part_id);

  MutableBlockReference createNewBlock() override;
  MutableBlockReference createNewBlockInPartition(const partition_id part_id);

  const std::vector<block_id>& getTouchedBlocksInternal() override;
  const std::vector<block_id>& getTouchedBlocksInternalInPartition(partition_id part_id);

 private:
  std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;

  // A vector of available block references for each partition.
  std::vector< std::vector<MutableBlockReference> > available_block_refs_;
  // A vector of available block ids for each partition.
  std::vector< std::vector<block_id> > available_block_ids_;
  // A vector of done block ids for each partition.
  std::vector< std::vector<block_id> > done_block_ids_;
  // Done block ids across all partitions.
  std::vector<block_id> all_partitions_done_block_ids_;
  // Mutex for locking each partition separately. Owned (released with delete[]
  // in the destructor).
  // TODO(review): consider std::unique_ptr<SpinMutex[]> once the constructor in
  // the .cpp is confirmed to initialize it from a new[] expression.
  SpinMutex *mutexes_for_partition_;

  DISALLOW_COPY_AND_ASSIGN(PartitionAwareInsertDestination);
};
/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_
