blob: bd4a162291f6779793c3ccff9d6c9e13c8d88cd8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_COMMON_MERGE_ITERATOR_H
#define KUDU_COMMON_MERGE_ITERATOR_H
#include <deque>
#include <gtest/gtest_prod.h>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
#include "kudu/common/iterator.h"
#include "kudu/common/scan_spec.h"
#include "kudu/util/object_pool.h"
namespace kudu {
class Arena;
class MergeIterState;
// An iterator which merges the results of other iterators, comparing
// based on keys.
class MergeIterator : public RowwiseIterator {
 public:
  // Constructs a merge iterator over 'iters' with the given 'schema'.
  //
  // TODO: clarify whether schema is just the projection, or must include the merge
  // key columns. It should probably just be the required projection, which must be
  // a subset of the columns in 'iters'.
  MergeIterator(const Schema &schema,
                const std::vector<std::shared_ptr<RowwiseIterator> > &iters);

  // The passed-in iterators should be already initialized.
  Status Init(ScanSpec *spec) OVERRIDE;

  // Overrides of the inherited iterator interface; all are defined out of line.
  virtual bool HasNext() const OVERRIDE;
  // Note: the return type is spelled 'std::string' so that this header does
  // not depend on a using-declaration leaking out of another header.
  virtual std::string ToString() const OVERRIDE;
  virtual const Schema& schema() const OVERRIDE;
  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE;
  virtual Status NextBlock(RowBlock* dst) OVERRIDE;

 private:
  // Internal helpers, defined out of line.
  void PrepareBatch(RowBlock* dst);
  Status MaterializeBlock(RowBlock* dst);
  Status InitSubIterators(ScanSpec *spec);

  // The schema passed to the constructor (see the TODO above regarding its
  // exact meaning).
  const Schema schema_;

  // NOTE(review): presumably set once Init() has completed -- confirm against
  // the implementation.
  bool initted_;

  // Holds the subiterators until Init is called.
  // This is required because we can't create a MergeIterState of an uninitialized iterator.
  std::deque<std::shared_ptr<RowwiseIterator> > orig_iters_;

  // MergeIterState objects for the subiterators.
  // NOTE(review): presumably populated from 'orig_iters_' during Init() -- confirm.
  std::vector<std::shared_ptr<MergeIterState> > iters_;

  // When the underlying iterators are initialized, each needs its own
  // copy of the scan spec in order to do its own pushdown calculations, etc.
  // The copies are allocated from this pool so they can be automatically freed
  // when the MergeIterator goes out of scope.
  ObjectPool<ScanSpec> scan_spec_copies_;
};
// An iterator which unions the results of other iterators.
// This is different from MergeIterator in that it lays the results out end-to-end
// rather than merging them based on keys. Hence it is more efficient since there is
// no comparison needed, and the key column does not need to be read if it is not
// part of the projection.
class UnionIterator : public RowwiseIterator {
 public:
  // Construct a union iterator of the given iterators.
  // The iterators must have matching schemas.
  // The passed-in iterators should not yet be initialized.
  //
  // All passed-in iterators must be fully able to evaluate all predicates - i.e.
  // calling iter->Init(spec) should remove all predicates from the spec.
  explicit UnionIterator(const std::vector<std::shared_ptr<RowwiseIterator> > &iters);

  // Overrides of the inherited iterator interface; defined out of line unless
  // a body is given here.
  Status Init(ScanSpec *spec) OVERRIDE;

  bool HasNext() const OVERRIDE;

  // Note: the return type is spelled 'std::string' so that this header does
  // not depend on a using-declaration leaking out of another header.
  std::string ToString() const OVERRIDE;

  // Returns the schema held in 'schema_'.
  //
  // CHECK-fails if 'initted_' is false or if 'schema_' is null, so this must
  // not be called before the schema has been set up (see 'schema_' below).
  const Schema &schema() const OVERRIDE {
    CHECK(initted_);
    // The CHECK below guarantees a non-null pointer, so the dereference that
    // follows needs no further null check.
    CHECK(schema_.get() != nullptr) << "Bad schema in " << ToString();
    return *schema_.get();
  }

  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE;

  virtual Status NextBlock(RowBlock* dst) OVERRIDE;

 private:
  // Internal helpers, defined out of line.
  void PrepareBatch();
  Status MaterializeBlock(RowBlock* dst);
  void FinishBatch();
  Status InitSubIterators(ScanSpec *spec);

  // Schema: initialized during Init()
  gscoped_ptr<Schema> schema_;

  // Checked by schema(); NOTE(review): presumably set by Init() -- confirm
  // against the implementation.
  bool initted_;

