blob: 740a57e793b1537ebfaf4f8870d84c7af51526ae [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/new_olap_scan_node.h"

#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <opentelemetry/trace/tracer.h>
#include <stdio.h>

#include <algorithm>
#include <charconv>
#include <memory>
#include <ostream>
#include <shared_mutex>
#include <unordered_map>
#include <utility>
#include <variant>

#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/decimalv2_value.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "service/backend_options.h"
#include "util/time.h"
#include "util/to_string.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/common/string_ref.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
// Forward declarations of doris types named in this translation unit.
namespace doris {
class DescriptorTbl;
class FunctionContext;
namespace vectorized {
class VScanner;
} // namespace vectorized
} // namespace doris
namespace doris::vectorized {
/// Constructs the scan node from its thrift plan description.
///
/// Copies the output tuple id and the distribution column ids out of the
/// thrift OLAP scan node. When the plan carries both sort info and a sort
/// limit, that limit becomes the per-scanner row limit.
NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
                                 const DescriptorTbl& descs)
        : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) {
    const auto& thrift_scan_node = tnode.olap_scan_node;
    _output_tuple_id = thrift_scan_node.tuple_id;
    _col_distribute_ids = thrift_scan_node.distribute_column_ids;

    // Both optional fields must be present before the sort limit is applied.
    const bool has_sort_limit =
            _olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit;
    if (has_sort_limit) {
        _limit_per_scanner = _olap_scan_node.sort_limit;
    }
}
/// Adds this node's scan counters to `statistics` after the base class has
/// contributed its own.
///
/// The scan bytes, scan rows and CPU time are only reported when this is not
/// a pipeline scan, or when this node is the one that creates scanners.
Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));

    const bool skip_local_counters = _is_pipeline_scan && !_should_create_scanner;
    if (skip_local_counters) {
        return Status::OK();
    }

    statistics->add_scan_bytes(_read_compressed_counter->value());
    statistics->add_scan_rows(_raw_rows_counter->value());
    // The timer value is divided by NANOS_PER_MILLIS before being reported as milliseconds.
    statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
    return Status::OK();
}
/// Prepares the base scan node, then registers the "TabletNum" counter.
Status NewOlapScanNode::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(VScanNode::prepare(state));

    // Profile entries that must exist even when no VScanner object is ever
    // created belong here rather than in _init_profile().
    _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);

    return Status::OK();
}
// Registers every timer and counter this node reports.
// After the base class has initialised its own profile, a "SegmentIterator"
// child profile is created under the scanner profile. Most counters below are
// registered on that child; a few go on the scanner profile or the node's
// runtime profile, as indicated by the first macro argument.
Status NewOlapScanNode::_init_profile() {
RETURN_IF_ERROR(VScanNode::_init_profile());
// 1. create the segment profile and attach it as a child of the scanner profile
_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
_scanner_profile->add_child(_segment_profile.get(), true, nullptr);
// 2. register timers and counters
// -- reader / scanner initialisation and conjunct processing
_reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime");
_scanner_init_timer = ADD_TIMER(_scanner_profile, "ScannerInitTime");
_process_conjunct_timer = ADD_TIMER(_runtime_profile, "ProcessConjunctTime");
// -- bytes read and block loading
_read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES);
_read_uncompressed_counter =
ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES);
_block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime");
_block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT);
_block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime");
_raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT);
_block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime");
_block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime");
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
_block_conditions_filtered_timer = ADD_TIMER(_segment_profile, "BlockConditionsFilteredTime");
// -- predicate evaluation
_rows_vec_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
_rows_short_circuit_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsShortCircuitPredFiltered", TUnit::UNIT);
_rows_vec_cond_input_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT);
_rows_short_circuit_cond_input_counter =
ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
_expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime");
// -- first / second / lazy read phases
_first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
_second_read_timer = ADD_TIMER(_segment_profile, "SecondReadTime");
_first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime");
_first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT);
_lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime");
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");
// -- rows filtered, by filtering mechanism
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
_conditions_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT);
_key_range_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT);
// -- IO, decompression and page statistics
_io_timer = ADD_TIMER(_segment_profile, "IOTimer");
_decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer");
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
// -- bitmap index
_bitmap_index_filter_counter =
ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT);
_bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer");
// -- inverted index
_inverted_index_filter_counter =
ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT);
_inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime");
_inverted_index_query_cache_hit_counter =
ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheHit", TUnit::UNIT);
_inverted_index_query_cache_miss_counter =
ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheMiss", TUnit::UNIT);
_inverted_index_query_timer = ADD_TIMER(_segment_profile, "InvertedIndexQueryTime");
_inverted_index_query_bitmap_copy_timer =
ADD_TIMER(_segment_profile, "InvertedIndexQueryBitmapCopyTime");
_inverted_index_query_bitmap_op_timer =
ADD_TIMER(_segment_profile, "InvertedIndexQueryBitmapOpTime");
_inverted_index_searcher_open_timer =
ADD_TIMER(_segment_profile, "InvertedIndexSearcherOpenTime");
_inverted_index_searcher_search_timer =
ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchTime");
_output_index_result_column_timer = ADD_TIMER(_segment_profile, "OutputIndexResultColumnTimer");
// -- segment-level statistics
_filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT);
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT);
return Status::OK();
}
/// Renders one pushed-down condition as "{column op values}".
///
/// The internal operators "*=" and "!*=" are shown as "IN" and "NOT IN".
/// Value lists with more than 128 entries are replaced by a placeholder.
static std::string olap_filter_to_string(const doris::TCondition& condition) {
    auto display_op = condition.condition_op;
    if (display_op == "*=") {
        display_op = "IN";
    } else if (display_op == "!*=") {
        display_op = "NOT IN";
    }

    std::string rendered_values;
    if (condition.condition_values.size() > 128) {
        rendered_values = "[more than 128 elements]";
    } else {
        rendered_values = to_string(condition.condition_values);
    }

    return fmt::format("{{{} {} {}}}", condition.column_name, display_op, rendered_values);
}
/// Renders a list of conditions as "[c1, c2, ...]"; an empty list yields "[]".
static std::string olap_filters_to_string(const std::vector<doris::TCondition>& filters) {
    std::string joined = "[";
    bool need_separator = false;
    for (const auto& filter : filters) {
        if (need_separator) {
            joined += ", ";
        }
        joined += olap_filter_to_string(filter);
        need_separator = true;
    }
    joined += "]";
    return joined;
}
/// Returns the display name of a push-down aggregate operator.
/// Any operator other than MINMAX, COUNT and MIX is reported as "NONE".
inline std::string push_down_agg_to_string(const TPushAggOp::type& op) {
    switch (op) {
    case TPushAggOp::MINMAX:
        return "MINMAX";
    case TPushAggOp::COUNT:
        return "COUNT";
    case TPushAggOp::MIX:
        return "MIX";
    default:
        return "NONE";
    }
}
static std::string tablets_id_to_string(
const std::vector<std::unique_ptr<TPaloScanRange>>& scan_ranges) {
if (scan_ranges.empty()) {
return "[empty]";
}
std::stringstream ss;
ss << "[" << scan_ranges[0]->tablet_id;
for (int i = 1; i < scan_ranges.size(); ++i) {
ss << ", " << scan_ranges[i]->tablet_id;
}
ss << "]";
return ss.str();
}
/// Runs the base-class conjunct processing and then, unless the node has
/// already reached end-of-stream, builds the key ranges and storage filters.
/// The whole call is measured by the "ProcessConjunctTime" timer.
Status NewOlapScanNode::_process_conjuncts() {
    SCOPED_TIMER(_process_conjunct_timer);
    RETURN_IF_ERROR(VScanNode::_process_conjuncts());
    // Once _eos is set there is nothing left to build ranges or filters for.
    return _eos ? Status::OK() : _build_key_ranges_and_filters();
}
// Turns the per-column value ranges into scan keys and pushed-down filters.
//
// When no aggregate is pushed down (the option is unset or NONE):
//   * key columns are visited in declaration order and their value ranges are
//     folded into `_scan_keys`; ranges reported as exact are removed from
//     `_colname_to_value_range`;
//   * every range still left in `_colname_to_value_range` is converted into
//     conditions appended to `_olap_filters`;
//   * compound value ranges are converted into `_compound_filters`;
//   * the ranges in `_not_in_value_ranges` are appended to `_olap_filters`.
// Otherwise only the pushed-down aggregate type is recorded in the profile.
// When profiling is enabled, the predicates, key ranges and tablet ids are
// added to the runtime profile as info strings.
Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (!_olap_scan_node.__isset.push_down_agg_type_opt ||
_olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
// 1. construct scan key except last olap engine short key
_scan_keys.set_is_convertible(limit() == -1);
// `exact_range` reports whether a key range was converted into `_scan_keys` exactly.
// If it is true, the range can be discarded and need not become an entry in `_olap_filters`.
bool exact_range = true;
bool eos = false;
// Stop at the first key column without a value range, as soon as the scan keys
// hold a range value, or when extend_scan_key reports end-of-stream.
for (int column_index = 0;
column_index < column_names.size() && !_scan_keys.has_range_value() && !eos;
++column_index) {
auto iter = _colname_to_value_range.find(column_names[column_index]);
if (_colname_to_value_range.end() == iter) {
break;
}
RETURN_IF_ERROR(std::visit(
[&](auto&& range) {
// Work on a copy of the range, because extend_scan_key may modify its first
// argument. The original must stay unchanged: if it is not an exact range
// it is converted to olap filters later.
auto temp_range = range;
if (range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
RETURN_IF_ERROR(_scan_keys.extend_scan_key(
temp_range, _max_scan_key_num, &exact_range, &eos));
if (exact_range) {
// NOTE(review): this erases the map entry that `range` and `iter` refer to;
// neither is used again before the lambda returns.
_colname_to_value_range.erase(iter->first);
}
} else {
// Too many fixed values for this column: extend the scan key with the whole
// value range instead. The entry is kept in _colname_to_value_range, so it
// is still converted to olap filters below.
temp_range.set_whole_value_range();
RETURN_IF_ERROR(_scan_keys.extend_scan_key(
temp_range, _max_scan_key_num, &exact_range, &eos));
}
return Status::OK();
},
iter->second));
}
_eos |= eos;
// Convert every remaining column range into storage-layer conditions.
for (auto& iter : _colname_to_value_range) {
std::vector<TCondition> filters;
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second);
for (const auto& filter : filters) {
_olap_filters.push_back(filter);
}
}
// Compound ranges produce either "in compound" conditions or match conditions.
for (auto& iter : _compound_value_ranges) {
std::vector<TCondition> filters;
std::visit(
[&](auto&& range) {
if (range.is_in_compound_value_range()) {
range.to_condition_in_compound(filters);
} else if (range.is_match_value_range()) {
range.to_match_condition(filters);
}
},
iter);
for (const auto& filter : filters) {
_compound_filters.push_back(filter);
}
}
// Append the value ranges held in "_not_in_value_ranges"
for (auto& range : _not_in_value_ranges) {
std::visit([&](auto&& the_range) { the_range.to_in_condition(_olap_filters, false); },
range);
}
} else {
// An aggregate is pushed down: no keys or filters are built, only the type is recorded.
_runtime_profile->add_info_string(
"PushDownAggregate",
push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt));
}
if (_state->enable_profile()) {
_runtime_profile->add_info_string("PushDownPredicates",
olap_filters_to_string(_olap_filters));
_runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string());
_runtime_profile->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges));
}
VLOG_CRITICAL << _scan_keys.debug_string();
return Status::OK();
}
/// Decides whether a function-call predicate can be pushed down.
///
/// Only the `like` function is accepted. For each argument that is a slot
/// reference (ignoring casts), the opposite argument must be a constant whose
/// column is a ColumnConst; its first value is written to `*constant_str`.
///
/// @param fn_call       the function-call expression being examined
/// @param expr_ctx      context the expression was prepared with
/// @param constant_str  receives the constant argument on acceptance
/// @param fn_ctx        receives the function context on acceptance
/// @param pdt           set to ACCEPTABLE or UNACCEPTABLE
/// @return a non-OK status only when fetching the constant column fails
Status NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_call,
                                                          VExprContext* expr_ctx,
                                                          StringRef* constant_str,
                                                          doris::FunctionContext** fn_ctx,
                                                          VScanNode::PushDownType& pdt) {
    // Every rejection path marks the predicate as not pushable and reports success.
    auto reject = [&pdt]() {
        pdt = PushDownType::UNACCEPTABLE;
        return Status::OK();
    };

    // Currently only `like` function filters can be pushed down.
    if (fn_call->fn().name.function_name != "like") {
        return reject();
    }

    const auto& children = fn_call->children();
    doris::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index());
    DCHECK(func_cxt != nullptr);
    DCHECK(children.size() == 2);

    for (size_t i = 0; i < children.size(); i++) {
        const bool is_slot_ref =
                VExpr::expr_without_cast(children[i])->node_type() == TExprNodeType::SLOT_REF;
        if (!is_slot_ref) {
            // not a column reference, look at the other argument
            continue;
        }

        // With exactly two arguments, `1 - i` selects the opposite one.
        const auto& other = children[1 - i];
        if (!other->is_constant()) {
            // only a constant pattern can be handled
            return reject();
        }
        DCHECK(other->type().is_string_type());

        std::shared_ptr<ColumnPtrWrapper> const_col_wrapper;
        RETURN_IF_ERROR(other->get_const_col(expr_ctx, &const_col_wrapper));
        const ColumnConst* const_column =
                check_and_get_column<ColumnConst>(const_col_wrapper->column_ptr);
        if (const_column == nullptr) {
            return reject();
        }
        *constant_str = const_column->get_data_at(0);
    }

    *fn_ctx = func_cxt;
    pdt = PushDownType::ACCEPTABLE;
    return Status::OK();
}
/// Returns true when common expressions may be pushed down: the feature must
/// be enabled in the runtime state, and the table must either use duplicate
/// keys or be a unique-key table with merge-on-write set and enabled.
bool NewOlapScanNode::_should_push_down_common_expr() {
    if (!_state->enable_common_expr_pushdown()) {
        return false;
    }
    if (_olap_scan_node.keyType == TKeysType::DUP_KEYS) {
        return true;
    }
    return _olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
           _olap_scan_node.__isset.enable_unique_key_merge_on_write &&
           _olap_scan_node.enable_unique_key_merge_on_write;
}
// PlanFragmentExecutor will call this method to set scan range
// Doris scan range is defined in thrift file like this
// struct TPaloScanRange {
// 1: required list<Types.TNetworkAddress> hosts
// 2: required string schema_hash
// 3: required string version
// 5: required Types.TTabletId tablet_id
// 6: required string db_name
// 7: optional list<TKeyRange> partition_column_ranges
// 8: optional string index_name
// 9: optional string table_name
//}
// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet
void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
_scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));
COUNTER_UPDATE(_tablet_counter, 1);
}
// telemetry::set_current_span_attribute(_tablet_counter);
}
/// Returns the display name of this node: "VNewOlapScanNode(<table name>)".
std::string NewOlapScanNode::get_name() {
    const auto& table_name = _olap_scan_node.table_name;
    return fmt::format("VNewOlapScanNode({0})", table_name);
}
/// Creates the scanners for all tablets of this node and appends them to `scanners`.
///
/// With no scan ranges the node is marked end-of-stream and nothing is created.
/// Otherwise the key ranges are taken from `_scan_keys` (a single default
/// range is used when there are none) and scanners are built in one of two ways:
///   * segment split: when `_shared_scan_opt` is set, there are fewer tablets
///     than scanner pool threads, and the tablets use duplicate keys, the
///     rowset readers are captured up front and the segments are divided so
///     that each scanner covers about the average number of segments per thread;
///   * range split: otherwise each tablet gets one scanner per group of key ranges.
///
/// @param scanners  output list receiving the created scanners
/// @return the first error from tablet lookup, reader capture or scanner preparation
Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
    if (_scan_ranges.empty()) {
        _eos = true;
        return Status::OK();
    }
    SCOPED_TIMER(_scanner_init_timer);

    // Record the conjuncts that are still held by this node in the profile.
    if (!_conjuncts.empty()) {
        std::string message;
        for (auto& conjunct : _conjuncts) {
            if (conjunct->root()) {
                if (!message.empty()) {
                    message += ", ";
                }
                message += conjunct->root()->debug_string();
            }
        }
        _runtime_profile->add_info_string("RemainedDownPredicates", message);
    }

    if (!_olap_scan_node.output_column_unique_ids.empty()) {
        for (auto uid : _olap_scan_node.output_column_unique_ids) {
            _maybe_read_column_ids.emplace(uid);
        }
    }

    // ranges constructed from scan keys
    RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges));
    // if we can't get ranges from conditions, we give it a total range
    if (_cond_ranges.empty()) {
        _cond_ranges.emplace_back(std::make_unique<doris::OlapScanRange>());
    }

    // _scan_ranges is non-empty here, so the division is safe.
    int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());

    bool is_duplicate_key = false;
    int segment_count = 0;
    std::vector<std::vector<RowsetReaderSharedPtr>> rowset_readers_vector(_scan_ranges.size());
    std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());

    // Split tablet segment by scanner, only use in pipeline in duplicate key
    // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan
    // TODO: some tablet may do not have segment, may need split segment all case
    if (_shared_scan_opt && _scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) {
        for (int i = 0; i < _scan_ranges.size(); ++i) {
            auto& scan_range = _scan_ranges[i];
            auto tablet_id = scan_range->tablet_id;
            auto [tablet, status] =
                    StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
                                                                                       true);
            RETURN_IF_ERROR(status);

            is_duplicate_key = tablet->keys_type() == DUP_KEYS;
            if (!is_duplicate_key) {
                break;
            }

            // NOTE(review): the from_chars result is not checked; an unparsable
            // version string leaves `version` at 0.
            int64_t version = 0;
            std::from_chars(scan_range->version.c_str(),
                            scan_range->version.c_str() + scan_range->version.size(), version);

            std::shared_lock rdlock(tablet->get_header_lock());
            // acquire tablet rowset readers at the beginning of the scan node
            // to prevent this case: when there are lots of olap scanners to run for example 10000
            // the rowsets maybe compacted when the last olap scanner starts
            Status acquire_reader_st =
                    tablet->capture_rs_readers({0, version}, &rowset_readers_vector[i]);
            if (!acquire_reader_st.ok()) {
                LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
                std::stringstream ss;
                ss << "failed to initialize storage reader. tablet=" << tablet->full_name()
                   << ", res=" << acquire_reader_st
                   << ", backend=" << BackendOptions::get_localhost();
                return Status::InternalError(ss.str());
            }

            for (const auto& rowset_reader : rowset_readers_vector[i]) {
                auto num_segments = rowset_reader->rowset()->num_segments();
                tablet_rs_seg_count[i].emplace_back(num_segments);
                segment_count += num_segments;
            }
        }
    }

    // Creates, prepares and registers one scanner; returns the prepare status.
    auto build_new_scanner = [&](const TPaloScanRange& scan_range,
                                 const std::vector<OlapScanRange*>& key_ranges,
                                 const std::vector<RowsetReaderSharedPtr>& rs_readers,
                                 const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
        std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared(
                _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range,
                key_ranges, rs_readers, rs_reader_seg_offsets, _need_agg_finalize,
                _scanner_profile.get());
        RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
        scanner->set_compound_filters(_compound_filters);
        scanners->push_back(scanner);
        return Status::OK();
    };

    if (is_duplicate_key) {
        // 2. Split by segment count, each scanner need scan avg segment count
        auto avg_segment_count =
                std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1);
        for (int i = 0; i < _scan_ranges.size(); ++i) {
            auto& scan_range = _scan_ranges[i];
            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
            int num_ranges = ranges->size();
            std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
            for (int j = 0; j < num_ranges; ++j) {
                scanner_ranges[j] = (*ranges)[j].get();
            }

            const auto& rs_seg_count = tablet_rs_seg_count[i];
            int rs_seg_count_index = 0;
            int rs_seg_start_scan = 0;
            int scanner_seg_occupy = 0;
            std::vector<RowsetReaderSharedPtr> rs_readers;
            std::vector<std::pair<int, int>> rs_reader_seg_offsets;

            while (rs_seg_count_index < rs_seg_count.size()) {
                // do not generator range of segment (0, 0)
                if (rs_seg_count[rs_seg_count_index] == 0) {
                    rs_seg_start_scan = 0;
                    rs_seg_count_index++;
                    continue;
                }

                auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan;
                rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());

                if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) {
                    // The rowset has more segments left than this scanner may take:
                    // take only what is needed and stay on the same rowset.
                    auto need_add_seg_nums = avg_segment_count - scanner_seg_occupy;
                    rs_reader_seg_offsets.emplace_back(
                            rs_seg_start_scan,
                            rs_seg_start_scan + need_add_seg_nums); // only scan need_add_seg_nums
                    RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers,
                                                      rs_reader_seg_offsets));

                    rs_seg_start_scan += need_add_seg_nums;
                    scanner_seg_occupy = 0;
                    rs_readers.clear();
                    rs_reader_seg_offsets.clear();
                } else if (scanner_seg_occupy + max_add_seg_nums == avg_segment_count) {
                    // The rest of the rowset exactly fills this scanner.
                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
                                                       rs_seg_count[rs_seg_count_index]);
                    RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers,
                                                      rs_reader_seg_offsets));

                    rs_seg_start_scan = 0;
                    scanner_seg_occupy = 0;
                    rs_readers.clear();
                    rs_reader_seg_offsets.clear();
                    rs_seg_count_index++;
                } else {
                    // The rest of the rowset fits with room to spare: keep accumulating.
                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
                                                       rs_seg_count[rs_seg_count_index]);

                    rs_seg_start_scan = 0;
                    scanner_seg_occupy += max_add_seg_nums;
                    rs_seg_count_index++;
                }
            }

