// blob: 7a46e6eb4f3c59fb69721bd1ee575c37575ec359 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/SortMergeRunOperator.hpp"
#include "relational_operators/SortMergeRunOperatorHelpers.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/BasicColumnStoreValueAccessor.hpp"
#include "storage/CompressedColumnStoreValueAccessor.hpp"
#include "storage/CompressedPackedRowStoreValueAccessor.hpp"
#include "storage/CountedReference.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/InsertDestination.pb.h"
#include "storage/SplitRowStoreValueAccessor.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageBlockLayout.hpp"
#include "storage/StorageManager.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "types/IntType.hpp"
#include "types/Type.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/PtrVector.hpp"
#include "utility/SortConfiguration.hpp"
#include "utility/SortConfiguration.pb.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "tmb/id_typedefs.h"
#include "tmb/tagged_message.h"
using std::move;
using std::unique_ptr;
using std::vector;
namespace quickstep {
namespace {
// Placeholder identifiers used by the fixtures in this file; both are zero
// because there is only a single (dummy) query and operator under test.
constexpr std::size_t kQueryId{0};
constexpr std::size_t kOpIndex{0};
// Helper class for a test tuple that will be inserted and sorted. Every column
// is a small bit-field carved out of the integer seed, so two TestTuples built
// from the same seed always carry the same column values.
class TestTuple {
 public:
  // Width (in bits) of each generated column value.
  static const int kCol1Bits;
  static const int kCol2Bits;
  static const int kCol3Bits;

  // Initialize the tuple from a seed number: the seed itself becomes the
  // tuple ID and the three columns are bit-fields of it taken at offsets
  // 3, 6 and 1 respectively.
  explicit TestTuple(int num)
      : tid_(num),
        col1_(Bits(num, 3, kCol1Bits)),
        col2_(Bits(num, 6, kCol2Bits)),
        col3_(Bits(num, 1, kCol3Bits)) {}

  // Returns the 'length'-bit wide field of 'value' that starts 'offset' bits
  // above the least significant bit. The mask is derived from a 16-bit
  // pattern, so 'length' is expected to be in [0, 16].
  static inline int Bits(int value, int offset, int length) {
    const int mask = 0xffff >> (16 - length);
    const int shifted = value >> offset;
    return shifted & mask;
  }