  // The subiterators; entries are popped from this container (see the comment
  // on 'all_iters_').
  std::deque<std::shared_ptr<RowwiseIterator> > iters_;

  // Since we pop from 'iters_' this field is needed in order to keep
  // the underlying iterators available for GetIteratorStats.
  std::vector<std::shared_ptr<RowwiseIterator> > all_iters_;

  // When the underlying iterators are initialized, each needs its own
  // copy of the scan spec in order to do its own pushdown calculations, etc.
  // The copies are allocated from this pool so they can be automatically freed
  // when the UnionIterator goes out of scope.
  ObjectPool<ScanSpec> scan_spec_copies_;
};
// An iterator which wraps a ColumnwiseIterator, materializing it into full rows.
//
// Column predicates are pushed down into this iterator. While materializing a
// block, columns with associated predicates are materialized first, and the
// predicates evaluated. If the predicates succeed in filtering out an entire
// batch, then other columns may avoid doing any IO.
class MaterializingIterator : public RowwiseIterator {
 public:
  // Wraps the given columnwise iterator.
  explicit MaterializingIterator(std::shared_ptr<ColumnwiseIterator> iter);

  // Initialize the iterator, performing predicate pushdown as described above.
  Status Init(ScanSpec *spec) OVERRIDE;

  bool HasNext() const OVERRIDE;

  // Note: the return type is spelled 'std::string' so that this header does
  // not depend on a using-declaration leaking out of another header.
  std::string ToString() const OVERRIDE;

  // Returns the schema of the wrapped iterator.
  const Schema &schema() const OVERRIDE {
    return iter_->schema();
  }

  // Forwards the stats request to the wrapped iterator.
  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
    iter_->GetIteratorStats(stats);
  }

  virtual Status NextBlock(RowBlock* dst) OVERRIDE;

 private:
  // Tests that need access to private members.
  FRIEND_TEST(TestMaterializingIterator, TestPredicatePushdown);
  FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation);

  // Internal helper, defined out of line.
  Status MaterializeBlock(RowBlock *dst);

  // The wrapped columnwise iterator.
  std::shared_ptr<ColumnwiseIterator> iter_;

  // List of (column index, predicate) in order of most to least selective.
  std::vector<std::tuple<int32_t, ColumnPredicate>> col_idx_predicates_;

  // List of column indexes without predicates to materialize.
  std::vector<int32_t> non_predicate_column_indexes_;

  // Set only by test code to disallow pushdown.
  bool disallow_pushdown_for_tests_;
};
// An iterator which wraps another iterator and evaluates any predicates that the
// wrapped iterator did not itself handle during push down.
class PredicateEvaluatingIterator : public RowwiseIterator {
 public:
  // Initialize the given '*base_iter' with the given 'spec'.
  //
  // If the base_iter accepts all predicates, then simply returns.
  // Otherwise, swaps out *base_iter for a PredicateEvaluatingIterator which wraps
  // the original iterator and accepts all predicates on its behalf.
  //
  // POSTCONDITION: spec->predicates().empty()
  // POSTCONDITION: base_iter and its wrapper are initialized
  static Status InitAndMaybeWrap(std::shared_ptr<RowwiseIterator> *base_iter,
                                 ScanSpec *spec);

  // Initialize the iterator.
  // POSTCONDITION: spec->predicates().empty()
  Status Init(ScanSpec *spec) OVERRIDE;

  virtual Status NextBlock(RowBlock *dst) OVERRIDE;

  bool HasNext() const OVERRIDE;

  // Note: the return type is spelled 'std::string' so that this header does
  // not depend on a using-declaration leaking out of another header.
  std::string ToString() const OVERRIDE;

  // Returns the schema of the wrapped iterator.
  const Schema &schema() const OVERRIDE {
    return base_iter_->schema();
  }

  // Forwards the stats request to the wrapped iterator.
  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
    base_iter_->GetIteratorStats(stats);
  }

 private:
  // Construct the evaluating iterator.
  // This is only called from ::InitAndMaybeWrap()
  // REQUIRES: base_iter is already Init()ed.
  explicit PredicateEvaluatingIterator(std::shared_ptr<RowwiseIterator> base_iter);

  // Test that needs access to private members.
  FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation);

  // The wrapped iterator.
  std::shared_ptr<RowwiseIterator> base_iter_;

  // List of predicates in order of most to least selective.
  // Despite the 'col_idx' in the name, only the predicates themselves are
  // stored here; no column indexes are kept alongside them.
  std::vector<ColumnPredicate> col_idx_predicates_;
};
} // namespace kudu
#endif