blob: ae276c1abf9db066b0b2a47b5bd6802505f9e6c0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/partition.h"
#include <algorithm>
#include <cstring>
#include <iterator>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/key_encoder.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
#include "kudu/common/types.h"
#include "kudu/gutil/endian.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/hash_util.h"
#include "kudu/util/int128.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/url-coding.h"
using google::protobuf::RepeatedPtrField;
using kudu::pb_util::SecureDebugString;
using std::pair;
using std::set;
using std::string;
using std::vector;
using strings::Substitute;
using strings::SubstituteAndAppend;
namespace kudu {
class faststring;

// Number of bytes each hash bucket occupies at the front of an encoded
// partition key. Buckets are written as fixed-width 32-bit unsigned values,
// so a key with N hash components starts with N * kEncodedBucketSize bytes.
static constexpr size_t kEncodedBucketSize = sizeof(uint32_t);
// Returns the range component of this partition's start key, i.e. the
// partition key with the encoded hash-bucket prefix removed.
Slice Partition::range_key_start() const {
  const string& start_key = partition_key_start();
  return range_key(start_key);
}

// Returns the range component of this partition's end key, i.e. the
// partition key with the encoded hash-bucket prefix removed.
Slice Partition::range_key_end() const {
  const string& end_key = partition_key_end();
  return range_key(end_key);
}
// Strips this partition's hash-bucket prefix (kEncodedBucketSize bytes per
// hash bucket) from 'partition_key' and returns what remains: the range
// component. If the key is not longer than the prefix there is no range
// component, and an empty slice is returned.
//
// The returned slice points into 'partition_key's storage.
Slice Partition::range_key(const string& partition_key) const {
  const size_t hash_prefix_len = kEncodedBucketSize * hash_buckets().size();
  if (partition_key.size() <= hash_prefix_len) {
    return Slice();
  }
  Slice range(partition_key);
  range.remove_prefix(hash_prefix_len);
  return range;
}
// Two partitions are equal when their start keys, end keys and hash bucket
// assignments all match. Comparing a partition with itself short-circuits.
bool Partition::Equals(const Partition& other) const {
  if (this == &other) {
    return true;
  }
  return partition_key_start() == other.partition_key_start() &&
         partition_key_end() == other.partition_key_end() &&
         hash_buckets_ == other.hash_buckets_;
}
// Serializes this partition into 'pb', discarding whatever 'pb' held before:
// the hash bucket numbers in order, followed by the start and end keys.
void Partition::ToPB(PartitionPB* pb) const {
  pb->Clear();
  const auto& buckets = hash_buckets();
  pb->mutable_hash_buckets()->Reserve(hash_buckets_.size());
  for (size_t i = 0; i < buckets.size(); ++i) {
    pb->add_hash_buckets(buckets[i]);
  }
  pb->set_partition_key_start(partition_key_start());
  pb->set_partition_key_end(partition_key_end());
}
void Partition::FromPB(const PartitionPB& pb, Partition* partition) {
partition->hash_buckets_.clear();
partition->hash_buckets_.reserve(pb.hash_buckets_size());
for (int32_t hash_bucket : pb.hash_buckets()) {
partition->hash_buckets_.push_back(hash_bucket);
}
partition->partition_key_start_ = pb.partition_key_start();
partition->partition_key_end_ = pb.partition_key_end();
}
namespace {
// Extracts the column IDs from a protobuf repeated field of column identifiers.
Status ExtractColumnIds(const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
const Schema& schema,
vector<ColumnId>* column_ids) {
vector<ColumnId> new_column_ids;
new_column_ids.reserve(identifiers.size());
for (const auto& identifier : identifiers) {
switch (identifier.identifier_case()) {
case PartitionSchemaPB_ColumnIdentifierPB::kId: {
ColumnId column_id(identifier.id());
if (schema.find_column_by_id(column_id) == Schema::kColumnNotFound) {
return Status::InvalidArgument("unknown column id", SecureDebugString(identifier));
}
new_column_ids.emplace_back(std::move(column_id));
continue;
}
case PartitionSchemaPB_ColumnIdentifierPB::kName: {
int32_t column_idx = schema.find_column(identifier.name());
if (column_idx == Schema::kColumnNotFound) {
return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
}
new_column_ids.emplace_back(schema.column_id(column_idx));
continue;
}
default: return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
}
}
*column_ids = std::move(new_column_ids);
return Status::OK();
}
// Sets a repeated field of column identifiers to the provided column IDs.
void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
identifiers->Reserve(column_ids.size());
for (const ColumnId& column_id : column_ids) {
identifiers->Add()->set_id(column_id);
}
}
} // namespace
// Converts each hash bucket schema in 'hash_buckets_pb' and appends it to
// 'hash_bucket_schemas', resolving column references against 'schema'.
// Returns the first column-resolution error encountered. Schemas converted
// before the failing entry remain appended.
Status PartitionSchema::ExtractHashBucketSchemasFromPB(
    const Schema& schema,
    const RepeatedPtrField<PartitionSchemaPB_HashBucketSchemaPB>& hash_buckets_pb,
    HashBucketSchemas* hash_bucket_schemas) {
  for (const auto& bucket_pb : hash_buckets_pb) {
    HashBucketSchema bucket_schema;
    RETURN_NOT_OK(ExtractColumnIds(bucket_pb.columns(), schema, &bucket_schema.column_ids));
    // The hash depends on column order, so sort the IDs: components listing
    // the same columns in different orders must hash identically. This matters
    // when deserializing a user-supplied partition schema at table creation.
    // Afterwards the IDs are already stored in sorted order.
    auto& ids = bucket_schema.column_ids;
    std::sort(ids.begin(), ids.end());
    bucket_schema.num_buckets = bucket_pb.num_buckets();
    bucket_schema.seed = bucket_pb.seed();
    hash_bucket_schemas->push_back(std::move(bucket_schema));
  }
  return Status::OK();
}
// Rebuilds 'partition_schema' from 'pb', resolving column references against
// 'schema', and returns the result of validating it. When 'pb' has no range
// schema, range partitioning defaults to the table's primary key columns.
Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
                               const Schema& schema,
                               PartitionSchema* partition_schema) {
  partition_schema->Clear();
  RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, pb.hash_bucket_schemas(),
                                               &partition_schema->hash_bucket_schemas_));

  auto& range_column_ids = partition_schema->range_schema_.column_ids;
  if (!pb.has_range_schema()) {
    // Default to range partitioning over the primary key columns. As with the
    // sorting of hash columns, this is expected only while deserializing a
    // user-provided partition schema during table creation.
    for (int32_t idx = 0; idx < schema.num_key_columns(); ++idx) {
      range_column_ids.push_back(schema.column_id(idx));
    }
  } else {
    RETURN_NOT_OK(ExtractColumnIds(pb.range_schema().columns(), schema, &range_column_ids));
  }
  return partition_schema->Validate(schema);
}
// Serializes this partition schema into 'pb', discarding its previous
// contents. Columns are always written by ID. The range schema message is
// always populated, even when it lists no columns.
void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
  pb->Clear();
  auto* hash_schemas_pb = pb->mutable_hash_bucket_schemas();
  hash_schemas_pb->Reserve(hash_bucket_schemas_.size());
  for (const auto& hash_schema : hash_bucket_schemas_) {
    auto* hash_schema_pb = hash_schemas_pb->Add();
    SetColumnIdentifiers(hash_schema.column_ids, hash_schema_pb->mutable_columns());
    hash_schema_pb->set_num_buckets(hash_schema.num_buckets);
    hash_schema_pb->set_seed(hash_schema.seed);
  }
  auto* range_pb = pb->mutable_range_schema();
  SetColumnIdentifiers(range_schema_.column_ids, range_pb->mutable_columns());
}
// Appends the full partition key of 'row' to 'buf'. The key is one
// UINT32-encoded bucket per hash component, followed by the encoded range
// columns. Fails if a bucket cannot be computed or a column cannot be encoded.
template<typename Row>
Status PartitionSchema::EncodeKeyImpl(const Row& row, string* buf) const {
  const auto& bucket_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
  for (size_t i = 0; i < hash_bucket_schemas_.size(); ++i) {
    int32_t bucket;
    RETURN_NOT_OK(BucketForRow(row, hash_bucket_schemas_[i], &bucket));
    bucket_encoder.Encode(&bucket, buf);
  }
  return EncodeColumns(row, range_schema_.column_ids, buf);
}
// Public overloads that encode the partition key of a partial row or a
// contiguous row. Both forward to the shared template implementation.
Status PartitionSchema::EncodeKey(const KuduPartialRow& row, string* buf) const {
  return EncodeKeyImpl<KuduPartialRow>(row, buf);
}

