blob: bdabd1c5f099343a06b6b3ea81d6c1a36d45c627 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include "kudu/common/generic_iterators.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/memory/arena.h"
using std::all_of;
using std::get;
using std::move;
using std::remove_if;
using std::shared_ptr;
using std::sort;
using std::string;
using std::tuple;
DEFINE_bool(materializing_iterator_do_pushdown, true,
"Should MaterializingIterator do predicate pushdown");
TAG_FLAG(materializing_iterator_do_pushdown, hidden);
namespace kudu {
////////////////////////////////////////////////////////////
// Merge iterator
////////////////////////////////////////////////////////////
// TODO: size by bytes, not # rows
static const int kMergeRowBuffer = 1000;
// MergeIterState wraps a RowwiseIterator for use by the MergeIterator.
// Importantly, it also filters out unselected rows from the wrapped RowwiseIterator,
// such that all returned rows are valid.
class MergeIterState {
public:
explicit MergeIterState(const shared_ptr<RowwiseIterator> &iter) :
iter_(iter),
arena_(1024, 256*1024),
read_block_(iter->schema(), kMergeRowBuffer, &arena_),
next_row_idx_(0),
num_advanced_(0),
num_valid_(0)
{}
const RowBlockRow& next_row() {
DCHECK_LT(num_advanced_, num_valid_);
return next_row_;
}
Status Advance() {
num_advanced_++;
if (IsBlockExhausted()) {
arena_.Reset();
return PullNextBlock();
} else {
// Seek to the next selected row.
SelectionVector *selection = read_block_.selection_vector();
for (++next_row_idx_; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
if (selection->IsRowSelected(next_row_idx_)) {
next_row_.Reset(&read_block_, next_row_idx_);
break;
}
}
DCHECK_NE(next_row_idx_, read_block_.nrows()+1) << "No selected rows found!";
return Status::OK();
}
}
bool IsBlockExhausted() const {
return num_advanced_ == num_valid_;
}
bool IsFullyExhausted() const {
return num_valid_ == 0;
}
Status PullNextBlock() {
CHECK_EQ(num_advanced_, num_valid_)
<< "should not pull next block until current block is exhausted";
if (!iter_->HasNext()) {
// Fully exhausted
num_advanced_ = 0;
num_valid_ = 0;
return Status::OK();
}
RETURN_NOT_OK(iter_->NextBlock(&read_block_));
num_advanced_ = 0;
// Honor the selection vector of the read_block_, since not all rows are necessarily selected.
SelectionVector *selection = read_block_.selection_vector();
DCHECK_EQ(selection->nrows(), read_block_.nrows());
DCHECK_LE(selection->CountSelected(), read_block_.nrows());
num_valid_ = selection->CountSelected();
VLOG(2) << selection->CountSelected() << "/" << read_block_.nrows() << " rows selected";
// Seek next_row_ to the first selected row.
for (next_row_idx_ = 0; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
if (selection->IsRowSelected(next_row_idx_)) {
next_row_.Reset(&read_block_, next_row_idx_);
break;
}
}
DCHECK_NE(next_row_idx_, read_block_.nrows()+1) << "No selected rows found!";
return Status::OK();
}
size_t remaining_in_block() const {
return num_valid_ - num_advanced_;
}
const shared_ptr<RowwiseIterator>& iter() const {
return iter_;
}
shared_ptr<RowwiseIterator> iter_;
Arena arena_;
RowBlock read_block_;
// The row currently pointed to by the iterator.
RowBlockRow next_row_;
// Row index of next_row_ in read_block_.
size_t next_row_idx_;
// Number of rows we've advanced past in the current RowBlock.
size_t num_advanced_;
// Number of valid (selected) rows in the current RowBlock.
size_t num_valid_;
};
MergeIterator::MergeIterator(
const Schema &schema,
const vector<shared_ptr<RowwiseIterator> > &iters)
: schema_(schema),
initted_(false) {
CHECK_GT(iters.size(), 0);
CHECK_GT(schema.num_key_columns(), 0);
orig_iters_.assign(iters.begin(), iters.end());
}
Status MergeIterator::Init(ScanSpec *spec) {
CHECK(!initted_);
// TODO: check that schemas match up!
RETURN_NOT_OK(InitSubIterators(spec));
for (shared_ptr<MergeIterState> &state : iters_) {
RETURN_NOT_OK(state->PullNextBlock());
}
// Before we copy any rows, clean up any iterators which were empty
// to start with. Otherwise, HasNext() won't properly return false
// if we were passed only empty iterators.
iters_.erase(
remove_if(iters_.begin(), iters_.end(), [] (const shared_ptr<MergeIterState>& iter) {
return PREDICT_FALSE(iter->IsFullyExhausted());
}),
iters_.end());
initted_ = true;
return Status::OK();
}
bool MergeIterator::HasNext() const {
CHECK(initted_);
return !iters_.empty();
}
Status MergeIterator::InitSubIterators(ScanSpec *spec) {
// Initialize all the sub iterators.
for (shared_ptr<RowwiseIterator> &iter : orig_iters_) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
iters_.push_back(shared_ptr<MergeIterState>(new MergeIterState(iter)));
}
// Since we handle predicates in all the wrapped iterators, we can clear
// them here.
if (spec != nullptr) {
spec->RemovePredicates();
}
return Status::OK();
}
Status MergeIterator::NextBlock(RowBlock* dst) {
CHECK(initted_);
DCHECK_SCHEMA_EQ(dst->schema(), schema());
PrepareBatch(dst);
RETURN_NOT_OK(MaterializeBlock(dst));
return Status::OK();
}
void MergeIterator::PrepareBatch(RowBlock* dst) {
if (dst->arena()) {
dst->arena()->Reset();
}
// We can always provide at least as many rows as are remaining
// in the currently queued up blocks.
size_t available = 0;
for (shared_ptr<MergeIterState> &iter : iters_) {
available += iter->remaining_in_block();
}
dst->Resize(std::min(dst->row_capacity(), available));
}
// TODO: this is an obvious spot to add codegen - there's a ton of branching
// and such around the comparisons. A simple experiment indicated there's some
// 2x to be gained.
Status MergeIterator::MaterializeBlock(RowBlock *dst) {
// Initialize the selection vector.
// MergeIterState only returns selected rows.
dst->selection_vector()->SetAllTrue();
for (size_t dst_row_idx = 0; dst_row_idx < dst->nrows(); dst_row_idx++) {
RowBlockRow dst_row = dst->row(dst_row_idx);
// Find the sub-iterator which is currently smallest
MergeIterState *smallest = nullptr;
ssize_t smallest_idx = -1;
// Typically the number of iters_ is not that large, so using a priority
// queue is not worth it
for (size_t i = 0; i < iters_.size(); i++) {
shared_ptr<MergeIterState> &state = iters_[i];
if (smallest == nullptr ||
schema_.Compare(state->next_row(), smallest->next_row()) < 0) {
smallest = state.get();
smallest_idx = i;
}
}
// If no iterators had any row left, then we're done iterating.
if (PREDICT_FALSE(smallest == nullptr)) break;
// Otherwise, copy the row from the smallest one, and advance it
RETURN_NOT_OK(CopyRow(smallest->next_row(), &dst_row, dst->arena()));
RETURN_NOT_OK(smallest->Advance());
if (smallest->IsFullyExhausted()) {
iters_.erase(iters_.begin() + smallest_idx);
}
}
return Status::OK();
}
string MergeIterator::ToString() const {
string s;
s.append("Merge(");
bool first = true;
for (const shared_ptr<RowwiseIterator> &iter : orig_iters_) {
s.append(iter->ToString());
if (!first) {
s.append(", ");
}
first = false;
}
s.append(")");
return s;
}
const Schema& MergeIterator::schema() const {
CHECK(initted_);
return schema_;
}
void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
CHECK(initted_);
vector<vector<IteratorStats> > stats_by_iter;
for (const shared_ptr<RowwiseIterator>& iter : orig_iters_) {
vector<IteratorStats> stats_for_iter;
iter->GetIteratorStats(&stats_for_iter);
stats_by_iter.push_back(stats_for_iter);
}
for (size_t idx = 0; idx < schema_.num_columns(); ++idx) {
IteratorStats stats_for_col;
for (const vector<IteratorStats>& stats_for_iter : stats_by_iter) {
stats_for_col.AddStats(stats_for_iter[idx]);
}
stats->push_back(stats_for_col);
}
}
////////////////////////////////////////////////////////////
// Union iterator
////////////////////////////////////////////////////////////
UnionIterator::UnionIterator(const vector<shared_ptr<RowwiseIterator> > &iters)
: initted_(false),
iters_(iters.size()) {
CHECK_GT(iters.size(), 0);
iters_.assign(iters.begin(), iters.end());
all_iters_.assign(iters.begin(), iters.end());
}
Status UnionIterator::Init(ScanSpec *spec) {
CHECK(!initted_);
// Initialize the underlying iterators
RETURN_NOT_OK(InitSubIterators(spec));
// Verify schemas match.
// Important to do the verification after initializing the
// sub-iterators, since they may not know their own schemas
// until they've been initialized (in the case of a union of unions)
schema_.reset(new Schema(iters_.front()->schema()));
for (const shared_ptr<RowwiseIterator> &iter : iters_) {
if (!iter->schema().Equals(*schema_)) {
return Status::InvalidArgument(
string("Schemas do not match: ") + schema_->ToString()
+ " vs " + iter->schema().ToString());
}
}
initted_ = true;
return Status::OK();
}
Status UnionIterator::InitSubIterators(ScanSpec *spec) {
for (shared_ptr<RowwiseIterator> &iter : iters_) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
}
// Since we handle predicates in all the wrapped iterators, we can clear
// them here.
if (spec != nullptr) {
spec->RemovePredicates();
}
return Status::OK();
}
bool UnionIterator::HasNext() const {
CHECK(initted_);
for (const shared_ptr<RowwiseIterator> &iter : iters_) {
if (iter->HasNext()) return true;
}
return false;
}
Status UnionIterator::NextBlock(RowBlock* dst) {
CHECK(initted_);
PrepareBatch();
RETURN_NOT_OK(MaterializeBlock(dst));
FinishBatch();
return Status::OK();
}
void UnionIterator::PrepareBatch() {
CHECK(initted_);
while (!iters_.empty() &&
!iters_.front()->HasNext()) {
iters_.pop_front();
}
}
Status UnionIterator::MaterializeBlock(RowBlock *dst) {
return iters_.front()->NextBlock(dst);
}
void UnionIterator::FinishBatch() {
if (!iters_.front()->HasNext()) {
// Iterator exhausted, remove it.
iters_.pop_front();
}
}
string UnionIterator::ToString() const {
string s;
s.append("Union(");
bool first = true;
for (const shared_ptr<RowwiseIterator> &iter : iters_) {
if (!first) {
s.append(", ");
}
first = false;
s.append(iter->ToString());
}
s.append(")");
return s;
}
void UnionIterator::GetIteratorStats(std::vector<IteratorStats>* stats) const {
CHECK(initted_);
vector<vector<IteratorStats> > stats_by_iter;
for (const shared_ptr<RowwiseIterator>& iter : all_iters_) {
vector<IteratorStats> stats_for_iter;
iter->GetIteratorStats(&stats_for_iter);
stats_by_iter.push_back(stats_for_iter);
}
for (size_t idx = 0; idx < schema_->num_columns(); ++idx) {
IteratorStats stats_for_col;
for (const vector<IteratorStats>& stats_for_iter : stats_by_iter) {
stats_for_col.AddStats(stats_for_iter[idx]);
}
stats->push_back(stats_for_col);
}
}
////////////////////////////////////////////////////////////
// Materializing iterator
////////////////////////////////////////////////////////////
MaterializingIterator::MaterializingIterator(shared_ptr<ColumnwiseIterator> iter)
: iter_(move(iter)),
disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown) {
}
Status MaterializingIterator::Init(ScanSpec *spec) {
RETURN_NOT_OK(iter_->Init(spec));
int32_t num_columns = schema().num_columns();
col_idx_predicates_.clear();
non_predicate_column_indexes_.clear();
if (spec != nullptr && !disallow_pushdown_for_tests_) {
col_idx_predicates_.reserve(spec->predicates().size());
non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size());
for (const auto& col_pred : spec->predicates()) {
const ColumnPredicate& pred = col_pred.second;
int col_idx = schema().find_column(pred.column().name());
if (col_idx == Schema::kColumnNotFound) {
return Status::InvalidArgument("No such column", col_pred.first);
}
VLOG(1) << "Pushing down predicate " << pred.ToString();
col_idx_predicates_.emplace_back(col_idx, move(col_pred.second));
}
for (int32_t col_idx = 0; col_idx < schema().num_columns(); col_idx++) {
if (!ContainsKey(spec->predicates(), schema().column(col_idx).name())) {
non_predicate_column_indexes_.emplace_back(col_idx);
}
}
// Since we'll evaluate these predicates ourselves, remove them from the
// scan spec so higher layers don't repeat our work.
spec->RemovePredicates();
} else {
non_predicate_column_indexes_.reserve(num_columns);
for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
non_predicate_column_indexes_.emplace_back(col_idx);
}
}
// Sort the predicates by selectivity so that the most selective are evaluated earlier.
sort(col_idx_predicates_.begin(), col_idx_predicates_.end(),
[] (const tuple<int32_t, ColumnPredicate>& left,
const tuple<int32_t, ColumnPredicate>& right) {
return SelectivityComparator(get<1>(left), get<1>(right));
});
return Status::OK();
}
bool MaterializingIterator::HasNext() const {
return iter_->HasNext();
}
Status MaterializingIterator::NextBlock(RowBlock* dst) {
size_t n = dst->row_capacity();
if (dst->arena()) {
dst->arena()->Reset();
}
RETURN_NOT_OK(iter_->PrepareBatch(&n));
dst->Resize(n);
RETURN_NOT_OK(MaterializeBlock(dst));
RETURN_NOT_OK(iter_->FinishBatch());
return Status::OK();
}
Status MaterializingIterator::MaterializeBlock(RowBlock *dst) {
// Initialize the selection vector indicating which rows have been
// been deleted.
RETURN_NOT_OK(iter_->InitializeSelectionVector(dst->selection_vector()));
for (const auto& col_pred : col_idx_predicates_) {
// Materialize the column itself into the row block.
ColumnBlock dst_col(dst->column_block(get<0>(col_pred)));
RETURN_NOT_OK(iter_->MaterializeColumn(get<0>(col_pred), &dst_col));
// Evaluate the column predicate.
get<1>(col_pred).Evaluate(dst_col, dst->selection_vector());
// If after evaluating this predicate the entire row block has been filtered
// out, we don't need to materialize other columns at all.
if (!dst->selection_vector()->AnySelected()) {
DVLOG(1) << "0/" << dst->nrows() << " passed predicate";
return Status::OK();
}
}
for (size_t col_idx : non_predicate_column_indexes_) {
// Materialize the column itself into the row block.
ColumnBlock dst_col(dst->column_block(col_idx));
RETURN_NOT_OK(iter_->MaterializeColumn(col_idx, &dst_col));
}
DVLOG(1) << dst->selection_vector()->CountSelected() << "/"
<< dst->nrows() << " passed predicate";
return Status::OK();
}
string MaterializingIterator::ToString() const {
string s;
s.append("Materializing(").append(iter_->ToString()).append(")");
return s;
}
////////////////////////////////////////////////////////////
// PredicateEvaluatingIterator
////////////////////////////////////////////////////////////
PredicateEvaluatingIterator::PredicateEvaluatingIterator(shared_ptr<RowwiseIterator> base_iter)
: base_iter_(move(base_iter)) {
}
Status PredicateEvaluatingIterator::InitAndMaybeWrap(
shared_ptr<RowwiseIterator> *base_iter, ScanSpec *spec) {
RETURN_NOT_OK((*base_iter)->Init(spec));
if (spec != nullptr && !spec->predicates().empty()) {
// Underlying iterator did not accept all predicates. Wrap it.
shared_ptr<RowwiseIterator> wrapper(new PredicateEvaluatingIterator(*base_iter));
CHECK_OK(wrapper->Init(spec));
base_iter->swap(wrapper);
}
return Status::OK();
}
Status PredicateEvaluatingIterator::Init(ScanSpec *spec) {
// base_iter_ already Init()ed before this is constructed.
CHECK_NOTNULL(spec);
// Gather any predicates that the base iterator did not pushdown, and remove
// the predicates from the spec.
col_idx_predicates_.clear();
col_idx_predicates_.reserve(spec->predicates().size());
for (auto& predicate : spec->predicates()) {
col_idx_predicates_.emplace_back(move(predicate.second));
}
spec->RemovePredicates();
// Sort the predicates by selectivity so that the most selective are evaluated earlier.
sort(col_idx_predicates_.begin(), col_idx_predicates_.end(), SelectivityComparator);
return Status::OK();
}
bool PredicateEvaluatingIterator::HasNext() const {
return base_iter_->HasNext();
}
Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
RETURN_NOT_OK(base_iter_->NextBlock(dst));
for (const auto& predicate : col_idx_predicates_) {
int32_t col_idx = dst->schema().find_column(predicate.column().name());
if (col_idx == Schema::kColumnNotFound) {
return Status::InvalidArgument("Unknown column in predicate", predicate.ToString());
}
predicate.Evaluate(dst->column_block(col_idx), dst->selection_vector());
// If after evaluating this predicate, the entire row block has now been
// filtered out, we don't need to evaluate any further predicates.
if (!dst->selection_vector()->AnySelected()) {
break;
}
}
return Status::OK();
}
string PredicateEvaluatingIterator::ToString() const {
return strings::Substitute("PredicateEvaluating($0)", base_iter_->ToString());
}
} // namespace kudu