blob: f57c34d1be334433a470d19132e6d087504b997c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <vector>
#include "core/block/block.h"
#include "exec/operator/mock_operator.h"
#include "exec/operator/operator_helper.h"
#include "exec/operator/union_sink_operator.h"
#include "exec/operator/union_source_operator.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_descriptors.h"
#include "testutil/mock/mock_literal_expr.h"
#include "testutil/mock/mock_slot_ref.h"
namespace doris {
struct MockUnionSourceOperator : public UnionSourceOperatorX {
MockUnionSourceOperator(int32_t child_size, DataTypes types, ObjectPool* pool)
: UnionSourceOperatorX(child_size), _mock_row_descriptor(types, pool) {}
RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
MockRowDescriptor _mock_row_descriptor;
};
struct MockUnionSinkOperator : public UnionSinkOperatorX {
MockUnionSinkOperator(int child_size, int cur_child_id, int first_materialized_child_idx,
DataTypes types, ObjectPool* pool)
: UnionSinkOperatorX(child_size, cur_child_id, first_materialized_child_idx),
_mock_row_descriptor(types, pool) {}
RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
MockRowDescriptor _mock_row_descriptor;
};
struct UnionOperatorTest : public ::testing::Test {
void SetUp() override {
state = std::make_shared<MockRuntimeState>();
state->batsh_size = 10;
for (int i = 0; i < child_size; i++) {
sink_state.push_back(std::make_shared<MockRuntimeState>());
sink_ops.push_back(nullptr);
}
}
std::shared_ptr<MockUnionSourceOperator> source_op;
UnionSourceLocalState* source_local_state;
std::shared_ptr<MockRuntimeState> state;
RuntimeProfile profile {""};
ObjectPool pool;
const int child_size = 3;
const int first_materialized_child_idx = 1;
std::vector<std::shared_ptr<MockUnionSinkOperator>> sink_ops;
std::vector<std::shared_ptr<MockRuntimeState>> sink_state;
};
TEST_F(UnionOperatorTest, test_all_const_expr) {
state->batsh_size = 2;
source_op.reset(new MockUnionSourceOperator {
0,
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()},
&pool});
EXPECT_TRUE(source_op->prepare(state.get()));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 10, 100, 1000}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 20, 200, 2000}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 30, 300, 3000}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 40, 400, 4000}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 50, 500, 5000}));
auto source_local_state_uptr =
std::make_unique<UnionSourceLocalState>(state.get(), source_op.get());
source_local_state = source_local_state_uptr.get();
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = nullptr,
.shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
state->emplace_local_state(source_op->operator_id(), std::move(source_local_state_uptr));
EXPECT_TRUE(source_local_state->open(state.get()));
EXPECT_EQ(source_local_state->dependencies().size(), 1);
EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
EXPECT_TRUE(source_local_state->_need_read_for_const_expr);
EXPECT_EQ(source_local_state->_const_expr_list_idx, 0);
EXPECT_TRUE(source_op->has_more_const(state.get()));
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 2);
EXPECT_FALSE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2}),
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20}),
ColumnHelper::create_column_with_name<DataTypeInt64>({100, 200}),
ColumnHelper::create_column_with_name<DataTypeInt64>({1000, 2000}),
}));
}
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 2);
EXPECT_FALSE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {
ColumnHelper::create_column_with_name<DataTypeInt64>({3, 4}),
ColumnHelper::create_column_with_name<DataTypeInt64>({30, 40}),
ColumnHelper::create_column_with_name<DataTypeInt64>({300, 400}),
ColumnHelper::create_column_with_name<DataTypeInt64>({3000, 4000}),
}));
}
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 1);
EXPECT_TRUE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {
ColumnHelper::create_column_with_name<DataTypeInt64>({5}),
ColumnHelper::create_column_with_name<DataTypeInt64>({50}),
ColumnHelper::create_column_with_name<DataTypeInt64>({500}),
ColumnHelper::create_column_with_name<DataTypeInt64>({5000}),
}));
}
}
TEST_F(UnionOperatorTest, test_sink_and_source) {
DataTypes types = {std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()};
source_op.reset(new MockUnionSourceOperator {child_size, types, &pool});
EXPECT_TRUE(source_op->prepare(state.get()));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1, 10}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2, 20}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3, 30}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4, 40}));
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5, 50}));
for (int i = 0; i < child_size; i++) {
sink_ops[i].reset(new MockUnionSinkOperator {child_size, i, first_materialized_child_idx,
types, &pool});
sink_ops[i]->_child_expr = MockSlotRef::create_mock_contexts(types);
}
auto shared_state = sink_ops[0]->create_shared_state();
//auto & data_queue = dynamic_cast<UnionSharedState*>(shared_state.get())->data_queue;
EXPECT_TRUE(shared_state != nullptr);
EXPECT_TRUE(sink_ops[1]->create_shared_state() == nullptr);
EXPECT_TRUE(sink_ops[2]->create_shared_state() == nullptr);
{
auto source_local_state_uptr =
std::make_unique<UnionSourceLocalState>(state.get(), source_op.get());
source_local_state = source_local_state_uptr.get();
LocalStateInfo info {.parent_profile = &profile,
.scan_ranges = {},
.shared_state = shared_state.get(),
.shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(source_local_state->init(state.get(), info));
state->resize_op_id_to_local_state(-100);
state->emplace_local_state(source_op->operator_id(), std::move(source_local_state_uptr));
EXPECT_TRUE(source_local_state->open(state.get()));
EXPECT_EQ(source_local_state->dependencies().size(), 1);
EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
}
{
for (int i = 0; i < child_size; i++) {
auto sink_local_state_uptr =
std::make_unique<UnionSinkLocalState>(sink_ops[i].get(), sink_state[i].get());
auto* sink_local_state = sink_local_state_uptr.get();
LocalSinkStateInfo info {.task_idx = 0,
.parent_profile = &profile,
.sender_id = 0,
.shared_state = shared_state.get(),
.shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info));
sink_state[i]->resize_op_id_to_local_state(-100);
sink_state[i]->emplace_sink_local_state(sink_ops[i]->operator_id(),
std::move(sink_local_state_uptr));
EXPECT_TRUE(sink_local_state->open(sink_state[i].get()));
EXPECT_EQ(sink_local_state->dependencies().size(), 1);
EXPECT_TRUE(OperatorHelper::is_ready(sink_local_state->dependencies()));
}
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4});
EXPECT_TRUE(sink_ops[0]->sink(sink_state[0].get(), &block, false));
}
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 5);
EXPECT_FALSE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block,
Block {
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5}),
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20, 30, 40, 50}),
}));
}
EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 2);
EXPECT_FALSE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4})));
}
EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
{
for (int i = 0; i < child_size; i++) {
sink_state[i]->batsh_size = 2;
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4});
EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
}
for (int i = 0; i < child_size; i++) {
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4});
EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
}
for (int i = 0; i < child_size; i++) {
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4});
EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, true));
}
}
EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
for (int i = 0; i < 8; i++) {
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 2);
EXPECT_FALSE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4})));
}
{
Block block;
bool eos;
EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
EXPECT_EQ(block.rows(), 2);
EXPECT_TRUE(eos);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3, 4})));
}
}
} // namespace doris