/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_CATALOG_PARTITION_SCHEME_HEADER_HPP_
#define QUICKSTEP_CATALOG_PARTITION_SCHEME_HEADER_HPP_

#include <cstddef>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>

#include "catalog/Catalog.pb.h"
#include "catalog/CatalogTypedefs.hpp"
#include "storage/StorageConstants.hpp"
#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/EqualComparison.hpp"
#include "types/operations/comparisons/LessComparison.hpp"
#include "utility/CompositeHash.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"

namespace quickstep {

class CatalogRelationSchema;
class Type;

/** \addtogroup Catalog
 *  @{
 */

/**
 * @brief The base class which stores the partitioning information for a
 *        particular relation.
 **/
class PartitionSchemeHeader {
 public:
  // A vector for partitioning catalog attributes.
  typedef std::vector<attribute_id> PartitionAttributeIds;

  // The values for partition attributes.
  // PartitionValues.size() should be equal to PartitionAttributeIds.size().
  typedef std::vector<TypedValue> PartitionValues;

  enum class PartitionType {
    kHash = 0,
    kRandom,
    kRange
  };

  /**
   * @brief Virtual destructor.
   **/
  virtual ~PartitionSchemeHeader() {
  }

  /**
   * @brief Reconstruct a PartitionSchemeHeader from its serialized
   *        Protocol Buffer form.
   *
   * @param proto The Protocol Buffer serialization of a PartitionSchemeHeader,
   *        previously produced by getProto().
   * @return The reconstructed PartitionSchemeHeader object.
   **/
  static PartitionSchemeHeader* ReconstructFromProto(
      const serialization::PartitionSchemeHeader &proto);

  /**
   * @brief Check whether a serialization::PartitionSchemeHeader is fully-formed
   *        and all parts are valid.
   *
   * @param proto A serialized Protocol Buffer representation of a
   *              PartitionSchemeHeader, originally generated by getProto().
   * @return Whether proto is fully-formed and valid.
   **/
  static bool ProtoIsValid(const serialization::PartitionSchemeHeader &proto);

  /**
   * @brief Calculate the partition id into which the attribute value should
   *        be inserted.
   *
   * @param value_of_attributes A vector of attribute values for which the
   *        partition id is to be determined.
   * @return The partition id of the partition for the attribute value.
   **/
  // TODO(gerald): Make this method more efficient since currently this is
  // done for each and every tuple. We can go through the entire set of tuples
  // once using a value accessor and create bitmaps for each partition with
  // tuples that correspond to those partitions.
  virtual partition_id getPartitionId(
      const PartitionValues &value_of_attributes) const = 0;

  /**
   * @brief Serialize the Partition Scheme as Protocol Buffer.
   *
   * @return The Protocol Buffer representation of Partition Scheme.
   **/
  virtual serialization::PartitionSchemeHeader getProto() const;

  /**
   * @brief Get the partition type of the relation.
   *
   * @return The partition type used to partition the relation.
   **/
  inline PartitionType getPartitionType() const {
    return partition_type_;
  }

  /**
   * @brief Get the number of partitions for the relation.
   *
   * @return The number of partitions the relation is partitioned into.
   **/
  inline std::size_t getNumPartitions() const {
    return num_partitions_;
  }

  /**
   * @brief Get the partitioning attributes for the relation.
   *
   * @return The partitioning attributes with which the relation
   *         is partitioned into.
   **/
  inline const PartitionAttributeIds& getPartitionAttributeIds() const {
    return partition_attribute_ids_;
  }

 protected:
  /**
   * @brief Constructor.
   *
   * @param type The type of partitioning to be used to partition the
   *             relation.
   * @param num_partitions The number of partitions to be created.
   * @param attr_ids The attributes on which the partitioning happens.
   **/
  PartitionSchemeHeader(const PartitionType type,
                        const std::size_t num_partitions,
                        PartitionAttributeIds &&attr_ids);  // NOLINT(whitespace/operators)

  // The type of partitioning: Hash or Range.
  const PartitionType partition_type_;
  // The number of partitions.
  const std::size_t num_partitions_;
  // The attribute of partioning.
  const PartitionAttributeIds partition_attribute_ids_;

 private:
  std::string toString(const CatalogRelationSchema &relation_schema) const;

  friend class PartitionScheme;

  DISALLOW_COPY_AND_ASSIGN(PartitionSchemeHeader);
};

/**
 * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
 *        a relation based on a hash function on the partitioning attribute.
**/
class HashPartitionSchemeHeader final : public PartitionSchemeHeader {
 public:
  /**
   * @brief Constructor.
   *
   * @param num_partitions How many partitions the relation is split into.
   * @param attributes The ids of the attributes that are hashed to pick a
   *        partition. The vector is moved into this header.
   **/
  HashPartitionSchemeHeader(const std::size_t num_partitions,
                            PartitionAttributeIds &&attributes)  // NOLINT(whitespace/operators)
      : PartitionSchemeHeader(PartitionType::kHash, num_partitions, std::move(attributes)) {}

  /**
   * @brief Destructor.
   **/
  ~HashPartitionSchemeHeader() override = default;

