// blob: 3b228926b24fbcbc1d520c660c5b84ac6f659389
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/partition_pruner.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include "kudu/common/column_predicate.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/key_encoder.h"
#include "kudu/common/key_util.h"
#include "kudu/common/partition.h"
#include "kudu/common/row.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/array_view.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"
using std::distance;
using std::find;
using std::iota;
using std::lower_bound;
using std::memcpy;
using std::move;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
namespace kudu {
namespace {
// Returns true if the partition schema's range columns are a prefix of the
// primary key columns.
bool AreRangeColumnsPrefixOfPrimaryKey(const Schema& schema,
const vector<ColumnId>& range_columns) {
CHECK(range_columns.size() <= schema.num_key_columns());
for (int32_t col_idx = 0; col_idx < range_columns.size(); ++col_idx) {
if (schema.column_id(col_idx) != range_columns[col_idx]) {
return false;
return true;
// Translates the scan primary key bounds into range keys. This should only be
// used when the range columns are a prefix of the primary key columns.
//
// 'num_range_columns' is the number of leading primary key columns that make
// up the range key. '*range_key_start' is written only when the scan spec has
// a lower PK bound, and '*range_key_end' only when it has an exclusive upper
// PK bound; otherwise the outputs are left untouched.
//
// When the range key is the whole primary key, the already-encoded PK bounds
// are copied as-is. Otherwise the range-key prefix is re-encoded from a
// scratch row ('buf'/'row'), and the upper bound is incremented when the PK
// suffix beyond the range columns is not at its minimum, so that the encoded
// range key stays an exclusive bound.
//
// NOTE(review): this copy of the function appears truncated: closing braces
// are missing throughout and several statements seem to have been dropped
// (marked with NOTE(review) below). Restore them before relying on this code.
void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
const ScanSpec& scan_spec,
size_t num_range_columns,
string* range_key_start,
string* range_key_end) {
if (scan_spec.lower_bound_key() == nullptr && scan_spec.exclusive_upper_bound_key() == nullptr) {
// Don't bother if there are no lower and upper PK bounds
// NOTE(review): the early return implied by the comment above is missing.
if (num_range_columns == schema.num_key_columns()) {
// The range columns are the primary key columns, so the range key is the
// primary key.
if (scan_spec.lower_bound_key() != nullptr) {
*range_key_start = scan_spec.lower_bound_key()->encoded_key().ToString();
if (scan_spec.exclusive_upper_bound_key() != nullptr) {
*range_key_end = scan_spec.exclusive_upper_bound_key()->encoded_key().ToString();
} else {
// The range-partition key columns are a prefix of the primary key columns.
// Copy the column values over to a row, and then encode the row as a range
// key.
// 'col_idxs' is [0, 1, ..., num_range_columns - 1]: the leading PK columns.
vector<int32_t> col_idxs(num_range_columns);
iota(col_idxs.begin(), col_idxs.end(), 0);
// Scratch row large enough to hold every key column of 'schema'.
unique_ptr<uint8_t[]> buf(new uint8_t[schema.key_byte_size()]);
ContiguousRow row(&schema, buf.get());
if (scan_spec.lower_bound_key() != nullptr) {
for (int32_t idx : col_idxs) {
// NOTE(review): the loop body is missing. Presumably it copies cell 'idx'
// of the lower bound key into 'row' (std::memcpy is imported at file
// scope but unused elsewhere) -- confirm against upstream.
key_util::EncodeKey(col_idxs, row, range_key_start);
if (scan_spec.exclusive_upper_bound_key() != nullptr) {
for (int32_t idx : col_idxs) {
// NOTE(review): the loop body is missing; presumably it copies cell 'idx'
// of the exclusive upper bound key into 'row' -- confirm.
// Determine if the upper bound primary key columns which aren't in the
// range-partition key are all set to the minimum value. If so, the
// range-partition key prefix of the primary key is already effectively an
// exclusive bound. If not, then we increment the range-key prefix in
// order to transform it from inclusive to exclusive.
bool min_suffix = true;
for (int32_t idx = num_range_columns; idx < schema.num_key_columns(); ++idx) {
min_suffix &= schema.column(idx)
// NOTE(review): the rest of the expression above is missing; per the
// comment it should test whether the upper bound's value for column 'idx'
// is the minimum value of that column's type.
Arena arena(std::max<size_t>(Arena::kMinimumChunkSize, schema.key_byte_size()));
if (!min_suffix) {
if (!key_util::IncrementPrimaryKey(&row, num_range_columns, &arena)) {
// The range-partition key upper bound can't be incremented, which
// means it's an inclusive bound on the maximum possible value, so
// skip it.
// NOTE(review): the statement that skips (presumably an early return,
// leaving '*range_key_end' unset) is missing here.
key_util::EncodeKey(col_idxs, row, range_key_end);
// Push the scan predicates into the range keys.
void EncodeRangeKeysFromPredicates(const Schema& schema,
const unordered_map<string, ColumnPredicate>& predicates,
const vector<ColumnId>& range_columns,
string* range_key_start,
string* range_key_end) {
// Find the column indexes of the range columns.
vector<int32_t> col_idxs;
for (const auto& column : range_columns) {
int32_t col_idx = schema.find_column_by_id(column);
CHECK_NE(Schema::kColumnNotFound, col_idx);
CHECK_LT(col_idx, schema.num_key_columns());
// Arenas must be at least the minimum chunk size, and we require at least
// enough space for the range key columns.
Arena arena(std::max<size_t>(Arena::kMinimumChunkSize, schema.key_byte_size()));
uint8_t* buf = static_cast<uint8_t*>(CHECK_NOTNULL(arena.AllocateBytes(schema.key_byte_size())));
ContiguousRow row(&schema, buf);
if (key_util::PushLowerBoundKeyPredicates(col_idxs, predicates, &row, &arena) > 0) {
key_util::EncodeKey(col_idxs, row, range_key_start);
if (key_util::PushUpperBoundKeyPredicates(col_idxs, predicates, &row, &arena) > 0) {
key_util::EncodeKey(col_idxs, row, range_key_end);
} // anonymous namespace
// Computes which buckets of 'hash_dimension' the scan may touch.
//
// Every column of the dimension is expected to have an Equality or InList
// predicate in 'scan_spec' (FindOrDie() and the CHECK below). The function
// builds every combination of the encoded predicate values across the
// dimension's columns, hashes each combination with
// PartitionSchema::HashValueForEncodedColumns(), and sets that bucket to true.
// Returns a bitset of 'hash_dimension.num_buckets' entries in which 'true'
// marks a bucket that has to be scanned.
//
// NOTE(review): this copy appears to be missing lines: the FindOrDie() call
// below is truncated, neither predicate branch fills 'predicate_values', the
// inner loop neither encodes 'predicate_value' into 'new_encoded_string' nor
// appends it to 'new_encoded_strings', 'encoded_strings' is never replaced by
// 'new_encoded_strings', and closing braces are missing.
vector<bool> PartitionPruner::PruneHashComponent(
const PartitionSchema::HashDimension& hash_dimension,
const Schema& schema,
const ScanSpec& scan_spec) {
vector<bool> hash_bucket_bitset(hash_dimension.num_buckets, false);
// Start from a single empty prefix; each column multiplies the set of
// encoded prefixes by its number of predicate values.
vector<string> encoded_strings(1, "");
for (size_t col_offset = 0; col_offset < hash_dimension.column_ids.size(); ++col_offset) {
vector<string> new_encoded_strings;
const ColumnSchema& column = schema.column_by_id(hash_dimension.column_ids[col_offset]);
const ColumnPredicate& predicate = FindOrDie(scan_spec.predicates(),;
// NOTE(review): the line above is truncated (the lookup key is missing).
const KeyEncoder<string>& encoder = GetKeyEncoder<string>(column.type_info());
vector<const void*> predicate_values;
if (predicate.predicate_type() == PredicateType::Equality) {
// NOTE(review): the statement collecting the equality value is missing.
} else {
CHECK(predicate.predicate_type() == PredicateType::InList);
// NOTE(review): the statement collecting the in-list values is missing.
// For each of the encoded string, replicate it by the number of values in
// equality and in-list predicate.
for (const string& encoded_string : encoded_strings) {
for (const void* predicate_value : predicate_values) {
string new_encoded_string = encoded_string;
// NOTE(review): the line below is a fragment of an encoder call; it looks
// like the "is last column" argument of encoding 'predicate_value' into
// 'new_encoded_string' -- confirm against upstream.
col_offset + 1 == hash_dimension.column_ids.size(),
// Hash every fully-encoded combination and mark its bucket.
for (const string& encoded_string : encoded_strings) {
uint32_t hash_value = PartitionSchema::HashValueForEncodedColumns(
encoded_string, hash_dimension);
hash_bucket_bitset[hash_value] = true;
return hash_bucket_bitset;
// Builds the partition key ranges covering 'scan_spec' for one hash schema
// ('hash_schema') combined with the range-key bounds 'range_bounds'.
//
// As laid out below:
//  1. Each hash dimension gets a bucket bitset. It is pruned through
//     PruneHashComponent() when every column of the dimension has an Equality
//     or InList predicate; otherwise all buckets are kept.
//  2. 'constrained_index' is the number of leading hash components to expand.
//     It is all of them if either range bound is non-empty; otherwise it
//     extends up to the right-most dimension whose bitset excludes a bucket.
//  3. Each selected bucket of those components is appended to every range.
//     On the last component the upper bucket is bumped by one when there is
//     no range upper bound (inclusive -> exclusive).
//  4. The range bounds are appended, and ranges are clipped against the scan
//     spec's exclusive upper bound partition key.
//
// NOTE(review): this copy appears to be missing lines: the FindOrNull() call
// is truncated, the loop after 'can_prune = false' has no exit, the pruned
// 'hash_bucket_bitset' is never appended to 'hash_bucket_bitsets', the
// 'constrained_index' expression is truncated, the skip for unselected
// buckets and the insertion into 'new_partition_key_ranges' are incomplete,
// the range-bound append loop has no body, the clipping branches have no
// bodies, and closing braces are missing.
vector<PartitionPruner::PartitionKeyRange> PartitionPruner::ConstructPartitionKeyRanges(
const Schema& schema,
const ScanSpec& scan_spec,
const PartitionSchema::HashSchema& hash_schema,
const RangeBounds& range_bounds) {
// Create the hash bucket portion of the partition key.
// The list of hash buckets bitset per hash component
vector<vector<bool>> hash_bucket_bitsets;
for (const auto& hash_dimension : hash_schema) {
// A dimension can be pruned only if every one of its columns has an
// Equality or InList predicate.
bool can_prune = true;
for (const auto& column_id : hash_dimension.column_ids) {
const ColumnSchema& column = schema.column_by_id(column_id);
const ColumnPredicate* predicate = FindOrNull(scan_spec.predicates(),;
// NOTE(review): the line above is truncated (the lookup key is missing).
if (predicate == nullptr ||
(predicate->predicate_type() != PredicateType::Equality &&
predicate->predicate_type() != PredicateType::InList)) {
can_prune = false;
if (can_prune) {
auto hash_bucket_bitset = PruneHashComponent(
hash_dimension, schema, scan_spec);
// NOTE(review): 'hash_bucket_bitset' is not stored in this copy; it is
// presumably appended to 'hash_bucket_bitsets'.
} else {
// Not prunable: every bucket must be scanned.
hash_bucket_bitsets.emplace_back(hash_dimension.num_buckets, true);
// The index of the final constrained component in the partition key.
size_t constrained_index;
if (!range_bounds.lower.empty() || !range_bounds.upper.empty()) {
// The range component is constrained.
constrained_index = hash_schema.size();
} else {
// Search the hash bucket constraints from right to left, looking for the
// first constrained component.
constrained_index = hash_schema.size() -
// A bitset is "constrained" if it excludes at least one bucket.
[] (const vector<bool>& x) {
return std::find(x.begin(), x.end(), false) != x.end();
// Build up a set of partition key ranges out of the hash components.
// Each hash component simply appends its bucket number to the
// partition key ranges (possibly incrementing the upper bound by one bucket
// number if this is the final constraint, see note 2 in the example in
// PartitionPruner::Init()).
vector<PartitionKeyRange> partition_key_ranges(1);
const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
for (size_t hash_idx = 0; hash_idx < constrained_index; ++hash_idx) {
// This is the final partition key component if this is the final constrained
// bucket, and the range upper bound is empty. In this case we need to
// increment the bucket on the upper bound to convert from inclusive to
// exclusive.
bool is_last = hash_idx + 1 == constrained_index && range_bounds.upper.empty();
vector<PartitionKeyRange> new_partition_key_ranges;
for (const auto& key_range : partition_key_ranges) {
const vector<bool>& buckets_bitset = hash_bucket_bitsets[hash_idx];
for (uint32_t bucket = 0; bucket < buckets_bitset.size(); ++bucket) {
if (!buckets_bitset[bucket]) {
uint32_t bucket_upper = is_last ? bucket + 1 : bucket;
auto lower = key_range.start.hash_key();
auto upper = key_range.end.hash_key();
hash_encoder.Encode(&bucket, &lower);
hash_encoder.Encode(&bucket_upper, &upper);
{ lower, key_range.start.range_key() },
{ upper, key_range.end.range_key() }});
// Append the (possibly empty) range bounds to the partition key ranges.
for (auto& range : partition_key_ranges) {
// Remove all partition key ranges past the scan spec's upper bound partition key.
const auto& upper_bound_partition_key = scan_spec.exclusive_upper_bound_partition_key();
if (!upper_bound_partition_key.empty()) {
// Walk from the highest range downward: ranges entirely at/above the bound
// are dropped, a range straddling it gets its end clipped.
for (auto range = partition_key_ranges.rbegin();
range != partition_key_ranges.rend();
++range) {
if (!(*range).end.empty() && upper_bound_partition_key >= (*range).end) {
if (upper_bound_partition_key <= (*range).start) {
} else {
(*range).end = upper_bound_partition_key;
return partition_key_ranges;
// NOTE: the lower ranges are inclusive, the upper ranges are exclusive.
//
// Splits the scan range [scan_lower_bound, scan_upper_bound) into consecutive
// sub-ranges and writes them to '*result_ranges'. Each sub-range is attributed
// to the custom hash schema of the range in 'ranges' that covers it, or to
// 'table_wide_hash_schema' where no such range exists. An empty bound means
// "unbounded" on that side.
//
// 'ranges' is presumably sorted by bound and non-overlapping (std::lower_bound
// below requires it to be partitioned) -- confirm with the caller.
//
// NOTE(review): this copy appears to be missing lines: the 'return' after the
// trivial-result assignment, the 'result.push_back({' heads of every
// '..., hash_schema});' continuation line, the body that appends the whole
// current range, the '++cur_idx' advances after "Done with the current
// range", the close of the lower_bound lambda call, and closing braces.
void PartitionPruner::PrepareRangeSet(
const string& scan_lower_bound,
const string& scan_upper_bound,
const PartitionSchema::HashSchema& table_wide_hash_schema,
const PartitionSchema::RangesWithHashSchemas& ranges,
PartitionSchema::RangesWithHashSchemas* result_ranges) {
CHECK(scan_upper_bound.empty() || scan_lower_bound < scan_upper_bound);
// If there aren't any ranges with custom hash schemas or there isn't an
// intersection between the set of ranges with custom hash schemas and the
// scan range, the result is trivial: the whole scan range is attributed
// to the table-wide hash schema.
if (ranges.empty() ||
(!scan_upper_bound.empty() && scan_upper_bound < ranges.front().lower) ||
(!scan_lower_bound.empty() && !ranges.back().upper.empty() &&
ranges.back().upper <= scan_lower_bound)) {
*result_ranges =
{ { scan_lower_bound, scan_upper_bound, table_wide_hash_schema } };
// NOTE(review): an early return is expected here; without it execution
// falls through to the general case below.
// Find the first range that is at or after the specified bounds.
const auto range_it = std::lower_bound(ranges.cbegin(), ranges.cend(),
RangeBounds{scan_lower_bound, scan_upper_bound},
[] (const PartitionSchema::RangeWithHashSchema& range,
const RangeBounds& bounds) {
// return true if range < bounds
return !range.upper.empty() && range.upper <= bounds.lower;
CHECK(range_it != ranges.cend());
// Current position of the iterator.
string cur_point = scan_lower_bound;
// Index of the known range with custom hash schema that the iterator is
// currently pointing at or about to point if the iterator is currently
// at the scan boundary.
size_t cur_idx = distance(ranges.begin(), range_it);
CHECK_LT(cur_idx, ranges.size());
// Iterate over the scan range from one known boundary to the next one,
// enumerating the resulting consecutive sub-ranges and attributing each
// sub-range to a proper hash schema. If that's a known range with custom hash
// schema, it's attributed to its range-specific hash schema; otherwise,
// a sub-range is attributed to the table-wide hash schema.
PartitionSchema::RangesWithHashSchemas result;
while (cur_idx < ranges.size() &&
(cur_point < scan_upper_bound || scan_upper_bound.empty())) {
// Check the disposition of cur_point related to the lower boundary
// of the range pointed to by 'cur_idx'.
const auto& cur_range = ranges[cur_idx];
if (cur_point < cur_range.lower) {
// The iterator is before the current range:
// |---|
// ^
// The next known bound is either the upper bound of the current range
// or the upper bound of the scan.
auto upper_bound = scan_upper_bound.empty()
? cur_range.lower : std::min(cur_range.lower, scan_upper_bound);
cur_point, upper_bound, table_wide_hash_schema});
// Not advancing the 'cur_idx' since cur_point is either at the beginning
// of the range or before it at the upper bound of the scan.
} else if (cur_point == cur_range.lower) {
// The iterator is at the lower boundary of the current range:
// |---|
// ^
if ((!cur_range.upper.empty() && cur_range.upper <= scan_upper_bound) ||
scan_upper_bound.empty()) {
// The current range is within the scan boundaries.
} else {
// The current range spans over the upper bound of the scan.
cur_point, scan_upper_bound, cur_range.hash_schema});
// Done with the current range, advance to the next one, if any.
} else {
// The iterator is ahead of the current range's lower boundary:
// |---|
// ^
if ((!scan_upper_bound.empty() && scan_upper_bound <= cur_range.upper) ||
cur_range.upper.empty()) {
cur_point, scan_upper_bound, cur_range.hash_schema});
} else {
cur_point, cur_range.upper, cur_range.hash_schema});
// Done with the current range, advance to the next one, if any.
// Advance the iterator.
cur_point = result.back().upper;
// If exiting from the cycle above by the 'cur_idx < ranges.size()' condition,
// check if the upper bound of the scan is beyond the upper bound of the last
// range with custom hash schema. If so, add an extra range that spans from
// the upper bound of the last range to the upper bound of the scan.
if (result.back().upper != scan_upper_bound) {
DCHECK_EQ(cur_point, result.back().upper);
cur_point, scan_upper_bound, table_wide_hash_schema});
*result_ranges = std::move(result);
// Initializes the pruner with the partition key ranges that 'scan_spec' must
// visit for a table with 'schema' and 'partition_schema'. The ranges are
// stored in 'partition_key_ranges_' in reverse (descending) order, so the
// next range to scan is at the back.
//
// NOTE(review): this copy appears to be missing lines: both branches of the
// AreRangeColumnsPrefixOfPrimaryKey() check are empty (presumably they call
// the EncodeRangeKeysFrom*() helpers to fill the scan_range_* bounds), both
// std::move() calls lack their destination, the PrepareRangeSet() call that
// fills 'preliminary_ranges' is missing, the sort using
// 'PartitionKeyRangeLess' and the lower-bound trimming body are missing, and
// closing braces are missing.
void PartitionPruner::Init(const Schema& schema,
const PartitionSchema& partition_schema,
const ScanSpec& scan_spec) {
// If we can already short circuit the scan, we don't need to bother with
// partition pruning. This also allows us to assume some invariants of the
// scan spec, such as no None predicates and that the lower bound PK < upper
// bound PK.
if (scan_spec.CanShortCircuit()) { return; }
// Build a set of partition key ranges which cover the tablets necessary for
// the scan.
// Example predicate sets and resulting partition key ranges, based on the
// following tablet schema:
// CREATE TABLE t (a INT32, b INT32, c INT32) PRIMARY KEY (a, b, c)
// partitioned (as implied by the table below) by
// HASH (a) INTO 2 BUCKETS, HASH (b) INTO 3 BUCKETS, RANGE (c).
// Assume that hash(0) = 0 and hash(2) = 2.
// | Predicates | Partition Key Ranges |
// +------------+--------------------------------------------------------+
// | a = 0 | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
// | b = 2 | |
// | c = 0 | |
// +------------+--------------------------------------------------------+
// | a = 0 | [(bucket=0, bucket=2), (bucket=0, bucket=3)) |
// | b = 2 | |
// +------------+--------------------------------------------------------+
// | a = 0 | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
// | c = 0 | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
// | | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
// +------------+--------------------------------------------------------+
// | b = 2 | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
// | c = 0 | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
// +------------+--------------------------------------------------------+
// | a = 0 | [(bucket=0), (bucket=1)) |
// +------------+--------------------------------------------------------+
// | b = 2 | [(bucket=0, bucket=2), (bucket=0, bucket=3)) |
// | | [(bucket=1, bucket=2), (bucket=1, bucket=3)) |
// +------------+--------------------------------------------------------+
// | c = 0 | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
// | | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
// | | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
// | | [(bucket=1, bucket=0, c=0), (bucket=1, bucket=0, c=1)) |
// | | [(bucket=1, bucket=1, c=0), (bucket=1, bucket=1, c=1)) |
// | | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
// +------------+--------------------------------------------------------+
// | None | [(), ()) |
// If the partition key is considered as a sequence of the hash bucket
// components and a range component, then a few patterns emerge from the
// examples above:
// 1) The partition keys are truncated after the final constrained component.
// Hash bucket components are constrained when the scan is limited to a
// subset of buckets via equality or in-list predicates on that component.
// Range components are constrained if they have an upper or lower bound
// via range or equality predicates on that component.
// 2) If the final constrained component is a hash bucket, then the
// corresponding bucket in the upper bound is incremented in order to make
// it an exclusive key.
// 3) The number of partition key ranges in the result is equal to the product
// of the number of buckets of each unconstrained hash component which come
// before a final constrained component. If there are no unconstrained hash
// components, then the number of resulting partition key ranges is one. Note
// that this can be a lot of ranges, and we may find we need to limit the
// algorithm to give up on pruning if the number of ranges exceeds a limit.
// Until this becomes a problem in practice, we'll continue always pruning,
// since it is precisely these highly-hash-partitioned tables which get the
// most benefit from pruning.
// Build the range portion of the partition key by using
// the lower and upper bounds specified by the scan.
string scan_range_lower_bound;
string scan_range_upper_bound;
const vector<ColumnId>& range_columns = partition_schema.range_schema_.column_ids;
if (!range_columns.empty()) {
if (AreRangeColumnsPrefixOfPrimaryKey(schema, range_columns)) {
} else {
// Fast path: every range uses the table-wide hash schema.
if (partition_schema.ranges_with_custom_hash_schemas().empty()) {
auto partition_key_ranges = ConstructPartitionKeyRanges(
schema, scan_spec, partition_schema.hash_schema_,
{scan_range_lower_bound, scan_range_upper_bound});
// Moved in reverse so the lowest range ends up last.
move(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
} else {
// Build the preliminary set of ranges: that's to convey information on
// range-specific hash schemas since some ranges in the table can have
// custom (i.e. different from the table-wide) hash schemas.
PartitionSchema::RangesWithHashSchemas preliminary_ranges;
// Construct partition key ranges from the ranges and their respective hash
// schemas that fall within the scan's bounds.
for (size_t i = 0; i < preliminary_ranges.size(); ++i) {
const auto& hash_schema = preliminary_ranges[i].hash_schema;
RangeBounds range_bounds {preliminary_ranges[i].lower, preliminary_ranges[i].upper};
auto partition_key_ranges = ConstructPartitionKeyRanges(
schema, scan_spec, hash_schema, range_bounds);
partition_key_ranges_.resize(partition_key_ranges_.size() + partition_key_ranges.size());
move(partition_key_ranges.begin(), partition_key_ranges.end(),
// Reverse the order of the partition key ranges, so that it is efficient
// to remove the partition key ranges from the vector in ascending order.
// Despite its name, this comparator orders by *descending* start key, which
// produces the reversed layout described above when used for sorting.
constexpr struct {
bool operator()(const PartitionKeyRange& lhs, const PartitionKeyRange& rhs) const {
return lhs.start > rhs.start;
} PartitionKeyRangeLess;
// Remove all partition key ranges before the scan spec's lower bound partition key.
if (!scan_spec.lower_bound_partition_key().empty()) {
// Returns true while there are partition key ranges left to scan.
bool PartitionPruner::HasMorePartitionKeyRanges() const {
  return NumRangesRemaining() != 0;
}
// Returns the start key of the next partition key range to scan.
//
// 'partition_key_ranges_' is kept in reverse order, so the next (lowest)
// range sits at the back. Must only be called while
// HasMorePartitionKeyRanges() is true; back() on an empty vector is undefined.
const PartitionKey& PartitionPruner::NextPartitionKey() const {
  DCHECK(!partition_key_ranges_.empty());
  return partition_key_ranges_.back().start;
}
// Drops the part of the remaining partition key ranges that lies below
// 'upper_bound' (exclusive). The ranges are walked from the back of
// 'partition_key_ranges_', i.e. lowest first. The walk stops at the first
// range that starts at or after 'upper_bound'. A range that straddles
// 'upper_bound' gets its start moved up to it.
//
// NOTE(review): this copy appears to be missing lines: the body handling an
// empty 'upper_bound' (presumably "everything has been scanned"), the body of
// the final 'else' (presumably removing a range that lies entirely below
// 'upper_bound'), and closing braces. If that 'else' erases from
// 'partition_key_ranges_' (e.g. pop_back()) while 'range_it' is live, it
// invalidates 'range_it'; prefer a loop that inspects back() directly.
void PartitionPruner::RemovePartitionKeyRange(const PartitionKey& upper_bound) {
if (upper_bound.empty()) {
for (auto range_it = partition_key_ranges_.rbegin();
range_it != partition_key_ranges_.rend();
++range_it) {
if (upper_bound <= (*range_it).start) { break; }
// Condition met if upper_bound lies in the middle of current partition key range
if ((*range_it).end.empty() || upper_bound < (*range_it).end) {
(*range_it).start = upper_bound;
} else {
bool PartitionPruner::ShouldPrune(const Partition& partition) const {
// range is an iterator that points to the first partition key range which
// overlaps or is greater than the partition.
auto range = lower_bound(partition_key_ranges_.rbegin(), partition_key_ranges_.rend(), partition,
[] (const PartitionKeyRange& scan_range, const Partition& partition) {
// return true if scan_range < partition
const auto& scan_upper = scan_range.end;
return !scan_upper.empty() && scan_upper <= partition.begin();
if (range == partition_key_ranges_.rend()) {
return true;
return !(partition.end().empty() || partition.end() > (*range).start);
string PartitionPruner::ToString(const Schema& schema,
const PartitionSchema& partition_schema) const {
vector<string> strings;
for (auto range = partition_key_ranges_.rbegin();
range != partition_key_ranges_.rend();
++range) {
"[($0), ($1))",
(*range).start.empty() ? "<start>" :
partition_schema.PartitionKeyDebugString((*range).start, schema),
(*range).end.empty() ? "<end>" :
partition_schema.PartitionKeyDebugString((*range).end, schema)));
return JoinStrings(strings, ", ");
} // namespace kudu