Status PartitionSchema::EncodeKey(const ConstContiguousRow& row, string* buf) const {
  return EncodeKeyImpl<ConstContiguousRow>(row, buf);
}
// Encodes the range partition columns of 'row' into 'key', which must be
// empty on entry.
//
// Returns InvalidArgument if 'row' sets any column that is not a range
// partition column. If 'row' sets no columns at all, OK is returned and
// 'key' stays empty.
Status PartitionSchema::EncodeRangeKey(const KuduPartialRow& row,
                                       const Schema& schema,
                                       string* key) const {
  DCHECK(key->empty());
  const auto& range_ids = range_schema_.column_ids;
  bool any_column_set = false;
  for (int idx = 0; idx < schema.num_columns(); ++idx) {
    if (!row.IsColumnSet(idx)) {
      continue;
    }
    const bool is_range_column =
        std::find(range_ids.begin(), range_ids.end(), schema.column_id(idx)) != range_ids.end();
    if (!is_range_column) {
      return Status::InvalidArgument(
          "split rows may only contain values for range partitioned columns",
          schema.column(idx).name());
    }
    any_column_set = true;
  }
  if (!any_column_set) {
    return Status::OK();
  }
  return EncodeColumns(row, range_ids, key);
}
// Encodes each of 'split_rows' as a range key and stores the results in
// 'splits' in ascending order. 'splits' must be empty on entry.
//
// Returns InvalidArgument if a split row:
//  - sets a non-range column,
//  - sets no range column at all, or
//  - encodes to the same key as another split row.
// In the duplicate case the error message names the duplicated key.
Status PartitionSchema::EncodeRangeSplits(const vector<KuduPartialRow>& split_rows,
                                          const Schema& schema,
                                          vector<string>* splits) const {
  DCHECK(splits->empty());
  if (split_rows.empty()) {
    return Status::OK();
  }
  splits->reserve(split_rows.size());
  for (const KuduPartialRow& row : split_rows) {
    string split;
    RETURN_NOT_OK(EncodeRangeKey(row, schema, &split));
    if (split.empty()) {
      return Status::InvalidArgument(
          "split rows must contain a value for at least one range partition column");
    }
    splits->emplace_back(std::move(split));
  }
  std::sort(splits->begin(), splits->end());
  // After sorting, duplicates are adjacent. Use std::adjacent_find rather than
  // std::unique: elements past the iterator std::unique returns are in a valid
  // but unspecified (typically moved-from) state. Reporting one of them would
  // not reliably name the duplicated split.
  const auto dup = std::adjacent_find(splits->begin(), splits->end());
  if (dup != splits->end()) {
    return Status::InvalidArgument("duplicate split row", RangeKeyDebugString(*dup, schema));
  }
  return Status::OK();
}
// Encodes each (lower, upper) pair of 'range_bounds' into an entry of
// 'bounds_with_hash_schemas', which must be empty on entry. The output is
// sorted by lower bound. An empty encoded bound means "unbounded" on that
// side. If 'range_bounds' is empty, a single fully unbounded range with no
// per-range hash schemas is produced.
//
// If 'range_hash_schemas' is non-empty, its i-th element is attached to the
// i-th element of 'range_bounds' (before sorting). It must then have exactly
// one element per range bound.
//
// Returns InvalidArgument if:
//  - a lower bound is not strictly below its upper bound,
//  - any two ranges overlap (upper bounds are exclusive, so neighbors may
//    share an endpoint), or
//  - a non-empty 'range_hash_schemas' does not pair one-to-one with the bounds.
Status PartitionSchema::EncodeRangeBounds(const vector<pair<KuduPartialRow,
                                                            KuduPartialRow>>& range_bounds,
                                          const RangeHashSchema& range_hash_schemas,
                                          const Schema& schema,
                                          vector<RangeWithHashSchemas>*
                                              bounds_with_hash_schemas) const {
  DCHECK(bounds_with_hash_schemas->empty());
  if (range_bounds.empty()) {
    bounds_with_hash_schemas->emplace_back(RangeWithHashSchemas{"", "", {}});
    return Status::OK();
  }
  // The per-range hash schemas are looked up by bound position below. Without
  // this check, a shorter 'range_hash_schemas' would be read out of bounds.
  if (!range_hash_schemas.empty() && range_hash_schemas.size() != range_bounds.size()) {
    return Status::InvalidArgument("The number of range bounds does not match the number of per "
                                   "range hash schemas.");
  }
  bounds_with_hash_schemas->reserve(range_bounds.size());
  for (size_t i = 0; i < range_bounds.size(); ++i) {
    const auto& bound = range_bounds[i];
    string lower;
    string upper;
    RETURN_NOT_OK(EncodeRangeKey(bound.first, schema, &lower));
    RETURN_NOT_OK(EncodeRangeKey(bound.second, schema, &upper));
    if (!lower.empty() && !upper.empty() && lower >= upper) {
      return Status::InvalidArgument(
          "range partition lower bound must be less than the upper bound",
          RangePartitionDebugString(bound.first, bound.second));
    }
    RangeWithHashSchemas range{std::move(lower), std::move(upper), {}};
    if (!range_hash_schemas.empty()) {
      range.hash_schemas = range_hash_schemas[i];
    }
    bounds_with_hash_schemas->emplace_back(std::move(range));
  }

  std::sort(bounds_with_hash_schemas->begin(), bounds_with_hash_schemas->end(),
            [](const RangeWithHashSchemas& s1, const RangeWithHashSchemas& s2) {
              return s1.lower < s2.lower;
            });

  // Check that the range bounds are non-overlapping. An unbounded endpoint
  // between two neighboring ranges always means they overlap.
  for (size_t i = 1; i < bounds_with_hash_schemas->size(); ++i) {
    const RangeWithHashSchemas& prev = (*bounds_with_hash_schemas)[i - 1];
    const RangeWithHashSchemas& next = (*bounds_with_hash_schemas)[i];
    if (prev.upper.empty() || next.lower.empty() || prev.upper > next.lower) {
      return Status::InvalidArgument(
          "overlapping range partitions",
          strings::Substitute("first range partition: $0, second range partition: $1",
                              RangePartitionDebugString(prev.lower, prev.upper, schema),
                              RangePartitionDebugString(next.lower, next.upper, schema)));
    }
  }
  return Status::OK();
}
// Cuts the sorted, non-overlapping ranges in 'bounds_with_hash_schemas' at
// every key of 'splits', which must be sorted ascending. The ranges are
// replaced in place by the finer-grained result. Each resulting piece keeps
// the per-range hash schemas of the range it was cut from.
//
// Returns InvalidArgument if a split lies outside every range, or equals a
// range's lower or upper bound. On error the contents of
// 'bounds_with_hash_schemas' are unspecified: lower bounds of ranges already
// visited may have been moved from.
Status PartitionSchema::SplitRangeBounds(const Schema& schema,
                                         vector<string> splits,
                                         vector<RangeWithHashSchemas>*
                                             bounds_with_hash_schemas) const {
  if (splits.empty()) {
    return Status::OK();
  }
  // Every split adds exactly one range. Use std::max<size_t> rather than
  // std::max(1UL, ...): the latter only compiles where size_t is unsigned long.
  const size_t expected_bounds =
      std::max<size_t>(1, bounds_with_hash_schemas->size()) + splits.size();
  vector<RangeWithHashSchemas> new_bounds_with_hash_schemas;
  new_bounds_with_hash_schemas.reserve(expected_bounds);

  // Iterate through the sorted bounds and sorted splits, splitting the bounds
  // as appropriate and adding them to the result list ('new_bounds_with_hash_schemas').
  auto split = splits.begin();
  for (auto& bound : *bounds_with_hash_schemas) {
    string& lower = bound.lower;
    const string& upper = bound.upper;
    for (; split != splits.end() && (upper.empty() || *split <= upper); ++split) {
      if (!lower.empty() && *split < lower) {
        return Status::InvalidArgument("split out of bounds", RangeKeyDebugString(*split, schema));
      }
      if (lower == *split || upper == *split) {
        return Status::InvalidArgument("split matches lower or upper bound",
                                       RangeKeyDebugString(*split, schema));
      }
      // Split the current bound: emit the lower section and continue with the
      // upper section. The emitted piece inherits the bound's hash schemas.
      new_bounds_with_hash_schemas.emplace_back(
          RangeWithHashSchemas{std::move(lower), *split, bound.hash_schemas});
      lower = std::move(*split);
    }
    // Emit the final (possibly only) section of this bound.
    new_bounds_with_hash_schemas.emplace_back(
        RangeWithHashSchemas{std::move(lower), upper, std::move(bound.hash_schemas)});
  }
  if (split != splits.end()) {
    return Status::InvalidArgument("split out of bounds", RangeKeyDebugString(*split, schema));
  }
  bounds_with_hash_schemas->swap(new_bounds_with_hash_schemas);
  CHECK_EQ(expected_bounds, bounds_with_hash_schemas->size());
  return Status::OK();
}
// Returns one partition per combination of hash buckets across 'hash_schemas'
// (the cartesian product of every component's buckets). Each bucket number is
// appended, via 'hash_encoder', to both the start and end partition keys.
// When 'hash_schemas' is empty, the result is a single default-constructed
// partition.
vector<Partition> PartitionSchema::GenerateHashPartitions(const HashBucketSchemas& hash_schemas,
                                                          const KeyEncoder<string>& hash_encoder) {
  vector<Partition> hash_partitions(1);
  // Create a partition for each hash bucket combination.
  for (const HashBucketSchema& bucket_schema : hash_schemas) {
    vector<Partition> new_partitions;
    new_partitions.reserve(hash_partitions.size() * bucket_schema.num_buckets);
    // For each of the partitions created so far, replicate it
    // by the number of buckets in the next hash bucketing component.
    for (const Partition& base_partition : hash_partitions) {
      for (int32_t bucket = 0; bucket < bucket_schema.num_buckets; bucket++) {
        Partition partition = base_partition;
        partition.hash_buckets_.push_back(bucket);
        hash_encoder.Encode(&bucket, &partition.partition_key_start_);
        hash_encoder.Encode(&bucket, &partition.partition_key_end_);
        // 'partition' is already a per-iteration copy; move it rather than
        // copying its keys and bucket vector a second time.
        new_partitions.push_back(std::move(partition));
      }
    }
    hash_partitions = std::move(new_partitions);
  }
  return hash_partitions;
}
// Creates the initial partitions of a table and stores them in 'partitions'.
//
// How the key space is built:
//  - 'range_bounds' carves up the range key space. An empty list yields a
//    single unbounded range.
//  - Each range is split further at every key of 'split_rows'.
//  - Each resulting range is crossed with a set of hash bucket combinations:
//    the table-wide hash schema by default, or the range's own hash schemas
//    when 'range_hash_schemas' gives it a non-empty entry.
//
// 'range_hash_schemas', when non-empty, must have exactly one entry per
// element of 'range_bounds' and cannot be combined with 'split_rows'.
//
// Returns InvalidArgument for inconsistent inputs, or for a range partition
// column that is missing from 'schema' or listed more than once. Errors from
// encoding or splitting the range bounds are propagated.
Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_rows,
const vector<pair<KuduPartialRow,
KuduPartialRow>>& range_bounds,
const RangeHashSchema& range_hash_schemas,
const Schema& schema,
vector<Partition>* partitions) const {
// Hash buckets are encoded into partition keys as UINT32 values.
const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
// Per-range hash schemas are mutually exclusive with split rows, and must
// pair up one-to-one with the range bounds.
if (!range_hash_schemas.empty()) {
if (!split_rows.empty()) {
return Status::InvalidArgument("Both 'split_rows' and 'range_hash_schemas' cannot be "
"populated at the same time.");
}
if (range_bounds.size() != range_hash_schemas.size()) {
return Status::InvalidArgument("The number of range bounds does not match the number of per "
"range hash schemas.");
}
}
// Hash partitions for the table-wide hash schema. They are also used for any
// range whose per-range hash schemas are empty.
vector<Partition> base_hash_partitions = GenerateHashPartitions(hash_bucket_schemas_,
hash_encoder);
// Verify that every range partition column exists in 'schema' and is listed
// only once.
std::unordered_set<int> range_column_idxs;
for (const ColumnId& column_id : range_schema_.column_ids) {
int column_idx = schema.find_column_by_id(column_id);
if (column_idx == Schema::kColumnNotFound) {
return Status::InvalidArgument(Substitute("range partition column ID $0 "
"not found in table schema.", column_id));
}
if (!InsertIfNotPresent(&range_column_idxs, column_idx)) {
return Status::InvalidArgument("duplicate column in range partition",
schema.column(column_idx).name());
}
}
// Encode the range bounds (sorted by lower bound and checked for overlap),
// then split them at the encoded split rows.
vector<RangeWithHashSchemas> bounds_with_hash_schemas;
vector<string> splits;
RETURN_NOT_OK(EncodeRangeBounds(range_bounds, range_hash_schemas, schema,
&bounds_with_hash_schemas));
RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits));
RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), &bounds_with_hash_schemas));
// Maps each partition to its respective hash schemas within 'bounds_with_hash_schemas',
// needed for logic later in function for filling in holes in partition key space. Will be
// empty if no per range hash schemas are used.
vector<int> partition_idx_to_hash_schemas_idx;
if (!range_hash_schemas.empty()) {
// Per-range hash schemas: partitions are ordered by range first, then by
// hash bucket combination within each range.
// The number of ranges should match the size of range_hash_schemas.
DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
// No split rows should be defined if range_hash_schemas is populated.
DCHECK(split_rows.empty());
vector<Partition> result_partitions;
// Iterate through each bound and its hash schemas to generate hash partitions.
for (int i = 0; i < bounds_with_hash_schemas.size(); i++) {
const auto& bound = bounds_with_hash_schemas[i];
const auto& current_range_hash_schemas = bound.hash_schemas;
vector<Partition> current_bound_hash_partitions;
// If current bound's HashBucketSchema is empty, implies use of default table-wide schema.
// If not empty, generate hash partitions for all the provided hash schemas in this range.
if (current_range_hash_schemas.empty()) {
current_bound_hash_partitions = base_hash_partitions;
} else {
current_bound_hash_partitions = GenerateHashPartitions(current_range_hash_schemas,
hash_encoder);
}
// Add range part to partition key.
for (Partition& partition : current_bound_hash_partitions) {
partition.partition_key_start_.append(bound.lower);
partition.partition_key_end_.append(bound.upper);
// -1 marks a partition generated from the table-wide hash schema. Otherwise
// the value is the index of its bound in 'bounds_with_hash_schemas'.
int index = current_range_hash_schemas.empty() ? -1 : i;
partition_idx_to_hash_schemas_idx.emplace_back(index);
}
result_partitions.insert(result_partitions.end(),
std::make_move_iterator(current_bound_hash_partitions.begin()),
std::make_move_iterator(current_bound_hash_partitions.end()));
}
DCHECK_EQ(partition_idx_to_hash_schemas_idx.size(), result_partitions.size());
*partitions = std::move(result_partitions);
} else {
// Table-wide hash schema only: partitions are ordered by hash bucket
// combination first, then by range.
// Create a partition per range bound and hash bucket combination.
vector<Partition> new_partitions;
for (const Partition& base_partition : base_hash_partitions) {
for (const auto& bound : bounds_with_hash_schemas) {
Partition partition = base_partition;
partition.partition_key_start_.append(bound.lower);
partition.partition_key_end_.append(bound.upper);
new_partitions.push_back(partition);
}
}
*partitions = std::move(new_partitions);
}
// Note: the following discussion and logic only takes effect when the table's
// partition schema includes at least one hash bucket component, and the
// absolute upper and/or absolute lower range bound is unbounded.
//
// At this point, we have the full set of partitions built up, but each
// partition only covers a finite slice of the partition key-space. Some
// operations involving partitions are easier (pruning, client meta cache) if
// it can be assumed that the partition keyspace does not have holes.
//
// In order to 'fill in' the partition key space, the absolute first and last
// partitions are extended to cover the rest of the lower and upper partition
// range by clearing the start and end partition key, respectively.
//
// When the table has two or more hash components, there will be gaps in
// between partitions at the boundaries of the component ranges. Similar to
// the absolute start and end case, these holes are filled by clearing the
// partition key beginning at the hash component. For a concrete example,
// see PartitionTest::TestCreatePartitions.
// The hash schema component examined in the end-key loop below. It is always
// assigned before it is dereferenced.
const HashBucketSchema* hash_bucket_schema;
for (int j = 0; j < partitions->size(); j++) {
Partition& partition = (*partitions)[j];
// Find the first zero-valued bucket from the end and truncate the partition key
// starting from that bucket onwards for zero-valued buckets.
if (partition.range_key_start().empty()) {
for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
if (partition.hash_buckets()[i] != 0) {
break;
}
partition.partition_key_start_.erase(kEncodedBucketSize * i);
}
}
// Starting from the last hash bucket, truncate the partition key until we hit the first
// non-max-valued bucket, at which point, replace the encoding with the next-incremented
// bucket value. For example, the following range end partition keys should be transformed,
// where the key is (hash_comp1, hash_comp2, range_comp):
//
// [ (0, 0, "a2b2") -> (0, 1, "a2b2") ]
// [ (0, 1, "a2b2") -> (1, _, "a2b2") ]
// [ (1, 0, "a2b2") -> (1, 1, "a2b2") ]
// [ (1, 1, "a2b2") -> (_, _, "a2b2") ]
if (partition.range_key_end().empty()) {
for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
partition.partition_key_end_.erase(kEncodedBucketSize * i);
int32_t hash_bucket = partition.hash_buckets()[i] + 1;
// Use the hash schema that produced this partition: either the table-wide
// one, or the per-range schemas of the bound it was created from.
if (range_hash_schemas.empty() || partition_idx_to_hash_schemas_idx[j] == -1) {
hash_bucket_schema = &hash_bucket_schemas_[i];
} else {
const auto& hash_schemas_idx = partition_idx_to_hash_schemas_idx[j];
hash_bucket_schema = &bounds_with_hash_schemas[hash_schemas_idx].hash_schemas[i];
}
// If the incremented bucket is still in range, encode it and stop.
// Otherwise this component was at its maximum bucket, so keep truncating.
if (hash_bucket != hash_bucket_schema->num_buckets) {
hash_encoder.Encode(&hash_bucket, &partition.partition_key_end_);
break;
}
}
}
}
return Status::OK();
}
// Sets '*contains' to whether 'row' belongs to 'partition'. Every hash
// component must map the row to the partition's bucket, and the row's range
// key must lie within the partition's range bounds. Hash components are
// checked first, and checking stops at the first mismatch.
//
// CHECK-fails unless 'partition' has exactly one bucket per hash component
// of this schema.
template<typename Row>
Status PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
                                                 const Row& row,
                                                 bool* contains) const {
  CHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
  for (int hash_idx = 0; hash_idx < hash_bucket_schemas_.size(); ++hash_idx) {
    RETURN_NOT_OK(HashPartitionContainsRowImpl(partition, row, hash_idx, contains));
    if (!*contains) {
      return Status::OK();
    }
  }
  return RangePartitionContainsRowImpl(partition, row, contains);
}
// Sets '*contains' to whether 'row' hashes to the same bucket as 'partition'
// for hash component 'hash_idx'.
template<typename Row>
Status PartitionSchema::HashPartitionContainsRowImpl(const Partition& partition,
                                                     const Row& row,
                                                     int hash_idx,
                                                     bool* contains) const {
  DCHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
  int32_t row_bucket = -1;
  RETURN_NOT_OK(BucketForRow(row, hash_bucket_schemas_[hash_idx], &row_bucket));
  const int32_t partition_bucket = partition.hash_buckets()[hash_idx];
  *contains = row_bucket == partition_bucket;
  return Status::OK();
}
// Sets '*contains' to whether the range key of 'row' falls within
// [range_key_start, range_key_end) of 'partition'. An empty end key means
// the range has no upper bound. Hash buckets are not consulted.
template<typename Row>
Status PartitionSchema::RangePartitionContainsRowImpl(const Partition& partition,
                                                      const Row& row,
                                                      bool* contains) const {
  // Without range partitioning the column list is empty, so EncodeColumns()
  // returns immediately without appending anything.
  string encoded;
  RETURN_NOT_OK(EncodeColumns(row, range_schema_.column_ids, &encoded));
  const Slice key(encoded);
  const bool at_or_above_lower = key.compare(partition.range_key_start()) >= 0;
  const bool below_upper = partition.range_key_end().empty() ||
                           key.compare(partition.range_key_end()) < 0;
  *contains = at_or_above_lower && below_upper;
  return Status::OK();
}
// Public overloads of the partition containment checks. Each one forwards to
// the template implementation, instantiated for either KuduPartialRow or
// ConstContiguousRow.

