blob: d8469e08eee0116dd944180af898a6233cf4db1f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/scan_spec.h"
#include <algorithm>
#include <string>
#include <utility>
#include <vector>
#include "kudu/common/key_util.h"
#include "kudu/common/row.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/util/auto_release_pool.h"
using std::any_of;
using std::max;
using std::move;
using std::string;
using std::vector;
namespace kudu {
// Registers 'pred' with this scan spec.
//
// If a predicate is already stored under the same column name, 'pred' is
// merged into it; otherwise 'pred' is inserted as a new entry keyed by its
// column name.
void ScanSpec::AddPredicate(ColumnPredicate pred) {
  if (ColumnPredicate* existing = FindOrNull(predicates_, pred.column().name())) {
    existing->Merge(pred);
    return;
  }

  // Take a copy of the name first: 'pred' is moved into the map in the same
  // call, so the key must not be read out of it afterwards.
  string key = pred.column().name();
  predicates_.emplace(move(key), move(pred));
}
// Drops the predicate stored under 'column_name'. Does nothing when no such
// predicate exists.
void ScanSpec::RemovePredicate(const string& column_name) {
  auto entry = predicates_.find(column_name);
  if (entry != predicates_.end()) {
    predicates_.erase(entry);
  }
}
// Drops every column predicate held by this scan spec.
void ScanSpec::RemovePredicates() {
  predicates_.erase(predicates_.begin(), predicates_.end());
}
bool ScanSpec::CanShortCircuit() const {
if (lower_bound_key_ &&
exclusive_upper_bound_key_ &&
lower_bound_key_->encoded_key().compare(exclusive_upper_bound_key_->encoded_key()) >= 0) {
return false;
}
return any_of(predicates_.begin(), predicates_.end(),
[] (const pair<string, ColumnPredicate>& predicate) {
return predicate.second.predicate_type() == PredicateType::None;
});
}
// Tightens the inclusive lower primary key bound to 'key'.
//
// 'key' is adopted when no lower bound is set yet, or when its encoded form
// compares strictly greater than the current bound's; otherwise the current
// bound is kept. Only the pointer is stored.
void ScanSpec::SetLowerBoundKey(const EncodedKey* key) {
  const bool current_is_at_least_as_tight =
      lower_bound_key_ != nullptr &&
      key->encoded_key().compare(lower_bound_key_->encoded_key()) <= 0;
  if (current_is_at_least_as_tight) {
    return;
  }
  lower_bound_key_ = key;
}
// Tightens the exclusive upper primary key bound to 'key'.
//
// 'key' is adopted when no upper bound is set yet, or when its encoded form
// compares strictly less than the current bound's; otherwise the current
// bound is kept. Only the pointer is stored.
void ScanSpec::SetExclusiveUpperBoundKey(const EncodedKey* key) {
  const bool current_is_at_least_as_tight =
      exclusive_upper_bound_key_ != nullptr &&
      key->encoded_key().compare(exclusive_upper_bound_key_->encoded_key()) >= 0;
  if (current_is_at_least_as_tight) {
    return;
  }
  exclusive_upper_bound_key_ = key;
}
// Raises the lower partition key bound to 'partitionKey' when it compares
// strictly greater than the bound currently stored; otherwise leaves the
// stored bound untouched. The key bytes are copied.
void ScanSpec::SetLowerBoundPartitionKey(const Slice& partitionKey) {
  if (partitionKey.compare(lower_bound_partition_key_) <= 0) {
    return;
  }
  lower_bound_partition_key_ = partitionKey.ToString();
}
// Lowers the exclusive upper partition key bound to 'partitionKey'.
//
// The stored bound is replaced when it is currently empty, or when
// 'partitionKey' is non-empty and compares strictly less than it. An empty
// 'partitionKey' never replaces a non-empty stored bound. The key bytes are
// copied.
void ScanSpec::SetExclusiveUpperBoundPartitionKey(const Slice& partitionKey) {
  if (!exclusive_upper_bound_partition_key_.empty() &&
      (partitionKey.empty() ||
       partitionKey.compare(exclusive_upper_bound_partition_key_) >= 0)) {
    return;
  }
  exclusive_upper_bound_partition_key_ = partitionKey.ToString();
}
// Renders this scan spec as a human-readable string.
//
// The output is the primary key range (when at least one key bound is set)
// followed by every column predicate, all joined with " AND ". Predicates are
// emitted in the column order of 'schema', which keeps the output stable.
string ScanSpec::ToString(const Schema& schema) const {
  vector<string> parts;

  const bool has_key_bound =
      lower_bound_key_ != nullptr || exclusive_upper_bound_key_ != nullptr;
  if (has_key_bound) {
    parts.push_back(EncodedKey::RangeToStringWithSchema(
        lower_bound_key_, exclusive_upper_bound_key_, schema));
  }

  // Iterate the schema rather than the predicate container so the order does
  // not depend on how the predicates are stored.
  for (int i = 0; i < schema.num_columns(); i++) {
    if (const ColumnPredicate* found = FindOrNull(predicates_, schema.column(i).name())) {
      parts.push_back(found->ToString());
    }
  }

  return JoinStrings(parts, " AND ");
}
// Optimizes the scan by first lifting the primary key bounds into column
// predicates and then pushing key column predicates back into the primary
// key bounds.
//
// 'arena' and 'pool' are forwarded to the two steps; 'remove_pushed_predicates'
// is forwarded to PushPredicatesIntoPrimaryKeyBounds().
void ScanSpec::OptimizeScan(const Schema& schema,
                            Arena* arena,
                            AutoReleasePool* pool,
                            bool remove_pushed_predicates) {
  // Nothing to do if the scan can already be short circuited. Bailing out
  // here also lets the steps below rely on
  // lower_bound_key_ < exclusive_upper_bound_key_ and on the absence of None
  // predicates.
  if (CanShortCircuit()) {
    return;
  }

  LiftPrimaryKeyBounds(schema, arena);
  PushPredicatesIntoPrimaryKeyBounds(schema, arena, pool, remove_pushed_predicates);
}
void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
Arena* arena,
AutoReleasePool* pool,
bool remove_pushed_predicates) {
// Step 1: load key column predicate values into a pair of rows.
uint8_t* lower_buf = static_cast<uint8_t*>(
CHECK_NOTNULL(arena->AllocateBytes(schema.key_byte_size())));
uint8_t* upper_buf = static_cast<uint8_t*>(
CHECK_NOTNULL(arena->AllocateBytes(schema.key_byte_size())));
ContiguousRow lower_key(&schema, lower_buf);
ContiguousRow upper_key(&schema, upper_buf);
int lower_bound_predicates_pushed = key_util::PushLowerBoundPrimaryKeyPredicates(
predicates_, &lower_key, arena);
int upper_bound_predicates_pushed = key_util::PushUpperBoundPrimaryKeyPredicates(
predicates_, &upper_key, arena);
// Step 2: Erase pushed predicates
// Predicates through the first range predicate may be erased.
if (remove_pushed_predicates) {
for (int32_t col_idx = 0;
col_idx < max(lower_bound_predicates_pushed, upper_bound_predicates_pushed);
col_idx++) {
const string& column = schema.column(col_idx).name();
PredicateType type = FindOrDie(predicates_, schema.column(col_idx).name()).predicate_type();
if (type == PredicateType::Equality) {
RemovePredicate(column);
} else if (type == PredicateType::Range) {
RemovePredicate(column);
break;
} else {
LOG(FATAL) << "Can not remove unknown predicate type";
}
}
}
// Step 3: set the new bounds
if (lower_bound_predicates_pushed > 0) {
EncodedKey* lower = EncodedKey::FromContiguousRow(ConstContiguousRow(lower_key)).release();
pool->Add(lower);
SetLowerBoundKey(lower);
}
if (upper_bound_predicates_pushed > 0) {
EncodedKey* upper = EncodedKey::FromContiguousRow(ConstContiguousRow(upper_key)).release();
pool->Add(upper);
SetExclusiveUpperBoundKey(upper);
}
}
// Derives per-column predicates from the encoded primary key bounds
// (lower_bound_key_ and exclusive_upper_bound_key_) and registers them via
// AddPredicate(). Returns immediately when neither bound is set.
//
// Key columns are visited in schema order. Every leading column whose lower
// and upper bound values compare equal yields an equality predicate. The
// first column that is not part of that equal prefix is handled as a range
// (with an exclusive or inclusive upper value, see below) and ends the walk.
//
// 'arena' is only forwarded to ColumnPredicate::InclusiveRange().
void ScanSpec::LiftPrimaryKeyBounds(const Schema& schema, Arena* arena) {
if (lower_bound_key_ == nullptr && exclusive_upper_bound_key_ == nullptr) { return; }
int32_t num_key_columns = schema.num_key_columns();
for (int32_t col_idx = 0; col_idx < num_key_columns; col_idx++) {
const ColumnSchema& column = schema.column(col_idx);
// A bound that is not set is represented by a null column value below.
const void* lower = lower_bound_key_ == nullptr
? nullptr : lower_bound_key_->raw_keys()[col_idx];
const void* upper = exclusive_upper_bound_key_ == nullptr
? nullptr : exclusive_upper_bound_key_->raw_keys()[col_idx];
if (lower != nullptr && upper != nullptr && column.Compare(lower, upper) == 0) {
// We are still in the equality prefix of the bounds
AddPredicate(ColumnPredicate::Equality(column, lower));
} else {
// Determine if the upper bound column value is exclusive or inclusive.
// The value is exclusive if all of the remaining column values are
// equal to the minimum value, otherwise it's inclusive.
//
// examples, with key columns: (int8 a, int8 b):
//
//   key >= (1, 2)
//       <  (3, min)
//   should result in predicate 1 <= a < 3
//
//   key >= (1, 2)
//       <  (3, 5)
//   should result in predicate 1 <= a < 4
//
// With no upper bound at all the flag simply stays true.
bool is_exclusive = true;
if (upper != nullptr) {
// Scratch buffer that receives each suffix column's minimum value.
uint8_t min[kLargestTypeSize];
// The loop condition stops the scan at the first suffix column whose upper
// bound value differs from that column's minimum.
for (int32_t suffix_idx = col_idx + 1;
is_exclusive && suffix_idx < num_key_columns;
suffix_idx++) {
const ColumnSchema& suffix_column = schema.column(suffix_idx);
suffix_column.type_info()->CopyMinValue(min);
const void* suffix_val = exclusive_upper_bound_key_->raw_keys()[suffix_idx];
is_exclusive &= suffix_column.type_info()->Compare(suffix_val, min) == 0;
}
}
if (is_exclusive) {
// The upper column value is passed through unchanged as the range's upper
// value.
ColumnPredicate predicate = ColumnPredicate::Range(column, lower, upper);
if (predicate.predicate_type() == PredicateType::Equality &&
col_idx + 1 < num_key_columns) {
// If this turns out to be an equality predicate, then we can add one
// more lower bound predicate from the next component (if it exists).
//
// example, with key columns (int8 a, int8 b):
//
//   key >= (2, 3)
//   key <  (3, min)
//
// should result in the predicates:
//
//   a = 2
//   b >= 3
//
// NOTE(review): lower_bound_key_ is dereferenced without a null check here;
// presumably Range() only reports Equality when both bound values are
// non-null, which would imply lower_bound_key_ is set -- confirm.
AddPredicate(ColumnPredicate::Range(schema.column(col_idx + 1),
lower_bound_key_->raw_keys()[col_idx + 1],
nullptr));
}
AddPredicate(move(predicate));
} else {
// InclusiveRange() may produce no predicate (its result is tested before
// use); in that case nothing is added for this column.
auto pred = ColumnPredicate::InclusiveRange(column, lower, upper, arena);
if (pred) AddPredicate(*pred);
}
// Columns after the first one outside the equality prefix are not lifted
// (apart from the extra lower bound added in the equality case above).
break;
}
}
}
} // namespace kudu