/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 **/

#include "storage/CSBTreeIndexSubBlock.hpp"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "compression/CompressionDictionary.hpp"
#include "expressions/predicate/ComparisonPredicate.hpp"
#include "expressions/predicate/PredicateCost.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
#include "storage/CompressedTupleStorageSubBlock.hpp"
#include "storage/StorageBlockLayout.pb.h"
#include "storage/StorageConstants.hpp"
#include "storage/StorageErrors.hpp"
#include "storage/SubBlockTypeRegistry.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/ComparisonFactory.hpp"
#include "types/operations/comparisons/ComparisonID.hpp"
#include "types/operations/comparisons/ComparisonUtil.hpp"
#include "utility/BitVector.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
#include "utility/ScopedBuffer.hpp"
using std::memcpy;
using std::memmove;
using std::pair;
using std::size_t;
using std::sort;
using std::uint8_t;
using std::uint16_t;
using std::uint32_t;
using std::vector;

namespace quickstep {
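// Register this IndexSubBlock implementation with the SubBlockTypeRegistry
// under the CSB_TREE sub-block type.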
QUICKSTEP_REGISTER_INDEX(CSBTreeIndexSubBlock, CSB_TREE);
namespace csbtree_internal {
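// Comparator which orders fixed-width compressed key codes by their unsigned
// integer value. Comparing raw codes is sufficient here, because this file
// only evaluates order predicates on codes that preserve the ordering of the
// underlying values (truncated values, or sorted dictionary codes).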
template <typename CodeType>
class CompressedCodeLessComparator : public UncheckedComparator {
public:
~CompressedCodeLessComparator() {
}
bool compareTypedValues(const TypedValue &left, const TypedValue &right) const {
FATAL_ERROR("Can not use CompressedCodeLessComparator to compare TypedValue.");
}
inline bool compareDataPtrs(const void *left, const void *right) const {
return compareDataPtrsInl(left, right);
}
inline bool compareDataPtrsInl(const void *left, const void *right) const {
return *static_cast<const CodeType*>(left) < *static_cast<const CodeType*>(right);
}
bool compareTypedValueWithDataPtr(const TypedValue &left, const void *right) const {
FATAL_ERROR("Can not use CompressedCodeLessComparator to compare TypedValue.");
}
bool compareDataPtrWithTypedValue(const void *left, const TypedValue &right) const {
FATAL_ERROR("Can not use CompressedCodeLessComparator to compare TypedValue.");
}
};
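// Helper classes pairing a key (in uncompressed, compressed-code, or
// owned-composite form) with its tuple_id, used when sorting index entries
// by key (e.g. during bulk rebuilds).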
class EntryReference {
public:
EntryReference(TypedValue &&key,
const tuple_id tuple)
      : key_(std::move(key)), tuple_(tuple) {
}
inline const void* getDataPtr() const {
return key_.getDataPtr();
}
inline tuple_id getTupleID() const {
return tuple_;
}
private:
TypedValue key_;
tuple_id tuple_;
};
class CompressedEntryReference {
public:
CompressedEntryReference(const uint32_t key_code, const tuple_id tuple)
: key_code_(key_code), tuple_(tuple) {
}
inline const void* getDataPtr() const {
return &key_code_;
}
inline uint32_t getKeyCode() const {
return key_code_;
}
inline tuple_id getTupleID() const {
return tuple_;
}
private:
uint32_t key_code_;
tuple_id tuple_;
};
class CompositeEntryReference {
public:
CompositeEntryReference(void *composite_key_copy, const tuple_id tuple)
: composite_key_(composite_key_copy),
tuple_(tuple) {
}
inline const void* getDataPtr() const {
return composite_key_.get();
}
inline tuple_id getTupleID() const {
return tuple_;
}
private:
ScopedBuffer composite_key_;
tuple_id tuple_;
};
class CompressedEntryReferenceComparator {
public:
inline bool operator() (const CompressedEntryReference &left,
const CompressedEntryReference &right) const {
return left.getKeyCode() < right.getKeyCode();
}
};
class CompositeEntryReferenceComparator {
public:
explicit CompositeEntryReferenceComparator(const CompositeKeyLessComparator &comparator)
: internal_comparator_(comparator) {
}
inline bool operator() (const CompositeEntryReference &left,
const CompositeEntryReference &right) const {
return internal_comparator_.compareDataPtrsInl(left.getDataPtr(), right.getDataPtr());
}
private:
const CompositeKeyLessComparator &internal_comparator_;
};
CompositeKeyLessComparator::CompositeKeyLessComparator(
const CSBTreeIndexSubBlock &owner,
const CatalogRelationSchema &relation)
: owner_(owner) {
attribute_comparators_.reserve(owner_.indexed_attribute_ids_.size());
for (vector<attribute_id>::const_iterator it = owner_.indexed_attribute_ids_.begin();
it != owner_.indexed_attribute_ids_.end();
++it) {
const Type &attribute_type = relation.getAttributeById(*it)->getType();
DEBUG_ASSERT(!attribute_type.isVariableLength());
attribute_comparators_.push_back(
ComparisonFactory::GetComparison(ComparisonID::kLess).makeUncheckedComparatorForTypes(
attribute_type,
attribute_type));
}
}
bool CompositeKeyLessComparator::compareDataPtrsInl(const void *left, const void *right) const {
DEBUG_ASSERT(attribute_comparators_.size() == owner_.indexed_attribute_offsets_.size());
vector<size_t>::const_iterator offset_it = owner_.indexed_attribute_offsets_.begin();
for (PtrVector<UncheckedComparator>::const_iterator comparator_it = attribute_comparators_.begin();
comparator_it != attribute_comparators_.end();
++comparator_it, ++offset_it) {
if (comparator_it->compareDataPtrs(static_cast<const char*>(left) + *offset_it,
static_cast<const char*>(right) + *offset_it)) {
return true;
} else if (comparator_it->compareDataPtrs(static_cast<const char*>(right) + *offset_it,
static_cast<const char*>(left) + *offset_it)) {
return false;
}
    // The attributes are equal, so the loop proceeds to compare the next
    // attribute in the composite key.
}
return false; // Keys are exactly equal.
}
} // namespace csbtree_internal
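// Special negative values for NodeHeader::node_group_reference (nonnegative
// values refer to an actual node group):
//   kNodeGroupNone - no node group is referenced (e.g. the rightmost leaf has
//       no right sibling, or an insert allocated no new group).
//   kNodeGroupNextLeaf - a leaf's right sibling is the next node within the
//       same node group.
//   kNodeGroupFull - used in InsertReturnValue to indicate that an insert
//       failed because no free node groups remain.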
const int CSBTreeIndexSubBlock::kNodeGroupNone = -1;
const int CSBTreeIndexSubBlock::kNodeGroupNextLeaf = -2;
const int CSBTreeIndexSubBlock::kNodeGroupFull = -3;
CSBTreeIndexSubBlock::CSBTreeIndexSubBlock(const TupleStorageSubBlock &tuple_store,
const IndexSubBlockDescription &description,
const bool new_block,
void *sub_block_memory,
const std::size_t sub_block_memory_size)
: IndexSubBlock(tuple_store,
description,
new_block,
sub_block_memory,
sub_block_memory_size),
initialized_(false),
key_may_be_compressed_(false),
key_is_compressed_(false),
key_is_nullable_(false),
key_type_(nullptr),
next_free_node_group_(kNodeGroupNone),
num_free_node_groups_(0) {
if (!DescriptionIsValid(relation_, description_)) {
FATAL_ERROR("Attempted to construct a CSBTreeIndexSubBlock from an invalid description.");
}
const int num_indexed_attributes = description_.indexed_attribute_ids_size();
if (num_indexed_attributes > 1) {
key_is_composite_ = true;
} else {
key_is_composite_ = false;
const attribute_id key_id = description_.indexed_attribute_ids(0);
key_type_ = &(relation_.getAttributeById(key_id)->getType());
}
indexed_attribute_ids_.reserve(num_indexed_attributes);
indexed_attribute_offsets_.reserve(num_indexed_attributes);
for (int indexed_attribute_num = 0;
indexed_attribute_num < num_indexed_attributes;
++indexed_attribute_num) {
const attribute_id indexed_attribute_id = description_.indexed_attribute_ids(indexed_attribute_num);
indexed_attribute_ids_.push_back(indexed_attribute_id);
// TODO(chasseur): Support a composite key with compressed parts.
if ((!key_is_composite_) && tuple_store_.isCompressed()) {
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
if (compressed_tuple_store.compressedBlockIsBuilt()) {
if (compressed_tuple_store.compressedAttributeIsDictionaryCompressed(indexed_attribute_id)
|| compressed_tuple_store.compressedAttributeIsTruncationCompressed(indexed_attribute_id)) {
key_may_be_compressed_ = true;
}
} else {
if (compressed_tuple_store.compressedUnbuiltBlockAttributeMayBeCompressed(indexed_attribute_id)) {
key_may_be_compressed_ = true;
}
}
}
}
bool initialize_now = true;
if (key_may_be_compressed_) {
if (!static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_).compressedBlockIsBuilt()) {
initialize_now = false;
}
}
if (initialize_now) {
if (new_block) {
initialize(new_block);
} else {
if (!initialize(new_block)) {
throw MalformedBlock();
}
}
}
}
bool CSBTreeIndexSubBlock::DescriptionIsValid(const CatalogRelationSchema &relation,
const IndexSubBlockDescription &description) {
// Make sure description is initialized and specifies CSBTree.
if (!description.IsInitialized()) {
return false;
}
if (description.sub_block_type() != IndexSubBlockDescription::CSB_TREE) {
return false;
}
// Make sure at least one key attribute is specified.
if (description.indexed_attribute_ids_size() == 0) {
return false;
}
// TODO(chasseur): Consider checking key-length here.
// Check that all key attributes exist and are fixed-length.
for (int indexed_attribute_num = 0;
indexed_attribute_num < description.indexed_attribute_ids_size();
++indexed_attribute_num) {
const attribute_id indexed_attribute_id = description.indexed_attribute_ids(indexed_attribute_num);
if (!relation.hasAttributeWithId(indexed_attribute_id)) {
return false;
}
const Type &attr_type = relation.getAttributeById(indexed_attribute_id)->getType();
if (attr_type.isVariableLength()) {
return false;
}
}
return true;
}
// TODO(chasseur): Make this more accurate by actually considering the internal
// structure of a CSBTreeIndexSubBlock. Also consider how to get better
// estimates when compression might be used.
std::size_t CSBTreeIndexSubBlock::EstimateBytesPerTuple(
const CatalogRelationSchema &relation,
const IndexSubBlockDescription &description) {
DEBUG_ASSERT(DescriptionIsValid(relation, description));
size_t key_length = 0;
for (int indexed_attribute_num = 0;
indexed_attribute_num < description.indexed_attribute_ids_size();
++indexed_attribute_num) {
key_length +=
relation.getAttributeById(description.indexed_attribute_ids(indexed_attribute_num))
->getType().maximumByteLength();
}
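  // Coarse heuristic: charge about 2.5x the raw key length per tuple, roughly
  // accounting for per-entry tuple_ids and partially-filled nodes.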
return (5 * key_length) >> 1;
}
std::size_t CSBTreeIndexSubBlock::EstimateBytesPerBlock(
const CatalogRelationSchema &relation,
const IndexSubBlockDescription &description) {
return kZeroSize;
}
bool CSBTreeIndexSubBlock::addEntry(const tuple_id tuple) {
DEBUG_ASSERT(initialized_);
DEBUG_ASSERT(tuple_store_.hasTupleWithID(tuple));
InsertReturnValue retval;
if (key_is_composite_) {
ScopedBuffer composite_key_buffer(makeKeyCopy(tuple));
if (key_is_nullable_) {
if (composite_key_buffer.empty()) {
// Don't insert a NULL key.
return true;
}
} else {
DEBUG_ASSERT(!composite_key_buffer.empty());
}
retval = rootInsertHelper(tuple,
composite_key_buffer.get(),
*composite_key_comparator_);
} else if (key_is_compressed_) {
// Don't insert a NULL key.
if (key_is_nullable_) {
TypedValue typed_key(tuple_store_.getAttributeValueTyped(tuple,
indexed_attribute_ids_.front()));
if (typed_key.isNull()) {
return true;
}
}
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
uint32_t code = compressed_tuple_store.compressedGetCode(tuple,
indexed_attribute_ids_.front());
switch (compressed_tuple_store.compressedGetCompressedAttributeSize(indexed_attribute_ids_.front())) {
case 1:
retval = compressedKeyAddEntryHelper<uint8_t>(tuple, code);
break;
case 2:
retval = compressedKeyAddEntryHelper<uint16_t>(tuple, code);
break;
case 4:
retval = compressedKeyAddEntryHelper<uint32_t>(tuple, code);
break;
default:
FATAL_ERROR("Unexpected compressed key byte-length (not 1, 2, or 4) encountered "
"in CSBTreeIndexSubBlock::addEntry()");
}
} else {
TypedValue typed_key(tuple_store_.getAttributeValueTyped(tuple, indexed_attribute_ids_.front()));
if (key_is_nullable_) {
if (typed_key.isNull()) {
// Don't insert a NULL key.
return true;
}
} else {
DEBUG_ASSERT(!typed_key.isNull());
}
InvokeOnLessComparatorForTypeIgnoreNullability(
*key_type_,
[&](const auto &comp) { // NOLINT(build/c++11)
retval = this->rootInsertHelper(tuple, typed_key.getDataPtr(), comp);
});
}
if (retval.new_node_group_id == kNodeGroupFull) {
// Needed to split a node group, but not enough space.
return false;
}
DEBUG_ASSERT(retval.new_node_group_id == kNodeGroupNone);
if (retval.split_node_least_key != NULL) {
// The root was split, must create a new root.
// Allocate the new root.
int new_root_group_id = allocateNodeGroup();
DEBUG_ASSERT(new_root_group_id >= 0);
void *new_root = getNode(new_root_group_id, 0);
// Set up the new root's header.
static_cast<NodeHeader*>(new_root)->num_keys = 1;
static_cast<NodeHeader*>(new_root)->is_leaf = false;
static_cast<NodeHeader*>(new_root)->node_group_reference = getRootNodeGroupNumber();
// Insert the split key into the new root.
memcpy(static_cast<char*>(new_root) + sizeof(NodeHeader),
retval.split_node_least_key,
key_length_bytes_);
// Update the root node group number.
setRootNodeGroupNumber(new_root_group_id);
}
return true;
}
bool CSBTreeIndexSubBlock::bulkAddEntries(const TupleIdSequence &tuples) {
DEBUG_ASSERT(initialized_);
// TODO(chasseur): Can possibly be more efficient in some cases if we sort
// and insert groups where possible.
for (TupleIdSequence::const_iterator it = tuples.begin();
it != tuples.end();
++it) {
if (!addEntry(*it)) {
// Ran out of space. Roll back.
for (TupleIdSequence::const_iterator fixer_it = tuples.begin();
fixer_it != it;
++fixer_it) {
removeEntry(*fixer_it);
}
return false;
}
}
return true;
}
void CSBTreeIndexSubBlock::removeEntry(const tuple_id tuple) {
DEBUG_ASSERT(initialized_);
if (key_is_composite_) {
ScopedBuffer composite_key_buffer(makeKeyCopy(tuple));
if (key_is_nullable_) {
if (composite_key_buffer.empty()) {
// Don't remove a NULL key (it would not have been inserted in the
// first place).
return;
}
} else {
DEBUG_ASSERT(!composite_key_buffer.empty());
}
removeEntryFromLeaf(tuple,
composite_key_buffer.get(),
*composite_key_comparator_,
findLeaf(getRootNode(),
composite_key_buffer.get(),
*composite_key_comparator_,
*composite_key_comparator_));
} else if (key_is_compressed_) {
// Don't remove a NULL key (it would not have been inserted in the first
// place).
if (key_is_nullable_) {
TypedValue typed_key(tuple_store_.getAttributeValueTyped(tuple,
indexed_attribute_ids_.front()));
if (typed_key.isNull()) {
return;
}
}
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
uint32_t code = compressed_tuple_store.compressedGetCode(tuple,
indexed_attribute_ids_.front());
switch (compressed_tuple_store.compressedGetCompressedAttributeSize(indexed_attribute_ids_.front())) {
case 1:
compressedKeyRemoveEntryHelper<uint8_t>(tuple, code);
break;
case 2:
compressedKeyRemoveEntryHelper<uint16_t>(tuple, code);
break;
case 4:
compressedKeyRemoveEntryHelper<uint32_t>(tuple, code);
break;
default:
FATAL_ERROR("Unexpected compressed key byte-length (not 1, 2, or 4) encountered "
"in CSBTreeIndexSubBlock::removeEntry()");
}
} else {
TypedValue typed_key(tuple_store_.getAttributeValueTyped(tuple, indexed_attribute_ids_.front()));
if (key_is_nullable_) {
if (typed_key.isNull()) {
// Don't remove a NULL key (it would not have been inserted in the
// first place).
return;
}
} else {
DEBUG_ASSERT(!typed_key.isNull());
}
InvokeOnLessComparatorForTypeIgnoreNullability(
*key_type_,
[&](const auto &comp) { // NOLINT(build/c++11)
this->removeEntryFromLeaf(tuple,
typed_key.getDataPtr(),
comp,
this->findLeaf(this->getRootNode(),
typed_key.getDataPtr(),
comp,
comp));
});
}
}
void CSBTreeIndexSubBlock::bulkRemoveEntries(const TupleIdSequence &tuples) {
DEBUG_ASSERT(initialized_);
// TODO(chasseur): Can possibly be more efficient in some cases if we sort
// and scan through leaves.
for (TupleIdSequence::const_iterator it = tuples.begin();
it != tuples.end();
++it) {
removeEntry(*it);
}
}
predicate_cost_t CSBTreeIndexSubBlock::estimatePredicateEvaluationCost(
const ComparisonPredicate &predicate) const {
if ((!key_is_composite_) && predicate.isAttributeLiteralComparisonPredicate()) {
std::pair<bool, attribute_id> comparison_attr = predicate.getAttributeFromAttributeLiteralComparison();
if (comparison_attr.second == indexed_attribute_ids_.front()) {
return predicate_cost::kTreeSearch;
}
}
return predicate_cost::kInfinite;
}
TupleIdSequence* CSBTreeIndexSubBlock::getMatchesForPredicate(const ComparisonPredicate &predicate,
const TupleIdSequence *filter) const {
DEBUG_ASSERT(initialized_);
if (key_is_composite_) {
// TODO(chasseur): Evaluate predicates on composite keys.
FATAL_ERROR("CSBTreeIndexSubBlock::getMatchesForPredicate() is unimplemented for composite keys.");
}
if (!predicate.isAttributeLiteralComparisonPredicate()) {
FATAL_ERROR("CSBTreeIndexSubBlock::getMatchesForPredicate() can not "
"evaluate predicates other than simple comparisons.");
}
const CatalogAttribute *comparison_attribute = NULL;
bool left_literal = false;
if (predicate.getLeftOperand().hasStaticValue()) {
DEBUG_ASSERT(predicate.getRightOperand().getDataSource() == Scalar::kAttribute);
comparison_attribute
= &(static_cast<const ScalarAttribute&>(predicate.getRightOperand()).getAttribute());
left_literal = true;
} else {
DEBUG_ASSERT(predicate.getLeftOperand().getDataSource() == Scalar::kAttribute);
comparison_attribute
= &(static_cast<const ScalarAttribute&>(predicate.getLeftOperand()).getAttribute());
left_literal = false;
}
if (comparison_attribute->getID() != indexed_attribute_ids_.front()) {
FATAL_ERROR("CSBTreeIndexSubBlock::getMatchesForPredicate() can not "
"evaluate predicates on non-indexed attributes.");
}
TypedValue comparison_literal;
const Type* comparison_literal_type;
if (left_literal) {
comparison_literal = predicate.getLeftOperand().getStaticValue().makeReferenceToThis();
comparison_literal_type = &(predicate.getLeftOperand().getType());
} else {
comparison_literal = predicate.getRightOperand().getStaticValue().makeReferenceToThis();
comparison_literal_type = &(predicate.getRightOperand().getType());
}
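  // A comparison with a NULL literal never matches any tuple, so return an
  // empty TupleIdSequence.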
if (comparison_literal.isNull()) {
return new TupleIdSequence(tuple_store_.getMaxTupleID() + 1);
}
// If the literal is on the left, flip the comparison around.
ComparisonID comp = predicate.getComparison().getComparisonID();
  if (left_literal) {
    comp = flipComparisonID(comp);
  }
TupleIdSequence *idx_result;
if (key_is_compressed_) {
idx_result = evaluateComparisonPredicateOnCompressedKey(
comp,
comparison_literal,
*comparison_literal_type);
} else {
idx_result = evaluateComparisonPredicateOnUncompressedKey(
comp,
comparison_literal,
*comparison_literal_type);
}
if (filter != nullptr) {
idx_result->intersectWith(*filter);
}
return idx_result;
}
bool CSBTreeIndexSubBlock::rebuild() {
if (!initialized_) {
if (!initialize(false)) {
return false;
}
}
clearIndex();
if (tuple_store_.isEmpty()) {
return true;
}
if (!rebuildSpaceCheck()) {
return false;
}
vector<int> node_groups_this_level;
// Rebuild leaves.
uint16_t nodes_in_last_group = rebuildLeaves(&node_groups_this_level);
// Keep building intermediate levels from the bottom up until there is a
// single root node.
while (!((node_groups_this_level.size() == 1) && (nodes_in_last_group == 1))) {
vector<int> node_groups_next_level;
nodes_in_last_group = rebuildInternalLevel(node_groups_this_level,
nodes_in_last_group,
&node_groups_next_level);
node_groups_this_level.swap(node_groups_next_level);
}
// Set the root number.
setRootNodeGroupNumber(node_groups_this_level.front());
return true;
}
bool CSBTreeIndexSubBlock::initialize(const bool new_block) {
if (key_may_be_compressed_) {
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
if (!compressed_tuple_store.compressedBlockIsBuilt()) {
FATAL_ERROR("CSBTreeIndexSubBlock::initialize() called with a key which "
"may be compressed before the associated TupleStorageSubBlock "
"was built.");
}
if (compressed_tuple_store.compressedAttributeIsDictionaryCompressed(indexed_attribute_ids_.front())
|| compressed_tuple_store.compressedAttributeIsTruncationCompressed(indexed_attribute_ids_.front())) {
key_is_compressed_ = true;
}
}
// Compute the number of bytes needed to store a key, and fill in the vector
// of indexed attribute offsets.
key_length_bytes_ = 0;
for (vector<attribute_id>::const_iterator indexed_attr_it = indexed_attribute_ids_.begin();
indexed_attr_it != indexed_attribute_ids_.end();
++indexed_attr_it) {
indexed_attribute_offsets_.push_back(key_length_bytes_);
const Type &attr_type = relation_.getAttributeById(*indexed_attr_it)->getType();
if (attr_type.isNullable()) {
key_is_nullable_ = true;
}
if (key_is_compressed_) {
key_length_bytes_ += static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_)
.compressedGetCompressedAttributeSize(*indexed_attr_it);
} else {
key_length_bytes_ += attr_type.maximumByteLength();
}
}
DEBUG_ASSERT(key_length_bytes_ > 0);
key_tuple_id_pair_length_bytes_ = key_length_bytes_ + sizeof(tuple_id);
// Compute the number of keys that can be stored in internal and leaf nodes.
// Internal nodes are just a header and a list of keys.
max_keys_internal_ = (kCSBTreeNodeSizeBytes - sizeof(NodeHeader)) / key_length_bytes_;
// Leaf nodes are a header, plus a list of (key, tuple_id) pairs.
max_keys_leaf_ = (kCSBTreeNodeSizeBytes - sizeof(NodeHeader)) / key_tuple_id_pair_length_bytes_;
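  // Illustrative (hypothetical) numbers: with 64-byte nodes (one cache line),
  // an 8-byte NodeHeader, 8-byte keys, and 4-byte tuple_ids, this gives
  // max_keys_internal_ = (64 - 8) / 8 = 7 and
  // max_keys_leaf_ = (64 - 8) / (8 + 4) = 4.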
if ((max_keys_internal_ < 2) || (max_keys_leaf_ < 2)) {
if (new_block) {
throw CSBTreeKeyTooLarge();
} else {
return false;
}
}
// The number of child nodes allocated to each half of a split internal node.
small_half_num_children_ = (max_keys_internal_ + 1) >> 1;
large_half_num_children_ = small_half_num_children_ + ((max_keys_internal_ + 1) & 0x1);
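  // Likewise, the number of keys allocated to each half of a split leaf node.
  // In both cases the 'small' half is the floor and the 'large' half the
  // ceiling of an even split.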
small_half_num_keys_leaf_ = max_keys_leaf_ >> 1;
large_half_num_keys_leaf_ = (max_keys_leaf_ >> 1) + (max_keys_leaf_ & 0x1);
// Create the less-than comparator for a composite key.
if (key_is_composite_) {
composite_key_comparator_.reset(new csbtree_internal::CompositeKeyLessComparator(*this, relation_));
}
node_group_size_bytes_ = kCSBTreeNodeSizeBytes * (max_keys_internal_ + 1);
  // Each node group occupies node_group_size_bytes_ of storage plus one bit
  // in the free/used bitmap, so do this computation in units of bits to
  // account for the bitmap exactly.
size_t num_node_groups = ((sub_block_memory_size_ - sizeof(int)) << 3)
/ ((node_group_size_bytes_ << 3) + 1);
// Compute the number of bytes needed for this SubBlock's header. The header
// consists of the root node's node group number and a bitmap of free/used
// node groups.
size_t header_size_bytes = sizeof(int) + BitVector<false>::BytesNeeded(num_node_groups);
// Node groups start after the header, and should be aligned to
// kCSBTreeNodeSizeBytes (i.e. cache lines). In some circumstances, the
// alignment requirement forces us to use one less node group than would
// otherwise be possible.
node_groups_start_ = static_cast<char*>(sub_block_memory_) + header_size_bytes;
if (reinterpret_cast<size_t>(node_groups_start_) & (kCSBTreeNodeSizeBytes - 1)) {
node_groups_start_ = static_cast<char*>(node_groups_start_)
+ kCSBTreeNodeSizeBytes
- (reinterpret_cast<size_t>(node_groups_start_) & (kCSBTreeNodeSizeBytes - 1));
}
// Adjust num_node_groups as necessary for aligned nodes.
num_node_groups = (static_cast<char*>(sub_block_memory_) + sub_block_memory_size_
- static_cast<char*>(node_groups_start_)) / node_group_size_bytes_;
if (num_node_groups == 0) {
throw BlockMemoryTooSmall("CSBTreeIndex", sub_block_memory_size_);
}
// Set up the free/used node group bitmap and the free list.
node_group_used_bitmap_.reset(new BitVector<false>(static_cast<char*>(sub_block_memory_) + sizeof(int),
num_node_groups));
initialized_ = true;
if (new_block) {
clearIndex();
} else {
num_free_node_groups_ = num_node_groups - node_group_used_bitmap_->onesCount();
if (num_free_node_groups_ > 0) {
next_free_node_group_ = node_group_used_bitmap_->firstZero();
DEBUG_ASSERT(static_cast<size_t>(next_free_node_group_) < node_group_used_bitmap_->size());
}
}
return true;
}
void CSBTreeIndexSubBlock::clearIndex() {
// Reset the free node group bitmap.
DEBUG_ASSERT(node_group_used_bitmap_->size() > 0);
node_group_used_bitmap_->clear();
next_free_node_group_ = 0;
num_free_node_groups_ = node_group_used_bitmap_->size();
// Allocate the root node.
setRootNodeGroupNumber(allocateNodeGroup());
DEBUG_ASSERT(getRootNodeGroupNumber() >= 0);
// Initialize the root node as an empty leaf node.
NodeHeader *root_header = static_cast<NodeHeader*>(getRootNode());
root_header->num_keys = 0;
root_header->is_leaf = true;
root_header->node_group_reference = kNodeGroupNone;
}
void* CSBTreeIndexSubBlock::makeKeyCopy(const tuple_id tuple) const {
DEBUG_ASSERT(tuple_store_.hasTupleWithID(tuple));
DEBUG_ASSERT(indexed_attribute_ids_.size() == indexed_attribute_offsets_.size());
ScopedBuffer key_copy(key_length_bytes_);
vector<attribute_id>::const_iterator attr_it = indexed_attribute_ids_.begin();
for (vector<size_t>::const_iterator offset_it = indexed_attribute_offsets_.begin();
offset_it != indexed_attribute_offsets_.end();
++attr_it, ++offset_it) {
TypedValue attr_value(tuple_store_.getAttributeValueTyped(tuple, *attr_it));
if (attr_value.isNull()) {
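      // A composite key with any NULL component is not stored in the index;
      // signal this by returning NULL.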
return NULL;
}
attr_value.copyInto(static_cast<char*>(key_copy.get()) + *offset_it);
}
return key_copy.release();
}
const void* CSBTreeIndexSubBlock::getLeastKey(const void *node) const {
if (static_cast<const NodeHeader*>(node)->is_leaf) {
if (static_cast<const NodeHeader*>(node)->num_keys) {
return static_cast<const char*>(node) + sizeof(NodeHeader);
} else {
return NULL;
}
} else {
DEBUG_ASSERT(static_cast<const NodeHeader*>(node)->num_keys);
const void *least_key = getLeastKey(getNode(static_cast<const NodeHeader*>(node)->node_group_reference, 0));
if (least_key == NULL) {
// If the leftmost child leaf was empty, can just use the first key here.
return static_cast<const char*>(node) + sizeof(NodeHeader);
}
return least_key;
}
}
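// Descends from '*node' to the leaf which may contain 'literal'. Two
// comparator parameters are needed because the literal's type may differ from
// the key's type: one compares literal < key and the other key < literal.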
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT>
void* CSBTreeIndexSubBlock::findLeaf(
const void *node,
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
const NodeHeader *node_header = static_cast<const NodeHeader*>(node);
if (node_header->is_leaf) {
return const_cast<void*>(node);
}
for (uint16_t key_num = 0;
key_num < node_header->num_keys;
++key_num) {
if (literal_less_key_comparator.compareDataPtrsInl(literal,
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ key_num * key_length_bytes_)) {
return findLeaf(getNode(node_header->node_group_reference, key_num),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
} else if (!key_less_literal_comparator.compareDataPtrsInl(static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ key_num * key_length_bytes_,
literal)) {
// Handle the special case where duplicate keys are spread across
// multiple nodes.
// TODO(chasseur): If duplicate keys are not allowed, this branch is
// not necessary, and searches can be done more efficiently.
return findLeaf(getNode(node_header->node_group_reference, key_num),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
}
}
return findLeaf(getNode(node_header->node_group_reference, node_header->num_keys),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
}
void* CSBTreeIndexSubBlock::getLeftmostLeaf() const {
void* node = getRootNode();
while (!static_cast<const NodeHeader*>(node)->is_leaf) {
node = getNode(static_cast<const NodeHeader*>(node)->node_group_reference, 0);
}
return node;
}
template <typename CodeType>
CSBTreeIndexSubBlock::InsertReturnValue CSBTreeIndexSubBlock::compressedKeyAddEntryHelper(
const tuple_id tuple,
const std::uint32_t compressed_code) {
CodeType actual_code = compressed_code;
return rootInsertHelper(tuple,
&actual_code,
csbtree_internal::CompressedCodeLessComparator<CodeType>());
}
template <typename ComparatorT>
CSBTreeIndexSubBlock::InsertReturnValue CSBTreeIndexSubBlock::internalInsertHelper(
const int node_group_allocation_requirement,
const tuple_id tuple,
const void *key,
const ComparatorT &key_comparator,
const NodeHeader *parent_node_header,
void *node) {
DEBUG_ASSERT((node_group_allocation_requirement == 0) || (parent_node_header != NULL));
NodeHeader *node_header = static_cast<NodeHeader*>(node);
DEBUG_ASSERT(!node_header->is_leaf);
// Find the child to insert into.
uint16_t key_num;
for (key_num = 0;
key_num < node_header->num_keys;
++key_num) {
if (key_comparator.compareDataPtrsInl(key,
static_cast<char*>(node)
+ sizeof(NodeHeader)
+ key_num * key_length_bytes_)) {
break;
}
}
// Insert into the appropriate child.
void *child_node = getNode(node_header->node_group_reference, key_num);
int child_node_group_allocation_requirement = 0;
if (node_header->num_keys == max_keys_internal_) {
// If the child node is split, this node must also be split.
if (getRootNode() == node) {
// If this node is the root, make sure there is additional space for a
// new root.
DEBUG_ASSERT(node_group_allocation_requirement == 0);
child_node_group_allocation_requirement = 2;
} else {
child_node_group_allocation_requirement = node_group_allocation_requirement + 1;
}
} else {
    // This node can accommodate an additional key without splitting.
child_node_group_allocation_requirement = 0;
}
InsertReturnValue child_return_value;
if (static_cast<NodeHeader*>(child_node)->is_leaf) {
child_return_value = leafInsertHelper(child_node_group_allocation_requirement,
tuple,
key,
key_comparator,
node_header,
child_node);
} else {
child_return_value = internalInsertHelper(child_node_group_allocation_requirement,
tuple,
key,
key_comparator,
node_header,
child_node);
}
if (child_return_value.new_node_group_id == kNodeGroupFull) {
// Insertion failed (out of space).
return child_return_value;
}
InsertReturnValue retval;
bool child_split_across_groups = !child_return_value.left_split_group_smaller
&& (key_num == small_half_num_children_);
if (child_return_value.new_node_group_id != kNodeGroupNone) {
// A new node group was allocated, and this node must be split.
DEBUG_ASSERT(child_return_value.split_node_least_key != NULL);
DEBUG_ASSERT(node_header->num_keys == max_keys_internal_);
const void *group_end = NULL;
if (node_group_allocation_requirement) {
// Parent node is full, must allocate new node group(s).
      // This should already have been checked by the child:
DEBUG_ASSERT(num_free_node_groups_ >= node_group_allocation_requirement);
// Split the node group.
group_end = splitNodeGroupHelper(parent_node_header, &node, &retval);
} else {
group_end = getNode(parent_node_header->node_group_reference, parent_node_header->num_keys + 1);
}
if (group_end == NULL) {
retval.split_node_least_key = splitNodeAcrossGroups(node,
retval.new_node_group_id,
child_return_value.new_node_group_id,
child_return_value.left_split_group_smaller,
child_split_across_groups);
} else {
retval.split_node_least_key = splitNodeInGroup(node,
group_end,
child_return_value.new_node_group_id,
child_return_value.left_split_group_smaller,
child_split_across_groups);
}
if (child_split_across_groups) {
// We're done here.
return retval;
}
if (!child_return_value.left_split_group_smaller) {
DEBUG_ASSERT(key_num >= large_half_num_children_);
key_num -= large_half_num_children_;
if (group_end == NULL) {
node = getNode(retval.new_node_group_id, 0);
} else {
node = static_cast<char*>(node) + kCSBTreeNodeSizeBytes;
}
}
}
if (child_return_value.split_node_least_key != NULL) {
// If the child was split, insert the new key.
node_header = static_cast<NodeHeader*>(node);
void *key_location = static_cast<char*>(node)
+ sizeof(*node_header)
+ key_num * key_length_bytes_;
// Move subsequent entries right if necessary.
if (key_num < node_header->num_keys) {
memmove(static_cast<char*>(key_location) + key_length_bytes_,
key_location,
(node_header->num_keys - key_num) * key_length_bytes_);
}
// Insert the new entry.
memcpy(key_location, child_return_value.split_node_least_key, key_length_bytes_);
// Increment the key count.
++(node_header->num_keys);
}
return retval;
}
template <typename ComparatorT>
CSBTreeIndexSubBlock::InsertReturnValue CSBTreeIndexSubBlock::leafInsertHelper(
const int node_group_allocation_requirement,
const tuple_id tuple,
const void *key,
const ComparatorT &key_comparator,
const NodeHeader *parent_node_header,
void *node) {
InsertReturnValue retval;
NodeHeader *node_header = static_cast<NodeHeader*>(node);
DEBUG_ASSERT(node_header->is_leaf);
if (node_header->num_keys == max_keys_leaf_) {
// '*node' is full and must be split.
const void *group_end = NULL;
if (node_group_allocation_requirement) {
// Parent node is full, must allocate new node group(s).
if (num_free_node_groups_ < node_group_allocation_requirement) {
        // Not enough node groups to allocate, so the insert must fail
        // (although more efficient packing may be possible if the index is
        // rebuilt).
retval.new_node_group_id = kNodeGroupFull;
return retval;
}
// Split the node group.
group_end = splitNodeGroupHelper(parent_node_header, &node, &retval);
DEBUG_ASSERT(static_cast<const NodeHeader*>(node)->is_leaf);
} else {
// If we are splitting the root node, make sure the caller can allocate a
// new root.
if (getRootNode() == node) {
if (num_free_node_groups_ == 0) {
retval.new_node_group_id = kNodeGroupFull;
return retval;
}
}
group_end = getNode(parent_node_header->node_group_reference, parent_node_header->num_keys + 1);
}
// This node group (now) has space for a new node. If node splits are
// asymmetric (i.e. max_keys_leaf_ is odd), do the split such that the new
// entry will go into the smaller split node, leaving the nodes balanced.
if (key_comparator.compareDataPtrsInl(key,
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ (small_half_num_keys_leaf_) * key_tuple_id_pair_length_bytes_)) {
// Insert in the first half.
if (group_end == NULL) {
retval.split_node_least_key = splitNodeAcrossGroups(node,
retval.new_node_group_id,
kNodeGroupNone,
true,
false);
} else {
retval.split_node_least_key = splitNodeInGroup(node, group_end, kNodeGroupNone, true, false);
}
} else {
// Insert in the second half.
// Note: The new key may be inserted at the first position in the split
// node. The pointer 'retval.split_node_least_key' will remain correct
// if this is the case, as splitNodeInGroup() returns a pointer to the
// first leaf key's location.
if (group_end == NULL) {
retval.split_node_least_key = splitNodeAcrossGroups(node,
retval.new_node_group_id,
kNodeGroupNone,
false,
false);
node = getNode(retval.new_node_group_id, 0);
} else {
retval.split_node_least_key = splitNodeInGroup(node, group_end, kNodeGroupNone, false, false);
node = static_cast<char*>(node) + kCSBTreeNodeSizeBytes;
}
}
}
  // Either splitting was not necessary, or it already occurred. Insert the key.
insertEntryInLeaf(tuple, key, key_comparator, node);
return retval;
}
template <typename ComparatorT>
CSBTreeIndexSubBlock::InsertReturnValue CSBTreeIndexSubBlock::rootInsertHelper(
const tuple_id tuple,
const void *key,
const ComparatorT &key_comparator) {
void *root_node = getRootNode();
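  // Set up a pseudo-parent ("super root") header so that the root node can be
  // handled by the same insert and split helpers as any other node.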
NodeHeader super_root;
super_root.num_keys = 0;
super_root.is_leaf = false;
super_root.node_group_reference = getRootNodeGroupNumber();
if (static_cast<NodeHeader*>(root_node)->is_leaf) {
return leafInsertHelper(0,
tuple,
key,
key_comparator,
&super_root,
root_node);
} else {
return internalInsertHelper(0,
tuple,
key,
key_comparator,
&super_root,
root_node);
}
}
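// Splits the full node group containing '*node' (whose parent is described by
// '*parent_node_header'), filling in '*caller_return_value' with the new
// group's id and which half is smaller, and updating '*node' if it moved into
// the new group. Returns a pointer one past the last node of the group that
// now contains '*node', or NULL if '*node' itself must subsequently be split
// across the two groups.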
const void* CSBTreeIndexSubBlock::splitNodeGroupHelper(
const NodeHeader *parent_node_header,
void **node,
InsertReturnValue *caller_return_value) {
const void *center_node = getNode(parent_node_header->node_group_reference, small_half_num_children_);
if (*node < center_node) {
caller_return_value->left_split_group_smaller = true;
caller_return_value->new_node_group_id = splitNodeGroup(parent_node_header, true, false);
return getNode(parent_node_header->node_group_reference, small_half_num_children_);
} else {
caller_return_value->left_split_group_smaller = false;
if (*node == center_node) {
caller_return_value->new_node_group_id = splitNodeGroup(parent_node_header, false, true);
return NULL;
} else {
caller_return_value->new_node_group_id = splitNodeGroup(parent_node_header, false, false);
// TODO(chasseur): Verify that this logic is correct.
if (static_cast<const NodeHeader*>(*node)->node_group_reference >= 0) {
*node = static_cast<char*>(getNode(caller_return_value->new_node_group_id, 0))
+ (static_cast<const char*>(*node) - static_cast<const char*>(center_node))
- kCSBTreeNodeSizeBytes;
} else {
*node = static_cast<char*>(getNode(caller_return_value->new_node_group_id, 0))
+ (static_cast<const char*>(*node) - static_cast<const char*>(center_node));
}
return getNode(caller_return_value->new_node_group_id, small_half_num_children_);
}
}
}
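// Moves half of the nodes from the full node group referenced by
// '*parent_node_header' into a newly-allocated node group, returning the new
// group's id. If 'will_split_node_across_groups' is true, the new group's
// first slot is left empty for the right half of a node that will then be
// split across the two groups.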
int CSBTreeIndexSubBlock::splitNodeGroup(const NodeHeader *parent_node_header,
const bool left_smaller,
const bool will_split_node_across_groups) {
DEBUG_ASSERT(!parent_node_header->is_leaf);
DEBUG_ASSERT(parent_node_header->num_keys == max_keys_internal_);
DEBUG_ASSERT(num_free_node_groups_ > 0);
if (will_split_node_across_groups) {
DEBUG_ASSERT(!left_smaller);
}
// Allocate a new node group.
int new_node_group_id = allocateNodeGroup();
DEBUG_ASSERT(new_node_group_id >= 0);
void *copy_destination;
if (will_split_node_across_groups) {
copy_destination = getNode(new_node_group_id, 1);
} else {
copy_destination = getNode(new_node_group_id, 0);
}
NodeHeader *rightmost_remaining_node_header;
// Move half of the nodes in the current group to the new group.
if (left_smaller) {
memcpy(copy_destination,
getNode(parent_node_header->node_group_reference, small_half_num_children_),
large_half_num_children_ * kCSBTreeNodeSizeBytes);
rightmost_remaining_node_header
= static_cast<NodeHeader*>(getNode(parent_node_header->node_group_reference,
small_half_num_children_ - 1));
} else {
memcpy(copy_destination,
getNode(parent_node_header->node_group_reference, large_half_num_children_),
small_half_num_children_ * kCSBTreeNodeSizeBytes);
rightmost_remaining_node_header
= static_cast<NodeHeader*>(getNode(parent_node_header->node_group_reference,
large_half_num_children_ - 1));
}
// If the split nodes are leaves, adjust the node_group_reference of the
// rightmost remaining node.
if (rightmost_remaining_node_header->is_leaf) {
rightmost_remaining_node_header->node_group_reference = new_node_group_id;
}
return new_node_group_id;
}
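// Splits the full '*node' into itself and the next node slot within the same
// node group, shifting any subsequent nodes in the group right. Returns a
// pointer to the least key of the new right node, which the caller pushes up
// into the parent.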
const void* CSBTreeIndexSubBlock::splitNodeInGroup(void *node,
const void *group_end,
const int right_child_node_group,
const bool left_smaller,
const bool child_was_split_across_groups) {
NodeHeader *node_header = static_cast<NodeHeader*>(node);
if (child_was_split_across_groups) {
DEBUG_ASSERT(!node_header->is_leaf);
DEBUG_ASSERT(!left_smaller);
}
if (node_header->is_leaf) {
DEBUG_ASSERT(right_child_node_group == kNodeGroupNone);
DEBUG_ASSERT(node_header->num_keys == max_keys_leaf_);
} else {
DEBUG_ASSERT(right_child_node_group >= 0);
DEBUG_ASSERT(node_header->num_keys == max_keys_internal_);
}
void *next_node = static_cast<char*>(node) + kCSBTreeNodeSizeBytes;
if (group_end != next_node) {
// Shift subsequent nodes right.
memmove(static_cast<char*>(next_node) + kCSBTreeNodeSizeBytes,
next_node,
static_cast<const char*>(group_end) - static_cast<char*>(next_node));
}
// Do the split.
NodeHeader *next_node_header = static_cast<NodeHeader*>(next_node);
if (node_header->is_leaf) {
// Set up the next node's header.
if (left_smaller) {
next_node_header->num_keys = large_half_num_keys_leaf_;
} else {
next_node_header->num_keys = small_half_num_keys_leaf_;
}
next_node_header->is_leaf = true;
next_node_header->node_group_reference = node_header->node_group_reference;
// Modify the current node's header.
if (left_smaller) {
node_header->num_keys = small_half_num_keys_leaf_;
} else {
node_header->num_keys = large_half_num_keys_leaf_;
}
node_header->node_group_reference = kNodeGroupNextLeaf;
// Copy half the keys over.
memcpy(static_cast<char*>(next_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ (node_header->num_keys * key_tuple_id_pair_length_bytes_),
next_node_header->num_keys * key_tuple_id_pair_length_bytes_);
return static_cast<const char*>(next_node) + sizeof(NodeHeader);
} else {
// Set up the next node's header.
if (left_smaller) {
next_node_header->num_keys = large_half_num_children_ - 1;
} else {
if (child_was_split_across_groups) {
next_node_header->num_keys = small_half_num_children_;
} else {
next_node_header->num_keys = small_half_num_children_ - 1;
}
}
next_node_header->is_leaf = false;
next_node_header->node_group_reference = right_child_node_group;
// Modify the current node's header.
if (left_smaller) {
node_header->num_keys = small_half_num_children_ - 1;
} else {
node_header->num_keys = large_half_num_children_ - 1;
}
if (child_was_split_across_groups) {
// Copy half the entries over.
memcpy(static_cast<char*>(next_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ ((node_header->num_keys) * key_length_bytes_),
next_node_header->num_keys * key_length_bytes_);
} else {
// Copy half the keys over (shift by one for the leftmost child).
memcpy(static_cast<char*>(next_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ ((node_header->num_keys + 1) * key_length_bytes_),
next_node_header->num_keys * key_length_bytes_);
}
// Push the middle key up.
return getLeastKey(next_node);
}
}
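// Splits the full '*node' so that its upper half becomes the first node of
// the already-allocated node group 'destination_group_number'. Returns a
// pointer to the least key of the destination node, which the caller pushes
// up into the parent.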
const void* CSBTreeIndexSubBlock::splitNodeAcrossGroups(void *node,
const int destination_group_number,
const int right_child_node_group,
const bool left_smaller,
const bool child_was_split_across_groups) {
DEBUG_ASSERT(destination_group_number >= 0);
DEBUG_ASSERT(static_cast<size_t>(destination_group_number) < node_group_used_bitmap_->size());
DEBUG_ASSERT(node_group_used_bitmap_->getBit(destination_group_number));
NodeHeader *node_header = static_cast<NodeHeader*>(node);
if (child_was_split_across_groups) {
DEBUG_ASSERT(!node_header->is_leaf);
DEBUG_ASSERT(!left_smaller);
}
if (node_header->is_leaf) {
DEBUG_ASSERT(right_child_node_group == kNodeGroupNone);
DEBUG_ASSERT(node_header->num_keys == max_keys_leaf_);
DEBUG_ASSERT(node_header->node_group_reference == destination_group_number);
} else {
DEBUG_ASSERT(right_child_node_group >= 0);
DEBUG_ASSERT(node_header->num_keys == max_keys_internal_);
}
// Do the split.
void *destination_node = getNode(destination_group_number, 0);
NodeHeader *destination_node_header = static_cast<NodeHeader*>(destination_node);
if (node_header->is_leaf) {
// Set up the destination node's header.
if (left_smaller) {
destination_node_header->num_keys = large_half_num_keys_leaf_;
} else {
destination_node_header->num_keys = small_half_num_keys_leaf_;
}
destination_node_header->is_leaf = true;
destination_node_header->node_group_reference = kNodeGroupNextLeaf;
// Modify the current node's header.
if (left_smaller) {
node_header->num_keys = small_half_num_keys_leaf_;
} else {
node_header->num_keys = large_half_num_keys_leaf_;
}
// Copy half the entries over.
memcpy(static_cast<char*>(destination_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ (node_header->num_keys * key_tuple_id_pair_length_bytes_),
destination_node_header->num_keys * key_tuple_id_pair_length_bytes_);
return static_cast<const char*>(destination_node) + sizeof(NodeHeader);
} else {
// Set up the destination node's header.
if (left_smaller) {
destination_node_header->num_keys = large_half_num_children_ - 1;
} else {
if (child_was_split_across_groups) {
destination_node_header->num_keys = small_half_num_children_;
} else {
destination_node_header->num_keys = small_half_num_children_ - 1;
}
}
destination_node_header->is_leaf = false;
destination_node_header->node_group_reference = right_child_node_group;
// Modify the current node's header.
if (left_smaller) {
node_header->num_keys = small_half_num_children_ - 1;
} else {
node_header->num_keys = large_half_num_children_ - 1;
}
if (child_was_split_across_groups) {
// Copy half the keys over.
memcpy(static_cast<char*>(destination_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ (node_header->num_keys * key_length_bytes_),
destination_node_header->num_keys * key_length_bytes_);
} else {
// Copy half the keys over (shift by one for the leftmost child).
memcpy(static_cast<char*>(destination_node) + sizeof(NodeHeader),
static_cast<const char*>(node)
+ sizeof(NodeHeader)
+ ((node_header->num_keys + 1) * key_length_bytes_),
destination_node_header->num_keys * key_length_bytes_);
}
// Push the middle key up.
return getLeastKey(destination_node);
}
}
template <typename ComparatorT>
void CSBTreeIndexSubBlock::insertEntryInLeaf(const tuple_id tuple,
const void *key,
const ComparatorT &key_comparator,
void *node) {
DEBUG_ASSERT(static_cast<NodeHeader*>(node)->is_leaf);
const uint16_t num_keys = static_cast<NodeHeader*>(node)->num_keys;
DEBUG_ASSERT(num_keys < max_keys_leaf_);
char *current_key = static_cast<char*>(node) + sizeof(NodeHeader);
for (uint16_t key_num = 0;
key_num < num_keys;
++key_num, current_key += key_tuple_id_pair_length_bytes_) {
if (key_comparator.compareDataPtrsInl(key, current_key)) {
// Shift subsequent entries right.
memmove(current_key + key_tuple_id_pair_length_bytes_,
current_key,
(num_keys - key_num) * key_tuple_id_pair_length_bytes_);
break;
}
}
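  // If no existing key compared greater than the new key, 'current_key' now
  // points one past the last entry and the new entry is appended at the end.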
// Insert the new entry.
memcpy(current_key, key, key_length_bytes_);
*reinterpret_cast<tuple_id*>(current_key + key_length_bytes_) = tuple;
// Increment the key count.
++(static_cast<NodeHeader*>(node)->num_keys);
}
template <typename CodeType>
void CSBTreeIndexSubBlock::compressedKeyRemoveEntryHelper(const tuple_id tuple,
const std::uint32_t compressed_code) {
CodeType actual_code = compressed_code;
csbtree_internal::CompressedCodeLessComparator<CodeType> comp;
removeEntryFromLeaf(tuple,
&actual_code,
comp,
findLeaf(getRootNode(),
&actual_code,
comp,
comp));
}
template <typename ComparatorT>
void CSBTreeIndexSubBlock::removeEntryFromLeaf(const tuple_id tuple,
const void *key,
const ComparatorT &key_comparator,
void *node) {
DEBUG_ASSERT(static_cast<NodeHeader*>(node)->is_leaf);
void *right_sibling;
const uint16_t num_keys = static_cast<NodeHeader*>(node)->num_keys;
// If node is totally empty, immediately chase the next sibling.
if (num_keys == 0) {
right_sibling = getRightSiblingOfLeafNode(node);
if (right_sibling != NULL) {
removeEntryFromLeaf(tuple, key, key_comparator, right_sibling);
return;
} else {
FATAL_ERROR("CSBTree: attempted to remove nonexistent entry.");
}
}
for (uint16_t key_num = 0;
key_num < num_keys;
++key_num) {
char* existing_key_ptr = static_cast<char*>(node)
+ sizeof(NodeHeader)
+ key_num * key_tuple_id_pair_length_bytes_;
if (key_comparator.compareDataPtrsInl(existing_key_ptr, key)) {
// Haven't yet reached the target key.
continue;
} else if (key_comparator.compareDataPtrsInl(key, existing_key_ptr)) {
// Past the target key, but the target has not been found.
FATAL_ERROR("CSBTree: attempted to remove nonexistent entry.");
} else {
// Key matches, so check tuple_id.
if (tuple == *reinterpret_cast<const tuple_id*>(existing_key_ptr + key_length_bytes_)) {
// Match found, remove the entry.
if (key_num != num_keys - 1) {
// Move subsequent entries forward.
memmove(existing_key_ptr,
existing_key_ptr + key_tuple_id_pair_length_bytes_,
(num_keys - key_num - 1) * key_tuple_id_pair_length_bytes_);
}
// Decrement the key count.
--(static_cast<NodeHeader*>(node)->num_keys);
return;
} else {
// Not the correct tuple_id, but there may be others with the same key.
continue;
}
}
}
// Proceed to next sibling.
right_sibling = getRightSiblingOfLeafNode(node);
if (right_sibling != NULL) {
removeEntryFromLeaf(tuple, key, key_comparator, right_sibling);
return;
} else {
FATAL_ERROR("CSBTree: attempted to remove nonexistent entry.");
}
}
namespace csbtree_internal {
// NOTE(chasseur): Ordinarily we would just use a lambda instead of writing out
// this functor. However, there is a bug in Clang (current as of 3.7.0) that
// prevents instantiations of templated functions from being compiled if they
// only occur in a lambda capture. This causes a situation where this file
// compiles successfully, but linking fails with unresolved symbols. Using a
// more verbose manually-written functor works around the problem.
class PredicateEvaluationForwarder {
public:
PredicateEvaluationForwarder(const CSBTreeIndexSubBlock &csb_tree,
const ComparisonID comp,
const TypedValue &right_literal)
: csb_tree_(csb_tree),
comp_(comp),
right_literal_(right_literal) {
}
template <typename ComparatorT>
TupleIdSequence* operator()(const ComparatorT &comparator) const {
return csb_tree_.evaluatePredicate(comp_,
right_literal_.getDataPtr(),
comparator,
comparator);
}
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT>
TupleIdSequence* operator()(
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
return csb_tree_.evaluatePredicate(comp_,
right_literal_.getDataPtr(),
literal_less_key_comparator,
key_less_literal_comparator);
}
private:
const CSBTreeIndexSubBlock &csb_tree_;
const ComparisonID comp_;
const TypedValue &right_literal_;
};
} // namespace csbtree_internal
TupleIdSequence* CSBTreeIndexSubBlock::evaluateComparisonPredicateOnUncompressedKey(
const ComparisonID comp,
const TypedValue &right_literal,
const Type &right_literal_type) const {
DEBUG_ASSERT(!key_is_compressed_);
DEBUG_ASSERT(!key_is_composite_);
csbtree_internal::PredicateEvaluationForwarder forwarder(*this, comp, right_literal);
if (right_literal_type.isSubsumedBy(*key_type_)) {
return InvokeOnLessComparatorForTypeIgnoreNullability(
*key_type_,
forwarder);
} else {
return InvokeOnBothLessComparatorsForDifferentTypesIgnoreNullability(
right_literal_type,
*key_type_,
forwarder);
}
}
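// Evaluates a comparison against a compressed key by translating
// 'right_literal' into a compressed code (simplifying the comparison where
// possible), then delegating to evaluatePredicate() with code comparators of
// the appropriate byte-width.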
TupleIdSequence* CSBTreeIndexSubBlock::evaluateComparisonPredicateOnCompressedKey(
ComparisonID comp,
const TypedValue &right_literal,
const Type &right_literal_type) const {
DEBUG_ASSERT(key_is_compressed_);
DEBUG_ASSERT(!key_is_composite_);
// Stack variables to hold compressed codes as needed.
uint8_t byte_code;
uint16_t short_code;
uint32_t word_code;
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
if (compressed_tuple_store.compressedAttributeIsDictionaryCompressed(indexed_attribute_ids_.front())) {
const CompressionDictionary &dict
= compressed_tuple_store.compressedGetDictionary(indexed_attribute_ids_.front());
switch (comp) {
case ComparisonID::kEqual:
byte_code = short_code = word_code = dict.getCodeForTypedValue(right_literal,
right_literal_type);
if (word_code == dict.numberOfCodes()) {
return new TupleIdSequence(tuple_store_.getMaxTupleID() + 1);
}
break;
case ComparisonID::kNotEqual:
byte_code = short_code = word_code = dict.getCodeForTypedValue(right_literal,
right_literal_type);
if (word_code == dict.numberOfCodes()) {
return tuple_store_.getExistenceMap();
}
break;
default: {
pair<uint32_t, uint32_t> limits = dict.getLimitCodesForComparisonTyped(comp,
right_literal,
right_literal_type);
if (limits.first == 0) {
if (limits.second == dict.numberOfCodes()) {
return tuple_store_.getExistenceMap();
} else {
byte_code = short_code = word_code = limits.second;
comp = ComparisonID::kLess;
}
} else if (limits.second == dict.numberOfCodes()) {
byte_code = short_code = word_code = limits.first;
comp = ComparisonID::kGreaterOrEqual;
} else {
FATAL_ERROR("CompressionDictionary::getLimitCodesForComparisonTyped() returned "
"limits which did not extend to either the minimum or maximum code "
"when called by CSBTreeIndexSubBlock::evaluateComparisonPredicateOnCompressedKey().");
}
break;
}
}
} else {
if (compressed_tuple_store.compressedComparisonIsAlwaysTrueForTruncatedAttribute(
comp,
indexed_attribute_ids_.front(),
right_literal,
right_literal_type)) {
return tuple_store_.getExistenceMap();
} else if (compressed_tuple_store.compressedComparisonIsAlwaysFalseForTruncatedAttribute(
comp,
indexed_attribute_ids_.front(),
right_literal,
right_literal_type)) {
return new TupleIdSequence(tuple_store_.getMaxTupleID() + 1);
} else {
switch (comp) {
case ComparisonID::kEqual:
case ComparisonID::kNotEqual:
byte_code = short_code = word_code = right_literal.getLiteral<std::int64_t>();
break;
// Adjustments for kLessOrEqual and kGreater make predicate evaluation
// a bit more efficient (particularly in the presence of repeated
// keys). Adding 1 will not overflow the code type, as a literal of
// exactly the maximum possible value for the code type would have
// already caused
// compressedComparisonIsAlwaysTrueForTruncatedAttribute() to return
// true for kLessOrEqual, or
        // compressedComparisonIsAlwaysFalseForTruncatedAttribute() to return
        // true for kGreater.
case ComparisonID::kLessOrEqual:
comp = ComparisonID::kLess;
byte_code = short_code = word_code
= 1 + CompressedTupleStorageSubBlock::GetEffectiveLiteralValueForComparisonWithTruncatedAttribute(
comp,
right_literal,
right_literal_type);
break;
case ComparisonID::kGreater:
comp = ComparisonID::kGreaterOrEqual;
byte_code = short_code = word_code
= 1 + CompressedTupleStorageSubBlock::GetEffectiveLiteralValueForComparisonWithTruncatedAttribute(
comp,
right_literal,
right_literal_type);
break;
default:
byte_code = short_code = word_code
= CompressedTupleStorageSubBlock::GetEffectiveLiteralValueForComparisonWithTruncatedAttribute(
comp,
right_literal,
right_literal_type);
break;
}
}
}
switch (compressed_tuple_store.compressedGetCompressedAttributeSize(indexed_attribute_ids_.front())) {
case 1: {
csbtree_internal::CompressedCodeLessComparator<uint8_t> comparator;
return evaluatePredicate(comp, &byte_code, comparator, comparator);
break;
}
case 2: {
csbtree_internal::CompressedCodeLessComparator<uint16_t> comparator;
return evaluatePredicate(comp, &short_code, comparator, comparator);
break;
}
case 4: {
csbtree_internal::CompressedCodeLessComparator<uint32_t> comparator;
return evaluatePredicate(comp, &word_code, comparator, comparator);
break;
}
default:
FATAL_ERROR("Unexpected compressed key byte-length (not 1, 2, or 4) encountered "
"in CSBTreeIndexSubBlock::getMatchesForPredicate()");
}
}
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT>
TupleIdSequence* CSBTreeIndexSubBlock::evaluatePredicate(
ComparisonID comp,
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
switch (comp) {
case ComparisonID::kEqual:
return evaluateEqualPredicate(literal,
literal_less_key_comparator,
key_less_literal_comparator);
case ComparisonID::kNotEqual:
return evaluateNotEqualPredicate(literal,
literal_less_key_comparator,
key_less_literal_comparator);
case ComparisonID::kLess:
return evaluateLessPredicate<LiteralLessKeyComparatorT,
KeyLessLiteralComparatorT,
false>(literal,
literal_less_key_comparator,
key_less_literal_comparator);
case ComparisonID::kLessOrEqual:
return evaluateLessPredicate<LiteralLessKeyComparatorT,
KeyLessLiteralComparatorT,
true>(literal,
literal_less_key_comparator,
key_less_literal_comparator);
case ComparisonID::kGreater:
return evaluateGreaterPredicate<LiteralLessKeyComparatorT,
KeyLessLiteralComparatorT,
false>(literal,
literal_less_key_comparator,
key_less_literal_comparator);
case ComparisonID::kGreaterOrEqual:
return evaluateGreaterPredicate<LiteralLessKeyComparatorT,
KeyLessLiteralComparatorT,
true>(literal,
literal_less_key_comparator,
key_less_literal_comparator);
default:
FATAL_ERROR("Unrecognized ComparisonID in CSBTreeIndexSubBlock::evaluatePredicate()");
}
}
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT>
TupleIdSequence* CSBTreeIndexSubBlock::evaluateEqualPredicate(
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(tuple_store_.getMaxTupleID() + 1));
bool match_found = false;
const void *search_node = findLeaf(getRootNode(),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *key_ptr = static_cast<const char*>(search_node) + sizeof(NodeHeader);
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
if (!match_found) {
if (!key_less_literal_comparator.compareDataPtrsInl(key_ptr, literal)) {
match_found = true;
}
}
if (match_found) {
if (literal_less_key_comparator.compareDataPtrsInl(literal, key_ptr)) {
// End of matches.
return matches.release();
}
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
}
key_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
return matches.release();
}
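// Evaluates 'key != literal' in three phases: (1) bulk-set every entry in
// leaves strictly left of the leaf where the literal could first appear,
// with no key comparisons at all; (2) compare keys in the leaves that may
// contain the literal, skipping the entries equal to it; (3) bulk-set every
// entry in the remaining leaves to the right.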
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT>
TupleIdSequence* CSBTreeIndexSubBlock::evaluateNotEqualPredicate(
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(tuple_store_.getMaxTupleID() + 1));
const void *boundary_node = findLeaf(getRootNode(),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
const void *search_node = getLeftmostLeaf();
// Fill in all tuples from leaves definitively less than the key.
while (search_node != boundary_node) {
DEBUG_ASSERT(search_node != NULL);
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *tuple_id_ptr = static_cast<const char*>(search_node)
+ sizeof(NodeHeader)
+ key_length_bytes_;
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
matches->set(*reinterpret_cast<const tuple_id*>(tuple_id_ptr), true);
tuple_id_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
// Actually do comparisons in leaves that may contain the literal key.
bool equal_found = false;
bool past_equal = false;
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *key_ptr = static_cast<const char*>(search_node) + sizeof(NodeHeader);
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
if (!equal_found) {
if (key_less_literal_comparator.compareDataPtrsInl(key_ptr, literal)) {
// key < literal
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
} else {
equal_found = true;
}
}
if (equal_found) {
if (literal_less_key_comparator.compareDataPtrsInl(literal, key_ptr)) {
// literal < key
// Fill in the rest of the keys from this leaf.
for (uint16_t subsequent_num = entry_num;
subsequent_num < num_keys;
++subsequent_num) {
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
key_ptr += key_tuple_id_pair_length_bytes_;
}
past_equal = true;
break;
}
}
key_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
if (past_equal) {
break;
}
}
// Fill in all tuples from leaves definitively greater than the key.
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *tuple_id_ptr = static_cast<const char*>(search_node)
+ sizeof(NodeHeader)
+ key_length_bytes_;
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
matches->set(*reinterpret_cast<const tuple_id*>(tuple_id_ptr), true);
tuple_id_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
return matches.release();
}
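// Evaluates 'key < literal', or 'key <= literal' when the include_equal
// template parameter is true. Every entry in leaves strictly left of the
// boundary leaf located by findLeaf() matches without any key comparison;
// from the boundary leaf onward, entries match until the first key that
// fails the comparison, at which point the scan stops.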
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT,
bool include_equal>
TupleIdSequence* CSBTreeIndexSubBlock::evaluateLessPredicate(
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(tuple_store_.getMaxTupleID() + 1));
const void *boundary_node = findLeaf(getRootNode(),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
const void *search_node = getLeftmostLeaf();
// Fill in all tuples from leaves definitively less than the key.
while (search_node != boundary_node) {
DEBUG_ASSERT(search_node != NULL);
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *tuple_id_ptr = static_cast<const char*>(search_node)
+ sizeof(NodeHeader)
+ key_length_bytes_;
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
matches->set(*reinterpret_cast<const tuple_id*>(tuple_id_ptr), true);
tuple_id_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
// Actually do comparisons in leaves that may contain the literal key.
if (include_equal) {
bool equal_found = false;
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *key_ptr = static_cast<const char*>(search_node) + sizeof(NodeHeader);
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
if (!equal_found) {
if (key_less_literal_comparator.compareDataPtrsInl(key_ptr, literal)) {
// key < literal
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
} else {
equal_found = true;
}
}
if (equal_found) {
if (literal_less_key_comparator.compareDataPtrsInl(literal, key_ptr)) {
// literal < key
return matches.release();
} else {
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
}
}
key_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
} else {
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *key_ptr = static_cast<const char*>(search_node) + sizeof(NodeHeader);
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
if (key_less_literal_comparator.compareDataPtrsInl(key_ptr, literal)) {
// key < literal
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
} else {
return matches.release();
}
key_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
}
return matches.release();
}
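// Evaluates 'key > literal', or 'key >= literal' when the include_equal
// template parameter is true. Descends to the leaf where the literal could
// first appear and scans for the first matching entry; that entry, the rest
// of its leaf, and every entry in the sibling leaves to the right all match.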
template <typename LiteralLessKeyComparatorT,
typename KeyLessLiteralComparatorT,
bool include_equal>
TupleIdSequence* CSBTreeIndexSubBlock::evaluateGreaterPredicate(
const void *literal,
const LiteralLessKeyComparatorT &literal_less_key_comparator,
const KeyLessLiteralComparatorT &key_less_literal_comparator) const {
std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(tuple_store_.getMaxTupleID() + 1));
const void *search_node = findLeaf(getRootNode(),
literal,
literal_less_key_comparator,
key_less_literal_comparator);
// Do comparisons in leaves that may contain the literal key.
bool match_found = false;
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *key_ptr = static_cast<const char*>(search_node) + sizeof(NodeHeader);
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
if (include_equal) {
if (!key_less_literal_comparator.compareDataPtrsInl(key_ptr, literal)) {
match_found = true;
}
} else {
if (literal_less_key_comparator.compareDataPtrsInl(literal, key_ptr)) {
match_found = true;
}
}
if (match_found) {
// Fill in the matching entries from this leaf.
for (uint16_t match_num = entry_num; match_num < num_keys; ++match_num) {
matches->set(*reinterpret_cast<const tuple_id*>(key_ptr + key_length_bytes_), true);
key_ptr += key_tuple_id_pair_length_bytes_;
}
break;
}
key_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
if (match_found) {
break;
}
}
// Fill in all tuples from leaves definitively greater than the key.
while (search_node != NULL) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(search_node)->is_leaf);
const uint16_t num_keys = static_cast<const NodeHeader*>(search_node)->num_keys;
const char *tuple_id_ptr = static_cast<const char*>(search_node)
+ sizeof(NodeHeader)
+ key_length_bytes_;
for (uint16_t entry_num = 0; entry_num < num_keys; ++entry_num) {
matches->set(*reinterpret_cast<const tuple_id*>(tuple_id_ptr), true);
tuple_id_ptr += key_tuple_id_pair_length_bytes_;
}
search_node = getRightSiblingOfLeafNode(search_node);
}
return matches.release();
}
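// Checks whether this sub-block has enough node groups to rebuild the index
// over every tuple in the tuple store. Rebuilt leaves are packed full, so one
// leaf-level node group holds up to max_keys_leaf_ * (max_keys_internal_ + 1)
// entries, and each internal level above needs about one node group per
// (max_keys_internal_ + 1) groups of the level below, plus one group for the
// root. Illustrative arithmetic (hypothetical sizes, not actual constants):
// with max_keys_leaf_ = 100 and max_keys_internal_ = 9, a leaf node group
// holds 1000 entries, so 25000 tuples need 25 leaf groups, 3 groups at the
// next level, 1 above that, and 1 for the root: 30 node groups in total.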
bool CSBTreeIndexSubBlock::rebuildSpaceCheck() const {
DEBUG_ASSERT(node_group_used_bitmap_->size() > 0);
if (tuple_store_.isEmpty()) {
return true;
}
// Check that this sub block will be able to fit entries for all tuples.
const tuple_id num_tuples = tuple_store_.numTuples();
// If all tuples can fit in a single leaf, then the root will be sufficient.
if (num_tuples > max_keys_leaf_) {
int num_node_groups_needed = 1; // 1 for the root.
const int keys_in_leaf_node_group = max_keys_leaf_ * (max_keys_internal_ + 1);
int num_node_groups_this_level = (num_tuples / keys_in_leaf_node_group)
+ (num_tuples % keys_in_leaf_node_group ? 1 : 0);
num_node_groups_needed += num_node_groups_this_level;
while (num_node_groups_this_level > 1) {
num_node_groups_this_level = (num_node_groups_this_level / (max_keys_internal_ + 1))
+ (num_node_groups_this_level % (max_keys_internal_ + 1) ? 1 : 0);
num_node_groups_needed += num_node_groups_this_level;
}
if (static_cast<size_t>(num_node_groups_needed) > node_group_used_bitmap_->size()) {
return false;
}
}
return true;
}
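// Rebuilds the leaf level from scratch: materializes one sorted entry
// reference per indexed (non-NULL) key, using the representation appropriate
// to the key kind (compressed code, composite key copy, or plain TypedValue),
// then packs the references into leaves. Returns the number of nodes used in
// the last leaf node group.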
uint16_t CSBTreeIndexSubBlock::rebuildLeaves(std::vector<int> *used_node_groups) {
DEBUG_ASSERT(static_cast<size_t>(num_free_node_groups_) == node_group_used_bitmap_->size() - 1);
DEBUG_ASSERT(rebuildSpaceCheck());
if (key_is_compressed_) {
vector<csbtree_internal::CompressedEntryReference> entries;
generateEntryReferencesFromCompressedCodes(&entries);
return buildLeavesFromEntryReferences(&entries, used_node_groups);
} else if (key_is_composite_) {
vector<csbtree_internal::CompositeEntryReference> entries;
generateEntryReferencesFromCompositeKeys(&entries);
return buildLeavesFromEntryReferences(&entries, used_node_groups);
} else {
vector<csbtree_internal::EntryReference> entries;
generateEntryReferencesFromTypedValues(&entries);
return buildLeavesFromEntryReferences(&entries, used_node_groups);
}
}
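// Packs already-sorted entry references into completely full leaves, starting
// in the root's node group and allocating further node groups as needed. Each
// leaf's node_group_reference either names the group containing the next leaf
// (last node in its group), marks the next node in the same group
// (kNodeGroupNextLeaf), or marks the rightmost leaf (kNodeGroupNone). Only
// the final leaf may be partially full.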
template <class EntryReferenceT>
std::uint16_t CSBTreeIndexSubBlock::buildLeavesFromEntryReferences(
std::vector<EntryReferenceT> *entry_references,
std::vector<int> *used_node_groups) {
// Build tree from packed leaves.
int current_node_group_number = getRootNodeGroupNumber();
used_node_groups->push_back(current_node_group_number);
uint16_t current_node_number = 0;
uint16_t current_key_number = 0;
char *node_ptr = static_cast<char*>(getNode(current_node_group_number, current_node_number));
// Set up the first node's header.
// If this node is not totally full (i.e. it is the rightmost leaf),
// num_keys will be reset to the correct value after the loop below.
reinterpret_cast<NodeHeader*>(node_ptr)->num_keys = max_keys_leaf_;
reinterpret_cast<NodeHeader*>(node_ptr)->is_leaf = true;
reinterpret_cast<NodeHeader*>(node_ptr)->node_group_reference = kNodeGroupNone;
// Build all the leaves.
for (typename vector<EntryReferenceT>::const_iterator entry_it = entry_references->begin();
entry_it != entry_references->end();
++entry_it) {
if (current_key_number == max_keys_leaf_) {
// At the end of this node, must move to the next.
if (current_node_number == max_keys_internal_) {
// At the end of this node group, must allocate a new one.
int next_node_group_number = allocateNodeGroup();
DEBUG_ASSERT(next_node_group_number >= 0);
used_node_groups->push_back(next_node_group_number);
reinterpret_cast<NodeHeader*>(node_ptr)->node_group_reference = next_node_group_number;
current_node_group_number = next_node_group_number;
current_node_number = 0;
node_ptr = static_cast<char*>(getNode(current_node_group_number, current_node_number));
} else {
// Use the next node in the current group.
reinterpret_cast<NodeHeader*>(node_ptr)->node_group_reference = kNodeGroupNextLeaf;
++current_node_number;
node_ptr += kCSBTreeNodeSizeBytes;
}
// Set up new leaf node's header.
// If this node is not totally full (i.e. it is the rightmost leaf),
// num_keys will be reset to the correct value when this loop exits.
reinterpret_cast<NodeHeader*>(node_ptr)->num_keys = max_keys_leaf_;
reinterpret_cast<NodeHeader*>(node_ptr)->is_leaf = true;
reinterpret_cast<NodeHeader*>(node_ptr)->node_group_reference = kNodeGroupNone;
// Reset key number.
current_key_number = 0;
}
// Insert the key.
memcpy(node_ptr + sizeof(NodeHeader) + current_key_number * key_tuple_id_pair_length_bytes_,
entry_it->getDataPtr(),
key_length_bytes_);
// Set the tuple_id.
*reinterpret_cast<tuple_id*>(node_ptr
+ sizeof(NodeHeader)
+ current_key_number * key_tuple_id_pair_length_bytes_
+ key_length_bytes_)
= entry_it->getTupleID();
++current_key_number;
}
// Reset num_keys for the last leaf.
reinterpret_cast<NodeHeader*>(node_ptr)->num_keys = current_key_number;
return current_node_number + 1;
}
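// Builds a (code, tuple_id) entry reference for every tuple's compressed key
// code and sorts the references by code. Code order matches value order for
// both dictionary and truncation compression, which is the same property
// that lets CompressedCodeLessComparator answer ordered predicates above.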
void CSBTreeIndexSubBlock::generateEntryReferencesFromCompressedCodes(
std::vector<csbtree_internal::CompressedEntryReference> *entry_references) const {
DEBUG_ASSERT(key_is_compressed_);
// TODO(chasseur): Handle NULL.
DEBUG_ASSERT(!key_is_nullable_);
DEBUG_ASSERT(entry_references->empty());
DEBUG_ASSERT(tuple_store_.isCompressed());
const CompressedTupleStorageSubBlock &compressed_tuple_store
= static_cast<const CompressedTupleStorageSubBlock&>(tuple_store_);
DEBUG_ASSERT(compressed_tuple_store.compressedBlockIsBuilt());
DEBUG_ASSERT(compressed_tuple_store.compressedAttributeIsDictionaryCompressed(indexed_attribute_ids_.front())
|| compressed_tuple_store.compressedAttributeIsTruncationCompressed(indexed_attribute_ids_.front()));
if (tuple_store_.isPacked()) {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
entry_references->emplace_back(
compressed_tuple_store.compressedGetCode(tid, indexed_attribute_ids_.front()),
tid);
}
} else {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
if (tuple_store_.hasTupleWithID(tid)) {
entry_references->emplace_back(
compressed_tuple_store.compressedGetCode(tid, indexed_attribute_ids_.front()),
tid);
}
}
}
sort(entry_references->begin(),
entry_references->end(),
csbtree_internal::CompressedEntryReferenceComparator());
DEBUG_ASSERT(static_cast<vector<csbtree_internal::CompressedEntryReference>::size_type>(
tuple_store_.numTuples()) == entry_references->size());
}
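// Builds a (TypedValue, tuple_id) entry reference for every tuple with a
// non-NULL key and sorts the references in the key type's native order.
// NULL keys are counted but never indexed.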
void CSBTreeIndexSubBlock::generateEntryReferencesFromTypedValues(
vector<csbtree_internal::EntryReference> *entry_references) const {
DEBUG_ASSERT(!key_is_composite_);
DEBUG_ASSERT(entry_references->empty());
tuple_id null_count = 0;
if (tuple_store_.isPacked()) {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
TypedValue literal_key(tuple_store_.getAttributeValueTyped(tid, indexed_attribute_ids_.front()));
// Don't insert a NULL key.
if (!literal_key.isNull()) {
entry_references->emplace_back(std::move(literal_key), tid);
} else {
++null_count;
}
}
} else {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
if (tuple_store_.hasTupleWithID(tid)) {
TypedValue literal_key(tuple_store_.getAttributeValueTyped(tid, indexed_attribute_ids_.front()));
// Don't insert a NULL key.
if (!literal_key.isNull()) {
entry_references->emplace_back(std::move(literal_key), tid);
} else {
++null_count;
}
}
}
}
SortWrappedValues<csbtree_internal::EntryReference,
vector<csbtree_internal::EntryReference>::iterator>(*key_type_,
entry_references->begin(),
entry_references->end());
DEBUG_ASSERT(static_cast<vector<csbtree_internal::EntryReference>::size_type>(tuple_store_.numTuples())
== entry_references->size() + null_count);
}
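// Builds an entry reference around a copy of each tuple's composite key, as
// produced by makeKeyCopy() (which, as used here, returns NULL to signal a
// NULL key), and sorts the references with the composite key comparator.
// Tuples with NULL keys are counted but never indexed.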
void CSBTreeIndexSubBlock::generateEntryReferencesFromCompositeKeys(
vector<csbtree_internal::CompositeEntryReference> *entry_references) const {
DEBUG_ASSERT(key_is_composite_);
DEBUG_ASSERT(entry_references->empty());
tuple_id null_count = 0;
if (tuple_store_.isPacked()) {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
void *key_copy = makeKeyCopy(tid);
// Don't insert a NULL key.
if (key_copy != NULL) {
entry_references->emplace_back(key_copy, tid);
} else {
++null_count;
}
}
} else {
for (tuple_id tid = 0; tid <= tuple_store_.getMaxTupleID(); ++tid) {
if (tuple_store_.hasTupleWithID(tid)) {
void *key_copy = makeKeyCopy(tid);
// Don't insert a NULL key.
if (key_copy != NULL) {
entry_references->emplace_back(key_copy, tid);
} else {
++null_count;
}
}
}
}
sort(entry_references->begin(),
entry_references->end(),
csbtree_internal::CompositeEntryReferenceComparator(*composite_key_comparator_));
DEBUG_ASSERT(static_cast<vector<csbtree_internal::CompositeEntryReference>::size_type>(tuple_store_.numTuples())
== entry_references->size() + null_count);
}
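// Builds one level of internal nodes above the node groups in
// 'child_node_groups', one internal node per child group. If the last child
// group holds fewer than large_half_num_children_ nodes, nodes are first
// shifted into it from its left neighbor via rebalanceNodeGroups(). Returns
// the number of nodes used in the last node group of the newly-built level.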
uint16_t CSBTreeIndexSubBlock::rebuildInternalLevel(const std::vector<int> &child_node_groups,
uint16_t last_child_num_nodes,
std::vector<int> *used_node_groups) {
DEBUG_ASSERT(last_child_num_nodes > 0);
DEBUG_ASSERT(!child_node_groups.empty());
std::vector<int>::const_iterator last_it = child_node_groups.end() - 1;
// Sentinel: initialized one past the end, and adjusted below to point at
// the actual next-to-last child group when more than one group exists.
std::vector<int>::const_iterator next_to_last_it = child_node_groups.end();
uint16_t next_to_last_child_num_nodes = max_keys_internal_ + 1;
if (child_node_groups.size() > 1) {
next_to_last_it -= 2;
if (last_child_num_nodes < large_half_num_children_) {
// Rebalance last node groups as needed.
DEBUG_ASSERT(child_node_groups.size() > 1);
next_to_last_child_num_nodes = rebalanceNodeGroups(*next_to_last_it,
child_node_groups.back(),
last_child_num_nodes);
last_child_num_nodes = large_half_num_children_;
}
}
int current_node_group_number = allocateNodeGroup();
DEBUG_ASSERT(current_node_group_number >= 0);
used_node_groups->push_back(current_node_group_number);
uint16_t current_node_number = 0;
for (std::vector<int>::const_iterator it = child_node_groups.begin();
it != child_node_groups.end();
++it) {
if (current_node_number == max_keys_internal_ + 1) {
// Advance to next node group.
current_node_group_number = allocateNodeGroup();
DEBUG_ASSERT(current_node_group_number >= 0);
used_node_groups->push_back(current_node_group_number);
current_node_number = 0;
}
if (it == next_to_last_it) {
makeInternalNode(*it,
next_to_last_child_num_nodes,
getNode(current_node_group_number, current_node_number));
} else if (it == last_it) {
makeInternalNode(*it,
last_child_num_nodes,
getNode(current_node_group_number, current_node_number));
} else {
makeInternalNode(*it,
max_keys_internal_ + 1,
getNode(current_node_group_number, current_node_number));
}
++current_node_number;
}
return current_node_number;
}
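// Donates nodes from the tail of a full node group to an underfull group so
// that the underfull group reaches large_half_num_children_ nodes: existing
// nodes in the underfull group are shifted right, the donated nodes are
// copied into the vacated slots, and, for leaf nodes, the sibling references
// across the new group boundary are patched. Returns the number of nodes
// left in the donor group. Illustrative arithmetic (hypothetical sizes):
// with 8 nodes per full group and large_half_num_children_ = 4, an underfull
// group of 2 nodes receives 2 donated nodes, leaving the donor with 6.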
uint16_t CSBTreeIndexSubBlock::rebalanceNodeGroups(const int full_node_group_number,
const int underfull_node_group_number,
const uint16_t underfull_num_nodes) {
DEBUG_ASSERT(underfull_num_nodes < large_half_num_children_);
const uint16_t shift_nodes = large_half_num_children_ - underfull_num_nodes;
const uint16_t full_group_remaining_nodes = max_keys_internal_ + 1 - shift_nodes;
// Shift existing nodes in underfull node group right.
memmove(getNode(underfull_node_group_number, shift_nodes),
getNode(underfull_node_group_number, 0),
underfull_num_nodes * kCSBTreeNodeSizeBytes);
// Copy nodes from full node group over.
memcpy(getNode(underfull_node_group_number, 0),
getNode(full_node_group_number, full_group_remaining_nodes),
shift_nodes * kCSBTreeNodeSizeBytes);
// If the rebalanced nodes are leaves, correct the sibling references.
NodeHeader *full_group_last_header
= static_cast<NodeHeader*>(getNode(full_node_group_number, full_group_remaining_nodes - 1));
if (full_group_last_header->is_leaf) {
full_group_last_header->node_group_reference = underfull_node_group_number;
static_cast<NodeHeader*>(getNode(underfull_node_group_number, shift_nodes - 1))->node_group_reference
= kNodeGroupNextLeaf;
}
return full_group_remaining_nodes;
}
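// Writes an internal node whose children are the first 'num_children' nodes
// of 'child_node_group_number'. The node stores num_children - 1 separator
// keys, where separator i is the least key under child i + 1 and therefore a
// lower bound on every key reachable through the children to its right.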
void CSBTreeIndexSubBlock::makeInternalNode(const int child_node_group_number,
const uint16_t num_children,
void *node) {
DEBUG_ASSERT(num_children > 1);
// Setup header.
static_cast<NodeHeader*>(node)->num_keys = num_children - 1;
static_cast<NodeHeader*>(node)->is_leaf = false;
static_cast<NodeHeader*>(node)->node_group_reference = child_node_group_number;
// Fill in keys.
char *key_ptr = static_cast<char*>(node) + sizeof(NodeHeader);
for (uint16_t child_num = 1; child_num < num_children; ++child_num) {
DEBUG_ASSERT(static_cast<const NodeHeader*>(getNode(child_node_group_number, child_num))->num_keys > 0);
// TODO(chasseur): We could simply remember the least keys of all nodes
// generated in the previous pass, but that is a time/space tradeoff
// which may not be worth it.
memcpy(key_ptr,
getLeastKey(getNode(child_node_group_number, child_num)),
key_length_bytes_);
key_ptr += key_length_bytes_;
}
}
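// Claims the lowest-numbered free node group: marks it used in the bitmap,
// decrements the free count, and advances next_free_node_group_ to the next
// zero bit (or kNodeGroupNone when none remain). Returns kNodeGroupNone if
// no free node group is available.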
int CSBTreeIndexSubBlock::allocateNodeGroup() {
if (num_free_node_groups_ == 0) {
// No more node groups are available.
return kNodeGroupNone;
} else {
DEBUG_ASSERT(!node_group_used_bitmap_->getBit(next_free_node_group_));
// Return the next free node group.
int retval = next_free_node_group_;
// Mark this node group as used and decrement the count of free node
// groups.
node_group_used_bitmap_->setBit(retval, true);
--num_free_node_groups_;
// If there are still free node groups remaining, locate the next one.
if (num_free_node_groups_) {
next_free_node_group_ = node_group_used_bitmap_->firstZero(retval + 1);
DEBUG_ASSERT(static_cast<size_t>(next_free_node_group_) < node_group_used_bitmap_->size());
} else {
next_free_node_group_ = kNodeGroupNone;
}
return retval;
}
}
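// Returns a node group to the free pool, clearing its bit in the usage
// bitmap and keeping next_free_node_group_ pointed at the lowest-numbered
// free group.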
void CSBTreeIndexSubBlock::deallocateNodeGroup(const int node_group_number) {
DEBUG_ASSERT(node_group_number >= 0);
DEBUG_ASSERT(static_cast<size_t>(node_group_number) < node_group_used_bitmap_->size());
DEBUG_ASSERT(node_group_used_bitmap_->getBit(node_group_number));
node_group_used_bitmap_->setBit(node_group_number, false);
++num_free_node_groups_;
if (node_group_number < next_free_node_group_) {
next_free_node_group_ = node_group_number;
}
}
} // namespace quickstep