Status PartitionSchema::PartitionContainsRow(const Partition& partition,
                                             const KuduPartialRow& row,
                                             bool* contains) const {
  return PartitionContainsRowImpl<KuduPartialRow>(partition, row, contains);
}

Status PartitionSchema::PartitionContainsRow(const Partition& partition,
                                             const ConstContiguousRow& row,
                                             bool* contains) const {
  return PartitionContainsRowImpl<ConstContiguousRow>(partition, row, contains);
}

Status PartitionSchema::HashPartitionContainsRow(const Partition& partition,
                                                 const KuduPartialRow& row,
                                                 int hash_idx,
                                                 bool* contains) const {
  return HashPartitionContainsRowImpl<KuduPartialRow>(partition, row, hash_idx, contains);
}

Status PartitionSchema::HashPartitionContainsRow(const Partition& partition,
                                                 const ConstContiguousRow& row,
                                                 int hash_idx,
                                                 bool* contains) const {
  return HashPartitionContainsRowImpl<ConstContiguousRow>(partition, row, hash_idx, contains);
}

Status PartitionSchema::RangePartitionContainsRow(const Partition& partition,
                                                  const KuduPartialRow& row,
                                                  bool* contains) const {
  return RangePartitionContainsRowImpl<KuduPartialRow>(partition, row, contains);
}

