// blob: 475d9d0ede897e33f79af00972573d472463be79 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/partition.h"
#include <algorithm>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/key_encoder.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/types.h"
#include "kudu/gutil/endian.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/hash_util.h"
#include "kudu/util/int128.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/url-coding.h"
using google::protobuf::RepeatedPtrField;
using kudu::pb_util::SecureDebugString;
using std::pair;
using std::set;
using std::string;
using std::vector;
using strings::Substitute;
using strings::SubstituteAndAppend;
namespace kudu {
// Forward declaration: in this file faststring is only needed as a template
// argument (see the GetKeyEncoder<faststring>() call in DecodeRangeKey()).
class faststring;
// The encoded size of a hash bucket in a partition key: each bucket index
// occupies sizeof(uint32_t) bytes of the hash part of the key.
static const size_t kEncodedBucketSize = sizeof(uint32_t);
// Two partitions compare equal when their begin keys, end keys and hash
// buckets all match; an object is always equal to itself.
bool Partition::operator==(const Partition& rhs) const {
  if (this == &rhs) {
    return true;
  }
  // Evaluated in the order: begin key, end key, hash buckets; the first
  // difference found short-circuits the rest.
  const bool differs = begin_ != rhs.begin_ ||
                       end_ != rhs.end_ ||
                       hash_buckets_ != rhs.hash_buckets_;
  return !differs;
}
// Serializes this partition into 'pb'. Any previous contents of 'pb' are
// discarded; the hash buckets and the string forms of the begin/end partition
// keys are written.
void Partition::ToPB(PartitionPB* pb) const {
  pb->Clear();
  pb->mutable_hash_buckets()->Reserve(hash_buckets_.size());
  for (auto it = hash_buckets_.cbegin(); it != hash_buckets_.cend(); ++it) {
    const int32_t bucket_idx = *it;
    pb->add_hash_buckets(bucket_idx);
  }
  const auto start_key = begin().ToString();
  const auto end_key = end().ToString();
  pb->set_partition_key_start(start_key);
  pb->set_partition_key_end(end_key);
}
// Fills 'partition' from its protobuf form 'pb': the hash buckets are replaced
// with the ones in 'pb', and the begin/end keys are rebuilt by splitting the
// serialized key strings according to the number of hash buckets.
void Partition::FromPB(const PartitionPB& pb, Partition* partition) {
  auto& buckets = partition->hash_buckets_;
  buckets.clear();
  buckets.reserve(pb.hash_buckets_size());
  std::copy(pb.hash_buckets().begin(), pb.hash_buckets().end(),
            std::back_inserter(buckets));

  // One hash dimension per bucket: this determines where the hash part of the
  // serialized key ends and the range part begins.
  const size_t num_hash_dimensions = buckets.size();
  partition->begin_ = StringToPartitionKey(pb.partition_key_start(),
                                           num_hash_dimensions);
  partition->end_ = StringToPartitionKey(pb.partition_key_end(),
                                         num_hash_dimensions);
  // Check the invariant: valid partition cannot span over multiple sets
  // of hash buckets.
  //DCHECK(partition->begin_.empty() || partition->end_.empty() ||
  //       partition->begin_.hash_key() == partition->end_.hash_key());
}
// Splits the serialized key 'key_str' into its hash and range parts, given
// that the hash part consists of 'hash_dimensions_num' encoded buckets.
//  * With no hash dimensions the whole string is the range part.
//  * If the string is not longer than the hash part, the whole string is the
//    hash part and the range part is empty.
//  * Otherwise the string is cut right after the hash part.
PartitionKey Partition::StringToPartitionKey(const std::string& key_str,
                                             size_t hash_dimensions_num) {
  if (hash_dimensions_num > 0) {
    const size_t hash_prefix_len = kEncodedBucketSize * hash_dimensions_num;
    if (key_str.size() > hash_prefix_len) {
      return PartitionKey(key_str.substr(0, hash_prefix_len),
                          key_str.substr(hash_prefix_len));
    }
    return PartitionKey(key_str, "");
  }
  return PartitionKey("", key_str);
}
// Returns a hex dump of the key in the form "h:<hash bytes> r:<range bytes>",
// where every byte is printed as exactly two lowercase hex digits.
string PartitionKey::DebugString() const {
  std::ostringstream ss;
  ss << std::hex << std::setfill('0');
  const auto append_hex = [&ss](const auto& bytes) {
    for (size_t i = 0; i < bytes.size(); ++i) {
      // Convert through 'unsigned char' first: where plain 'char' is signed,
      // casting a byte >= 0x80 straight to uint16_t sign-extends it and the
      // byte would be printed as four digits (e.g. "ff80" instead of "80").
      // setw() applies to the next insertion only, so it is set per byte.
      ss << std::setw(2)
         << static_cast<uint16_t>(static_cast<unsigned char>(bytes[i]));
    }
  };
  ss << "h:";
  append_hex(hash_key_);
  ss << " r:";
  append_hex(range_key_);
  return ss.str();
}
// Streams the debug representation of 'key' (see PartitionKey::DebugString())
// into 'out' and returns 'out' to allow chaining.
std::ostream& operator<<(std::ostream& out, const PartitionKey& key) {
  return out << key.DebugString();
}
namespace {
// Extracts the column IDs from a protobuf repeated field of column identifiers.
//
// Each identifier is either a column ID (which must exist in 'schema') or a
// column name (which is resolved to its ID through 'schema'). On success
// 'column_ids' is overwritten with the resolved IDs, in the order of
// 'identifiers'. On failure InvalidArgument is returned and 'column_ids' is
// left untouched.
Status ExtractColumnIds(const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
                        const Schema& schema,
                        vector<ColumnId>* column_ids) {
  vector<ColumnId> resolved;
  resolved.reserve(identifiers.size());
  for (const auto& ident : identifiers) {
    const auto kind = ident.identifier_case();
    if (kind == PartitionSchemaPB_ColumnIdentifierPB::kId) {
      const ColumnId id(ident.id());
      if (schema.find_column_by_id(id) == Schema::kColumnNotFound) {
        return Status::InvalidArgument("unknown column id", SecureDebugString(ident));
      }
      resolved.push_back(id);
    } else if (kind == PartitionSchemaPB_ColumnIdentifierPB::kName) {
      const int32_t idx = schema.find_column(ident.name());
      if (idx == Schema::kColumnNotFound) {
        return Status::InvalidArgument("unknown column", SecureDebugString(ident));
      }
      resolved.push_back(schema.column_id(idx));
    } else {
      // Neither an ID nor a name is set in the identifier.
      return Status::InvalidArgument("unknown column", SecureDebugString(ident));
    }
  }
  // Publish the result only once every identifier has been resolved.
  *column_ids = std::move(resolved);
  return Status::OK();
}
// Appends one identifier per element of 'column_ids' to 'identifiers', each
// carrying the column ID (never the column name).
void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
                          RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
  identifiers->Reserve(column_ids.size());
  for (auto it = column_ids.cbegin(); it != column_ids.cend(); ++it) {
    auto* identifier_pb = identifiers->Add();
    identifier_pb->set_id(*it);
  }
}
} // anonymous namespace
// Converts the protobuf hash schema 'hash_schema_pb' into hash dimensions and
// appends them to 'hash_schema' (which is not cleared first). Column
// identifiers are resolved against 'schema'; the first resolution failure is
// returned, and dimensions appended before it remain in 'hash_schema'.
Status PartitionSchema::ExtractHashSchemaFromPB(
    const Schema& schema,
    const RepeatedPtrField<PartitionSchemaPB_HashBucketSchemaPB>& hash_schema_pb,
    HashSchema* hash_schema) {
  for (const auto& dimension_pb : hash_schema_pb) {
    HashDimension dimension;
    dimension.seed = dimension_pb.seed();
    dimension.num_buckets = dimension_pb.num_buckets();
    RETURN_NOT_OK(ExtractColumnIds(
        dimension_pb.columns(), schema, &dimension.column_ids));

    // Hashing is column-order dependent, so sort the column_ids to ensure that
    // hash components with the same columns hash consistently. This is
    // important when deserializing a user-supplied partition schema during
    // table creation; after that the columns should remain in sorted order.
    auto& ids = dimension.column_ids;
    std::sort(ids.begin(), ids.end());

    hash_schema->emplace_back(std::move(dimension));
  }
  return Status::OK();
}
// Deserializes the protobuf partition schema 'pb' into 'partition_schema',
// resolving column identifiers against 'schema'.
//
// 'partition_schema' is cleared first and then filled with:
//  * the table-wide hash schema;
//  * the range schema columns (the primary key columns of 'schema' when 'pb'
//    has no range schema);
//  * the ranges from 'custom_hash_schema_ranges' whose hash schema differs
//    from the table-wide one, sorted by their encoded lower bound, plus an
//    index from encoded lower bound to position in that sorted list.
//
// If 'ranges_with_hash_schemas' is non-null, on success it receives every
// encoded range from 'pb', including ranges using the table-wide hash schema.
//
// Returns the first error from decoding/encoding helpers or from Validate(),
// or InvalidArgument for malformed range bounds. On error 'partition_schema'
// may be left partially populated.
Status PartitionSchema::FromPB(
const PartitionSchemaPB& pb,
const Schema& schema,
PartitionSchema* partition_schema,
RangesWithHashSchemas* ranges_with_hash_schemas) {
partition_schema->Clear();
RETURN_NOT_OK(ExtractHashSchemaFromPB(
schema, pb.hash_schema(), &partition_schema->hash_schema_));
const auto custom_ranges_num = pb.custom_hash_schema_ranges_size();
// 'range_hash_schemas[i]' and 'range_bounds[i]' describe the same range.
vector<HashSchema> range_hash_schemas;
range_hash_schemas.resize(custom_ranges_num);
vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
for (auto i = 0; i < custom_ranges_num; ++i) {
const auto& range = pb.custom_hash_schema_ranges(i);
RETURN_NOT_OK(ExtractHashSchemaFromPB(
schema, range.hash_schema(), &range_hash_schemas[i]));
// Each range carries its bounds as a pair of encoded row operations:
// a lower bound op followed by an upper bound op.
RowOperationsPBDecoder decoder(&range.range_bounds(), &schema, &schema, nullptr);
vector<DecodedRowOperation> ops;
RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
if (ops.size() != 2) {
return Status::InvalidArgument(Substitute(
"$0 ops were provided; only two ops are expected "
"for this pair of range bounds", ops.size()));
}
const DecodedRowOperation& op1 = ops[0];
const DecodedRowOperation& op2 = ops[1];
switch (op1.type) {
case RowOperationsPB::RANGE_LOWER_BOUND:
case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND: {
if (op2.type != RowOperationsPB::RANGE_UPPER_BOUND &&
op2.type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
return Status::InvalidArgument("missing upper range bound in request");
}
// Lower bound range partition keys are inclusive and upper bound range partition keys
// are exclusive by design. If the provided keys are not of this format, these keys
// will be transformed to their proper format.
if (op1.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
RETURN_NOT_OK(partition_schema->MakeLowerBoundRangePartitionKeyInclusive(
op1.split_row.get()));
}
if (op2.type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
RETURN_NOT_OK(partition_schema->MakeUpperBoundRangePartitionKeyExclusive(
op2.split_row.get()));
}
range_bounds.emplace_back(*op1.split_row, *op2.split_row);
break;
}
default:
// The first op of a pair must be a lower bound of either kind.
return Status::InvalidArgument(
Substitute("Illegal row operation type in request: $0", op1.type));
}
}
if (pb.has_range_schema()) {
const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
RETURN_NOT_OK(ExtractColumnIds(range_pb.columns(), schema,
&partition_schema->range_schema_.column_ids));
} else {
// Fill in the default range partition (PK columns).
// like the sorting above, this should only happen during table creation
// while deserializing the user-provided partition schema.
for (size_t column_idx = 0; column_idx < schema.num_key_columns(); ++column_idx) {
partition_schema->range_schema_.column_ids.push_back(schema.column_id(column_idx));
}
}
// The bounds are encoded only after the range schema columns are known.
// EncodeRangeBounds() is skipped for an empty input because it would
// otherwise emit a single unbounded range.
RangesWithHashSchemas result_ranges_with_hash_schemas;
if (!range_bounds.empty()) {
RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
range_bounds, range_hash_schemas, schema,
&result_ranges_with_hash_schemas));
}
if (range_bounds.size() != result_ranges_with_hash_schemas.size()) {
return Status::InvalidArgument(Substitute("the number of range bounds "
"($0) differs from the number ranges with hash schemas ($1)",
range_bounds.size(), result_ranges_with_hash_schemas.size()));
}
// Keep only the ranges whose hash schema differs from the table-wide one.
auto& ranges_with_custom_hash_schemas =
partition_schema->ranges_with_custom_hash_schemas_;
const auto& table_wide_hash_schema = partition_schema->hash_schema_;
// The reservation is an upper bound: ranges using the table-wide hash schema
// are filtered out below.
ranges_with_custom_hash_schemas.reserve(result_ranges_with_hash_schemas.size());
for (const auto& elem : result_ranges_with_hash_schemas) {
if (elem.hash_schema != table_wide_hash_schema) {
ranges_with_custom_hash_schemas.emplace_back(elem);
}
}
// Sort the ranges.
constexpr struct {
bool operator()(const PartitionSchema::RangeWithHashSchema& lhs,
const PartitionSchema::RangeWithHashSchema& rhs) const {
return lhs.lower < rhs.lower;
}
} rangeLess;
sort(ranges_with_custom_hash_schemas.begin(),
ranges_with_custom_hash_schemas.end(),
rangeLess);
// Index the sorted ranges by their encoded lower bound. InsertOrDie() is
// used, so the lower bounds are expected to be unique here.
auto& dict = partition_schema->hash_schema_idx_by_encoded_range_start_;
for (auto it = ranges_with_custom_hash_schemas.cbegin();
it != ranges_with_custom_hash_schemas.cend(); ++it) {
InsertOrDie(&dict, it->lower, std::distance(
ranges_with_custom_hash_schemas.cbegin(), it));
}
DCHECK_EQ(ranges_with_custom_hash_schemas.size(), dict.size());
RETURN_NOT_OK(partition_schema->Validate(schema));
// The optional output parameter is written only on the success path.
if (ranges_with_hash_schemas) {
*ranges_with_hash_schemas = std::move(result_ranges_with_hash_schemas);
}
return Status::OK();
}
// Serializes this partition schema into 'pb' (cleared first): the table-wide
// hash schema, the ranges having a custom hash schema, and the range schema
// columns. 'schema' is used to decode the stored encoded range bounds back
// into rows so they can be written as row operations.
//
// Returns the first error from DecodeRangeKey(), in which case 'pb' is left
// partially filled.
Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const {
pb->Clear();
pb->mutable_hash_schema()->Reserve(hash_schema_.size());
for (const auto& hash_dimension : hash_schema_) {
auto* hash_dimension_pb = pb->add_hash_schema();
SetColumnIdentifiers(hash_dimension.column_ids,
hash_dimension_pb->mutable_columns());
hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
hash_dimension_pb->set_seed(hash_dimension.seed);
}
if (!ranges_with_custom_hash_schemas_.empty()) {
// Consumers of this method do not expect the information
// on ranges with the table-wide hash schema persisted into the
// 'custom_hash_schema_ranges' field.
pb->mutable_custom_hash_schema_ranges()->Reserve(
ranges_with_custom_hash_schemas_.size());
// Scratch memory handed to DecodeRangeKey(); reset for every range.
Arena arena(256);
for (const auto& range_hash_schema : ranges_with_custom_hash_schemas_) {
if (range_hash_schema.hash_schema == hash_schema_) {
continue;
}
auto* range_pb = pb->add_custom_hash_schema_ranges();
arena.Reset();
RowOperationsPBEncoder encoder(range_pb->mutable_range_bounds());
KuduPartialRow lower(&schema);
KuduPartialRow upper(&schema);
Slice s_lower = Slice(range_hash_schema.lower);
Slice s_upper = Slice(range_hash_schema.upper);
RETURN_NOT_OK(DecodeRangeKey(&s_lower, &lower, &arena));
RETURN_NOT_OK(DecodeRangeKey(&s_upper, &upper, &arena));
// The bounds are always written as a RANGE_LOWER_BOUND/RANGE_UPPER_BOUND pair.
encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
range_pb->mutable_hash_schema()->Reserve(
range_hash_schema.hash_schema.size());
for (const auto& hash_dimension : range_hash_schema.hash_schema) {
auto* hash_dimension_pb = range_pb->add_hash_schema();
SetColumnIdentifiers(hash_dimension.column_ids,
hash_dimension_pb->mutable_columns());
hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
hash_dimension_pb->set_seed(hash_dimension.seed);
}
}
}
// The range schema message is always created, even with no range columns.
SetColumnIdentifiers(range_schema_.column_ids,
pb->mutable_range_schema()->mutable_columns());
return Status::OK();
}
template<typename Row>
void PartitionSchema::EncodeKeyImpl(const Row& row,
string* range_buf,
string* hash_buf) const {
DCHECK(range_buf);
DCHECK(hash_buf);
string range_key;
EncodeColumns(row, range_schema_.column_ids, &range_key);
const auto& hash_schema = GetHashSchemaForRange(range_key);
const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
string hash_key;
for (const auto& hash_dimension : hash_schema) {
const auto bucket = HashValueForRow(row, hash_dimension);
hash_encoder.Encode(&bucket, &hash_key);
}
*range_buf = std::move(range_key);
*hash_buf = std::move(hash_key);
}
// Returns the partition key (hash part + range part) of the partial row 'row'.
PartitionKey PartitionSchema::EncodeKey(const KuduPartialRow& row) const {
  string hash_part;
  string range_part;
  // Note the argument order: EncodeKeyImpl() takes the range buffer first.
  EncodeKeyImpl(row, &range_part, &hash_part);
  return PartitionKey(std::move(hash_part), std::move(range_part));
}
// Returns the partition key (hash part + range part) of the contiguous row
// 'row'.
PartitionKey PartitionSchema::EncodeKey(const ConstContiguousRow& row) const {
  string hash_part;
  string range_part;
  // Note the argument order: EncodeKeyImpl() takes the range buffer first.
  EncodeKeyImpl(row, &range_part, &hash_part);
  return PartitionKey(std::move(hash_part), std::move(range_part));
}
// Encodes the range partition key of 'row' into 'key', which must be empty on
// entry.
//
// Every column set in 'row' must be a range partition column; otherwise
// InvalidArgument naming the offending column is returned. If no column is
// set at all, 'key' is left empty and OK is returned.
Status PartitionSchema::EncodeRangeKey(const KuduPartialRow& row,
                                       const Schema& schema,
                                       string* key) const {
  DCHECK(key->empty());
  const auto& range_ids = range_schema_.column_ids;
  bool any_range_column_set = false;
  for (size_t idx = 0; idx < schema.num_columns(); ++idx) {
    if (!row.IsColumnSet(idx)) {
      continue;
    }
    const bool is_range_column =
        std::find(range_ids.begin(), range_ids.end(), schema.column_id(idx)) !=
        range_ids.end();
    if (!is_range_column) {
      return Status::InvalidArgument(
          "split rows may only contain values for range partition columns",
          schema.column(idx).name());
    }
    any_range_column_set = true;
  }
  if (any_range_column_set) {
    EncodeColumns(row, range_ids, key);
  }
  return Status::OK();
}
// Encodes 'split_rows' into range keys and stores them, sorted, in 'splits'
// (which must be empty on entry).
//
// Returns InvalidArgument if a split row sets a non-range column (see
// EncodeRangeKey()), sets no range partition column at all, or if two split
// rows encode to the same key. On error the contents of 'splits' are
// unspecified.
Status PartitionSchema::EncodeRangeSplits(const vector<KuduPartialRow>& split_rows,
                                          const Schema& schema,
                                          vector<string>* splits) const {
  DCHECK(splits->empty());
  if (split_rows.empty()) {
    return Status::OK();
  }
  splits->reserve(split_rows.size());
  for (const KuduPartialRow& row : split_rows) {
    string split;
    RETURN_NOT_OK(EncodeRangeKey(row, schema, &split));
    if (split.empty()) {
      return Status::InvalidArgument(
          "split rows must contain a value for at least one range partition column");
    }
    splits->emplace_back(std::move(split));
  }
  std::sort(splits->begin(), splits->end());
  // Look for the duplicate without modifying the container. std::unique()
  // leaves the elements past its returned iterator in an unspecified
  // (moved-from) state, so dereferencing that iterator may report an empty or
  // unrelated key in the error message.
  const auto duplicate = std::adjacent_find(splits->begin(), splits->end());
  if (duplicate != splits->end()) {
    return Status::InvalidArgument("duplicate split row",
                                   RangeKeyDebugString(*duplicate, schema));
  }
  return Status::OK();
}
// Encodes 'range_bounds' into 'bounds_with_hash_schemas' (which must be empty
// on entry), sorted by encoded lower bound.
//
//  * With no range bounds, a single unbounded range with the table-wide hash
//    schema is produced.
//  * 'range_hash_schemas' is either empty (every range gets the table-wide
//    hash schema) or has exactly one entry per range bound, matched by index.
//
// Returns InvalidArgument when the sizes mismatch, when a lower bound is not
// less than its upper bound (both being non-empty), or when ranges overlap.
Status PartitionSchema::EncodeRangeBounds(
    const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
    const vector<HashSchema>& range_hash_schemas,
    const Schema& schema,
    RangesWithHashSchemas* bounds_with_hash_schemas) const {
  DCHECK(bounds_with_hash_schemas);
  RangesWithHashSchemas& encoded = *bounds_with_hash_schemas;
  DCHECK(encoded.empty());

  if (range_bounds.empty()) {
    encoded.emplace_back(RangeWithHashSchema{"", "", hash_schema_});
    return Status::OK();
  }

  const bool has_custom_hash_schemas = !range_hash_schemas.empty();
  if (has_custom_hash_schemas &&
      range_hash_schemas.size() != range_bounds.size()) {
    return Status::InvalidArgument(Substitute(
        "$0 vs $1: per range hash schemas and range bounds "
        "must have the same size",
        range_hash_schemas.size(), range_bounds.size()));
  }

  for (size_t idx = 0; idx < range_bounds.size(); ++idx) {
    const auto& bound = range_bounds[idx];
    string lower;
    RETURN_NOT_OK(EncodeRangeKey(bound.first, schema, &lower));
    string upper;
    RETURN_NOT_OK(EncodeRangeKey(bound.second, schema, &upper));

    // An empty encoded bound means "unbounded", so the ordering check applies
    // only when both ends are present.
    const bool both_bounded = !lower.empty() && !upper.empty();
    if (both_bounded && lower >= upper) {
      return Status::InvalidArgument(
          "range partition lower bound must be less than the upper bound",
          RangePartitionDebugString(bound.first, bound.second));
    }
    encoded.emplace_back(RangeWithHashSchema{
        std::move(lower),
        std::move(upper),
        has_custom_hash_schemas ? range_hash_schemas[idx] : hash_schema_});
  }

  std::sort(encoded.begin(), encoded.end(),
            [](const RangeWithHashSchema& a, const RangeWithHashSchema& b) {
              return a.lower < b.lower;
            });

  // Check that the range bounds are non-overlapping: each range must end
  // (with a bounded upper key) no later than the next one starts (with a
  // bounded lower key).
  for (size_t next = 1; next < encoded.size(); ++next) {
    const auto& first = encoded[next - 1];
    const auto& second = encoded[next];
    const bool overlapping = first.upper.empty() ||
                             second.lower.empty() ||
                             first.upper > second.lower;
    if (overlapping) {
      return Status::InvalidArgument(
          "overlapping range partitions",
          Substitute(
              "first range partition: $0, second range partition: $1",
              RangePartitionDebugString(first.lower, first.upper, schema),
              RangePartitionDebugString(second.lower, second.upper, schema)));
    }
  }
  return Status::OK();
}
// Splits the sorted ranges in 'bounds_with_hash_schemas' at the sorted keys in
// 'splits', replacing the contents of 'bounds_with_hash_schemas' with the
// resulting ranges. Every resulting range is assigned the table-wide hash
// schema. With no splits the input is left as is.
//
// Returns InvalidArgument if a split falls outside every range or coincides
// with a lower or upper bound of a range. On error the lower bounds stored in
// 'bounds_with_hash_schemas' may already have been modified.
Status PartitionSchema::SplitRangeBounds(
    const Schema& schema,
    const vector<string>& splits,
    RangesWithHashSchemas* bounds_with_hash_schemas) const {
  DCHECK(bounds_with_hash_schemas);
  if (splits.empty()) {
    return Status::OK();
  }
  // Spell out the type for std::max(): a '1UL' literal only matches
  // vector::size() on platforms where size_t is 'unsigned long'.
  const size_t expected_bounds =
      std::max<size_t>(1, bounds_with_hash_schemas->size()) + splits.size();
  RangesWithHashSchemas new_bounds_with_hash_schemas;
  new_bounds_with_hash_schemas.reserve(expected_bounds);
  // Iterate through the sorted bounds and sorted splits, splitting the bounds
  // as appropriate and adding them to the result list
  // ('new_bounds_with_hash_schemas').
  auto split = splits.begin();
  for (auto& bound : *bounds_with_hash_schemas) {
    string& lower = bound.lower;
    const string& upper = bound.upper;
    for (; split != splits.end() && (upper.empty() || *split <= upper); ++split) {
      if (!lower.empty() && *split < lower) {
        return Status::InvalidArgument("split out of bounds",
                                       RangeKeyDebugString(*split, schema));
      }
      if (lower == *split || upper == *split) {
        return Status::InvalidArgument("split matches lower or upper bound",
                                       RangeKeyDebugString(*split, schema));
      }
      // Split the current bound. Add the lower section to the result list,
      // and continue iterating on the upper section.
      new_bounds_with_hash_schemas.emplace_back(
          RangeWithHashSchema{std::move(lower), *split, hash_schema_});
      // 'lower' was moved from above; give it its new value right away.
      lower = *split;
    }
    new_bounds_with_hash_schemas.emplace_back(
        RangeWithHashSchema{std::move(lower), upper, hash_schema_});
  }
  // Any split left over lies beyond the upper bound of the last range.
  if (split != splits.end()) {
    return Status::InvalidArgument("split out of bounds",
                                   RangeKeyDebugString(*split, schema));
  }
  bounds_with_hash_schemas->swap(new_bounds_with_hash_schemas);
  CHECK_EQ(expected_bounds, bounds_with_hash_schemas->size());
  return Status::OK();
}
// Creates the partitions of a table that uses the table-wide hash schema for
// every range: the ranges given by 'range_bounds' are split at 'split_rows',
// and one partition is produced per (hash bucket combination, range) pair.
// 'partitions' is overwritten only on success.
Status PartitionSchema::CreatePartitions(
    const vector<KuduPartialRow>& split_rows,
    const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
    const Schema& schema,
    vector<Partition>* partitions) const {
  DCHECK(partitions);
  RETURN_NOT_OK(CheckRangeSchema(schema));

  // Encode the bounds, then cut them at the encoded split points.
  RangesWithHashSchemas ranges;
  RETURN_NOT_OK(EncodeRangeBounds(range_bounds, {}, schema, &ranges));
  vector<string> encoded_splits;
  RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &encoded_splits));
  RETURN_NOT_OK(SplitRangeBounds(schema, encoded_splits, &ranges));

  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
  const auto base_hash_partitions =
      GenerateHashPartitions(hash_schema_, hash_encoder);
  // Even if no hash partitioning for a table is specified, there must be at
  // least one element in 'base_hash_partitions': it's used to build the result
  // set of range partitions.
  DCHECK_GE(base_hash_partitions.size(), 1);
  // The case of a single element in base_hash_partitions is a special case.
  DCHECK(base_hash_partitions.size() > 1 ||
         base_hash_partitions.front().hash_buckets().empty());

  // Create a partition per range bound and hash bucket combination.
  vector<Partition> result;
  for (const auto& hash_partition : base_hash_partitions) {
    for (const auto& range : ranges) {
      result.emplace_back(hash_partition);
      Partition& created = result.back();
      created.begin_.mutable_range_key()->append(range.lower);
      created.end_.mutable_range_key()->append(range.upper);
    }
  }
  UpdatePartitionBoundaries(hash_encoder, &result);
  *partitions = std::move(result);
  return Status::OK();
}
// Creates the partitions for already encoded ranges, each with its own hash
// schema: for every range, hash partitions are generated from the range's hash
// schema and then stamped with the range's lower/upper keys. 'partitions' is
// overwritten only on success.
Status PartitionSchema::CreatePartitions(
    const RangesWithHashSchemas& ranges_with_hash_schemas,
    const Schema& schema,
    vector<Partition>* partitions) const {
  DCHECK(partitions);
  RETURN_NOT_OK(CheckRangeSchema(schema));
  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));

  vector<Partition> result_partitions;
  // Iterate through each bound and its hash schemas to generate hash partitions.
  for (const auto& range : ranges_with_hash_schemas) {
    vector<Partition> hash_partitions =
        GenerateHashPartitions(range.hash_schema, hash_encoder);
    // Add range information to the partition key.
    for (Partition& p : hash_partitions) {
      DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
      p.begin_.mutable_range_key()->assign(range.lower);
      DCHECK(p.end().range_key().empty());
      p.end_.mutable_range_key()->assign(range.upper);
    }
    std::move(hash_partitions.begin(), hash_partitions.end(),
              std::back_inserter(result_partitions));
  }
  UpdatePartitionBoundaries(hash_encoder, &result_partitions);
  *partitions = std::move(result_partitions);
  return Status::OK();
}
// Creates the partitions for the single range 'range_bound' using the hash
// schema 'range_hash_schema': one partition per hash bucket combination, each
// covering the encoded range. 'partitions' is overwritten only on success.
Status PartitionSchema::CreatePartitionsForRange(
    const pair<KuduPartialRow, KuduPartialRow>& range_bound,
    const HashSchema& range_hash_schema,
    const Schema& schema,
    std::vector<Partition>* partitions) const {
  RETURN_NOT_OK(CheckRangeSchema(schema));

  RangesWithHashSchemas encoded_ranges;
  RETURN_NOT_OK(EncodeRangeBounds(
      {range_bound}, {range_hash_schema}, schema, &encoded_ranges));
  DCHECK_EQ(1, encoded_ranges.size());
  const auto& range = encoded_ranges.front();

  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
  vector<Partition> result = GenerateHashPartitions(
      range.hash_schema, hash_encoder);
  // Add range information to the partition key.
  for (auto it = result.begin(); it != result.end(); ++it) {
    Partition& p = *it;
    DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
    p.begin_.mutable_range_key()->assign(range.lower);
    DCHECK(p.end().range_key().empty());
    p.end_.mutable_range_key()->assign(range.upper);
    // Boundaries are adjusted one partition at a time here.
    UpdatePartitionBoundaries(hash_encoder, range_hash_schema, &p);
  }
  *partitions = std::move(result);
  return Status::OK();
}
template<typename Row>
bool PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
const Row& row) const {
string range_key;
EncodeColumns(row, range_schema_.column_ids, &range_key);
const auto& hash_schema = GetHashSchemaForRange(range_key);
for (size_t i = 0; i < hash_schema.size(); ++i) {
if (!HashPartitionContainsRowImpl(partition, row, hash_schema, i)) {
return false;
}
}
return RangePartitionContainsRowImpl(partition, row);
}
template<typename Row>
bool PartitionSchema::HashPartitionContainsRowImpl(
const Partition& partition,
const Row& row,
const HashSchema& hash_schema,
int hash_value) const {
DCHECK_GE(hash_value, 0);
DCHECK_LT(hash_value, hash_schema.size());
DCHECK_EQ(partition.hash_buckets().size(), hash_schema.size());
const HashDimension& hash_dimension = hash_schema[hash_value];
const auto bucket = HashValueForRow(row, hash_dimension);
return partition.hash_buckets()[hash_value] == bucket;
}
template<typename Row>
bool PartitionSchema::RangePartitionContainsRowImpl(
const Partition& partition, const Row& row) const {
// If range partition is not used, column_ids would be empty and
// EncodedColumn() would return immediately.
string key;
EncodeColumns(row, range_schema_.column_ids, &key);
// When all hash buckets match, then the row is contained in the partition
// if the row's key is greater or equal to the lower bound, and if there is
// either no upper bound or the row's key is less than the upper bound.
return
(partition.begin().range_key() <= key) &&
(partition.end().range_key().empty() || key < partition.end().range_key());
}
// Returns true if the partial row 'row' belongs to 'partition'; forwards to
// PartitionContainsRowImpl().
bool PartitionSchema::PartitionContainsRow(const Partition& partition,
const KuduPartialRow& row) const {
return PartitionContainsRowImpl(partition, row);
}
// Returns true if the contiguous row 'row' belongs to 'partition'; forwards to
// PartitionContainsRowImpl().
bool PartitionSchema::PartitionContainsRow(const Partition& partition,
const ConstContiguousRow& row) const {
return PartitionContainsRowImpl(partition, row);
}
bool PartitionSchema::PartitionMayContainRow(const Partition& partition,
const KuduPartialRow& row) const {
// It's the fast and 100% sure path when the row has the primary key set.
if (row.IsKeySet()) {
return PartitionContainsRow(partition, row);
}
const Schema* schema = row.schema();
vector<ColumnId> set_column_ids;
set_column_ids.reserve(schema->num_key_columns());
for (size_t idx = 0; idx < schema->num_key_columns(); ++idx) {
DCHECK(schema->is_key_column(idx));
if (row.IsColumnSet(idx) && !row.IsNull(idx)) {
set_column_ids.emplace_back(schema->column_id(idx));
if (set_column_ids.size() > 1) {
// No futher optimizations in this case for a while.
// NOTE: This might be a false positive.
return true;
}
}
}
if (set_column_ids.empty()) {
return false;
}
DCHECK_EQ(1, set_column_ids.size());
const auto& single_column_id = set_column_ids[0];
if (range_schema_.column_ids.size() == 1 &&
range_schema_.column_ids[0] == single_column_id &&
!RangePartitionContainsRow(partition, row)) {
return false;
}
string range_key;
EncodeColumns(row, range_schema_.column_ids, &range_key);
const auto& hash_schema = GetHashSchemaForRange(range_key);
for (size_t i = 0; i < hash_schema.size(); ++i) {
const auto& hash_dimension = hash_schema[i];
if (hash_dimension.column_ids.size() == 1 &&
hash_dimension.column_ids[0] == single_column_id &&
!HashPartitionContainsRowImpl(partition, row, hash_schema, i)) {
return false;
}
}
// NOTE: This might be a false positive.
return true;
}
// Returns true if the range key of 'row' lies within the range bounds of
// 'partition'; forwards to RangePartitionContainsRowImpl().
bool PartitionSchema::RangePartitionContainsRow(
const Partition& partition, const KuduPartialRow& row) const {
return RangePartitionContainsRowImpl(partition, row);
}
// Decodes the encoded range key 'encoded_key' into 'partial_row', marking
// every decoded range column as set. 'encoded_key' is consumed as decoding
// proceeds; 'arena' is passed to the per-column key decoders.
//
// The key may hold fewer components than there are range columns: decoding
// stops as soon as the key is exhausted.
//
// Returns InvalidArgument if a range column is absent from the schema of
// 'partial_row' or if bytes remain after all range columns are decoded;
// decoder errors are returned with the column name prepended.
Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
                                       KuduPartialRow* partial_row,
                                       Arena* arena) const {
  const Schema* row_schema = partial_row->schema();
  ContiguousRow cont_row(row_schema, partial_row->row_data_);
  const auto& range_ids = range_schema_.column_ids;
  const size_t num_range_columns = range_ids.size();
  for (size_t i = 0; i < num_range_columns; ++i) {
    if (encoded_key->empty()) {
      // This can happen when decoding partition start and end keys, since they
      // are truncated to simulate absolute upper and lower bounds.
      break;
    }
    const int32_t column_idx = row_schema->find_column_by_id(range_ids[i]);
    if (column_idx == Schema::kColumnNotFound) {
      // Without this check the "not found" marker would be used as a column
      // index below.
      return Status::InvalidArgument(
          "range partition column not found in the schema of the row",
          Substitute("column id $0", static_cast<int32_t>(range_ids[i])));
    }
    const ColumnSchema& column = row_schema->column(column_idx);
    const KeyEncoder<faststring>& key_encoder =
        GetKeyEncoder<faststring>(column.type_info());
    const bool is_last = i == (num_range_columns - 1);
    // Decode the column.
    RETURN_NOT_OK_PREPEND(key_encoder.Decode(encoded_key,
                                             is_last,
                                             arena,
                                             cont_row.mutable_cell_ptr(column_idx)),
                          Substitute("Error decoding partition key range component '$0'",
                                     column.name()));
    // Mark the column as set.
    BitmapSet(partial_row->isset_bitmap_, column_idx);
    if (column.type_info()->physical_type() == BINARY) {
      // Copy cell value into the 'partial_row', because in the decoding process above, we just make
      // row data a pointer to the memory allocated by arena.
      partial_row->Set(column_idx, cont_row.cell_ptr(column_idx));
    }
  }
  if (!encoded_key->empty()) {
    return Status::InvalidArgument("unable to fully decode range key",
                                   KUDU_REDACT(encoded_key->ToDebugString()));
  }
  return Status::OK();
}
// Decodes the hash part of the partition key in 'encoded_key', appending one
// bucket per dimension of the table-wide hash schema to 'buckets'. The decoded
// prefix is removed from 'encoded_key'.
//
// Returns InvalidArgument, without touching either argument, if 'encoded_key'
// is too short to hold all the hash components.
Status PartitionSchema::DecodeHashBuckets(Slice* encoded_key,
                                          vector<int32_t>* buckets) const {
  const size_t num_dimensions = hash_schema_.size();
  const size_t hash_components_size = kEncodedBucketSize * num_dimensions;
  if (encoded_key->size() < hash_components_size) {
    return Status::InvalidArgument(
        Substitute("expected encoded hash key to be at least $0 bytes (only found $1)",
                   hash_components_size, encoded_key->size()));
  }
  for (size_t dim = 0; dim < num_dimensions; ++dim) {
    // Buckets are stored big-endian; memcpy avoids an unaligned read.
    uint32_t big_endian;
    memcpy(&big_endian, encoded_key->data(), sizeof(uint32_t));
    buckets->push_back(BigEndian::ToHost32(big_endian));
    encoded_key->remove_prefix(sizeof(uint32_t));
  }
  return Status::OK();
}
bool PartitionSchema::IsRangePartitionKeyEmpty(const KuduPartialRow& row) const {
ConstContiguousRow const_row(row.schema(), row.row_data_);
for (const ColumnId& column_id : range_schema_.column_ids) {
if (row.IsColumnSet(row.schema()->find_column_by_id(column_id))) return false;
}
return true;
}
void PartitionSchema::AppendRangeDebugStringComponentsOrMin(const KuduPartialRow& row,
vector<string>* components) const {
ConstContiguousRow const_row(row.schema(), row.row_data_);
for (const ColumnId& column_id : range_schema_.column_ids) {
int32_t column_idx = row.schema()->find_column_by_id(column_id);
if (column_idx == Schema::kColumnNotFound) {
components->emplace_back("<unknown-column>");
continue;
}
const ColumnSchema& column_schema = row.schema()->column(column_idx);
if (!row.IsColumnSet(column_idx)) {
uint8_t min_value[kLargestTypeSize];
column_schema.type_info()->CopyMinValue(&min_value);
SimpleConstCell cell(&column_schema, &min_value);
components->emplace_back(column_schema.Stringify(cell.ptr()));
} else {
components->emplace_back(column_schema.Stringify(const_row.cell_ptr(column_idx)));
}
}
}
namespace {
// Converts a list of column IDs to a string with the column names separated
// by ", ". Every ID is looked up in 'schema'; the lookup result is used
// without checking that the column exists.
string ColumnIdsToColumnNames(const Schema& schema,
                              const vector<ColumnId>& column_ids) {
  vector<string> column_names;
  column_names.reserve(column_ids.size());
  std::transform(column_ids.begin(), column_ids.end(),
                 std::back_inserter(column_names),
                 [&schema](const ColumnId& id) {
                   return schema.column(schema.find_column_by_id(id)).name();
                 });
  return JoinStrings(column_names, ", ");
}
} // namespace
// Returns a human-readable description of 'partition': optionally (when 'hp'
// is HashPartitionInfo::SHOW) one "HASH (...) PARTITION n" component per hash
// dimension, followed by a "RANGE (...) PARTITION ..." component if there are
// range columns; components are joined with ", ". Column names are taken from
// 'schema'.
//
// If the number of hash buckets in 'partition' does not match the hash schema
// in effect for the partition's range, an error marker string is returned
// instead.
string PartitionSchema::PartitionDebugString(const Partition& partition,
                                             const Schema& schema,
                                             HashPartitionInfo hp) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;

  const auto& hash_schema = GetHashSchemaForRange(partition.begin().range_key());
  const auto& buckets = partition.hash_buckets_;
  if (buckets.size() != hash_schema.size()) {
    return Substitute("<hash-partition-error: $0 vs $1 expected buckets>",
                      buckets.size(), hash_schema.size());
  }

  vector<string> parts;
  if (hp == HashPartitionInfo::SHOW) {
    for (size_t dim = 0; dim < hash_schema.size(); ++dim) {
      parts.emplace_back(Substitute(
          "HASH ($0) PARTITION $1",
          ColumnIdsToColumnNames(schema, hash_schema[dim].column_ids),
          buckets[dim]));
    }
  }

  const auto& range_ids = range_schema_.column_ids;
  if (!range_ids.empty()) {
    parts.emplace_back(Substitute(
        "RANGE ($0) PARTITION $1",
        ColumnIdsToColumnNames(schema, range_ids),
        RangePartitionDebugString(partition.begin().range_key(),
                                  partition.end().range_key(),
                                  schema)));
  }
  return JoinStrings(parts, ", ");
}
// Describes the partition key of 'row': one "HASH (...)" entry per hash
// dimension with the bucket the row hashes into, followed by a "RANGE (...)"
// entry when the partition schema has range columns.
template<typename Row>
string PartitionSchema::PartitionKeyDebugStringImpl(const Row& row) const {
  // The encoded range key is what selects the hash schema for this row.
  string encoded_range_key;
  EncodeColumns(row, range_schema_.column_ids, &encoded_range_key);
  const auto& row_hash_schema = GetHashSchemaForRange(encoded_range_key);

  vector<string> parts;
  parts.reserve(row_hash_schema.size() + range_schema_.column_ids.size());

  for (const auto& dimension : row_hash_schema) {
    parts.emplace_back(Substitute(
        "HASH ($0): $1",
        ColumnIdsToColumnNames(*row.schema(), dimension.column_ids),
        HashValueForRow(row, dimension)));
  }

  if (!range_schema_.column_ids.empty()) {
    parts.emplace_back(Substitute(
        "RANGE ($0): $1",
        ColumnIdsToColumnNames(*row.schema(), range_schema_.column_ids),
        RangeKeyDebugString(row)));
  }

  return JoinStrings(parts, ", ");
}

