blob: 8d861dcb90ec0ea17a32f9be493fe15115bebec8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/rowset_tree.h"
#include <algorithm>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <vector>
#include <glog/logging.h>
#include "kudu/gutil/stl_util.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
#include "kudu/util/interval_tree.h"
#include "kudu/util/interval_tree-inl.h"
#include "kudu/util/slice.h"
using std::optional;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
namespace {
// Strict weak ordering used to sort RSEndpoints.
//
// Endpoints are ordered lexicographically:
//   1. by key slice,
//   2. then by the address of the owning rowset,
//   3. then by endpoint kind, with START sorting before any other kind.
//
// Returns true iff 'a' must be placed before 'b'.
bool RSEndpointBySliceCompare(const RowSetTree::RSEndpoint& a,
                              const RowSetTree::RSEndpoint& b) {
  const int slice_cmp = a.slice_.compare(b.slice_);
  if (slice_cmp != 0) return slice_cmp < 0;

  // The rowsets are independently allocated objects, so subtracting their
  // addresses (or comparing them with a raw '<') is not well-defined.
  // Equality comparison is always valid, and std::less provides a strict
  // total order over pointers, so use those for the tie-break instead.
  if (a.rowset_ != b.rowset_) {
    return std::less<>()(a.rowset_, b.rowset_);
  }

  if (a.endpoint_ != b.endpoint_) return a.endpoint_ == RowSetTree::START;
  return false;
}
// Wrapper used when making batch queries into the interval tree. It pairs a
// probe key with its position in the caller's batch so that a match can be
// reported back by index rather than by key.
//
// NOTE: instances are brace-initialized as {slice, idx}, so the order of the
// members below must not be changed.
struct QueryStruct {
// The slice of the operation performing the query.
Slice slice;
// The original index of this slice in the incoming batch.
int idx;
};
} // anonymous namespace
// Entry for use in the interval tree: a rowset together with a copy of its
// minimum and maximum encoded keys.
struct RowSetWithBounds {
string min_key;
string max_key;
// NOTE: the ordering of struct fields here is purposeful: we access
// min_key and max_key frequently, so putting them first in the struct
// ensures they fill a single 64-byte cache line (assuming each std::string
// is 32 bytes, which depends on the standard library implementation).
// The 'rowset' pointer is accessed comparatively rarely.
RowSet *rowset;
};
// Traits struct for IntervalTree. Describes how to extract the two bounds of
// a RowSetWithBounds entry and how to compare the point types used in
// queries against each other.
struct RowSetIntervalTraits {
  using point_type = Slice;
  using interval_type = RowSetWithBounds*;

  // Left bound of the interval: the entry's min_key.
  static Slice get_left(const RowSetWithBounds* rs) {
    return Slice(rs->min_key);
  }

  // Right bound of the interval: the entry's max_key.
  static Slice get_right(const RowSetWithBounds* rs) {
    return Slice(rs->max_key);
  }

  // Plain three-way comparison of two keys.
  static int compare(const Slice& a, const Slice& b) {
    return a.compare(b);
  }

  // Three-way comparison of a key against the key carried by a batch query.
  static int compare(const Slice& a, const QueryStruct& b) {
    return a.compare(b.slice);
  }

  // Mirror image of the overload above.
  static int compare(const QueryStruct& a, const Slice& b) {
    return -compare(b, a);
  }

  // Three-way comparison where 'a' may be absent. When 'a' is std::nullopt:
  //   (1) 'a' is treated as +OO if 'type' is POSITIVE_INFINITY;
  //   (2) 'a' is treated as -OO otherwise.
  static int compare(const optional<Slice>& a,
                     const Slice& b,
                     const EndpointIfNone& type) {
    if (a) {
      return compare(*a, b);
    }
    if (type == POSITIVE_INFINITY) {
      return 1;
    }
    return -1;
  }