Status PartitionSchema::RangePartitionContainsRow(const Partition& partition,
                                                  const ConstContiguousRow& row,
                                                  bool* contains) const {
  return RangePartitionContainsRowImpl<ConstContiguousRow>(partition, row, contains);
}
// Decodes the range partition columns encoded at the front of '*encoded_key'
// into 'partial_row'. Each decoded column is marked as set, and the decoded
// bytes are consumed from '*encoded_key'. 'arena' is passed through to the
// per-column key decoders (presumably for out-of-line cell data such as
// strings; confirm against KeyEncoder::Decode).
//
// Decoding stops early once '*encoded_key' is exhausted, because partition
// start and end keys may be truncated to represent absolute bounds. Columns
// past that point are left unset.
//
// Returns InvalidArgument if a range column ID is not present in the row's
// schema, or if bytes remain after every range column has been decoded.
// Errors from decoding an individual column are propagated with context.
Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
                                       KuduPartialRow* partial_row,
                                       Arena* arena) const {
  ContiguousRow cont_row(partial_row->schema(), partial_row->row_data_);
  for (int i = 0; i < range_schema_.column_ids.size(); i++) {
    if (encoded_key->empty()) {
      // This can happen when decoding partition start and end keys, since they
      // are truncated to simulate absolute upper and lower bounds.
      break;
    }
    const ColumnId& column_id = range_schema_.column_ids[i];
    int32_t column_idx = partial_row->schema()->find_column_by_id(column_id);
    if (column_idx == Schema::kColumnNotFound) {
      // The row's schema lacks this range column. Indexing the schema with
      // kColumnNotFound below would be out of bounds.
      return Status::InvalidArgument(
          Substitute("range partition column ID $0 not found in row schema", column_id));
    }
    const ColumnSchema& column = partial_row->schema()->column(column_idx);
    const KeyEncoder<faststring>& key_encoder = GetKeyEncoder<faststring>(column.type_info());
    bool is_last = i == (range_schema_.column_ids.size() - 1);
    // Decode the column.
    RETURN_NOT_OK_PREPEND(key_encoder.Decode(encoded_key,
                                             is_last,
                                             arena,
                                             cont_row.mutable_cell_ptr(column_idx)),
                          Substitute("Error decoding partition key range component '$0'",
                                     column.name()));
    // Mark the column as set.
    BitmapSet(partial_row->isset_bitmap_, column_idx);
  }
  if (!encoded_key->empty()) {
    return Status::InvalidArgument("unable to fully decode range key",
                                   KUDU_REDACT(encoded_key->ToDebugString()));
  }
  return Status::OK();
}
// Decodes the hash-bucket prefix of '*encoded_key' and appends the buckets to
// 'buckets'. The prefix holds one big-endian uint32 per hash component of
// this schema. On success '*encoded_key' is advanced past the prefix.
//
// Returns InvalidArgument if '*encoded_key' is too short to hold a bucket for
// every hash component. In that case nothing is consumed or appended.
Status PartitionSchema::DecodeHashBuckets(Slice* encoded_key,
                                          vector<int32_t>* buckets) const {
  const size_t num_components = hash_bucket_schemas_.size();
  const size_t prefix_size = kEncodedBucketSize * num_components;
  if (encoded_key->size() < prefix_size) {
    return Status::InvalidArgument(
        Substitute("expected encoded hash key to be at least $0 bytes (only found $1)",
                   prefix_size, encoded_key->size()));
  }
  for (size_t i = 0; i < num_components; ++i) {
    uint32_t big_endian;
    memcpy(&big_endian, encoded_key->data(), sizeof(big_endian));
    encoded_key->remove_prefix(sizeof(big_endian));
    buckets->push_back(BigEndian::ToHost32(big_endian));
  }
  return Status::OK();
}
// Returns true if 'row' sets none of the range partition columns, i.e. it
// describes an unbounded range key. Range columns that are absent from the
// row's schema are treated as unset.
bool PartitionSchema::IsRangePartitionKeyEmpty(const KuduPartialRow& row) const {
  for (const ColumnId& column_id : range_schema_.column_ids) {
    const int32_t column_idx = row.schema()->find_column_by_id(column_id);
    // kColumnNotFound is not a valid column index, so never pass it to
    // IsColumnSet().
    if (column_idx != Schema::kColumnNotFound && row.IsColumnSet(column_idx)) {
      return false;
    }
  }
  return true;
}
void PartitionSchema::AppendRangeDebugStringComponentsOrMin(const KuduPartialRow& row,
vector<string>* components) const {
ConstContiguousRow const_row(row.schema(), row.row_data_);
for (const ColumnId& column_id : range_schema_.column_ids) {
int32_t column_idx = row.schema()->find_column_by_id(column_id);
if (column_idx == Schema::kColumnNotFound) {
components->emplace_back("<unknown-column>");
continue;
}
const ColumnSchema& column_schema = row.schema()->column(column_idx);
if (!row.IsColumnSet(column_idx)) {
uint8_t min_value[kLargestTypeSize];
column_schema.type_info()->CopyMinValue(&min_value);
SimpleConstCell cell(&column_schema, &min_value);
components->emplace_back(column_schema.Stringify(cell.ptr()));
} else {
components->emplace_back(column_schema.Stringify(const_row.cell_ptr(column_idx)));
}
}
}
namespace {
// Converts a list of column IDs into the corresponding column names in
// 'schema', joined by ", " (comma followed by a space). IDs that do not
// resolve to a column of 'schema' are rendered as "<unknown-column>", rather
// than indexing the schema with kColumnNotFound.
string ColumnIdsToColumnNames(const Schema& schema,
                              const vector<ColumnId>& column_ids) {
  vector<string> names;
  names.reserve(column_ids.size());
  for (const ColumnId& column_id : column_ids) {
    const int32_t column_idx = schema.find_column_by_id(column_id);
    if (column_idx == Schema::kColumnNotFound) {
      names.emplace_back("<unknown-column>");
      continue;
    }
    names.push_back(schema.column(column_idx).name());
  }
  return JoinStrings(names, ", ");
}
} // namespace
// Returns a human-readable description of 'partition': one
// "HASH (cols) PARTITION n" entry per hash component, then a
// "RANGE (cols) PARTITION ..." entry when range partitioning is in use.
// Returns "<hash-partition-error>" if the partition's bucket count does not
// match this schema's hash components.
string PartitionSchema::PartitionDebugString(const Partition& partition,
                                             const Schema& schema) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;
  const size_t num_hash = hash_bucket_schemas_.size();
  if (partition.hash_buckets_.size() != num_hash) {
    return "<hash-partition-error>";
  }
  vector<string> components;
  for (size_t i = 0; i < num_hash; ++i) {
    components.emplace_back(
        Substitute("HASH ($0) PARTITION $1",
                   ColumnIdsToColumnNames(schema, hash_bucket_schemas_[i].column_ids),
                   partition.hash_buckets_[i]));
  }
  if (!range_schema_.column_ids.empty()) {
    const string range_desc = RangePartitionDebugString(partition.range_key_start(),
                                                        partition.range_key_end(),
                                                        schema);
    components.emplace_back(
        Substitute("RANGE ($0) PARTITION $1",
                   ColumnIdsToColumnNames(schema, range_schema_.column_ids),
                   range_desc));
  }
  return JoinStrings(components, ", ");
}
// Describes the partition key that 'row' maps to:
//  - one "HASH (cols): bucket" entry per hash component, or
//    "<hash-error: ...>" when the bucket cannot be computed;
//  - then "RANGE (cols): key" when range partitioning is in use.
template<typename Row>
string PartitionSchema::PartitionKeyDebugStringImpl(const Row& row) const {
  vector<string> components;
  for (const auto& hash_schema : hash_bucket_schemas_) {
    int32_t bucket;
    const Status s = BucketForRow(row, hash_schema, &bucket);
    if (!s.ok()) {
      components.emplace_back(Substitute("<hash-error: $0>", s.ToString()));
      continue;
    }
    components.emplace_back(
        Substitute("HASH ($0): $1",
                   ColumnIdsToColumnNames(*row.schema(), hash_schema.column_ids),
                   bucket));
  }
  if (!range_schema_.column_ids.empty()) {
    components.emplace_back(
        Substitute("RANGE ($0): $1",
                   ColumnIdsToColumnNames(*row.schema(), range_schema_.column_ids),
                   RangeKeyDebugString(row)));
  }
  return JoinStrings(components, ", ");
}
// Explicit instantiations of the partition-key debug template for both row
// representations.
template string PartitionSchema::PartitionKeyDebugStringImpl(const KuduPartialRow& row) const;
template string PartitionSchema::PartitionKeyDebugStringImpl(const ConstContiguousRow& row) const;

