// blob: db61819c4dac1211ec6a196e445a7d24d5dd6d4a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "exec/operator/operator.h"
#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/pipeline/dummy_task_queue.h"
#include "exec/pipeline/pipeline.h"
#include "exec/pipeline/pipeline_fragment_context.h"
#include "exec/pipeline/thrift_builder.h"
#include "exec/spill/spill_file.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "util/debug_points.h"
namespace doris {
// No-op cancel/report callback handed to PipelineFragmentContext; these tests
// do not exercise fragment status reporting.
static void empty_function(RuntimeState*, Status*) {}
// Fixture shared by all PipelineTask tests. It builds a QueryContext, a
// PipelineFragmentContext and a MockRuntimeState so a PipelineTask can be
// constructed and driven without a full backend runtime.
class PipelineTaskTest : public testing::Test {
public:
PipelineTaskTest() : _obj_pool(new ObjectPool()) {}
~PipelineTaskTest() override = default;
void SetUp() override {
// Swap in a mock thread mem tracker mgr so per-test memory behavior can be
// controlled; the original is put back in TearDown().
_thread_mem_tracker_mgr = std::move(thread_context()->thread_mem_tracker_mgr);
thread_context()->thread_mem_tracker_mgr = std::make_unique<MockThreadMemTrackerMgr>();
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.build();
// A dummy FE address (local host + brpc port) is good enough for tests.
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
// Install a mock scheduler so no real task scheduling happens.
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
}
void TearDown() override {
// Origin `thread_mem_tracker_mgr` must be restored otherwise `ThreadContextTest` will fail.
thread_context()->thread_mem_tracker_mgr = std::move(_thread_mem_tracker_mgr);
}
private:
// Builds the PipelineFragmentContext and the MockRuntimeState that back the
// PipelineTask under test, wiring the state to the context's lifetime.
void _build_fragment_context() {
int fragment_id = 0;
_context = std::make_shared<PipelineFragmentContext>(
_query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(),
empty_function,
std::bind<Status>(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
ExecEnv::GetInstance()->fragment_mgr(), std::placeholders::_1,
std::placeholders::_2));
_runtime_state = std::make_unique<MockRuntimeState>(
_query_id, fragment_id, _query_options, _query_ctx->query_globals,
ExecEnv::GetInstance(), _query_ctx.get());
_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(_context));
}
std::shared_ptr<ObjectPool> _obj_pool;
std::shared_ptr<PipelineFragmentContext> _context;
std::unique_ptr<RuntimeState> _runtime_state;
std::shared_ptr<QueryContext> _query_ctx;
TUniqueId _query_id = TUniqueId();
// Holds the real tracker mgr while a mock is installed (see SetUp/TearDown).
std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
TQueryOptions _query_options;
std::unique_ptr<MockTaskScheduler> _task_scheduler;
const std::string LOCALHOST = BackendOptions::get_localhost();
const int DUMMY_PORT = config::brpc_port;
};
// Explicit instantiations so the dummy operator/sink templates are fully
// emitted in this translation unit.
template class OperatorX<DummyOperatorLocalState>;
template class DataSinkOperatorX<DummySinkLocalState>;
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
    // Build a one-instance pipeline with a dummy source and sink, then verify
    // how the PipelineTask constructor picks up (or creates) the sink shared
    // state depending on the shared-state map contents.
    constexpr int parallelism = 1;
    constexpr int pipeline_id = 0;
    constexpr int index = 0;
    auto pipeline = std::make_shared<Pipeline>(pipeline_id, parallelism, parallelism);
    {
        // Wire the dummy source operator and the dummy sink (op 1, node 2, dest 3).
        OperatorPtr source = std::make_shared<DummyOperator>();
        EXPECT_TRUE(pipeline->add_operator(source, parallelism).ok());
        DataSinkOperatorPtr sink = std::make_shared<DummySinkOperatorX>(1, 2, 3);
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pipeline_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    // Case 1: a shared state is already registered under the sink's dest id (3),
    // so the task must not create its own sink shared state.
    shared_state_map.insert({3, {std::make_shared<BasicSharedState>(), {}}});
    auto task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, index);
    task->_exec_time_slice = 10'000'000'000ULL;
    EXPECT_EQ(task->_sink_shared_state, nullptr);
    // Case 2: no pre-registered shared state, so the constructor creates one.
    shared_state_map.clear();
    task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                          profile.get(), shared_state_map, index);
    EXPECT_NE(task->_sink_shared_state, nullptr);
    EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
}
TEST_F(PipelineTaskTest, TEST_PREPARE) {
    // A freshly built task with a dummy source and sink: prepare() must succeed
    // and leave the task in RUNNABLE state.
    constexpr int parallelism = 1;
    constexpr int pipeline_id = 0;
    constexpr int index = 0;
    auto pipeline = std::make_shared<Pipeline>(pipeline_id, parallelism, parallelism);
    {
        // Wire the dummy source operator and the dummy sink (op 1, node 2, dest 3).
        OperatorPtr source = std::make_shared<DummyOperator>();
        EXPECT_TRUE(pipeline->add_operator(source, parallelism).ok());
        DataSinkOperatorPtr sink = std::make_shared<DummySinkOperatorX>(1, 2, 3);
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pipeline_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, index);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_ranges;
        TDataSink thrift_sink;
        EXPECT_TRUE(task->prepare(scan_ranges, /*sender_id=*/0, thrift_sink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
}
TEST_F(PipelineTaskTest, TEST_PREPARE_ERROR) {
    // When the fragment context is released before prepare(), prepare() must
    // fail and the task must stay in INITED state.
    constexpr int parallelism = 1;
    constexpr int pipeline_id = 0;
    constexpr int index = 0;
    auto pipeline = std::make_shared<Pipeline>(pipeline_id, parallelism, parallelism);
    {
        // Wire the dummy source operator and the dummy sink (op 1, node 2, dest 3).
        OperatorPtr source = std::make_shared<DummyOperator>();
        EXPECT_TRUE(pipeline->add_operator(source, parallelism).ok());
        DataSinkOperatorPtr sink = std::make_shared<DummySinkOperatorX>(1, 2, 3);
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pipeline_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, index);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        // Releasing the fragment context is what makes prepare() fail.
        _context.reset();
        std::vector<TScanRangeParams> scan_ranges;
        TDataSink thrift_sink;
        EXPECT_FALSE(task->prepare(scan_ranges, /*sender_id=*/0, thrift_sink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
    }
}
TEST_F(PipelineTaskTest, TEST_OPEN) {
    // After a successful prepare(), _open() must register the read/write/finish
    // dependencies and mark the task as opened.
    constexpr int parallelism = 1;
    constexpr int pipeline_id = 0;
    constexpr int index = 0;
    auto pipeline = std::make_shared<Pipeline>(pipeline_id, parallelism, parallelism);
    {
        // Wire the dummy source operator and the dummy sink (op 1, node 2, dest 3).
        OperatorPtr source = std::make_shared<DummyOperator>();
        EXPECT_TRUE(pipeline->add_operator(source, parallelism).ok());
        DataSinkOperatorPtr sink = std::make_shared<DummySinkOperatorX>(1, 2, 3);
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pipeline_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, index);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_ranges;
        TDataSink thrift_sink;
        EXPECT_TRUE(task->prepare(scan_ranges, /*sender_id=*/0, thrift_sink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
    {
        EXPECT_TRUE(task->_open().ok());
        EXPECT_FALSE(task->_read_dependencies.empty());
        EXPECT_FALSE(task->_write_dependencies.empty());
        EXPECT_FALSE(task->_finish_dependencies.empty());
        EXPECT_TRUE(task->_opened);
    }
}
// Walks a task through its full life cycle: blocked by the execution
// dependency, then the filter dependency, then a read dependency, then a
// finish dependency, and finally run to EOS, closed and finalized.
TEST_F(PipelineTaskTest, TEST_EXECUTE) {
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    // Initialize the raw dependency pointers so any accidental use before
    // assignment fails deterministically instead of reading garbage.
    Dependency* read_dep = nullptr;
    Dependency* write_dep = nullptr;
    Dependency* source_finish_dep = nullptr;
    {
        OperatorPtr source_op;
        // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        // `execute` should be called after `prepare`
        bool done = false;
        EXPECT_FALSE(task->execute(&done).ok());
    }
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_GT(task->_execution_dependencies.size(), 1);
        read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                           .value()
                           ->dependencies()
                           .front();
        write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
    }
    {
        // task is blocked by execution dependency.
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_opened);
        EXPECT_FALSE(_query_ctx->get_execution_dependency()->ready());
        EXPECT_FALSE(_query_ctx->get_execution_dependency()->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // task is blocked by filter dependency.
        _query_ctx->get_execution_dependency()->set_ready();
        task->_execution_dependencies.back()->block();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_opened);
        EXPECT_FALSE(task->_execution_dependencies.back()->ready());
        EXPECT_FALSE(task->_execution_dependencies.back()->_blocked_task.empty());
        EXPECT_TRUE(task->_read_dependencies.empty());
        EXPECT_TRUE(task->_write_dependencies.empty());
        EXPECT_TRUE(task->_finish_dependencies.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // `open` phase. And then task is blocked by read dependency.
        task->_execution_dependencies.back()->set_ready();
        read_dep->block();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_read_dependencies.empty());
        EXPECT_FALSE(task->_write_dependencies.empty());
        EXPECT_FALSE(task->_finish_dependencies.empty());
        EXPECT_TRUE(task->_opened);
        EXPECT_FALSE(read_dep->ready());
        EXPECT_TRUE(write_dep->ready());
        EXPECT_FALSE(read_dep->_blocked_task.empty());
        source_finish_dep =
                _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                        .value()
                        ->finishdependency();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // `execute` phase. And then task is blocked by finish dependency.
        read_dep->set_ready();
        source_finish_dep->block();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        task->_operators.front()->cast<DummyOperator>()._eos = true;
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_TRUE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(source_finish_dep->ready());
        EXPECT_FALSE(source_finish_dep->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // `execute` phase: finish dependency released, the task runs to done.
        source_finish_dep->set_ready();
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
    {
        // Final transitions: close -> FINISHED, finalize -> FINALIZED.
        EXPECT_TRUE(task->close(Status::OK()).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
        EXPECT_TRUE(task->finalize().ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
    }
}
// Races terminate() against execute(): a concurrent terminate must wake the
// running task early, and both the source operator and the sink must observe
// the termination.
TEST_F(PipelineTaskTest, TEST_TERMINATE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
_query_ctx->get_execution_dependency()->set_ready();
{
std::atomic_bool terminated = false;
// Executor thread: execute() is expected to keep running until the
// terminate thread wakes it up, then finish with eos/done set.
auto exec_func = [&]() {
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(terminated);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
};
// Terminator thread: waits a random amount of time so the terminate call
// lands at different points of execute() across runs.
auto terminate_func = [&]() {
// Sleep 0~5000ms randomly.
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
terminated = true;
task->set_wake_up_early();
task->terminate();
};
std::thread exec_thread(exec_func);
std::thread terminate_thread(terminate_func);
exec_thread.join();
terminate_thread.join();
}
}
// Exhaustively checks _state_transition() against the LEGAL_STATE_TRANSITION
// table: a transition into state `i` must succeed exactly when the current
// state `j` is listed as a legal predecessor of `i`.
TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    {
        OperatorPtr source_op;
        // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_GT(task->_execution_dependencies.size(), 1);
    }
    // size_t indices avoid the signed/unsigned comparison against size();
    // static_cast replaces the C-style enum casts.
    for (size_t i = 0; i < task->LEGAL_STATE_TRANSITION.size(); i++) {
        auto target = static_cast<PipelineTask::State>(i);
        for (size_t j = 0; j < task->LEGAL_STATE_TRANSITION.size(); j++) {
            task->_exec_state = static_cast<PipelineTask::State>(j);
            EXPECT_EQ(task->_state_transition(target).ok(),
                      task->LEGAL_STATE_TRANSITION[i].contains(
                              static_cast<PipelineTask::State>(j)));
        }
    }
}
// Races a sink-side "finished" flag against a running execute(): once the sink
// local state reports finished, stop_if_finished() must wake the task early
// and terminate both operators; debug_string() must stay callable (non-blocking)
// the whole time.
TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
}
_query_ctx->get_execution_dependency()->set_ready();
{
// Shared flag owned by the mock sink local state; flipping it simulates
// the sink finishing mid-execution.
auto& is_finished =
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>()._is_finished;
// Executor thread: expects execute() to end early once the sink finishes.
auto exec_func = [&]() {
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(is_finished);
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
};
// Flips the finished flag after a random delay so the race lands at
// different points of execute() across runs.
auto finish_func = [&]() {
// Sleep 0~5000ms randomly.
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
is_finished = true;
};
// Spins on stop_if_finished() until the flag is observed.
auto finish_check_func = [&]() {
while (!is_finished) {
task->stop_if_finished();
}
};
// Make sure `debug_string` will not be blocked.
auto debug_string_func = [&]() {
while (!is_finished) {
static_cast<void>(task->debug_string());
}
};
std::thread exec_thread(exec_func);
std::thread finish_thread(finish_func);
std::thread finish_check_thread(finish_check_func);
std::thread debug_string_thread(debug_string_func);
exec_thread.join();
finish_thread.join();
finish_check_thread.join();
debug_string_thread.join();
}
}
TEST_F(PipelineTaskTest, TEST_SINK_EOF) {
    // A sink that returns EOF must terminate the task early: both operators get
    // terminated and the task finishes with eos/done and wake_up_early set.
    constexpr int parallelism = 1;
    constexpr int pipeline_id = 0;
    constexpr int index = 0;
    auto pipeline = std::make_shared<Pipeline>(pipeline_id, parallelism, parallelism);
    {
        // Wire the dummy source operator and the dummy sink (op 1, node 2, dest 3).
        OperatorPtr source = std::make_shared<DummyOperator>();
        EXPECT_TRUE(pipeline->add_operator(source, parallelism).ok());
        DataSinkOperatorPtr sink = std::make_shared<DummySinkOperatorX>(1, 2, 3);
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pipeline_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, index, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, index);
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_ranges;
        TDataSink thrift_sink;
        EXPECT_TRUE(task->prepare(scan_ranges, /*sender_id=*/0, thrift_sink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_GT(task->_execution_dependencies.size(), 1);
    }
    _query_ctx->get_execution_dependency()->set_ready();
    {
        // Make the source hit EOS immediately and force the sink to report EOF.
        task->_operators.front()->cast<DummyOperator>()._eos = true;
        task->_sink->cast<DummySinkOperatorX>()._return_eof = true;
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._return_eof);
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(done);
        EXPECT_TRUE(task->_wake_up_early);
        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
}
// Runs a task with reserve-memory enabled under a mock low-memory tracker and
// checks that execution proceeds without pausing or spilling.
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
    {
        // Rebuild query options/context with reserve memory enabled, attach a
        // dummy workload group, and force the mock tracker into low-memory mode.
        _query_options = TQueryOptionsBuilder()
                                 .set_enable_local_exchange(true)
                                 .set_enable_local_shuffle(true)
                                 .set_runtime_filter_max_in_num(15)
                                 .set_enable_reserve_memory(true)
                                 .build();
        auto fe_address = TNetworkAddress();
        fe_address.hostname = LOCALHOST;
        fe_address.port = DUMMY_PORT;
        _query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
        _task_scheduler = std::make_unique<MockTaskScheduler>();
        _query_ctx->_task_scheduler = _task_scheduler.get();
        _build_fragment_context();
        TWorkloadGroupInfo twg_info;
        twg_info.__set_id(0);
        twg_info.__set_name("_dummpy_workload_group");
        twg_info.__set_version(0);
        WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
        ((MockRuntimeState*)_runtime_state.get())->_workload_group =
                std::make_shared<WorkloadGroup>(workload_group_info);
        ((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
                ->_test_low_memory = true;
    }
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    // Initialize the raw dependency pointers so any accidental use before
    // assignment fails deterministically instead of reading garbage.
    Dependency* read_dep = nullptr;
    Dependency* write_dep = nullptr;
    Dependency* source_finish_dep = nullptr;
    {
        OperatorPtr source_op;
        // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                            std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_GT(task->_execution_dependencies.size(), 1);
        read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                           .value()
                           ->dependencies()
                           .front();
        write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
    }
    {
        _query_ctx->get_execution_dependency()->set_ready();
        // Task is blocked by read dependency.
        read_dep->block();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_read_dependencies.empty());
        EXPECT_FALSE(task->_write_dependencies.empty());
        EXPECT_FALSE(task->_finish_dependencies.empty());
        EXPECT_TRUE(task->_opened);
        EXPECT_FALSE(read_dep->ready());
        EXPECT_TRUE(write_dep->ready());
        EXPECT_FALSE(read_dep->_blocked_task.empty());
        source_finish_dep =
                _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                        .value()
                        ->finishdependency();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // set low memory mode and do not pause.
        read_dep->set_ready();
        EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        // Not check low memory mode here, because we temporary not use this feature, the
        // system buffer should be checked globally.
        // EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        // EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        // EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_TRUE(source_finish_dep->ready());
        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
    {
        // set low memory mode and do not pause.
        task->_operators.front()->cast<DummyOperator>()._eos = true;
        _query_ctx->resource_ctx()->task_controller()->set_low_memory_mode(false);
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        // EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        // EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        // EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_spilling);
        EXPECT_TRUE(source_finish_dep->ready());
        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
    }
}
// Test for a reserve-memory failure on a non-spillable task. The failure does not affect the
// query, which continues to run; reserve memory is then disabled, so the query will fail once
// allocated memory exceeds the limit.
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
{
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.set_enable_reserve_memory(true)
.set_enable_spill(false)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
twg_info.__set_id(0);
twg_info.__set_name("_dummpy_workload_group");
twg_info.__set_version(0);
WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
((MockRuntimeState*)_runtime_state.get())->_workload_group =
std::make_shared<WorkloadGroup>(workload_group_info);
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
->_test_low_memory = true;
ExecEnv::GetInstance()->_workload_group_manager = new MockWorkloadGroupMgr();
}
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
Dependency* read_dep;
Dependency* write_dep;
Dependency* source_finish_dep;
{
OperatorPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
sink_op->_spillable = true;
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_GT(task->_execution_dependencies.size(), 1);
read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->dependencies()
.front();
write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
}
{
_query_ctx->get_execution_dependency()->set_ready();
// Task is blocked by read dependency.
read_dep->block();
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
EXPECT_FALSE(read_dep->_blocked_task.empty());
source_finish_dep =
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
.value()
->finishdependency();
EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
}
{
task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
}
{
// Reserve failed and but not enable spill disk, so that the query will continue to run.
read_dep->set_ready();
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(task->_spilling);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_FALSE(task->_eos);
// Not enable spill disk, so that task will not be paused.
EXPECT_FALSE(task->_spilling);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
// Reserve fails again; the source reaches EOS, so the task completes anyway.
task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem = true;
task->_spilling = false;
task->_operators.front()->cast<DummyOperator>()._eos = true;
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
{
// Sink reserve also fails, but spill is disabled, so the task is not paused.
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused = false;
task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(task->_spilling);
EXPECT_TRUE(done);
EXPECT_FALSE(task->_wake_up_early);
EXPECT_TRUE(source_finish_dep->ready());
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
EXPECT_FALSE(
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
}
delete ExecEnv::GetInstance()->_workload_group_manager;
ExecEnv::GetInstance()->_workload_group_manager = nullptr;
}
// Test reserve memory fail for spillable pipeline task: a failed reservation with
// spill enabled must put the task into the spilling state and pause the workload group.
TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL_SPILLABLE) {
    {
        // Build a query context with BOTH reserve-memory and spill enabled so a failed
        // reservation takes the spill path instead of simply continuing.
        _query_options = TQueryOptionsBuilder()
                                 .set_enable_local_exchange(true)
                                 .set_enable_local_shuffle(true)
                                 .set_runtime_filter_max_in_num(15)
                                 .set_enable_reserve_memory(true)
                                 .set_enable_spill(true)
                                 .build();
        auto fe_address = TNetworkAddress();
        fe_address.hostname = LOCALHOST;
        fe_address.port = DUMMY_PORT;
        _query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
        _task_scheduler = std::make_unique<MockTaskScheduler>();
        _query_ctx->_task_scheduler = _task_scheduler.get();
        _build_fragment_context();
        TWorkloadGroupInfo twg_info;
        twg_info.__set_id(0);
        twg_info.__set_name("_dummpy_workload_group");
        twg_info.__set_version(0);
        WorkloadGroupInfo workload_group_info = WorkloadGroupInfo::parse_topic_info(twg_info);
        static_cast<MockRuntimeState*>(_runtime_state.get())->_workload_group =
                std::make_shared<WorkloadGroup>(workload_group_info);
        // Force the thread-local mem tracker to report memory pressure so every
        // reservation attempt fails.
        static_cast<MockThreadMemTrackerMgr*>(thread_context()->thread_mem_tracker_mgr.get())
                ->_test_low_memory = true;
        ExecEnv::GetInstance()->_workload_group_manager = new MockWorkloadGroupMgr();
        EXPECT_TRUE(_runtime_state->enable_spill());
    }
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    Dependency* read_dep;
    Dependency* write_dep;
    Dependency* source_finish_dep;
    {
        OperatorPtr source_op;
        // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
        // Mark the sink spillable so the task treats it as a revoke candidate.
        sink_op->_spillable = true;
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_GT(task->_execution_dependencies.size(), 1);
        read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                           .value()
                           ->dependencies()
                           .front();
        write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
    }
    {
        _query_ctx->get_execution_dependency()->set_ready();
        // Task is blocked by read dependency.
        read_dep->block();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_FALSE(task->_read_dependencies.empty());
        EXPECT_FALSE(task->_write_dependencies.empty());
        EXPECT_FALSE(task->_finish_dependencies.empty());
        EXPECT_TRUE(task->_opened);
        EXPECT_FALSE(read_dep->ready());
        EXPECT_TRUE(write_dep->ready());
        EXPECT_FALSE(read_dep->_blocked_task.empty());
        source_finish_dep =
                _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
                        .value()
                        ->finishdependency();
        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
    }
    {
        // Give both source and sink enough revocable memory to qualify for spilling.
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
                SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
                SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
    }
    {
        // Reserve fails and spill is enabled, so the query gets paused.
        read_dep->set_ready();
        EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_FALSE(task->_spilling);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_FALSE(task->_eos);
        // Spill is enabled, so the task enters the spilling state and the workload
        // group manager pauses the query.
        EXPECT_TRUE(task->_spilling);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
        EXPECT_TRUE(source_finish_dep->ready());
        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
        EXPECT_TRUE(
                static_cast<MockWorkloadGroupMgr*>(ExecEnv::GetInstance()->_workload_group_manager)
                        ->_paused);
    }
    {
        // Source reservation disabled and source at EOS; the sink reservation still
        // fails, so the task spills again and the workload group is paused.
        task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem = true;
        task->_spilling = false;
        task->_operators.front()->cast<DummyOperator>()._eos = true;
        static_cast<MockWorkloadGroupMgr*>(ExecEnv::GetInstance()->_workload_group_manager)
                ->_paused = false;
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(task->_spilling);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_TRUE(source_finish_dep->ready());
        EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_TRUE(
                static_cast<MockWorkloadGroupMgr*>(ExecEnv::GetInstance()->_workload_group_manager)
                        ->_paused);
    }
    {
        static_cast<MockWorkloadGroupMgr*>(ExecEnv::GetInstance()->_workload_group_manager)
                ->_paused = false;
        // Disable reserve memory, so that the get_reserve_mem_size == 0, so that reserve will always success
        task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
        EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
        EXPECT_TRUE(task->_eos);
        EXPECT_FALSE(task->_spilling);
        EXPECT_TRUE(done);
        EXPECT_FALSE(task->_wake_up_early);
        EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
        EXPECT_TRUE(source_finish_dep->ready());
        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
        EXPECT_FALSE(
                static_cast<MockWorkloadGroupMgr*>(ExecEnv::GetInstance()->_workload_group_manager)
                        ->_paused);
    }
    // Tear down the mock workload group manager installed above.
    delete ExecEnv::GetInstance()->_workload_group_manager;
    ExecEnv::GetInstance()->_workload_group_manager = nullptr;
}
// Verify inject_shared_state(): rejects null states and states whose related
// operator ids match nothing in the pipeline; accepts states matching the source
// operator id or the sink's dest id.
TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
    constexpr int kNumInstances = 1;
    constexpr int kPipelineId = 0;
    constexpr int kTaskId = 0;
    auto pipeline = std::make_shared<Pipeline>(kPipelineId, kNumInstances, kNumInstances);
    {
        // Minimal pipeline: DummyOperator (source) feeding DummySinkOperatorX
        // (op_id=1, node_id=2, dest_id=3).
        OperatorPtr source(new DummyOperator());
        EXPECT_TRUE(pipeline->add_operator(source, kNumInstances).ok());
        DataSinkOperatorPtr sink(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(kPipelineId));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, kTaskId, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, kTaskId);
    // `_sink_shared_state` is created in the constructor; drop it so later sink-side
    // injection is not short-circuited by an already-present state.
    EXPECT_NE(task->_sink_shared_state, nullptr);
    task->_sink_shared_state = nullptr;
    {
        // A null shared state is always rejected.
        std::shared_ptr<BasicSharedState> no_state = nullptr;
        EXPECT_FALSE(task->inject_shared_state(no_state));
    }
    // Build a shared state related to a single operator id and try to inject it.
    auto inject_for_op = [&task](int related_op_id) {
        auto state = BasicSharedState::create_shared();
        state->related_op_ids.insert(related_op_id);
        return task->inject_shared_state(state);
    };
    EXPECT_TRUE(inject_for_op(0));  // matches the source operator
    EXPECT_TRUE(inject_for_op(3));  // matches the sink's dest id
    EXPECT_FALSE(inject_for_op(1)); // matches no operator in this pipeline
}
// Exercise every gate in PipelineTask::_should_trigger_revoking(): spill switch,
// workload group presence, effective query limit, reserve-size threshold, memory
// pressure (query tracker and wg watermarks), and revocable-memory sufficiency.
TEST_F(PipelineTaskTest, TEST_SHOULD_TRIGGER_REVOKING) {
    {
        // Reserve-memory and spill must both be enabled for revoking to be considered.
        _query_options = TQueryOptionsBuilder()
                                 .set_enable_local_exchange(true)
                                 .set_enable_local_shuffle(true)
                                 .set_runtime_filter_max_in_num(15)
                                 .set_enable_reserve_memory(true)
                                 .set_enable_spill(true)
                                 .build();
        auto fe_address = TNetworkAddress();
        fe_address.hostname = LOCALHOST;
        fe_address.port = DUMMY_PORT;
        _query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
        _task_scheduler = std::make_unique<MockTaskScheduler>();
        _query_ctx->_task_scheduler = _task_scheduler.get();
        _build_fragment_context();
    }
    TWorkloadGroupInfo twg_info;
    twg_info.__set_id(0);
    twg_info.__set_name("test_wg");
    twg_info.__set_version(0);
    auto wg = std::make_shared<WorkloadGroup>(WorkloadGroupInfo::parse_topic_info(twg_info));
    const int64_t wg_mem_limit = 1000LL * 1024 * 1024; // 1 GB
    wg->_memory_limit = wg_mem_limit;
    wg->_memory_low_watermark = 50;  // 50%
    wg->_memory_high_watermark = 80; // 80%
    wg->_total_mem_used = 0;
    _query_ctx->_resource_ctx->set_workload_group(wg);
    auto query_mem_tracker = _query_ctx->query_mem_tracker();
    query_mem_tracker->set_limit(wg_mem_limit);
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    {
        OperatorPtr source_op;
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    // reserve_size that passes the (reserve * parallelism > query_limit / 5) gate
    const size_t reserve_size = wg_mem_limit / 4; // 250MB > threshold of 200MB
    // Case 1: spill disabled -> false
    {
        static_cast<MockRuntimeState*>(_runtime_state.get())->set_enable_spill(false);
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        static_cast<MockRuntimeState*>(_runtime_state.get())->set_enable_spill(true);
    }
    // Case 2: no workload group -> false
    {
        _query_ctx->_resource_ctx->set_workload_group(nullptr);
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        _query_ctx->_resource_ctx->set_workload_group(wg);
    }
    // Case 3: effective query_limit = 0 (both tracker limit <= 0 and wg limit = 0) -> false
    {
        wg->_memory_limit = 0;
        query_mem_tracker->set_limit(-1);
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        wg->_memory_limit = wg_mem_limit;
        query_mem_tracker->set_limit(wg_mem_limit);
    }
    // Case 4: reserve_size too small (reserve * parallelism <= query_limit / 5) -> false
    { EXPECT_FALSE(task->_should_trigger_revoking(wg_mem_limit / 5)); }
    // Case 5: no memory pressure (neither query tracker nor wg watermark) -> false
    {
        // consumption + reserve = 100MB + 250MB = 350MB < 90% of 1GB (900MB); wg not at watermark
        query_mem_tracker->consume(100LL * 1024 * 1024);
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        query_mem_tracker->release(100LL * 1024 * 1024);
    }
    // Case 6: high memory pressure via query tracker, no revocable memory -> false
    {
        // consumption + reserve >= 90% of query_limit
        const int64_t consumption = int64_t(0.9 * wg_mem_limit) - int64_t(reserve_size) + 1;
        query_mem_tracker->consume(consumption);
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = 0;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = 0;
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        query_mem_tracker->release(consumption);
    }
    // Case 7: high pressure via query tracker, sufficient revocable -> true
    {
        const int64_t consumption = int64_t(0.9 * wg_mem_limit) - int64_t(reserve_size) + 1;
        query_mem_tracker->consume(consumption);
        // total revocable >= 20% of query_limit = 200MB (100MB each from op and sink)
        const size_t revocable = int64_t(0.2 * wg_mem_limit) / 2;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = revocable;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = revocable;
        EXPECT_TRUE(task->_should_trigger_revoking(reserve_size));
        query_mem_tracker->release(consumption);
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = 0;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = 0;
    }
    // Case 8: high pressure via wg low watermark, sufficient revocable -> true
    {
        // wg total_mem_used > 50% of wg_limit -> low watermark triggered
        wg->_total_mem_used = int64_t(0.51 * wg_mem_limit);
        const size_t revocable = int64_t(0.2 * wg_mem_limit) / 2;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = revocable;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = revocable;
        EXPECT_TRUE(task->_should_trigger_revoking(reserve_size));
        wg->_total_mem_used = 0;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = 0;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = 0;
    }
    // Case 9: high pressure via wg high watermark, sufficient revocable -> true
    {
        // wg total_mem_used > 80% of wg_limit -> high watermark triggered
        wg->_total_mem_used = int64_t(0.81 * wg_mem_limit);
        const size_t revocable = int64_t(0.2 * wg_mem_limit) / 2;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = revocable;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = revocable;
        EXPECT_TRUE(task->_should_trigger_revoking(reserve_size));
        wg->_total_mem_used = 0;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = 0;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = 0;
    }
    // Case 10: query_limit capped to wg limit when tracker limit > wg limit -> no extra pressure
    {
        // effective limit = wg_mem_limit; reserve = wg_mem_limit/4 > threshold (wg_mem_limit/5)
        // but no consumption added, so no pressure -> false
        query_mem_tracker->set_limit(wg_mem_limit * 2);
        wg->_memory_limit = wg_mem_limit;
        EXPECT_FALSE(task->_should_trigger_revoking(reserve_size));
        query_mem_tracker->set_limit(wg_mem_limit);
    }
}
// Verify do_revoke_memory(): error on an expired fragment context, the MIN-threshold
// gate on both operators, per-side revoke dispatch, spill-context completion, and the
// wake_up_early termination path.
TEST_F(PipelineTaskTest, TEST_DO_REVOKE_MEMORY) {
    constexpr int kNumInstances = 1;
    constexpr int kPipelineId = 0;
    constexpr int kTaskId = 0;
    auto pipeline = std::make_shared<Pipeline>(kPipelineId, kNumInstances, kNumInstances);
    {
        OperatorPtr source(new DummyOperator());
        EXPECT_TRUE(pipeline->add_operator(source, kNumInstances).ok());
        DataSinkOperatorPtr sink(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pipeline->set_sink(sink).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(kPipelineId));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pipeline, kTaskId, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, kTaskId);
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
        _query_ctx->get_execution_dependency()->set_ready();
    }
    // Bind the two mock operators once instead of repeating the cast chains below.
    auto& source_op = task->_operators.front()->cast<DummyOperator>();
    auto& sink_op = task->_sink->cast<DummySinkOperatorX>();
    // Case 1: fragment context expired -> InternalError
    {
        task->_fragment_context = std::weak_ptr<PipelineFragmentContext>();
        EXPECT_FALSE(task->do_revoke_memory(nullptr).ok());
        // Restore the fragment context
        task->_fragment_context = _context;
    }
    // Case 2: operators below MIN threshold, null spill_context -> no revoke_memory called
    {
        source_op._revocable_mem_size = 0;
        sink_op._revocable_mem_size = 0;
        EXPECT_TRUE(task->do_revoke_memory(nullptr).ok());
        EXPECT_FALSE(source_op._revoke_called);
        EXPECT_FALSE(sink_op._revoke_called);
    }
    // Case 3: operator has sufficient revocable memory -> operator revoke_memory called
    {
        source_op._revocable_mem_size = SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
        source_op._revoke_called = false;
        sink_op._revocable_mem_size = 0;
        sink_op._revoke_called = false;
        EXPECT_TRUE(task->do_revoke_memory(nullptr).ok());
        EXPECT_TRUE(source_op._revoke_called);
        EXPECT_FALSE(sink_op._revoke_called);
        source_op._revocable_mem_size = 0;
        source_op._revoke_called = false;
    }
    // Case 4: sink has sufficient revocable memory -> sink revoke_memory called
    {
        source_op._revocable_mem_size = 0;
        source_op._revoke_called = false;
        sink_op._revocable_mem_size = SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
        sink_op._revoke_called = false;
        EXPECT_TRUE(task->do_revoke_memory(nullptr).ok());
        EXPECT_FALSE(source_op._revoke_called);
        EXPECT_TRUE(sink_op._revoke_called);
        sink_op._revocable_mem_size = 0;
        sink_op._revoke_called = false;
    }
    // Case 5: non-null spill_context -> on_task_finished called, callback fires
    {
        bool callback_fired = false;
        auto spill_ctx = std::make_shared<SpillContext>(
                1, _query_id, [&callback_fired](SpillContext*) { callback_fired = true; });
        EXPECT_TRUE(task->do_revoke_memory(spill_ctx).ok());
        EXPECT_TRUE(callback_fired);
        EXPECT_EQ(spill_ctx->running_tasks_count.load(), 0);
    }
    // Case 6: wake_up_early -> operators terminated, eos set, callback fires
    {
        task->_wake_up_early = true;
        task->_eos = false;
        source_op._terminated = false;
        sink_op._terminated = false;
        bool callback_fired = false;
        auto spill_ctx = std::make_shared<SpillContext>(
                1, _query_id, [&callback_fired](SpillContext*) { callback_fired = true; });
        EXPECT_TRUE(task->do_revoke_memory(spill_ctx).ok());
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(source_op._terminated);
        EXPECT_TRUE(sink_op._terminated);
        EXPECT_TRUE(callback_fired);
        task->_wake_up_early = false;
    }
}
// Verify revoke_memory()'s three outcomes: a finalized task finishes the spill
// context immediately; an opened task with too little revocable memory finishes it
// immediately; an opened task with enough revocable memory submits a RevokableTask
// whose execution completes the context.
TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
    // Case 1: task is finalized -> on_task_finished called immediately
    {
        auto num_instances = 1;
        auto pip_id = 0;
        auto task_id = 0;
        auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
        OperatorPtr source_op;
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
        auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        auto rs = std::make_unique<MockRuntimeState>(_query_id, 0, _query_options,
                                                     _query_ctx->query_globals,
                                                     ExecEnv::GetInstance(), _query_ctx.get());
        rs->set_task_execution_context(std::static_pointer_cast<TaskExecutionContext>(_context));
        rs->resize_op_id_to_local_state(-1);
        auto task = std::make_shared<PipelineTask>(pip, task_id, rs.get(), _context, profile.get(),
                                                   shared_state_map, task_id);
        {
            std::vector<TScanRangeParams> scan_range;
            int sender_id = 0;
            TDataSink tsink;
            EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
            _query_ctx->get_execution_dependency()->set_ready();
        }
        // A finalized task must not spill; the spill context is completed right away.
        task->_exec_state = PipelineTask::State::FINALIZED;
        EXPECT_TRUE(task->is_finalized());
        bool callback_fired = false;
        auto spill_ctx = std::make_shared<SpillContext>(
                1, _query_id, [&callback_fired](SpillContext*) { callback_fired = true; });
        EXPECT_TRUE(task->revoke_memory(spill_ctx).ok());
        EXPECT_TRUE(callback_fired);
        EXPECT_EQ(spill_ctx->running_tasks_count.load(), 0);
    }
    // Case 2: _opened=true, revocable < MIN -> on_task_finished called immediately
    {
        auto num_instances = 1;
        auto pip_id = 0;
        auto task_id = 0;
        auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
        OperatorPtr source_op;
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
        auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        auto rs = std::make_unique<MockRuntimeState>(_query_id, 0, _query_options,
                                                     _query_ctx->query_globals,
                                                     ExecEnv::GetInstance(), _query_ctx.get());
        rs->set_task_execution_context(std::static_pointer_cast<TaskExecutionContext>(_context));
        rs->resize_op_id_to_local_state(-1);
        auto task = std::make_shared<PipelineTask>(pip, task_id, rs.get(), _context, profile.get(),
                                                   shared_state_map, task_id);
        {
            std::vector<TScanRangeParams> scan_range;
            int sender_id = 0;
            TDataSink tsink;
            EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
            _query_ctx->get_execution_dependency()->set_ready();
        }
        // Nothing worth spilling: below the MIN batch size on both sides.
        task->_opened = true;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size = 0;
        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size = 0;
        bool callback_fired = false;
        auto spill_ctx = std::make_shared<SpillContext>(
                1, _query_id, [&callback_fired](SpillContext*) { callback_fired = true; });
        EXPECT_TRUE(task->revoke_memory(spill_ctx).ok());
        EXPECT_TRUE(callback_fired);
        EXPECT_EQ(spill_ctx->running_tasks_count.load(), 0);
    }
    // Case 3: _opened=true, sufficient revocable -> RevokableTask submitted; run it to fire callback
    {
        auto num_instances = 1;
        auto pip_id = 0;
        auto task_id = 0;
        auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
        OperatorPtr source_op;
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(1, 2, 3));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
        auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        auto rs = std::make_unique<MockRuntimeState>(_query_id, 0, _query_options,
                                                     _query_ctx->query_globals,
                                                     ExecEnv::GetInstance(), _query_ctx.get());
        rs->set_task_execution_context(std::static_pointer_cast<TaskExecutionContext>(_context));
        rs->resize_op_id_to_local_state(-1);
        auto task = std::make_shared<PipelineTask>(pip, task_id, rs.get(), _context, profile.get(),
                                                   shared_state_map, task_id);
        {
            std::vector<TScanRangeParams> scan_range;
            int sender_id = 0;
            TDataSink tsink;
            EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
            _query_ctx->get_execution_dependency()->set_ready();
        }
        task->_opened = true;
        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
                SpillFile::MIN_SPILL_WRITE_BATCH_MEM + 1;
        bool callback_fired = false;
        auto spill_ctx = std::make_shared<SpillContext>(
                1, _query_id, [&callback_fired](SpillContext*) { callback_fired = true; });
        EXPECT_TRUE(task->revoke_memory(spill_ctx).ok());
        // RevokableTask submitted but not yet executed, callback not fired
        EXPECT_FALSE(callback_fired);
        EXPECT_EQ(spill_ctx->running_tasks_count.load(), 1);
        // Take the submitted RevokableTask from the scheduler queue and run it
        auto revokable_task =
                static_cast<MockTaskScheduler*>(_query_ctx->_task_scheduler)->_task_queue->take(0);
        EXPECT_NE(revokable_task, nullptr);
        bool done = false;
        EXPECT_TRUE(revokable_task->execute(&done).ok());
        // After execution, spill_context->on_task_finished() was called inside do_revoke_memory
        EXPECT_TRUE(callback_fired);
        EXPECT_EQ(spill_ctx->running_tasks_count.load(), 0);
    }
}
// Test for the race condition fix between _wake_up_early and _is_pending_finish().
//
// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
// _always_ready from B), Thread A would set *done=true without calling operator terminate().
//
// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
// read, ensuring terminate() is always called.
//
// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
    // Build a minimal one-operator pipeline with a dummy source and sink.
    auto num_instances = 1;
    auto pip_id = 0;
    auto task_id = 0;
    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
    {
        OperatorPtr source_op;
        source_op.reset(new DummyOperator());
        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
        int op_id = 1;
        int node_id = 2;
        int dest_id = 3;
        DataSinkOperatorPtr sink_op;
        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
        EXPECT_TRUE(pip->set_sink(sink_op).ok());
    }
    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
    std::map<int,
             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
            shared_state_map;
    _runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
    // Huge time slice (10s) so execute() never yields on time and only returns
    // through the eos / pending-finish logic under test.
    task->_exec_time_slice = 10'000'000'000ULL;
    {
        std::vector<TScanRangeParams> scan_range;
        int sender_id = 0;
        TDataSink tsink;
        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
    }
    _query_ctx->get_execution_dependency()->set_ready();
    // Get the sink's finish dependency and block it to simulate a pending async operation
    // (e.g. runtime filter size sync RPC in flight).
    auto* sink_finish_dep =
            _runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
    EXPECT_NE(sink_finish_dep, nullptr);
    sink_finish_dep->block();
    // Drive the task to EOS so it will enter the Defer's pending-finish check.
    task->_operators.front()->cast<DummyOperator>()._eos = true;
    {
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        // EOS reached but still blocked on finish dependency: not done yet.
        EXPECT_TRUE(task->_eos);
        EXPECT_FALSE(done);
        EXPECT_FALSE(task->_wake_up_early);
    }
    // Now unblock the finish dependency (simulates the async op completing) and activate the
    // debug point. The debug point fires inside the else-if branch — after _is_pending_finish()
    // returns false but before the second _wake_up_early read — and calls set_wake_up_early() +
    // terminate(). This precisely reproduces the race where Thread B's writes land between
    // Thread A's two reads of _wake_up_early.
    sink_finish_dep->set_ready();
    config::enable_debug_points = true;
    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
    {
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(done);
        // The key assertion: even though the task took the else-if path (not the
        // if(_wake_up_early) path), operator terminate() must have been called because the
        // second read of _wake_up_early correctly observed the value set by the debug point.
        EXPECT_TRUE(task->_wake_up_early);
        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
    }
    // Restore global debug-point state so later tests are unaffected.
    DebugPoints::instance()->clear();
    config::enable_debug_points = false;
}
} // namespace doris