blob: 97a00b723276558f29265bf18a0d0e1e578a811b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "common/status.h"
#include "dummy_task_queue.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "thrift_builder.h"
namespace doris::pipeline {
static void empty_function(RuntimeState*, Status*) {}
class PipelineTaskTest : public testing::Test {
public:
PipelineTaskTest() : _obj_pool(new ObjectPool()) {}
~PipelineTaskTest() override = default;
void SetUp() override {
_thread_mem_tracker_mgr = std::move(thread_context()->thread_mem_tracker_mgr);
thread_context()->thread_mem_tracker_mgr = std::make_unique<MockThreadMemTrackerMgr>();
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
}
void TearDown() override {
// Origin `thread_mem_tracker_mgr` must be restored otherwise `ThreadContextTest` will fail.
thread_context()->thread_mem_tracker_mgr = std::move(_thread_mem_tracker_mgr);
}
private:
void _build_fragment_context() {
int fragment_id = 0;
_context = std::make_shared<PipelineFragmentContext>(
_query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(),
empty_function,
std::bind<Status>(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
ExecEnv::GetInstance()->fragment_mgr(), std::placeholders::_1,
std::placeholders::_2));
_runtime_state = std::make_unique<MockRuntimeState>(
_query_id, fragment_id, _query_options, _query_ctx->query_globals,
ExecEnv::GetInstance(), _query_ctx.get());
_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(_context));
}
std::shared_ptr<ObjectPool> _obj_pool;
std::shared_ptr<PipelineFragmentContext> _context;
std::unique_ptr<RuntimeState> _runtime_state;
std::shared_ptr<QueryContext> _query_ctx;
TUniqueId _query_id = TUniqueId();
std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
TQueryOptions _query_options;
std::unique_ptr<MockTaskScheduler> _task_scheduler;
const std::string LOCALHOST = BackendOptions::get_localhost();
const int DUMMY_PORT = config::brpc_port;
};
template class OperatorX<DummyOperatorLocalState>;
template class DataSinkOperatorX<DummySinkLocalState>;
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
// shared state already exists
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
shared_state_map[3] = {std::make_shared<BasicSharedState>(), {}};
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
EXPECT_EQ(task->_sink_shared_state, nullptr);
// shared state not exists
shared_state_map.clear();
task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
EXPECT_NE(task->_sink_shared_state, nullptr);
EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
}
TEST_F(PipelineTaskTest, TEST_PREPARE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
}
TEST_F(PipelineTaskTest, TEST_PREPARE_ERROR) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
_context.reset();
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_FALSE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
}
}
TEST_F(PipelineTaskTest, TEST_OPEN) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
{
EXPECT_TRUE(task->_open().ok());
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
}
}
TEST_F(PipelineTaskTest, TEST_EXECUTE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
Dependency* read_dep;
Dependency* write_dep;
Dependency* source_finish_dep;
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
// `execute` should be called after `prepare`
bool done = false;
EXPECT_FALSE(task->execute(&done).ok());
}
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->dependencies()
.front();
write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
}
{
// task is blocked by execution dependency.
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_opened);
EXPECT_FALSE(_query_ctx->get_execution_dependency()->ready());
EXPECT_FALSE(_query_ctx->get_execution_dependency()->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
// task is blocked by filter dependency.
_query_ctx->get_execution_dependency()->set_ready();
task->_execution_dependencies.back()->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_opened);
EXPECT_FALSE(task->_execution_dependencies.back()->ready());
EXPECT_FALSE(task->_execution_dependencies.back()->_blocked_task.empty());
EXPECT_TRUE(task->_read_dependencies.empty());
EXPECT_TRUE(task->_write_dependencies.empty());
EXPECT_TRUE(task->_finish_dependencies.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
// `open` phase. And then task is blocked by read dependency.
task->_execution_dependencies.back()->set_ready();
read_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
EXPECT_FALSE(read_dep->_blocked_task.empty());
source_finish_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->finishdependency();
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
// `execute` phase. And then task is blocked by finish dependency.
read_dep->set_ready();
source_finish_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
task->_operators.front()->cast<DummyOperator>()._eos = true;
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(source_finish_dep->ready());
EXPECT_FALSE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
// `execute` phase.
source_finish_dep->set_ready();
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
{
EXPECT_TRUE(task->close(Status::OK()).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
EXPECT_TRUE(task->finalize().ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
}
}
TEST_F(PipelineTaskTest, TEST_TERMINATE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
_query_ctx->get_execution_dependency()->set_ready();
{
std::atomic_bool terminated = false;
auto exec_func = [&]() {
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(terminated);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
};
auto terminate_func = [&]() {
// Sleep 0~5000ms randomly.
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
terminated = true;
task->set_wake_up_early();
task->terminate();
};
std::thread exec_thread(exec_func);
std::thread terminate_thread(terminate_func);
exec_thread.join();
terminate_thread.join();
}
}
TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
for (int i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) {
auto target = (PipelineTask::State)i;
for (int j = 0; j < task->LEGAL_STATE_TRANSITION.size(); j++) {
task->_exec_state = (PipelineTask::State)j;
EXPECT_EQ(task->_state_transition(target).ok(),
task->LEGAL_STATE_TRANSITION[i].contains((PipelineTask::State)j));
}
}
}
TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
_query_ctx->get_execution_dependency()->set_ready();
{
auto& is_finished =
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>()._is_finished;
auto exec_func = [&]() {
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(is_finished);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
};
auto finish_func = [&]() {
// Sleep 0~5000ms randomly.
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
is_finished = true;
};
auto finish_check_func = [&]() {
while (!is_finished) {
task->stop_if_finished();
}
};
// Make sure `debug_string` will not be blocked.
auto debug_string_func = [&]() {
while (!is_finished) {
static_cast<void>(task->debug_string());
}
};
std::thread exec_thread(exec_func);
std::thread finish_thread(finish_func);
std::thread finish_check_thread(finish_check_func);
std::thread debug_string_thread(debug_string_func);
exec_thread.join();
finish_thread.join();
finish_check_thread.join();
debug_string_thread.join();
}
}
TEST_F(PipelineTaskTest, TEST_SINK_EOF) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
_query_ctx->get_execution_dependency()->set_ready();
{
task->_operators.front()->cast<DummyOperator>()._eos = true;
task->_sink->cast<DummySinkOperatorX>()._return_eof = true;
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._return_eof);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
}
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
{
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.set_enable_reserve_memory(true)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
twg_info.__set_id(0);
twg_info.__set_name("_dummpy_workload_group");
twg_info.__set_version(0);
WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
((MockRuntimeState*)_runtime_state.get())->_workload_group =
std::make_shared<WorkloadGroup>(workload_group_info);
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
->_test_low_memory = true;
}
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
Dependency* read_dep;
Dependency* write_dep;
Dependency* source_finish_dep;
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->dependencies()
.front();
write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
}
{
_query_ctx->get_execution_dependency()->set_ready();
// Task is blocked by read dependency.
read_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
EXPECT_FALSE(read_dep->_blocked_task.empty());
source_finish_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->finishdependency();
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
// set low memory mode and do not pause.
read_dep->set_ready();
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
// Not check low memory mode here, because we temporary not use this feature, the
// system buffer should be checked globally.
// EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
// EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
// EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
{
// set low memory mode and do not pause.
task->_operators.front()->cast<DummyOperator>()._eos = true;
_query_ctx->resource_ctx()->task_controller()->set_low_memory_mode(false);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
// EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
// EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
// EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
}
}
// Test for reserve memory fail for non-spillable task. It will not affect anything, the query
// will continue to run. And will disable reserve memory, so that the query will failed when allocated
// memory > limit.
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
{
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.set_enable_reserve_memory(true)
.set_enable_spill(false)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
twg_info.__set_id(0);
twg_info.__set_name("_dummpy_workload_group");
twg_info.__set_version(0);
WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
((MockRuntimeState*)_runtime_state.get())->_workload_group =
std::make_shared<WorkloadGroup>(workload_group_info);
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
->_test_low_memory = true;
ExecEnv::GetInstance()->_workload_group_manager = new MockWorkloadGroupMgr();
}
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
Dependency* read_dep;
Dependency* write_dep;
Dependency* source_finish_dep;
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
sink_op->_spillable = true;
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->dependencies()
.front();
write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
}
{
_query_ctx->get_execution_dependency()->set_ready();
// Task is blocked by read dependency.
read_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
EXPECT_FALSE(read_dep->_blocked_task.empty());
source_finish_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->finishdependency();
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
}
{
// Reserve failed and but not enable spill disk, so that the query will continue to run.
read_dep->set_ready();
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(task->_spilling);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
// Not enable spill disk, so that task will not be paused.
EXPECT_FALSE(task->_spilling);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
// Reserve failed .
task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem = true;
task->_spilling = false;
task->_operators.front()->cast<DummyOperator>()._eos = true;
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
// Reserve failed and paused.
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
delete ExecEnv::GetInstance()->_workload_group_manager;
}
// Test reserve memory fail for spillable pipeline task
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL_SPILLABLE) {
{
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.set_enable_reserve_memory(true)
.set_enable_spill(true)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
twg_info.__set_id(0);
twg_info.__set_name("_dummpy_workload_group");
twg_info.__set_version(0);
WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
((MockRuntimeState*)_runtime_state.get())->_workload_group =
std::make_shared<WorkloadGroup>(workload_group_info);
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
->_test_low_memory = true;
ExecEnv::GetInstance()->_workload_group_manager = new MockWorkloadGroupMgr();
EXPECT_TRUE(_runtime_state->enable_spill());
}
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
Dependency* read_dep;
Dependency* write_dep;
Dependency* source_finish_dep;
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
sink_op->_spillable = true;
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->dependencies()
.front();
write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
}
{
_query_ctx->get_execution_dependency()->set_ready();
// Task is blocked by read dependency.
read_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
EXPECT_FALSE(read_dep->_blocked_task.empty());
source_finish_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->finishdependency();
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
}
{
// Reserve failed and enable spill disk, so that the query be paused.
read_dep->set_ready();
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(task->_spilling);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
// Not enable spill disk, so that task will not be paused.
EXPECT_TRUE(task->_spilling);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_TRUE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
// Reserve failed and paused.
task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem = true;
task->_spilling = false;
task->_operators.front()->cast<DummyOperator>()._eos = true;
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(task->_spilling);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_TRUE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
// Disable reserve memory, so that the get_reserve_mem_size == 0, so that reserve will always success
task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
delete ExecEnv::GetInstance()->_workload_group_manager;
}
TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
{
// `_sink_shared_state` is created in constructor.
EXPECT_NE(task->_sink_shared_state, nullptr);
task->_sink_shared_state = nullptr;
}
{
std::shared_ptr<BasicSharedState> shared_state = nullptr;
EXPECT_FALSE(task->inject_shared_state(shared_state));
}
{
auto shared_state = BasicSharedState::create_shared();
shared_state->related_op_ids.insert(0);
EXPECT_TRUE(task->inject_shared_state(shared_state));
}
{
auto shared_state = BasicSharedState::create_shared();
shared_state->related_op_ids.insert(3);
EXPECT_TRUE(task->inject_shared_state(shared_state));
}
{
auto shared_state = BasicSharedState::create_shared();
shared_state->related_op_ids.insert(1);
EXPECT_FALSE(task->inject_shared_state(shared_state));
}
}
} // namespace doris::pipeline