blob: 419a922ee3d7d7547f9609f892bf95b430cb2803 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/fragment_mgr.h"
#include <gtest/gtest.h>
#include "common/config.h"
#include "exec/data_sink.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/row_batch.h"
#include "util/monotime.h"
namespace doris {
static Status s_prepare_status;
static Status s_open_status;
static int s_abort_cnt;
// Mock used for this unittest
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
const report_status_callback& report_status_cb)
: _exec_env(exec_env), _report_status_cb(report_status_cb) {}
PlanFragmentExecutor::~PlanFragmentExecutor() {}
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
const QueryFragmentsCtx* batch_ctx) {
return s_prepare_status;
}
Status PlanFragmentExecutor::open() {
SleepFor(MonoDelta::FromMilliseconds(50));
return s_open_status;
}
void PlanFragmentExecutor::cancel() {}
void PlanFragmentExecutor::set_abort() {
LOG(INFO) << "Plan Aborted";
s_abort_cnt++;
}
void PlanFragmentExecutor::close() {}
class FragmentMgrTest : public testing::Test {
public:
FragmentMgrTest() {}
protected:
virtual void SetUp() {
s_prepare_status = Status::OK();
s_open_status = Status::OK();
config::fragment_pool_thread_num_min = 32;
config::fragment_pool_thread_num_max = 32;
config::fragment_pool_queue_size = 1024;
}
virtual void TearDown() {}
};
TEST_F(FragmentMgrTest, Normal) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
// Duplicated
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
}
TEST_F(FragmentMgrTest, AddNormal) {
FragmentMgr mgr(nullptr);
for (int i = 0; i < 8; ++i) {
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100 + i);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
}
}
TEST_F(FragmentMgrTest, CancelNormal) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
// Cancel after add
ASSERT_TRUE(mgr.cancel(params.params.fragment_instance_id).ok());
}
TEST_F(FragmentMgrTest, CancelWithoutAdd) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.cancel(params.params.fragment_instance_id).ok());
}
TEST_F(FragmentMgrTest, PrepareFailed) {
s_prepare_status = Status::InternalError("Prepare failed.");
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
}
TEST_F(FragmentMgrTest, OfferPoolFailed) {
config::fragment_pool_thread_num_min = 1;
config::fragment_pool_thread_num_max = 1;
config::fragment_pool_queue_size = 0;
s_abort_cnt = 0;
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
// the first plan open will cost 50ms, so the next 3 plans will be aborted.
for (int i = 1; i < 4; ++i) {
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100 + i);
params.params.fragment_instance_id.__set_lo(200);
ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
}
ASSERT_EQ(3, s_abort_cnt);
}
} // namespace doris
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
return RUN_ALL_TESTS();
}