  // Mirror image of the overload above: here it is 'b' that may be absent.
  static int compare(const Slice& a,
                     const optional<Slice>& b,
                     const EndpointIfNone& type) {
    return -compare(b, a, type);
  }
};
RowSetTree::RowSetTree()
: initted_(false) {
}
// Populates this tree from 'rowsets'. CHECK-fails if the tree has already
// been initialized.
//
// Rowsets whose GetBounds() reports NotSupported are kept on a separate
// "unbounded" list; all others are stored with their key bounds in the
// interval tree and contribute a START and a STOP entry to the sorted
// endpoint list. Rowsets that expose metadata are additionally indexed by
// their metadata ID.
//
// Returns OK on success. If any rowset fails to report its bounds with an
// error other than NotSupported, that error is logged and returned, and the
// members of this object are left unmodified.
Status RowSetTree::Reset(const RowSetVector &rowsets) {
  CHECK(!initted_);

  std::vector<RowSetWithBounds *> entries;
  RowSetVector unbounded;
  // Frees whatever is left in 'entries' when this function returns. On the
  // success path that is the (swapped-out) previous content of entries_.
  ElementDeleter deleter(&entries);
  entries.reserve(rowsets.size());
  std::vector<RSEndpoint> endpoints;
  endpoints.reserve(rowsets.size()*2);

  // Iterate over each of the provided RowSets, fetching their
  // bounds and adding them to the local vectors.
  for (const shared_ptr<RowSet> &rs : rowsets) {
    string min_key;
    string max_key;
    Status s = rs->GetBounds(&min_key, &max_key);
    if (s.IsNotSupported()) {
      // This rowset is a MemRowSet, for which the bounds change as more
      // data gets inserted. Therefore we can't put it in the static
      // interval tree -- instead put it on the list which is consulted
      // on every access.
      unbounded.push_back(rs);
      continue;
    }
    if (!s.ok()) {
      LOG(WARNING) << "Unable to construct RowSetTree: "
                   << rs->ToString() << " unable to determine its bounds: "
                   << s.ToString();
      return s;
    }
    DCHECK_LE(min_key.compare(max_key), 0)
        << "Rowset min must be <= max: " << rs->ToString();

    // Only allocate the tree entry once we know the rowset is bounded, so
    // that unbounded rowsets and error paths don't pay for a throwaway
    // heap allocation.
    auto entry = std::make_unique<RowSetWithBounds>();
    entry->rowset = rs.get();
    entry->min_key = std::move(min_key);
    entry->max_key = std::move(max_key);

    // Load into key endpoints. The endpoints refer to the key strings owned
    // by the heap-allocated entry, so the keys must be moved into the entry
    // before this point.
    endpoints.emplace_back(entry->rowset, START, entry->min_key);
    endpoints.emplace_back(entry->rowset, STOP, entry->max_key);
    entries.push_back(entry.release());
  }

  // Sort endpoints
  std::sort(endpoints.begin(), endpoints.end(), RSEndpointBySliceCompare);

  // Install the vectors into the object.
  entries_.swap(entries);
  unbounded_rowsets_.swap(unbounded);
  tree_.reset(new IntervalTree<RowSetIntervalTraits>(entries_));
  key_endpoints_.swap(endpoints);
  all_rowsets_.assign(rowsets.begin(), rowsets.end());

  // Build the mapping from DRS ID to DRS.
  drs_by_id_.clear();
  for (const auto& rs : all_rowsets_) {
    if (rs->metadata()) {
      InsertOrDie(&drs_by_id_, rs->metadata()->id(), rs.get());
    }
  }

  initted_ = true;
  return Status::OK();
}
void RowSetTree::FindRowSetsIntersectingInterval(const optional<Slice>& lower_bound,
const optional<Slice>& upper_bound,
vector<RowSet*>* rowsets) const {
DCHECK(initted_);
// All rowsets with unknown bounds need to be checked.
for (const shared_ptr<RowSet> &rs : unbounded_rowsets_) {
rowsets->push_back(rs.get());
}
vector<RowSetWithBounds *> from_tree;
from_tree.reserve(all_rowsets_.size());
tree_->FindIntersectingInterval(lower_bound, upper_bound, &from_tree);
rowsets->reserve(rowsets->size() + from_tree.size());
for (RowSetWithBounds *rs : from_tree) {
rowsets->push_back(rs->rowset);
}
}
void RowSetTree::FindRowSetsWithKeyInRange(const Slice &encoded_key,
vector<RowSet *> *rowsets) const {
DCHECK(initted_);
// All rowsets with unknown bounds need to be checked.
for (const shared_ptr<RowSet> &rs : unbounded_rowsets_) {
rowsets->push_back(rs.get());
}
// Query the interval tree to efficiently find rowsets with known bounds
// whose ranges overlap the probe key.
vector<RowSetWithBounds *> from_tree;
from_tree.reserve(all_rowsets_.size());
tree_->FindContainingPoint(encoded_key, &from_tree);
rowsets->reserve(rowsets->size() + from_tree.size());
for (RowSetWithBounds *rs : from_tree) {
rowsets->push_back(rs->rowset);
}
}
void RowSetTree::ForEachRowSetContainingKeys(
const std::vector<Slice>& encoded_keys,
const std::function<void(RowSet*, int)>& cb) const {
DCHECK(std::is_sorted(encoded_keys.cbegin(), encoded_keys.cend(),
Slice::Comparator()));
// All rowsets with unknown bounds need to be checked.
for (const shared_ptr<RowSet> &rs : unbounded_rowsets_) {
for (int i = 0; i < encoded_keys.size(); i++) {
cb(rs.get(), i);
}
}
// The interval tree batch query callback would naturally just give us back
// the matching Slices, but that won't allow us to easily tell the caller
// which specific operation _index_ matched the RowSet. So, we make a vector
// of QueryStructs to pair the Slice with its original index.
vector<QueryStruct> queries;
queries.resize(encoded_keys.size());
for (int i = 0; i < encoded_keys.size(); i++) {
queries[i] = {encoded_keys[i], i};
}
tree_->ForEachIntervalContainingPoints(
queries,
[&](const QueryStruct& qs, RowSetWithBounds* rs) {
cb(rs->rowset, qs.idx);
});
}
// Releases the RowSetWithBounds entries held in 'entries_'. Reset() stores
// them there as raw pointers released from unique_ptr, so nothing else frees
// them.
RowSetTree::~RowSetTree() {
STLDeleteElements(&entries_);
}
} // namespace tablet
} // namespace kudu