blob: 883530a20e4f4332d90c52f5f147aa23391f6329 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/partitioned_aggregation_test_helper.h"
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gmock/gmock-actions.h>
#include <gmock/gmock-function-mocker.h>
#include <gmock/gmock-spec-builders.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
#include <vector>
#include "core/data_type/define_primitive_type.h"
#include "testutil/creators.h"
#include "testutil/mock/mock_operators.h"
namespace doris {
TPlanNode PartitionedAggregationTestHelper::create_test_plan_node() {
TPlanNode tnode;
tnode.node_id = 0;
tnode.node_type = TPlanNodeType::AGGREGATION_NODE;
tnode.num_children = 1;
tnode.agg_node.use_streaming_preaggregation = false;
tnode.agg_node.need_finalize = false;
tnode.agg_node.intermediate_tuple_id = 1;
tnode.agg_node.output_tuple_id = 2;
tnode.limit = -1;
auto& grouping_expr = tnode.agg_node.grouping_exprs.emplace_back();
auto& expr_node = grouping_expr.nodes.emplace_back();
expr_node.node_type = TExprNodeType::SLOT_REF;
TTypeNode type_node;
type_node.type = TTypeNodeType::SCALAR;
type_node.scalar_type.type = TPrimitiveType::INT;
type_node.__isset.scalar_type = true;
expr_node.type.types.emplace_back(type_node);
expr_node.__set_is_nullable(false);
expr_node.num_children = 0;
expr_node.slot_ref.slot_id = 0;
expr_node.slot_ref.tuple_id = 0;
auto& agg_function = tnode.agg_node.aggregate_functions.emplace_back();
auto& fn_node = agg_function.nodes.emplace_back();
fn_node.node_type = TExprNodeType::FUNCTION_CALL;
fn_node.__set_is_nullable(false);
fn_node.num_children = 1;
TFunctionName fn_name;
fn_name.function_name = "sum";
fn_node.fn.__set_name(fn_name);
TTypeDesc ret_type;
auto& ret_type_node = ret_type.types.emplace_back();
ret_type_node.scalar_type.type = TPrimitiveType::BIGINT;
ret_type_node.__isset.scalar_type = true;
ret_type_node.type = TTypeNodeType::SCALAR;
ret_type.__set_is_nullable(false);
TTypeDesc arg_type;
auto& arg_type_node = arg_type.types.emplace_back();
arg_type_node.scalar_type.type = TPrimitiveType::INT;
arg_type_node.__isset.scalar_type = true;
arg_type_node.type = TTypeNodeType::SCALAR;
fn_node.fn.__set_ret_type(ret_type);
fn_node.fn.__set_arg_types({arg_type});
fn_node.agg_expr.__set_param_types({arg_type});
auto& fn_child_node = agg_function.nodes.emplace_back();
fn_child_node.node_type = TExprNodeType::SLOT_REF;
fn_child_node.__set_is_nullable(false);
fn_child_node.num_children = 0;
fn_child_node.slot_ref.slot_id = 1;
fn_child_node.slot_ref.tuple_id = 0;
fn_child_node.type.types.emplace_back(type_node);
tnode.row_tuples.push_back(0);
return tnode;
}
TDescriptorTable PartitionedAggregationTestHelper::create_test_table_descriptor(
bool nullable = false) {
TTupleDescriptorBuilder tuple_builder;
tuple_builder
.add_slot(TSlotDescriptorBuilder()
.type(PrimitiveType::TYPE_INT)
.column_name("col1")
.column_pos(0)
.nullable(nullable)
.build())
.add_slot(TSlotDescriptorBuilder()
.type(PrimitiveType::TYPE_INT)
.column_name("col2")
.column_pos(0)
.nullable(nullable)
.build());
TDescriptorTableBuilder builder;
tuple_builder.build(&builder);
TTupleDescriptorBuilder()
.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("col3")
.column_pos(0)
.nullable(nullable)
.build())
.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("col4")
.column_pos(0)
.nullable(true)
.build())
.build(&builder);
TTupleDescriptorBuilder()
.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("col5")
.column_pos(0)
.nullable(nullable)
.build())
.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("col6")
.column_pos(0)
.nullable(true)
.build())
.build(&builder);
return builder.desc_tbl();
}
std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>,
std::shared_ptr<PartitionedAggSinkOperatorX>>
PartitionedAggregationTestHelper::create_operators() {
TPlanNode tnode = create_test_plan_node();
TQueryOptions query_options = runtime_state->query_options();
query_options.__set_min_revocable_mem(0);
runtime_state->set_query_options(query_options);
auto desc_tbl = runtime_state->desc_tbl();
EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 3);
auto source_operator =
std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl);
auto sink_operator =
std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode, desc_tbl);
auto child_operator = std::make_shared<MockChildOperator>();
auto probe_side_source_operator = std::make_shared<MockChildOperator>();
auto source_side_sink_operator = std::make_shared<MockSinkOperator>();
auto [source_pipeline, _] = generate_agg_pipeline(source_operator, source_side_sink_operator,
sink_operator, child_operator);
RowDescriptor row_desc(runtime_state->desc_tbl(), {0});
child_operator->_row_descriptor = row_desc;
EXPECT_TRUE(sink_operator->set_child(child_operator));
// Setup task and state
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr,
nullptr, shared_state_map, 0);
return {std::move(source_operator), std::move(sink_operator)};
}
PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_state(
RuntimeState* state, PartitionedAggSourceOperatorX* probe_operator,
std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
auto local_state_uptr = std::make_unique<MockPartitionedAggLocalState>(state, probe_operator);
auto* local_state = local_state_uptr.get();
shared_state = std::make_shared<MockPartitionedAggSharedState>();
local_state->_shared_state = shared_state.get();
shared_state->_is_spilled = true;
ADD_TIMER(local_state->common_profile(), "ExecTime");
local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
local_state->init_spill_read_counters();
local_state->init_spill_write_counters();
local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
state->emplace_local_state(probe_operator->operator_id(), std::move(local_state_uptr));
return local_state;
}
PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_local_state(
RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator,
std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
auto local_state_uptr = MockPartitionedAggSinkLocalState::create_unique(sink_operator, state);
auto* local_state = local_state_uptr.get();
shared_state = std::make_shared<MockPartitionedAggSharedState>();
local_state->init_spill_counters();
ADD_TIMER(local_state->common_profile(), "ExecTime");
local_state->common_profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
local_state->_dependency = shared_state->create_sink_dependency(
sink_operator->dests_id().front(), sink_operator->operator_id(),
"PartitionedHashJoinTestDep");
state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr));
return local_state;
}
} // namespace doris