| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/common/generic_iterators.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <random> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/column_materialization_context.h" |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/columnblock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/iterator_stats.h" |
| #include "kudu/common/key_encoder.h" |
| #include "kudu/common/predicate_effectiveness.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/types.h" |
| #include "kudu/gutil/integral_types.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/mathlimits.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/block_bloom_filter.h" |
| #include "kudu/util/hash.pb.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_int32(num_lists, 3, "Number of lists to merge"); |
| DEFINE_int32(num_rows, 1000, "Number of entries per list"); |
| DEFINE_int32(num_iters, 1, "Number of times to run merge"); |
| DECLARE_bool(materializing_iterator_do_pushdown); |
| DECLARE_int32(predicate_effectivess_num_skip_blocks); |
| |
| using std::map; |
| using std::optional; |
| using std::pair; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| static const int kValColIdx = 0; // Index of 'val' column in these test schemas. |
| static const Schema kIntSchema({ ColumnSchema("val", INT64) }, |
| /*key_columns=*/1); |
| static const bool kIsDeletedReadDefault = false; |
| static const Schema kIntSchemaWithVCol({ ColumnSchema("val", INT64), |
| ColumnSchema("is_deleted", IS_DELETED, |
| /*is_nullable=*/false, |
| /*is_immutable=*/false, |
| /*read_default=*/&kIsDeletedReadDefault) }, |
| /*key_columns=*/1); |
| |
| // Test iterator which just yields integer rows from a provided |
| // vector. |
| class VectorIterator : public ColumnwiseIterator { |
| public: |
| VectorIterator(vector<int64_t> ints, vector<uint8_t> is_deleted, Schema schema) |
| : ints_(std::move(ints)), |
| is_deleted_(std::move(is_deleted)), |
| schema_(std::move(schema)), |
| cur_idx_(0), |
| block_size_(ints_.size()), |
| sel_vec_(nullptr) { |
| CHECK_EQ(ints_.size(), is_deleted_.size()); |
| } |
| |
| explicit VectorIterator(const vector<int64_t>& ints) |
| : VectorIterator(ints, vector<uint8_t>(ints.size()), kIntSchema) { |
| } |
| |
| // Set the number of rows that will be returned in each |
| // call to PrepareBatch(). |
| void set_block_size(int block_size) { |
| block_size_ = block_size; |
| } |
| |
| void set_selection_vector(SelectionVector* sv) { |
| sel_vec_ = sv; |
| } |
| |
| Status Init(ScanSpec* /*spec*/) override { |
| return Status::OK(); |
| } |
| |
| Status PrepareBatch(size_t* nrows) override { |
| prepared_ = std::min<int64_t>({ |
| static_cast<int64_t>(ints_.size()) - cur_idx_, |
| block_size_, |
| static_cast<int64_t>(*nrows) }); |
| *nrows = prepared_; |
| return Status::OK(); |
| } |
| |
| Status InitializeSelectionVector(SelectionVector* sv) override { |
| if (!sel_vec_) { |
| sv->SetAllTrue(); |
| return Status::OK(); |
| } |
| for (int i = 0; i < sv->nrows(); i++) { |
| size_t row_idx = cur_idx_ + i; |
| if (row_idx > sel_vec_->nrows() || !sel_vec_->IsRowSelected(row_idx)) { |
| sv->SetRowUnselected(i); |
| } else { |
| DCHECK(sel_vec_->IsRowSelected(row_idx)); |
| sv->SetRowSelected(i); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status MaterializeColumn(ColumnMaterializationContext* ctx) override { |
| ctx->SetDecoderEvalNotSupported(); |
| DCHECK_LE(prepared_, ctx->block()->nrows()); |
| |
| switch (ctx->block()->type_info()->physical_type()) { |
| case INT64: |
| for (size_t i = 0; i < prepared_; i++) { |
| ctx->block()->SetCellValue(i, &(ints_[cur_idx_ + i])); |
| } |
| break; |
| case BOOL: |
| for (size_t i = 0; i < prepared_; i++) { |
| ctx->block()->SetCellValue(i, &(is_deleted_[cur_idx_ + i])); |
| } |
| break; |
| default: |
| LOG(FATAL) << "unsupported column type in VectorIterator"; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status FinishBatch() override { |
| cur_idx_ += prepared_; |
| prepared_ = 0; |
| return Status::OK(); |
| } |
| |
| bool HasNext() const override { |
| return cur_idx_ < ints_.size(); |
| } |
| |
| string ToString() const override { |
| return Substitute("VectorIterator [$0,$1]", ints_[0], ints_[ints_.size() - 1]); |
| } |
| |
| const Schema& schema() const override { |
| return schema_; |
| } |
| |
| void GetIteratorStats(vector<IteratorStats>* stats) const override { |
| stats->resize(schema().num_columns()); |
| } |
| |
| private: |
| vector<int64_t> ints_; |
| // We use vector<uint8_t> instead of vector<bool> to represent the IS_DELETED |
| // column so we can call ColumnBlock::SetCellValue() in MaterializeColumn(), |
| // whose API requires taking an address to a non-temporary for the value. |
| vector<uint8_t> is_deleted_; |
| const Schema schema_; |
| int cur_idx_; |
| int block_size_; |
| size_t prepared_; |
| SelectionVector* sel_vec_; |
| }; |
| |
| // Test that empty input to a merger behaves correctly. |
| TEST(TestMergeIterator, TestMergeEmpty) { |
| unique_ptr<RowwiseIterator> iter( |
| NewMaterializingIterator( |
| unique_ptr<ColumnwiseIterator>(new VectorIterator({})))); |
| vector<IterWithBounds> input; |
| IterWithBounds iwb; |
| iwb.iter = std::move(iter); |
| input.emplace_back(std::move(iwb)); |
| unique_ptr<RowwiseIterator> merger(NewMergeIterator( |
| MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input))); |
| ASSERT_OK(merger->Init(nullptr)); |
| ASSERT_FALSE(merger->HasNext()); |
| } |
| |
| // Test that non-empty input to a merger with a zeroed selection vector |
| // behaves correctly. |
| TEST(TestMergeIterator, TestMergeEmptyViaSelectionVector) { |
| SelectionVector sv(3); |
| sv.SetAllFalse(); |
| unique_ptr<VectorIterator> vec(new VectorIterator({ 1, 2, 3 })); |
| vec->set_selection_vector(&sv); |
| unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(vec))); |
| vector<IterWithBounds> input; |
| IterWithBounds iwb; |
| iwb.iter = std::move(iter); |
| input.emplace_back(std::move(iwb)); |
| unique_ptr<RowwiseIterator> merger(NewMergeIterator( |
| MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input))); |
| ASSERT_OK(merger->Init(nullptr)); |
| ASSERT_FALSE(merger->HasNext()); |
| } |
| |
| // Tests that if we stop using a MergeIterator with several elements remaining, |
| // it is cleaned up properly. |
| TEST(TestMergeIterator, TestNotConsumedCleanup) { |
| unique_ptr<VectorIterator> vec1(new VectorIterator({ 1 })); |
| unique_ptr<VectorIterator> vec2(new VectorIterator({ 2 })); |
| unique_ptr<VectorIterator> vec3(new VectorIterator({ 3 })); |
| |
| vector<IterWithBounds> input; |
| IterWithBounds iwb1; |
| iwb1.iter = NewMaterializingIterator(std::move(vec1)); |
| input.emplace_back(std::move(iwb1)); |
| IterWithBounds iwb2; |
| iwb2.iter = NewMaterializingIterator(std::move(vec2)); |
| input.emplace_back(std::move(iwb2)); |
| IterWithBounds iwb3; |
| iwb3.iter = NewMaterializingIterator(std::move(vec3)); |
| input.emplace_back(std::move(iwb3)); |
| unique_ptr<RowwiseIterator> merger(NewMergeIterator( |
| MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input))); |
| ASSERT_OK(merger->Init(nullptr)); |
| |
| ASSERT_TRUE(merger->HasNext()); |
| RowBlockMemory mem; |
| RowBlock dst(&kIntSchema, 1, &mem); |
| ASSERT_OK(merger->NextBlock(&dst)); |
| ASSERT_EQ(1, dst.nrows()); |
| ASSERT_TRUE(merger->HasNext()); |
| |
| // Let the MergeIterator go out of scope with some remaining elements. |
| } |
| |
| class TestIntRangePredicate { |
| public: |
| TestIntRangePredicate(int64_t lower, int64_t upper, const ColumnSchema& column) |
| : lower_(lower), |
| upper_(upper), |
| pred_(ColumnPredicate::Range(column, &lower_, &upper_)) { |
| } |
| |
| TestIntRangePredicate(int64_t lower, int64_t upper) |
| : TestIntRangePredicate(lower, upper, kIntSchema.column(0)) { |
| } |
| |
| int64_t lower_, upper_; |
| ColumnPredicate pred_; |
| }; |
| |
| void TestMerge(const Schema& schema, |
| const TestIntRangePredicate &predicate, |
| bool overlapping_ranges = true, |
| bool include_deleted_rows = false) { |
| struct List { |
| explicit List(int num_rows) |
| : sv(new SelectionVector(num_rows)) { |
| ints.reserve(num_rows); |
| is_deleted.reserve(num_rows); |
| } |
| |
| vector<int64_t> ints; |
| vector<uint8_t> is_deleted; |
| unique_ptr<SelectionVector> sv; |
| optional<pair<string, string>> encoded_bounds; |
| }; |
| |
| vector<List> all_ints; |
| map<int64_t, bool> expected; |
| unordered_set<int64_t> seen_live; |
| const auto& encoder = GetKeyEncoder<string>(GetTypeInfo(INT64)); |
| Random prng(SeedRandom()); |
| |
| int64_t entry = 0; |
| for (int i = 0; i < FLAGS_num_lists; i++) { |
| List list(FLAGS_num_rows); |
| unordered_set<int64_t> seen_this_list; |
| |
| optional<int64_t> min_entry; |
| optional<int64_t> max_entry; |
| if (overlapping_ranges) { |
| entry = 0; |
| } |
| for (int j = 0; j < FLAGS_num_rows; j++) { |
| int64_t potential; |
| bool is_deleted = false; |
| // The merge iterator does not support duplicate non-deleted keys. |
| while (true) { |
| potential = entry + prng.Uniform(FLAGS_num_rows * FLAGS_num_lists * 10); |
| // Only one live version of a row can exist across all lists. |
| // Including several duplicate deleted keys is fine. |
| if (ContainsKey(seen_live, potential)) continue; |
| |
| // No duplicate keys are allowed in the same list (same RowSet). |
| if (ContainsKey(seen_this_list, potential)) continue; |
| InsertOrDie(&seen_this_list, potential); |
| |
| // If we are including deleted rows, with some probability make this a |
| // deleted row. |
| if (include_deleted_rows) { |
| is_deleted = prng.OneIn(4); |
| if (is_deleted) { |
| break; |
| } |
| } |
| |
| // This is a new live row. Un-mark it as deleted if necessary. |
| if (!is_deleted) { |
| InsertOrDie(&seen_live, potential); |
| } |
| break; |
| } |
| entry = potential; |
| |
| list.ints.emplace_back(entry); |
| list.is_deleted.emplace_back(is_deleted); |
| |
| if (!max_entry || entry > max_entry) { |
| max_entry = entry; |
| } |
| if (!min_entry || entry < min_entry) { |
| min_entry = entry; |
| } |
| |
| // Some entries are randomly deselected in order to exercise the selection |
| // vector logic in the MergeIterator. This is reflected both in the input |
| // to the MergeIterator as well as the expected output (see below). |
| bool row_selected = prng.Uniform(8) > 0; |
| VLOG(2) << Substitute("Row $0 with value $1 selected? $2", |
| j, entry, row_selected); |
| if (row_selected) { |
| list.sv->SetRowSelected(j); |
| } else { |
| list.sv->SetRowUnselected(j); |
| } |
| |
| // Consider the predicate and the selection vector before inserting this |
| // row into 'expected'. |
| if (entry >= predicate.lower_ && entry < predicate.upper_ && row_selected) { |
| auto result = expected.emplace(entry, is_deleted); |
| if (!result.second) { |
| // We should only be overwriting a deleted row. |
| bool existing_is_deleted = result.first->second; |
| CHECK_EQ(true, existing_is_deleted); |
| result.first->second = is_deleted; |
| } |
| } |
| } |
| |
| if (prng.Uniform(10) > 0) { |
| // Use the smallest and largest entries as bounds most of the time. They |
| // are randomly adjusted to reflect their inexactness in the real world. |
| list.encoded_bounds.emplace(); |
| DCHECK(min_entry); |
| DCHECK(max_entry); |
| min_entry = *min_entry - prng.Uniform(5); |
| max_entry = *max_entry + prng.Uniform(5); |
| encoder.Encode(&(*min_entry), &list.encoded_bounds->first); |
| encoder.Encode(&(*max_entry), &list.encoded_bounds->second); |
| } |
| all_ints.emplace_back(std::move(list)); |
| } |
| |
| LOG_TIMING(INFO, "shuffling the inputs") { |
| std::random_device rdev; |
| std::mt19937 gen(rdev()); |
| std::shuffle(all_ints.begin(), all_ints.end(), gen); |
| } |
| |
| VLOG(1) << "Predicate expects " << expected.size() << " results: " << expected; |
| |
| for (int trial = 0; trial < FLAGS_num_iters; trial++) { |
| vector<IterWithBounds> to_merge; |
| for (const auto& list : all_ints) { |
| unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints, list.is_deleted, schema)); |
| vec_it->set_block_size(16); |
| vec_it->set_selection_vector(list.sv.get()); |
| unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it))); |
| IterWithBounds mat_iwb; |
| mat_iwb.iter = std::move(mat_it); |
| vector<IterWithBounds> un_input; |
| un_input.emplace_back(std::move(mat_iwb)); |
| unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(un_input))); |
| IterWithBounds un_iwb; |
| un_iwb.iter = std::move(un_it); |
| if (list.encoded_bounds) { |
| un_iwb.encoded_bounds = list.encoded_bounds; |
| } |
| to_merge.emplace_back(std::move(un_iwb)); |
| } |
| |
| // Setup predicate exclusion |
| ScanSpec spec; |
| spec.AddPredicate(predicate.pred_); |
| LOG(INFO) << "Predicate: " << predicate.pred_.ToString(); |
| |
| LOG_TIMING(INFO, "iterating merged lists") { |
| unique_ptr<RowwiseIterator> merger(NewMergeIterator( |
| MergeIteratorOptions(include_deleted_rows), std::move(to_merge))); |
| ASSERT_OK(merger->Init(&spec)); |
| |
| // The RowBlock is sized to a power of 2 to improve BitmapCopy performance |
| // when copying another RowBlock into it. |
| RowBlockMemory mem; |
| RowBlock dst(&schema, 128, &mem); |
| size_t total_idx = 0; |
| auto expected_iter = expected.cbegin(); |
| while (merger->HasNext()) { |
| ASSERT_OK(merger->NextBlock(&dst)); |
| ASSERT_GT(dst.nrows(), 0) << |
| "if HasNext() returns true, must return some rows"; |
| |
| for (int i = 0; i < dst.nrows(); i++) { |
| if (!dst.selection_vector()->IsRowSelected(i)) { |
| continue; |
| } |
| ASSERT_NE(expected.end(), expected_iter); |
| int64_t expected_key = expected_iter->first; |
| bool expected_is_deleted = expected_iter->second; |
| int64_t row_key = *schema.ExtractColumnFromRow<INT64>(dst.row(i), kValColIdx); |
| ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by predicate"; |
| ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by predicate"; |
| EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx " << total_idx; |
| bool is_deleted = false; |
| if (include_deleted_rows) { |
| CHECK_NE(Schema::kColumnNotFound, |
| schema.first_is_deleted_virtual_column_idx()); |
| is_deleted = *schema.ExtractColumnFromRow<IS_DELETED>( |
| dst.row(i), schema.first_is_deleted_virtual_column_idx()); |
| EXPECT_EQ(expected_is_deleted, is_deleted) |
| << "Row " << row_key << " has unexpected IS_DELETED value at index " << total_idx; |
| } |
| VLOG(2) << "Observed: val=" << row_key << ", is_deleted=" << is_deleted; |
| VLOG(2) << "Expected: val=" << expected_key << ", is_deleted=" << expected_is_deleted; |
| ++expected_iter; |
| ++total_idx; |
| } |
| } |
| ASSERT_EQ(expected.size(), total_idx); |
| ASSERT_EQ(expected.end(), expected_iter); |
| } |
| } |
| } |
| |
| TEST(TestMergeIterator, TestMerge) { |
| TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax); |
| NO_FATALS(TestMerge(kIntSchema, predicate)); |
| } |
| |
| TEST(TestMergeIterator, TestMergeNonOverlapping) { |
| TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax); |
| NO_FATALS(TestMerge(kIntSchema, predicate, /*overlapping_ranges=*/false)); |
| } |
| |
| TEST(TestMergeIterator, TestMergePredicate) { |
| TestIntRangePredicate predicate(0, FLAGS_num_rows / 5); |
| NO_FATALS(TestMerge(kIntSchema, predicate)); |
| } |
| |
| // Regression test for a bug in the merge which would incorrectly |
| // drop a merge input if it received an entirely non-selected block. |
| // This predicate excludes the first half of the rows but accepts the |
| // second half. |
| TEST(TestMergeIterator, TestMergePredicate2) { |
| TestIntRangePredicate predicate(FLAGS_num_rows / 2, MathLimits<int64_t>::kMax); |
| NO_FATALS(TestMerge(kIntSchema, predicate)); |
| } |
| |
| TEST(TestMergeIterator, TestDeDupGhostRows) { |
| TestIntRangePredicate match_all_pred(0, MathLimits<int64_t>::kMax); |
| NO_FATALS(TestMerge(kIntSchemaWithVCol, match_all_pred, |
| /*overlapping_ranges=*/true, |
| /*include_deleted_rows=*/true)); |
| } |
| |
| // Test that the MaterializingIterator properly evaluates predicates when they apply |
| // to single columns. |
| TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) { |
| ScanSpec spec; |
| TestIntRangePredicate pred1(20, 30); |
| spec.AddPredicate(pred1.pred_); |
| LOG(INFO) << "Predicate: " << pred1.pred_.ToString(); |
| |
| vector<int64_t> ints(100); |
| for (int i = 0; i < 100; i++) { |
| ints[i] = i; |
| } |
| |
| unique_ptr<VectorIterator> colwise(new VectorIterator(ints)); |
| unique_ptr<RowwiseIterator> materializing(NewMaterializingIterator(std::move(colwise))); |
| ASSERT_OK(materializing->Init(&spec)); |
| ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate"; |
| |
| RowBlockMemory mem(1024); |
| RowBlock dst(&kIntSchema, 100, &mem); |
| ASSERT_OK(materializing->NextBlock(&dst)); |
| ASSERT_EQ(dst.nrows(), 100); |
| |
| // Check that the resulting selection vector is correct (rows 20-29 selected) |
| ASSERT_EQ(10, dst.selection_vector()->CountSelected()); |
| ASSERT_FALSE(dst.selection_vector()->IsRowSelected(0)); |
| ASSERT_TRUE(dst.selection_vector()->IsRowSelected(20)); |
| ASSERT_TRUE(dst.selection_vector()->IsRowSelected(29)); |
| ASSERT_FALSE(dst.selection_vector()->IsRowSelected(30)); |
| } |
| |
| // Test that PredicateEvaluatingIterator will properly evaluate predicates on its |
| // input. |
| TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) { |
| ScanSpec spec; |
| TestIntRangePredicate pred1(20, 30); |
| spec.AddPredicate(pred1.pred_); |
| LOG(INFO) << "Predicate: " << pred1.pred_.ToString(); |
| |
| vector<int64_t> ints(100); |
| for (int i = 0; i < 100; i++) { |
| ints[i] = i; |
| } |
| |
| // Set up a MaterializingIterator with pushdown disabled, so that the |
| // PredicateEvaluatingIterator will wrap it and do evaluation. |
| unique_ptr<VectorIterator> colwise(new VectorIterator(ints)); |
| google::FlagSaver saver; |
| FLAGS_materializing_iterator_do_pushdown = false; |
| unique_ptr<RowwiseIterator> materializing( |
| NewMaterializingIterator(std::move(colwise))); |
| |
| // Wrap it in another iterator to do the evaluation |
| const RowwiseIterator* mat_iter_addr = materializing.get(); |
| unique_ptr<RowwiseIterator> outer_iter(std::move(materializing)); |
| ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec)); |
| |
| ASSERT_NE(reinterpret_cast<uintptr_t>(outer_iter.get()), |
| reinterpret_cast<uintptr_t>(mat_iter_addr)) |
| << "Iterator pointer should differ after wrapping"; |
| |
| ASSERT_EQ(0, spec.predicates().size()) |
| << "Iterator tree should have accepted predicate"; |
| ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size()) |
| << "Predicate should be evaluated by the outer iterator"; |
| |
| RowBlockMemory mem(1024); |
| RowBlock dst(&kIntSchema, 100, &mem); |
| ASSERT_OK(outer_iter->NextBlock(&dst)); |
| ASSERT_EQ(dst.nrows(), 100); |
| |
| // Check that the resulting selection vector is correct (rows 20-29 selected) |
| ASSERT_EQ(10, dst.selection_vector()->CountSelected()); |
| ASSERT_FALSE(dst.selection_vector()->IsRowSelected(0)); |
| ASSERT_TRUE(dst.selection_vector()->IsRowSelected(20)); |
| ASSERT_TRUE(dst.selection_vector()->IsRowSelected(29)); |
| ASSERT_FALSE(dst.selection_vector()->IsRowSelected(30)); |
| } |
| |
| // Test that PredicateEvaluatingIterator::InitAndMaybeWrap doesn't wrap an underlying |
| // iterator when there are no predicates left. |
| TEST(TestPredicateEvaluatingIterator, TestDontWrapWhenNoPredicates) { |
| ScanSpec spec; |
| |
| unique_ptr<VectorIterator> colwise(new VectorIterator({})); |
| unique_ptr<RowwiseIterator> materializing( |
| NewMaterializingIterator(std::move(colwise))); |
| const RowwiseIterator* mat_iter_addr = materializing.get(); |
| unique_ptr<RowwiseIterator> outer_iter(std::move(materializing)); |
| ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec)); |
| ASSERT_EQ(reinterpret_cast<uintptr_t>(outer_iter.get()), |
| reinterpret_cast<uintptr_t>(mat_iter_addr)) |
| << "InitAndMaybeWrap should not have wrapped iter"; |
| } |
| |
| // Test row-wise iterator which does nothing. |
| class DummyIterator : public RowwiseIterator { |
| public: |
| |
| explicit DummyIterator(const Schema& schema) |
| : schema_(schema) { |
| } |
| |
| Status Init(ScanSpec* /*spec*/) override { |
| return Status::OK(); |
| } |
| |
| Status NextBlock(RowBlock* /*dst*/) override { |
| LOG(FATAL) << "unimplemented!"; |
| return Status::OK(); |
| } |
| |
| bool HasNext() const override { |
| LOG(FATAL) << "unimplemented!"; |
| return false; |
| } |
| |
| string ToString() const override { |
| return "DummyIterator"; |
| } |
| |
| const Schema& schema() const override { |
| return schema_; |
| } |
| |
| void GetIteratorStats(vector<IteratorStats>* stats) const override { |
| stats->resize(schema().num_columns()); |
| } |
| |
| private: |
| Schema schema_; |
| }; |
| |
| // Verify the vectors of ColumnPredicate are the same. |
| void CheckColumnPredicatesAreEqual(const vector<ColumnPredicate>& expected, |
| const vector<const ColumnPredicate*>& actual) { |
| ASSERT_EQ(expected.size(), actual.size()); |
| for (int i = 0; i < expected.size(); i++) { |
| ASSERT_EQ(expected[i], *actual[i]); |
| } |
| } |
| |
| TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) { |
| Schema schema({ ColumnSchema("a_int64", INT64), |
| ColumnSchema("b_int64", INT64), |
| ColumnSchema("c_int32", INT32) }, 3); |
| |
| int64_t zero = 0; |
| int64_t two = 2; |
| auto a_equality = ColumnPredicate::Equality(schema.column(0), &zero); |
| auto b_equality = ColumnPredicate::Equality(schema.column(1), &zero); |
| auto c_equality = ColumnPredicate::Equality(schema.column(2), &zero); |
| auto a_range = ColumnPredicate::Range(schema.column(0), &zero, &two); |
| |
| { // Test that more selective predicates come before others. |
| ScanSpec spec; |
| spec.AddPredicate(a_range); |
| spec.AddPredicate(b_equality); |
| spec.AddPredicate(c_equality); |
| |
| unique_ptr<RowwiseIterator> iter(new DummyIterator(schema)); |
| ASSERT_OK(InitAndMaybeWrap(&iter, &spec)); |
| NO_FATALS(CheckColumnPredicatesAreEqual( |
| vector<ColumnPredicate>({ c_equality, b_equality, a_range }), |
| GetIteratorPredicatesForTests(iter))); |
| } |
| |
| { // Test that smaller columns come before larger ones, and ties are broken by idx. |
| ScanSpec spec; |
| spec.AddPredicate(b_equality); |
| spec.AddPredicate(a_equality); |
| spec.AddPredicate(c_equality); |
| |
| unique_ptr<RowwiseIterator> iter(new DummyIterator(schema)); |
| ASSERT_OK(InitAndMaybeWrap(&iter, &spec)); |
| |
| NO_FATALS(CheckColumnPredicatesAreEqual( |
| vector<ColumnPredicate>({ c_equality, a_equality, b_equality }), |
| GetIteratorPredicatesForTests(iter))); |
| } |
| } |
| |
| // Class to test column predicate effectiveness. |
| class PredicateEffectivenessTest : |
| public KuduTest, |
| public ::testing::WithParamInterface<bool> { |
| public: |
| // For 'all_values' true case, initialize the Bloom filter 'bf' with all values that are |
| // added to the output 'ints' vector. Otherwise insert subset of the values in the Bloom filter. |
| static ColumnPredicate CreateBloomFilterPredicate(bool all_values, BlockBloomFilter* bf, |
| vector<int64_t>* ints) { |
| CHECK_OK(bf->Init(BlockBloomFilter::MinLogSpace(kNumRows, 0.01), FAST_HASH, 0)); |
| ints->resize(kNumRows); |
| |
| for (int64_t i = 0; i < kNumRows; i++) { |
| (*ints)[i] = i; |
| if (all_values || i < kSubsetNums) { |
| bf->Insert(Slice(reinterpret_cast<uint8 *>(&i), sizeof(i))); |
| } |
| } |
| |
| vector<BlockBloomFilter*> bloom_filters; |
| bloom_filters.push_back(bf); |
| auto bf_pred = ColumnPredicate::InBloomFilter(kIntSchema.column(0), bloom_filters, nullptr, |
| nullptr); |
| LOG(INFO) << "Bloom filter predicate: " << bf_pred.ToString(); |
| return bf_pred; |
| } |
| |
| // For 'all_values' true case, verify predicate effectiveness for the iterator 'iter' |
| // when the Bloom filter predicate was initialized with all values being iterated. |
| // Otherwise verify for the case when Bloom filter was initialized with subset of values |
| // being iterated. |
| static void VerifyPredicateEffectiveness(bool all_values, |
| const unique_ptr<RowwiseIterator>& iter) { |
| ASSERT_EQ(1, GetIteratorPredicateEffectivenessCtxForTests(iter).num_predicate_ctxs()) |
| << "Predicate effectiveness contexts must match with number of predicates"; |
| ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled) |
| << "Predicate must be enabled to begin with"; |
| |
| RowBlockMemory mem; |
| FLAGS_predicate_effectivess_num_skip_blocks = 4; |
| if (all_values) { |
| for (int i = 0; i < kNumRows / kBatchSize; i++) { |
| RowBlock dst(&kIntSchema, kBatchSize, &mem); |
| ASSERT_OK(iter->NextBlock(&dst)); |
| ASSERT_EQ(kBatchSize, dst.nrows()); |
| ASSERT_EQ(kBatchSize, dst.selection_vector()->CountSelected()); |
| // For all values case, the predicate gets disabled on first effectiveness check. |
| if (i >= FLAGS_predicate_effectivess_num_skip_blocks) { |
| ASSERT_FALSE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled); |
| } |
| } |
| } else { |
| for (int i = 0; i < kNumRows / kBatchSize; i++) { |
| RowBlock dst(&kIntSchema, kBatchSize, &mem); |
| ASSERT_OK(iter->NextBlock(&dst)); |
| ASSERT_EQ(kBatchSize, dst.nrows()); |
| // For subset case, the predicate should never be disabled. |
| ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled); |
| auto rows_selected = dst.selection_vector()->CountSelected(); |
| if (i == 0) { |
| ASSERT_EQ(kBatchSize, rows_selected); |
| } else { |
| ASSERT_EQ(0, rows_selected); |
| } |
| } |
| } |
| } |
| |
| private: |
| static constexpr int kNumRows = 1000; |
| static constexpr int kSubsetNums = 100; |
| static constexpr int kBatchSize = 100; |
| }; |
| |
| // Despite being a static constexpr integer, this static variable needs explicit definition |
| // outside to avoid linker error because it's used with ASSERT_EQ() macro that binds 1st variable |
| // to const reference. |
| constexpr int PredicateEffectivenessTest::kBatchSize; |
| |
| // Test effectiveness of Bloom filter predicate with PredicateEvaluatingIterator. |
| TEST_P(PredicateEffectivenessTest, PredicateEvaluatingIterator) { |
| // For 'all_values' true case, initialize the Bloom filter with all values that are inserted in |
| // the iterator. This helps test the case of ineffective Bloom filter predicate. |
| // For 'all_values' false case i.e. subset values case, initialize the Bloom filter with a subset |
| // of values inserted in the iterator. This helps test the case of effective Bloom filter |
| // predicate that filters out rows. |
| bool all_values = GetParam(); |
| BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); |
| vector<int64_t> ints; |
| auto bf_pred = CreateBloomFilterPredicate(all_values, &bf, &ints); |
| |
| ScanSpec spec; |
| spec.AddPredicate(bf_pred); |
| |
| // Set up a MaterializingIterator with pushdown disabled, so that the |
| // PredicateEvaluatingIterator will wrap it and do evaluation. |
| unique_ptr<VectorIterator> colwise(new VectorIterator(ints)); |
| FLAGS_materializing_iterator_do_pushdown = false; |
| unique_ptr<RowwiseIterator> materializing(NewMaterializingIterator(std::move(colwise))); |
| |
| // Wrap it in another iterator to do the evaluation |
| const RowwiseIterator* mat_iter_addr = materializing.get(); |
| unique_ptr<RowwiseIterator> outer_iter(std::move(materializing)); |
| ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec)); |
| |
| ASSERT_NE(reinterpret_cast<uintptr_t>(outer_iter.get()), |
| reinterpret_cast<uintptr_t>(mat_iter_addr)) |
| << "Iterator pointer should differ after wrapping"; |
| ASSERT_EQ(0, spec.predicates().size()) << "Iterator tree should have accepted predicate"; |
| ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size()) |
| << "Predicate should be evaluated by the outer iterator"; |
| VerifyPredicateEffectiveness(all_values, outer_iter); |
| } |
| |
| // Test effectiveness of Bloom filter predicate with MaterializingIterator. |
| TEST_P(PredicateEffectivenessTest, MaterializingIterator) { |
| // For 'all_values' true case, initialize the Bloom filter with all values that are inserted in |
| // the iterator. This helps test the case of ineffective Bloom filter predicate. |
| // For 'all_values' false case i.e. subset values case, initialize the Bloom filter with a subset |
| // of values inserted in the iterator. This helps test the case of effective Bloom filter |
| // predicate that filters out rows. |
| bool all_values = GetParam(); |
| BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); |
| vector<int64_t> ints; |
| auto bf_pred = CreateBloomFilterPredicate(all_values, &bf, &ints); |
| |
| ScanSpec spec; |
| spec.AddPredicate(bf_pred); |
| |
| // Setup a materializing iterator with pushdown enabled(default). |
| unique_ptr<VectorIterator> colwise(new VectorIterator(ints)); |
| unique_ptr<RowwiseIterator> mat_iter( |
| NewMaterializingIterator(std::move(colwise))); |
| |
| ASSERT_OK(mat_iter->Init(&spec)); |
| ASSERT_EQ(0, spec.predicates().size()) << "Iterator tree should have accepted predicate"; |
| |
| VerifyPredicateEffectiveness(all_values, mat_iter); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(, PredicateEffectivenessTest, ::testing::Bool()); |
| |
| } // namespace kudu |