// blob: e7cfea03695b59ef0f83188794334e831fabd473 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/workload_management/workload_sched_policy.h"
#include <gen_cpp/BackendService_types.h>
#include <gtest/gtest.h>
#include <utility>
#include "pipeline/task_scheduler.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_management/resource_context.h"
#include "runtime/workload_management/workload_condition.h"
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris {
// Test fixture for WorkloadSchedPolicy. It offers small factories for the
// conditions, actions and runtime context that the test cases feed into
// WorkloadSchedPolicy::init() and WorkloadSchedPolicy::is_match().
class WorkloadSchedPolicyTest : public testing::Test {
public:
    WorkloadSchedPolicyTest() = default;
    ~WorkloadSchedPolicyTest() override = default;

protected:
    // Fills a TWorkloadCondition with the given metric, comparison operator and
    // threshold text, then lets WorkloadConditionFactory build the condition
    // object from it.
    std::unique_ptr<WorkloadCondition> create_workload_condition(TWorkloadMetricType::type mtype,
                                                                 TCompareOperator::type otype,
                                                                 std::string value) {
        TWorkloadCondition cond;
        cond.metric_name = mtype;
        cond.op = otype;
        // `value` is a by-value sink parameter: move it instead of copying it again.
        cond.value = std::move(value);
        return WorkloadConditionFactory::create_workload_condition(&cond);
    }

    // Fills a TWorkloadAction with the given action type and lets
    // WorkloadActionFactory build the action object from it.
    std::unique_ptr<WorkloadAction> create_workload_action(TWorkloadActionType::type atype) {
        TWorkloadAction action;
        action.action = atype;
        return WorkloadActionFactory::create_workload_action(&action);
    }

    // Returns a RuntimeContext wrapping a newly created ResourceContext.
    WorkloadAction::RuntimeContext create_runtime_context() {
        std::shared_ptr<ResourceContext> resource_ctx = ResourceContext::create_shared();
        WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx);
        return action_runtime_ctx;
    }

    // No per-test setup is required.
    void SetUp() override {}
};
// Policies carrying at most one condition: every scenario builds a fresh policy
// and runtime context, then checks is_match() while the observed metric is
// moved around the configured threshold.
TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
    // Builds policy "p1" with a single `metric GREATER threshold` condition, a
    // single CANCEL_QUERY action and an empty workload group id set.
    auto make_cancel_policy = [this](TWorkloadMetricType::type metric,
                                     const std::string& threshold) {
        auto built = std::make_shared<WorkloadSchedPolicy>();
        std::vector<std::unique_ptr<WorkloadCondition>> conditions;
        conditions.push_back(
                create_workload_condition(metric, TCompareOperator::type::GREATER, threshold));
        std::vector<std::unique_ptr<WorkloadAction>> actions;
        actions.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
        std::set<int64_t> wg_ids;
        built->init(0, "p1", 0, true, 0, wg_ids, std::move(conditions), std::move(actions));
        return built;
    };

    // 1 empty resource
    {
        // A policy without any condition or action is expected to match.
        auto empty_policy = std::make_shared<WorkloadSchedPolicy>();
        std::vector<std::unique_ptr<WorkloadCondition>> no_conditions;
        std::vector<std::unique_ptr<WorkloadAction>> no_actions;
        std::set<int64_t> wg_ids;
        empty_policy->init(0, "p1", 0, true, 0, wg_ids, std::move(no_conditions),
                           std::move(no_actions));
        auto ctx = create_runtime_context();
        EXPECT_TRUE(empty_policy->is_match(&ctx));
    }
    {
        // A freshly created context is expected not to satisfy QUERY_TIME > 10.
        auto policy = make_cancel_policy(TWorkloadMetricType::type::QUERY_TIME, "10");
        auto ctx = create_runtime_context();
        EXPECT_FALSE(policy->is_match(&ctx));
    }

    // 2 check query time
    {
        auto policy = make_cancel_policy(TWorkloadMetricType::type::QUERY_TIME, "10");
        auto ctx = create_runtime_context();
        // Let 50 ms elapse and finish the task before evaluating the policy.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        ctx.resource_ctx->task_controller()->finish();
        EXPECT_TRUE(policy->is_match(&ctx))
                << ": " << ctx.resource_ctx->task_controller()->running_time();
    }

    // 3 check scan rows
    {
        auto policy = make_cancel_policy(TWorkloadMetricType::type::BE_SCAN_ROWS, "100");
        auto ctx = create_runtime_context();
        auto& rc = ctx.resource_ctx;

        // Updates applied in sequence: 99, then 2, then -2; only the middle
        // state is expected to match `> 100`.
        rc->io_context()->update_scan_rows(99);
        EXPECT_FALSE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_rows();

        rc->io_context()->update_scan_rows(2);
        EXPECT_TRUE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_rows();

        rc->io_context()->update_scan_rows(-2);
        EXPECT_FALSE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_rows();
    }

    // 4 check scan bytes
    {
        auto policy = make_cancel_policy(TWorkloadMetricType::type::BE_SCAN_BYTES, "1000");
        auto ctx = create_runtime_context();
        auto& rc = ctx.resource_ctx;

        // Same pattern as scan rows, around the 1000 threshold.
        rc->io_context()->update_scan_bytes(999);
        EXPECT_FALSE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_bytes();

        rc->io_context()->update_scan_bytes(2);
        EXPECT_TRUE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_bytes();

        rc->io_context()->update_scan_bytes(-2);
        EXPECT_FALSE(policy->is_match(&ctx)) << ": " << rc->io_context()->scan_bytes();
    }

    // 5 check query be memory bytes
    {
        auto policy = make_cancel_policy(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES, "10000");
        auto ctx = create_runtime_context();
        auto& rc = ctx.resource_ctx;

        // Attach a query-type tracker so memory consumption can be driven from the test.
        std::shared_ptr<MemTrackerLimiter> tracker =
                MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test");
        rc->memory_context()->set_mem_tracker(tracker);

        rc->memory_context()->mem_tracker()->consume(9999);
        EXPECT_FALSE(policy->is_match(&ctx))
                << ": " << rc->memory_context()->current_memory_bytes();

        rc->memory_context()->mem_tracker()->consume(2);
        EXPECT_TRUE(policy->is_match(&ctx))
                << ": " << rc->memory_context()->current_memory_bytes();

        rc->memory_context()->mem_tracker()->consume(-2);
        EXPECT_FALSE(policy->is_match(&ctx))
                << ": " << rc->memory_context()->current_memory_bytes();
    }
}
// One policy with four GREATER conditions: the test satisfies them one at a
// time and expects is_match() to stay false until the last one holds.
TEST_F(WorkloadSchedPolicyTest, one_policy_mutl_condition) {
    // 1. init policy
    auto policy = std::make_shared<WorkloadSchedPolicy>();

    std::vector<std::unique_ptr<WorkloadCondition>> conditions;
    // Appends a `metric GREATER threshold` condition to the list above.
    auto require_greater = [this, &conditions](TWorkloadMetricType::type metric,
                                               const std::string& threshold) {
        conditions.push_back(
                create_workload_condition(metric, TCompareOperator::type::GREATER, threshold));
    };
    require_greater(TWorkloadMetricType::type::QUERY_TIME, "10");
    require_greater(TWorkloadMetricType::type::BE_SCAN_ROWS, "100");
    require_greater(TWorkloadMetricType::type::BE_SCAN_BYTES, "1000");
    require_greater(TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES, "10000");

    std::vector<std::unique_ptr<WorkloadAction>> actions;
    actions.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));

    std::set<int64_t> wg_ids;
    policy->init(0, "p1", 0, true, 0, wg_ids, std::move(conditions), std::move(actions));

    // 2. is match
    auto ctx = create_runtime_context();
    auto& rc = ctx.resource_ctx;
    std::shared_ptr<MemTrackerLimiter> tracker =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "Test");
    rc->memory_context()->set_mem_tracker(tracker);

    // Nothing has been driven past its threshold yet.
    EXPECT_FALSE(policy->is_match(&ctx));

    // Query time: wait 50 ms, then finish the task.
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    rc->task_controller()->finish();
    EXPECT_FALSE(policy->is_match(&ctx));

    // Scan rows.
    rc->io_context()->update_scan_rows(101);
    EXPECT_FALSE(policy->is_match(&ctx));

    // Scan bytes.
    rc->io_context()->update_scan_bytes(1001);
    EXPECT_FALSE(policy->is_match(&ctx));

    // Memory: the final condition, after which the policy is expected to match.
    rc->memory_context()->mem_tracker()->consume(10001);
    EXPECT_TRUE(policy->is_match(&ctx));
}
// Exercises the LESS and EQUAL comparison operators on BE_SCAN_ROWS with a
// threshold of "100", before and after 100 rows are reported.
TEST_F(WorkloadSchedPolicyTest, test_operator) {
    // Builds policy "p1" with one `BE_SCAN_ROWS <op> 100` condition, no action
    // and an empty workload group id set.
    auto make_scan_rows_policy = [this](TCompareOperator::type op) {
        auto built = std::make_shared<WorkloadSchedPolicy>();
        std::vector<std::unique_ptr<WorkloadCondition>> conditions;
        conditions.push_back(
                create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS, op, "100"));
        std::vector<std::unique_ptr<WorkloadAction>> actions;
        std::set<int64_t> wg_ids;
        built->init(0, "p1", 0, true, 0, wg_ids, std::move(conditions), std::move(actions));
        return built;
    };

    // LESS
    {
        auto policy = make_scan_rows_policy(TCompareOperator::type::LESS);
        auto ctx = create_runtime_context();
        // Expected to match before any rows are reported, and not after 100.
        EXPECT_TRUE(policy->is_match(&ctx));
        ctx.resource_ctx->io_context()->update_scan_rows(100);
        EXPECT_FALSE(policy->is_match(&ctx));
    }

    // EQUAL
    {
        auto policy = make_scan_rows_policy(TCompareOperator::type::EQUAL);
        auto ctx = create_runtime_context();
        // Expected not to match before any rows are reported, and to match after 100.
        EXPECT_FALSE(policy->is_match(&ctx));
        ctx.resource_ctx->io_context()->update_scan_rows(100);
        EXPECT_TRUE(policy->is_match(&ctx));
    }
}
// A policy bound to workload group id 1: the test expects no match while the
// context has no workload group, and a match once a group with id 1 is attached.
TEST_F(WorkloadSchedPolicyTest, test_wg_id_set) {
    auto policy = std::make_shared<WorkloadSchedPolicy>();

    // The condition itself (BE_SCAN_ROWS >= 0) is written so that the
    // expectations below vary only with the attached workload group.
    std::vector<std::unique_ptr<WorkloadCondition>> conditions;
    conditions.push_back(create_workload_condition(TWorkloadMetricType::type::BE_SCAN_ROWS,
                                                   TCompareOperator::type::GREATER_EQUAL, "0"));
    std::vector<std::unique_ptr<WorkloadAction>> actions;

    std::set<int64_t> wg_ids;
    wg_ids.insert(1);
    policy->init(0, "p1", 0, true, 0, wg_ids, std::move(conditions), std::move(actions));

    // No workload group is attached to the context yet.
    auto ctx = create_runtime_context();
    EXPECT_FALSE(policy->is_match(&ctx));

    // Describe a workload group with id 1 / version 1 and attach it.
    TWorkloadGroupInfo topic_info;
    topic_info.id = 1;
    topic_info.__isset.id = true;
    topic_info.version = 1;
    topic_info.__isset.version = true;
    WorkloadGroupInfo wg_info = WorkloadGroupInfo::parse_topic_info(topic_info);
    auto wg = std::make_shared<WorkloadGroup>(wg_info);
    ctx.resource_ctx->set_workload_group(wg);

    EXPECT_TRUE(policy->is_match(&ctx));
}
} // namespace doris