// Public overloads describing the partition key of a contiguous row or a
// partial row.
string PartitionSchema::PartitionKeyDebugString(const ConstContiguousRow& row) const {
  return PartitionKeyDebugStringImpl<ConstContiguousRow>(row);
}

string PartitionSchema::PartitionKeyDebugString(const KuduPartialRow& row) const {
  return PartitionKeyDebugStringImpl<KuduPartialRow>(row);
}
// Describes an encoded partition key. One big-endian uint32 bucket is decoded
// per hash component. When range partitioning is in use, the remaining bytes
// are decoded as the range key. Returns "<hash-decode-error>" if 'key' is
// shorter than the hash-bucket prefix.
string PartitionSchema::PartitionKeyDebugString(Slice key, const Schema& schema) const {
  if (key.size() < kEncodedBucketSize * hash_bucket_schemas_.size()) {
    return "<hash-decode-error>";
  }
  vector<string> components;
  for (const auto& hash_schema : hash_bucket_schemas_) {
    uint32_t encoded_bucket;
    memcpy(&encoded_bucket, key.data(), sizeof(encoded_bucket));
    key.remove_prefix(sizeof(encoded_bucket));
    const uint32_t bucket = BigEndian::ToHost32(encoded_bucket);
    components.emplace_back(
        Substitute("HASH ($0): $1",
                   ColumnIdsToColumnNames(schema, hash_schema.column_ids),
                   bucket));
  }
  if (!range_schema_.column_ids.empty()) {
    components.emplace_back(
        Substitute("RANGE ($0): $1",
                   ColumnIdsToColumnNames(schema, range_schema_.column_ids),
                   RangeKeyDebugString(key, schema)));
  }
  return JoinStrings(components, ", ");
}
// Formats the range [lower_bound, upper_bound) in SQL-like syntax. A bound
// that sets no range column is unbounded on that side. The possible forms are
// "UNBOUNDED", "VALUES < x", "VALUES >= x" and "x <= VALUES < y".
string PartitionSchema::RangePartitionDebugString(const KuduPartialRow& lower_bound,
                                                  const KuduPartialRow& upper_bound) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;
  const bool has_lower = !IsRangePartitionKeyEmpty(lower_bound);
  const bool has_upper = !IsRangePartitionKeyEmpty(upper_bound);
  if (has_lower && has_upper) {
    // TODO(dan): recognize when a simplified 'VALUE =' form can be used (see
    // org.apache.kudu.client.Partition#formatRangePartition).
    return Substitute("$0 <= VALUES < $1",
                      RangeKeyDebugString(lower_bound),
                      RangeKeyDebugString(upper_bound));
  }
  if (has_lower) {
    return Substitute("VALUES >= $0", RangeKeyDebugString(lower_bound));
  }
  if (has_upper) {
    return Substitute("VALUES < $0", RangeKeyDebugString(upper_bound));
  }
  return "UNBOUNDED";
}
// Decodes the encoded range keys 'lower_bound' and 'upper_bound' using
// 'schema' and formats them as a range. If either key fails to decode, returns
// "<range-key-decode-error: ...>" describing the first failure.
string PartitionSchema::RangePartitionDebugString(Slice lower_bound,
                                                  Slice upper_bound,
                                                  const Schema& schema) const {
  // Partitions are considered metadata, so don't redact them.
  ScopedDisableRedaction no_redaction;
  Arena arena(256);
  KuduPartialRow lower(&schema);
  KuduPartialRow upper(&schema);
  Status s = DecodeRangeKey(&lower_bound, &lower, &arena);
  if (s.ok()) {
    s = DecodeRangeKey(&upper_bound, &upper, &arena);
  }
  if (!s.ok()) {
    return Substitute("<range-key-decode-error: $0>", s.ToString());
  }
  return RangePartitionDebugString(lower, upper);
}
// Decodes the encoded range key 'range_key' using 'schema' and formats it.
// Returns "<range-key-decode-error: ...>" if decoding fails.
string PartitionSchema::RangeKeyDebugString(Slice range_key, const Schema& schema) const {
  Arena arena(256);
  KuduPartialRow decoded(&schema);
  const Status s = DecodeRangeKey(&range_key, &decoded, &arena);
  return s.ok() ? RangeKeyDebugString(decoded)
                : Substitute("<range-key-decode-error: $0>", s.ToString());
}
// Formats the range partition columns of 'key'; unset columns are shown as
// their type's minimum value. A single-column key is returned bare. Any other
// number of columns is joined with ", " and wrapped in parentheses.
string PartitionSchema::RangeKeyDebugString(const KuduPartialRow& key) const {
  vector<string> components;
  AppendRangeDebugStringComponentsOrMin(key, &components);
  if (components.size() != 1) {
    return Substitute("($0)", JoinStrings(components, ", "));
  }
  // Omit the parentheses if the range partition has a single column.
  return components.front();
}
// Renders the range-key columns of a contiguous row.
//
// Each range column is looked up by id in the row's schema. If one is
// missing, "<unknown-column>" is emitted for it and no further columns are
// rendered. A single component is returned bare; several are joined as
// "(a, b, ...)".
string PartitionSchema::RangeKeyDebugString(const ConstContiguousRow& key) const {
  const Schema* key_schema = key.schema();
  vector<string> parts;
  for (const ColumnId& id : range_schema_.column_ids) {
    const int32_t idx = key_schema->find_column_by_id(id);
    if (idx == Schema::kColumnNotFound) {
      parts.emplace_back("<unknown-column>");
      break;
    }
    string cell;
    key_schema->column(idx).DebugCellAppend(key.cell(idx), &cell);
    parts.emplace_back(std::move(cell));
  }
  if (parts.size() != 1) {
    return Substitute("($0)", JoinStrings(parts, ", "));
  }
  // Single-column range partitions are shown without parentheses.
  return parts.back();
}
// Returns one human-readable clause per partitioning component, in this order:
//   - "HASH (cols) PARTITIONS n[ SEED s]" for each hash component
//     (the SEED suffix is added only when the seed is non-zero);
//   - "RANGE (cols)", only if the schema has range columns.
vector<string> PartitionSchema::DebugStringComponents(const Schema& schema) const {
  vector<string> parts;
  for (const auto& hash_schema : hash_bucket_schemas_) {
    string part = Substitute("HASH ($0) PARTITIONS $1",
                             ColumnIdsToColumnNames(schema, hash_schema.column_ids),
                             hash_schema.num_buckets);
    if (hash_schema.seed != 0) {
      SubstituteAndAppend(&part, " SEED $0", hash_schema.seed);
    }
    parts.emplace_back(std::move(part));
  }
  if (!range_schema_.column_ids.empty()) {
    parts.emplace_back(Substitute("RANGE ($0)",
                                  ColumnIdsToColumnNames(schema, range_schema_.column_ids)));
  }
  return parts;
}
// Single-line rendering of the partition schema: the components from
// DebugStringComponents() joined by ", ".
string PartitionSchema::DebugString(const Schema& schema) const {
  const vector<string> parts = DebugStringComponents(schema);
  return JoinStrings(parts, ", ");
}
// Multi-line rendering of the partition schema.
//
// Components are joined by ",\n". When the schema has range columns, the
// given range partitions follow in parentheses, one "PARTITION ..." line
// each, separated by commas. With no range partitions this is " ()".
string PartitionSchema::DisplayString(const Schema& schema,
                                      const vector<string>& range_partitions) const {
  string out = JoinStrings(DebugStringComponents(schema), ",\n");
  if (range_schema_.column_ids.empty()) {
    return out;
  }
  out.append(" (");
  if (range_partitions.empty()) {
    out.append(")");
    return out;
  }
  for (size_t i = 0; i < range_partitions.size(); ++i) {
    if (i != 0) {
      out.push_back(',');
    }
    out.append("\n PARTITION ");
    out.append(range_partitions[i]);
  }
  out.append("\n)");
  return out;
}
// Builds the HTML <th> header cells for a partition table.
//
// Emits one cell per hash component, then one for the range component if
// the schema has range columns. Column names are HTML-escaped.
string PartitionSchema::PartitionTableHeader(const Schema& schema) const {
  string html;
  for (const auto& hash_schema : hash_bucket_schemas_) {
    const auto column_names = ColumnIdsToColumnNames(schema, hash_schema.column_ids);
    SubstituteAndAppend(&html, "<th>HASH ($0) PARTITION</th>",
                        EscapeForHtmlToString(column_names));
  }
  if (range_schema_.column_ids.empty()) {
    return html;
  }
  const auto range_names = ColumnIdsToColumnNames(schema, range_schema_.column_ids);
  SubstituteAndAppend(&html, "<th>RANGE ($0) PARTITION</th>",
                      EscapeForHtmlToString(range_names));
  return html;
}
// Builds the HTML <td> cells for one partition, matching the columns
// produced by PartitionTableHeader().
//
// Emits one cell per hash bucket number, then (if the schema has range
// columns) the HTML-escaped range bound description.
string PartitionSchema::PartitionTableEntry(const Schema& schema,
                                            const Partition& partition) const {
  string html;
  for (const int32_t bucket : partition.hash_buckets_) {
    SubstituteAndAppend(&html, "<td>$0</td>", bucket);
  }
  if (range_schema_.column_ids.empty()) {
    return html;
  }
  const string range_description = RangePartitionDebugString(partition.range_key_start(),
                                                             partition.range_key_end(),
                                                             schema);
  SubstituteAndAppend(&html, "<td>$0</td>", EscapeForHtmlToString(range_description));
  return html;
}
// Returns true if both schemas have the same range columns (same order) and
// pairwise-identical hash components (seed, bucket count and columns, in
// the same order).
bool PartitionSchema::Equals(const PartitionSchema& other) const {
  if (this == &other) {
    return true;
  }
  if (range_schema_.column_ids != other.range_schema_.column_ids) {
    return false;
  }
  const auto& ours = hash_bucket_schemas_;
  const auto& theirs = other.hash_bucket_schemas_;
  if (ours.size() != theirs.size()) {
    return false;
  }
  for (size_t i = 0; i < ours.size(); ++i) {
    const auto& lhs = ours[i];
    const auto& rhs = theirs[i];
    if (lhs.seed != rhs.seed ||
        lhs.num_buckets != rhs.num_buckets ||
        lhs.column_ids != rhs.column_ids) {
      return false;
    }
  }
  return true;
}
// Encodes the specified primary key columns of the supplied row into the buffer.
//
// Columns are appended to 'buf' in the order given by 'column_ids', each
// with the key encoder for its type. The second Encode() argument is true
// only for the final column.
//
// Every id must resolve to a column of the row's schema. An unknown id is a
// programming error and CHECK-fails, matching the KuduPartialRow overload,
// instead of indexing the schema with kColumnNotFound.
Status PartitionSchema::EncodeColumns(const ConstContiguousRow& row,
                                      const vector<ColumnId>& column_ids,
                                      string* buf) {
  const Schema* schema = row.schema();
  for (size_t i = 0; i < column_ids.size(); ++i) {
    const int32_t column_idx = schema->find_column_by_id(column_ids[i]);
    // Plain CHECK (not CHECK_NE) so the static constant is not odr-used.
    CHECK(column_idx != Schema::kColumnNotFound);
    const TypeInfo* type = schema->column(column_idx).type_info();
    const bool is_last = (i + 1 == column_ids.size());
    GetKeyEncoder<string>(type).Encode(row.cell_ptr(column_idx), is_last, buf);
  }
  return Status::OK();
}
// Encodes the specified primary key columns of the supplied row into the buffer.
//
// Columns are appended in 'column_ids' order using each type's key encoder.
// A column that is not set in 'row' is encoded as its type's minimum value.
// Every id must resolve to a column of the row's schema (CHECK-enforced).
Status PartitionSchema::EncodeColumns(const KuduPartialRow& row,
                                      const vector<ColumnId>& column_ids,
                                      string* buf) {
  const size_t num_columns = column_ids.size();
  for (size_t i = 0; i < num_columns; ++i) {
    const int32_t idx = row.schema()->find_column_by_id(column_ids[i]);
    CHECK(idx != Schema::kColumnNotFound);
    const TypeInfo* type_info = row.schema()->column(idx).type_info();
    const KeyEncoder<string>& encoder = GetKeyEncoder<string>(type_info);
    const bool is_last = (i + 1 == num_columns);
    if (row.IsColumnSet(idx)) {
      ContiguousRow cont_row(row.schema(), row.row_data_);
      encoder.Encode(cont_row.cell_ptr(idx), is_last, buf);
    } else {
      // Unset columns stand in for the minimum value of their type.
      uint8_t min_value[kLargestTypeSize];
      type_info->CopyMinValue(min_value);
      encoder.Encode(min_value, is_last, buf);
    }
  }
  return Status::OK();
}
// Maps already-encoded hash columns to a bucket index.
//
// The encoding is hashed with 64-bit MurmurHash2 (seeded with the
// component's seed), and the result is reduced modulo the bucket count.
int32_t PartitionSchema::BucketForEncodedColumns(const string& encoded_hash_columns,
                                                 const HashBucketSchema& hash_bucket_schema) {
  const uint64_t num_buckets = static_cast<uint64_t>(hash_bucket_schema.num_buckets);
  const uint64_t hash = HashUtil::MurmurHash2_64(encoded_hash_columns.data(),
                                                 encoded_hash_columns.length(),
                                                 hash_bucket_schema.seed);
  return static_cast<int32_t>(hash % num_buckets);
}
// Computes the hash bucket of 'row' for one hash component.
//
// The component's columns are encoded with EncodeColumns() and then bucketed
// by BucketForEncodedColumns(). Any encoding error is returned unchanged and
// '*bucket' is left untouched in that case.
template<typename Row>
Status PartitionSchema::BucketForRow(const Row& row,
                                     const HashBucketSchema& hash_bucket_schema,
                                     int32_t* bucket) {
  string encoded_columns;
  RETURN_NOT_OK(EncodeColumns(row, hash_bucket_schema.column_ids, &encoded_columns));
  *bucket = BucketForEncodedColumns(encoded_columns, hash_bucket_schema);
  return Status::OK();
}
//------------------------------------------------------------
// Template instantiations: We instantiate all possible templates to avoid linker issues.
// see: https://isocpp.org/wiki/faq/templates#separate-template-fn-defn-from-decl
//------------------------------------------------------------
template
Status PartitionSchema::BucketForRow(const KuduPartialRow& row,
                                     const HashBucketSchema& hash_bucket_schema,
                                     int32_t* bucket);
