blob: af69f8f9bf21aead13033b8aa59e74a97a2ec1f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "catalog/PartitionSchemeHeader.hpp"
#include <cstddef>
#include <sstream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "catalog/Catalog.pb.h"
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
#include "types/TypeFactory.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "glog/logging.h"
using std::move;
using std::size_t;
using std::vector;
namespace quickstep {
PartitionSchemeHeader::PartitionSchemeHeader(const PartitionType type,
const std::size_t num_partitions,
PartitionAttributeIds &&attr_ids) // NOLINT(whitespace/operators)
: partition_type_(type),
num_partitions_(num_partitions),
partition_attribute_ids_(move(attr_ids)) {
DCHECK_GT(num_partitions, 0u);
#ifdef QUICKSTEP_DEBUG
// Ensure that no duplicated partition attributes exist.
std::unordered_set<attribute_id> partition_attribute_ids;
for (const attribute_id attr_id : partition_attribute_ids_) {
DCHECK_NE(attr_id, kInvalidCatalogId);
CHECK_EQ(0u, partition_attribute_ids.count(attr_id));
partition_attribute_ids.insert(attr_id);
}
#endif
}
bool PartitionSchemeHeader::ProtoIsValid(
const serialization::PartitionSchemeHeader &proto) {
// Check that proto is fully initialized.
if (!proto.IsInitialized()) {
return false;
}
// Check if the partitioning attribute exists in the relation.
// TODO(gerald): Figure out how to find the max_attr_id of the
// relation so that we can check that partitioning attribute
// is not greater than the max_attr_id.
// Check that the proto has a valid partition type.
switch (proto.partition_type()) {
case serialization::PartitionSchemeHeader::HASH:
case serialization::PartitionSchemeHeader::RANDOM:
return true;
case serialization::PartitionSchemeHeader::RANGE: {
return proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries) &&
proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries) ==
static_cast<int>(proto.num_partitions() - 1) &&
proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_attr_types) &&
proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types) ==
proto.partition_attribute_ids_size();
}
default:
// Partition type is unknown.
return false;
}
}
PartitionSchemeHeader* PartitionSchemeHeader::ReconstructFromProto(
const serialization::PartitionSchemeHeader &proto) {
DCHECK(ProtoIsValid(proto))
<< "Attempted to create PartitionSchemeHeader from an invalid proto description:\n"
<< proto.DebugString();
PartitionAttributeIds partition_attribute_ids;
for (int i = 0; i < proto.partition_attribute_ids_size(); ++i) {
partition_attribute_ids.push_back(proto.partition_attribute_ids(i));
}
switch (proto.partition_type()) {
case serialization::PartitionSchemeHeader::HASH: {
return new HashPartitionSchemeHeader(proto.num_partitions(), move(partition_attribute_ids));
}
case serialization::PartitionSchemeHeader::RANDOM: {
return new RandomPartitionSchemeHeader(proto.num_partitions());
}
case serialization::PartitionSchemeHeader::RANGE: {
std::vector<const Type*> attr_types;
for (int i = 0; i < proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types); ++i) {
attr_types.push_back(&TypeFactory::ReconstructFromProto(
proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_attr_types, i)));
}
const int partition_ranges_size =
proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
std::vector<PartitionValues> partition_ranges(partition_ranges_size);
for (int i = 0; i < partition_ranges_size; ++i) {
const auto &proto_partition_values =
proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries, i);
for (int j = 0; j < proto_partition_values.partition_values_size(); ++j) {
partition_ranges[i].emplace_back(TypedValue::ReconstructFromProto(
proto_partition_values.partition_values(j)));
}
}
return new RangePartitionSchemeHeader(proto.num_partitions(),
move(partition_attribute_ids),
move(attr_types),
move(partition_ranges));
}
default:
LOG(FATAL) << "Invalid partition scheme header.";
// Avoid '-Werror=return-type' using GCC.
return nullptr;
}
}
serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
serialization::PartitionSchemeHeader proto;
switch (partition_type_) {
case PartitionType::kHash:
proto.set_partition_type(serialization::PartitionSchemeHeader::HASH);
break;
case PartitionType::kRandom:
proto.set_partition_type(serialization::PartitionSchemeHeader::RANDOM);
break;
case PartitionType::kRange:
proto.set_partition_type(serialization::PartitionSchemeHeader::RANGE);
break;
default:
LOG(FATAL) << "Invalid Partition Type.";
}
proto.set_num_partitions(num_partitions_);
for (const attribute_id attr_id : partition_attribute_ids_) {
proto.add_partition_attribute_ids(attr_id);
}
return proto;
}
std::string PartitionSchemeHeader::toString(const CatalogRelationSchema &relation_schema) const {
std::ostringstream oss;
oss << "PARTITION BY ";
switch (partition_type_) {
case PartitionType::kHash:
oss << "HASH";
break;
case PartitionType::kRandom:
oss << "RANDOM";
break;
case PartitionType::kRange:
oss << "RANGE";
break;
default:
LOG(FATAL) << "Invalid Partition Type.";
}
oss << " ( ";
if (!partition_attribute_ids_.empty()) {
const CatalogAttribute *attr = relation_schema.getAttributeById(partition_attribute_ids_[0]);
DCHECK(attr);
oss << attr->getName();
for (size_t i = 1; i < partition_attribute_ids_.size(); ++i) {
attr = relation_schema.getAttributeById(partition_attribute_ids_[i]);
DCHECK(attr);
oss << ", " << attr->getName();
}
}
oss << " ) PARTITIONS " << num_partitions_ << '\n';
return oss.str();
}
serialization::PartitionSchemeHeader RangePartitionSchemeHeader::getProto() const {
serialization::PartitionSchemeHeader proto = PartitionSchemeHeader::getProto();
for (const Type *type : partition_attr_types_) {
proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_attr_types)
->MergeFrom(type->getProto());
}
for (const PartitionValues &partition_range_boundary : partition_range_boundaries_) {
serialization::PartitionValues *proto_range_boundary =
proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
for (const TypedValue &value : partition_range_boundary) {
proto_range_boundary->add_partition_values()->MergeFrom(value.getProto());
}
}
return proto;
}
} // namespace quickstep