blob: 945fd0f9f1fc816201c371417f07f818472ed441 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/aggregation_sink_operator.h"
#include "exec/operator/aggregation_source_operator.h"
#include "exec/operator/assert_num_rows_operator.h"
#include "exec/operator/mock_operator.h"
#include "exec/operator/operator_helper.h"
#include "exec/pipeline/dependency.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_agg_fn_evaluator.h"
#include "testutil/mock/mock_slot_ref.h"
namespace doris {
auto static init_sink_and_source(std::shared_ptr<AggSinkOperatorX> sink_op,
std::shared_ptr<AggSourceOperatorX> source_op,
OperatorContext& ctx) {
auto shared_state = sink_op->create_shared_state();
{
auto local_state = AggSinkOperatorX::LocalState ::create_unique(sink_op.get(), &ctx.state);
LocalSinkStateInfo info {.task_idx = 0,
.parent_profile = &ctx.profile,
.sender_id = 0,
.shared_state = shared_state.get(),
.shared_state_map = {},
.tsink = TDataSink {}};
EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
ctx.state.emplace_sink_local_state(0, std::move(local_state));
}
{
auto local_state =
AggSourceOperatorX::LocalState::create_unique(&ctx.state, source_op.get());
LocalStateInfo info {.parent_profile = &ctx.profile,
.scan_ranges = {},
.shared_state = shared_state.get(),
.shared_state_map = {},
.task_idx = 0};
EXPECT_TRUE(local_state->init(&ctx.state, info).ok());
ctx.state.resize_op_id_to_local_state(-100);
ctx.state.emplace_local_state(source_op->operator_id(), std::move(local_state));
}
{
auto* sink_local_state = ctx.state.get_sink_local_state();
EXPECT_TRUE(sink_local_state->open(&ctx.state).ok());
}
{
auto* source_local_state = ctx.state.get_local_state(source_op->operator_id());
EXPECT_TRUE(source_local_state->open(&ctx.state).ok());
}
return shared_state;
}
struct MockAggsinkOperator : public AggSinkOperatorX {
MockAggsinkOperator() = default;
Status _init_probe_expr_ctx(RuntimeState* state) override { return Status::OK(); }
Status _init_aggregate_evaluators(RuntimeState* state) override { return Status::OK(); }
Status _check_agg_fn_output() override { return Status::OK(); }
};
struct MockAggSourceOperator : public AggSourceOperatorX {
MockAggSourceOperator() = default;
RowDescriptor& row_descriptor() override { return *mock_row_descriptor; }
std::unique_ptr<RowDescriptor> mock_row_descriptor;
};
std::shared_ptr<AggSinkOperatorX> create_agg_sink_op(OperatorContext& ctx, bool is_merge,
bool without_key) {
auto op = std::make_shared<MockAggsinkOperator>();
op->_aggregate_evaluators.push_back(
create_mock_agg_fn_evaluator(ctx.pool, is_merge, without_key));
op->_pool = &ctx.pool;
EXPECT_TRUE(op->prepare(&ctx.state).ok());
return op;
}
std::shared_ptr<AggSourceOperatorX> create_agg_source_op(OperatorContext& ctx, bool without_key,
bool needs_finalize) {
auto op = std::make_shared<MockAggSourceOperator>();
op->mock_row_descriptor.reset(
new MockRowDescriptor {{std::make_shared<DataTypeInt64>()}, &ctx.pool});
op->_without_key = without_key;
op->_needs_finalize = needs_finalize;
EXPECT_TRUE(op->prepare(&ctx.state).ok());
return op;
}
TEST(AggOperatorTestWithOutGroupBy, test_need_finalize) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
auto source_op = create_agg_source_op(ctx, true, true);
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(eos);
EXPECT_EQ(block.rows(), 1);
EXPECT_TRUE(
ColumnHelper::block_equal(block, ColumnHelper::create_block<DataTypeInt64>({6})));
}
}
TEST(AggOperatorTestWithOutGroupBy, test_no_need_finalize) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
auto source_op = create_agg_source_op(ctx, true, false);
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(eos);
EXPECT_EQ(block.rows(), 1);
EXPECT_TRUE(
check_and_get_column<ColumnFixedLengthObject>(*block.get_by_position(0).column));
}
}
Block test_agg_1_phase(Block origin_block) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
auto source_op = create_agg_source_op(ctx, true, false);
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
EXPECT_TRUE(sink_op->sink(&ctx.state, &origin_block, true).ok());
Block serialize_block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
EXPECT_TRUE(source_op->get_block(&ctx.state, &serialize_block, &eos).ok());
EXPECT_TRUE(eos);
EXPECT_EQ(serialize_block.rows(), 1);
EXPECT_TRUE(check_and_get_column<ColumnFixedLengthObject>(
*serialize_block.get_by_position(0).column));
return serialize_block;
}
void test_agg_2_phase(Block serialize_block) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, true, false);
auto source_op = create_agg_source_op(ctx, true, true);
auto shared_state2 = init_sink_and_source(sink_op, source_op, ctx);
EXPECT_TRUE(sink_op->sink(&ctx.state, &serialize_block, true).ok());
Block result_block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
EXPECT_TRUE(source_op->get_block(&ctx.state, &result_block, &eos).ok());
EXPECT_TRUE(eos);
EXPECT_EQ(result_block.rows(), 1);
EXPECT_TRUE(ColumnHelper::block_equal(result_block,
ColumnHelper::create_block<DataTypeInt64>({6})));
}
TEST(AggOperatorTestWithOutGroupBy, test_2_phase) {
auto serialize_block = test_agg_1_phase(ColumnHelper::create_block<DataTypeInt64>({1, 2, 3}));
test_agg_2_phase(serialize_block);
}
TEST(AggOperatorTestWithOutGroupBy, test_multi_input) {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(0, std::make_shared<const DataTypeInt64>()),
false, true));
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<const DataTypeInt64>()),
false, true));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = true;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block {ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({4, 5, 6})};
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {ColumnHelper::create_column_with_name<DataTypeInt64>({6}),
ColumnHelper::create_column_with_name<DataTypeInt64>({15})}));
}
}
struct AggOperatorTestWithGroupBy : public testing::Test {
public:
void SetUp() override {}
};
TEST_F(AggOperatorTestWithGroupBy, test_need_finalize_only_key) {
/*
group by key and sum(value)
+---------------+
|column(Int64) |
+---------------+
| 1|
| 2|
| 3|
| 1|
| 2|
| 3|
+---------------+
+---------------+---------------+
|(Int64) |(Int64) |
+---------------+---------------+
| 1| 2|
| 2| 4|
| 3| 6|
+---------------+---------------+
*/
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(ctx.pool, false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 1, 2, 3});
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({2, 4, 6})}));
}
}
TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) {
/*
group by key | sum(value)
+---------------+---------------+
|column(Int64) |column(Int64) |
+---------------+---------------+
| 1| 1|
| 1| 1|
| 2| 100|
| 2| 100|
| 2| 100|
| 3| 1000|
+---------------+---------------+
+---------------+---------------+
|(Int64) |(Int64) |
+---------------+---------------+
| 1| 2|
| 2| 300|
| 3| 1000|
+---------------+---------------+
*/
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block {
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 1, 2, 2, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 1, 100, 100, 100, 1000})};
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(ColumnHelper::block_equal(
block,
Block {ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({2, 300, 1000})}));
}
}
TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
/*
group by key | sum(value)
+---------------+---------------+
|column(Int64) |column(Int64) |
+---------------+---------------+
| 1| 1|
| 1| 1|
| 2| 100|
| 2| 100|
| 2| 100|
| 3| 1000|
+---------------+---------------+
+---------------+---------------+
|(Int64) |(Int64) |
+---------------+---------------+
| 1| 2|
| 2| 300|
| 3| 1000|
+---------------+---------------+
*/
auto phase1 = []() {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = false;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block {ColumnHelper::create_column_with_name<DataTypeInt64>({1, 1, 2, 2, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>(
{1, 1, 100, 100, 100, 1000})};
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
return block;
}
};
auto phase2 = [](Block& serialize_block) {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
true, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
auto st = sink_op->sink(&ctx.state, &serialize_block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(ColumnHelper::block_equal(
block,
Block {ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({2, 300, 1000})}));
}
};
auto block = phase1();
phase2(block);
}
TEST_F(AggOperatorTestWithGroupBy, other_case_1) {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_is_merge = true;
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeInt64>()}, &ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = false;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block {
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 1, 2, 2, 2, 3}),
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 1, 100, 100, 100, 1000})};
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
}
TEST(AggOperatorTestWithOutGroupBy, other_case_1) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
sink_op->_is_merge = true;
auto source_op = create_agg_source_op(ctx, true, true);
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(eos);
EXPECT_EQ(block.rows(), 1);
EXPECT_TRUE(
ColumnHelper::block_equal(block, ColumnHelper::create_block<DataTypeInt64>({6})));
}
}
TEST(AggOperatorTestWithOutGroupBy, other_case_2) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
sink_op->_is_merge = true;
auto source_op = create_agg_source_op(ctx, true, true);
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
static_cast<AggSharedState*>(shared_state.get())->make_nullable_keys.push_back(0);
auto* local_state =
static_cast<AggLocalState*>(ctx.state.get_local_state(source_op->operator_id()));
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
local_state->make_nullable_output_key(&block);
EXPECT_TRUE(ColumnHelper::block_equal(block, ColumnHelper::create_nullable_block<DataTypeInt64>(
{1, 2, 3}, {false, false, false})));
}
TEST_F(AggOperatorTestWithGroupBy, other_case_2) {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(ctx.pool, false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()),
std::make_shared<DataTypeInt64>()},
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_nullable_block<DataTypeInt64>(
{1, 2, 3, 1, 2, 3}, {false, false, false, true, true, true});
auto* local_state =
static_cast<AggSinkOperatorX::LocalState*>(ctx.state.get_sink_local_state());
ColumnRawPtrs key_columns;
key_columns.push_back(block.get_by_position(0).column.get());
local_state->_places.resize(block.rows());
local_state->_emplace_into_hash_table(local_state->_places.data(), key_columns,
block.rows());
EXPECT_EQ(local_state->get_hash_table_size(), 4); // [1,2,3,null]
}
}
TEST_F(AggOperatorTestWithGroupBy, other_case_3) {
auto phase1 = []() {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(
0, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()),
std::make_shared<DataTypeInt64>()},
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = false;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block {ColumnHelper::create_nullable_column_with_name<DataTypeInt64>(
{1, 1, 2, 2, 2, 3}, {false, false, false, true, false, false}),
ColumnHelper::create_column_with_name<DataTypeInt64>(
{1, 1, 100, 100, 100, 1000})};
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
return block;
}
};
auto phase2 = [](Block& serialize_block) {
OperatorContext ctx;
auto sink_op = std::make_shared<MockAggsinkOperator>();
sink_op->_aggregate_evaluators.push_back(create_mock_agg_fn_evaluator(
ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()),
true, false));
sink_op->_pool = &ctx.pool;
EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(
0, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()),
std::make_shared<DataTypeInt64>()},
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
auto st = sink_op->sink(&ctx.state, &serialize_block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block;
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, Block {ColumnHelper::create_nullable_column_with_name<DataTypeInt64>(
{1, 2, 3, 0}, {false, false, false, true}),
ColumnHelper::create_column_with_name<DataTypeInt64>(
{2, 200, 1000, 100})}));
}
};
auto block = phase1();
phase2(block);
}
TEST(AggOperatorTestWithOutGroupBy, other_case_3) {
OperatorContext ctx;
auto sink_op = create_agg_sink_op(ctx, false, true);
sink_op->_is_merge = true;
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(new MockRowDescriptor {
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())}, &ctx.pool});
source_op->_without_key = true;
source_op->_needs_finalize = true;
EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
auto st = sink_op->sink(&ctx.state, &block, true);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
auto st = source_op->get_block(&ctx.state, &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(eos);
EXPECT_EQ(block.rows(), 1);
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_nullable_block<DataTypeInt64>({6}, {false})));
}
}
} // namespace doris