// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// This file implements a concurrent in-memory B-tree similar to the one
// described in the Masstree paper:
// "Cache Craftiness for Fast Multicore Key-Value Storage"
// Mao, Kohler, and Morris
// Eurosys 2012
//
// This implementation is only the B-tree component, and not the "trie of trees"
// which makes up their full data structure. In addition to this, there are
// some other key differences:
// - We do not support removal of elements from the tree -- in the Kudu memrowset
// use case, we use a deletion bit to indicate a removed record, and end up
// actually removing the storage at compaction time.
// - We do not support updating elements in the tree. Because we use MVCC, we
// only append new entries. A limited form of update is allowed in that data
// may be modified so long as the size is not changed. In that case, it is
// up to the user to provide concurrency control of the update (eg by using
// atomic operations or external locking)
// - The leaf nodes are linked together with a "next" pointer. This makes
// scanning simpler (the Masstree implementation avoids this because it
// complicates the removal operation)
//
// NOTE: this code disables TSAN for the most part. This is because it uses
// some "clever" concurrency mechanisms which are difficult to model in TSAN.
// We instead ensure correctness by heavy stress-testing.
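//
// Example usage (an illustrative sketch, not taken verbatim from the Kudu
// sources; assumes the kudu::tablet::btree namespace, error handling omitted):
//
//   CBTree<BTreeTraits> tree;
//   CHECK(tree.Insert(Slice("key1"), Slice("val1")));   // true: inserted
//   CHECK(!tree.Insert(Slice("key1"), Slice("other"))); // false: duplicate key
//
//   std::unique_ptr<CBTreeIterator<BTreeTraits>> iter(tree.NewIterator());
//   bool exact;
//   iter->SeekAtOrAfter(Slice(""), &exact);   // seek to the first entry
//   while (iter->IsValid()) {
//     Slice k, v;
//     iter->GetCurrentEntry(&k, &v);
//     // ... use k and v ...
//     iter->Next();
//   }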
#pragma once
#include <algorithm>
#include <boost/smart_ptr/detail/yield_k.hpp>
#include <boost/utility/binary.hpp>
#include <memory>
#include <string>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/inline_slice.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/status.h"
//#define TRAVERSE_PREFETCH
#define SCAN_PREFETCH
// Define the following to get an ugly printout on each node split
// to see how much of the node was actually being used.
// #define DEBUG_DUMP_SPLIT_STATS
namespace kudu { namespace tablet {
namespace btree {
// All CBTree implementation classes are templatized on a traits
// structure which customizes the implementation at compile-time.
//
// This default implementation should be reasonable for most usage.
struct BTreeTraits {
enum TraitConstants {
// Number of bytes used per internal node.
kInternalNodeSize = 4 * CACHELINE_SIZE,
// Number of bytes used by a leaf node.
kLeafNodeSize = 4 * CACHELINE_SIZE,
// Tests can set this trait to a non-zero value, which inserts
// some pause-loops in key parts of the code to try to simulate
// races.
kDebugRaciness = 0
};
typedef ThreadSafeArena ArenaType;
};
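// For example, a test or a specialized workload could supply its own traits
// to get larger nodes or to enable the race-simulation pauses (an illustrative
// sketch; the name BigNodeTraits is hypothetical):
//
//   struct BigNodeTraits {
//     enum TraitConstants {
//       kInternalNodeSize = 16 * CACHELINE_SIZE,
//       kLeafNodeSize = 16 * CACHELINE_SIZE,
//       kDebugRaciness = 100   // non-zero only in tests, to provoke races
//     };
//     typedef ThreadSafeArena ArenaType;
//   };
//
//   CBTree<BigNodeTraits> tree;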
template<class T>
inline void PrefetchMemory(const T *addr) {
// During the LLVM 11 upgrade, the sanitizer runs reported null pointer dereference. Wrapping the
// prefetch() call with an if statement fixed the errors. In the end, it was decided to add a
// DCHECK() here, and wrap all uses of PrefetchMemory() with the null pointer guard.
DCHECK(addr);
int size = std::min<int>(sizeof(T), 4 * CACHELINE_SIZE);
for (int i = 0; i < size; i += CACHELINE_SIZE) {
prefetch(reinterpret_cast<const char *>(addr) + i, PREFETCH_HINT_T0);
}
}
// Utility function that, when Traits::kDebugRaciness is non-zero
// (i.e. only in test/debug builds), will spin for some amount of time
// related to that setting.
// This can be used when trying to debug race conditions, but
// will compile away in production code.
template<class Traits>
void DebugRacyPoint() {
if (Traits::kDebugRaciness > 0) {
boost::detail::yield(Traits::kDebugRaciness);
}
}
template<class Traits> class NodeBase;
template<class Traits> class InternalNode;
template<class Traits> class LeafNode;
template<class Traits> class PreparedMutation;
template<class Traits> class CBTree;
template<class Traits> class CBTreeIterator;
typedef base::subtle::Atomic64 AtomicVersion;
struct VersionField {
public:
static AtomicVersion StableVersion(volatile AtomicVersion *version) {
for (int loop_count = 0; true; loop_count++) {
AtomicVersion v_acq = base::subtle::Acquire_Load(version);
if (PREDICT_TRUE(!IsLocked(v_acq))) {
return v_acq;
}
boost::detail::yield(loop_count);
}
}
static void Lock(volatile AtomicVersion *version) {
int loop_count = 0;
while (true) {
AtomicVersion v_acq = base::subtle::Acquire_Load(version);
if (PREDICT_TRUE(!IsLocked(v_acq))) {
AtomicVersion v_locked = SetLockBit(v_acq, 1);
if (PREDICT_TRUE(base::subtle::Acquire_CompareAndSwap(version, v_acq, v_locked) == v_acq)) {
return;
}
}
// Either was already locked by someone else, or CAS failed.
boost::detail::yield(loop_count++);
}
}
static void Unlock(volatile AtomicVersion *version) {
// NoBarrier should be OK here, because no one else modifies the
// version while we have it locked.
AtomicVersion v = base::subtle::NoBarrier_Load(version);
DCHECK(v & BTREE_LOCK_MASK);
// If splitting, increment the splitting field
v += ((v & BTREE_SPLITTING_MASK) >> BTREE_SPLITTING_BIT) << BTREE_VSPLIT_SHIFT;
// If inserting, increment the insert field
v += ((v & BTREE_INSERTING_MASK) >> BTREE_INSERTING_BIT) << BTREE_VINSERT_SHIFT;
// Get rid of the lock, flags and any overflow into the unused section.
v = SetLockBit(v, 0);
v &= ~(BTREE_UNUSED_MASK | BTREE_INSERTING_MASK | BTREE_SPLITTING_MASK);
base::subtle::Release_Store(version, v);
}
static uint64_t GetVSplit(AtomicVersion v) {
return v & BTREE_VSPLIT_MASK;
}
static uint64_t GetVInsert(AtomicVersion v) {
return (v & BTREE_VINSERT_MASK) >> BTREE_VINSERT_SHIFT;
}
static void SetSplitting(volatile AtomicVersion *v) {
base::subtle::Release_Store(v, *v | BTREE_SPLITTING_MASK);
}
static void SetInserting(volatile AtomicVersion *v) {
base::subtle::Release_Store(v, *v | BTREE_INSERTING_MASK);
}
static void SetLockedInsertingNoBarrier(volatile AtomicVersion *v) {
*v = VersionField::BTREE_LOCK_MASK | VersionField::BTREE_INSERTING_MASK;
}
// Return true if the two version fields differ in more
// than just the lock status.
static bool IsDifferent(AtomicVersion v1, AtomicVersion v2) {
return PREDICT_FALSE((v1 & ~BTREE_LOCK_MASK) != (v2 & ~BTREE_LOCK_MASK));
}
// Return true if a split has occurred between the two versions
// or is currently in progress
static bool HasSplit(AtomicVersion v1, AtomicVersion v2) {
return PREDICT_FALSE((v1 & (BTREE_VSPLIT_MASK | BTREE_SPLITTING_MASK)) !=
(v2 & (BTREE_VSPLIT_MASK | BTREE_SPLITTING_MASK)));
}
static inline bool IsLocked(AtomicVersion v) {
return v & BTREE_LOCK_MASK;
}
static inline bool IsSplitting(AtomicVersion v) {
return v & BTREE_SPLITTING_MASK;
}
static inline bool IsInserting(AtomicVersion v) {
return v & BTREE_INSERTING_MASK;
}
static std::string Stringify(AtomicVersion v) {
return StringPrintf("[flags=%c%c%c vins=%" PRIu64 " vsplit=%" PRIu64 "]",
(v & BTREE_LOCK_MASK) ? 'L':' ',
(v & BTREE_SPLITTING_MASK) ? 'S':' ',
(v & BTREE_INSERTING_MASK) ? 'I':' ',
GetVInsert(v),
GetVSplit(v));
}
private:
enum {
BTREE_LOCK_BIT = 63,
BTREE_SPLITTING_BIT = 62,
BTREE_INSERTING_BIT = 61,
BTREE_VINSERT_SHIFT = 27,
BTREE_VSPLIT_SHIFT = 0,
#define BB(x) BOOST_BINARY(x)
BTREE_LOCK_MASK =
BB(10000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 ),
BTREE_SPLITTING_MASK =
BB(01000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 ),
BTREE_INSERTING_MASK =
BB(00100000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 ),
// There is one unused bit between the single-bit fields and the
// incremented fields. This allows us to efficiently increment the
// fields and avoid an extra instruction or two, since we don't need
// to worry about overflow. If vsplit overflows into vinsert, that's
// not a problem, since the vsplit change always results in a retry.
// If we roll over into this unused bit, we'll mask it out.
BTREE_UNUSED_MASK =
BB(00010000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 ),
BTREE_VINSERT_MASK =
BB(00001111 11111111 11111111 11111111 11111100 00000000 00000000 00000000 ),
BTREE_VSPLIT_MASK =
BB(00000000 00000000 00000000 00000000 00000011 11111111 11111111 11111111 ),
#undef BB
};
// Private constructor with no definition -- this class is just static utilities.
VersionField();
static AtomicVersion SetLockBit(AtomicVersion v, int lock) {
DCHECK(lock == 0 || lock == 1);
v = v & ~BTREE_LOCK_MASK;
COMPILE_ASSERT(sizeof(AtomicVersion) == 8, must_use_64bit_version);
v |= (uint64_t)lock << BTREE_LOCK_BIT;
return v;
}
};
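// Readers in this file use VersionField to implement optimistic concurrency
// control: read the version, read the node's contents without locking, then
// re-read the version to check whether the read was consistent. An
// illustrative sketch (assuming 'node' points to some NodeBase<Traits>):
//
//   AtomicVersion v_before = node->StableVersion();
//   // ... read fields of *node without holding the lock ...
//   AtomicVersion v_after = node->StableVersion();
//   if (VersionField::HasSplit(v_before, v_after)) {
//     // Keys may have moved to a sibling node: retry from the root.
//   } else if (VersionField::IsDifferent(v_before, v_after)) {
//     // A concurrent insert modified this node: re-read it.
//   } else {
//     // The data read above is a consistent snapshot.
//   }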
// Slice-like class for representing pointers to values in leaf nodes.
// This is used in preference to a normal Slice only so that it can have
// the same API as InlineSlice, and because it takes up less space
// inside of the tree leaves themselves.
//
// Stores the length of its data as the first sizeof(uintptr_t) bytes of
// the pointed-to data.
class ValueSlice {
private:
// We have to use a word-size field to store the length of the slice so
// that the user's data starts at a word-aligned address.
// Otherwise, the user could not use atomic operations on pointers inside
// their value (eg the mutation linked list in an MRS).
typedef uintptr_t size_type;
public:
Slice as_slice() const {
return Slice(ptr_ + sizeof(size_type),
*reinterpret_cast<const size_type*>(ptr_));
}
// Set this slice to a copy of 'src', allocated from alloc_arena.
// The copy will be word-aligned. No memory ordering is implied.
template<class ArenaType>
void set(const Slice& src, ArenaType* alloc_arena) {
uint8_t* in_arena = reinterpret_cast<uint8_t*>(
alloc_arena->AllocateBytesAligned(src.size() + sizeof(size_type),
sizeof(uint8_t*)));
// No special CAS/etc are necessary here, since anyone calling this holds the
// lock on the row. Concurrent readers never try to follow this pointer until
// they've gotten a consistent snapshot.
//
// (This is different than the keys, where concurrent tree traversers may
// actually try to follow the key indirection pointers from InlineSlice
// without copying a snapshot first).
DCHECK_LE(src.size(), MathLimits<size_type>::kMax)
<< "Slice too large for btree";
size_type size = src.size();
memcpy(in_arena, &size, sizeof(size));
memcpy(in_arena + sizeof(size), src.data(), src.size());
ptr_ = const_cast<const uint8_t*>(in_arena);
}
void prefetch() {
::prefetch(reinterpret_cast<const char*>(ptr_), PREFETCH_HINT_T0);
}
private:
const uint8_t* ptr_;
} PACKED;
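// An illustrative sketch of how a ValueSlice stores its data (assuming a
// ThreadSafeArena named 'arena'):
//
//   ValueSlice v;
//   v.set(Slice("hello"), &arena);  // copies the length (5) followed by the
//                                   // bytes "hello" into the arena
//   Slice s = v.as_slice();         // s.data() points just past the length
//                                   // prefix and s.size() == 5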
// Return the index of the first entry in the array which is
// >= the given value
template<size_t N>
size_t FindInSliceArray(const InlineSlice<N, true> *array, ssize_t num_entries,
const Slice &key, bool *exact) {
DCHECK_GE(num_entries, 0);
if (PREDICT_FALSE(num_entries == 0)) {
*exact = false;
return 0;
}
size_t left = 0;
size_t right = num_entries - 1;
while (left < right) {
int mid = (left + right + 1) / 2;
// TODO: inline slices with more than 8 bytes will store a prefix of the
// slice inline, which we could use to short circuit some of these comparisons.
int compare = array[mid].as_slice().compare(key);
if (compare < 0) { // mid < key
left = mid;
} else if (compare > 0) { // mid > key
right = mid - 1;
} else { // mid == key
*exact = true;
return mid;
}
}
int compare = array[left].as_slice().compare(key);
*exact = compare == 0;
if (compare < 0) { // key > left
left++;
}
return left;
}
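// An illustrative worked example (assuming 'keys' is an array of
// InlineSlice<sizeof(void*), true> holding "b", "d", "f" in order):
//
//   bool exact;
//   FindInSliceArray(keys, 3, Slice("d"), &exact); // returns 1, exact == true
//   FindInSliceArray(keys, 3, Slice("c"), &exact); // returns 1, exact == false
//   FindInSliceArray(keys, 3, Slice("a"), &exact); // returns 0, exact == false
//   FindInSliceArray(keys, 3, Slice("g"), &exact); // returns 3 (== num_entries),
//                                                  // exact == false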
template<class ISlice, class ArenaType>
static void InsertInSliceArray(ISlice *array, size_t num_entries,
const Slice &src, size_t idx,
ArenaType *arena) {
DCHECK_LT(idx, num_entries);
for (size_t i = num_entries - 1; i > idx; i--) {
array[i] = array[i - 1];
}
array[idx].set(src, arena);
}
template<class Traits>
class NodeBase {
public:
AtomicVersion StableVersion() {
return VersionField::StableVersion(&version_);
}
AtomicVersion AcquireVersion() {
return base::subtle::Acquire_Load(&version_);
}
void Lock() {
VersionField::Lock(&version_);
}
bool IsLocked() {
return VersionField::IsLocked(version_);
}
void Unlock() {
VersionField::Unlock(&version_);
}
void SetSplitting() {
VersionField::SetSplitting(&version_);
}
void SetInserting() {
VersionField::SetInserting(&version_);
}
// Return the parent node for this node, with the lock acquired.
InternalNode<Traits> *GetLockedParent() {
while (true) {
InternalNode<Traits> *ret = parent_;
if (ret == NULL) {
return NULL;
}
ret->Lock();
if (PREDICT_FALSE(parent_ != ret)) {
// My parent changed after we acquired the lock.
ret->Unlock();
continue;
}
return ret;
}
}
protected:
friend class CBTree<Traits>;
NodeBase() : version_(0), parent_(NULL)
{}
public:
volatile AtomicVersion version_;
// parent_ field is protected not by this node's lock, but by
// the parent's lock. This allows reassignment of the parent_
// field to occur after a split without gathering locks for all
// the children.
InternalNode<Traits> *parent_;
private:
DISALLOW_COPY_AND_ASSIGN(NodeBase);
} PACKED;
// Wrapper around a void pointer, which encodes the type
// of the pointed-to object in its most-significant-bit.
// The pointer may reference either an internal node or a
// leaf node.
// This assumes that the most significant bit of all valid pointers is
// 0, so that bit can be used as storage. This is true on x86_64, where
// canonical virtual addresses use only the low 48 bits.
template<class T>
struct NodePtr {
enum NodeType {
INTERNAL_NODE,
LEAF_NODE
};
NodePtr() : p_(NULL) {}
NodePtr(InternalNode<T> *p) { // NOLINT(runtime/explicit)
uintptr_t p_int = reinterpret_cast<uintptr_t>(p);
DCHECK(!(p_int & kDiscriminatorBit)) << "Pointer must not use most significant bit";
p_ = p;
}
NodePtr(LeafNode<T> *p) { // NOLINT(runtime/explicit)
uintptr_t p_int = reinterpret_cast<uintptr_t>(p);
DCHECK(!(p_int & kDiscriminatorBit)) << "Pointer must not use most significant bit";
p_ = reinterpret_cast<void *>(p_int | kDiscriminatorBit);
}
NodeType type() {
DCHECK(p_ != NULL);
if (reinterpret_cast<uintptr_t>(p_) & kDiscriminatorBit) {
return LEAF_NODE;
} else {
return INTERNAL_NODE;
}
}
bool is_null() {
return p_ == NULL;
}
InternalNode<T> *internal_node_ptr() {
DCHECK_EQ(type(), INTERNAL_NODE);
return reinterpret_cast<InternalNode<T> *>(p_);
}
LeafNode<T> *leaf_node_ptr() {
DCHECK_EQ(type(), LEAF_NODE);
return reinterpret_cast<LeafNode<T> *>(
reinterpret_cast<uintptr_t>(p_) & (~kDiscriminatorBit));
}
NodeBase<T> *base_ptr() {
DCHECK(!is_null());
return reinterpret_cast<NodeBase<T> *>(
reinterpret_cast<uintptr_t>(p_) & (~kDiscriminatorBit));
}
void *p_;
private:
enum {
kDiscriminatorBit = (1L << (sizeof(uintptr_t) * 8 - 1))
};
} PACKED;
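// An illustrative sketch of the pointer tagging (assuming 'leaf' and 'inode'
// are valid LeafNode<BTreeTraits>* and InternalNode<BTreeTraits>* pointers
// allocated by the tree):
//
//   NodePtr<BTreeTraits> p(leaf);    // stores the pointer with the MSB set
//   DCHECK_EQ(p.type(), NodePtr<BTreeTraits>::LEAF_NODE);
//   LeafNode<BTreeTraits> *back = p.leaf_node_ptr();  // MSB stripped again
//
//   NodePtr<BTreeTraits> q(inode);   // stores the pointer unchanged (MSB clear)
//   DCHECK_EQ(q.type(), NodePtr<BTreeTraits>::INTERNAL_NODE);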
enum InsertStatus {
INSERT_SUCCESS,
INSERT_FULL,
INSERT_DUPLICATE
};
////////////////////////////////////////////////////////////
// Internal node
////////////////////////////////////////////////////////////
template<class Traits>
class PACKED InternalNode : public NodeBase<Traits> {
public:
// Construct a new internal node, containing the given children.
// This also reassigns the parent pointer of the children.
// Because other accessors of the tree may follow the children's
// parent pointers back up to discover a new root, and the parent
// pointers are covered by their parent's lock, this requires that
// the new internal node is constructed in the LOCKED state.
InternalNode(const Slice &split_key,
NodePtr<Traits> lchild,
NodePtr<Traits> rchild,
typename Traits::ArenaType* arena)
: num_children_(0) {
DCHECK_EQ(lchild.type(), rchild.type())
<< "Only expect to create a new internal node on account of a "
<< "split: child nodes should have same type";
// Just assign the version, instead of using the proper ->Lock()
// since we don't need a CAS here.
VersionField::SetLockedInsertingNoBarrier(&this->version_);
keys_[0].set(split_key, arena);
DCHECK_GT(split_key.size(), 0);
child_pointers_[0] = lchild;
child_pointers_[1] = rchild;
ReassignParent(lchild);
ReassignParent(rchild);
num_children_ = 2;
}
// Insert a new entry to the internal node.
//
// This is typically called after one of its child nodes has split.
InsertStatus Insert(const Slice &key, NodePtr<Traits> right_child,
typename Traits::ArenaType* arena) {
DCHECK(this->IsLocked());
CHECK_GT(key.size(), 0);
bool exact;
size_t idx = Find(key, &exact);
CHECK(!exact)
<< "Trying to insert duplicate key " << KUDU_REDACT(key.ToDebugString())
<< " into an internal node! Internal node keys should result "
<< " from splits and therefore be unique.";
if (PREDICT_FALSE(num_children_ == kFanout)) {
return INSERT_FULL;
}
// About to modify this node - flag it so that concurrent
// readers will retry.
this->SetInserting();
// Insert the key and child pointer in the right spot in the list
int new_num_children = num_children_ + 1;
InsertInSliceArray(keys_, new_num_children, key, idx, arena);
for (int i = new_num_children - 1; i > idx + 1; i--) {
child_pointers_[i] = child_pointers_[i - 1];
}
child_pointers_[idx + 1] = right_child;
base::subtle::Release_Store(reinterpret_cast<volatile Atomic32*>(
&num_children_), new_num_children);
ReassignParent(right_child);
return INSERT_SUCCESS;
}
// Return the node index responsible for the given key.
// For example, if the key is less than the first discriminating
// node, returns 0. If it is between 0 and 1, returns 1, etc.
size_t Find(const Slice &key, bool *exact) {
return FindInSliceArray(keys_, key_count(), key, exact);
}
// Find the child whose subtree may contain the given key.
// Note that this result may be an invalid or incorrect pointer if the
// caller has not locked the node, in which case OCC should be
// used to verify it after its usage.
NodePtr<Traits> FindChild(const Slice &key) {
bool exact;
size_t idx = Find(key, &exact);
if (exact) {
idx++;
}
return child_pointers_[idx];
}
Slice GetKey(size_t idx) const {
DCHECK_LT(idx, key_count());
return keys_[idx].as_slice();
}
// Truncates the node, removing entries from the right to reduce
// to the new size. Also compacts the underlying storage so that all
// free space is contiguous, allowing for new inserts.
void Truncate(size_t new_num_keys) {
DCHECK(this->IsLocked());
DCHECK(VersionField::IsSplitting(this->version_));
DCHECK_GT(new_num_keys, 0);
DCHECK_LT(new_num_keys, key_count());
num_children_ = new_num_keys + 1;
#ifndef NDEBUG
// This loop isn't necessary for correct operation, but nulling the pointers
// might help us catch bugs in debug mode.
for (int i = 0; i < num_children_; i++) {
DCHECK(!child_pointers_[i].is_null());
}
for (int i = num_children_; i < kFanout; i++) {
// reset to NULL
child_pointers_[i] = NodePtr<Traits>();
}
#endif
}
std::string ToString() const {
std::string ret("[");
for (int i = 0; i < num_children_; i++) {
if (i > 0) {
ret.append(", ");
}
Slice k = keys_[i].as_slice();
ret.append(KUDU_REDACT(k.ToDebugString()));
}
ret.append("]");
return ret;
}
private:
friend class CBTree<Traits>;
void ReassignParent(NodePtr<Traits> child) {
child.base_ptr()->parent_ = this;
}
int key_count() const {
// The node uses N keys to separate N+1 child pointers.
DCHECK_GT(num_children_, 0);
return num_children_ - 1;
}
typedef InlineSlice<sizeof(void*), true> KeyInlineSlice;
enum SpaceConstants {
constant_overhead = sizeof(NodeBase<Traits>) // base class
+ sizeof(uint32_t), // num_children_
keyptr_space = Traits::kInternalNodeSize - constant_overhead,
kFanout = keyptr_space / (sizeof(KeyInlineSlice) + sizeof(NodePtr<Traits>))
};
// This ordering of members ensures KeyInlineSlices are properly aligned
// for atomic ops
KeyInlineSlice keys_[kFanout];
NodePtr<Traits> child_pointers_[kFanout];
uint32_t num_children_;
} PACKED;
////////////////////////////////////////////////////////////
// Leaf node
////////////////////////////////////////////////////////////
template<class Traits>
class LeafNode : public NodeBase<Traits> {
public:
// Construct a new leaf node.
// If initially_locked is true, then the new node is created
// with LOCKED and INSERTING set.
explicit LeafNode(bool initially_locked)
: next_(NULL),
num_entries_(0) {
if (initially_locked) {
// Just assign the version, instead of using the proper ->Lock()
// since we don't need a CAS here.
VersionField::SetLockedInsertingNoBarrier(&this->version_);
}
}
int num_entries() const { return num_entries_; }
void PrepareMutation(PreparedMutation<Traits> *ret) {
DCHECK(this->IsLocked());
ret->leaf_ = this;
ret->idx_ = Find(ret->key(), &ret->exists_);
}
// Insert a new entry into this leaf node.
InsertStatus Insert(PreparedMutation<Traits> *mut, const Slice &val) {
DCHECK_EQ(this, mut->leaf());
DCHECK(this->IsLocked());
if (PREDICT_FALSE(mut->exists())) {
return INSERT_DUPLICATE;
}
return InsertNew(mut->idx(), mut->key(), val, mut->arena());
}
// Insert an entry at the given index, which is guaranteed to be
// new.
InsertStatus InsertNew(size_t idx, const Slice &key, const Slice &val,
typename Traits::ArenaType* arena) {
if (PREDICT_FALSE(num_entries_ == kMaxEntries)) {
// Full due to metadata
return INSERT_FULL;
}
DCHECK_LT(idx, kMaxEntries);
this->SetInserting();
// The following inserts should always succeed because we
// verified that there is space available above.
num_entries_++;
InsertInSliceArray(keys_, num_entries_, key, idx, arena);
DebugRacyPoint<Traits>();
InsertInSliceArray(vals_, num_entries_, val, idx, arena);
return INSERT_SUCCESS;
}
// Find the index of the first key which is >= the given
// search key.
// If the comparison is equal, then sets *exact to true.
// If no keys in the leaf are >= the given search key,
// then returns the size of the leaf node.
//
// Note that, if the lock is not held, this may return
// bogus results, in which case OCC must be used to verify.
size_t Find(const Slice &key, bool *exact) const {
return FindInSliceArray(keys_, num_entries_, key, exact);
}
// Get the slice corresponding to the nth key.
//
// If the caller does not hold the lock, then this Slice
// may point to arbitrary data, and the result should be only
// trusted when verified by checking for conflicts.
Slice GetKey(size_t idx) const {
return keys_[idx].as_slice();
}
ValueSlice GetValue(size_t idx) const {
return vals_[idx];
}
// Get the slice corresponding to the nth key and value.
//
// If the caller does not hold the lock, then this Slice
// may point to arbitrary data, and the result should be only
// trusted when verified by checking for conflicts.
//
// NOTE: the value slice may include an *invalid pointer*, not
// just invalid data, so any readers should check for conflicts
// before accessing the value slice.
// The key, on the other hand, will always be a valid pointer, but
// may be invalid data.
void Get(size_t idx, Slice *k, ValueSlice *v) const {
*k = GetKey(idx);
*v = GetValue(idx);
}
// Truncates the node, removing entries from the right to reduce
// to the new size.
// Caller must hold the node's lock with the SPLITTING flag set.
void Truncate(size_t new_num_entries) {
DCHECK(this->IsLocked());
DCHECK(VersionField::IsSplitting(this->version_));
DCHECK_LT(new_num_entries, num_entries_);
num_entries_ = new_num_entries;
}
std::string ToString() const {
std::string ret;
for (int i = 0; i < num_entries_; i++) {
if (i > 0) {
ret.append(", ");
}
Slice k = keys_[i].as_slice();
Slice v = vals_[i].as_slice();
ret.append("[");
ret.append(KUDU_REDACT(k.ToDebugString()));
ret.append("=");
ret.append(KUDU_REDACT(v.ToDebugString()));
ret.append("]");
}
return ret;
}
private:
friend class CBTree<Traits>;
friend class InternalNode<Traits>;
friend class CBTreeIterator<Traits>;
typedef InlineSlice<sizeof(void*), true> KeyInlineSlice;
// It is necessary to name this enum so that DCHECKs can use its
// constants (the macros may attempt to specialize templates
// with the constants, which require a named type).
enum SpaceConstants {
constant_overhead = sizeof(NodeBase<Traits>) // base class
+ sizeof(LeafNode<Traits>*) // next_
+ sizeof(uint8_t), // num_entries_
kv_space = Traits::kLeafNodeSize - constant_overhead,
kMaxEntries = kv_space / (sizeof(KeyInlineSlice) + sizeof(ValueSlice))
};
// This ordering of members ensures KeyInlineSlices are properly aligned
// for atomic ops
LeafNode<Traits>* next_;
KeyInlineSlice keys_[kMaxEntries];
ValueSlice vals_[kMaxEntries];
uint8_t num_entries_;
} PACKED;
////////////////////////////////////////////////////////////
// Tree API
////////////////////////////////////////////////////////////
// A "scoped" object which holds a lock on a leaf node.
// Instances should be prepared with CBTree::PrepareMutation()
// and then used with a further Insert() call.
template<class Traits>
class PreparedMutation {
public:
// Construct a PreparedMutation.
//
// The data referred to by the 'key' Slice passed in must remain
// valid for the lifetime of the PreparedMutation object.
explicit PreparedMutation(Slice key)
: key_(key), tree_(NULL), leaf_(NULL), needs_unlock_(false) {}
~PreparedMutation() {
UnPrepare();
}
void Reset(const Slice& key) {
UnPrepare();
key_ = key;
}
// Prepare a mutation against the given tree.
//
// This prepared mutation may then be used with Insert().
// In between preparing and executing the insert, the leaf node remains
// locked, so callers should endeavour to keep the critical section short.
//
// If the returned PreparedMutation object is not used with
// Insert(), it will be automatically unlocked by its destructor.
void Prepare(CBTree<Traits> *tree) {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
CHECK(!prepared());
this->tree_ = tree;
this->arena_ = tree->arena_.get();
tree->PrepareMutation(this);
needs_unlock_ = true;
}
bool Insert(const Slice &val) {
CHECK(prepared());
return tree_->Insert(this, val);
}
// Return a slice referencing the existing data in the row.
//
// This is mutable data, but the size may not be changed.
// This can be used for updating in place if the new data
// has the same size as the original data.
Slice current_mutable_value() {
CHECK(prepared());
Slice k;
ValueSlice v;
leaf_->Get(idx_, &k, &v);
leaf_->SetInserting();
return v.as_slice();
}
// Accessors
bool prepared() const {
return tree_ != NULL;
}
// Return the key that was prepared.
const Slice &key() const { return key_; }
CBTree<Traits> *tree() const {
return tree_;
}
LeafNode<Traits> *leaf() const {
return CHECK_NOTNULL(leaf_);
}
// Return true if the key that was prepared already exists.
bool exists() const {
return exists_;
}
size_t idx() const {
return idx_;
}
typename Traits::ArenaType* arena() {
return arena_;
}
private:
friend class CBTree<Traits>;
friend class LeafNode<Traits>;
friend class TestCBTree;
DISALLOW_COPY_AND_ASSIGN(PreparedMutation);
void mark_done() {
// set leaf_ back to NULL without unlocking it,
// since the caller will unlock it.
needs_unlock_ = false;
}
void UnPrepare() {
if (leaf_ != NULL && needs_unlock_) {
leaf_->Unlock();
needs_unlock_ = false;
}
tree_ = NULL;
}
Slice key_;
CBTree<Traits> *tree_;
// The arena where inserted data may be copied if the data is too
// large to fit entirely within a tree node.
typename Traits::ArenaType* arena_;
LeafNode<Traits> *leaf_;
size_t idx_;
bool exists_;
bool needs_unlock_;
};
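// An illustrative sketch of using PreparedMutation directly (assuming 'tree'
// is a CBTree<BTreeTraits>; the simpler CBTree::Insert() wraps this pattern):
//
//   PreparedMutation<BTreeTraits> mut(Slice("key1"));
//   mut.Prepare(&tree);               // traverses to and locks the leaf
//   if (!mut.exists()) {
//     mut.Insert(Slice("val1"));      // consumes the mutation and unlocks
//   } else {
//     // Key already present: may update in place, as long as the value's
//     // size does not change. Concurrency control of the bytes themselves
//     // is up to the caller.
//     Slice cur = mut.current_mutable_value();
//     // ... modify the bytes referenced by 'cur' ...
//   }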
template<class Traits = BTreeTraits>
class CBTree {
public:
CBTree() : CBTree(std::make_shared<typename Traits::ArenaType>(4 * 1024)) {
}
explicit CBTree(std::shared_ptr<typename Traits::ArenaType> arena)
: arena_(std::move(arena)), root_(NewLeaf(false)), frozen_(false) {}
~CBTree() {
RecursiveDelete(root_);
}
// Convenience API to insert an item.
//
// Returns true if successfully inserted, false if an item with the given
// key already exists.
//
// More advanced users can use the PreparedMutation class instead.
bool Insert(const Slice &key, const Slice &val) {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
PreparedMutation<Traits> mutation(key);
mutation.Prepare(this);
return mutation.Insert(val);
}
void DebugPrint() const {
AtomicVersion v;
DebugPrint(StableRoot(&v), NULL, 0);
CHECK_EQ(root_.base_ptr()->AcquireVersion(), v)
<< "Concurrent modification during DebugPrint not allowed";
}
enum GetResult {
GET_SUCCESS,
GET_NOT_FOUND,
GET_TOO_BIG
};
// Get a copy of the value associated with the given key, storing the
// result in the provided buffer.
// Returns GET_SUCCESS and sets *buf_len on success.
// Returns GET_NOT_FOUND if no such key is found.
// Returns GET_TOO_BIG if the value is too large to fit in the provided buffer.
// In this case, sets *buf_len to the required buffer size.
//
// TODO: this call probably won't be necessary in the final implementation
GetResult GetCopy(const Slice &key, char *buf, size_t *buf_len) const {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
size_t in_buf_len = *buf_len;
retry_from_root:
{
AtomicVersion version;
LeafNode<Traits> *leaf = CHECK_NOTNULL(TraverseToLeaf(key, &version));
DebugRacyPoint();
retry_in_leaf:
{
GetResult ret;
Slice key_in_node;
ValueSlice val_in_node;
bool exact;
size_t idx = leaf->Find(key, &exact);
DebugRacyPoint();
if (!exact) {
ret = GET_NOT_FOUND;
} else {
leaf->Get(idx, &key_in_node, &val_in_node);
ret = GET_SUCCESS;
}
// Got some kind of result, but may be based on racy data.
// Verify it.
AtomicVersion new_version = leaf->StableVersion();
if (VersionField::HasSplit(version, new_version)) {
goto retry_from_root;
} else if (VersionField::IsDifferent(version, new_version)) {
version = new_version;
goto retry_in_leaf;
}
// If we found a matching key earlier, and the read of the node
// wasn't racy, we can safely work with the ValueSlice.
if (ret == GET_SUCCESS) {
Slice val = val_in_node.as_slice();
*buf_len = val.size();
if (PREDICT_FALSE(val.size() > in_buf_len)) {
ret = GET_TOO_BIG;
} else {
memcpy(buf, val.data(), val.size());
}
}
return ret;
}
}
}
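// An illustrative sketch of using GetCopy (assumes 'tree' is a
// CBTree<BTreeTraits>):
//
//   char buf[16];
//   size_t len = sizeof(buf);
//   CBTree<BTreeTraits>::GetResult r = tree.GetCopy(Slice("key1"), buf, &len);
//   if (r == CBTree<BTreeTraits>::GET_TOO_BIG) {
//     // 'len' now holds the required size; retry with a larger buffer.
//   }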
// Returns true if the given key is contained in the tree.
// TODO: unit test
bool ContainsKey(const Slice &key) const {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
bool ret;
retry_from_root:
{
AtomicVersion version;
LeafNode<Traits> *leaf = CHECK_NOTNULL(TraverseToLeaf(key, &version));
DebugRacyPoint();
retry_in_leaf:
{
leaf->Find(key, &ret);
DebugRacyPoint();
// Got some kind of result, but may be based on racy data.
// Verify it.
AtomicVersion new_version = leaf->StableVersion();
if (VersionField::HasSplit(version, new_version)) {
goto retry_from_root;
} else if (VersionField::IsDifferent(version, new_version)) {
version = new_version;
goto retry_in_leaf;
}
return ret;
}
}
}
CBTreeIterator<Traits> *NewIterator() const {
return new CBTreeIterator<Traits>(this, frozen_);
}
// Return the current number of elements in the tree.
//
// Note that this requires iterating through the entire tree,
// so it is not very efficient.
size_t count() const {
std::unique_ptr<CBTreeIterator<Traits>> iter(NewIterator());
bool exact;
iter->SeekAtOrAfter(Slice(""), &exact);
size_t count = 0;
while (iter->IsValid()) {
count++;
iter->Next();
}
return count;
}
// Return true if this tree contains no elements
bool empty() const {
NodePtr<Traits> root = root_;
switch (root.type()) {
case NodePtr<Traits>::INTERNAL_NODE:
// If there's already an internal node, then we've inserted some data.
// Because we don't remove, this means we definitely have data.
return false;
case NodePtr<Traits>::LEAF_NODE:
return root.leaf_node_ptr()->num_entries() == 0;
default:
CHECK(0) << "bad type";
return true;
}
}
size_t estimate_memory_usage() const {
return arena_->memory_footprint();
}
// Mark the tree as frozen.
// Once frozen, no further mutations may occur without triggering a CHECK
// violation. But, new iterators created after this point can scan more
// efficiently.
void Freeze() {
frozen_ = true;
}
private:
friend class PreparedMutation<Traits>;
friend class CBTreeIterator<Traits>;
DISALLOW_COPY_AND_ASSIGN(CBTree);
NodePtr<Traits> StableRoot(AtomicVersion *stable_version) const {
while (true) {
NodePtr<Traits> node = root_;
NodeBase<Traits> *node_base = node.base_ptr();
*stable_version = node_base->StableVersion();
if (PREDICT_TRUE(node_base->parent_ == NULL)) {
// Found a good root
return node;
} else {
// root has been swapped out
root_ = node_base->parent_;
}
}
}
LeafNode<Traits> *TraverseToLeaf(const Slice &key,
AtomicVersion *stable_version) const {
retry_from_root:
AtomicVersion version = 0;
NodePtr<Traits> node = StableRoot(&version);
NodeBase<Traits> *node_base = node.base_ptr();
while (node.type() != NodePtr<Traits>::LEAF_NODE) {
#ifdef TRAVERSE_PREFETCH
if (node.internal_node_ptr()) {
PrefetchMemory(node.internal_node_ptr());
}
#endif
retry_in_node:
int num_children = node.internal_node_ptr()->num_children_;
NodePtr<Traits> child = node.internal_node_ptr()->FindChild(key);
NodeBase<Traits> *child_base = NULL;
AtomicVersion child_version = -1;
if (PREDICT_TRUE(!child.is_null())) {
child_base = child.base_ptr();
child_version = child_base->StableVersion();
}
AtomicVersion new_node_version = node_base->AcquireVersion();
if (VersionField::IsDifferent(version, new_node_version)) {
new_node_version = node_base->StableVersion();
if (VersionField::HasSplit(version, new_node_version)) {
goto retry_from_root;
} else {
version = new_node_version;
goto retry_in_node;
}
}
int new_children = node.internal_node_ptr()->num_children_;
DCHECK(!child.is_null())
<< "should have changed versions when child was NULL: "
<< "old version: " << VersionField::Stringify(version)
<< " new version: " << VersionField::Stringify(new_node_version)
<< " version now: " << VersionField::Stringify(node_base->AcquireVersion())
<< " num_children: " << num_children << " -> " << new_children;
node = child;
node_base = child_base;
version = child_version;
}
#ifdef TRAVERSE_PREFETCH
if (node.leaf_node_ptr()) {
PrefetchMemory(node.leaf_node_ptr());
}
#endif
*stable_version = version;
return node.leaf_node_ptr();
}
void DebugRacyPoint() const {
btree::DebugRacyPoint<Traits>();
}
// Dump the tree.
// Requires that there are no concurrent modifications.
void DebugPrint(NodePtr<Traits> node,
InternalNode<Traits> *expected_parent,
int indent) const {
std::string buf;
switch (node.type()) {
case NodePtr<Traits>::LEAF_NODE:
{
LeafNode<Traits> *leaf = node.leaf_node_ptr();
SStringPrintf(&buf, "%*sLEAF %p: ", indent, "", leaf);
buf.append(leaf->ToString());
LOG(INFO) << buf;
CHECK_EQ(leaf->parent_, expected_parent) << "failed for " << leaf;
break;
}
case NodePtr<Traits>::INTERNAL_NODE:
{
InternalNode<Traits> *inode = node.internal_node_ptr();
SStringPrintf(&buf, "%*sINTERNAL %p: ", indent, "", inode);
LOG(INFO) << buf;
for (int i = 0; i < inode->num_children_; i++) {
DebugPrint(inode->child_pointers_[i], inode, indent + 4);
if (i < inode->key_count()) {
SStringPrintf(&buf, "%*sKEY ", indent + 2, "");
buf.append(KUDU_REDACT(inode->GetKey(i).ToDebugString()));
LOG(INFO) << buf;
}
}
CHECK_EQ(inode->parent_, expected_parent) << "failed for " << inode;
break;
}
default:
CHECK(0) << "bad node type";
}
}
void RecursiveDelete(NodePtr<Traits> node) {
switch (node.type()) {
case NodePtr<Traits>::LEAF_NODE:
FreeLeaf(node.leaf_node_ptr());
break;
case NodePtr<Traits>::INTERNAL_NODE:
{
InternalNode<Traits> *inode = node.internal_node_ptr();
for (int i = 0; i < inode->num_children_; i++) {
RecursiveDelete(inode->child_pointers_[i]);
inode->child_pointers_[i] = NodePtr<Traits>();
}
FreeInternalNode(inode);
break;
}
default:
CHECK(0);
}
}
void PrepareMutation(PreparedMutation<Traits> *mutation) {
DCHECK_EQ(mutation->tree(), this);
while (true) {
AtomicVersion stable_version;
LeafNode<Traits> *lnode = TraverseToLeaf(mutation->key(), &stable_version);
lnode->Lock();
if (VersionField::HasSplit(lnode->AcquireVersion(), stable_version)) {
// Retry traversal due to a split
lnode->Unlock();
continue;
}
lnode->PrepareMutation(mutation);
return;
}
}
// Inserts the given key/value into the prepared leaf node.
// If the leaf node is already full, handles splitting it and
// propagating splits up the tree.
//
// Precondition:
// 'node' is locked
// Postcondition:
// 'node' is unlocked
bool Insert(PreparedMutation<Traits> *mutation,
const Slice &val) {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
CHECK(!frozen_);
CHECK_NOTNULL(mutation);
DCHECK_EQ(mutation->tree(), this);
LeafNode<Traits> *node = mutation->leaf();
DCHECK(node->IsLocked());
// After this function, the prepared mutation cannot be used
// again.
mutation->mark_done();
switch (node->Insert(mutation, val)) {
case INSERT_SUCCESS:
node->Unlock();
return true;
case INSERT_DUPLICATE:
node->Unlock();
return false;
case INSERT_FULL:
return SplitLeafAndInsertUp(mutation, val);
// SplitLeafAndInsertUp takes care of unlocking
default:
CHECK(0) << "Unexpected result";
break;
}
CHECK(0) << "should not get here";
return false;
}
// Splits the node 'node', returning the newly created right-sibling
// internal node 'new_inode'.
//
// Locking conditions:
// Precondition:
// node is locked
// Postcondition:
// node is still locked and marked SPLITTING
// new_inode is locked and marked INSERTING
InternalNode<Traits> *SplitInternalNode(InternalNode<Traits> *node,
faststring *separator_key) {
DCHECK(node->IsLocked());
//VLOG(2) << "splitting internal node " << node->GetKey(0).ToString();
// TODO: simplified implementation doesn't deal with splitting
// when there are very small internal nodes.
CHECK_GT(node->key_count(), 2)
<< "TODO: currently only support splitting nodes with >2 keys";
// TODO: can we share code better between the node types here?
// perhaps by making this part of NodeBase, wrapping the K,V slice pair
// in a struct type, etc?
// Pick the split point. The split point is the key which
// will be moved up into the parent node.
int split_point = node->key_count() / 2;
Slice sep_slice = node->GetKey(split_point);
DCHECK_GT(sep_slice.size(), 0) <<
"got bad split key when splitting: " << node->ToString();
separator_key->assign_copy(sep_slice.data(), sep_slice.size());
// Example split:
// [ 0, 1, 2 ]
// / | | \ .
// [A] [B] [C] [D]
//
// split_point = 3/2 = 1
// separator_key = 1
//
// =====>
//
// [ 1 ]
// / |
// [ 0 ] [ 2 ]
// / | | \ .
// [A] [B] [C] [D]
//
NodePtr<Traits> separator_ptr;
InternalNode<Traits> *new_inode = NewInternalNode(
node->GetKey(split_point + 1),
node->child_pointers_[split_point + 1],
node->child_pointers_[split_point + 2]);
// The new inode is constructed in locked and INSERTING state.
// Copy entries to the new right-hand node.
for (int i = split_point + 2; i < node->key_count(); i++) {
Slice k = node->GetKey(i);
DCHECK_GT(k.size(), 0);
NodePtr<Traits> child = node->child_pointers_[i + 1];
DCHECK(!child.is_null());
// TODO: this could be done more efficiently since we know that
// these inserts are coming in sorted order.
CHECK_EQ(INSERT_SUCCESS, new_inode->Insert(k, child, arena_.get()));
}
// Up to this point, we haven't modified the left node, so concurrent
// reads were consistent. But, now we're about to actually mutate,
// so set the flag.
node->SetSplitting();
// Truncate the left node to remove the keys which have been
// moved to the right node
node->Truncate(split_point);
return new_inode;
}
// Split the given leaf node 'node', creating a new node
// with the higher half of the elements.
//
// N.B: the new node is returned locked (with INSERTING set) but without the
// SPLITTING flag. This function sets the SPLITTING flag on the original node
// before modifying it.
void SplitLeafNode(LeafNode<Traits> *node,
LeafNode<Traits> **new_node) {
DCHECK(node->IsLocked());
#ifdef DEBUG_DUMP_SPLIT_STATS
do {
size_t key_size = 0, val_size = 0;
for (size_t i = 0; i < node->num_entries(); i++) {
Slice k, v;
node->Get(i, &k, &v);
key_size += k.size();
val_size += v.size();
}
LOG(INFO) << "split leaf. entries=" << node->num_entries()
<< " keysize=" << key_size
<< " valsize=" << val_size;
} while (0);
#endif
LeafNode<Traits> *new_leaf = NewLeaf(true);
new_leaf->next_ = node->next_;
// Copy half the keys from node into the new leaf
int copy_start = node->num_entries() / 2;
CHECK_GT(copy_start, 0) <<
"Trying to split a node with 0 or 1 entries";
std::copy(node->keys_ + copy_start, node->keys_ + node->num_entries(),
new_leaf->keys_);
std::copy(node->vals_ + copy_start, node->vals_ + node->num_entries(),
new_leaf->vals_);
new_leaf->num_entries_ = node->num_entries() - copy_start;
// Truncate the left node to remove the keys which have been
// moved to the right node.
node->SetSplitting();
node->next_ = new_leaf;
node->Truncate(copy_start);
*new_node = new_leaf;
}
// Splits a leaf node which is full, adding the new sibling
// node to the tree.
// This recurses upward splitting internal nodes as necessary.
// The node should be locked on entrance to the function
// and will be unlocked upon exit.
bool SplitLeafAndInsertUp(PreparedMutation<Traits> *mutation,
const Slice &val) {
LeafNode<Traits> *node = mutation->leaf();
Slice key = mutation->key_;
// Leaf node should already be locked at this point
DCHECK(node->IsLocked());
//DebugPrint();
LeafNode<Traits> *new_leaf;
SplitLeafNode(node, &new_leaf);
// The new leaf node is returned still locked.
DCHECK(new_leaf->IsLocked());
// Insert the key that we were originally trying to insert in the
// correct side post-split.
Slice split_key = new_leaf->GetKey(0);
LeafNode<Traits> *dst_leaf = (key.compare(split_key) < 0) ? node : new_leaf;
// Re-prepare the mutation after the split.
dst_leaf->PrepareMutation(mutation);
CHECK_EQ(INSERT_SUCCESS, dst_leaf->Insert(mutation, val))
<< "node split at " << KUDU_REDACT(split_key.ToDebugString())
<< " did not result in enough space for key " << KUDU_REDACT(key.ToDebugString())
<< " in left node";
// Insert the new node into the parents.
PropagateSplitUpward(node, new_leaf, split_key);
// NB: No need to unlock nodes here, since it is done by the upward
// propagation path (the 'ascend' label in Figure 5 of the Masstree paper).
return true;
}
// Assign the parent pointer of 'right', and insert it into the tree
// by propagating splits upward.
// Locking:
// Precondition:
// left and right are both locked
// left is marked SPLITTING
// Postcondition:
// parent is non-null
// parent is marked INSERTING
// left and right are unlocked
void PropagateSplitUpward(NodePtr<Traits> left_ptr, NodePtr<Traits> right_ptr,
const Slice &split_key) {
NodeBase<Traits> *left = left_ptr.base_ptr();
NodeBase<Traits> *right = right_ptr.base_ptr();
DCHECK(left->IsLocked());
DCHECK(right->IsLocked());
InternalNode<Traits> *parent = left->GetLockedParent();
if (parent == NULL) {
// Node is the root - make new parent node
parent = NewInternalNode(split_key, left_ptr, right_ptr);
// Constructor also reassigns parents.
// root_ will be updated lazily by next traverser
left->Unlock();
right->Unlock();
parent->Unlock();
return;
}
// Parent exists. Try to insert
switch (parent->Insert(split_key, right_ptr, arena_.get())) {
case INSERT_SUCCESS:
{
VLOG(3) << "Inserted new entry into internal node "
<< parent << " for " << KUDU_REDACT(split_key.ToDebugString());
left->Unlock();
right->Unlock();
parent->Unlock();
return;
}
case INSERT_FULL:
{
// Split the node in two
faststring sep_key(0);
InternalNode<Traits> *new_inode = SplitInternalNode(parent, &sep_key);
DCHECK(new_inode->IsLocked());
DCHECK(parent->IsLocked()) << "original should still be locked";
// Insert the new entry into the appropriate half.
Slice inode_split(sep_key);
InternalNode<Traits> *dst_inode =
(split_key.compare(inode_split) < 0) ? parent : new_inode;
VLOG(2) << "Split internal node " << parent << " for insert of "
<< KUDU_REDACT(split_key.ToDebugString()) << "[" << right << "]"
<< " (split at " << KUDU_REDACT(inode_split.ToDebugString()) << ")";
CHECK_EQ(INSERT_SUCCESS, dst_inode->Insert(split_key, right_ptr, arena_.get()));
left->Unlock();
right->Unlock();
PropagateSplitUpward(parent, new_inode, inode_split);
break;
}
default:
CHECK(0);
}
}
LeafNode<Traits> *NewLeaf(bool locked) {
void *mem = CHECK_NOTNULL(arena_->AllocateBytesAligned(sizeof(LeafNode<Traits>),
sizeof(AtomicVersion)));
return new (mem) LeafNode<Traits>(locked);
}
InternalNode<Traits> *NewInternalNode(const Slice &split_key,
NodePtr<Traits> lchild,
NodePtr<Traits> rchild) {
void *mem = CHECK_NOTNULL(arena_->AllocateBytesAligned(sizeof(InternalNode<Traits>),
sizeof(AtomicVersion)));
return new (mem) InternalNode<Traits>(split_key, lchild, rchild, arena_.get());
}
void FreeLeaf(LeafNode<Traits> *leaf) {
leaf->~LeafNode();
// No need to actually free, since it came from the arena
}
void FreeInternalNode(InternalNode<Traits> *node) {
node->~InternalNode();
// No need to actually free, since it came from the arena
}
std::shared_ptr<typename Traits::ArenaType> arena_;
// marked 'mutable' because readers will lazy-update the root
// when they encounter a stale root pointer.
mutable NodePtr<Traits> root_;
// If true, the tree is no longer mutable. Once a tree becomes
// frozen, it may not be un-frozen. If an iterator is created on
// a frozen tree, it will be more efficient.
bool frozen_;
};
template<class Traits>
class CBTreeIterator {
public:
bool SeekToStart() {
bool exact;
return SeekAtOrAfter(Slice(""), &exact);
}
bool SeekAtOrAfter(const Slice &key, bool *exact) {
SeekToLeaf(key);
SeekInLeaf(key, exact, AT_OR_AFTER);
return IsValid();
}
bool SeekAtOrBefore(const Slice &key, bool *exact) {
SeekToLeaf(key);
SeekInLeaf(key, exact, AT_OR_BEFORE);
return IsValid();
}
bool IsValid() const {
return seeked_;
}
bool Next() {
DCHECK(seeked_);
idx_in_leaf_++;
if (idx_in_leaf_ < leaf_to_scan_->num_entries()) {
return true;
} else {
return SeekNextLeaf();
}
}
void GetCurrentEntry(Slice *key, Slice *val) const {
DCHECK(seeked_);
ValueSlice val_slice;
leaf_to_scan_->Get(idx_in_leaf_, key, &val_slice);
*val = val_slice.as_slice();
}
Slice GetCurrentKey() const {
DCHECK(seeked_);
return leaf_to_scan_->GetKey(idx_in_leaf_);
}
Slice GetCurrentValue() const {
DCHECK(seeked_);
ValueSlice val_slice = leaf_to_scan_->GetValue(idx_in_leaf_);
return val_slice.as_slice();
}
////////////////////////////////////////////////////////////
// Advanced functions which expose some of the internal state
// of the iterator, allowing for limited "rewind" capability
// within a given leaf.
//
// Single leaf nodes are the unit of "snapshotting" of this iterator.
// Hence, within a leaf node, the caller may rewind arbitrarily, but once
// moving to the next leaf node, there is no way to go back to the prior
// leaf node without losing consistency.
////////////////////////////////////////////////////////////
// Return the number of entries, including the current one, remaining
// in the leaf.
// For example, if the leaf has three entries [A, B, C], and GetCurrentEntry
// would return 'A', then this will return 3.
size_t remaining_in_leaf() const {
DCHECK(seeked_);
return leaf_to_scan_->num_entries() - idx_in_leaf_;
}
// Return the index of the iterator inside the current leaf node.
size_t index_in_leaf() const {
return idx_in_leaf_;
}
// Rewind the iterator to the given index in the current leaf node,
// which was probably saved off from a previous call to
// index_in_leaf().
//
// If Next() was called more times than remaining_in_leaf(), then
// this call will not be successful.
void RewindToIndexInLeaf(size_t new_index_in_leaf) {
DCHECK(seeked_);
DCHECK_LT(new_index_in_leaf, leaf_to_scan_->num_entries());
idx_in_leaf_ = new_index_in_leaf;
}
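// An illustrative sketch of the save/rewind pattern (assuming 'iter' is a
// seeked CBTreeIterator<BTreeTraits>*):
//
//   size_t saved_idx = iter->index_in_leaf();
//   size_t remaining = iter->remaining_in_leaf();
//   for (size_t i = 0; i + 1 < remaining; i++) {
//     iter->Next();                        // stays within the current leaf
//   }
//   iter->RewindToIndexInLeaf(saved_idx);  // back to the saved entry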
// Get the key at the given index in the current leaf node.
Slice GetKeyInLeaf(size_t idx) const {
DCHECK(seeked_);
return leaf_to_scan_->GetKey(idx);
}
// Get the given indexed entry in the current leaf node.
void GetEntryInLeaf(size_t idx, Slice *key, Slice *val) {
DCHECK(seeked_);
DCHECK_LT(idx, leaf_to_scan_->num_entries());
leaf_to_scan_->Get(idx, key, val);
}
private:
friend class CBTree<Traits>;
// Control the mode in SeekInLeaf.
// AT_OR_BEFORE will seek to the last idx with key <= the given key.
// AT_OR_AFTER will seek to the first idx with key >= the given key.
enum SeekMode {
AT_OR_BEFORE,
AT_OR_AFTER
};
CBTreeIterator(const CBTree<Traits> *tree,
bool tree_frozen) :
tree_(tree),
tree_frozen_(tree_frozen),
seeked_(false),
idx_in_leaf_(-1),
leaf_copy_(false),
leaf_to_scan_(&leaf_copy_)
{}
bool SeekInLeaf(const Slice &key, bool *exact, SeekMode mode) {
DCHECK(seeked_);
int idx = leaf_to_scan_->Find(key, exact);
if (mode == AT_OR_BEFORE) {
// If the leaf does not contain a key equal to the given key, the result
// should be idx - 1; if it does contain the key (exact == true), the result
// should be idx.
// There are two corner cases:
// 1. If all records in the leaf are > key, Find() returns idx = 0.
// 2. If all records in the leaf are < key, Find() returns idx = num_entries.
//
// The accuracy of this mode depends on the fact that we don't optimize the
// internal nodes with "smallest separator" keys. If that optimization is ever
// added, this function will need refactoring.
if (*exact) {
idx_in_leaf_ = idx;
return true;
}
if (leaf_to_scan_->num_entries() == 0 || idx == 0) {
seeked_ = false;
return false;
}
idx_in_leaf_ = idx - 1;
} else {
DCHECK_EQ(mode, AT_OR_AFTER);
idx_in_leaf_ = idx;
if (idx_in_leaf_ == leaf_to_scan_->num_entries()) {
// not found in leaf, seek to start of next leaf if it exists.
return SeekNextLeaf();
}
}
return true;
}
void SeekToLeaf(const Slice &key) {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
retry_from_root:
{
AtomicVersion version;
LeafNode<Traits> *leaf = tree_->TraverseToLeaf(key, &version);
#ifdef SCAN_PREFETCH
if (leaf->next_) {
PrefetchMemory(leaf->next_);
}
#endif
// If the tree is frozen, we don't need to use optimistic concurrency control.
if (tree_frozen_) {
leaf_to_scan_ = leaf;
seeked_ = true;
return;
}
retry_in_leaf:
{
memcpy(static_cast<void*>(&leaf_copy_), leaf, sizeof(leaf_copy_));
AtomicVersion new_version = leaf->StableVersion();
if (VersionField::HasSplit(version, new_version)) {
goto retry_from_root;
} else if (VersionField::IsDifferent(version, new_version)) {
version = new_version;
goto retry_in_leaf;
}
// Got a consistent snapshot copy of the leaf node into
// leaf_copy_
leaf_to_scan_ = &leaf_copy_;
}
}
seeked_ = true;
}
bool SeekNextLeaf() {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
DCHECK(seeked_);
LeafNode<Traits> *next = leaf_to_scan_->next_;
if (PREDICT_FALSE(next == NULL)) {
seeked_ = false;
return false;
}
#ifdef SCAN_PREFETCH
if (next->next_) {
PrefetchMemory(next->next_);
}
for (int i = 0; i < next->num_entries(); i++) {
next->vals_[i].prefetch();
}
#endif
// If the tree is frozen, we don't need to play optimistic concurrency
// games or make a defensive copy.
if (tree_frozen_) {
leaf_to_scan_ = next;
idx_in_leaf_ = 0;
return true;
}
while (true) {
AtomicVersion version = next->StableVersion();
memcpy(static_cast<void*>(&leaf_copy_), next, sizeof(leaf_copy_));
AtomicVersion new_version = next->StableVersion();
if (VersionField::IsDifferent(new_version, version)) {
version = new_version;
} else {
idx_in_leaf_ = 0;
leaf_to_scan_ = &leaf_copy_;
return true;
}
}
}
const CBTree<Traits> *tree_;
// If true, the tree we are scanning is completely frozen and we don't
// need to perform optimistic concurrency control or copies for safety.
bool tree_frozen_;
bool seeked_;
size_t idx_in_leaf_;
LeafNode<Traits> leaf_copy_;
LeafNode<Traits> *leaf_to_scan_;
};
} // namespace btree
} // namespace tablet
} // namespace kudu