blob: 14a48e37a9a7a03bb10307bcf21d16622d5fe77f [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/scan/scanner_context.h"
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gtest/gtest.h>
#include <list>
#include <memory>
#include <mutex>
#include <tuple>
#include "common/object_pool.h"
#include "core/block/block.h"
#include "exec/operator/olap_scan_operator.h"
#include "exec/pipeline/dependency.h"
#include "exec/scan/mock_simplified_scan_scheduler.h"
#include "exec/scan/olap_scanner.h"
#include "exec/scan/scan_node.h"
#include "exec/scan/scanner_scheduler.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "testutil/mock/mock_runtime_state.h"
namespace doris {
/// Fixture shared by the ScannerContext tests.
///
/// SetUp() assembles a thrift descriptor table (one OLAP table, tuples 0 and 1, one
/// scalar STRING slot per tuple), turns it into a DescriptorTbl owned by `obj_pool`,
/// and attaches a TaskExecutionContext to the runtime state. The remaining members are
/// ready-made collaborators (profile, counters, dependency, scheduler) that the
/// individual tests plug into a ScannerContext.
class ScannerContextTest : public testing::Test {
public:
    /// Builds the descriptor table and runtime state used by every test.
    /// A fatal assertion is raised if the descriptor table cannot be created.
    void SetUp() override {
        obj_pool = std::make_unique<ObjectPool>();

        // This ScanNode has two tuples.
        // First one is input tuple, second one is output tuple.
        tnode.row_tuples.push_back(TTupleId(0));
        tnode.row_tuples.push_back(TTupleId(1));

        tbl_desc.tableType = TTableType::OLAP_TABLE;

        // Two tuple descriptors that differ only by id.
        tuple_desc.id = 0;
        tuple_descs.push_back(tuple_desc);
        tuple_desc.id = 1;
        tuple_descs.push_back(tuple_desc);

        // One scalar STRING slot per tuple: slot 0 -> tuple 0, slot 1 -> tuple 1.
        type_node.type = TTypeNodeType::SCALAR;
        scalar_type.__set_type(TPrimitiveType::STRING);
        type_node.__set_scalar_type(scalar_type);
        slot_desc.slotType.types.push_back(type_node);
        slot_desc.id = 0;
        slot_desc.parent = 0;
        slot_descs.push_back(slot_desc);
        slot_desc.id = 1;
        slot_desc.parent = 1;
        slot_descs.push_back(slot_desc);

        thrift_tbl.tableDescriptors.push_back(tbl_desc);
        thrift_tbl.tupleDescriptors = tuple_descs;
        thrift_tbl.slotDescriptors = slot_descs;

        // Check the result instead of discarding it: `descs` is dereferenced right
        // below and by every test, so a failed creation must stop the test here.
        Status create_status = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs);
        ASSERT_TRUE(create_status.ok());
        ASSERT_TRUE(descs != nullptr);

        auto task_exec_ctx = std::make_shared<TaskExecutionContext>();
        state->set_task_execution_context(task_exec_ctx);

        // NOTE(review): tuple id 0 is used as the output tuple even though the comment
        // above describes the second tuple (id 1) as the output tuple — confirm.
        output_tuple_desc = descs->get_tuple_descriptor(0);
    }

private:
    // Block mock that lets tests control memory accounting and reuse behaviour.
    class MockBlock : public Block {
        MockBlock() = default;
        MOCK_CONST_METHOD0(allocated_bytes, size_t());
        MOCK_METHOD0(mem_reuse, bool());
        MOCK_METHOD1(clear_column_data, void(int64_t));
    };

    // RuntimeState mock that lets tests control the cancellation queries.
    class MockRuntimeStateLocal : public RuntimeState {
        MockRuntimeStateLocal() = default;
        MOCK_CONST_METHOD0(is_cancelled, bool());
        MOCK_CONST_METHOD0(cancel_reason, Status());
    };

    std::unique_ptr<ObjectPool> obj_pool;

    // Thrift building blocks filled in by SetUp().
    TPlanNode tnode;
    TTableDescriptor tbl_desc;
    std::vector<TTupleDescriptor> tuple_descs;
    TTupleDescriptor tuple_desc;
    std::vector<TSlotDescriptor> slot_descs;
    TSlotDescriptor slot_desc;
    TTypeNode type_node;
    TScalarType scalar_type;
    TDescriptorTable thrift_tbl;

    // Created by DescriptorTbl::create() with `obj_pool` as the pool argument.
    DescriptorTbl* descs = nullptr;

    std::unique_ptr<RuntimeState> state = std::make_unique<MockRuntimeState>();
    std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("TestProfile");

    // Stand-alone counters that tests inject into the local state / scanner context.
    std::unique_ptr<RuntimeProfile::Counter> max_concurrency_counter =
            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
    std::unique_ptr<RuntimeProfile::Counter> min_concurrency_counter =
            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
    std::unique_ptr<RuntimeProfile::Counter> newly_create_free_blocks_num =
            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
    std::unique_ptr<RuntimeProfile::Counter> scanner_memory_used_counter =
            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);

    TupleDescriptor* output_tuple_desc = nullptr;
    // Never assigned by SetUp(); tests pass it to ScannerContext as nullptr.
    RowDescriptor* output_row_descriptor = nullptr;

    std::shared_ptr<Dependency> scan_dependency =
            Dependency::create_shared(0, 0, "TestScanDependency");
    std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_shared<CgroupV2CpuCtl>(1);
    std::unique_ptr<ScannerScheduler> scan_scheduler =
            std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest", cgroup_cpu_ctl);
    std::atomic<int64_t> shared_limit {-1};
};
// Runs ScannerContext::init() for a non-serial scan, first with a user cap on scanner
// concurrency (2) and then with the cap cleared (0), checking that init() succeeds.
TEST_F(ScannerContextTest, test_init) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Wire the operator / local state by hand.
    scan_op->_should_run_serial = false;
    local_state->_parent = scan_op.get();
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();

    // User-specified cap on scanner concurrency.
    TQueryOptions options;
    options.__set_max_scanners_concurrency(2);
    options.__set_max_column_reader_num(0);
    state->set_query_options(options);

    // Scheduler mock that accepts every submitted scan task.
    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    Status st = ctx->init();
    ASSERT_TRUE(st.ok());
    // NOTE(review): an earlier comment at this spot claimed the result would be 2 (the
    // user cap), but the assertion pins it to 1 — confirm which expectation is intended.
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);

    // Clear the user cap and make sure init() still succeeds.
    options.__set_max_scanners_concurrency(0);
    state->set_query_options(options);
    st = ctx->init();
    ASSERT_TRUE(st.ok());
}
// With the operator flagged to run serially, init() must leave _max_scan_concurrency
// at 1 both with a user concurrency cap of 2 and with the cap cleared (0).
TEST_F(ScannerContextTest, test_serial_run) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Serial execution is the property under test.
    scan_op->_should_run_serial = true;
    local_state->_parent = scan_op.get();
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();

    TQueryOptions options;
    options.__set_max_scanners_concurrency(2);
    options.__set_max_column_reader_num(0);
    state->set_query_options(options);

    // Scheduler mock that accepts every submitted scan task.
    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    // First round: user cap of 2.
    Status st = ctx->init();
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);

    // Second round: user cap cleared.
    options.__set_max_scanners_concurrency(0);
    state->set_query_options(options);
    st = ctx->init();
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);
}
// With max_column_reader_num set to 1 and a user concurrency cap of 20, init() is
// expected to end up with _max_scan_concurrency == 1.
TEST_F(ScannerContextTest, test_max_column_reader_num) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Twenty delegates, all wrapping the same scanner.
    const int delegate_count = 20;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    scan_op->_should_run_serial = false;
    local_state->_parent = scan_op.get();
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();

    // Generous concurrency cap, but only a single column reader allowed.
    TQueryOptions options;
    options.__set_max_scanners_concurrency(20);
    options.__set_max_column_reader_num(1);
    state->set_query_options(options);

    // Scheduler mock that accepts every submitted scan task.
    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    Status st = ctx->init();
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);
}
// Each push_back_scan_task() call is expected to lower _in_flight_tasks_num by one.
TEST_F(ScannerContextTest, test_push_back_scan_task) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Start from 11 in-flight tasks and hand five of them back.
    int expected_in_flight = 11;
    ctx->_in_flight_tasks_num = 11;
    for (int round = 0; round < 5; ++round) {
        auto returned_task =
                std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
        ctx->push_back_scan_task(returned_task);
        --expected_in_flight;
        ASSERT_EQ(ctx->_in_flight_tasks_num, expected_in_flight);
    }
}
// Exercises ScannerContext::_get_margin() against a mocked scheduler that reports
// different loads (active threads + queued tasks). Each case installs a fresh mock
// whose two getters must be called exactly once.
TEST_F(ScannerContextTest, get_margin) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            OlapScanLocalState::create_unique(state.get(), scan_operator.get());

    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 11; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
            parallel_tasks);

    // Locks that _get_margin() takes as arguments.
    std::mutex transfer_mutex;
    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
    std::shared_mutex scheduler_mutex;
    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);

    // Case 1: no scan task has been submitted and the scheduler is idle
    // (0 active threads, 0 queued tasks). The whole scheduler-level minimum
    // concurrency is expected as margin.
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(0));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(0));
    scanner_context->_scanner_scheduler = scheduler.get();
    int32_t margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler);

    // Case 2: the scheduler has 5 active threads and 10 queued tasks.
    // The remaining margin (3) is smaller than parallel_tasks (4).
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(5));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 18;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    // 18 - (5 + 10) = 3
    ASSERT_EQ(margin, 3);

    // Case 3: the scheduler has 10 active threads and 2 queued tasks.
    // The remaining margin is larger than parallel_tasks (4) and is expected in full:
    // 20 - (10 + 2) = 8.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(10));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(2));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, (scanner_context->_min_scan_concurrency_of_scan_scheduler - 12));

    // Case 4: the scheduler is busy (50 active + 10 queued) and this context has
    // nothing in flight, so the margin is expected to equal _min_scan_concurrency.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    scanner_context->_in_flight_tasks_num = 0;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency);

    // Case 5: the scheduler is busy and this context already has 20 tasks in flight,
    // so _min_scan_concurrency is already satisfied and no margin is expected.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    scanner_context->_in_flight_tasks_num = 20;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, 0);
}
// Exercises ScannerContext::_pull_next_scan_task() with and without a current task,
// at and below the concurrency limit, and with an empty / non-empty pending stack.
TEST_F(ScannerContextTest, pull_next_scan_task) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            OlapScanLocalState::create_unique(state.get(), scan_operator.get());

    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 11; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
            parallel_tasks);

    // This test uses the fixture's scheduler; no mock expectations are involved.
    scanner_context->_scanner_scheduler = scan_scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;

    // Current concurrency equals the limit (1): nothing is handed out, neither without
    // nor with a current task.
    scanner_context->_max_scan_concurrency = 1;
    std::shared_ptr<ScanTask> pull_scan_task =
            scanner_context->_pull_next_scan_task(nullptr, scanner_context->_max_scan_concurrency);
    ASSERT_EQ(pull_scan_task, nullptr);

    auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    pull_scan_task = scanner_context->_pull_next_scan_task(scan_task,
                                                           scanner_context->_max_scan_concurrency);
    ASSERT_EQ(pull_scan_task, nullptr);

    // From here on the calls run below the limit (current = max - 1).
    scanner_context->_max_scan_concurrency = 2;

    // A current task that still holds a cached block is rejected with an exception.
    BlockUPtr cached_block = Block::create_unique();
    scan_task->cached_block = std::move(cached_block);
    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1));

    // A current task in EOS state is rejected with an exception as well.
    scan_task->cached_block.reset();
    scan_task->_state = ScanTask::State::IN_FLIGHT;
    scan_task->set_state(ScanTask::State::EOS);
    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1));

    // An in-flight current task without cached block is handed back as the next task.
    scan_task->cached_block.reset();
    scan_task->_state = ScanTask::State::IN_FLIGHT;
    pull_scan_task = scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1);
    EXPECT_EQ(pull_scan_task.get(), scan_task.get());

    // No current task and an empty pending stack: nothing to pull.
    scanner_context->_pending_tasks = std::stack<std::shared_ptr<ScanTask>>();
    pull_scan_task = scanner_context->_pull_next_scan_task(
            nullptr, scanner_context->_max_scan_concurrency - 1);
    EXPECT_EQ(pull_scan_task, nullptr);

    // No current task but one pending task: a task is returned.
    scanner_context->_pending_tasks.push(
            std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner)));
    pull_scan_task = scanner_context->_pull_next_scan_task(
            nullptr, scanner_context->_max_scan_concurrency - 1);
    EXPECT_NE(pull_scan_task, nullptr);
}
// Exercises ScannerContext::schedule_scan_task() against an idle mocked scheduler
// (0 active threads, 0 queued tasks) with different concurrency limits and scanner
// counts, including the illegal case of a current task that holds a cached block.
TEST_F(ScannerContextTest, schedule_scan_task) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            OlapScanLocalState::create_unique(state.get(), scan_operator.get());

    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 15; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
            parallel_tasks);

    // Locks that schedule_scan_task() / _get_margin() take as arguments.
    std::mutex transfer_mutex;
    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
    std::shared_mutex scheduler_mutex;
    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);

    // Scan resource is enough: the scheduler reports no load for the whole test.
    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillRepeatedly(testing::Return(0));
    EXPECT_CALL(*scheduler, get_queue_size()).WillRepeatedly(testing::Return(0));

    // Case 1: limit of 1 -> exactly one task in flight after scheduling.
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    Status st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1);

    // Case 2: scheduling again with the same limit keeps in-flight == limit.
    // NOTE(review): this spot used to assign _max_scan_concurrency twice in a row
    // (10, then 1), so only 1 was ever in effect; the dead store was dropped. Confirm
    // whether one of the assignments was meant for _min_scan_concurrency.
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(scanner_context->_in_flight_tasks_num, scanner_context->_max_scan_concurrency);

    // Case 3: fresh context with a limit (100) above the number of scanners.
    scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
                                                    output_tuple_desc, output_row_descriptor,
                                                    scanners, limit, scan_dependency, &shared_limit,
                                                    nullptr, nullptr, 0, false, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 100;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    int margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler);
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    // 15 since we have 15 scanners.
    ASSERT_EQ(scanner_context->_in_flight_tasks_num, 15);

    // Case 4: fresh context with a single scanner and a limit of 1.
    scanners = std::list<std::shared_ptr<ScannerDelegate>>();
    scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
                                                    output_tuple_desc, output_row_descriptor,
                                                    scanners, limit, scan_dependency, &shared_limit,
                                                    nullptr, nullptr, 0, false, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    // NOTE(review): the statuses of the next two calls are not asserted — confirm
    // whether they should be.
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    st = scanner_context->schedule_scan_task(scan_task, transfer_lock, scheduler_lock);
    // current scan task is added back.
    ASSERT_EQ(scanner_context->_pending_tasks.size(), 1);
    ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1);

    // Case 5: illegal situation — a current scan task that still has a cached block
    // must not be passed to schedule_scan_task(); an exception is expected.
    scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
                                                    output_tuple_desc, output_row_descriptor,
                                                    scanners, limit, scan_dependency, &shared_limit,
                                                    nullptr, nullptr, 0, false, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    scan_task->cached_block = Block::create_unique();
    EXPECT_ANY_THROW(std::ignore = scanner_context->schedule_scan_task(scan_task, transfer_lock,
                                                                       scheduler_lock));
}
// Checks RuntimeState::scan_queue_mem_limit() with and without an explicit
// scan_queue_mem_limit option, then checks the _max_bytes_in_queue value that
// ScannerContext::init() produces.
TEST_F(ScannerContextTest, scan_queue_mem_limit) {
    // Explicit option wins.
    state->_query_options.__set_scan_queue_mem_limit(100);
    ASSERT_EQ(state->scan_queue_mem_limit(), 100);

    // Option unset: the value reported is mem_limit / 20.
    state->_query_options.__isset.scan_queue_mem_limit = false;
    state->_query_options.__set_mem_limit(200);
    ASSERT_EQ(state->scan_queue_mem_limit(), 200 / 20);

    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());
    local_state->_parent = scan_op.get();
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Scheduler mock that accepts every submitted scan task.
    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    // The status of init() is deliberately discarded; only the queue limit is checked.
    std::ignore = ctx->init();
    // Integer arithmetic: 1 / 300 is 0, so the expected value is 10 MiB * 1.
    ASSERT_EQ(ctx->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 300 + 1));
}
// Exercises ScannerContext::get_free_block(): creation of new blocks (forced and
// unforced) and reuse of a block taken from the free-block queue, including the
// memory accounting that goes with it.
TEST_F(ScannerContextTest, get_free_block) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Inject the fixture counters and start both from zero.
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_newly_create_free_blocks_num->set(0L);
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_scanner_memory_used_counter->set(0L);

    // Forced request with an empty queue: a new block is created.
    BlockUPtr block = ctx->get_free_block(/*force=*/true);
    ASSERT_NE(block, nullptr);
    ASSERT_TRUE(ctx->_newly_create_free_blocks_num->value() == 1);

    // Unforced request, still no free block, memory usage below the queue limit:
    // another new block is created.
    ctx->_max_bytes_in_queue = 200;
    block = ctx->get_free_block(/*force=*/false);
    ASSERT_NE(block, nullptr);
    ASSERT_TRUE(ctx->_newly_create_free_blocks_num->value() == 2);

    // Put a reusable 100-byte mock block into the free queue ...
    auto queued_block = std::make_unique<MockBlock>();
    EXPECT_CALL(*queued_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
    EXPECT_CALL(*queued_block, mem_reuse()).WillRepeatedly(testing::Return(true));
    ctx->_free_blocks.enqueue(std::move(queued_block));

    // ... and take it out again: its 100 bytes are subtracted from both counters.
    block = ctx->get_free_block(/*force=*/false);
    ASSERT_NE(block, nullptr);
    ASSERT_EQ(ctx->_block_memory_usage, -100);
    ASSERT_EQ(ctx->_scanner_memory_used_counter->value(), -100);
}
// Returning a reusable 100-byte block is expected to add 100 bytes to the memory
// accounting and to leave exactly one block in the free-block queue.
TEST_F(ScannerContextTest, return_free_block) {
    const int parallel_tasks = 1;
    auto scan_op = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                       parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), scan_op.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.limit = limit;
    params.state = state.get();
    params.profile = profile.get();
    params.key_ranges = std::vector<OlapScanRange*>(); // no key ranges
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates, all wrapping the same scanner.
    const int delegate_count = 11;
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int idx = 0; idx != delegate_count; ++idx) {
        scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, scanners,
            limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false, parallel_tasks);

    // Inject the fixture counters and start from a clean accounting state.
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_max_bytes_in_queue = 200;
    ctx->_block_memory_usage = 0;

    // A reusable mock block that reports 100 allocated bytes.
    auto returned_block = std::make_unique<MockBlock>();
    EXPECT_CALL(*returned_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
    EXPECT_CALL(*returned_block, mem_reuse()).WillRepeatedly(testing::Return(true));
    EXPECT_CALL(*returned_block, clear_column_data(testing::_))
            .WillRepeatedly(testing::Return());

    ctx->return_free_block(std::move(returned_block));

    ASSERT_EQ(ctx->_block_memory_usage, 100);
    ASSERT_EQ(ctx->_scanner_memory_used_counter->value(), 100);
    // free_block queue is stabilized, so size_approx is accurate.
    ASSERT_EQ(ctx->_free_blocks.size_approx(), 1);
}
// Drives ScannerContext::get_block_from_queue through three paths:
//   1. the runtime state reports cancellation -> the cancel reason is returned;
//   2. _process_status holds an error -> that error is returned;
//   3. a completed task already in the EOS state is queued -> the call succeeds and
//      _num_finished_scanners becomes 1.
TEST_F(ScannerContextTest, get_block_from_queue) {
    using testing::_;
    using testing::Return;

    const int parallel_tasks = 1;
    auto op_x = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                    parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), op_x.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = limit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Every delegate wraps the same underlying scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    auto ctx = ScannerContext::create_shared(state.get(), local_state.get(), output_tuple_desc,
                                             output_row_descriptor, delegates, limit,
                                             scan_dependency, &shared_limit, nullptr, nullptr, 0,
                                             false, parallel_tasks);
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_max_bytes_in_queue = 200;
    ctx->_block_memory_usage = 0;

    auto block = std::make_unique<MockBlock>();
    EXPECT_CALL(*block, allocated_bytes()).WillRepeatedly(Return(100));
    EXPECT_CALL(*block, mem_reuse()).WillRepeatedly(Return(true));
    EXPECT_CALL(*block, clear_column_data(_)).WillRepeatedly(Return());

    // Path 1: cancelled runtime state.
    auto mock_state = std::make_unique<MockRuntimeStateLocal>();
    EXPECT_CALL(*mock_state, is_cancelled()).WillOnce(Return(true));
    EXPECT_CALL(*mock_state, cancel_reason())
            .WillOnce(Return(Status::Cancelled("TestCancelMsg")));
    bool eos = false;
    Status st = ctx->get_block_from_queue(mock_state.get(), block.get(), &eos, 0);
    EXPECT_FALSE(st.ok());
    EXPECT_EQ(st.msg(), "TestCancelMsg");

    // Path 2: not cancelled any more, but the context carries a failed process status.
    EXPECT_CALL(*mock_state, is_cancelled()).WillRepeatedly(Return(false));
    ctx->_process_status = Status::InternalError("TestCancel");
    st = ctx->get_block_from_queue(mock_state.get(), block.get(), &eos, 0);
    EXPECT_FALSE(st.ok());
    EXPECT_EQ(st.msg(), "TestCancel");

    // Path 3: healthy context with one completed task that has reached EOS.
    ctx->_process_status = Status::OK();
    ctx->_is_finished = false;
    ctx->_should_stop = false;
    auto finished_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    finished_task->_state = ScanTask::State::IN_FLIGHT;
    finished_task->set_state(ScanTask::State::EOS);
    ctx->_completed_tasks.push_back(finished_task);

    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(_, _, _)).WillOnce(Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_num_finished_scanners = 0;
    EXPECT_CALL(*block, mem_reuse()).WillRepeatedly(Return(false));
    st = ctx->get_block_from_queue(mock_state.get(), block.get(), &eos, 0);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(ctx->_num_finished_scanners, 1);
}
/**
MemShareArbitrator Tests (5 tests)
- scanner_mem_share_arbitrator_basic: Tests initialization, query_id, memory limits, and initial state
- scanner_mem_share_arbitrator_register_scan_node: Tests registering scan nodes and default memory allocation (64MB)
- scanner_mem_share_arbitrator_update_mem_bytes: Tests updating memory bytes and handling zero values
- scanner_mem_share_arbitrator_proportional_sharing: Tests proportional memory distribution across multiple contexts
- scanner_mem_share_arbitrator_zero_ratio: Tests edge case with zero scan ratio
MemLimiter Tests (9 tests)
- scanner_mem_limiter_basic: Tests initialization and default values
- scanner_mem_limiter_reestimated_block_mem_bytes: Tests averaging algorithm for block memory estimation
- scanner_mem_limiter_reestimated_zero_value: Tests that zero values are properly ignored
- scanner_mem_limiter_available_scanner_count: Tests scanner count calculation based on memory limits
- scanner_mem_limiter_serial_scan: Tests serial scan mode behavior
- scanner_mem_limiter_update_running_tasks_count: Tests atomic counter updates
- scanner_mem_limiter_update_open_tasks_count: Tests context count tracking
- scanner_mem_limiter_update_arb_mem_bytes: Tests memory capping at query limit
- scanner_mem_limiter_available_count_distribution: Tests fair distribution across parallel instances
ScannerContext with Memory Control Tests (4 tests)
- scanner_context_with_adaptive_memory: Tests integration with arbitrator and limiter
- scanner_context_adjust_scan_mem_limit: Tests dynamic memory limit adjustment
- scanner_context_reestimated_block_mem_bytes: Tests block memory re-estimation propagation
- scanner_context_update_peak_running_scanner: Tests peak scanner tracking with memory control
Total: 18 new test cases
All tests follow the existing patterns in the codebase and cover:
- Normal operation scenarios
- Edge cases (zero values, limits, etc.)
- Integration between components
- Atomic counter updates (exercised from a single thread only)
- Memory distribution algorithms
*/
// ==================== MemShareArbitrator Tests ====================
// A freshly created MemShareArbitrator exposes the query id and query memory limit it was
// built with, derives mem_limit from the scan ratio, and starts with zero total bytes.
TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_basic) {
    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    const double scan_ratio = 0.3;

    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, scan_ratio);

    ASSERT_EQ(arb->query_id.hi, 1);
    ASSERT_EQ(arb->query_id.lo, 2);
    ASSERT_EQ(arb->query_mem_limit, query_limit_bytes);
    ASSERT_EQ(arb->mem_limit, static_cast<int64_t>(query_limit_bytes * scan_ratio));
    ASSERT_EQ(arb->total_mem_bytes.load(), 0);
}
// Each register_scan_node() call is expected to add 64 MiB to total_mem_bytes.
TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_register_scan_node) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t query_limit_bytes = 1024 * kMiB;
    const double scan_ratio = 0.3;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, scan_ratio);

    arb->register_scan_node();
    ASSERT_EQ(arb->total_mem_bytes.load(), 64 * kMiB);

    arb->register_scan_node();
    ASSERT_EQ(arb->total_mem_bytes.load(), 128 * kMiB);
}
// update_mem_bytes(old, new) must move total_mem_bytes from 0 to 100 MiB and back to 0;
// the returned limit is positive for a non-zero value and zero once the value is zero.
TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_update_mem_bytes) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t query_limit_bytes = 1024 * kMiB;
    const double scan_ratio = 0.3;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, scan_ratio);

    int64_t granted = arb->update_mem_bytes(0, 100 * kMiB);
    ASSERT_EQ(arb->total_mem_bytes.load(), 100 * kMiB);
    ASSERT_GT(granted, 0);

    granted = arb->update_mem_bytes(100 * kMiB, 0);
    ASSERT_EQ(granted, 0);
    ASSERT_EQ(arb->total_mem_bytes.load(), 0);
}
// Two successive registrations (200 MiB, then 300 MiB) accumulate to 500 MiB in total, and
// the limit returned for the second one must be smaller than the limit returned for the first.
TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_proportional_sharing) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t query_limit_bytes = 1024 * kMiB;
    const double scan_ratio = 0.5;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, scan_ratio);

    const int64_t first_limit = arb->update_mem_bytes(0, 200 * kMiB);
    const int64_t second_limit = arb->update_mem_bytes(0, 300 * kMiB);

    ASSERT_LT(second_limit, first_limit);
    ASSERT_EQ(arb->total_mem_bytes.load(), 500 * kMiB);
}
// Edge case: even with a scan ratio of 0.0 the arbitrator's mem_limit must be at least 1.
TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_zero_ratio) {
    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    const double scan_ratio = 0.0;

    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, scan_ratio);

    ASSERT_GE(arb->mem_limit, 1);
}
// ==================== MemLimiter Tests ====================
// A new MemLimiter reports zero for both the estimated block size and the arbitrated
// scanner memory.
TEST_F(ScannerContextTest, scanner_mem_limiter_basic) {
    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * 1024 * 1024;

    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 0);
    ASSERT_EQ(mem_limiter->get_arb_scanner_mem_bytes(), 0);
}
// Feeding samples of 100, 200 and 300 MiB must yield estimates of 100, 150 and 200 MiB,
// i.e. the mean of all samples seen so far.
TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_block_mem_bytes) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->reestimated_block_mem_bytes(100 * kMiB);
    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 100 * kMiB);

    mem_limiter->reestimated_block_mem_bytes(200 * kMiB);
    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 150 * kMiB);

    mem_limiter->reestimated_block_mem_bytes(300 * kMiB);
    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 200 * kMiB);
}
// A zero-byte sample must leave the previously established 100 MiB estimate untouched.
TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_zero_value) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->reestimated_block_mem_bytes(100 * kMiB);
    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 100 * kMiB);

    mem_limiter->reestimated_block_mem_bytes(0);
    ASSERT_EQ(mem_limiter->get_estimated_block_mem_bytes(), 100 * kMiB);
}
// With a 400 MiB limit and a 100 MiB block estimate, available_scanner_count(0) must be
// at least 1.
TEST_F(ScannerContextTest, scanner_mem_limiter_available_scanner_count) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->update_mem_limit(400 * kMiB);
    mem_limiter->reestimated_block_mem_bytes(100 * kMiB);

    const int available = mem_limiter->available_scanner_count(0);
    ASSERT_GE(available, 1);
}
// Same setup as the non-serial case but with serial_scan enabled: available_scanner_count(0)
// must still be at least 1.
TEST_F(ScannerContextTest, scanner_mem_limiter_serial_scan) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = true;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->update_mem_limit(400 * kMiB);
    mem_limiter->reestimated_block_mem_bytes(100 * kMiB);

    const int available = mem_limiter->available_scanner_count(0);
    ASSERT_GE(available, 1);
}
// Applying deltas +5, -2, +1 must return 5, 3, 4: the asserted values match the running
// total after each delta.
TEST_F(ScannerContextTest, scanner_mem_limiter_update_running_tasks_count) {
    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * 1024 * 1024;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    ASSERT_EQ(mem_limiter->update_running_tasks_count(5), 5);
    ASSERT_EQ(mem_limiter->update_running_tasks_count(-2), 3);
    ASSERT_EQ(mem_limiter->update_running_tasks_count(1), 4);
}
// Applying deltas +1, +1, -1, -1 must return 0, 1, 2, 1: the asserted values are consistent
// with the counter value observed before each delta is applied.
TEST_F(ScannerContextTest, scanner_mem_limiter_update_open_tasks_count) {
    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * 1024 * 1024;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    ASSERT_EQ(mem_limiter->update_open_tasks_count(1), 0);
    ASSERT_EQ(mem_limiter->update_open_tasks_count(1), 1);
    ASSERT_EQ(mem_limiter->update_open_tasks_count(-1), 2);
    ASSERT_EQ(mem_limiter->update_open_tasks_count(-1), 1);
}
// A value below the limiter's mem_limit is stored as-is; a value above it (1 GiB against a
// 512 MiB limit) must be reported back as exactly mem_limit.
TEST_F(ScannerContextTest, scanner_mem_limiter_update_arb_mem_bytes) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 4;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->update_arb_mem_bytes(100 * kMiB);
    ASSERT_EQ(mem_limiter->get_arb_scanner_mem_bytes(), 100 * kMiB);

    mem_limiter->update_arb_mem_bytes(1024 * kMiB);
    ASSERT_EQ(mem_limiter->get_arb_scanner_mem_bytes(), limit_bytes);
}
// With parallelism 3, a 500 MiB limit and a 100 MiB block estimate, each of the indices
// 0, 1 and 2 must be granted at least one scanner.
TEST_F(ScannerContextTest, scanner_mem_limiter_available_count_distribution) {
    constexpr int64_t kMiB = 1024 * 1024;

    TUniqueId qid;
    qid.hi = 1;
    qid.lo = 2;
    const int64_t instance_count = 3;
    const bool is_serial = false;
    const int64_t limit_bytes = 512 * kMiB;
    auto mem_limiter = MemLimiter::create_shared(qid, instance_count, is_serial, limit_bytes);

    mem_limiter->update_mem_limit(500 * kMiB);
    mem_limiter->reestimated_block_mem_bytes(100 * kMiB);

    // Query all three counts first, then assert, in the same order as before.
    const int first = mem_limiter->available_scanner_count(0);
    const int second = mem_limiter->available_scanner_count(1);
    const int third = mem_limiter->available_scanner_count(2);

    ASSERT_GE(first, 1);
    ASSERT_GE(second, 1);
    ASSERT_GE(third, 1);
}
// ==================== ScannerContext with Memory Control Tests ====================
// Builds a ScannerContext with an arbitrator, a limiter and the adaptive flag set to true,
// then checks that the context recorded all three.
TEST_F(ScannerContextTest, scanner_context_with_adaptive_memory) {
    const int parallel_tasks = 2;
    auto op_x = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                    parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), op_x.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = limit;
    params.key_ranges = std::vector<OlapScanRange*>();
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Every delegate wraps the same underlying scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 5; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    TUniqueId qid = state->get_query_ctx()->query_id();
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, 0.3);
    auto mem_limiter = MemLimiter::create_shared(qid, parallel_tasks, false,
                                                 static_cast<int64_t>(query_limit_bytes * 0.3));

    auto ctx = ScannerContext::create_shared(state.get(), local_state.get(), output_tuple_desc,
                                             output_row_descriptor, delegates, limit,
                                             scan_dependency, &shared_limit, arb, mem_limiter, 0,
                                             true, parallel_tasks);
    mem_limiter->update_open_tasks_count(1);

    ASSERT_TRUE(ctx->_enable_adaptive_scanners);
    ASSERT_NE(ctx->_mem_share_arb, nullptr);
    ASSERT_NE(ctx->_scanner_mem_limiter, nullptr);
}
// Calling _adjust_scan_mem_limit(100 MiB, 200 MiB) on an adaptive context must leave the
// arbitrator with a positive total_mem_bytes.
TEST_F(ScannerContextTest, scanner_context_adjust_scan_mem_limit) {
    const int parallel_tasks = 2;
    auto op_x = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                    parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), op_x.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = limit;
    params.key_ranges = std::vector<OlapScanRange*>();
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Every delegate wraps the same underlying scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 5; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    TUniqueId qid = state->get_query_ctx()->query_id();
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, 0.3);
    auto mem_limiter = MemLimiter::create_shared(qid, parallel_tasks, false,
                                                 static_cast<int64_t>(query_limit_bytes * 0.3));

    auto ctx = ScannerContext::create_shared(state.get(), local_state.get(), output_tuple_desc,
                                             output_row_descriptor, delegates, limit,
                                             scan_dependency, &shared_limit, arb, mem_limiter, 0,
                                             true, parallel_tasks);

    const int64_t previous_bytes = 100 * 1024 * 1024;
    const int64_t updated_bytes = 200 * 1024 * 1024;
    ctx->_adjust_scan_mem_limit(previous_bytes, updated_bytes);
    mem_limiter->update_open_tasks_count(1);

    ASSERT_GT(arb->total_mem_bytes.load(), 0);
}
// A block-size sample reported through the ScannerContext must show up as a positive
// estimate on the MemLimiter the context was built with.
TEST_F(ScannerContextTest, scanner_context_reestimated_block_mem_bytes) {
    const int parallel_tasks = 2;
    auto op_x = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                    parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), op_x.get());

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = limit;
    params.key_ranges = std::vector<OlapScanRange*>();
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Every delegate wraps the same underlying scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 5; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    TUniqueId qid = state->get_query_ctx()->query_id();
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, 0.3);
    auto mem_limiter = MemLimiter::create_shared(qid, parallel_tasks, false,
                                                 static_cast<int64_t>(query_limit_bytes * 0.3));

    auto ctx = ScannerContext::create_shared(state.get(), local_state.get(), output_tuple_desc,
                                             output_row_descriptor, delegates, limit,
                                             scan_dependency, &shared_limit, arb, mem_limiter, 0,
                                             true, parallel_tasks);

    ctx->reestimated_block_mem_bytes(150 * 1024 * 1024);
    ASSERT_GT(mem_limiter->get_estimated_block_mem_bytes(), 0);
    mem_limiter->update_open_tasks_count(1);
}
// After update_peak_running_scanner(3) on an adaptive context, the limiter's running-task
// counter must read 3 (observed via a zero delta).
TEST_F(ScannerContextTest, scanner_context_update_peak_running_scanner) {
    const int parallel_tasks = 2;
    auto op_x = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                    parallel_tasks, TQueryCacheParam {});
    auto local_state = OlapScanLocalState::create_unique(state.get(), op_x.get());
    local_state->_parent = op_x.get();

    const int64_t limit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = limit;
    params.key_ranges = std::vector<OlapScanRange*>();
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Every delegate wraps the same underlying scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 5; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(scanner));
    }

    TUniqueId qid = state->get_query_ctx()->query_id();
    const int64_t query_limit_bytes = 1024 * 1024 * 1024;
    auto arb = MemShareArbitrator::create_shared(qid, query_limit_bytes, 0.3);
    auto mem_limiter = MemLimiter::create_shared(qid, parallel_tasks, false,
                                                 static_cast<int64_t>(query_limit_bytes * 0.3));

    auto ctx = ScannerContext::create_shared(state.get(), local_state.get(), output_tuple_desc,
                                             output_row_descriptor, delegates, limit,
                                             scan_dependency, &shared_limit, arb, mem_limiter, 0,
                                             true, parallel_tasks);

    ctx->update_peak_running_scanner(3);
    ASSERT_EQ(mem_limiter->update_running_tasks_count(0), 3);
    mem_limiter->update_open_tasks_count(1);
}
} // namespace doris