// Explicit instantiations for the two row types used by the public
// PartitionKeyDebugString() overloads.
template
string PartitionSchema::PartitionKeyDebugStringImpl(const KuduPartialRow& row) const;
template
string PartitionSchema::PartitionKeyDebugStringImpl(const ConstContiguousRow& row) const;
// Returns the partition key debug string for a contiguous row; forwards to the
// shared template implementation.
string PartitionSchema::PartitionKeyDebugString(const ConstContiguousRow& row) const {
return PartitionKeyDebugStringImpl(row);
}
// Returns the partition key debug string for a partial row; forwards to the
// shared template implementation.
string PartitionSchema::PartitionKeyDebugString(const KuduPartialRow& row) const {
return PartitionKeyDebugStringImpl(row);
}
// Describes an already encoded partition key: decodes one big-endian bucket
// number per hash dimension from the hash part of the key, then appends the
// decoded range part when the partition schema has range columns.
//
// Returns "<hash-decode-error>" when the size of the hash part does not match
// the number of dimensions of the hash schema selected by the range key.
string PartitionSchema::PartitionKeyDebugString(const PartitionKey& key,
                                                const Schema& schema) const {
  const auto& key_hash_schema = GetHashSchemaForRange(key.range_key());
  Slice remaining(key.hash_key());
  if (remaining.size() != kEncodedBucketSize * key_hash_schema.size()) {
    return "<hash-decode-error>";
  }

  vector<string> parts;
  parts.reserve(key_hash_schema.size() + 1);

  for (const auto& dimension : key_hash_schema) {
    // Peel the next encoded bucket off the front of the hash key.
    uint32_t encoded_bucket;
    memcpy(&encoded_bucket, remaining.data(), kEncodedBucketSize);
    remaining.remove_prefix(kEncodedBucketSize);
    parts.emplace_back(
        Substitute("HASH ($0): $1",
                   ColumnIdsToColumnNames(schema, dimension.column_ids),
                   BigEndian::ToHost32(encoded_bucket)));
  }
  DCHECK(remaining.empty());

  if (!range_schema_.column_ids.empty()) {
    Slice encoded_range(key.range_key());
    parts.emplace_back(
        Substitute("RANGE ($0): $1",
                   ColumnIdsToColumnNames(schema, range_schema_.column_ids),
                   RangeKeyDebugString(encoded_range, schema)));
  }

  return JoinStrings(parts, ", ");
}
// Formats a range partition given its decoded bounds. A bound whose range key
// is empty is treated as unbounded, which yields one of four shapes:
// "UNBOUNDED", "VALUES < upper", "lower <= VALUES" or
// "lower <= VALUES < upper".
string PartitionSchema::RangePartitionDebugString(const KuduPartialRow& lower_bound,
                                                  const KuduPartialRow& upper_bound) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;

  const bool has_lower = !IsRangePartitionKeyEmpty(lower_bound);
  const bool has_upper = !IsRangePartitionKeyEmpty(upper_bound);

  if (has_lower && has_upper) {
    // TODO(dan): recognize when a simplified 'VALUE =' form can be used (see
    // org.apache.kudu.client.Partition#formatRangePartition).
    return Substitute("$0 <= VALUES < $1",
                      RangeKeyDebugString(lower_bound),
                      RangeKeyDebugString(upper_bound));
  }
  if (has_lower) {
    return Substitute("$0 <= VALUES", RangeKeyDebugString(lower_bound));
  }
  if (has_upper) {
    return Substitute("VALUES < $0", RangeKeyDebugString(upper_bound));
  }
  return "UNBOUNDED";
}
// Formats a range partition given its encoded bounds: both bounds are decoded
// into partial rows over 'schema' and handed to the row-based overload.
// Returns a "<range-key-decode-error: ...>" marker if either bound fails to
// decode.
string PartitionSchema::RangePartitionDebugString(Slice lower_bound,
                                                  Slice upper_bound,
                                                  const Schema& schema) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;

  Arena arena(256);
  KuduPartialRow lower_row(&schema);
  KuduPartialRow upper_row(&schema);

  const Status lower_status = DecodeRangeKey(&lower_bound, &lower_row, &arena);
  if (!lower_status.ok()) {
    return Substitute("<range-key-decode-error: $0>", lower_status.ToString());
  }

  const Status upper_status = DecodeRangeKey(&upper_bound, &upper_row, &arena);
  if (!upper_status.ok()) {
    return Substitute("<range-key-decode-error: $0>", upper_status.ToString());
  }

  return RangePartitionDebugString(lower_row, upper_row);
}
// Formats a range partition given its encoded bounds and, when the range uses
// a hash schema different from the table-wide one, appends a
// " HASH(...) PARTITIONS n" clause per hash dimension of that schema.
//
// Returns a "<range-key-decode-error: ...>" marker if either bound fails to
// decode.
string PartitionSchema::RangeWithCustomHashPartitionDebugString(Slice lower_bound,
                                                                Slice upper_bound,
                                                                const Schema& schema) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;

  // Look up the hash schema before decoding: DecodeRangeKey() takes the slice
  // by pointer and may consume it. The result refers to a member of this
  // object, so bind it by reference rather than copying the whole schema.
  const HashSchema& hash_schema = GetHashSchemaForRange(lower_bound.ToString());

  // Decode the lower and upper bounds.
  Arena arena(256);
  KuduPartialRow lower(&schema);
  Status s = DecodeRangeKey(&lower_bound, &lower, &arena);
  if (!s.ok()) {
    return Substitute("<range-key-decode-error: $0>", s.ToString());
  }
  KuduPartialRow upper(&schema);
  s = DecodeRangeKey(&upper_bound, &upper, &arena);
  if (!s.ok()) {
    return Substitute("<range-key-decode-error: $0>", s.ToString());
  }

  // The bounds part is formatted exactly as for a regular range partition, so
  // reuse the row-based overload instead of duplicating its logic.
  string partition = RangePartitionDebugString(lower, upper);

  // Add the hash schema information only for range-specific hash schemas.
  if (hash_schema != hash_schema_) {
    for (const auto& hash_dimension : hash_schema) {
      SubstituteAndAppend(&partition, " HASH($0) PARTITIONS $1",
                          ColumnIdsToColumnNames(schema, hash_dimension.column_ids),
                          hash_dimension.num_buckets);
    }
  }
  return partition;
}
// Formats an encoded range key: decodes it into a partial row over 'schema'
// and formats that row, or returns a "<range-key-decode-error: ...>" marker
// when decoding fails.
string PartitionSchema::RangeKeyDebugString(Slice range_key, const Schema& schema) const {
  Arena arena(256);
  KuduPartialRow decoded(&schema);
  const Status decode_status = DecodeRangeKey(&range_key, &decoded, &arena);
  if (decode_status.ok()) {
    return RangeKeyDebugString(decoded);
  }
  return Substitute("<range-key-decode-error: $0>", decode_status.ToString());
}
// Formats the range key held in a partial row. A single component is returned
// bare; any other number of components is wrapped in parentheses and joined
// with ", ".
string PartitionSchema::RangeKeyDebugString(const KuduPartialRow& key) const {
  vector<string> parts;
  AppendRangeDebugStringComponentsOrMin(key, &parts);
  // Omit the parentheses if the range partition has a single column.
  return parts.size() == 1 ? parts.back()
                           : Substitute("($0)", JoinStrings(parts, ", "));
}
// Formats the range key held in a contiguous row, one component per range
// column. Formatting stops at the first range column missing from the row's
// schema, which is rendered as "<unknown-column>". A single component is
// returned bare; otherwise the components are wrapped in parentheses.
string PartitionSchema::RangeKeyDebugString(const ConstContiguousRow& key) const {
  vector<string> parts;
  for (const ColumnId& column_id : range_schema_.column_ids) {
    const int32_t idx = key.schema()->find_column_by_id(column_id);
    if (idx == Schema::kColumnNotFound) {
      parts.emplace_back("<unknown-column>");
      break;
    }
    string cell_text;
    key.schema()->column(idx).DebugCellAppend(key.cell(idx), &cell_text);
    parts.emplace_back(std::move(cell_text));
  }

  // Omit the parentheses if the range partition has a single column.
  return parts.size() == 1 ? parts.back()
                           : Substitute("($0)", JoinStrings(parts, ", "));
}
// Returns the textual pieces describing the table-wide partition schema: one
// "HASH (...) PARTITIONS n" entry per hash dimension (with a " SEED s" suffix
// for a non-zero seed), followed by a "RANGE (...)" entry when there are range
// columns.
vector<string> PartitionSchema::DebugStringComponents(const Schema& schema) const {
  // TODO(aserbin): adapt this to custom hash schemas per range
  vector<string> parts;

  for (const auto& dimension : hash_schema_) {
    string text = Substitute("HASH ($0) PARTITIONS $1",
                             ColumnIdsToColumnNames(schema, dimension.column_ids),
                             dimension.num_buckets);
    if (dimension.seed != 0) {
      SubstituteAndAppend(&text, " SEED $0", dimension.seed);
    }
    parts.emplace_back(std::move(text));
  }

  if (!range_schema_.column_ids.empty()) {
    parts.emplace_back(Substitute(
        "RANGE ($0)",
        ColumnIdsToColumnNames(schema, range_schema_.column_ids)));
  }

  return parts;
}
// Returns the components produced by DebugStringComponents() joined into a
// single line with ", ".
string PartitionSchema::DebugString(const Schema& schema) const {
return JoinStrings(DebugStringComponents(schema), ", ");
}
// Returns a multi-line description of the partition schema: the components
// from DebugStringComponents() joined with ",\n" and, when there are range
// columns, a parenthesized list with one "PARTITION ..." line per entry of
// 'range_partitions' (an empty list is rendered as " ()").
string PartitionSchema::DisplayString(const Schema& schema,
                                      const vector<string>& range_partitions) const {
  string text = JoinStrings(DebugStringComponents(schema), ",\n");
  if (range_schema_.column_ids.empty()) {
    return text;
  }

  text.append(" (");
  if (range_partitions.empty()) {
    text.append(")");
    return text;
  }

  // Every entry except the first one is preceded by a comma.
  const char* separator = "";
  for (const string& range_partition : range_partitions) {
    text.append(separator);
    separator = ",";
    text.append("\n PARTITION ");
    text.append(range_partition);
  }
  text.append("\n)");
  return text;
}
// Builds the HTML header cells for a table of partitions: three hash-related
// columns when the table-wide hash schema is non-empty, plus one column for
// the range partition when there are range columns. Column names are
// HTML-escaped.
string PartitionSchema::PartitionTableHeader(const Schema& schema) const {
  string cells;
  if (!hash_schema_.empty()) {
    cells.append("<th>Hash Schema Type</th>"
                 "<th>Hash Schema</th>"
                 "<th>Hash Partition</th>");
  }
  if (!range_schema_.column_ids.empty()) {
    const string range_columns = EscapeForHtmlToString(
        ColumnIdsToColumnNames(schema, range_schema_.column_ids));
    SubstituteAndAppend(&cells, "<th>RANGE ($0) PARTITION</th>", range_columns);
  }
  return cells;
}
// Builds the HTML data cells describing 'partition': the kind of hash schema
// in effect ("Range Specific" when the partition's range start has a custom
// hash schema registered, "Table Wide" otherwise), the dimensions of that
// schema, the partition's bucket in each dimension, and, when there are range
// columns, the range bounds. Column names and range bounds are HTML-escaped.
string PartitionSchema::PartitionTableEntry(const Schema& schema,
                                            const Partition& partition) const {
  const auto* custom_idx = FindOrNull(
      hash_schema_idx_by_encoded_range_start_, partition.begin_.range_key());
  const bool is_range_specific = (custom_idx != nullptr);
  const auto& effective_hash_schema = is_range_specific
      ? ranges_with_custom_hash_schemas_[*custom_idx].hash_schema
      : hash_schema_;

  string cells;
  cells.append(is_range_specific ? "<td>Range Specific</td>"
                                 : "<td>Table Wide</td>");

  // Hash schema definition: one line per dimension.
  cells.append("<td>");
  for (const auto& dimension : effective_hash_schema) {
    SubstituteAndAppend(&cells, "HASH ($0) PARTITIONS $1<br>",
                        EscapeForHtmlToString(ColumnIdsToColumnNames(
                            schema, dimension.column_ids)),
                        dimension.num_buckets);
  }
  cells.append("</td>");

  // Bucket of this particular partition in each dimension.
  cells.append("<td>");
  for (size_t i = 0; i < effective_hash_schema.size(); ++i) {
    SubstituteAndAppend(&cells, "HASH ($0) PARTITION: $1<br>",
                        EscapeForHtmlToString(ColumnIdsToColumnNames(
                            schema, effective_hash_schema[i].column_ids)),
                        partition.hash_buckets_[i]);
  }
  cells.append("</td>");

  if (!range_schema_.column_ids.empty()) {
    SubstituteAndAppend(&cells, "<td>$0</td>",
                        EscapeForHtmlToString(
                            RangePartitionDebugString(partition.begin().range_key(),
                                                      partition.end().range_key(),
                                                      schema)));
  }
  return cells;
}
// Two partition schemas are equal when they have the same range columns, the
// same table-wide hash dimensions, and the same list of ranges with custom
// hash schemas (same bounds and same hash dimensions, in the same order).
bool PartitionSchema::operator==(const PartitionSchema& rhs) const {
  if (this == &rhs) {
    return true;
  }

  // Element-wise comparison of two hash schemas.
  const auto same_hash_schema = [](const HashSchema& a, const HashSchema& b) {
    if (a.size() != b.size()) {
      return false;
    }
    for (size_t i = 0; i < a.size(); ++i) {
      if (a[i] != b[i]) {
        return false;
      }
    }
    return true;
  };

  // Range component.
  if (range_schema_.column_ids != rhs.range_schema_.column_ids) {
    return false;
  }

  // Table-wide hash schema.
  if (!same_hash_schema(hash_schema_, rhs.hash_schema_)) {
    return false;
  }

  // Range bounds and per-range hash schemas.
  const auto& mine = ranges_with_custom_hash_schemas_;
  const auto& theirs = rhs.ranges_with_custom_hash_schemas_;
  if (mine.size() != theirs.size()) {
    return false;
  }
  for (size_t i = 0; i < mine.size(); ++i) {
    const bool same_bounds =
        !(mine[i].lower != theirs[i].lower) && !(mine[i].upper != theirs[i].upper);
    if (!same_bounds ||
        !same_hash_schema(mine[i].hash_schema, theirs[i].hash_schema)) {
      return false;
    }
  }
  return true;
}
// Encodes the specified primary key columns of the supplied row into the buffer.
void PartitionSchema::EncodeColumns(const ConstContiguousRow& row,
const vector<ColumnId>& column_ids,
string* buf) {
for (size_t i = 0; i < column_ids.size(); ++i) {
ColumnId column_id = column_ids[i];
auto column_idx = row.schema()->find_column_by_id(column_id);
CHECK(column_idx != Schema::kColumnNotFound);
const TypeInfo* type = row.schema()->column(column_idx).type_info();
GetKeyEncoder<string>(type).Encode(
row.cell_ptr(column_idx), i + 1 == column_ids.size(), buf);
}
}
// Appends the key encoding of the listed columns of a partial row to 'buf', in
// the order given by 'column_ids'. A column that is not set in the row is
// encoded as the minimum value of its type. Crashes (CHECK) if a column ID is
// not in the row's schema.
void PartitionSchema::EncodeColumns(const KuduPartialRow& row,
                                    const vector<ColumnId>& column_ids,
                                    string* buf) {
  const size_t num_columns = column_ids.size();
  for (size_t pos = 0; pos < num_columns; ++pos) {
    const auto column_idx = row.schema()->find_column_by_id(column_ids[pos]);
    CHECK(column_idx != Schema::kColumnNotFound);
    const bool is_last = (pos + 1 == num_columns);
    const TypeInfo* type_info = row.schema()->column(column_idx).type_info();
    const KeyEncoder<string>& encoder = GetKeyEncoder<string>(type_info);

    if (PREDICT_FALSE(!row.IsColumnSet(column_idx))) {
      // Unset cell: substitute the type's minimum value.
      uint8_t min_value[kLargestTypeSize];
      type_info->CopyMinValue(min_value);
      encoder.Encode(min_value, is_last, buf);
      continue;
    }

    ContiguousRow cont_row(row.schema(), row.row_data_);
    encoder.Encode(cont_row.cell_ptr(column_idx), is_last, buf);
  }
}
// Maps already encoded hash columns to a bucket: the 64-bit MurmurHash2 of the
// encoded bytes (seeded with the dimension's seed) modulo the dimension's
// number of buckets.
uint32_t PartitionSchema::HashValueForEncodedColumns(
    const string& encoded_hash_columns,
    const HashDimension& hash_dimension) {
  const uint64_t digest = HashUtil::MurmurHash2_64(encoded_hash_columns.data(),
                                                   encoded_hash_columns.length(),
                                                   hash_dimension.seed);
  return static_cast<uint32_t>(digest % hash_dimension.num_buckets);
}
// Produces one partition per combination of hash buckets across all dimensions
// of 'hash_schema'. Each partition records its bucket in every dimension and
// has the encoded buckets appended to the hash part of both its begin and end
// keys. An empty hash schema yields a single default-constructed partition.
vector<Partition> PartitionSchema::GenerateHashPartitions(
    const HashSchema& hash_schema,
    const KeyEncoder<string>& hash_encoder) {
  vector<Partition> result(1);

  for (const auto& dimension : hash_schema) {
    // Expand every partition built so far into one copy per bucket of the
    // current dimension.
    vector<Partition> expanded;
    expanded.reserve(result.size() * dimension.num_buckets);
    for (const Partition& prototype : result) {
      for (uint32_t bucket = 0; bucket < dimension.num_buckets; ++bucket) {
        expanded.emplace_back(prototype);
        Partition& added = expanded.back();
        added.hash_buckets_.push_back(bucket);
        hash_encoder.Encode(&bucket, added.begin_.mutable_hash_key());
        hash_encoder.Encode(&bucket, added.end_.mutable_hash_key());
      }
    }
    result = std::move(expanded);
  }

  return result;
}
// Checks that 'hash_schema' is well-formed with respect to 'schema': every
// dimension has at least two buckets and at least one column, no column is
// used by more than one dimension (or twice within one), and every column
// exists in 'schema' and is a primary key column. Returns InvalidArgument on
// the first violation found.
Status PartitionSchema::ValidateHashSchema(const Schema& schema,
                                           const HashSchema& hash_schema) {
  set<ColumnId> seen_columns;
  for (const auto& dimension : hash_schema) {
    if (dimension.num_buckets < 2) {
      return Status::InvalidArgument("must have at least two hash buckets");
    }
    if (dimension.column_ids.empty()) {
      return Status::InvalidArgument("must have at least one hash column");
    }

    for (const ColumnId& column_id : dimension.column_ids) {
      const bool first_use = seen_columns.insert(column_id).second;
      if (!first_use) {
        return Status::InvalidArgument("hash bucket schema components must not "
                                       "contain columns in common");
      }

      const int32_t column_idx = schema.find_column_by_id(column_id);
      if (column_idx == Schema::kColumnNotFound) {
        return Status::InvalidArgument("must specify existing columns for hash "
                                       "bucket partition components");
      }
      if (column_idx >= schema.num_key_columns()) {
        return Status::InvalidArgument("must specify only primary key columns for "
                                       "hash bucket partition components");
      }
    }
  }
  return Status::OK();
}
template<typename Row>
uint32_t PartitionSchema::HashValueForRow(const Row& row,
const HashDimension& hash_dimension) {
string buf;
EncodeColumns(row, hash_dimension.column_ids, &buf);
uint64_t hash = HashUtil::MurmurHash2_64(
buf.data(), buf.length(), hash_dimension.seed);
return hash % hash_dimension.num_buckets;
}
//------------------------------------------------------------
// Template instantiations: We instantiate all possible templates to avoid linker issues.
// see: https://isocpp.org/wiki/faq/templates#separate-template-fn-defn-from-decl
//------------------------------------------------------------
template
uint32_t PartitionSchema::HashValueForRow(const KuduPartialRow& row,
const HashDimension& hash_dimension);
template
uint32_t PartitionSchema::HashValueForRow(const ConstContiguousRow& row,
const HashDimension& hash_dimension);
// Resets the partition schema to an empty state: no range columns, no
// table-wide hash dimensions, and no ranges with custom hash schemas (together
// with the lookup map that indexes them).
void PartitionSchema::Clear() {
  range_schema_.column_ids.clear();
  hash_schema_.clear();
  ranges_with_custom_hash_schemas_.clear();
  hash_schema_idx_by_encoded_range_start_.clear();
}
// Validates this partition schema against 'schema': the table-wide hash schema
// and every range-specific hash schema must pass ValidateHashSchema(), and
// every range column must exist in 'schema' and be a primary key column.
Status PartitionSchema::Validate(const Schema& schema) const {
  RETURN_NOT_OK(ValidateHashSchema(schema, hash_schema_));
  for (const auto& custom_range : ranges_with_custom_hash_schemas_) {
    RETURN_NOT_OK(ValidateHashSchema(schema, custom_range.hash_schema));
  }

  for (const ColumnId& range_column_id : range_schema_.column_ids) {
    const int32_t column_idx = schema.find_column_by_id(range_column_id);
    if (column_idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument("must specify existing columns for range "
                                     "partition component");
    }
    if (column_idx >= schema.num_key_columns()) {
      return Status::InvalidArgument("must specify only primary key columns for "
                                     "range partition component");
    }
  }

  return Status::OK();
}
// Checks that every range column ID resolves to a column of 'schema' and that
// no column appears more than once among the range columns. Returns
// InvalidArgument otherwise.
Status PartitionSchema::CheckRangeSchema(const Schema& schema) const {
  std::unordered_set<int> seen_idxs;
  for (const ColumnId& range_column_id : range_schema_.column_ids) {
    const auto column_idx = schema.find_column_by_id(range_column_id);
    if (column_idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument(Substitute(
          "range partition column ID $0 not found in table schema", range_column_id));
    }
    const bool first_use = seen_idxs.insert(column_idx).second;
    if (!first_use) {
      return Status::InvalidArgument("duplicate column in range partition",
                                     schema.column(column_idx).name());
    }
  }
  return Status::OK();
}
void PartitionSchema::UpdatePartitionBoundaries(
const KeyEncoder<string>& hash_encoder,
vector<Partition>* partitions) const {
for (size_t idx = 0; idx < partitions->size(); ++idx) {
auto& p = (*partitions)[idx];
UpdatePartitionBoundaries(
hash_encoder, GetHashSchemaForRange(p.begin().range_key()), &p);
}
}
// Adjusts the end key of a single partition whose range end is unbounded: the
// bucket of the last hash dimension encoded in the end key is replaced with
// that bucket plus one. Partitions with a bounded range end, and partitions
// with an empty hash schema, are left untouched.
void PartitionSchema::UpdatePartitionBoundaries(
    const KeyEncoder<string>& hash_encoder,
    const HashSchema& partition_hash_schema,
    Partition* partition) {
  // A simple short-circuit in case of an empty hash schema.
  if (partition_hash_schema.empty()) {
    return;
  }

  // Note: the following discussion and logic only takes effect when the table's
  // partition schema includes at least one hash dimension component, and the
  // absolute upper and/or absolute lower range bound is unbounded.
  //
  // At this point, we have the full set of partitions built up, but due
  // to the convention on the unbounded range keys and how PartitionKey
  // instances are compared, it's necessary to add an iota to the hash part
  // of the key for each unbounded range at the right end to keep proper
  // ordering of the ranges.
  //
  // It would be great to make the partition key space continuous
  // (i.e. without holes) by carrying over iotas to next hash dimension index,
  // but that would break the proper ordering of the ranges since the
  // introduction of range-specific hash schemas. For example, consider the
  // following partitioning:
  //   [-inf, 0) x 3 buckets x 3 buckets; [0, +inf) x 2 buckets x 2 buckets
  //
  // The original set of ranges looks like the following:
  //
  //   [ (0, 0, ""), (0, 0, "0") )
  //   [ (0, 1, ""), (0, 1, "0") )
  //   [ (0, 2, ""), (0, 2, "0") )
  //   [ (1, 0, ""), (1, 0, "0") )
  //   [ (1, 1, ""), (1, 1, "0") )
  //   [ (1, 2, ""), (1, 2, "0") )
  //   [ (2, 0, ""), (2, 0, "0") )
  //   [ (2, 1, ""), (2, 1, "0") )
  //   [ (2, 2, ""), (2, 2, "0") )
  //
  //   [ (0, 0, "0"), (0, 0, "") )
  //   [ (0, 1, "0"), (0, 1, "") )
  //   [ (1, 0, "0"), (1, 0, "") )
  //   [ (1, 1, "0"), (1, 1, "") )
  //
  // Below is the list of the same partitions, updated and sorted:
  //   [ (0, 0, ""), (0, 0, "0") )
  //   [ (0, 0, "0"), (0, 1, "") )
  //   [ (0, 1, ""), (0, 1, "0") )
  //   [ (0, 1, "0"), (0, 2, "") )
  //   [ (0, 2, ""), (0, 2, "0") )
  //
  //   [ (1, 0, ""), (1, 0, "0") )
  //   [ (1, 0, "0"), (1, 1, "") )
  //   [ (1, 1, ""), (1, 1, "0") )
  //   [ (1, 1, "0"), (1, 2, "") )
  //   [ (1, 2, ""), (1, 2, "0") )
  //
  //   [ (2, 0, ""), (2, 0, "0") )
  //   [ (2, 1, ""), (2, 1, "0") )
  //   [ (2, 2, ""), (2, 2, "0") )
  //
  // An alternate way of updating the hash components that tries to keep the
  // key space continuous would transform [ (0, 1, "0"), (0, 1, "") ) into
  // [ (0, 1, "0"), (1, 0, "") ), but that would put range
  // [ (0, 2, ""), (0, 2, "0") ) inside the transformed one. That's not
  // consistent since it messes up the interval logic of the partition pruning
  // and the client's metacache: both are built under the assumption that
  // partition key ranges backed by different tablets do not intersect.
  DCHECK(partition);
  CHECK_EQ(partition_hash_schema.size(), partition->hash_buckets().size());
  const auto num_hash_dimensions =
      static_cast<int>(partition->hash_buckets().size());

  // Increment the hash bucket component of the last hash dimension for
  // each range that has the unbounded end range key.
  //
  // For example, the following is the result of the range end partition keys'
  // transformation, where the key is (hash_comp1, hash_comp2, range_comp) and
  // the number of hash buckets per each dimensions is 2:
  //   [ (0, 0, "") -> (0, 1, "") ]
  //   [ (0, 1, "") -> (0, 2, "") ]
  //   [ (1, 0, "") -> (1, 1, "") ]
  //   [ (1, 1, "") -> (1, 2, "") ]
  if (!partition->end().range_key().empty()) {
    // Bounded range end: nothing to adjust.
    return;
  }

  const int last_dimension_idx = num_hash_dimensions - 1;
  DCHECK_GE(last_dimension_idx, 0);
  // Drop the encoded bucket of the last dimension, then re-encode it
  // incremented by one.
  partition->end_.mutable_hash_key()->erase(kEncodedBucketSize * last_dimension_idx);
  const int32_t next_bucket = partition->hash_buckets()[last_dimension_idx] + 1;
  hash_encoder.Encode(&next_bucket, partition->end_.mutable_hash_key());
}
namespace {
// Increments an unset column in the row: the column at 'idx' is set to the
// value that immediately follows the minimum of its type (minimum plus one for
// integer-like and decimal types, a single zero byte for string-like types).
// Returns InvalidArgument for column types not handled here.
Status IncrementUnsetColumn(KuduPartialRow* row, int32_t idx) {
  DCHECK(!row->IsColumnSet(idx));
  const ColumnSchema& col = row->schema()->column(idx);
  switch (col.type_info()->type()) {
    case INT8:
      return row->SetInt8(idx, INT8_MIN + 1);
    case INT16:
      return row->SetInt16(idx, INT16_MIN + 1);
    case INT32:
      return row->SetInt32(idx, INT32_MIN + 1);
    case INT64:
    case UNIXTIME_MICROS:
      return row->SetInt64(idx, INT64_MIN + 1);
    case DATE:
      return row->SetDate(idx, DataTypeTraits<DATE>::kMinValue + 1);
    case VARCHAR:
      return row->SetVarchar(idx, Slice("\0", 1));
    case STRING:
      return row->SetStringCopy(idx, Slice("\0", 1));
    case BINARY:
      return row->SetBinaryCopy(idx, Slice("\0", 1));
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128:
      return row->SetUnscaledDecimal(
          idx, MinUnscaledDecimal(col.type_attributes().precision) + 1);
    default:
      return Status::InvalidArgument("Invalid column type in range partition",
                                     col.ToString());
  }
}
// Increments a column in the row, setting 'success' to true if the increment
// succeeds, or false if the column is already the maximum value.
//
// Integer-like and decimal columns are incremented by one; string-like columns
// get a zero byte appended, which always succeeds. Returns InvalidArgument for
// column types not handled here ('success' is left set to true in that case).
Status IncrementColumn(KuduPartialRow* row, int32_t idx, bool* success) {
  DCHECK(row->IsColumnSet(idx));
  const ColumnSchema& col = row->schema()->column(idx);
  *success = true;
  switch (col.type_info()->type()) {
    case INT8: {
      int8_t current;
      RETURN_NOT_OK(row->GetInt8(idx, &current));
      if (current >= INT8_MAX) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetInt8(idx, current + 1));
      break;
    }
    case INT16: {
      int16_t current;
      RETURN_NOT_OK(row->GetInt16(idx, &current));
      if (current >= INT16_MAX) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetInt16(idx, current + 1));
      break;
    }
    case INT32: {
      int32_t current;
      RETURN_NOT_OK(row->GetInt32(idx, &current));
      if (current >= INT32_MAX) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetInt32(idx, current + 1));
      break;
    }
    case INT64: {
      int64_t current;
      RETURN_NOT_OK(row->GetInt64(idx, &current));
      if (current >= INT64_MAX) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetInt64(idx, current + 1));
      break;
    }
    case UNIXTIME_MICROS: {
      int64_t current;
      RETURN_NOT_OK(row->GetUnixTimeMicros(idx, &current));
      if (current >= INT64_MAX) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetUnixTimeMicros(idx, current + 1));
      break;
    }
    case DATE: {
      int32_t current;
      RETURN_NOT_OK(row->GetDate(idx, &current));
      if (current >= DataTypeTraits<DATE>::kMaxValue) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetDate(idx, current + 1));
      break;
    }
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128: {
      int128_t current;
      RETURN_NOT_OK(row->GetUnscaledDecimal(idx, &current));
      if (current >= MaxUnscaledDecimal(col.type_attributes().precision)) {
        *success = false;
        break;
      }
      RETURN_NOT_OK(row->SetUnscaledDecimal(idx, current + 1));
      break;
    }
    case BINARY: {
      Slice current;
      RETURN_NOT_OK(row->GetBinary(idx, &current));
      string successor = current.ToString();
      successor.append(1, '\0');
      RETURN_NOT_OK(row->SetBinaryCopy(idx, successor));
      break;
    }
    case VARCHAR: {
      Slice current;
      RETURN_NOT_OK(row->GetVarchar(idx, &current));
      string successor = current.ToString();
      successor.append(1, '\0');
      RETURN_NOT_OK(row->SetVarchar(idx, successor));
      break;
    }
    case STRING: {
      Slice current;
      RETURN_NOT_OK(row->GetString(idx, &current));
      string successor = current.ToString();
      successor.append(1, '\0');
      RETURN_NOT_OK(row->SetStringCopy(idx, successor));
      break;
    }
    default:
      return Status::InvalidArgument("Invalid column type in range partition",
                                     row->schema()->column(idx).ToString());
  }
  return Status::OK();
}
} // anonymous namespace
// Increments the range partition key held in 'row' to its immediate successor.
//
// Range columns are visited from the last one to the first. The first column
// that can be incremented (an unset column always can) is incremented, and
// every column visited before it, which was already at its maximum, is unset.
// 'increment' is set to true if some column was incremented and to false if
// every range column is at its maximum, in which case no column is unset.
Status PartitionSchema::IncrementRangePartitionKey(KuduPartialRow* row, bool* increment) const {
  *increment = false;
  // Indexes of the columns found to be at their maximum value so far.
  vector<int32_t> maxed_out_idxs;

  const auto& range_column_ids = range_schema_.column_ids;
  for (auto it = range_column_ids.rbegin(); it != range_column_ids.rend(); ++it) {
    const int32_t idx = row->schema()->find_column_by_id(*it);
    if (idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument(Substitute("range partition column ID $0 "
                                                "not found in range partition key schema.",
                                                *it));
    }

    if (!row->IsColumnSet(idx)) {
      RETURN_NOT_OK(IncrementUnsetColumn(row, idx));
      *increment = true;
      break;
    }

    RETURN_NOT_OK(IncrementColumn(row, idx, increment));
    if (*increment) {
      break;
    }
    maxed_out_idxs.push_back(idx);
  }

  if (!*increment) {
    return Status::OK();
  }
  for (const int32_t idx : maxed_out_idxs) {
    RETURN_NOT_OK(row->Unset(idx));
  }
  return Status::OK();
}
// Returns the hash schema that applies to the encoded 'range_key': the schema
// of the custom range found by a floor lookup on the range start, provided
// that range's upper bound is empty or greater than 'range_key'; otherwise
// the table-wide hash schema.
const PartitionSchema::HashSchema& PartitionSchema::GetHashSchemaForRange(
    const string& range_key) const {
  const auto* floor_idx = FindFloorOrNull(
      hash_schema_idx_by_encoded_range_start_, range_key);
  if (floor_idx == nullptr) {
    return hash_schema_;
  }

  DCHECK_LT(*floor_idx, ranges_with_custom_hash_schemas_.size());
  const auto& candidate = ranges_with_custom_hash_schemas_[*floor_idx];
  // A non-empty upper bound that does not exceed the key means the key lies
  // past the end of the candidate range.
  const bool key_past_upper =
      !candidate.upper.empty() && candidate.upper <= range_key;
  if (key_past_upper) {
    return hash_schema_;
  }
  return candidate.hash_schema;
}
// Converts an exclusive lower bound range partition key into the equivalent
// inclusive one by incrementing it in place. Returns InvalidArgument if the
// key cannot be incremented because every component is at its maximum, or if
// IncrementRangePartitionKey() itself fails.
Status PartitionSchema::MakeLowerBoundRangePartitionKeyInclusive(KuduPartialRow* row) const {
  // To transform a lower bound range partition key from exclusive to inclusive,
  // the key must be incremented. To increment the key, start with the least
  // significant column in the key (furthest right), and increment it. If the
  // increment fails because the value is already the maximum, move on to the
  // next least significant column and attempt to increment it (and so on). When
  // incrementing, an unset cell is equivalent to a cell set to the minimum
  // value for its column (e.g. an unset Int8 column is incremented to -127
  // (-2^7 + 1)). Finally, all columns less significant than the incremented
  // column are unset (which means they are treated as the minimum value for
  // that column). If all columns in the key are the maximum and can not be
  // incremented, then the operation fails.
  //
  // A few examples, given a range partition of three Int8 columns. Underscore
  // signifies unset:
  //
  //   (1, 2, 3)       -> (1, 2, 4)
  //   (1, 2, 127)     -> (1, 3, _)
  //   (1, 127, 3)     -> (1, 127, 4)
  //   (1, _, 3)       -> (1, _, 4)
  //   (_, _, _)       -> (_, _, -127)
  //   (1, 127, 127)   -> (2, _, _)
  //   (127, 127, 127) -> fail!
  bool increment;
  RETURN_NOT_OK(IncrementRangePartitionKey(row, &increment));
  if (!increment) {
    return Status::InvalidArgument("Exclusive lower bound range partition key must not "
                                   "have maximum values for all components",
                                   RangeKeyDebugString(*row));
  }
  return Status::OK();
}
// Converts an inclusive upper bound range partition key into the equivalent
// exclusive one, in place.
Status PartitionSchema::MakeUpperBoundRangePartitionKeyExclusive(KuduPartialRow* row) const {
  // To transform an upper bound range partition key from inclusive to exclusive,
  // the key must be incremented. Incrementing the key follows the same steps as
  // turning an exclusive lower bound key into inclusive. Upper bound keys have
  // two additional special cases:
  //
  // * For upper bound range partition keys with all columns unset, no
  //   transformation is needed (all unset columns signifies unbounded,
  //   so there is no difference between inclusive and exclusive).
  //
  // * For an upper bound key that can't be incremented because all components
  //   are the maximum value, all columns are unset in order to transform it to
  //   an unbounded upper bound (this is a special case increment).
  const auto& range_column_ids = range_schema_.column_ids;

  // Look for the first range column that is set, validating column IDs on the
  // way.
  bool found_set_column = false;
  for (const ColumnId& column_id : range_column_ids) {
    const int32_t idx = row->schema()->find_column_by_id(column_id);
    if (idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument(Substitute("range partition column ID $0 "
                                                "not found in range partition key schema.",
                                                column_id));
    }
    if (row->IsColumnSet(idx)) {
      found_set_column = true;
      break;
    }
  }
  if (!found_set_column) {
    // Unbounded already.
    return Status::OK();
  }

  bool incremented;
  RETURN_NOT_OK(IncrementRangePartitionKey(row, &incremented));
  if (incremented) {
    return Status::OK();
  }

  // Every component is at its maximum: turn the key into an unbounded one.
  for (const ColumnId& column_id : range_column_ids) {
    const int32_t idx = row->schema()->find_column_by_id(column_id);
    RETURN_NOT_OK(row->Unset(idx));
  }
  return Status::OK();
}
// Appends to 'range_column_indexes' the index in 'schema' of every range
// column, in range schema order. Returns InvalidArgument at the first range
// column ID that is not in 'schema'; indexes appended before that point are
// left in the output vector.
Status PartitionSchema::GetRangeSchemaColumnIndexes(
    const Schema& schema,
    vector<int32_t>* range_column_indexes) const {
  DCHECK(range_column_indexes);
  for (const ColumnId& range_column_id : range_schema_.column_ids) {
    const int32_t column_idx = schema.find_column_by_id(range_column_id);
    if (column_idx != Schema::kColumnNotFound) {
      range_column_indexes->push_back(column_idx);
      continue;
    }
    return Status::InvalidArgument(Substitute(
        "range partition column ID $0 not found in table schema", range_column_id));
  }
  return Status::OK();
}
// Copies into 'hash_schema' the hash schema that applies to the range whose
// lower bound is 'lower'. Fails if the lower bound cannot be encoded as a
// range key.
Status PartitionSchema::GetHashSchemaForRange(const KuduPartialRow& lower,
                                              const Schema& schema,
                                              HashSchema* hash_schema) const {
  string encoded_lower;
  RETURN_NOT_OK(EncodeRangeKey(lower, schema, &encoded_lower));
  const HashSchema& found = GetHashSchemaForRange(encoded_lower);
  *hash_schema = found;
  return Status::OK();
}
// Removes the range with a custom hash schema identified by the given bounds.
//
// Returns NotFound if no such range starts at 'lower', and InvalidArgument if
// a range starts at 'lower' but its upper bound differs from 'upper'. On
// success both the array of custom ranges and the map from encoded range
// start to array index are rebuilt without the dropped range.
Status PartitionSchema::DropRange(const KuduPartialRow& lower,
                                  const KuduPartialRow& upper,
                                  const Schema& schema) {
  string encoded_lower;
  RETURN_NOT_OK(EncodeRangeKey(lower, schema, &encoded_lower));
  const auto* found_idx = FindOrNull(
      hash_schema_idx_by_encoded_range_start_, encoded_lower);
  if (found_idx == nullptr) {
    return Status::NotFound(Substitute(
        "'$0': range with specified lower bound not found",
        RangeKeyDebugString(encoded_lower, schema)));
  }
  const auto victim_idx = *found_idx;

  // Check for the upper range match.
  string encoded_upper;
  RETURN_NOT_OK(EncodeRangeKey(upper, schema, &encoded_upper));
  const auto& stored_upper = ranges_with_custom_hash_schemas_[victim_idx].upper;
  if (stored_upper != encoded_upper) {
    // Using Status::InvalidArgument() to distinguish between an absent record
    // for the range's lower bound and non-matching upper bound.
    return Status::InvalidArgument(Substitute(
        "range ['$0', '$1'): '$2' vs '$3': upper bound does not match",
        RangeKeyDebugString(encoded_lower, schema),
        RangeKeyDebugString(encoded_upper, schema),
        RangeKeyDebugString(stored_upper, schema),
        RangeKeyDebugString(encoded_upper, schema)));
  }

  // Rebuild the 'ranges_with_custom_hash_schemas_' array and the
  // 'hash_schema_idx_by_encoded_range_start_' helper map without the dropped
  // range; array indexes after the dropped element shift down by one.
  const auto old_size = ranges_with_custom_hash_schemas_.size();
  DCHECK_GE(old_size, 1);
  decltype(ranges_with_custom_hash_schemas_) remaining_ranges;
  remaining_ranges.reserve(old_size - 1);
  decltype(hash_schema_idx_by_encoded_range_start_) remaining_index;
  for (size_t i = 0; i < old_size; ++i) {
    if (i != victim_idx) {
      const auto new_pos = remaining_ranges.size();
      remaining_ranges.emplace_back(std::move(ranges_with_custom_hash_schemas_[i]));
      InsertOrDie(&remaining_index, remaining_ranges[new_pos].lower, new_pos);
    }
  }
  DCHECK_EQ(remaining_ranges.size() + 1, old_size);
  DCHECK_EQ(remaining_ranges.size(), remaining_index.size());

  ranges_with_custom_hash_schemas_.swap(remaining_ranges);
  hash_schema_idx_by_encoded_range_start_.swap(remaining_index);
  return Status::OK();
}
} // namespace kudu