template
Status PartitionSchema::BucketForRow(const ConstContiguousRow& row,
                                     const HashBucketSchema& hash_bucket_schema,
                                     int32_t* bucket);
// Removes every hash bucket component and all range partition columns.
void PartitionSchema::Clear() {
  range_schema_.column_ids.clear();
  hash_bucket_schemas_.clear();
}
// Checks that the partition schema is well-formed with respect to 'schema'.
//
// Each hash component must have at least two buckets and at least one
// column. No column may appear in more than one hash component. Every hash
// and range column must exist in 'schema' and be a primary key column.
// Returns InvalidArgument describing the first violation found.
Status PartitionSchema::Validate(const Schema& schema) const {
  set<ColumnId> seen_hash_columns;
  for (const HashBucketSchema& hash_schema : hash_bucket_schemas_) {
    if (hash_schema.num_buckets < 2) {
      return Status::InvalidArgument("must have at least two hash buckets");
    }
    if (hash_schema.column_ids.empty()) {
      return Status::InvalidArgument("must have at least one hash column");
    }
    for (const ColumnId& column_id : hash_schema.column_ids) {
      const bool newly_seen = seen_hash_columns.insert(column_id).second;
      if (!newly_seen) {
        return Status::InvalidArgument("hash bucket schema components must not "
                                       "contain columns in common");
      }
      const int32_t idx = schema.find_column_by_id(column_id);
      if (idx == Schema::kColumnNotFound) {
        return Status::InvalidArgument("must specify existing columns for hash "
                                       "bucket partition components");
      }
      if (idx >= schema.num_key_columns()) {
        return Status::InvalidArgument("must specify only primary key columns for "
                                       "hash bucket partition components");
      }
    }
  }

  for (const ColumnId& column_id : range_schema_.column_ids) {
    const int32_t idx = schema.find_column_by_id(column_id);
    if (idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument("must specify existing columns for range "
                                     "partition component");
    }
    if (idx >= schema.num_key_columns()) {
      return Status::InvalidArgument("must specify only primary key columns for "
                                     "range partition component");
    }
  }
  return Status::OK();
}
namespace {

// Sets the unset column 'idx' of 'row' to its type's minimum value plus one:
//   - MIN + 1 for the integer, timestamp, date and decimal types;
//   - the one-byte string "\0" for VARCHAR, STRING and BINARY.
// Returns InvalidArgument for any other column type, otherwise the status of
// the setter. The column must be unset on entry (DCHECKed).
Status IncrementUnsetColumn(KuduPartialRow* row, int32_t idx) {
  DCHECK(!row->IsColumnSet(idx));
  const ColumnSchema& col = row->schema()->column(idx);
  // A lone NUL byte is the byte-wise successor of the empty string.
  const Slice nul_byte("\0", 1);
  switch (col.type_info()->type()) {
    case INT8:
      return row->SetInt8(idx, INT8_MIN + 1);
    case INT16:
      return row->SetInt16(idx, INT16_MIN + 1);
    case INT32:
      return row->SetInt32(idx, INT32_MIN + 1);
    case INT64:
    case UNIXTIME_MICROS:
      return row->SetInt64(idx, INT64_MIN + 1);
    case DATE:
      return row->SetDate(idx, DataTypeTraits<DATE>::kMinValue + 1);
    case VARCHAR:
      return row->SetVarchar(idx, nul_byte);
    case STRING:
      return row->SetStringCopy(idx, nul_byte);
    case BINARY:
      return row->SetBinaryCopy(idx, nul_byte);
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128:
      return row->SetUnscaledDecimal(idx,
                                     MinUnscaledDecimal(col.type_attributes().precision) + 1);
    default:
      return Status::InvalidArgument("Invalid column type in range partition",
                                     col.ToString());
  }
}

// Increments the set column 'idx' of 'row' to the next larger value.
//
// '*success' is set to true if the column was incremented. It is set to
// false if the column already held its type's maximum value; the row is then
// left unchanged. VARCHAR, STRING and BINARY values are incremented by
// appending a NUL byte, so they never report saturation.
//
// Returns InvalidArgument for unsupported column types. Otherwise returns
// any error from the row getter or setter, or OK.
//
// NOTE(review): whether SetVarchar() truncates values longer than the column's
// max length is not visible here. If it does, appending a byte to a max-length
// VARCHAR would not actually change it. Confirm against KuduPartialRow.
Status IncrementColumn(KuduPartialRow* row, int32_t idx, bool* success) {
  DCHECK(row->IsColumnSet(idx));
  const ColumnSchema& col = row->schema()->column(idx);
  *success = true;
  switch (col.type_info()->type()) {
    case INT8: {
      int8_t current;
      RETURN_NOT_OK(row->GetInt8(idx, &current));
      if (current >= INT8_MAX) {
        *success = false;
        return Status::OK();
      }
      return row->SetInt8(idx, current + 1);
    }
    case INT16: {
      int16_t current;
      RETURN_NOT_OK(row->GetInt16(idx, &current));
      if (current >= INT16_MAX) {
        *success = false;
        return Status::OK();
      }
      return row->SetInt16(idx, current + 1);
    }
    case INT32: {
      int32_t current;
      RETURN_NOT_OK(row->GetInt32(idx, &current));
      if (current >= INT32_MAX) {
        *success = false;
        return Status::OK();
      }
      return row->SetInt32(idx, current + 1);
    }
    case INT64: {
      int64_t current;
      RETURN_NOT_OK(row->GetInt64(idx, &current));
      if (current >= INT64_MAX) {
        *success = false;
        return Status::OK();
      }
      return row->SetInt64(idx, current + 1);
    }
    case UNIXTIME_MICROS: {
      int64_t current;
      RETURN_NOT_OK(row->GetUnixTimeMicros(idx, &current));
      if (current >= INT64_MAX) {
        *success = false;
        return Status::OK();
      }
      return row->SetUnixTimeMicros(idx, current + 1);
    }
    case DATE: {
      int32_t current;
      RETURN_NOT_OK(row->GetDate(idx, &current));
      if (current >= DataTypeTraits<DATE>::kMaxValue) {
        *success = false;
        return Status::OK();
      }
      return row->SetDate(idx, current + 1);
    }
    case DECIMAL32:
    case DECIMAL64:
    case DECIMAL128: {
      int128_t current;
      RETURN_NOT_OK(row->GetUnscaledDecimal(idx, &current));
      if (current >= MaxUnscaledDecimal(col.type_attributes().precision)) {
        *success = false;
        return Status::OK();
      }
      return row->SetUnscaledDecimal(idx, current + 1);
    }
    case BINARY: {
      Slice current;
      RETURN_NOT_OK(row->GetBinary(idx, &current));
      // Copy the bytes out of the row before overwriting the cell.
      string next = current.ToString();
      next.push_back('\0');
      return row->SetBinaryCopy(idx, next);
    }
    case VARCHAR: {
      Slice current;
      RETURN_NOT_OK(row->GetVarchar(idx, &current));
      string next = current.ToString();
      next.push_back('\0');
      return row->SetVarchar(idx, next);
    }
    case STRING: {
      Slice current;
      RETURN_NOT_OK(row->GetString(idx, &current));
      string next = current.ToString();
      next.push_back('\0');
      return row->SetStringCopy(idx, next);
    }
    default:
      return Status::InvalidArgument("Invalid column type in range partition",
                                     col.ToString());
  }
}

} // anonymous namespace
// Increments the range-partition key held in 'row' by the smallest step.
//
// Columns are visited from least significant (last range column) to most
// significant:
//   - An unset column is bumped to its type's minimum plus one, which ends
//     the walk.
//   - A set column is incremented. If it is already at its maximum it is
//     remembered as saturated and the walk continues leftwards.
// Once some column has been incremented, every saturated column to its right
// is unset.
//
// '*increment' reports whether any column could be incremented. It stays
// false when every range column was at its maximum. Returns InvalidArgument
// if a range column id is not present in the row's schema.
Status PartitionSchema::IncrementRangePartitionKey(KuduPartialRow* row, bool* increment) const {
  *increment = false;
  vector<int32_t> saturated_idxs;
  const auto& column_ids = range_schema_.column_ids;
  for (auto it = column_ids.rbegin(); it != column_ids.rend(); ++it) {
    const int32_t idx = row->schema()->find_column_by_id(*it);
    if (idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument(Substitute("range partition column ID $0 "
                                                "not found in range partition key schema.",
                                                *it));
    }
    if (!row->IsColumnSet(idx)) {
      RETURN_NOT_OK(IncrementUnsetColumn(row, idx));
      *increment = true;
      break;
    }
    RETURN_NOT_OK(IncrementColumn(row, idx, increment));
    if (*increment) {
      break;
    }
    saturated_idxs.push_back(idx);
  }

  if (*increment) {
    for (const int32_t idx : saturated_idxs) {
      RETURN_NOT_OK(row->Unset(idx));
    }
  }
  return Status::OK();
}
// Converts an exclusive lower-bound range partition key in 'row' into the
// equivalent inclusive lower bound, in place.
//
// Returns InvalidArgument, with the key rendered in the message, if every
// range column already holds its maximum value. Also propagates any error
// from IncrementRangePartitionKey().
Status PartitionSchema::MakeLowerBoundRangePartitionKeyInclusive(KuduPartialRow* row) const {
  // To transform a lower bound range partition key from exclusive to inclusive,
  // the key must be incremented. To increment the key, start with the least
  // significant column in the key (furthest right), and increment it. If the
  // increment fails because the value is already the maximum, move on to the
  // next least significant column and attempt to increment it (and so on). When
  // incrementing, an unset cell is equivalent to a cell set to the minimum
  // value for its column (e.g. an unset Int8 column is incremented to -127
  // (-2^7 + 1)). Finally, all columns less significant than the incremented
  // column are unset (which means they are treated as the minimum value for
  // that column). If all columns in the key are the maximum and can not be
  // incremented, then the operation fails.
  //
  // A few examples, given a range partition of three Int8 columns. Underscore
  // signifies unset:
  //
  // (1, 2, 3) -> (1, 2, 4)
  // (1, 2, 127) -> (1, 3, _)
  // (1, 127, 3) -> (1, 127, 4)
  // (1, _, 3) -> (1, _, 4)
  // (_, _, _) -> (_, _, -127)
  // (1, 127, 127) -> (2, _, _)
  // (127, 127, 127) -> fail!
  bool increment = false;
  RETURN_NOT_OK(IncrementRangePartitionKey(row, &increment));
  if (!increment) {
    return Status::InvalidArgument("Exclusive lower bound range partition key must not "
                                   "have maximum values for all components",
                                   RangeKeyDebugString(*row));
  }
  return Status::OK();
}
// Converts an inclusive upper-bound range partition key in 'row' into the
// equivalent exclusive upper bound, in place.
//
// Returns InvalidArgument if a range column id is missing from the row's
// schema, and propagates errors from IncrementRangePartitionKey() or
// KuduPartialRow::Unset().
Status PartitionSchema::MakeUpperBoundRangePartitionKeyExclusive(KuduPartialRow* row) const {
  // To transform an upper bound range partition key from inclusive to exclusive,
  // the key must be incremented. Incrementing the key follows the same steps as
  // turning an exclusive lower bound key into an inclusive one (see
  // MakeLowerBoundRangePartitionKeyInclusive). Upper bound keys have two
  // additional special cases:
  //
  // * For upper bound range partition keys with all columns unset, no
  // transformation is needed (all unset columns signifies unbounded,
  // so there is no difference between inclusive and exclusive).
  //
  // * For an upper bound key that can't be incremented because all components
  // are the maximum value, all columns are unset in order to transform it to
  // an unbounded upper bound (this is a special case increment).
  bool all_unset = true;
  for (const ColumnId& column_id : range_schema_.column_ids) {
    const int32_t idx = row->schema()->find_column_by_id(column_id);
    if (idx == Schema::kColumnNotFound) {
      return Status::InvalidArgument(Substitute("range partition column ID $0 "
                                                "not found in range partition key schema.",
                                                column_id));
    }
    if (row->IsColumnSet(idx)) {
      all_unset = false;
      break;
    }
  }
  if (all_unset) return Status::OK();

  bool increment = false;
  RETURN_NOT_OK(IncrementRangePartitionKey(row, &increment));
  if (!increment) {
    // Every column was at its maximum: widen to an unbounded upper bound.
    // IncrementRangePartitionKey() visited (and so validated) every range
    // column before reporting !increment, so these lookups succeed.
    for (const ColumnId& column_id : range_schema_.column_ids) {
      const int32_t idx = row->schema()->find_column_by_id(column_id);
      RETURN_NOT_OK(row->Unset(idx));
    }
  }
  return Status::OK();
}
// Appends to '*range_column_idxs' the index within 'schema' of every range
// partition column, in the order of range_schema_.column_ids. The vector is
// not cleared first.
//
// Returns InvalidArgument if a range column id is missing from 'schema'.
// Indexes appended before the missing column remain in the vector.
Status PartitionSchema::GetRangeSchemaColumnIndexes(const Schema& schema,
                                                    vector<int32_t>* range_column_idxs) const {
  for (const ColumnId& id : range_schema_.column_ids) {
    const int32_t col_idx = schema.find_column_by_id(id);
    if (PREDICT_FALSE(col_idx == Schema::kColumnNotFound)) {
      return Status::InvalidArgument(Substitute("range partition column ID $0 "
                                                "not found in range partition key schema.",
                                                id));
    }
    range_column_idxs->emplace_back(col_idx);
  }
  return Status::OK();
}
// Returns the position of the hash component whose only column is the
// column at 'col_idx' in 'schema', or -1 if there is no such component.
// When several match, the first one is returned.
int32_t PartitionSchema::TryGetSingleColumnHashPartitionIndex(const Schema& schema,
                                                              int32_t col_idx) const {
  const ColumnId column_id = schema.column_id(col_idx);
  // Bind by reference. The original 'auto' copied each component, including
  // its column-id vector, on every iteration. A const reference is safe
  // whether the accessor returns by reference or by value (lifetime extension).
  const auto& hash_schemas = hash_partition_schemas();
  for (size_t i = 0; i < hash_schemas.size(); ++i) {
    const auto& hash_schema = hash_schemas[i];
    if (hash_schema.column_ids.size() == 1 && hash_schema.column_ids[0] == column_id) {
      return static_cast<int32_t>(i);
    }
  }
  return -1;
}
bool PartitionSchema::IsColumnSingleRangeSchema(const Schema& schema, int32_t col_idx) const {
const ColumnId column_id = schema.column_id(col_idx);
return range_partition_schema().column_ids.size() == 1 &&
range_partition_schema().column_ids[0] == column_id;
}
} // namespace kudu