blob: ddf98d204e0dddf9c5dd875f74c595558172a12c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/QueryCache_types.h>
#include <gtest/gtest.h>
#include "common/object_pool.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "runtime/descriptors.h"
namespace doris::vectorized {
class ScanOperatorTest : public testing::Test {
public:
void SetUp() override {
obj_pool = std::make_unique<ObjectPool>();
// This ScanNode has two tuples.
// First one is input tuple, second one is output tuple.
tbl_desc.tableType = TTableType::OLAP_TABLE;
tuple_desc.id = 0;
tuple_descs.push_back(tuple_desc);
tuple_desc.id = 1;
tuple_descs.push_back(tuple_desc);
thrift_tbl.tableDescriptors.push_back(tbl_desc);
thrift_tbl.tupleDescriptors = tuple_descs;
thrift_tbl.slotDescriptors = slot_descs;
scalar_type.__set_type(TPrimitiveType::STRING);
std::ignore = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs);
}
private:
std::unique_ptr<ObjectPool> obj_pool;
TTupleDescriptor tuple_desc;
std::vector<TTupleDescriptor> tuple_descs;
DescriptorTbl* descs = nullptr;
TTableDescriptor tbl_desc;
TScalarType scalar_type;
TDescriptorTable thrift_tbl;
std::vector<TSlotDescriptor> slot_descs;
std::unique_ptr<RuntimeState> state = std::make_unique<RuntimeState>();
};
TEST_F(ScanOperatorTest, adaptive_pipeline_task_serial_read_on_limit) {
const int parallel_pipeline_task_num = 24;
TPlanNode tnode;
tnode.row_tuples.push_back(TTupleId(0));
tnode.row_tuples.push_back(TTupleId(1));
// Scan with conjuncts
TExpr conjunct;
std::vector<TExpr> conjuncts {conjunct};
tnode.__set_conjuncts(conjuncts);
auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {});
TQueryOptions query_options;
// enable_adaptive_pipeline_task_serial_read_on_limit is true
query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true);
state->set_query_options(query_options);
std::ignore = scan_operator->init(tnode, state.get());
// With conjuncts, should_run_serial is false
ASSERT_EQ(scan_operator->_should_run_serial, false);
// Scan without conjuncts
conjuncts.clear();
tnode.__set_conjuncts(conjuncts);
// limit 10
tnode.__set_limit(10);
scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {});
// enable_adaptive_pipeline_task_serial_read_on_limit is true
query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true);
query_options.__set_adaptive_pipeline_task_serial_read_on_limit(10);
state->set_query_options(query_options);
std::ignore = scan_operator->init(tnode, state.get());
// Without conjuncts, limit 10 <= adaptive_pipeline_task_serial_read_on_limit 10
ASSERT_EQ(scan_operator->_should_run_serial, true);
query_options.__set_adaptive_pipeline_task_serial_read_on_limit(9);
state->set_query_options(query_options);
std::ignore = scan_operator->init(tnode, state.get());
// Without conjuncts, limit 10 > adaptive_pipeline_task_serial_read_on_limit 9
ASSERT_EQ(scan_operator->_should_run_serial, true);
query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(false);
query_options.__set_adaptive_pipeline_task_serial_read_on_limit(900);
state->set_query_options(query_options);
scan_operator->_should_run_serial = false;
std::ignore = scan_operator->init(tnode, state.get());
// Without conjuncts, enable_adaptive_pipeline_task_serial_read_on_limit is false
ASSERT_EQ(scan_operator->_should_run_serial, false);
}
} // namespace doris::vectorized