  int tid_;   // Tuple-ID
  int col1_;  // Col-1
  int col2_;  // Col-2
  int col3_;  // Col-3
};

const int TestTuple::kCol1Bits = 5;
const int TestTuple::kCol2Bits = 2;
const int TestTuple::kCol3Bits = 3;
// Flattened view of a stored tuple, used to simplify testing of nullable
// attributes. Columns 1, 2 and 3 mirror the TestTuple columns. Columns 4, 5
// and 6 repeat columns 1, 2 and 3 respectively when the value is non-zero and
// are NULL when it is zero, i.e. the i^th and (i + 3)^th columns agree unless
// the i^th column is zero, in which case the (i + 3)^th column is NULL.
// null_c{i} records whether column i was NULL in the tuple.
struct TestTupleAttrs {
  int c1;
  int c2;
  int c3;
  bool null_c4;
  bool null_c5;
  bool null_c6;
};
// Convert from TestTuple to TestTupleAttrs.
struct TestTupleAttrs TupleToTupleAttr(const Tuple &tuple) {
auto attr = tuple.getAttributeValueVector();
struct TestTupleAttrs out {};
out.c1 = attr[0].getLiteral<int>();
out.c2 = attr[1].getLiteral<int>();
out.c3 = attr[2].getLiteral<int>();
out.null_c4 = attr[3].isNull();
out.null_c5 = attr[4].isNull();
out.null_c6 = attr[5].isNull();
return out;
}
} // namespace
namespace merge_run_operator {
// Test Run class.
class RunTest : public ::testing::Test {
public:
static const tuple_id kNumTuples;
static const tuple_id kNumTuplesPerBlock;
protected:
static const relation_id kTableId = 100;
static const relation_id kResultTableId = kTableId + 1;
static const char kTableName[];
static const char kStoragePath[];
virtual void SetUp() {
// Initialize the TMB, register this thread as sender and receiver for
// appropriate types of messages.
bus_.Initialize();
foreman_client_id_ = bus_.Connect();
bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
bus_.RegisterClientAsReceiver(foreman_client_id_, kDataPipelineMessage);
const tmb::client_id worker_thread_client_id = bus_.Connect();
bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
bus_.RegisterClientAsSender(worker_thread_client_id, kDataPipelineMessage);
thread_id_map_ = ClientIDMap::Instance();
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->addValue(worker_thread_client_id);
storage_manager_.reset(new StorageManager(kStoragePath));
table_.reset(new CatalogRelation(nullptr, kTableName, kTableId));
const Type &int_type = IntType::InstanceNonNullable();
table_->addAttribute(new CatalogAttribute(table_.get(), "col-1", int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "col-2", int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "col-3", int_type));
col1_ = table_->getAttributeByName("col-1")->getID();
col2_ = table_->getAttributeByName("col-2")->getID();
col3_ = table_->getAttributeByName("col-3")->getID();
table_->setDefaultStorageBlockLayout(StorageBlockLayout::GenerateDefaultLayout(*table_, false));
insert_destination_.reset(
new BlockPoolInsertDestination(*table_,
nullptr,
storage_manager_.get(),
kOpIndex,
0, // dummy query ID.
foreman_client_id_,
&bus_));
}
virtual void TearDown() {
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->removeValue();
// Drop blocks from relations and InsertDestination.
const vector<block_id> tmp_blocks = insert_destination_->getTouchedBlocks();
for (const block_id block : tmp_blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
const vector<block_id> blocks = table_->getBlocksSnapshot();
for (const block_id block : blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
}
// Helper method to insert test tuples.
Tuple *createTuple(tuple_id id) {
std::vector<TypedValue> values;
values.emplace_back(static_cast<int>(id));
values.emplace_back(static_cast<int>(id));
values.emplace_back(static_cast<int>(id));
return new Tuple(std::move(values));
}
std::unique_ptr<StorageManager> storage_manager_;
std::unique_ptr<CatalogRelation> table_;
std::unique_ptr<InsertDestination> insert_destination_;
attribute_id col1_;
attribute_id col2_;
attribute_id col3_;
merge_run_operator::Run run_;
MessageBusImpl bus_;
tmb::client_id foreman_client_id_;
// This map is needed for InsertDestination and some operators that send
// messages to Foreman directly. To know the reason behind the design of this
// map, see the note in InsertDestination.hpp.
ClientIDMap *thread_id_map_;
};
const char RunTest::kTableName[] = "table";
const char RunTest::kStoragePath[] = "./sort_merge_run_operator_test_data";
const tuple_id RunTest::kNumTuples = 100;
const tuple_id RunTest::kNumTuplesPerBlock = 10;
// Test creating an empty run: a RunCreator that never receives a tuple must
// leave the run without any block IDs.
TEST_F(RunTest, RunCreatorEmptyRun) {
{
RunCreator run_creator(&run_, insert_destination_.get());
// Need run_creator to destruct to finalize the output run; that is the only
// purpose of the enclosing scope.
}
// Nothing was appended, so the finalized run is expected to be empty.
EXPECT_EQ(0u, run_.size());
}
// Test creating a run with few tuples: ten tuples are appended through a
// RunCreator, then the last block of the run is scanned and every tuple is
// expected to come back in insertion order.
TEST_F(RunTest, RunCreatorFewTuples) {
  const tuple_id tuples_to_insert = 10;

  {
    RunCreator creator(&run_, insert_destination_.get());
    for (tuple_id tid = 0; tid < tuples_to_insert; ++tid) {
      const std::unique_ptr<Tuple> tuple(createTuple(tid));
      creator.appendTuple(*tuple);
    }
    EXPECT_EQ(1u, run_.size());
    // Need the creator to destruct to finalize the output run.
  }

  BlockReference block(storage_manager_->getBlock(run_.back(), *table_));
  std::unique_ptr<ValueAccessor> generic_accessor(
      block->getTupleStorageSubBlock().createValueAccessor());

  // col-1 of the k^th stored tuple must hold k, since createTuple() stores the
  // tuple's ID in every column.
  tuple_id tuples_seen = 0;
  InvokeOnValueAccessorNotAdapter(
      generic_accessor.get(),
      [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
        for (; typed_accessor->next(); ++tuples_seen) {
          const tuple_id expected_id = tuples_seen;
          EXPECT_EQ(static_cast<int>(expected_id),
                    typed_accessor->getTypedValue(col1_).template getLiteral<int>());
        }
      });

  EXPECT_EQ(tuples_to_insert, tuples_seen);
}
// Test creating a run with multiple blocks: tuples are appended until
// appendTuple() has returned true three times, after which the run is expected
// to span one block more than that. All blocks are then scanned in run order
// and the tuples must come back in insertion order.
TEST_F(RunTest, RunCreatorMultipleBlocks) {
  const std::size_t blocks_to_fill = 3;
  tuple_id tuples_inserted = 0;

  {
    RunCreator creator(&run_, insert_destination_.get());
    for (std::size_t filled = 0; filled < blocks_to_fill; ++tuples_inserted) {
      const std::unique_ptr<Tuple> tuple(createTuple(tuples_inserted));
      if (creator.appendTuple(*tuple)) {
        ++filled;
      }
    }
    EXPECT_EQ(blocks_to_fill + 1, run_.size());
    // Need the creator to destruct to finalize the output run.
  }

  tuple_id tuples_seen = 0;
  for (const block_id bid : run_) {
    BlockReference block(storage_manager_->getBlock(bid, *table_));
    std::unique_ptr<ValueAccessor> generic_accessor(
        block->getTupleStorageSubBlock().createValueAccessor());
    InvokeOnValueAccessorNotAdapter(
        generic_accessor.get(),
        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
          for (; typed_accessor->next(); ++tuples_seen) {
            const tuple_id expected_id = tuples_seen;
            EXPECT_EQ(static_cast<int>(expected_id),
                      typed_accessor->getTypedValue(col1_).template getLiteral<int>());
          }
        });  // NOLINT(whitespace/parens)
  }

  EXPECT_EQ(tuples_inserted, tuples_seen);
}
// Test run iterator to see if tuples are accessed in the same order as stored.
// The run alternates filled blocks with empty ones, so the iterator also has
// to step over blocks that contain no tuples.
TEST_F(RunTest, IterateTuples) {
  MutableBlockReference current_block;

  for (tuple_id first_tid = 0; first_tid < kNumTuples; first_tid += kNumTuplesPerBlock) {
    // Create a block and register it with both the relation and the run.
    const block_id filled_bid =
        storage_manager_->createBlock(*table_, table_->getDefaultStorageBlockLayout());
    current_block = storage_manager_->getBlockMutable(filled_bid, *table_);
    table_->addBlock(filled_bid);
    run_.push_back(filled_bid);

    // Insert the next batch of tuples, clamped to the total tuple count.
    const tuple_id end_tid = std::min<tuple_id>(first_tid + kNumTuplesPerBlock, kNumTuples);
    for (tuple_id tid = first_tid; tid < end_tid; ++tid) {
      const std::unique_ptr<Tuple> tuple(createTuple(tid));
      EXPECT_TRUE(current_block->insertTupleInBatch(*tuple));
    }
    current_block->rebuild();

    // Follow it with an empty block.
    const block_id empty_bid =
        storage_manager_->createBlock(*table_, table_->getDefaultStorageBlockLayout());
    current_block = storage_manager_->getBlockMutable(empty_bid, *table_);
    table_->addBlock(empty_bid);
    run_.push_back(empty_bid);
    current_block->rebuild();
  }

  // The accessor of the first block is only used to pick the concrete
  // ValueAccessor type that RunIterator is instantiated with.
  BlockReference first_block(storage_manager_->getBlock(run_.front(), *table_));
  std::unique_ptr<ValueAccessor> type_probe(
      first_block->getTupleStorageSubBlock().createValueAccessor());

  tuple_id tuples_seen = 0;
  InvokeOnValueAccessorNotAdapter(
      type_probe.get(),
      [&](auto *typed_probe) -> void {  // NOLINT(build/c++11)
        using AccessorT =
            typename std::remove_reference<decltype(*typed_probe)>::type;
        RunIterator<AccessorT> run_iterator(run_, storage_manager_.get(), *table_);
        for (; run_iterator.next(); ++tuples_seen) {
          const tuple_id expected_id = tuples_seen;
          EXPECT_EQ(static_cast<int>(expected_id),
                    run_iterator.getValueAccessor()
                        ->getTypedValue(col1_).template getLiteral<int>());
        }
      });

  EXPECT_EQ(kNumTuples, tuples_seen);
}
// Test RunMerger class.
class RunMergerTest : public ::testing::Test {
public:
static const std::size_t kNumTuplesPerBlock;
static const tuple_id kNumBlocksPerRun;
static const std::size_t kNumRuns;
static const tuple_id kNumTuples;
static const std::size_t kTopK;
protected:
static const relation_id kTableId = 100;
static const relation_id kResultTableId = kTableId + 1;
static const char kTableName[];
static const char kStoragePath[];
virtual void SetUp() {
// Initialize the TMB, register this thread as sender and receiver for
// appropriate types of messages.
bus_.Initialize();
foreman_client_id_ = bus_.Connect();
bus_.RegisterClientAsReceiver(foreman_client_id_, kDataPipelineMessage);
bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
const tmb::client_id worker_thread_client_id = bus_.Connect();
bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
bus_.RegisterClientAsSender(worker_thread_client_id, kDataPipelineMessage);
thread_id_map_ = ClientIDMap::Instance();
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->addValue(worker_thread_client_id);
storage_manager_.reset(new StorageManager(kStoragePath));
// Initialize table attributes.
table_.reset(new CatalogRelation(nullptr, kTableName, kTableId));
const Type &int_type = IntType::InstanceNonNullable();
const Type &null_int_type = IntType::InstanceNullable();
table_->addAttribute(new CatalogAttribute(table_.get(), "col-1", int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "col-2", int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "col-3", int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "null-col-1", null_int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "null-col-2", null_int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "null-col-3", null_int_type));
table_->addAttribute(new CatalogAttribute(table_.get(), "tid", null_int_type));
col1_ = table_->getAttributeByName("col-1")->getID();
col2_ = table_->getAttributeByName("col-2")->getID();
col3_ = table_->getAttributeByName("col-3")->getID();
null_col1_ = table_->getAttributeByName("null-col-1")->getID();
null_col2_ = table_->getAttributeByName("null-col-2")->getID();
null_col3_ = table_->getAttributeByName("null-col-3")->getID();
tid_col_ = table_->getAttributeByName("tid")->getID();
table_->setDefaultStorageBlockLayout(StorageBlockLayout::GenerateDefaultLayout(*table_, false));
insert_destination_.reset(
new BlockPoolInsertDestination(*table_,
nullptr,
storage_manager_.get(),
kOpIndex,
0, // dummy query ID.
foreman_client_id_,
&bus_));
}
virtual void TearDown() {
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->removeValue();
// Drop blocks from relations and InsertDestination.
const vector<block_id> tmp_blocks = insert_destination_->getTouchedBlocks();
for (const block_id block : tmp_blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
const vector<block_id> blocks = table_->getBlocksSnapshot();
for (const block_id block : blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
}
// Helper method to create test tuples.
Tuple *createTuple(tuple_id id) {
std::vector<TypedValue> values;
TestTuple tuple(id);
values.emplace_back(static_cast<int>(tuple.col1_));
values.emplace_back(static_cast<int>(tuple.col2_));
values.emplace_back(static_cast<int>(tuple.col3_));
if (!tuple.col1_) {
values.emplace_back(kInt);
} else {
values.emplace_back(static_cast<int>(tuple.col1_));
}
if (!tuple.col2_) {
values.emplace_back(kInt);
} else {
values.emplace_back(static_cast<int>(tuple.col2_));
}
if (!tuple.col3_) {
values.emplace_back(kInt);
} else {
values.emplace_back(static_cast<int>(tuple.col3_));
}
values.emplace_back(static_cast<int>(tuple.tid_));
return new Tuple(std::move(values));
}
// Method to runs which are sorted based on supplied Tuple comparator.
template <typename ComparatorT>
void createRuns(const ComparatorT &comparator, bool maintain_top_k = false) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dist(0, (1 << 8) - 1);
std::unique_ptr<Tuple> tuple;
std::vector<TestTuple> top_k_tuples;
MutableBlockReference storage_block;
auto sort_comparator =
[this, comparator](const TestTuple &l, const TestTuple &r) -> bool {
std::unique_ptr<Tuple> left(createTuple(l.tid_)),
right(createTuple(r.tid_));
return comparator(*left, *right);
};
input_runs_.clear();
for (std::size_t run_id = 0; run_id < kNumRuns; ++run_id) {
std::vector<TestTuple> tuples;
input_runs_.emplace_back();
for (std::size_t i = 0; i < kNumBlocksPerRun * kNumTuplesPerBlock; ++i) {
tuples.emplace_back(dist(gen));
}
// Sort tuples before creating the run.
std::sort(tuples.begin(), tuples.end(), sort_comparator);
// Maintain top-K to test top-K output after merge.
if (maintain_top_k) {
top_k_tuples.insert(top_k_tuples.end(),
tuples.begin(),
tuples.begin() + std::min(tuples.size(), kTopK));
std::partial_sort(
top_k_tuples.begin(),
top_k_tuples.begin() + std::min(top_k_tuples.size(), kTopK),
top_k_tuples.end(),
sort_comparator);
if (top_k_tuples.size() > kTopK) {
top_k_tuples.erase(top_k_tuples.begin() + kTopK, top_k_tuples.end());
}
}
// Insert sorted tuples in run.
for (std::size_t id = 0; id < kNumBlocksPerRun; ++id) {
std::string run_name = "run[" + std::to_string(run_id) + "]";
// Create block.
block_id block_id = storage_manager_->createBlock(*table_, table_->getDefaultStorageBlockLayout());
storage_block = storage_manager_->getBlockMutable(block_id, *table_);
table_->addBlock(block_id);
input_runs_.back().push_back(block_id);
for (std::size_t tid = 0; tid < kNumTuplesPerBlock; ++tid) {
// Insert tuples.
tuple.reset(createTuple(tuples[id * kNumTuplesPerBlock + tid].tid_));
printTuple(run_name.c_str(), *tuple);
EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
}
storage_block->rebuild();
}
}
if (maintain_top_k) {
// Last expected ORDER BY key of the sort.
last_expected_tuple_.reset(createTuple(top_k_tuples.back().tid_));
}
// Insert empty run.
input_runs_.emplace_back();
}
// Debug print method.
void printTuple(const char *prefix, const Tuple &tuple) {
if (VLOG_IS_ON(2)) {
std::ostringstream out;
out << prefix << ' ';
CatalogRelation::const_iterator attr_it = table_->begin();
Tuple::const_iterator value_it = tuple.begin();
for (; attr_it != table_->end(); ++attr_it, ++value_it) {
if (value_it->isNull()) {
out << "NULL";
} else {
out << attr_it->getType().printValueToString(*value_it);
}
out << '|';
}
out << '\n';
VLOG(2) << out.str();
}
}
// Merge runs.
void mergeRuns(const std::vector<attribute_id> &attrs,
vector<bool> &&ordering,
vector<bool> &&null_ordering,
const std::size_t top_k = 0) {
PtrVector<Scalar> order_by;
for (const attribute_id attr : attrs) {
order_by.push_back(new ScalarAttribute(*table_->getAttributeById(attr)));
}
SortConfiguration sort_config(order_by, move(ordering), move(null_ordering));
RunMerger merge(sort_config,
std::move(input_runs_),
top_k,
*table_,
insert_destination_.get(),
0,
storage_manager_.get());
merge.doMerge();
output_run_ = std::move(*merge.getOutputRunMutable());
EXPECT_GT(output_run_.size(), 0u);
}
// Check if the merged run is sorted.
template <typename ComparatorT>
void checkOutputRun(const ComparatorT comparator,
const tuple_id expected_num_tuples = kNumTuples) {
BlockReference block(
storage_manager_->getBlock(output_run_.front(), *table_));
std::unique_ptr<ValueAccessor> dummy_accessor(
block->getTupleStorageSubBlock().createValueAccessor());
tuple_id num_tuples = 0;
std::unique_ptr<Tuple> prev, current;
InvokeOnValueAccessorNotAdapter(
dummy_accessor.get(),
[&](auto *dummy_accessor) -> void { // NOLINT(build/c++11)
RunIterator<
typename std::remove_reference<decltype(*dummy_accessor)>::type>
iterator(output_run_, storage_manager_.get(), *table_);
while (iterator.next()) {
prev = std::move(current);
current.reset(iterator.getValueAccessor()->getTuple());
++num_tuples;
if (num_tuples > 1) {
EXPECT_FALSE(comparator(*current, *prev));
}
std::unique_ptr<Tuple> tuple(iterator.getValueAccessor()->getTuple());
this->printTuple(">", *tuple);
}
});
EXPECT_EQ(expected_num_tuples, num_tuples);
last_actual_tuple_ = std::move(current);
}
// Check attribute value of last expected tuple and last output tuple.
void checkLastTupleAttrValueIsEqual(const attribute_id id) {
printTuple("Expected:", *last_expected_tuple_);
printTuple("Actual: ", *last_actual_tuple_);
EXPECT_EQ(last_expected_tuple_->getAttributeValue(id).isNull(),
last_actual_tuple_->getAttributeValue(id).isNull());
if (!last_expected_tuple_->getAttributeValue(id).isNull() &&
!last_actual_tuple_->getAttributeValue(id).isNull()) {
EXPECT_TRUE(last_expected_tuple_->getAttributeValue(id).fastEqualCheck(
last_actual_tuple_->getAttributeValue(id)))
<< "Expected: "
<< last_expected_tuple_->getAttributeValue(id).getLiteral<int>()
<< "; Actual: "
<< last_actual_tuple_->getAttributeValue(id).getLiteral<int>();
}
}
std::unique_ptr<StorageManager> storage_manager_;
std::unique_ptr<CatalogRelation> table_;
std::unique_ptr<InsertDestination> insert_destination_;
attribute_id col1_;
attribute_id col2_;
attribute_id col3_;
attribute_id null_col1_;
attribute_id null_col2_;
attribute_id null_col3_;
attribute_id tid_col_;
merge_run_operator::Run output_run_;
std::vector<merge_run_operator::Run> input_runs_;
std::unique_ptr<Tuple> last_actual_tuple_;
std::unique_ptr<Tuple> last_expected_tuple_; // This is only populated for top-k tests.
MessageBusImpl bus_;
tmb::client_id foreman_client_id_;
// This map is needed for InsertDestination and some operators that send
// messages to Foreman directly. To know the reason behind the design of this
// map, see the note in InsertDestination.hpp.
ClientIDMap *thread_id_map_;
};
const char RunMergerTest::kTableName[] = "table";
const char RunMergerTest::kStoragePath[] = "./sort_merge_run_operator_test_data";
const std::size_t RunMergerTest::kNumTuplesPerBlock = 10;
const tuple_id RunMergerTest::kNumBlocksPerRun = 10;
const std::size_t RunMergerTest::kNumRuns = 10;
const tuple_id RunMergerTest::kNumTuples =
RunMergerTest::kNumTuplesPerBlock * RunMergerTest::kNumBlocksPerRun * RunMergerTest::kNumRuns;
const std::size_t RunMergerTest::kTopK = 5;
namespace {
// Comparator for col-1 ASC: true iff the left tuple's col-1 value is strictly
// smaller than the right tuple's.
auto kCol1AscComparator = [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const TestTupleAttrs a = TupleToTupleAttr(lhs);
  const TestTupleAttrs b = TupleToTupleAttr(rhs);
  return a.c1 < b.c1;
};
// Comparator for col-1 DESC: true iff the left tuple's col-1 value is strictly
// greater than the right tuple's.
auto kCol1DescComparator = [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const TestTupleAttrs a = TupleToTupleAttr(lhs);
  const TestTupleAttrs b = TupleToTupleAttr(rhs);
  return a.c1 > b.c1;
};
// Comparator for null-col-1 ASC NULLS LAST. A NULL is ranked as the largest
// int so that it sorts after every real value.
auto kNullLastCol1AscComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::max();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) {
    a.c1 = null_key;
  }
  if (b.null_c4) {
    b.c1 = null_key;
  }
  return a.c1 < b.c1;
};
// Comparator for null-col-1 ASC NULLS FIRST. A NULL is ranked as the smallest
// int so that it sorts before every real value.
auto kNullFirstCol1AscComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::min();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) {
    a.c1 = null_key;
  }
  if (b.null_c4) {
    b.c1 = null_key;
  }
  return a.c1 < b.c1;
};
// Comparator for null-col-1 DESC NULLS LAST. With a descending order the
// smallest int sorts last, so that is what a NULL is ranked as.
auto kNullLastCol1DescComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::min();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) {
    a.c1 = null_key;
  }
  if (b.null_c4) {
    b.c1 = null_key;
  }
  return a.c1 > b.c1;
};
// Comparator for null-col-1 DESC NULLS FIRST. With a descending order the
// largest int sorts first, so that is what a NULL is ranked as.
auto kNullFirstCol1DescComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::max();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) {
    a.c1 = null_key;
  }
  if (b.null_c4) {
    b.c1 = null_key;
  }
  return a.c1 > b.c1;
};
// Comparator for col-1 ASC, col-2 ASC, col-3 ASC: a lexicographic "less than"
// where a later column only matters when all earlier columns tie.
auto kCol1Col2Col3AscComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const TestTupleAttrs a = TupleToTupleAttr(lhs);
  const TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.c1 != b.c1) {
    return a.c1 < b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 < b.c2;
  }
  return a.c3 < b.c3;
};
// Comparator for col-1 DESC, col-2 DESC, col-3 DESC: a lexicographic "greater
// than" where a later column only matters when all earlier columns tie.
auto kCol1Col2Col3DescComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const TestTupleAttrs a = TupleToTupleAttr(lhs);
  const TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.c1 != b.c1) {
    return a.c1 > b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 > b.c2;
  }
  return a.c3 > b.c3;
};
// Comparator for col-1 ASC NULLS LAST, col-2 ASC NULLS LAST, col-3 ASC NULLS
// LAST. The NULL flags of columns 4, 5 and 6 decide whether columns 1, 2 and 3
// are replaced by the largest int before the lexicographic comparison.
auto kNullLastCol1Col2Col3AscComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::max();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) { a.c1 = null_key; }
  if (a.null_c5) { a.c2 = null_key; }
  if (a.null_c6) { a.c3 = null_key; }
  if (b.null_c4) { b.c1 = null_key; }
  if (b.null_c5) { b.c2 = null_key; }
  if (b.null_c6) { b.c3 = null_key; }
  if (a.c1 != b.c1) {
    return a.c1 < b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 < b.c2;
  }
  return a.c3 < b.c3;
};
// Comparator for col-1 DESC NULLS LAST, col-2 DESC NULLS LAST, col-3 DESC NULLS
// LAST. The NULL flags of columns 4, 5 and 6 decide whether columns 1, 2 and 3
// are replaced by the smallest int before the lexicographic comparison.
auto kNullLastCol1Col2Col3DescComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::min();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) { a.c1 = null_key; }
  if (a.null_c5) { a.c2 = null_key; }
  if (a.null_c6) { a.c3 = null_key; }
  if (b.null_c4) { b.c1 = null_key; }
  if (b.null_c5) { b.c2 = null_key; }
  if (b.null_c6) { b.c3 = null_key; }
  if (a.c1 != b.c1) {
    return a.c1 > b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 > b.c2;
  }
  return a.c3 > b.c3;
};
// Comparator for col-1 ASC NULLS FIRST, col-2 ASC NULLS FIRST, col-3 ASC NULLS
// FIRST. The NULL flags of columns 4, 5 and 6 decide whether columns 1, 2 and
// 3 are replaced by the smallest int before the lexicographic comparison.
auto kNullFirstCol1Col2Col3AscComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::min();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) { a.c1 = null_key; }
  if (a.null_c5) { a.c2 = null_key; }
  if (a.null_c6) { a.c3 = null_key; }
  if (b.null_c4) { b.c1 = null_key; }
  if (b.null_c5) { b.c2 = null_key; }
  if (b.null_c6) { b.c3 = null_key; }
  if (a.c1 != b.c1) {
    return a.c1 < b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 < b.c2;
  }
  return a.c3 < b.c3;
};
// Comparator for col-1 DESC NULLS FIRST, col-2 DESC NULLS FIRST, col-3 DESC
// NULLS FIRST. The NULL flags of columns 4, 5 and 6 decide whether columns 1,
// 2 and 3 are replaced by the largest int before the lexicographic comparison.
auto kNullFirstCol1Col2Col3DescComparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int null_key = std::numeric_limits<int>::max();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) { a.c1 = null_key; }
  if (a.null_c5) { a.c2 = null_key; }
  if (a.null_c6) { a.c3 = null_key; }
  if (b.null_c4) { b.c1 = null_key; }
  if (b.null_c5) { b.c2 = null_key; }
  if (b.null_c6) { b.c3 = null_key; }
  if (a.c1 != b.c1) {
    return a.c1 > b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 > b.c2;
  }
  return a.c3 > b.c3;
};
// Comparator for col-1 ASC NULLS FIRST, col-2 DESC NULLS LAST, col-3 ASC NULLS
// LAST. NULLs in the first two columns are ranked as the smallest int (first
// when ascending, last when descending); NULLs in the third column are ranked
// as the largest int (last when ascending).
auto kMixedNullFLLCol1Col2Col3Comparator =
    [](const Tuple &lhs, const Tuple &rhs) -> bool {
  const int smallest = std::numeric_limits<int>::min();
  const int largest = std::numeric_limits<int>::max();
  TestTupleAttrs a = TupleToTupleAttr(lhs);
  TestTupleAttrs b = TupleToTupleAttr(rhs);
  if (a.null_c4) { a.c1 = smallest; }
  if (a.null_c5) { a.c2 = smallest; }
  if (a.null_c6) { a.c3 = largest; }
  if (b.null_c4) { b.c1 = smallest; }
  if (b.null_c5) { b.c2 = smallest; }
  if (b.null_c6) { b.c3 = largest; }
  if (a.c1 != b.c1) {
    return a.c1 < b.c1;
  }
  if (a.c2 != b.c2) {
    return a.c2 > b.c2;
  }
  return a.c3 < b.c3;
};
} // namespace
TEST_F(RunMergerTest, 1Column_NonNull_Asc) {
  // Single sort key: col1_, ascending.
  std::vector<attribute_id> sort_attrs{col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullLast};

  // Build runs with the reference comparator, merge, and verify the output
  // against that same comparator.
  createRuns(kCol1AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kCol1AscComparator);
}
TEST_F(RunMergerTest, 1Column_NonNull_Asc_TopK) {
  // Single sort key: col1_, ascending; merge is limited to kTopK.
  std::vector<attribute_id> sort_attrs{col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kCol1AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kCol1AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(RunMergerTest, 1Column_NonNull_Desc) {
  // Single sort key: col1_, descending.
  std::vector<attribute_id> sort_attrs{col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kCol1DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kCol1DescComparator);
}
TEST_F(RunMergerTest, 1Column_NonNull_Desc_TopK) {
  // Single sort key: col1_, descending; merge is limited to kTopK.
  std::vector<attribute_id> sort_attrs{col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kCol1DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kCol1DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(RunMergerTest, 1Column_NullLast_Asc) {
  // Single sort key: null_col1_, ascending, NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kNullLastCol1AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullLastCol1AscComparator);
}
TEST_F(RunMergerTest, 1Column_NullLast_Asc_TopK) {
  // Single sort key: null_col1_, ascending, NULLS LAST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kNullLastCol1AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullLastCol1AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(null_col1_);
}
TEST_F(RunMergerTest, 1Column_NullFirst_Asc) {
  // Single sort key: null_col1_, ascending, NULLS FIRST.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullFirst};

  createRuns(kNullFirstCol1AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullFirstCol1AscComparator);
}
TEST_F(RunMergerTest, 1Column_NullFirst_Asc_TopK) {
  // Single sort key: null_col1_, ascending, NULLS FIRST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortAscending};
  std::vector<bool> null_order{kSortNullFirst};

  createRuns(kNullFirstCol1AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullFirstCol1AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(null_col1_);
}
TEST_F(RunMergerTest, 1Column_NullLast_Desc) {
  // Single sort key: null_col1_, descending, NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kNullLastCol1DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullLastCol1DescComparator);
}
TEST_F(RunMergerTest, 1Column_NullLast_Desc_TopK) {
  // Single sort key: null_col1_, descending, NULLS LAST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullLast};

  createRuns(kNullLastCol1DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullLastCol1DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(null_col1_);
}
TEST_F(RunMergerTest, 1Column_NullFirst_Desc) {
  // Single sort key: null_col1_, descending, NULLS FIRST.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullFirst};

  createRuns(kNullFirstCol1DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullFirstCol1DescComparator);
}
TEST_F(RunMergerTest, 1Column_NullFirst_Desc_TopK) {
  // Single sort key: null_col1_, descending, NULLS FIRST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_};
  std::vector<bool> sort_order{kSortDescending};
  std::vector<bool> null_order{kSortNullFirst};

  createRuns(kNullFirstCol1DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullFirstCol1DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(null_col1_);
}
TEST_F(RunMergerTest, 3Column_NonNull_Asc) {
  // Three sort keys (col1_, col2_, col3_), all ascending.
  std::vector<attribute_id> sort_attrs{col1_, col2_, col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kCol1Col2Col3AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kCol1Col2Col3AscComparator);
}
TEST_F(RunMergerTest, 3Column_NonNull_Asc_TopK) {
  // Three sort keys (col1_, col2_, col3_), all ascending; limited to kTopK.
  std::vector<attribute_id> sort_attrs{col1_, col2_, col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kCol1Col2Col3AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kCol1Col2Col3AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_NonNull_Desc) {
  // Three sort keys (col1_, col2_, col3_), all descending.
  std::vector<attribute_id> sort_attrs{col1_, col2_, col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kCol1Col2Col3DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kCol1Col2Col3DescComparator);
}
TEST_F(RunMergerTest, 3Column_NonNull_Desc_TopK) {
  // Three sort keys (col1_, col2_, col3_), all descending; limited to kTopK.
  std::vector<attribute_id> sort_attrs{col1_, col2_, col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kCol1Col2Col3DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kCol1Col2Col3DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_NullLast_Asc) {
  // Three nullable sort keys, all ascending with NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kNullLastCol1Col2Col3AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullLastCol1Col2Col3AscComparator);
}
TEST_F(RunMergerTest, 3Column_NullLast_Asc_TopK) {
  // Three nullable sort keys, all ascending with NULLS LAST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kNullLastCol1Col2Col3AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullLastCol1Col2Col3AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_NullLast_Desc) {
  // Three nullable sort keys, all descending with NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kNullLastCol1Col2Col3DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullLastCol1Col2Col3DescComparator);
}
TEST_F(RunMergerTest, 3Column_NullLast_Desc_TopK) {
  // Three nullable sort keys, all descending with NULLS LAST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullLast, kSortNullLast, kSortNullLast};

  createRuns(kNullLastCol1Col2Col3DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullLastCol1Col2Col3DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_NullFirst_Asc) {
  // Three nullable sort keys, all ascending with NULLS FIRST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullFirst, kSortNullFirst};

  createRuns(kNullFirstCol1Col2Col3AscComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullFirstCol1Col2Col3AscComparator);
}
TEST_F(RunMergerTest, 3Column_NullFirst_Asc_TopK) {
  // Three nullable sort keys, all ascending with NULLS FIRST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortAscending, kSortAscending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullFirst, kSortNullFirst};

  createRuns(kNullFirstCol1Col2Col3AscComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullFirstCol1Col2Col3AscComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_NullFirst_Desc) {
  // Three nullable sort keys, all descending with NULLS FIRST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullFirst, kSortNullFirst};

  createRuns(kNullFirstCol1Col2Col3DescComparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kNullFirstCol1Col2Col3DescComparator);
}
TEST_F(RunMergerTest, 3Column_NullFirst_Desc_TopK) {
  // Three nullable sort keys, all descending with NULLS FIRST; limited to kTopK.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortDescending, kSortDescending, kSortDescending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullFirst, kSortNullFirst};

  createRuns(kNullFirstCol1Col2Col3DescComparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kNullFirstCol1Col2Col3DescComparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
TEST_F(RunMergerTest, 3Column_MixedNullOrdering_MixedOrdering) {
  // Three nullable sort keys with per-key settings:
  //   null_col1_ ASC NULLS FIRST, null_col2_ DESC NULLS LAST,
  //   null_col3_ ASC NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortDescending, kSortAscending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullLast, kSortNullLast};

  createRuns(kMixedNullFLLCol1Col2Col3Comparator);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order));
  checkOutputRun(kMixedNullFLLCol1Col2Col3Comparator);
}
TEST_F(RunMergerTest, 3Column_MixedNullOrdering_MixedOrdering_TopK) {
  // Three nullable sort keys with per-key settings, limited to kTopK:
  //   null_col1_ ASC NULLS FIRST, null_col2_ DESC NULLS LAST,
  //   null_col3_ ASC NULLS LAST.
  std::vector<attribute_id> sort_attrs{null_col1_, null_col2_, null_col3_};
  std::vector<bool> sort_order{kSortAscending, kSortDescending, kSortAscending};
  std::vector<bool> null_order{kSortNullFirst, kSortNullLast, kSortNullLast};

  createRuns(kMixedNullFLLCol1Col2Col3Comparator, true);
  mergeRuns(sort_attrs, std::move(sort_order), std::move(null_order), RunMergerTest::kTopK);
  checkOutputRun(kMixedNullFLLCol1Col2Col3Comparator, RunMergerTest::kTopK);
  // The last emitted tuple must agree with the expected one on every sort key.
  for (const attribute_id key : sort_attrs) {
    checkLastTupleAttrValueIsEqual(key);
  }
}
} // namespace merge_run_operator
// Test SortMergeRunOperator class. Since RunMerger is tested for various
// sort configurations, we only test if merge trees are computed and executed
// correctly keeping the sort configuration the same.
class SortMergeRunOperatorTest : public ::testing::Test {
protected:
// Relation ids handed to createTable() for the input relation, the final
// result relation and the intermediate run relation, respectively.
static const relation_id kTableId = 100;
static const relation_id kResultTableId = kTableId + 1;
static const relation_id kRunTableId = kTableId + 2;
// Names of the three relations, the StorageManager path and the database
// name. The values are supplied by the out-of-class definitions later in this
// file.
static const char kTableName[];
static const char kResultTableName[];
static const char kRunTableName[];
static const char kStoragePath[];
static const char kDatabaseName[];
// Per-test fixture setup. In order, this:
//   1. initializes the message bus and connects two clients: one standing in
//      for a worker thread (sender) and one standing in for the foreman
//      (receiver);
//   2. creates the storage manager and the catalog database;
//   3. creates the input, result and run relations with identical schemas and
//      asserts that attribute ids line up across them;
//   4. registers one BLOCK_POOL insert destination for the result relation and
//      one for the run relation in query_context_proto_;
//   5. builds an initial QueryContext from the proto.
// NOTE: statement order matters. Each destination index is read from
// insert_destinations_size() immediately BEFORE the matching
// add_insert_destinations() call.
virtual void SetUp() {
// Initialize the TMB, register this thread as sender and receiver for
// appropriate types of messages.
bus_.Initialize();
const tmb::client_id worker_thread_client_id = bus_.Connect();
bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
bus_.RegisterClientAsSender(worker_thread_client_id, kDataPipelineMessage);
bus_.RegisterClientAsSender(worker_thread_client_id, kWorkOrderFeedbackMessage);
thread_id_map_ = ClientIDMap::Instance();
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->addValue(worker_thread_client_id);
// The foreman client receives the same three message types that the worker
// client is allowed to send; processMessages() drains them.
foreman_client_id_ = bus_.Connect();
bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
bus_.RegisterClientAsReceiver(foreman_client_id_, kDataPipelineMessage);
bus_.RegisterClientAsReceiver(foreman_client_id_, kWorkOrderFeedbackMessage);
storage_manager_.reset(new StorageManager(kStoragePath));
// Create a database.
db_.reset(new CatalogDatabase(nullptr, kDatabaseName));
// Create input_table_, owned by db_.
input_table_ = createTable(kTableName, kTableId);
db_->addRelation(input_table_);
// Cache the attribute ids of the input relation; the other relations are
// checked against these below.
col1_ = input_table_->getAttributeByName("col-1")->getID();
col2_ = input_table_->getAttributeByName("col-2")->getID();
col3_ = input_table_->getAttributeByName("col-3")->getID();
null_col1_ = input_table_->getAttributeByName("null-col-1")->getID();
null_col2_ = input_table_->getAttributeByName("null-col-2")->getID();
null_col3_ = input_table_->getAttributeByName("null-col-3")->getID();
tid_col_ = input_table_->getAttributeByName("tid")->getID();
// Create result_table_, owned by db_.
result_table_ = createTable(kResultTableName, kResultTableId);
const relation_id result_table_id = db_->addRelation(result_table_);
ASSERT_EQ(col1_, result_table_->getAttributeByName("col-1")->getID());
ASSERT_EQ(col2_, result_table_->getAttributeByName("col-2")->getID());
ASSERT_EQ(col3_, result_table_->getAttributeByName("col-3")->getID());
ASSERT_EQ(null_col1_, result_table_->getAttributeByName("null-col-1")->getID());
ASSERT_EQ(null_col2_, result_table_->getAttributeByName("null-col-2")->getID());
ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
query_context_proto_.set_query_id(0); // dummy query ID.
// Setup the InsertDestination proto in the query context proto.
// The index is the position the new destination is about to occupy.
insert_destination_index_ = query_context_proto_.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_id);
insert_destination_proto->set_relational_op_index(kOpIndex);
// Create run_table_, owned by db_.
run_table_ = createTable(kRunTableName, kRunTableId);
const relation_id run_table_id = db_->addRelation(run_table_);
ASSERT_EQ(col1_, run_table_->getAttributeByName("col-1")->getID());
ASSERT_EQ(col2_, run_table_->getAttributeByName("col-2")->getID());
ASSERT_EQ(col3_, run_table_->getAttributeByName("col-3")->getID());
ASSERT_EQ(null_col1_, run_table_->getAttributeByName("null-col-1")->getID());
ASSERT_EQ(null_col2_, run_table_->getAttributeByName("null-col-2")->getID());
ASSERT_EQ(null_col3_, run_table_->getAttributeByName("null-col-3")->getID());
ASSERT_EQ(tid_col_, run_table_->getAttributeByName("tid")->getID());
// Second insert destination: the run relation. As above, the index is taken
// before the entry is appended.
run_destination_index_ = query_context_proto_.insert_destinations_size();
insert_destination_proto = query_context_proto_.add_insert_destinations();
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(run_table_id);
insert_destination_proto->set_relational_op_index(kOpIndex);
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto_,
*db_,
storage_manager_.get(),
foreman_client_id_,
&bus_));
}
virtual void TearDown() {
// Usually the worker thread makes the following call. In this test setup,
// we don't have a worker thread hence we have to explicitly make the call.
thread_id_map_->removeValue();
// Drop blocks from relations.
const vector<block_id> input_blocks = input_table_->getBlocksSnapshot();
for (const block_id block : input_blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
const vector<block_id> result_blocks = result_table_->getBlocksSnapshot();
for (const block_id block : result_blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
const vector<block_id> run_blocks = run_table_->getBlocksSnapshot();
for (const block_id block : run_blocks) {
storage_manager_->deleteBlockOrBlobFile(block);
}
}
// Allocates a relation called 'name' with id 'rel_id' and the fixed test
// schema: three non-nullable ints ("col-1".."col-3"), three nullable ints
// ("null-col-1".."null-col-3") and a non-nullable int "tid". A default storage
// block layout is generated and installed. The relation is returned as a raw
// pointer; SetUp() hands it to the database via addRelation().
CatalogRelation *createTable(const char *name, const relation_id rel_id) {
  CatalogRelation *relation = new CatalogRelation(nullptr, name, rel_id);
  const Type &plain_int = IntType::InstanceNonNullable();
  const Type &nullable_int = IntType::InstanceNullable();

  // The attribute insertion order is the same for every relation; SetUp()
  // asserts that the resulting attribute ids match across relations.
  const char *const non_null_names[] = {"col-1", "col-2", "col-3"};
  for (const char *attr_name : non_null_names) {
    relation->addAttribute(new CatalogAttribute(relation, attr_name, plain_int));
  }
  const char *const nullable_names[] = {"null-col-1", "null-col-2", "null-col-3"};
  for (const char *attr_name : nullable_names) {
    relation->addAttribute(new CatalogAttribute(relation, attr_name, nullable_int));
  }
  relation->addAttribute(new CatalogAttribute(relation, "tid", plain_int));

  relation->setDefaultStorageBlockLayout(
      StorageBlockLayout::GenerateDefaultLayout(*relation, false));
  return relation;
}
// Builds a heap-allocated Tuple for the TestTuple derived from 'id'. The
// caller takes ownership of the returned pointer.
// Value layout: col1, col2, col3, then the same three values again in the
// nullable columns (a zero value becomes a NULL of type kInt), then tid.
Tuple *createTuple(tuple_id id) {
  TestTuple source(id);
  std::vector<TypedValue> attr_values;

  // Non-nullable columns.
  attr_values.emplace_back(static_cast<int>(source.col1_));
  attr_values.emplace_back(static_cast<int>(source.col2_));
  attr_values.emplace_back(static_cast<int>(source.col3_));

  // Nullable columns mirror the values above; zero is stored as NULL.
  const auto append_nullable = [&attr_values](const auto column_value) {
    if (!column_value) {
      attr_values.emplace_back(kInt);
    } else {
      attr_values.emplace_back(static_cast<int>(column_value));
    }
  };
  append_nullable(source.col1_);
  append_nullable(source.col2_);
  append_nullable(source.col3_);

  attr_values.emplace_back(static_cast<int>(source.tid_));
  return new Tuple(std::move(attr_values));
}
// Create (sorted) blocks for input to SortMergeRunOperator.
template <typename ComparatorT>
void createBlocks(const std::size_t num_tuples_per_block,
const std::size_t num_blocks,
const ComparatorT &comparator,
const std::size_t top_k = 0) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dist(0, (1 << 8) - 1);
std::unique_ptr<Tuple> tuple;
std::vector<TestTuple> top_k_tuples;
MutableBlockReference storage_block;
auto sort_comparator =
[this, comparator](const TestTuple &l, const TestTuple &r) -> bool {
std::unique_ptr<Tuple> left(createTuple(l.tid_)),
right(createTuple(r.tid_));
return comparator(*left, *right);
};
expect_num_tuples_ = 0;
for (std::size_t bid = 0; bid < num_blocks; ++bid) {
// Generate tuples.
std::vector<TestTuple> tuples;
for (std::size_t i = 0; i < num_tuples_per_block; ++i) {
tuples.emplace_back(dist(gen));
}
// Sort tuples before creating the run.
std::sort(tuples.begin(), tuples.end(), sort_comparator);
// Maintain top-k.
if (top_k > 0) {
top_k_tuples.insert(top_k_tuples.end(),
tuples.begin(),
tuples.begin() + std::min(tuples.size(), top_k));
std::partial_sort(
top_k_tuples.begin(),
top_k_tuples.begin() + std::min(top_k, top_k_tuples.size()),
top_k_tuples.end(),
sort_comparator);
if (top_k_tuples.size() > top_k) {
top_k_tuples.erase(top_k_tuples.begin() + top_k, top_k_tuples.end());
}
}
// Create block.
block_id block_id = storage_manager_->createBlock(*input_table_, input_table_->getDefaultStorageBlockLayout());
storage_block = storage_manager_->getBlockMutable(block_id, *input_table_);
input_table_->addBlock(block_id);
// Insert sorted tuples.
for (std::size_t i = 0; i < num_tuples_per_block; ++i) {
tuple.reset(createTuple(tuples[i].tid_));
EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
}
storage_block->rebuild();
expect_num_tuples_ += num_tuples_per_block;
}
if (top_k > 0) {
expect_num_tuples_ = top_k;
last_expected_tuple_.reset(createTuple(top_k_tuples.back().tid_));
}
}
// Debug print method.
void printTuple(const char *prefix, const Tuple &tuple) {
if (VLOG_IS_ON(4)) {
std::ostringstream out;
out << prefix << ' ';
CatalogRelation::const_iterator attr_it = input_table_->begin();
Tuple::const_iterator value_it = tuple.begin();
for (; attr_it != input_table_->end(); ++attr_it, ++value_it) {
if (value_it->isNull()) {
out << "NULL";
} else {
out << attr_it->getType().printValueToString(*value_it);
}
out << '|';
}
out << '\n';
VLOG(4) << out.str();
}
}
void processMessages(const std::size_t num_expected_feedback_messages = 0) {
AnnotatedMessage msg;
std::size_t num_receieved_feedback_messages = 0;
do {
if (bus_.ReceiveIfAvailable(foreman_client_id_, &msg)) {
const TaggedMessage &tagged_message = msg.tagged_message;
switch (tagged_message.message_type()) {
case kWorkOrderFeedbackMessage: {
// Dispatch feedback messages to SortMergeRunOperator.
WorkOrder::FeedbackMessage feedback_msg(
const_cast<void *>(tagged_message.message()),
tagged_message.message_bytes());
EXPECT_EQ(kOpIndex, feedback_msg.header().rel_op_index);
merge_op_->receiveFeedbackMessage(feedback_msg);
++num_receieved_feedback_messages;
break;
}
case kCatalogRelationNewBlockMessage: {
serialization::CatalogRelationNewBlockMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
CatalogRelation *relation = db_->getRelationByIdMutable(proto.relation_id());
relation->addBlock(proto.block_id());
break;
}
default:
// It is safe to discard other kinds of messages (e.g. pipeline) in
// this funtion.
break;
}
}
} while (num_receieved_feedback_messages < num_expected_feedback_messages);
}
// Repeatedly asks merge_op_ for work orders and executes each one (waiting
// for one feedback message after every execution) until getAllWorkOrders()
// reports that the operator is done.
void executeOperatorUntilDone() {
  WorkOrdersContainer container(kOpIndex + 1, 0);
  for (bool finished = false; !finished;) {
    finished = merge_op_->getAllWorkOrders(&container,
                                           query_context_.get(),
                                           storage_manager_.get(),
                                           foreman_client_id_,
                                           &bus_);
    // Run everything generated in this round before asking again.
    while (container.hasNormalWorkOrder(kOpIndex)) {
      std::unique_ptr<WorkOrder> work_order(container.getNormalWorkOrder(kOpIndex));
      work_order->execute();
      processMessages(1);
    }
  }
}
// Runs merge_op_ for as long as it keeps producing work orders with the input
// fed so far, then returns whether getAllWorkOrders() reported "done".
// Each iteration executes at most one work order; the loop continues while a
// work order was just executed or more are pending.
bool executeOperator() {
  WorkOrdersContainer container(kOpIndex + 1, 0);
  bool finished = false;
  bool ran_work_order = false;
  do {
    if (!finished) {
      // Find work orders to execute, if not done already.
      finished = merge_op_->getAllWorkOrders(&container,
                                             query_context_.get(),
                                             storage_manager_.get(),
                                             foreman_client_id_,
                                             &bus_);
    }

    ran_work_order = container.hasNormalWorkOrder(kOpIndex);
    if (ran_work_order) {
      std::unique_ptr<WorkOrder> work_order(container.getNormalWorkOrder(kOpIndex));
      work_order->execute();
      processMessages(1);
    }
  } while (container.hasNormalWorkOrder(kOpIndex) || ran_work_order);
  return finished;
}
// Signals to 'op' (which must actually be a SortMergeRunOperator) that no
// more blocks of input_table_ will be fed.
void feedingDone(RelationalOperator *op) {
  static_cast<SortMergeRunOperator *>(op)->doneFeedingInputBlocks(input_table_->getID());
}
// Appends a SortConfiguration to query_context_proto_ with one ORDER BY entry
// per element of 'attrs' and returns the index of the new configuration.
// 'attrs', 'ordering' and 'null_ordering' are parallel vectors:
//   attrs[i]          attribute of input_table_ to sort on,
//   ordering[i]       stored as is_ascending,
//   null_ordering[i]  stored as null_first.
QueryContext::sort_config_id createSortConfigProto(const std::vector<attribute_id> &attrs,
                                                   const std::vector<bool> &ordering,
                                                   const std::vector<bool> &null_ordering) {
  DCHECK_EQ(attrs.size(), ordering.size());
  DCHECK_EQ(attrs.size(), null_ordering.size());

  // The new configuration lands at the current end of the list.
  const QueryContext::sort_config_id new_config_id = query_context_proto_.sort_configs_size();
  serialization::SortConfiguration *config = query_context_proto_.add_sort_configs();

  for (std::size_t key = 0; key < attrs.size(); ++key) {
    serialization::SortConfiguration::OrderBy *order_by = config->add_order_by_list();
    const std::unique_ptr<Scalar> key_expr(
        new ScalarAttribute(*input_table_->getAttributeById(attrs[key])));
    order_by->mutable_expression()->CopyFrom(key_expr->getProto());
    order_by->set_is_ascending(ordering[key]);
    order_by->set_null_first(null_ordering[key]);
  }
  return new_config_id;
}
// Execute in pipeline breaking mode: register the sort configuration, build a
// SortMergeRunOperator over input_table_, rebuild the QueryContext so that it
// contains the new configuration, and run the operator to completion.
// The trailing 'true' constructor argument presumably marks the input
// relation as fully stored up front -- confirm against SortMergeRunOperator.
void executeNotPipelined(const std::vector<attribute_id> &attrs,
                         const std::vector<bool> &ordering,
                         const std::vector<bool> &null_ordering,
                         const std::uint32_t merge_factor,
                         const std::size_t top_k = 0) {
  const QueryContext::sort_config_id config_id =
      createSortConfigProto(attrs, ordering, null_ordering);

  merge_op_.reset(new SortMergeRunOperator(kQueryId,
                                           *input_table_,
                                           *result_table_,
                                           insert_destination_index_,
                                           *run_table_,
                                           run_destination_index_,
                                           config_id,
                                           merge_factor,
                                           top_k,
                                           true));
  merge_op_->setOperatorIndex(kOpIndex);

  // The proto changed above, so the QueryContext has to be rebuilt from it.
  query_context_.reset(new QueryContext(query_context_proto_,
                                        *db_,
                                        storage_manager_.get(),
                                        foreman_client_id_,
                                        &bus_));

  executeOperatorUntilDone();
}
void feedBlocks(const std::size_t num_blocks, std::vector<block_id> *blocks) {
std::size_t count = std::min(blocks->size(), num_blocks);
std::vector<block_id> to_feed(blocks->begin() + blocks->size() - count,
blocks->end());
// Feed blocks.
DVLOG(1) << "Feeding " << to_feed.size() << " blocks.";
for (const block_id block : to_feed) {
merge_op_->feedInputBlock(block, input_table_->getID(), 0 /* partition_id */);
}
// Remove fed blocks.
blocks->erase(blocks->begin() + blocks->size() - count, blocks->end());
}
// Execute in pipelined mode.
void executePipelined(const std::vector<attribute_id> &attrs,
const std::vector<bool> &ordering,
const std::vector<bool> &null_ordering,
const std::uint32_t merge_factor,
const std::size_t num_blocks_to_feed_per_iteration,
const std::size_t top_k = 0) {
const QueryContext::sort_config_id sort_config_index = createSortConfigProto(attrs, ordering, null_ordering);
merge_op_.reset(new SortMergeRunOperator(kQueryId,
*input_table_,
*result_table_,
insert_destination_index_,
*run_table_,
run_destination_index_,
sort_config_index,
merge_factor,
top_k,
false));
merge_op_->setOperatorIndex(kOpIndex);
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto_,
*db_,
storage_manager_.get(),
foreman_client_id_,
&bus_));
std::vector<block_id> blocks = input_table_->getBlocksSnapshot();
while (!blocks.empty()) {
feedBlocks(num_blocks_to_feed_per_iteration, &blocks);
EXPECT_FALSE(executeOperator());
if (blocks.empty()) {
merge_op_->doneFeedingInputBlocks(input_table_->getID());
}
// Expect the operator to finish execution with 'done' response when block
// feedDone() is signalled.
EXPECT_EQ(blocks.empty(), executeOperator());
}
}
// Check if the merged run is sorted.
template <typename ComparatorT>
void checkOutputRun(const ComparatorT comparator) {
std::vector<block_id> sorted_blocks = result_table_->getBlocksSnapshot();
std::unique_ptr<Tuple> prev, current;
std::unique_ptr<ValueAccessor> accessor;
ASSERT_GT(sorted_blocks.size(), 0u);
tuple_id num_tuples = 0;
for (const block_id &id : sorted_blocks) {
BlockReference block(storage_manager_->getBlock(id, *result_table_));
accessor.reset(block->getTupleStorageSubBlock().createValueAccessor());
InvokeOnValueAccessorNotAdapter(
accessor.get(),
[&](auto *accessor) -> void { // NOLINT(build/c++11)
while (accessor->next()) {
prev = std::move(current);
current.reset(accessor->getTuple());
++num_tuples;
if (prev) {
EXPECT_FALSE(comparator(*current, *prev));
}
this->printTuple(">", *current);
}
});
}
last_actual_tuple_ = std::move(current);
EXPECT_LT(0, expect_num_tuples_);
EXPECT_EQ(expect_num_tuples_, num_tuples);
}
void checkLastTupleAttrValueIsEqual(const attribute_id id) {
printTuple("Expected:", *last_expected_tuple_);
printTuple("Actual: ", *last_actual_tuple_);
EXPECT_EQ(last_expected_tuple_->getAttributeValue(id).isNull(),
last_actual_tuple_->getAttributeValue(id).isNull());
if (!last_expected_tuple_->getAttributeValue(id).isNull() &&
!last_actual_tuple_->getAttributeValue(id).isNull()) {
EXPECT_TRUE(last_expected_tuple_->getAttributeValue(id).fastEqualCheck(
last_actual_tuple_->getAttributeValue(id)));
}
}
// Storage manager, created in SetUp() with kStoragePath.
std::unique_ptr<StorageManager> storage_manager_;
// Serialized query context; sort configurations and insert destinations are
// appended to it, and query_context_ is (re)built from it.
serialization::QueryContext query_context_proto_;
std::unique_ptr<QueryContext> query_context_;
std::unique_ptr<CatalogDatabase> db_;
// Relations created by createTable() and added to db_ in SetUp().
CatalogRelation *input_table_, *result_table_, *run_table_;
// Indices into query_context_proto_'s insert destinations for the result
// relation and the run relation, respectively.
QueryContext::insert_destination_id insert_destination_index_, run_destination_index_;
// Operator under test; created by executeNotPipelined()/executePipelined().
std::unique_ptr<RelationalOperator> merge_op_;
// Attribute ids of input_table_, cached in SetUp().
attribute_id col1_;
attribute_id col2_;
attribute_id col3_;
attribute_id null_col1_;
attribute_id null_col2_;
attribute_id null_col3_;
attribute_id tid_col_;
// Number of tuples the merged output is expected to contain; set by
// createBlocks() and checked by checkOutputRun().
tuple_id expect_num_tuples_;
// Expected last output tuple (set by createBlocks() when top_k > 0) and the
// last tuple actually read (set by checkOutputRun()).
std::unique_ptr<Tuple> last_expected_tuple_;
std::unique_ptr<Tuple> last_actual_tuple_;
// Message bus and the client id that stands in for the foreman.
MessageBusImpl bus_;
tmb::client_id foreman_client_id_;
// This map is needed for InsertDestination and some operators that send
// messages to Foreman directly. To know the reason behind the design of this
// map, see the note in InsertDestination.hpp.
ClientIDMap *thread_id_map_;
};
// Out-of-class definitions of the fixture's static string constants: relation
// names, the StorageManager path and the catalog database name.
const char SortMergeRunOperatorTest::kTableName[] = "table";
const char SortMergeRunOperatorTest::kResultTableName[] = "result-table";
const char SortMergeRunOperatorTest::kRunTableName[] = "run-table";
const char SortMergeRunOperatorTest::kStoragePath[] = "./sort_merge_run_operator_test_data";
const char SortMergeRunOperatorTest::kDatabaseName[] = "database";
namespace {

// Number of tuples written into every input block by the tests below.
static const std::size_t kNumTuplesPerBlock = 100;

// Default "comes-before" predicate used by the SortMergeRunOperator tests:
// orders tuples by c1 only, ascending.
auto kDefaultSortComparator =
    [](const Tuple &left, const Tuple &right) -> bool {
  const TestTupleAttrs lhs = TupleToTupleAttr(left);
  const TestTupleAttrs rhs = TupleToTupleAttr(right);
  return lhs.c1 < rhs.c1;
};

// Ordering flags that accompany the default comparator: one key, ascending,
// NULLS LAST.
std::vector<bool> kDefaultSortOrdering{kSortAscending};
std::vector<bool> kDefaultSortNullOrdering{kSortNullLast};

}  // namespace
TEST_F(SortMergeRunOperatorTest, NotPipelined_SinglePass_SameRunsAsMergeFactor) {
  // As many input blocks as the merge factor.
  const std::size_t merge_factor = 8;
  const std::size_t num_blocks = merge_factor;
  std::vector<attribute_id> sort_attrs{col1_};

  createBlocks(kNumTuplesPerBlock, num_blocks, kDefaultSortComparator);
  executeNotPipelined(sort_attrs, kDefaultSortOrdering, kDefaultSortNullOrdering, merge_factor);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_SinglePass_SameRunsAsMergeFactor_TopK) {
  // As many input blocks as the merge factor, with the output limited to the
  // first 'top_k' tuples.
  const std::size_t merge_factor = 8;
  const std::size_t num_blocks = merge_factor;
  std::size_t top_k = 20;
  std::vector<attribute_id> sort_attrs{col1_};

  createBlocks(kNumTuplesPerBlock, num_blocks, kDefaultSortComparator, top_k);
  executeNotPipelined(sort_attrs, kDefaultSortOrdering, kDefaultSortNullOrdering, merge_factor, top_k);
  checkOutputRun(kDefaultSortComparator);
  // The last emitted tuple must agree with the expected one on the sort key.
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_SinglePass_LessRunsThanMergeFactor) {
  // Half as many input blocks as the merge factor.
  const std::size_t merge_factor = 8;
  const std::size_t num_blocks = merge_factor / 2;
  std::vector<attribute_id> sort_attrs{col1_};

  createBlocks(kNumTuplesPerBlock, num_blocks, kDefaultSortComparator);
  executeNotPipelined(sort_attrs, kDefaultSortOrdering, kDefaultSortNullOrdering, merge_factor);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_SinglePass_OneRun) {
  // Non-pipelined: a single block with merge factor 8.
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_TwoPass_CompleteMerges) {
  // Non-pipelined: 9 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_TwoPass_PartialFirstPass) {
  // Non-pipelined: 8 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 2]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in - 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_TwoPass_PartialBothPasses) {
  // Non-pipelined: 5 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 2]-way merges
  //   pass 2: one 2-way merge
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in + 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ThreePass_CompleteMerges) {
  // Non-pipelined: 27 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 3]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ThreePass_CompleteMerges_TopK) {
  // Non-pipelined: 27 blocks with merge factor 3 and a top-k limit of 33.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 3]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in;
  constexpr std::size_t limit = 33;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ThreePass_PartialMergesInFirstPass) {
  // Non-pipelined: 25 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 1]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ThreePass_PartialMergesInFirstSecondPass) {
  // Non-pipelined: 23 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 3, 2]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ThreePass_PartialMergesInAllPasses) {
  // Non-pipelined: 14 blocks with merge factor 3.
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 2]-way merges
  //   pass 3: [2]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = 2 * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ManyPasses_MergeFactor7) {
  // Non-pipelined: 509 blocks with merge factor 7.
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ManyPasses_MergeFactor7_TopK) {
  // Non-pipelined: 509 blocks with merge factor 7 and a top-k limit of 17.
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 17;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ManyPasses_MergeFactor17) {
  // Non-pipelined: 509 blocks with merge factor 17.
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, NotPipelined_ManyPasses_MergeFactor17_TopK) {
  // Non-pipelined: 509 blocks with merge factor 17 and a top-k limit of 53.
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 53;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executeNotPipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_LessRunsThanMergeFactor_AllFeed) {
  // Pipelined: 4 blocks with merge factor 8. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = fan_in / 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_LessRunsThanMergeFactor_SlowFeed) {
  // Pipelined: 15 blocks with merge factor 16. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  constexpr std::size_t fan_in = 16;
  constexpr std::size_t block_count = fan_in - 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_SameRunsAsMergeFactor_AllFeed) {
  // Pipelined: 8 blocks with merge factor 8. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_SameRunsAsMergeFactor_AllFeed_TopK) {
  // Pipelined: 8 blocks with merge factor 8 and a top-k limit of 13. The block
  // count is passed as executePipelined()'s fifth argument (the "AllFeed" variant).
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = fan_in;
  constexpr std::size_t limit = 13;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_SameRunsAsMergeFactor_SlowFeed) {
  // Pipelined: 8 blocks with merge factor 8. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_SinglePass_OneRun) {
  // Pipelined: a single block with merge factor 8. The block count (1) is
  // passed as executePipelined()'s fifth argument.
  constexpr std::size_t fan_in = 8;
  constexpr std::size_t block_count = 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_CompleteMerges_AllFeed) {
  // Pipelined: 9 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_CompleteMerges_SlowFeed) {
  // Pipelined: 9 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_PartialFirstPass_AllFeed) {
  // Pipelined: 8 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 2]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in - 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_PartialFirstPass_SlowFeed) {
  // Pipelined: 8 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 2]-way merges
  //   pass 2: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in - 1;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_PartialBothPasses_AllFeed) {
  // Pipelined: 5 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 2]-way merges
  //   pass 2: one 2-way merge
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in + 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_TwoPass_PartialBothPasses_SlowFeed) {
  // Pipelined: 5 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 2]-way merges
  //   pass 2: one 2-way merge
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in + 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_CompleteMerges_AllFeed) {
  // Pipelined: 27 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 3]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_CompleteMerges_SlowFeed) {
  // Pipelined: 27 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 3]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_CompleteMerges_SlowFeed_TopK) {
  // Pipelined: 27 blocks with merge factor 3 and a top-k limit of 19. The value
  // 1 is passed as executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 3]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in;
  constexpr std::size_t limit = 19;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInFirstPass_AllFeed) {
  // Pipelined: 25 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 1]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInFirstPass_SlowFeed) {
  // Pipelined: 25 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 3, 1]-way merges
  //   pass 2: [3, 3, 3]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 2;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInFirstSecondPass_AllFeed) {
  // Pipelined: 23 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 3, 2]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInFirstSecondPass_SlowFeed) {
  // Pipelined: 23 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 3, 2]-way merges
  //   pass 3: [3]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = fan_in * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInAllPasses_AllFeed) {
  // Pipelined: 14 blocks with merge factor 3. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 2]-way merges
  //   pass 3: [2]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = 2 * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ThreePass_PartialMergesInAllPasses_SlowFeed) {
  // Pipelined: 14 blocks with merge factor 3. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  // Expected merge schedule:
  //   pass 1: [3, 3, 3, 3, 2]-way merges
  //   pass 2: [3, 2]-way merges
  //   pass 3: [2]-way merges
  constexpr std::size_t fan_in = 3;
  constexpr std::size_t block_count = 2 * fan_in * fan_in - 4;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor7_AllFeed) {
  // Pipelined: 509 blocks with merge factor 7. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor7_AllFeed_TopK) {
  // Pipelined: 509 blocks with merge factor 7 and a top-k limit of 78. The block
  // count is passed as executePipelined()'s fifth argument (the "AllFeed" variant).
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 78;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor7_SlowFeed) {
  // Pipelined: 509 blocks with merge factor 7. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor7_SlowFeed_TopK) {
  // Pipelined: 509 blocks with merge factor 7 and a top-k limit of 37. The value
  // 1 is passed as executePipelined()'s fifth argument (the "SlowFeed" variant).
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 7;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 37;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor17_AllFeed) {
  // Pipelined: 509 blocks with merge factor 17. The block count is passed as
  // executePipelined()'s fifth argument (the "AllFeed" variant).
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor17_AllFeed_TopK) {
  // Pipelined: 509 blocks with merge factor 17 and a top-k limit of 32. The block
  // count is passed as executePipelined()'s fifth argument (the "AllFeed" variant).
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 32;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, block_count, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor17_SlowFeed) {
  // Pipelined: 509 blocks with merge factor 17. The value 1 is passed as
  // executePipelined()'s fifth argument (the "SlowFeed" variant).
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1);
  checkOutputRun(kDefaultSortComparator);
}
TEST_F(SortMergeRunOperatorTest, Pipleined_ManyPasses_MergeFactor17_SlowFeed_TopK) {
  // Pipelined: 509 blocks with merge factor 17 and a top-k limit of 42. The value
  // 1 is passed as executePipelined()'s fifth argument (the "SlowFeed" variant).
  // checkLastTupleAttrValueIsEqual(col1_) runs after the output-run check.
  constexpr std::size_t fan_in = 17;
  constexpr std::size_t block_count = 509;
  constexpr std::size_t limit = 42;
  std::vector<attribute_id> sort_columns{col1_};

  createBlocks(kNumTuplesPerBlock, block_count, kDefaultSortComparator, limit);
  executePipelined(sort_columns, kDefaultSortOrdering, kDefaultSortNullOrdering, fan_in, 1, limit);
  checkOutputRun(kDefaultSortComparator);
  checkLastTupleAttrValueIsEqual(col1_);
}
} // namespace quickstep
// Test-binary entry point: initializes logging, parses gflags, initializes
// GoogleTest, then runs every registered test and returns its result.
int main(int argc, char **argv) {
  google::InitGoogleLogging(argv[0]);

  // Parse the command line with gflags so that FLAGS_buffer_pool_slots is
  // honored in StorageManager.
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  testing::InitGoogleTest(&argc, argv);
  const int test_status = RUN_ALL_TESTS();
  return test_status;
}