  /**
   * @brief Pick a partition by hashing the partitioning attribute values.
   *
   * @param value_of_attributes The values of the partitioning attributes;
   *        expected to have one entry per partitioning attribute.
   * @return The composite hash of the values, reduced modulo the number of
   *         partitions.
   **/
  partition_id getPartitionId(
      const PartitionValues &value_of_attributes) const override {
    DCHECK_EQ(partition_attribute_ids_.size(), value_of_attributes.size());

    const auto composite_hash = HashCompositeKey(value_of_attributes);

    // TODO(gerald): Optimize for the case where the number of partitions is a
    // power of 2. We can just mask out the lower-order hash bits rather than
    // doing a division operation.
    return composite_hash % num_partitions_;
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(HashPartitionSchemeHeader);
};

/**
 * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
 *        a relation randomly.
**/
class RandomPartitionSchemeHeader final : public PartitionSchemeHeader {
 public:
  /**
   * @brief Constructor.
   *
   * @note The random engine is seeded with a fixed constant, and the
   *       distribution covers the closed interval [0, num_partitions - 1].
   *       No partitioning attributes are recorded for this scheme.
   *
   * @param num_partitions How many partitions the relation is split into.
   **/
  explicit RandomPartitionSchemeHeader(const std::size_t num_partitions)
      : PartitionSchemeHeader(PartitionType::kRandom, num_partitions, {}),
        mt_(1729),
        dist_(0, num_partitions - 1) {}

  /**
   * @brief Destructor.
   **/
  ~RandomPartitionSchemeHeader() override = default;

  /**
   * @brief Pick a partition at random.
   *
   * @param value_of_attributes Not consulted by this scheme.
   * @return The next value drawn from the uniform distribution over the
   *         partition ids.
   **/
  partition_id getPartitionId(
      const PartitionValues &value_of_attributes) const override {
    // The engine and the distribution are mutated by every draw, so the draw
    // is performed while holding the lock on mutex_.
    SpinMutexLock lock(mutex_);
    const partition_id drawn_partition = dist_(mt_);
    return drawn_partition;
  }

 private:
  // Random engine and distribution; 'mutable' because drawing a value from
  // the const getPartitionId() changes their internal state.
  mutable std::mt19937_64 mt_;
  mutable std::uniform_int_distribution<partition_id> dist_;

  // Guards mt_ and dist_. Aligned to kCacheLineBytes.
  alignas(kCacheLineBytes) mutable SpinMutex mutex_;

  DISALLOW_COPY_AND_ASSIGN(RandomPartitionSchemeHeader);
};

/**
 * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
 *        a relation based on a given value range on the partitioning attribute.
**/
class RangePartitionSchemeHeader final : public PartitionSchemeHeader {
 public:
  /**
   * @brief Constructor.
   *
   * @param num_partitions The number of partitions to be created.
   * @param attributes A vector of attribute_ids on which the partitioning
   *        happens.
   * @param partition_attribute_types The types of CatalogAttributes used for
   *        partitioning, one per partitioning attribute.
   * @param partition_ranges The mapping between the partition ids and the upper
   *        bound of the range boundaries. If two ranges R1 and R2 are separated
   *        by a vector of boundary values V, then V would fall into range R2.
   *        For creating a range partition scheme with n partitions, you need to
   *        specify n-1 range boundaries. The first partition will have all the
   *        values less than the first item of range boundaries and the last
   *        partition would have all values greater than or equal to the last
   *        item of range boundaries. A scheme with a single partition
   *        therefore has no boundaries at all.
   **/
  RangePartitionSchemeHeader(const std::size_t num_partitions,
                             PartitionAttributeIds &&attributes,  // NOLINT(whitespace/operators)
                             std::vector<const Type*> &&partition_attribute_types,
                             std::vector<PartitionValues> &&partition_ranges)
      : PartitionSchemeHeader(PartitionType::kRange, num_partitions, std::move(attributes)),
        partition_attr_types_(std::move(partition_attribute_types)),
        partition_range_boundaries_(std::move(partition_ranges)) {
    DCHECK_EQ(partition_attribute_ids_.size(), partition_attr_types_.size());
    DCHECK_EQ(num_partitions - 1, partition_range_boundaries_.size());

    // Build one "less than" comparator per partitioning attribute, comparing
    // values of the attribute's type against each other.
    const Comparison &less_comparison_op(LessComparison::Instance());
    less_unchecked_comparators_.reserve(partition_attr_types_.size());
    for (const Type *type : partition_attr_types_) {
      std::unique_ptr<UncheckedComparator> less_unchecked_comparator(
          less_comparison_op.makeUncheckedComparatorForTypes(*type, *type));
      less_unchecked_comparators_.emplace_back(std::move(less_unchecked_comparator));
    }

    // Likewise, one "equal" comparator per partitioning attribute.
    const Comparison &equal_comparison_op = EqualComparison::Instance();
    equal_unchecked_comparators_.reserve(partition_attr_types_.size());
    for (const Type *type : partition_attr_types_) {
      std::unique_ptr<UncheckedComparator> equal_unchecked_comparator(
          equal_comparison_op.makeUncheckedComparatorForTypes(*type, *type));
      equal_unchecked_comparators_.emplace_back(std::move(equal_unchecked_comparator));
    }

#ifdef QUICKSTEP_DEBUG
    checkPartitionRangeBoundaries();
#endif
  }