#ifndef NDEBUG
            for (const auto& offset : rs_reader_seg_offsets) {
                DCHECK_NE(offset.first, offset.second);
            }
#endif

            // dispose some segment tail
            if (!rs_readers.empty()) {
                // Propagate a failed prepare instead of silently dropping the scanner.
                RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers,
                                                  rs_reader_seg_offsets));
            }
        }
    } else {
        for (auto& scan_range : _scan_ranges) {
            auto tablet_id = scan_range->tablet_id;
            auto [tablet, status] =
                    StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
                                                                                       true);
            RETURN_IF_ERROR(status);

            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
            int size_based_scanners_per_tablet = 1;

            if (config::doris_scan_range_max_mb > 0) {
                // Widen before shifting so that a large setting cannot overflow a
                // 32-bit int and produce a zero or negative divisor.
                const int64_t max_scan_range_bytes =
                        static_cast<int64_t>(config::doris_scan_range_max_mb) << 20;
                size_based_scanners_per_tablet =
                        std::max(1, (int)(tablet->tablet_footprint() / max_scan_range_bytes));
            }

            // Both operands of std::min are at least 1, so the division is safe.
            int ranges_per_scanner =
                    std::max(1, (int)ranges->size() / std::min(scanners_per_tablet,
                                                               size_based_scanners_per_tablet));
            int num_ranges = ranges->size();
            for (int i = 0; i < num_ranges;) {
                std::vector<doris::OlapScanRange*> scanner_ranges;
                scanner_ranges.push_back((*ranges)[i].get());
                ++i;
                // Only group consecutive ranges that share the same end_include flag.
                for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
                                (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
                     ++j, ++i) {
                    scanner_ranges.push_back((*ranges)[i].get());
                }
                RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {}));
            }
        }
    }

    return Status::OK();
}
/// Returns true when `key_name` should be treated as a key column.
///
/// For duplicate-key tables, and for unique-key tables with merge-on-write
/// enabled, every column is treated as a key column. Otherwise the name must
/// appear in the node's key column list.
bool NewOlapScanNode::_is_key_column(const std::string& key_name) {
    const bool is_dup_keys = _olap_scan_node.keyType == TKeysType::DUP_KEYS;
    const bool is_merge_on_write_unique =
            _olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
            _olap_scan_node.__isset.enable_unique_key_merge_on_write &&
            _olap_scan_node.enable_unique_key_merge_on_write;
    if (is_dup_keys || is_merge_on_write_unique) {
        return true;
    }

    const auto& key_columns = _olap_scan_node.key_column_name;
    return std::find(key_columns.begin(), key_columns.end(), key_name) != key_columns.end();
}
/// Accumulates the statistics of the filter `id` and publishes them to the
/// segment profile.
///
/// The filtered and input row counts are added to the stored totals and the
/// type is overwritten. The totals are then written as the info string
/// "RuntimeFilterInfo id <id>" with the value
/// "[type = ..., input = ..., filtered = ...]".
void NewOlapScanNode::add_filter_info(int id, const PredicateFilterInfo& update_info) {
    // Merge the update into the stored entry for this filter.
    auto& entry = _filter_info[id];
    entry.filtered_row += update_info.filtered_row;
    entry.input_row += update_info.input_row;
    entry.type = update_info.type;

    // Build the profile key and value.
    const std::string filter_name = "RuntimeFilterInfo id " + std::to_string(id);

    std::string info_str = "[type = ";
    info_str += type_to_string(static_cast<PredicateType>(entry.type));
    info_str += ", input = ";
    info_str += std::to_string(entry.input_row);
    info_str += ", filtered = ";
    info_str += std::to_string(entry.filtered_row);
    info_str += "]";

    _segment_profile->add_info_string(filter_name, info_str);
}
}; // namespace doris::vectorized