blob: 09addeb9dcedcd5dac4072ab2b76603fe60b0a51 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/scanner_context.h"
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gtest/gtest.h>
#include <list>
#include <memory>
#include <mutex>
#include <tuple>
#include "common/object_pool.h"
#include "mock_simplified_scan_scheduler.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "testutil/mock/mock_runtime_state.h"
#include "vec/core/block.h"
#include "vec/exec/scan/olap_scanner.h"
#include "vec/exec/scan/scan_node.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::vectorized {
class ScannerContextTest : public testing::Test {
public:
void SetUp() override {
obj_pool = std::make_unique<ObjectPool>();
// This ScanNode has two tuples.
// First one is input tuple, second one is output tuple.
tnode.row_tuples.push_back(TTupleId(0));
tnode.row_tuples.push_back(TTupleId(1));
tbl_desc.tableType = TTableType::OLAP_TABLE;
tuple_desc.id = 0;
tuple_descs.push_back(tuple_desc);
tuple_desc.id = 1;
tuple_descs.push_back(tuple_desc);
type_node.type = TTypeNodeType::SCALAR;
scalar_type.__set_type(TPrimitiveType::STRING);
type_node.__set_scalar_type(scalar_type);
slot_desc.slotType.types.push_back(type_node);
slot_desc.id = 0;
slot_desc.parent = 0;
slot_descs.push_back(slot_desc);
slot_desc.id = 1;
slot_desc.parent = 1;
slot_descs.push_back(slot_desc);
thrift_tbl.tableDescriptors.push_back(tbl_desc);
thrift_tbl.tupleDescriptors = tuple_descs;
thrift_tbl.slotDescriptors = slot_descs;
std::ignore = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs);
auto task_exec_ctx = std::make_shared<TaskExecutionContext>();
state->set_task_execution_context(task_exec_ctx);
output_tuple_desc = descs->get_tuple_descriptor(0);
}
private:
class MockBlock : public Block {
MockBlock() = default;
MOCK_CONST_METHOD0(allocated_bytes, size_t());
MOCK_METHOD0(mem_reuse, bool());
MOCK_METHOD1(clear_column_data, void(int64_t));
};
class MockRuntimeStateLocal : public RuntimeState {
MockRuntimeStateLocal() = default;
MOCK_CONST_METHOD0(is_cancelled, bool());
MOCK_CONST_METHOD0(cancel_reason, Status());
};
std::unique_ptr<ObjectPool> obj_pool;
TPlanNode tnode;
TTableDescriptor tbl_desc;
std::vector<TTupleDescriptor> tuple_descs;
TTupleDescriptor tuple_desc;
std::vector<TSlotDescriptor> slot_descs;
TSlotDescriptor slot_desc;
TTypeNode type_node;
TScalarType scalar_type;
TDescriptorTable thrift_tbl;
DescriptorTbl* descs = nullptr;
std::unique_ptr<RuntimeState> state = std::make_unique<MockRuntimeState>();
std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("TestProfile");
std::unique_ptr<RuntimeProfile::Counter> max_concurrency_counter =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
std::unique_ptr<RuntimeProfile::Counter> min_concurrency_counter =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
std::unique_ptr<RuntimeProfile::Counter> newly_create_free_blocks_num =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
std::unique_ptr<RuntimeProfile::Counter> scanner_memory_used_counter =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
TupleDescriptor* output_tuple_desc = nullptr;
RowDescriptor* output_row_descriptor = nullptr;
std::shared_ptr<pipeline::Dependency> scan_dependency =
pipeline::Dependency::create_shared(0, 0, "TestScanDependency");
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_shared<CgroupV2CpuCtl>(1);
std::unique_ptr<ScannerScheduler> scan_scheduler =
std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest", cgroup_cpu_ctl);
};
// ScannerContext::init() concurrency sizing.
// As exercised below, init() is expected to derive _max_scan_concurrency from
// _min_scan_concurrency_of_scan_scheduler / parallel_tasks (10 / 1 = 10 here), and to cap it
// with the user-specified max_scanners_concurrency when that option is positive.
TEST_F(ScannerContextTest, test_init) {
    const int parallel_tasks = 1;
    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
    // 11 delegates, all wrapping the same scanner; more than the calculated concurrency (10).
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 11; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }
    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    scan_operator->_should_run_serial = false;
    olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get();
    olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get();
    olap_scan_local_state->_parent = scan_operator.get();

    // User specified max_scanners_concurrency is less than _max_scan_concurrency that we calculated
    TQueryOptions query_options;
    query_options.__set_max_scanners_concurrency(2);
    // 0 disables the column-reader based downgrade, so it cannot interfere here.
    query_options.__set_max_column_reader_num(0);
    state->set_query_options(query_options);
    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    scanner_context->_scanner_scheduler = scheduler.get();
    // max_scan_concurrency that we calculate will be 10 / 1 = 10;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
    Status st = scanner_context->init();
    ASSERT_TRUE(st.ok());
    // actual max_scan_concurrency will be 2 since user specified max_scanners_concurrency is 2.
    ASSERT_EQ(scanner_context->_max_scan_concurrency, 2);

    // Without a user limit, the calculated value (10 / 1) is used as-is.
    query_options.__set_max_scanners_concurrency(0);
    state->set_query_options(query_options);
    st = scanner_context->init();
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(scanner_context->_max_scan_concurrency,
              scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks);
}
// A scan operator that must run serially keeps ScannerContext::init() at a scan concurrency of
// exactly 1, whether or not the user specified max_scanners_concurrency.
TEST_F(ScannerContextTest, test_serial_run) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Eleven delegates that all wrap the same scanner.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);

    // Wire operator and local state together, forcing serial execution.
    op->_should_run_serial = true;
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();
    local_state->_parent = op.get();

    TQueryOptions opts;
    opts.__set_max_scanners_concurrency(2);
    opts.__set_max_column_reader_num(0);
    state->set_query_options(opts);

    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    // With a user limit of 2, serial execution still wins.
    Status status = ctx->init();
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);

    // With no user limit at all, serial execution still wins.
    opts.__set_max_scanners_concurrency(0);
    state->set_query_options(opts);
    status = ctx->init();
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);
}
// max_column_reader_num limits how many column readers may be open at once. The output tuple
// here has a single slot, and the expectation of 1 is consistent with init() capping
// concurrency at max_column_reader_num / output slots (1 / 1), even though the user allows 20
// — presumably so; confirm against ScannerContext::init().
TEST_F(ScannerContextTest, test_max_column_reader_num) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    // Twenty delegates so the scanner count never becomes the limiting factor.
    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 20; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);

    op->_should_run_serial = false;
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();
    local_state->_parent = op.get();

    TQueryOptions opts;
    opts.__set_max_scanners_concurrency(20);
    opts.__set_max_column_reader_num(1);
    state->set_query_options(opts);

    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;

    Status status = ctx->init();
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(ctx->_max_scan_concurrency, 1);
}
// push_back_scan_task() hands a task back to the context; every call is expected to
// decrement _num_scheduled_scanners by exactly one.
TEST_F(ScannerContextTest, test_push_back_scan_task) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);

    // Pretend all eleven scanners are currently scheduled, then return five of them.
    ctx->_num_scheduled_scanners = 11;
    int expected_scheduled = 11;
    for (int round = 0; round < 5; ++round) {
        auto finished = std::make_shared<ScanTask>(
                std::make_shared<ScannerDelegate>(shared_scanner));
        ctx->push_back_scan_task(finished);
        --expected_scheduled;
        ASSERT_EQ(ctx->_num_scheduled_scanners, expected_scheduled);
    }
}
// ScannerContext::_get_margin(): how many more scan tasks this context may submit right now.
// The expectations below are consistent with the margin being the larger of
//   * _min_scan_concurrency_of_scan_scheduler - (active threads + queued tasks), and
//   * _min_scan_concurrency - (queued results + _num_scheduled_scanners),
// and 0 when neither is positive. In particular the margin is NOT divided by parallel_tasks.
TEST_F(ScannerContextTest, get_margin) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 11; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }
    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    std::mutex transfer_mutex;
    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
    std::shared_mutex scheduler_mutex;
    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;

    // Case 1: nothing submitted yet (_tasks_queue is empty, _num_scheduled_scanners is 0) and
    // the scan scheduler is idle, so the full scheduler-level minimum is available:
    // 20 - (0 + 0) = 20. We can make full utilization of the resource.
    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(0));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(0));
    scanner_context->_scanner_scheduler = scheduler.get();
    int32_t margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler);

    // Case 2: 5 active threads and 10 queued tasks leave 18 - (5 + 10) = 3.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(5));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 18;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, 3);

    // Case 3: 10 active threads and 2 queued tasks leave 20 - (10 + 2) = 8. The remaining
    // margin is returned as-is, even though it is larger than parallel_tasks (4).
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(10));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(2));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, (scanner_context->_min_scan_concurrency_of_scan_scheduler - 12));

    // Case 4: the scan scheduler is busy (20 - (50 + 10) < 0) and nothing is scheduled yet,
    // so only _min_scan_concurrency tasks may be submitted.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    scanner_context->_num_scheduled_scanners = 0;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency);

    // Case 5: the scan scheduler is busy and _min_scan_concurrency is already satisfied
    // by 20 scheduled scanners, so no further task may be submitted.
    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50));
    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    scanner_context->_num_scheduled_scanners = 20;
    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, 0);
}
// ScannerContext::_pull_next_scan_task(current_task, current_concurrency).
// As asserted below, it:
//   * returns nullptr once current_concurrency has reached _max_scan_concurrency;
//   * throws if the current task still holds cached blocks or is already at eos;
//   * hands the current task straight back when there is spare concurrency;
//   * otherwise pops the next task from _pending_scanners (nullptr if none remain).
TEST_F(ScannerContextTest, pull_next_scan_task) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 11; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }
    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    scanner_context->_scanner_scheduler = scan_scheduler.get();
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;

    // The concurrency limit is already reached (1 >= 1): nothing may be pulled, with or
    // without a current task.
    scanner_context->_max_scan_concurrency = 1;
    std::shared_ptr<ScanTask> pull_scan_task =
            scanner_context->_pull_next_scan_task(nullptr, scanner_context->_max_scan_concurrency);
    ASSERT_EQ(pull_scan_task, nullptr);
    auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    pull_scan_task = scanner_context->_pull_next_scan_task(scan_task,
                                                           scanner_context->_max_scan_concurrency);
    ASSERT_EQ(pull_scan_task, nullptr);

    // From here on there is one slot of spare concurrency (1 < 2).
    scanner_context->_max_scan_concurrency = 2;

    // Illegal: the current task still has a cached block.
    BlockUPtr cached_block = Block::create_unique();
    scan_task->cached_blocks.emplace_back(std::move(cached_block), 0);
    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1));

    // Illegal: the current task is already at eos.
    scan_task->cached_blocks.clear();
    scan_task->eos = true;
    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1));

    // A healthy current task is returned as the next task to run.
    scan_task->cached_blocks.clear();
    scan_task->eos = false;
    pull_scan_task = scanner_context->_pull_next_scan_task(
            scan_task, scanner_context->_max_scan_concurrency - 1);
    EXPECT_EQ(pull_scan_task.get(), scan_task.get());

    // No current task and no pending scanners: nothing to pull.
    scanner_context->_pending_scanners = std::stack<std::shared_ptr<ScanTask>>();
    pull_scan_task = scanner_context->_pull_next_scan_task(
            nullptr, scanner_context->_max_scan_concurrency - 1);
    EXPECT_EQ(pull_scan_task, nullptr);

    // No current task but one pending scanner: it is popped and returned.
    scanner_context->_pending_scanners.push(
            std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner)));
    pull_scan_task = scanner_context->_pull_next_scan_task(
            nullptr, scanner_context->_max_scan_concurrency - 1);
    EXPECT_NE(pull_scan_task, nullptr);
}
// ScannerContext::schedule_scan_task(): submits as many tasks as the margin and
// _max_scan_concurrency allow. A current task that cannot be resubmitted is parked in
// _pending_scanners, and a current task that still holds cached blocks is rejected.
TEST_F(ScannerContextTest, schedule_scan_task) {
    const int parallel_tasks = 4;
    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});
    auto olap_scan_local_state =
            pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
    const int64_t limit = 100;
    OlapScanner::Params scanner_params;
    scanner_params.state = state.get();
    scanner_params.profile = profile.get();
    scanner_params.limit = limit;
    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> scanner =
            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
    std::list<std::shared_ptr<ScannerDelegate>> scanners;
    for (int i = 0; i < 15; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }
    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    std::mutex transfer_mutex;
    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
    std::shared_mutex scheduler_mutex;
    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);

    // Scan resource is enough: the mocked scheduler always reports itself idle.
    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*scheduler, get_active_threads()).WillRepeatedly(testing::Return(0));
    EXPECT_CALL(*scheduler, get_queue_size()).WillRepeatedly(testing::Return(0));
    scanner_context->_scanner_scheduler = scheduler.get();

    // A concurrency limit of 1 allows a single submission despite the large margin.
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    Status st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);

    // Raising the limit lets another call top the context up to the new limit.
    scanner_context->_max_scan_concurrency = 10;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(scanner_context->_num_scheduled_scanners, scanner_context->_max_scan_concurrency);

    // A generous limit: every one of the 15 scanners gets submitted.
    scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 100;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    int margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler);
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    // 15 since we have 15 scanners.
    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 15);

    // A single scanner that is already scheduled; a resubmitted current task cannot run
    // (limit reached) and is parked in _pending_scanners instead.
    scanners = std::list<std::shared_ptr<ScannerDelegate>>();
    for (int i = 0; i < 1; ++i) {
        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
    }
    scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    st = scanner_context->schedule_scan_task(scan_task, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    // current scan task is added back.
    ASSERT_EQ(scanner_context->_pending_scanners.size(), 1);
    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);

    scanner_context = ScannerContext::create_shared(
            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
            scanners, limit, scan_dependency, parallel_tasks);
    scanner_context->_scanner_scheduler = scheduler.get();
    scanner_context->_max_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency = 1;
    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
    st = scanner_context->schedule_scan_task(nullptr, transfer_lock, scheduler_lock);
    ASSERT_TRUE(st.ok());
    scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
    scan_task->cached_blocks.emplace_back(Block::create_unique(), 0);
    // Illegal situation.
    // If current scan task has cached block, it should not be called with this method.
    EXPECT_ANY_THROW(std::ignore = scanner_context->schedule_scan_task(scan_task, transfer_lock,
                                                                       scheduler_lock));
}
// Checks RuntimeState::scan_queue_mem_limit() (explicit option vs. mem_limit fallback) and the
// _max_bytes_in_queue that ScannerContext::init() derives from it.
TEST_F(ScannerContextTest, scan_queue_mem_limit) {
    // An explicitly set scan_queue_mem_limit is reported unchanged.
    state->_query_options.__set_scan_queue_mem_limit(100);
    ASSERT_EQ(state->scan_queue_mem_limit(), 100);

    // Without it, the limit falls back to a twentieth of mem_limit.
    state->_query_options.__isset.scan_queue_mem_limit = false;
    state->_query_options.__set_mem_limit(200);
    ASSERT_EQ(state->scan_queue_mem_limit(), 200 / 20);

    constexpr int kParallelTasks = 1;
    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());
    local_state->_max_scan_concurrency = max_concurrency_counter.get();
    local_state->_min_scan_concurrency = min_concurrency_counter.get();
    local_state->_parent = op.get();

    constexpr int64_t kLimit = 100;
    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);

    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillRepeatedly(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_min_scan_concurrency_of_scan_scheduler = 10;
    std::ignore = ctx->init();

    // The 10-byte state limit is below the expected 10 MiB value, so the expectation implies a
    // 10 MiB floor, scaled by (output slots / 300 + 1) with a single output slot.
    ASSERT_EQ(ctx->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 300 + 1));
}
// ScannerContext::get_free_block():
//   * with an empty free list it creates a block and bumps _newly_create_free_blocks_num,
//     both when forced and when usage is under _max_bytes_in_queue;
//   * with a queued block it reuses it, and the expectations below show the memory
//     accounting dropping by that block's allocated bytes (100).
TEST_F(ScannerContextTest, get_free_block) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);

    // Plug in the fixture counters, both reset to zero.
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_newly_create_free_blocks_num->set(0L);
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_scanner_memory_used_counter->set(0L);

    // Forced request on an empty free list: a fresh block is created.
    BlockUPtr free_block = ctx->get_free_block(/*force=*/true);
    ASSERT_NE(free_block, nullptr);
    ASSERT_EQ(ctx->_newly_create_free_blocks_num->value(), 1);

    // Unforced request, still no free block, usage below _max_bytes_in_queue: created again.
    ctx->_max_bytes_in_queue = 200;
    free_block = ctx->get_free_block(/*force=*/false);
    ASSERT_NE(free_block, nullptr);
    ASSERT_EQ(ctx->_newly_create_free_blocks_num->value(), 2);

    // Queue a reusable block and fetch it back from the free list.
    auto queued = std::make_unique<MockBlock>();
    EXPECT_CALL(*queued, allocated_bytes()).WillRepeatedly(testing::Return(100));
    EXPECT_CALL(*queued, mem_reuse()).WillRepeatedly(testing::Return(true));
    ctx->_free_blocks.enqueue(std::move(queued));
    free_block = ctx->get_free_block(/*force=*/false);
    ASSERT_NE(free_block, nullptr);
    ASSERT_EQ(ctx->_block_memory_usage, -100);
    ASSERT_EQ(ctx->_scanner_memory_used_counter->value(), -100);
}
// ScannerContext::return_free_block(): a reusable block goes back onto the free list, and
// its allocated bytes (100) are added to both the block-memory usage and the memory counter.
TEST_F(ScannerContextTest, return_free_block) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_max_bytes_in_queue = 200;
    ctx->_block_memory_usage = 0;

    auto returned = std::make_unique<MockBlock>();
    EXPECT_CALL(*returned, allocated_bytes()).WillRepeatedly(testing::Return(100));
    EXPECT_CALL(*returned, mem_reuse()).WillRepeatedly(testing::Return(true));
    EXPECT_CALL(*returned, clear_column_data(testing::_)).WillRepeatedly(testing::Return());
    ctx->return_free_block(std::move(returned));

    ASSERT_EQ(ctx->_block_memory_usage, 100);
    ASSERT_EQ(ctx->_scanner_memory_used_counter->value(), 100);
    // No concurrent producers or consumers here, so size_approx() is exact.
    ASSERT_EQ(ctx->_free_blocks.size_approx(), 1);
}
// ScannerContext::get_block_from_queue() in three situations:
//   1. the runtime state reports cancellation -> its cancel reason is returned;
//   2. not cancelled but _process_status failed -> that status is returned;
//   3. healthy, with one eos task queued -> OK, and the scanner is counted as finished.
TEST_F(ScannerContextTest, get_block_from_queue) {
    constexpr int kParallelTasks = 1;
    constexpr int64_t kLimit = 100;

    auto op = std::make_unique<pipeline::OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
                                                             kParallelTasks, TQueryCacheParam {});
    auto local_state = pipeline::OlapScanLocalState::create_unique(state.get(), op.get());

    OlapScanner::Params params;
    params.state = state.get();
    params.profile = profile.get();
    params.limit = kLimit;
    params.key_ranges = std::vector<OlapScanRange*>(); // empty
    std::shared_ptr<Scanner> shared_scanner =
            OlapScanner::create_shared(local_state.get(), std::move(params));

    std::list<std::shared_ptr<ScannerDelegate>> delegates;
    for (int remaining = 11; remaining > 0; --remaining) {
        delegates.push_back(std::make_shared<ScannerDelegate>(shared_scanner));
    }

    std::shared_ptr<ScannerContext> ctx = ScannerContext::create_shared(
            state.get(), local_state.get(), output_tuple_desc, output_row_descriptor, delegates,
            kLimit, scan_dependency, kParallelTasks);
    ctx->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
    ctx->_scanner_memory_used_counter = scanner_memory_used_counter.get();
    ctx->_max_bytes_in_queue = 200;
    ctx->_block_memory_usage = 0;

    auto out_block = std::make_unique<MockBlock>();
    EXPECT_CALL(*out_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
    EXPECT_CALL(*out_block, mem_reuse()).WillRepeatedly(testing::Return(true));
    EXPECT_CALL(*out_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return());

    auto runtime_state = std::make_unique<MockRuntimeStateLocal>();
    EXPECT_CALL(*runtime_state, is_cancelled()).WillOnce(testing::Return(true));
    EXPECT_CALL(*runtime_state, cancel_reason())
            .WillOnce(testing::Return(Status::Cancelled("TestCancelMsg")));

    // 1. Cancelled query: the cancel reason is surfaced.
    bool eos = false;
    Status status = ctx->get_block_from_queue(runtime_state.get(), out_block.get(), &eos, 0);
    EXPECT_FALSE(status.ok());
    EXPECT_EQ(status.msg(), "TestCancelMsg");

    // 2. Not cancelled, but the context has already failed.
    EXPECT_CALL(*runtime_state, is_cancelled()).WillRepeatedly(testing::Return(false));
    ctx->_process_status = Status::InternalError("TestCancel");
    status = ctx->get_block_from_queue(runtime_state.get(), out_block.get(), &eos, 0);
    EXPECT_FALSE(status.ok());
    EXPECT_EQ(status.msg(), "TestCancel");

    // 3. Healthy context with a single eos task waiting in the queue.
    ctx->_process_status = Status::OK();
    ctx->_is_finished = false;
    ctx->_should_stop = false;
    auto eos_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(shared_scanner));
    eos_task->set_eos(true);
    ctx->_tasks_queue.push_back(eos_task);

    auto mock_scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
    EXPECT_CALL(*mock_scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
            .WillOnce(testing::Return(Status::OK()));
    ctx->_scanner_scheduler = mock_scheduler.get();
    ctx->_num_finished_scanners = 0;
    EXPECT_CALL(*out_block, mem_reuse()).WillRepeatedly(testing::Return(false));
    status = ctx->get_block_from_queue(runtime_state.get(), out_block.get(), &eos, 0);
    EXPECT_TRUE(status.ok());
    EXPECT_EQ(ctx->_num_finished_scanners, 1);
}
} // namespace doris::vectorized