  /**
   * @brief Destructor.
   **/
  ~RangePartitionSchemeHeader() override = default;

  /**
   * @brief Find the partition whose range contains the given values.
   *
   * @param value_of_attributes The values of the partitioning attributes;
   *        expected to have one entry per partitioning attribute.
   * @return The index of the first range boundary that is strictly greater
   *         than the values, or the last partition id if no boundary is
   *         (including the case where there are no boundaries).
   **/
  partition_id getPartitionId(
      const PartitionValues &value_of_attributes) const override {
    DCHECK_EQ(partition_attribute_ids_.size(), value_of_attributes.size());

    // A scheme with a single partition has no range boundaries. Without this
    // guard, 'size() - 1' below would wrap around and the boundary lookup
    // would index past the end of the empty vector.
    if (partition_range_boundaries_.empty()) {
      return 0;
    }

    partition_id start = 0;
    partition_id end = partition_range_boundaries_.size() - 1;

    // Values at or above the last boundary belong to the last partition.
    if (!lessThan(value_of_attributes, partition_range_boundaries_[end])) {
      return num_partitions_ - 1;
    }

    // Binary search for the first boundary that is greater than the values.
    // The check above guarantees that boundary 'end' qualifies.
    while (start < end) {
      const partition_id mid = start + ((end - start) >> 1);
      if (lessThan(value_of_attributes, partition_range_boundaries_[mid])) {
        end = mid;
      } else {
        start = mid + 1;
      }
    }

    return start;
  }

  serialization::PartitionSchemeHeader getProto() const override;

  /**
   * @brief Get the range boundaries for partitions.
   *
   * @return The vector of range boundaries for partitions.
   **/
  inline const std::vector<PartitionValues>& getPartitionRangeBoundaries() const {
    return partition_range_boundaries_;
  }

 private:
  /**
   * @brief Check that every range boundary has one value per partitioning
   *        attribute and that the boundaries are in strictly ascending
   *        order. Any violation fails a CHECK.
   **/
  void checkPartitionRangeBoundaries() {
    for (const PartitionValues &partition_range_boundary : partition_range_boundaries_) {
      CHECK_EQ(partition_attribute_ids_.size(), partition_range_boundary.size())
          << "A partition boundary has different size than that of partition attributes.";
    }

    for (partition_id part_id = 1; part_id < partition_range_boundaries_.size(); ++part_id) {
      CHECK(lessThan(partition_range_boundaries_[part_id - 1], partition_range_boundaries_[part_id]))
          << "Partition boundaries are not in ascending order.";
    }
  }

  /**
   * @brief Check if the partition values are in the lexicographical order.
   *
   * @note (l_0, l_1, ..., l_n) < (r_0, r_1, ..., r_n) iff l_0 < r_0, or
   *       (l_0 == r_0) && (l_1, ..., l_n) < (r_1, ..., r_n).
   *
   * @param lhs The left-hand side values, one per partitioning attribute.
   * @param rhs The right-hand side values, one per partitioning attribute.
   * @return Whether lhs is strictly less than rhs.
   **/
  bool lessThan(const PartitionValues &lhs, const PartitionValues &rhs) const {
    DCHECK_EQ(partition_attribute_ids_.size(), lhs.size());
    DCHECK_EQ(partition_attribute_ids_.size(), rhs.size());

    const std::size_t num_attributes = partition_attribute_ids_.size();
    for (std::size_t attr_index = 0; attr_index < num_attributes; ++attr_index) {
      // The first attribute that differs decides the outcome.
      if (less_unchecked_comparators_[attr_index]->compareTypedValues(lhs[attr_index], rhs[attr_index])) {
        return true;
      }
      if (!equal_unchecked_comparators_[attr_index]->compareTypedValues(lhs[attr_index], rhs[attr_index])) {
        return false;
      }
      // The attributes are equal; if this was the last one, lhs == rhs.
      if (attr_index + 1 == num_attributes) {
        return false;
      }
    }

    // Only reachable when there are no partitioning attributes at all.
    return true;
  }

  // The size is equal to 'partition_attribute_ids_.size()'.
  const std::vector<const Type*> partition_attr_types_;

  // The boundaries for each range in the RangePartitionSchemeHeader.
  // The upper bound of the range is stored here.
  const std::vector<PartitionValues> partition_range_boundaries_;

  // Both sizes are equal to 'partition_attr_types_.size()'.
  std::vector<std::unique_ptr<UncheckedComparator>> less_unchecked_comparators_;
  std::vector<std::unique_ptr<UncheckedComparator>> equal_unchecked_comparators_;

  DISALLOW_COPY_AND_ASSIGN(RangePartitionSchemeHeader);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_CATALOG_PARTITION_SCHEME_HEADER_HPP_
