/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#include "catalog/PartitionSchemeHeader.hpp"

#include <cstddef>
#include <unordered_set>
#include <utility>
#include <vector>

#include "catalog/Catalog.pb.h"
#include "catalog/CatalogTypedefs.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
#include "types/TypeFactory.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"

#include "glog/logging.h"

using std::move;
using std::size_t;
using std::vector;

namespace quickstep {

// Initializes the state common to every partition scheme header: the scheme
// type, the number of partitions, and the ids of the partitioning attributes
// (moved out of 'attr_ids').
//
// 'num_partitions' must be positive (enforced via DCHECK). When
// QUICKSTEP_DEBUG is defined, the attribute ids are additionally verified to
// be valid and pairwise distinct.
PartitionSchemeHeader::PartitionSchemeHeader(const PartitionType type,
                                             const std::size_t num_partitions,
                                             PartitionAttributeIds &&attr_ids)  // NOLINT(whitespace/operators)
    : partition_type_(type),
      num_partitions_(num_partitions),
      partition_attribute_ids_(move(attr_ids)) {
  DCHECK_GT(num_partitions, 0u);

#ifdef QUICKSTEP_DEBUG
  // Ensure that no duplicated partition attributes exist.
  std::unordered_set<attribute_id> seen_attr_ids;
  for (const attribute_id attr_id : partition_attribute_ids_) {
    DCHECK_NE(attr_id, kInvalidCatalogId);

    // insert() reports whether the id was newly added, so a single lookup both
    // records the id and detects a repeat.
    CHECK(seen_attr_ids.insert(attr_id).second)
        << "Duplicate partition attribute id: " << attr_id;
  }
#endif
}

// Returns true if 'proto' describes a partition scheme header that can be
// handed to ReconstructFromProto().
//
// The proto must be fully initialized, describe at least one partition, and
// carry a known partition type. A RANGE header must additionally provide
// exactly (num_partitions - 1) range boundaries and one attribute type per
// partition attribute id.
bool PartitionSchemeHeader::ProtoIsValid(
    const serialization::PartitionSchemeHeader &proto) {
  // Check that proto is fully initialized.
  if (!proto.IsInitialized()) {
    return false;
  }

  // The PartitionSchemeHeader constructor requires a positive number of
  // partitions, so reject a proto that could never be reconstructed.
  if (proto.num_partitions() == 0) {
    return false;
  }

  // Check if the partitioning attribute exists in the relation.
  // TODO(gerald): Figure out how to find the max_attr_id of the
  // relation so that we can check that partitioning attribute
  // is not greater than the max_attr_id.

  // Check that the proto has a valid partition type.
  switch (proto.partition_type()) {
    case serialization::PartitionSchemeHeader::HASH:
    case serialization::PartitionSchemeHeader::RANDOM:
      return true;
    case serialization::PartitionSchemeHeader::RANGE: {
      // N partitions are separated by N - 1 boundaries, and every partition
      // attribute id needs a matching attribute type.
      return proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries) &&
             proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries) ==
                 static_cast<int>(proto.num_partitions() - 1) &&
             proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_attr_types) &&
             proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types) ==
                 proto.partition_attribute_ids_size();
    }
    default:
      // Partition type is unknown.
      return false;
  }
}

// Rebuilds a partition scheme header from its serialized form. The result is
// allocated with 'new', and its concrete class is selected by
// proto.partition_type(). 'proto' is expected to satisfy ProtoIsValid(); this
// is only checked through DCHECK.
PartitionSchemeHeader* PartitionSchemeHeader::ReconstructFromProto(
    const serialization::PartitionSchemeHeader &proto) {
  DCHECK(ProtoIsValid(proto))
      << "Attempted to create PartitionSchemeHeader from an invalid proto description:\n"
      << proto.DebugString();

  const auto num_partitions = proto.num_partitions();

  // Gathered up front for every scheme; the RANDOM scheme does not consume them.
  PartitionAttributeIds attr_ids;
  const int num_attr_ids = proto.partition_attribute_ids_size();
  for (int attr_idx = 0; attr_idx < num_attr_ids; ++attr_idx) {
    attr_ids.push_back(proto.partition_attribute_ids(attr_idx));
  }

  const auto scheme_type = proto.partition_type();

  if (scheme_type == serialization::PartitionSchemeHeader::HASH) {
    return new HashPartitionSchemeHeader(num_partitions, move(attr_ids));
  }

  if (scheme_type == serialization::PartitionSchemeHeader::RANDOM) {
    return new RandomPartitionSchemeHeader(num_partitions);
  }

  if (scheme_type == serialization::PartitionSchemeHeader::RANGE) {
    // Resolve the type of each partitioning attribute.
    const int num_attr_types =
        proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types);
    std::vector<const Type*> attr_types;
    for (int type_idx = 0; type_idx < num_attr_types; ++type_idx) {
      const auto &type_proto =
          proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_attr_types, type_idx);
      attr_types.push_back(&TypeFactory::ReconstructFromProto(type_proto));
    }

    // Deserialize each range boundary, one value at a time.
    const int num_boundaries =
        proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
    std::vector<PartitionValues> range_boundaries(num_boundaries);
    for (int boundary_idx = 0; boundary_idx < num_boundaries; ++boundary_idx) {
      const auto &boundary_proto = proto.GetExtension(
          serialization::RangePartitionSchemeHeader::partition_range_boundaries, boundary_idx);
      PartitionValues &boundary = range_boundaries[boundary_idx];

      const int num_values = boundary_proto.partition_values_size();
      for (int value_idx = 0; value_idx < num_values; ++value_idx) {
        boundary.emplace_back(
            TypedValue::ReconstructFromProto(boundary_proto.partition_values(value_idx)));
      }
    }

    return new RangePartitionSchemeHeader(num_partitions,
                                          move(attr_ids),
                                          move(attr_types),
                                          move(range_boundaries));
  }

  LOG(FATAL) << "Invalid partition scheme header.";
  // Avoid '-Werror=return-type' using GCC.
  return nullptr;
}

// Serializes the fields common to all partition schemes: the partition count,
// the partition attribute ids, and the partition type. An unrecognized
// partition type is reported through LOG(FATAL).
serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
  serialization::PartitionSchemeHeader header_proto;

  header_proto.set_num_partitions(num_partitions_);

  for (const attribute_id partition_attr_id : partition_attribute_ids_) {
    header_proto.add_partition_attribute_ids(partition_attr_id);
  }

  // Translate the in-memory scheme type into its serialized counterpart.
  switch (partition_type_) {
    case PartitionType::kHash: {
      header_proto.set_partition_type(serialization::PartitionSchemeHeader::HASH);
      break;
    }
    case PartitionType::kRandom: {
      header_proto.set_partition_type(serialization::PartitionSchemeHeader::RANDOM);
      break;
    }
    case PartitionType::kRange: {
      header_proto.set_partition_type(serialization::PartitionSchemeHeader::RANGE);
      break;
    }
    default:
      LOG(FATAL) << "Invalid Partition Type.";
  }

  return header_proto;
}

// Serializes a range partition scheme header: starts from the base-class
// proto, then appends one 'partition_attr_types' extension entry per
// partition attribute type and one 'partition_range_boundaries' extension
// entry per range boundary.
serialization::PartitionSchemeHeader RangePartitionSchemeHeader::getProto() const {
  serialization::PartitionSchemeHeader range_proto = PartitionSchemeHeader::getProto();

  for (const Type *attr_type : partition_attr_types_) {
    auto *serialized_type =
        range_proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_attr_types);
    serialized_type->MergeFrom(attr_type->getProto());
  }

  for (const PartitionValues &boundary : partition_range_boundaries_) {
    serialization::PartitionValues *serialized_boundary =
        range_proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries);

    // Each boundary holds one value per entry, copied over in order.
    for (const TypedValue &boundary_value : boundary) {
      auto *serialized_value = serialized_boundary->add_partition_values();
      serialized_value->MergeFrom(boundary_value.getProto());
    }
  }

  return range_proto;
}

}  // namespace quickstep
