blob: 53d89e6395b33aee56d1cd1b6f81c95d7233aa20 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/segment_iterator.h"
#include <assert.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <memory>
#include <numeric>
#include <set>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/io_common.h"
#include "olap/bloom_filter_predicate.h"
#include "olap/column_predicate.h"
#include "olap/field.h"
#include "olap/iterators.h"
#include "olap/like_column_predicate.h"
#include "olap/olap_common.h"
#include "olap/primary_key_index.h"
#include "olap/rowset/segment_v2/bitmap_index_reader.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/key_util.h"
#include "util/simd/bits.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vslot_ref.h"
namespace doris {
using namespace ErrorCode;
namespace segment_v2 {
// Defaulted out-of-line; presumably kept in this TU so members whose types are
// only forward-declared in the header are complete at destruction — confirm.
SegmentIterator::~SegmentIterator() = default;
// A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
// Example:
// input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
// output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10)
// output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3)
class SegmentIterator::BitmapRangeIterator {
public:
    BitmapRangeIterator() = default;
    virtual ~BitmapRangeIterator() = default;
    explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
        roaring_init_iterator(&bitmap.roaring, &_iter);
        _read_next_batch();
    }
    // True while at least one unread value remains in the bitmap.
    // NOTE(review): non-virtual — BackwardBitmapRangeIterator declares its own
    // has_more_range(), which *hides* (does not override) this one, so calls
    // through a base pointer always reach this implementation.
    bool has_more_range() const { return !_eof; }
    // read next range into [*from, *to) whose size <= max_range_size.
    // return false when there is no more range.
    virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) {
        if (_eof) {
            return false;
        }
        *from = _buf[_buf_pos];
        uint32_t range_size = 0;
        uint32_t expect_val = _buf[_buf_pos]; // this initial value just make first batch valid
        // Fast path: swallow whole buffered batches while they form one
        // contiguous run that still fits in max_range_size.
        // if array is contiguous sequence then the following conditions need to be met :
        // a_0: x
        // a_1: x+1
        // a_2: x+2
        // ...
        // a_p: x+p
        // so we can just use (a_p-a_0)-p to check conditions
        // and should notice the previous batch needs to be continuous with the current batch
        while (!_eof && range_size + _buf_size - _buf_pos <= max_range_size &&
               expect_val == _buf[_buf_pos] &&
               _buf[_buf_size - 1] - _buf[_buf_pos] == _buf_size - 1 - _buf_pos) {
            range_size += _buf_size - _buf_pos;
            expect_val = _buf[_buf_size - 1] + 1;
            _read_next_batch();
        }
        // Slow path: extend the run element by element inside the current batch.
        // promise remain range not will reach next batch
        if (!_eof && range_size < max_range_size && expect_val == _buf[_buf_pos]) {
            do {
                _buf_pos++;
                range_size++;
            } while (range_size < max_range_size && _buf[_buf_pos] == _buf[_buf_pos - 1] + 1);
        }
        *to = *from + range_size;
        return true;
    }
private:
    // Refill _buf from the roaring iterator; sets _eof when the bitmap is drained.
    void _read_next_batch() {
        _buf_pos = 0;
        _buf_size = roaring::api::roaring_read_uint32_iterator(&_iter, _buf, kBatchSize);
        _eof = (_buf_size == 0);
    }
    static const uint32_t kBatchSize = 256; // values decoded per refill
    roaring::api::roaring_uint32_iterator_t _iter;
    uint32_t _buf[kBatchSize];
    uint32_t _buf_pos = 0;  // next unread index within _buf
    uint32_t _buf_size = 0; // number of valid values in _buf
    bool _eof = false;      // no more values left in the bitmap
};
// A backward range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
// Example:
// input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
// output ranges: [15,20), [10,11), [4,8), [0,2) (when max_range_size=10)
// output ranges: [17,20), [15,17), [10,11), [5,8), [4, 5), [0,2) (when max_range_size=3)
class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator {
public:
    explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
        roaring_init_iterator_last(&bitmap.roaring, &_riter);
    }

    // True while the reverse iterator still points at a value.
    // BUG FIX: previously returned !_riter.has_value, i.e. it reported "more
    // ranges" exactly when the iterator was exhausted. Note this method hides
    // (does not override) the non-virtual base implementation, so it is only
    // consulted when called on a concrete BackwardBitmapRangeIterator.
    bool has_more_range() const { return _riter.has_value; }

    // read next range into [*from, *to) whose size <= max_range_size.
    // return false when there is no more range.
    bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) override {
        if (!_riter.has_value) {
            return false;
        }
        uint32_t range_size = 0;
        *to = _riter.current_value + 1; // one past the largest value of this range
        do {
            *from = _riter.current_value;
            range_size++;
            roaring_previous_uint32_iterator(&_riter);
        } while (range_size < max_range_size && _riter.has_value &&
                 _riter.current_value + 1 == *from); // contiguous while values descend by 1
        return true;
    }

private:
    roaring::api::roaring_uint32_iterator_t _riter;
};
// Construct an iterator over `segment` producing rows in `schema`'s layout.
// Heavy setup (index loads, range pruning) is deferred to init()/_lazy_init().
SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema)
        : _segment(std::move(segment)),
          _schema(schema),
          _cur_rowid(0),
          _lazy_materialization_read(false),
          _lazy_inited(false),
          _inited(false),
          _estimate_row_size(true),
          // estimate row size from the first 10 batches only
          _wait_times_estimate_row_size(10),
          _pool(new ObjectPool) {}
// One-time setup from the caller-provided read options: copies/clones
// predicates, records pushed-down expressions and creates all column/index
// iterators. Idempotent — a second call is a no-op.
Status SegmentIterator::init(const StorageReadOptions& opts) {
    // get file handle from file descriptor of segment
    if (_inited) {
        return Status::OK();
    }
    _inited = true;
    _file_reader = _segment->_file_reader;
    _opts = opts;
    _col_predicates.clear();
    // Stateful predicates are cloned so this iterator owns private copies
    // (freed via _pool); stateless ones are shared with the caller.
    for (auto& predicate : opts.column_predicates) {
        if (predicate->need_to_clone()) {
            ColumnPredicate* cloned;
            predicate->clone(&cloned);
            _pool->add(cloned);
            _col_predicates.emplace_back(cloned);
        } else {
            _col_predicates.emplace_back(predicate);
        }
    }
    _tablet_id = opts.tablet_id;
    // Read options will not change, so that just resize here
    _block_rowids.resize(_opts.block_row_max);
    if (!opts.column_predicates_except_leafnode_of_andnode.empty()) {
        _col_preds_except_leafnode_of_andnode = opts.column_predicates_except_leafnode_of_andnode;
    }
    _remaining_conjunct_roots = opts.remaining_conjunct_roots;
    _common_expr_ctxs_push_down = opts.common_expr_ctxs_push_down;
    _enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
    // Scratch state for walking the conjunct trees below ...
    _column_predicate_info.reset(new ColumnPredicateInfo());
    for (auto& expr : _remaining_conjunct_roots) {
        _calculate_pred_in_remaining_conjunct_root(expr);
    }
    // ... reset again so later predicate evaluation starts from a clean slate.
    _column_predicate_info.reset(new ColumnPredicateInfo());
    // NOTE(review): `> 0` assumes a real rowid column never sits at index 0 —
    // confirm against Schema::rowid_col_idx()'s "absent" sentinel.
    if (_schema->rowid_col_idx() > 0) {
        _record_rowids = true;
    }
    RETURN_IF_ERROR(init_iterators());
    if (_char_type_idx.empty() && _char_type_idx_no_0.empty()) {
        _vec_init_char_column_id();
    }
    if (opts.output_columns != nullptr) {
        _output_columns = *(opts.output_columns);
    }
    return Status::OK();
}
// Create the column-data, bitmap-index and inverted-index iterators up front;
// the first failure aborts initialization.
Status SegmentIterator::init_iterators() {
    RETURN_IF_ERROR(_init_return_column_iterators());
    RETURN_IF_ERROR(_init_bitmap_index_iterators());
    return _init_inverted_index_iterators();
}
// Deferred heavy setup: seed the visible-row bitmap, prune it with key ranges,
// column conditions and the delete bitmap, then build the range iterator in
// the requested scan direction.
Status SegmentIterator::_lazy_init() {
    SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
    DorisMetrics::instance()->segment_read_total->increment(1);
    // Start from "every row visible" and narrow below.
    _row_bitmap.addRange(0, _segment->num_rows());
    // z-order can not use prefix index
    if (_segment->_tablet_schema->sort_type() != SortType::ZORDER) {
        RETURN_IF_ERROR(_get_row_ranges_by_keys());
    }
    RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
    RETURN_IF_ERROR(_vec_init_lazy_materialization());
    // Remove rows that have been marked deleted.
    // Single find() instead of the former count()/at()/at()/at() lookups.
    auto delete_it = _opts.delete_bitmap.find(segment_id());
    if (delete_it != _opts.delete_bitmap.end() && delete_it->second != nullptr) {
        size_t pre_size = _row_bitmap.cardinality();
        _row_bitmap -= *(delete_it->second);
        _opts.stats->rows_del_by_bitmap += (pre_size - _row_bitmap.cardinality());
        VLOG_DEBUG << "read on segment: " << segment_id() << ", delete bitmap cardinality: "
                   << delete_it->second->cardinality() << ", "
                   << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap";
    }
    // Forward or backward iteration depending on the ORDER BY direction.
    if (_opts.read_orderby_key_reverse) {
        _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap));
    } else {
        _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
    }
    return Status::OK();
}
// Prune _row_bitmap using the short-key index: each requested key range is
// translated into one contiguous rowid interval and their union replaces the
// current bitmap.
Status SegmentIterator::_get_row_ranges_by_keys() {
    DorisMetrics::instance()->segment_row_total->increment(num_rows());
    // fast path for empty segment or empty key ranges
    if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) {
        return Status::OK();
    }
    // Read & seek key columns is a waste of time when no key column in _schema
    if (std::none_of(_schema->columns().begin(), _schema->columns().end(), [&](const Field* col) {
            return col && _opts.tablet_schema->column_by_uid(col->unique_id()).is_key();
        })) {
        return Status::OK();
    }
    RowRanges result_ranges;
    for (auto& key_range : _opts.key_ranges) {
        rowid_t lower_rowid = 0;
        rowid_t upper_rowid = num_rows();
        RETURN_IF_ERROR(_prepare_seek(key_range));
        if (key_range.upper_key != nullptr) {
            // If client want to read upper_bound, the include_upper is true. So we
            // should get the first ordinal at which key is larger than upper_bound.
            // So we call _lookup_ordinal with include_upper's negate
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.upper_key, !key_range.include_upper,
                                            num_rows(), &upper_rowid));
        }
        // The lower bound only needs to be searched inside [0, upper_rowid).
        if (upper_rowid > 0 && key_range.lower_key != nullptr) {
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.lower_key, key_range.include_lower,
                                            upper_rowid, &lower_rowid));
        }
        // Each key range yields a single contiguous interval; union them all.
        auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
        RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
    }
    // pre-condition: _row_ranges == [0, num_rows)
    size_t pre_size = _row_bitmap.cardinality();
    _row_bitmap = RowRanges::ranges_to_roaring(result_ranges);
    _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality());
    return Status::OK();
}
// Set up environment for the following seek.
// Builds (once) the seek schema/block from the key columns referenced by the
// range bounds, and lazily creates + inits a data iterator per key column.
Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_range) {
    // Collect distinct key fields from the lower and upper bound keys.
    std::vector<const Field*> key_fields;
    std::set<uint32_t> column_set;
    if (key_range.lower_key != nullptr) {
        for (auto cid : key_range.lower_key->schema()->column_ids()) {
            column_set.emplace(cid);
            key_fields.emplace_back(key_range.lower_key->column_schema(cid));
        }
    }
    if (key_range.upper_key != nullptr) {
        for (auto cid : key_range.upper_key->schema()->column_ids()) {
            // column_set de-duplicates columns shared by both bounds
            if (column_set.count(cid) == 0) {
                key_fields.emplace_back(key_range.upper_key->column_schema(cid));
                column_set.emplace(cid);
            }
        }
    }
    // _seek_schema is built once and reused across key ranges.
    if (!_seek_schema) {
        _seek_schema = std::make_unique<Schema>(key_fields, key_fields.size());
    }
    // todo(wb) need refactor here, when using pk to search, _seek_block is useless
    if (_seek_block.size() == 0) {
        _seek_block.resize(_seek_schema->num_column_ids());
        int i = 0;
        for (auto cid : _seek_schema->column_ids()) {
            auto column_desc = _seek_schema->column(cid);
            _seek_block[i] = Schema::get_column_by_field(*column_desc);
            i++;
        }
    }
    // create used column iterator
    for (auto cid : _seek_schema->column_ids()) {
        int32_t unique_id = _opts.tablet_schema->column(cid).unique_id();
        // only create an iterator for columns that don't have one yet
        if (_column_iterators.count(unique_id) < 1) {
            RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
                                                          &_column_iterators[unique_id]));
            ColumnIteratorOptions iter_opts;
            iter_opts.stats = _opts.stats;
            iter_opts.file_reader = _file_reader.get();
            iter_opts.io_ctx = _opts.io_ctx;
            RETURN_IF_ERROR(_column_iterators[unique_id]->init(iter_opts));
        }
    }
    return Status::OK();
}
// Prune _row_bitmap with every index-backed filter: compound predicates,
// bitmap indexes, inverted indexes, then bloom-filter/zone-map conditions.
Status SegmentIterator::_get_row_ranges_by_column_conditions() {
    SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_ns);
    if (_row_bitmap.isEmpty()) {
        return Status::OK();
    }
    // First, try predicates that sit inside compound expressions (not direct
    // AND leaves) — they can only be applied when ALL of them are index-backed.
    if (config::enable_index_apply_preds_except_leafnode_of_andnode) {
        RETURN_IF_ERROR(_apply_index_except_leafnode_of_andnode());
        if (_can_filter_by_preds_except_leafnode_of_andnode()) {
            for (auto& expr : _remaining_conjunct_roots) {
                _pred_except_leafnode_of_andnode_evaluate_result.clear();
                auto res = _execute_predicates_except_leafnode_of_andnode(expr);
                // exactly one result means the whole tree folded into one bitmap
                if (res.ok() && _pred_except_leafnode_of_andnode_evaluate_result.size() == 1) {
                    _row_bitmap &= _pred_except_leafnode_of_andnode_evaluate_result[0];
                }
            }
        }
    }
    RETURN_IF_ERROR(_apply_bitmap_index());
    RETURN_IF_ERROR(_apply_inverted_index());
    // the topn runtime predicate (if any) joins the condition pruning below
    std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
    if (_opts.use_topn_opt) {
        auto query_ctx = _opts.runtime_state->get_query_ctx();
        runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
    }
    if (!_row_bitmap.isEmpty() &&
        (runtime_predicate || !_opts.col_id_to_predicates.empty() ||
         _opts.delete_condition_predicates->num_of_column_predicate() > 0)) {
        RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
        RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
        size_t pre_size = _row_bitmap.cardinality();
        _row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges);
        _opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality());
    }
    // TODO(hkp): calculate filter rate to decide whether to
    // use zone map/bloom filter/secondary index or not.
    return Status::OK();
}
// Intersect *condition_row_ranges with the row ranges surviving the
// bloom-filter index, then the zone maps (including the topn runtime
// predicate's zone-map check).
Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) {
    std::set<int32_t> cids;
    for (auto& entry : _opts.col_id_to_predicates) {
        cids.insert(entry.first);
    }
    // first filter data by bloom filter index
    // bloom filter index only use CondColumn
    RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
    for (auto& cid : cids) {
        // get row ranges by bf index of this column,
        RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
        DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
        uint32_t unique_cid = _schema->unique_id(cid);
        RETURN_IF_ERROR(_column_iterators[unique_cid]->get_row_ranges_by_bloom_filter(
                _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
        RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
    }
    size_t pre_size = condition_row_ranges->count();
    RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
    _opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count());
    RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
    // second filter data by zone map
    for (auto& cid : cids) {
        // get row ranges by zone map of this column,
        RowRanges column_row_ranges = RowRanges::create_single(num_rows());
        DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
        RETURN_IF_ERROR(_column_iterators[_schema->unique_id(cid)]->get_row_ranges_by_zone_map(
                _opts.col_id_to_predicates.at(cid).get(),
                // delete predicates widen (not narrow) the kept ranges
                _opts.del_predicates_for_zone_map.count(cid) > 0
                        ? &(_opts.del_predicates_for_zone_map.at(cid))
                        : nullptr,
                &column_row_ranges));
        // intersect different columns's row ranges to get final row ranges by zone map
        RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
                                       &zone_map_row_ranges);
    }
    std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
    if (_opts.use_topn_opt) {
        auto query_ctx = _opts.runtime_state->get_query_ctx();
        runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
        if (runtime_predicate) {
            // NOTE(review): `cid` is assigned a column *unique id* here, yet it is
            // fed to _schema->unique_id(cid) below as if it were a column id —
            // verify the two id spaces coincide for this schema.
            int32_t cid = _opts.tablet_schema->column(runtime_predicate->column_id()).unique_id();
            AndBlockColumnPredicate and_predicate;
            // presumably and_predicate takes ownership of single_predicate —
            // confirm against AndBlockColumnPredicate's destructor.
            auto single_predicate = new SingleColumnBlockPredicate(runtime_predicate.get());
            and_predicate.add_column_predicate(single_predicate);
            RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows());
            RETURN_IF_ERROR(_column_iterators[_schema->unique_id(cid)]->get_row_ranges_by_zone_map(
                    &and_predicate, nullptr, &column_rp_row_ranges));
            // intersect different columns's row ranges to get final row ranges by zone map
            RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges,
                                           &zone_map_row_ranges);
        }
    }
    pre_size = condition_row_ranges->count();
    RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
                                   condition_row_ranges);
    _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count());
    return Status::OK();
}
// filter rows by evaluating column predicates using bitmap indexes.
// upon return, predicates that've been evaluated by bitmap indexes are removed from _col_predicates.
Status SegmentIterator::_apply_bitmap_index() {
    SCOPED_RAW_TIMER(&_opts.stats->bitmap_index_filter_timer);
    size_t input_rows = _row_bitmap.cardinality();
    std::vector<ColumnPredicate*> remaining_predicates;
    for (auto pred : _col_predicates) {
        int32_t unique_id = _schema->unique_id(pred->column_id());
        // single find() instead of count() plus two operator[] lookups
        auto it = _bitmap_index_iterators.find(unique_id);
        if (it == _bitmap_index_iterators.end() || it->second == nullptr ||
            pred->type() == PredicateType::BF) {
            // no bitmap index for this column (BF predicates never use one)
            remaining_predicates.push_back(pred);
        } else {
            RETURN_IF_ERROR(pred->evaluate(it->second.get(), _segment->num_rows(), &_row_bitmap));
            auto column_name = _schema->column(pred->column_id())->name();
            // when every predicate on the column was served by indexes, the raw
            // column data never needs to be read (runtime-filter preds excluded)
            if (_check_column_pred_all_push_down(column_name) &&
                !pred->predicate_params()->marked_by_runtime_filter) {
                _need_read_data_indices[unique_id] = false;
            }
            if (_row_bitmap.isEmpty()) {
                break; // all rows have been pruned, no need to process further predicates
            }
        }
    }
    _col_predicates = std::move(remaining_predicates);
    _opts.stats->rows_bitmap_index_filtered += (input_rows - _row_bitmap.cardinality());
    return Status::OK();
}
// True for every expression-node kind that denotes a constant literal.
bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) {
    return node_type == TExprNodeType::BOOL_LITERAL ||
           node_type == TExprNodeType::INT_LITERAL ||
           node_type == TExprNodeType::LARGE_INT_LITERAL ||
           node_type == TExprNodeType::FLOAT_LITERAL ||
           node_type == TExprNodeType::DECIMAL_LITERAL ||
           node_type == TExprNodeType::STRING_LITERAL ||
           node_type == TExprNodeType::DATE_LITERAL;
}
// Depth-first walk over the expression tree, recording every referenced
// column (by schema column id) as a "common expression" column.
Status SegmentIterator::_extract_common_expr_columns(const vectorized::VExprSPtr& expr) {
    for (const auto& child : expr->children()) {
        RETURN_IF_ERROR(_extract_common_expr_columns(child));
    }
    if (expr->node_type() == TExprNodeType::SLOT_REF) {
        auto slot_expr = std::dynamic_pointer_cast<doris::vectorized::VSlotRef>(expr);
        const auto cid = _schema->column_id(slot_expr->column_id());
        _is_common_expr_column[cid] = true;
        _common_expr_columns.insert(cid);
    }
    return Status::OK();
}
// Post-order evaluation of a conjunct tree: leaf predicate pieces are
// accumulated into _column_predicate_info; when a full BINARY/MATCH predicate
// is recognized, its pre-computed index bitmap (if cached in
// _rowid_result_for_index) is pushed; COMPOUND nodes combine stacked results.
Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(
        const vectorized::VExprSPtr& expr) {
    if (expr == nullptr) {
        return Status::OK();
    }
    // children first, so operand bitmaps are already stacked for compound nodes
    auto& children = expr->children();
    for (int i = 0; i < children.size(); ++i) {
        RETURN_IF_ERROR(_execute_predicates_except_leafnode_of_andnode(children[i]));
    }
    auto node_type = expr->node_type();
    if (node_type == TExprNodeType::SLOT_REF) {
        // record which column the predicate currently being assembled refers to
        _column_predicate_info->column_name = expr->expr_name();
    } else if (_is_literal_node(node_type)) {
        auto v_literal_expr = std::dynamic_pointer_cast<doris::vectorized::VLiteral>(expr);
        _column_predicate_info->query_value = v_literal_expr->value();
    } else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED) {
        if (node_type == TExprNodeType::MATCH_PRED) {
            _column_predicate_info->query_op = "match";
        } else {
            _column_predicate_info->query_op = expr->fn().name.function_name;
        }
        // get child condition result in compound condtions
        auto pred_result_sign = _gen_predicate_result_sign(_column_predicate_info.get());
        // scratch info is consumed; reset it for the next predicate leaf
        _column_predicate_info.reset(new ColumnPredicateInfo());
        // only signs whose bitmap was marked usable (.first == true) are pushed
        if (_rowid_result_for_index.count(pred_result_sign) > 0 &&
            _rowid_result_for_index[pred_result_sign].first) {
            auto apply_reuslt = _rowid_result_for_index[pred_result_sign].second;
            _pred_except_leafnode_of_andnode_evaluate_result.push_back(apply_reuslt);
        }
    } else if (node_type == TExprNodeType::COMPOUND_PRED) {
        auto function_name = expr->fn().name.function_name;
        // execute logic function
        RETURN_IF_ERROR(_execute_compound_fn(function_name));
    }
    return Status::OK();
}
// Apply a logical operator to the bitmaps already stacked in
// _pred_except_leafnode_of_andnode_evaluate_result: AND/OR consume the top
// two entries, NOT complements the top entry against the visible rows.
Status SegmentIterator::_execute_compound_fn(const std::string& function_name) {
    auto& results = _pred_except_leafnode_of_andnode_evaluate_result;
    const auto size = results.size();
    if (function_name == "and") {
        if (size < 2) {
            return Status::InternalError("execute and logic compute error.");
        }
        results.at(size - 2) &= results.at(size - 1);
        results.pop_back();
    } else if (function_name == "or") {
        if (size < 2) {
            return Status::InternalError("execute or logic compute error.");
        }
        results.at(size - 2) |= results.at(size - 1);
        results.pop_back();
    } else if (function_name == "not") {
        if (size < 1) {
            return Status::InternalError("execute not logic compute error.");
        }
        // complement relative to the rows still visible in _row_bitmap
        roaring::Roaring complement = _row_bitmap;
        complement -= results.at(size - 1);
        results.at(size - 1) = complement;
    }
    return Status::OK();
}
bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
for (auto pred : _col_preds_except_leafnode_of_andnode) {
if (_not_apply_index_pred.count(pred->column_id()) ||
(!_check_apply_by_bitmap_index(pred) && !_check_apply_by_inverted_index(pred, true))) {
return false;
}
}
return true;
}
// A predicate can use the bitmap index iff a non-null bitmap index iterator
// exists for its column.
bool SegmentIterator::_check_apply_by_bitmap_index(ColumnPredicate* pred) {
    const int32_t unique_id = _schema->unique_id(pred->column_id());
    auto it = _bitmap_index_iterators.find(unique_id);
    return it != _bitmap_index_iterators.end() && it->second != nullptr;
}
// Decide whether `pred` can be answered by its column's inverted index.
// pred_in_compound is true when the predicate sits inside a compound
// (non leaf-of-AND) expression, which restricts fulltext columns to MATCH.
bool SegmentIterator::_check_apply_by_inverted_index(ColumnPredicate* pred, bool pred_in_compound) {
    int32_t unique_id = _schema->unique_id(pred->column_id());
    if (_inverted_index_iterators.count(unique_id) < 1 ||
        _inverted_index_iterators[unique_id] == nullptr) {
        //this column without inverted index
        return false;
    }
    // BF / bitmap-filter predicate types can never use an inverted index
    if (_inverted_index_not_support_pred_type(pred->type())) {
        return false;
    }
    if ((pred->type() == PredicateType::IN_LIST || pred->type() == PredicateType::NOT_IN_LIST) &&
        pred->predicate_params()->marked_by_runtime_filter) {
        // in_list or not_in_list predicate produced by runtime filter
        return false;
    }
    // Function filter no apply inverted index
    if (dynamic_cast<LikeColumnPredicate*>(pred)) {
        return false;
    }
    bool handle_by_fulltext = _column_has_fulltext_index(unique_id);
    if (handle_by_fulltext) {
        // when predicate in compound condition which except leafNode of andNode,
        // only can apply match query for fulltext index,
        // when predicate is leafNode of andNode,
        // can apply 'match qeury' and 'equal query' and 'list query' for fulltext index.
        return (pred_in_compound ? pred->type() == PredicateType::MATCH
                                 : (pred->type() == PredicateType::MATCH ||
                                    PredicateTypeTraits::is_equal_or_list(pred->type())));
    }
    return true;
}
// Evaluate a compound-expression predicate against its column's bitmap index,
// writing the matching rowids into *output_result.
Status SegmentIterator::_apply_bitmap_index_except_leafnode_of_andnode(
        ColumnPredicate* pred, roaring::Roaring* output_result) {
    const int32_t unique_id = _schema->unique_id(pred->column_id());
    return pred->evaluate(_bitmap_index_iterators[unique_id].get(), _segment->num_rows(),
                          output_result);
}
// Evaluate a compound-expression predicate against its column's inverted
// index, writing the matching rowids into *output_result.
Status SegmentIterator::_apply_inverted_index_except_leafnode_of_andnode(
        ColumnPredicate* pred, roaring::Roaring* output_result) {
    const int32_t unique_id = _schema->unique_id(pred->column_id());
    return pred->evaluate(*_schema, _inverted_index_iterators[unique_id].get(), num_rows(),
                          output_result);
}
// Pre-evaluate each compound-expression predicate via bitmap or inverted
// index and cache the resulting rowid bitmap under its "result sign" for
// later tree evaluation. Columns whose predicates were all served by an
// index are flagged so their data pages need not be read.
Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
    for (auto pred : _col_preds_except_leafnode_of_andnode) {
        auto pred_type = pred->type();
        // only simple comparison and MATCH predicates are supported here
        bool is_support = pred_type == PredicateType::EQ || pred_type == PredicateType::NE ||
                          pred_type == PredicateType::LT || pred_type == PredicateType::LE ||
                          pred_type == PredicateType::GT || pred_type == PredicateType::GE ||
                          pred_type == PredicateType::MATCH;
        if (!is_support) {
            continue;
        }
        bool can_apply_by_bitmap_index = _check_apply_by_bitmap_index(pred);
        bool can_apply_by_inverted_index = _check_apply_by_inverted_index(pred, true);
        // evaluation starts from the currently visible rows
        roaring::Roaring bitmap = _row_bitmap;
        Status res = Status::OK();
        if (can_apply_by_bitmap_index) {
            res = _apply_bitmap_index_except_leafnode_of_andnode(pred, &bitmap);
        } else if (can_apply_by_inverted_index) {
            res = _apply_inverted_index_except_leafnode_of_andnode(pred, &bitmap);
        } else {
            continue;
        }
        int32_t unique_id = _schema->unique_id(pred->column_id());
        // fulltext equal/list predicates still need re-evaluation on raw data
        bool need_remaining_after_evaluate = _column_has_fulltext_index(unique_id) &&
                                             PredicateTypeTraits::is_equal_or_list(pred_type);
        if (!res.ok()) {
            if (_downgrade_without_index(res, need_remaining_after_evaluate)) {
                // downgrade without index query
                _not_apply_index_pred.insert(pred->column_id());
                continue;
            }
            LOG(WARNING) << "failed to evaluate index"
                         << ", column predicate type: " << pred->pred_type_string(pred->type())
                         << ", error msg: " << res;
            return res;
        }
        // cache the bitmap; `true` marks it as usable during tree evaluation
        std::string pred_result_sign = _gen_predicate_result_sign(pred);
        _rowid_result_for_index.emplace(
                std::make_pair(pred_result_sign, std::make_pair(true, bitmap)));
    }
    for (auto pred : _col_preds_except_leafnode_of_andnode) {
        auto column_name = _schema->column(pred->column_id())->name();
        // skip reading column data only when all its predicates were pushed down
        if (!_remaining_conjunct_roots.empty() &&
            _check_column_pred_all_push_down(column_name, true) &&
            !pred->predicate_params()->marked_by_runtime_filter) {
            int32_t unique_id = _schema->unique_id(pred->column_id());
            _need_read_data_indices[unique_id] = false;
        }
    }
    return Status::OK();
}
// Decide whether an index-evaluation failure may be silently downgraded to a
// plain (non-index) scan.
// 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built,
//    usually occurs when creating a new index, queries can be downgraded
//    without index.
// 2. INVERTED_INDEX_BYPASS means the hit of condition by index
//    has reached the optimal limit, downgrade without index query can
//    improve query performance.
// 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not
//    suitable for executing this predicate, skipped it and filter data
//    by function later.
// 4. INVERTED_INDEX_NO_TERMS means the column has fulltext index,
//    but the column condition value no terms in specified parser,
//    such as: where A = '' and B = ','
//    the predicate of A and B need downgrade without index query.
bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) {
    const auto code = res.code();
    return code == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
           code == ErrorCode::INVERTED_INDEX_BYPASS ||
           code == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED ||
           (code == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining);
}
// Build the cache key ("sign") for a predicate's index-evaluation result:
// <temp-col-prefix><column>_<pred-type>_<value>.
std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicate* predicate) {
    auto column_desc = _schema->column(predicate->column_id());
    return BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_desc->name() + "_" +
           predicate->pred_type_string(predicate->type()) + "_" +
           predicate->predicate_params()->value;
}
// Same sign format as the ColumnPredicate overload, assembled from the pieces
// collected while walking an expression tree.
std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicateInfo* predicate_info) {
    return BeConsts::BLOCK_TEMP_COLUMN_PREFIX + predicate_info->column_name + "_" +
           predicate_info->query_op + "_" + predicate_info->query_value;
}
// True when the column has a FULLTEXT inverted index.
// BUG FIX: use find() instead of operator[] — the old code default-inserted a
// null entry into _inverted_index_iterators whenever the column had no
// inverted index (this is called before any presence check, e.g. from
// _apply_inverted_index_on_block_column_predicate).
bool SegmentIterator::_column_has_fulltext_index(int32_t unique_id) {
    auto it = _inverted_index_iterators.find(unique_id);
    return it != _inverted_index_iterators.end() && it->second != nullptr &&
           it->second->get_inverted_index_reader_type() == InvertedIndexReaderType::FULLTEXT;
}
// Predicate types that an inverted index can never serve.
inline bool SegmentIterator::_inverted_index_not_support_pred_type(const PredicateType& type) {
    switch (type) {
    case PredicateType::BF:
    case PredicateType::BITMAP_FILTER:
        return true;
    default:
        return false;
    }
}
// True when every predicate in the set is a range predicate (<, <=, >, >=).
#define all_predicates_are_range_predicate(predicate_set) \
    std::all_of(predicate_set.begin(), predicate_set.end(), \
                [](const ColumnPredicate* p) { return PredicateTypeTraits::is_range(p->type()); })
// True when every predicate in the set was produced by a runtime filter.
#define all_predicates_are_marked_by_runtime_filter(predicate_set) \
    std::all_of(predicate_set.begin(), predicate_set.end(), [](const ColumnPredicate* p) { \
        return const_cast<ColumnPredicate*>(p)->predicate_params()->marked_by_runtime_filter; \
    })
// Try to evaluate one predicate via the column's inverted index, intersecting
// the matches into _row_bitmap. Predicates that can't use the index (or whose
// failure allows a downgrade) are appended to remaining_predicates for normal
// evaluation; *continue_apply is cleared once all rows are pruned.
Status SegmentIterator::_apply_inverted_index_on_column_predicate(
        ColumnPredicate* pred, std::vector<ColumnPredicate*>& remaining_predicates,
        bool* continue_apply) {
    if (!_check_apply_by_inverted_index(pred)) {
        remaining_predicates.emplace_back(pred);
    } else {
        int32_t unique_id = _schema->unique_id(pred->column_id());
        // fulltext equal/list predicates still need re-checking on raw data
        bool need_remaining_after_evaluate = _column_has_fulltext_index(unique_id) &&
                                             PredicateTypeTraits::is_equal_or_list(pred->type());
        roaring::Roaring bitmap = _row_bitmap;
        Status res = pred->evaluate(*_schema, _inverted_index_iterators[unique_id].get(),
                                    num_rows(), &bitmap);
        if (!res.ok()) {
            if (_downgrade_without_index(res, need_remaining_after_evaluate)) {
                remaining_predicates.emplace_back(pred);
                return Status::OK();
            }
            LOG(WARNING) << "failed to evaluate index"
                         << ", column predicate type: " << pred->pred_type_string(pred->type())
                         << ", error msg: " << res;
            return res;
        }
        auto pred_type = pred->type();
        if (pred_type == PredicateType::MATCH) {
            // cache MATCH results; `false` marks them unusable for compound trees
            std::string pred_result_sign = _gen_predicate_result_sign(pred);
            _rowid_result_for_index.emplace(
                    std::make_pair(pred_result_sign, std::make_pair(false, bitmap)));
        }
        _row_bitmap &= bitmap;
        if (_row_bitmap.isEmpty()) {
            // all rows have been pruned, no need to process further predicates
            *continue_apply = false;
        }
        if (need_remaining_after_evaluate) {
            remaining_predicates.emplace_back(pred);
            return Status::OK();
        }
        // skip reading column data when the index answered every predicate on it
        auto column_name = _schema->column(pred->column_id())->name();
        if (_check_column_pred_all_push_down(column_name) &&
            !pred->predicate_params()->marked_by_runtime_filter) {
            _need_read_data_indices[unique_id] = false;
        }
    }
    return Status::OK();
}
// Evaluate a whole block (multi-)predicate for one column through its
// inverted index. On success the served predicates are added to
// no_need_to_pass_column_predicate_set and the matches intersected into
// _row_bitmap; *continue_apply is cleared once all rows are pruned.
Status SegmentIterator::_apply_inverted_index_on_block_column_predicate(
        ColumnId column_id, MutilColumnBlockPredicate* pred,
        std::set<const ColumnPredicate*>& no_need_to_pass_column_predicate_set,
        bool* continue_apply) {
    auto unique_id = _schema->unique_id(column_id);
    bool handle_by_fulltext = _column_has_fulltext_index(unique_id);
    std::set<const ColumnPredicate*> predicate_set {};
    pred->get_all_column_predicate(predicate_set);
    //four requirements here.
    //1. Column has inverted index
    //2. There are multiple predicates for this column.
    //3. All the predicates are range predicate.
    //4. if it's under fulltext parser type, we need to skip inverted index evaluate.
    if (_inverted_index_iterators.count(unique_id) > 0 &&
        _inverted_index_iterators[unique_id] != nullptr && predicate_set.size() > 1 &&
        all_predicates_are_range_predicate(predicate_set) && !handle_by_fulltext) {
        roaring::Roaring output_result = _row_bitmap;
        std::string column_name = _schema->column(column_id)->name();
        auto res = pred->evaluate(column_name, _inverted_index_iterators[unique_id].get(),
                                  num_rows(), &output_result);
        if (res.ok()) {
            // raw column data can be skipped only when nothing was runtime-filtered
            if (_check_column_pred_all_push_down(column_name) &&
                !all_predicates_are_marked_by_runtime_filter(predicate_set)) {
                _need_read_data_indices[unique_id] = false;
            }
            no_need_to_pass_column_predicate_set.insert(predicate_set.begin(), predicate_set.end());
            _row_bitmap &= output_result;
            if (_row_bitmap.isEmpty()) {
                // all rows have been pruned, no need to process further predicates
                *continue_apply = false;
            }
            return res;
        } else {
            //TODO:mock until AndBlockColumnPredicate evaluate is ok.
            if (res.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
                return Status::OK();
            }
            LOG(WARNING) << "failed to evaluate index"
                         << ", column predicate type: range predicate"
                         << ", error msg: " << res;
            return res;
        }
    }
    return Status::OK();
}
// Decides whether the data pages of column `cid` must actually be read, or
// whether the column can be skipped because its predicates were fully
// answered by indexes and the column is not part of the output.
bool SegmentIterator::_need_read_data(ColumnId cid) {
    if (_output_columns.count(-1)) {
        // if _output_columns contains -1, it means that the light
        // weight schema change may not be enabled or other reasons
        // caused the column unique_id not be set, to prevent errors
        // occurring, return true here that column data needs to be read
        return true;
    }
    const int32_t uid = _opts.tablet_schema->column(cid).unique_id();
    const auto it = _need_read_data_indices.find(uid);
    const bool index_answered_all = (it != _need_read_data_indices.end()) && !it->second;
    if (index_answered_all && _output_columns.count(uid) == 0) {
        VLOG_DEBUG << "SegmentIterator no need read data for column: "
                   << _opts.tablet_schema->column_by_uid(uid).name();
        return false;
    }
    return true;
}
// Evaluates all applicable predicates through inverted indexes before any
// column data is read, shrinking `_row_bitmap`. Predicates that were not
// (fully) answered by an index are kept in `_col_predicates` for evaluation
// against the materialized columns later.
Status SegmentIterator::_apply_inverted_index() {
    SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
    const size_t rows_before = _row_bitmap.cardinality();
    std::vector<ColumnPredicate*> leftover_predicates;
    std::set<const ColumnPredicate*> handled_predicates;

    // Pass 1: per-column AND-groups of predicates, evaluated in one shot.
    for (const auto& [cid, block_pred] : _opts.col_id_to_predicates) {
        bool keep_going = true;
        RETURN_IF_ERROR(_apply_inverted_index_on_block_column_predicate(
                cid, block_pred.get(), handled_predicates, &keep_going));
        if (!keep_going) {
            break; // _row_bitmap is empty, nothing left to filter
        }
    }

    // Pass 2: the remaining individual predicates.
    for (auto* pred : _col_predicates) {
        if (handled_predicates.count(pred) > 0) {
            continue; // already covered by an AND-group in pass 1
        }
        bool keep_going = true;
        RETURN_IF_ERROR(
                _apply_inverted_index_on_column_predicate(pred, leftover_predicates, &keep_going));
        if (!keep_going) {
            break;
        }
    }

    _col_predicates = std::move(leftover_predicates);
    _opts.stats->rows_inverted_index_filtered += (rows_before - _row_bitmap.cardinality());
    return Status::OK();
}
// Creates and initializes a column iterator for every column of the read
// schema that does not have one yet. The synthetic rowid column receives a
// RowIdColumnIterator that fabricates row ids instead of reading pages.
Status SegmentIterator::_init_return_column_iterators() {
    if (_cur_rowid >= num_rows()) {
        return Status::OK();
    }
    for (auto cid : _schema->column_ids()) {
        const auto* field = _schema->column(cid);
        if (field->name() == BeConsts::ROWID_COL) {
            _column_iterators[field->unique_id()].reset(
                    new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id()));
            continue;
        }
        const int32_t uid = _opts.tablet_schema->column(cid).unique_id();
        if (_column_iterators.count(uid) > 0) {
            continue; // iterator already built on a previous call
        }
        RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
                                                      &_column_iterators[uid]));
        ColumnIteratorOptions iter_opts;
        iter_opts.stats = _opts.stats;
        iter_opts.use_page_cache = _opts.use_page_cache;
        iter_opts.file_reader = _file_reader.get();
        iter_opts.io_ctx = _opts.io_ctx;
        RETURN_IF_ERROR(_column_iterators[uid]->init(iter_opts));
    }
    return Status::OK();
}
// Lazily creates a bitmap index iterator entry for each column of the read
// schema that does not already have one.
Status SegmentIterator::_init_bitmap_index_iterators() {
    if (_cur_rowid >= num_rows()) {
        return Status::OK(); // nothing left to read from this segment
    }
    for (auto cid : _schema->column_ids()) {
        const int32_t uid = _opts.tablet_schema->column(cid).unique_id();
        if (_bitmap_index_iterators.count(uid) > 0) {
            continue;
        }
        RETURN_IF_ERROR(_segment->new_bitmap_index_iterator(_opts.tablet_schema->column(cid),
                                                            &_bitmap_index_iterators[uid]));
    }
    return Status::OK();
}
// Lazily creates an inverted index iterator entry for each column of the
// read schema that does not already have one.
Status SegmentIterator::_init_inverted_index_iterators() {
    if (_cur_rowid >= num_rows()) {
        return Status::OK(); // nothing left to read from this segment
    }
    for (auto cid : _schema->column_ids()) {
        const int32_t uid = _opts.tablet_schema->column(cid).unique_id();
        if (_inverted_index_iterators.count(uid) > 0) {
            continue;
        }
        RETURN_IF_ERROR(_segment->new_inverted_index_iterator(
                _opts.tablet_schema->column(cid), _opts.tablet_schema->get_inverted_index(cid),
                _opts.stats, &_inverted_index_iterators[uid]));
    }
    return Status::OK();
}
// Dispatches a key lookup either to the primary key index (unique-key tables
// that have one) or to the short key index.
Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
                                        rowid_t* rowid) {
    const bool use_pk_index = _segment->_tablet_schema->keys_type() == UNIQUE_KEYS &&
                              _segment->get_primary_key_index() != nullptr;
    if (use_pk_index) {
        return _lookup_ordinal_from_pk_index(key, is_include, rowid);
    }
    return _lookup_ordinal_from_sk_index(key, is_include, upper_bound, rowid);
}
// Look up one key to get the ordinal at which its data can be found, using the
// short key index.
// 'upper_bound' defines the max ordinal the function will search;
// we use it to reduce the search range.
// If we find a valid ordinal, it is stored in *rowid and Status::OK() is returned.
// If we cannot find a valid key in this segment, *rowid is set to upper_bound.
// Otherwise an error is returned.
// 1. get the [start, end) ordinal range through the short key index
// 2. binary search to find the exact ordinal that matches the input condition
// TODO: make is_include a template parameter to reduce branching
Status SegmentIterator::_lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
                                                      rowid_t upper_bound, rowid_t* rowid) {
    const ShortKeyIndexDecoder* sk_index_decoder = _segment->get_short_key_index();
    DCHECK(sk_index_decoder != nullptr);

    std::string index_key;
    encode_key_with_padding(&index_key, key, _segment->_tablet_schema->num_short_key_columns(),
                            is_include);

    const auto& key_col_ids = key.schema()->column_ids();
    _convert_rowcursor_to_short_key(key, key_col_ids.size());

    // Narrow the search to whole index blocks first.
    uint32_t start_block_id = 0;
    auto start_iter = sk_index_decoder->lower_bound(index_key);
    if (start_iter.valid()) {
        // Because previous block may contain this key, so we should set rowid to
        // last block's first row.
        start_block_id = start_iter.ordinal();
        if (start_block_id > 0) {
            start_block_id--;
        }
    } else {
        // When we don't find a valid index item, which means all short key is
        // smaller than input key, this means that this key may exist in the last
        // row block. so we set the rowid to first row of last row block.
        start_block_id = sk_index_decoder->num_items() - 1;
    }
    rowid_t start = start_block_id * sk_index_decoder->num_rows_per_block();

    rowid_t end = upper_bound;
    auto end_iter = sk_index_decoder->upper_bound(index_key);
    if (end_iter.valid()) {
        end = end_iter.ordinal() * sk_index_decoder->num_rows_per_block();
    }

    // binary search within [start, end) to find the exact key
    while (start < end) {
        // Overflow-safe midpoint: `(start + end) / 2` could wrap around for
        // large rowid values; `start + (end - start) / 2` cannot.
        rowid_t mid = start + (end - start) / 2;
        RETURN_IF_ERROR(_seek_and_peek(mid));
        int cmp = _compare_short_key_with_seek_block(key_col_ids);
        if (cmp > 0) {
            // search the upper half
            start = mid + 1;
        } else if (cmp == 0) {
            if (is_include) {
                // lower bound
                end = mid;
            } else {
                // upper bound
                start = mid + 1;
            }
        } else {
            // search the lower half
            end = mid;
        }
    }
    *rowid = start;
    return Status::OK();
}
// Looks up `key` in the primary key index of a UNIQUE_KEYS segment. On
// return *rowid is the ordinal of the first row >= key (or > key when
// !is_include and the key matches exactly); *rowid == num_rows() means the
// key is beyond every row of this segment.
Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
                                                      rowid_t* rowid) {
    DCHECK(_segment->_tablet_schema->keys_type() == UNIQUE_KEYS);
    const PrimaryKeyIndexReader* pk_index_reader = _segment->get_primary_key_index();
    DCHECK(pk_index_reader != nullptr);

    std::string index_key;
    encode_key_with_padding<RowCursor, true, true>(
            &index_key, key, _segment->_tablet_schema->num_key_columns(), is_include);

    // Fast paths: probe key lies outside this segment's [min_key, max_key].
    if (index_key < _segment->min_key()) {
        *rowid = 0;
        return Status::OK();
    } else if (index_key > _segment->max_key()) {
        *rowid = num_rows();
        return Status::OK();
    }
    bool exact_match = false;

    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
    RETURN_IF_ERROR(pk_index_reader->new_iterator(&index_iterator));

    Status status = index_iterator->seek_at_or_after(&index_key, &exact_match);
    if (UNLIKELY(!status.ok())) {
        *rowid = num_rows();
        if (status.is<NOT_FOUND>()) {
            // All indexed keys are smaller than the probe: not an error,
            // just past-the-end.
            return Status::OK();
        }
        return status;
    }
    *rowid = index_iterator->get_current_ordinal();

    // The sequence column needs to be removed from primary key index when comparing key
    bool has_seq_col = _segment->_tablet_schema->has_sequence_col();
    if (has_seq_col) {
        // Length of the encoded sequence suffix; the +1 accounts for an extra
        // marker byte in the encoding -- NOTE(review): confirm against the
        // key encoding scheme.
        size_t seq_col_length =
                _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
                        .length() +
                1;
        auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
                _segment->_pk_index_reader->type_info()->type(), 1, 0);
        auto index_column = index_type->create_column();
        // Fetch the single key the iterator currently points at.
        size_t num_to_read = 1;
        size_t num_read = num_to_read;
        RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
        DCHECK(num_to_read == num_read);
        Slice sought_key =
                Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
        // Strip the sequence suffix before comparing with the probe key.
        Slice sought_key_without_seq =
                Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);

        // compare key
        if (Slice(index_key).compare(sought_key_without_seq) == 0) {
            exact_match = true;
        }
    }

    // find the key in primary key index, and the is_include is false, so move
    // to the next row.
    if (exact_match && !is_include) {
        *rowid += 1;
    }
    return Status::OK();
}
// Seeks all seek-schema columns to `rowid` and reads that single row into
// _seek_block, so the caller can compare it against a probe key.
Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
    {
        _opts.stats->block_init_seek_num += 1;
        SCOPED_RAW_TIMER(&_opts.stats->block_init_seek_ns);
        RETURN_IF_ERROR(_seek_columns(_seek_schema->column_ids(), rowid));
    }
    size_t num_rows = 1;

    //note(wb) reset _seek_block for memory reuse
    // it is easier to use row based memory layout for clear memory
    // (range-for avoids the signed/unsigned comparison of an int index
    // against _seek_block.size())
    for (auto& column : _seek_block) {
        column->clear();
    }
    RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block, num_rows));
    return Status::OK();
}
// Seeks every listed column iterator to ordinal `pos`, skipping columns
// whose data does not need to be read at all.
Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) {
    for (auto cid : column_ids) {
        if (!_need_read_data(cid)) {
            continue;
        }
        auto& iter = _column_iterators[_schema->unique_id(cid)];
        RETURN_IF_ERROR(iter->seek_to_ordinal(pos));
    }
    return Status::OK();
}
/* ---------------------- for vectorization implementation ---------------------- */
/**
 * Storage layer data types can be measured from two perspectives:
 * 1 Whether the type can be read in a fast way (batch read using SIMD).
 *   Integer and float types can be read in a SIMD way.
 *   Types such as string/bitmap/hll cannot be read in batches, so reading them is slow.
 *   If a type can be read fast, we can try to eliminate lazy materialization, because for such a type seek cost > read cost.
 *   This is an estimate; if we want a more precise cost, statistics collection is necessary (this is a todo).
 *   In short, when the returned non-predicate columns contain string/hll/bitmap, we use lazy materialization.
 *   Otherwise, we disable it.
 *
 *   When lazy materialization is enabled, we need to read columns at least twice:
 *   the first pass reads the predicate columns, the second reads the non-predicate ones.
 *   An interesting question to research is whether reading the predicate columns once is the best plan
 *   (why not read the predicate columns twice or more?).
 *
 *   When lazy materialization is disabled, we just need to read once.
 *
 *
 * 2 Whether the predicate type can be evaluated in a fast way (using SIMD to evaluate the predicate).
 *   Predicates on integer and float types can be evaluated fast,
 *   but predicates on BloomFilter/string/date evaluate slowly.
 *   If a predicate can be evaluated fast, we use vectorization to evaluate it;
 *   otherwise, we use short-circuit evaluation.
 *
 *
 */
// todo(wb) need a UT here
// Classifies the schema's columns for the vectorized read path:
//  - which columns carry predicates, split into vectorized vs short-circuit
//    evaluation (delete conditions always go short-circuit),
//  - whether lazy materialization is used at all,
//  - which columns belong to the first read, the second (expr) read, and the
//    plain non-predicate output set.
// See the cost-model comment block above for the rationale.
Status SegmentIterator::_vec_init_lazy_materialization() {
    _is_pred_column.resize(_schema->columns().size(), false);

    // including short/vec/delete pred
    std::set<ColumnId> pred_column_ids;
    _lazy_materialization_read = false;

    std::set<ColumnId> del_cond_id_set;
    _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);

    std::set<const ColumnPredicate*> delete_predicate_set {};
    _opts.delete_condition_predicates->get_all_column_predicate(delete_predicate_set);
    // Record which delete predicates are range-shaped and which are
    // bloom-filter-shaped, for use elsewhere in the iterator.
    for (const auto predicate : delete_predicate_set) {
        if (PredicateTypeTraits::is_range(predicate->type())) {
            _delete_range_column_ids.push_back(predicate->column_id());
        } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
            _delete_bloom_filter_column_ids.push_back(predicate->column_id());
        }
    }

    // add runtime predicate to _col_predicates
    // should NOT add for order by key,
    // since key is already sorted and topn_next only need first N rows from each segment,
    // but runtime predicate will filter some rows and read more than N rows.
    // should add for order by none-key column, since none-key column is not sorted and
    // all rows should be read, so runtime predicate will reduce rows for topn node
    if (_opts.use_topn_opt &&
        !(_opts.read_orderby_key_columns != nullptr && !_opts.read_orderby_key_columns->empty())) {
        auto& runtime_predicate = _opts.runtime_state->get_query_ctx()->get_runtime_predicate();
        _runtime_predicate = runtime_predicate.get_predictate();
        if (_runtime_predicate) {
            _col_predicates.push_back(_runtime_predicate.get());
        }
    }

    // Step1: extract columns that can be lazy materialization
    if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
        std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
        std::set<ColumnId> vec_pred_col_id_set;

        for (auto predicate : _col_predicates) {
            auto cid = predicate->column_id();
            _is_pred_column[cid] = true;
            pred_column_ids.insert(cid);

            // check pred using short eval or vec eval
            if (_can_evaluated_by_vectorized(predicate)) {
                vec_pred_col_id_set.insert(predicate->column_id());
                _pre_eval_block_predicate.push_back(predicate);
            } else {
                short_cir_pred_col_id_set.insert(cid);
                _short_cir_eval_predicate.push_back(predicate);
                _filter_info_id.push_back(predicate);
            }
        }

        // handle delete_condition: delete columns are treated as
        // short-circuit predicate columns.
        if (!del_cond_id_set.empty()) {
            short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end());
            pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());

            for (auto cid : del_cond_id_set) {
                _is_pred_column[cid] = true;
            }
        }

        _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend());
        _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(),
                                          short_cir_pred_col_id_set.cend());
    }

    if (!_vec_pred_column_ids.empty()) {
        _is_need_vec_eval = true;
    }
    if (!_short_cir_pred_column_ids.empty()) {
        _is_need_short_eval = true;
    }

    // make _schema_block_id_map: maps a column id to its position in the block
    _schema_block_id_map.resize(_schema->columns().size());
    for (int i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        _schema_block_id_map[cid] = i;
    }

    // Step2: extract columns that can execute expr context
    _is_common_expr_column.resize(_schema->columns().size(), false);
    if (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty()) {
        for (auto expr : _remaining_conjunct_roots) {
            RETURN_IF_ERROR(_extract_common_expr_columns(expr));
        }
        if (!_common_expr_columns.empty()) {
            _is_need_expr_eval = true;
            for (auto cid : _schema->column_ids()) {
                // pred column also needs to be filtered by expr
                if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
                    auto loc = _schema_block_id_map[cid];
                    _columns_to_filter.push_back(loc);
                }
            }
        }
    }

    // Step 3: fill non predicate columns and second read column
    // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false,
    // all columns are lazy materialization columns without non predicte column.
    // If common expr pushdown exists, and expr column is not contained in lazy materialization columns,
    // add to second read column, which will be read after lazy materialization
    if (_schema->column_ids().size() > pred_column_ids.size()) {
        for (auto cid : _schema->column_ids()) {
            if (!_is_pred_column[cid]) {
                // A non-predicate column triggers lazy materialization only
                // when some predicate will actually filter rows first.
                if (_is_need_vec_eval || _is_need_short_eval) {
                    _lazy_materialization_read = true;
                }
                if (!_is_common_expr_column[cid]) {
                    _non_predicate_columns.push_back(cid);
                } else {
                    _second_read_column_ids.push_back(cid);
                }
            }
        }
    }

    // Step 4: fill first read columns
    if (_lazy_materialization_read) {
        // insert pred cid to first_read_columns
        for (auto cid : pred_column_ids) {
            _first_read_column_ids.push_back(cid);
        }
    } else if (!_is_need_vec_eval && !_is_need_short_eval &&
               !_is_need_expr_eval) { // no pred exists, just read and output column
        for (int i = 0; i < _schema->num_column_ids(); i++) {
            auto cid = _schema->column_id(i);
            _first_read_column_ids.push_back(cid);
        }
    } else {
        if (_is_need_vec_eval || _is_need_short_eval) {
            // TODO To refactor, because we suppose lazy materialization is better performance.
            // pred exits, but we can eliminate lazy materialization
            // insert pred/non-pred cid to first read columns
            std::set<ColumnId> pred_id_set;
            pred_id_set.insert(_short_cir_pred_column_ids.begin(),
                               _short_cir_pred_column_ids.end());
            pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
            std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
                                            _non_predicate_columns.end());

            DCHECK(_second_read_column_ids.empty());
            // _second_read_column_ids must be empty. Otherwise _lazy_materialization_read must not false.
            for (int i = 0; i < _schema->num_column_ids(); i++) {
                auto cid = _schema->column_id(i);
                if (pred_id_set.find(cid) != pred_id_set.end()) {
                    _first_read_column_ids.push_back(cid);
                } else if (non_pred_set.find(cid) != non_pred_set.end()) {
                    _first_read_column_ids.push_back(cid);
                    // when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns
                    _is_pred_column[cid] = true;
                }
            }
        } else if (_is_need_expr_eval) {
            DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
            for (auto cid : _common_expr_columns) {
                _first_read_column_ids.push_back(cid);
            }
        }
    }
    return Status::OK();
}
// Returns true when `predicate` can be evaluated with the vectorized
// (SIMD-friendly) path rather than the row-by-row short-circuit path.
bool SegmentIterator::_can_evaluated_by_vectorized(ColumnPredicate* predicate) {
    const auto cid = predicate->column_id();
    const FieldType field_type = _schema->column(cid)->type();

    switch (predicate->type()) {
    case PredicateType::EQ:
    case PredicateType::NE:
    case PredicateType::LE:
    case PredicateType::LT:
    case PredicateType::GE:
    case PredicateType::GT: {
        const bool is_string_type = field_type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
                                    field_type == FieldType::OLAP_FIELD_TYPE_CHAR ||
                                    field_type == FieldType::OLAP_FIELD_TYPE_STRING;
        if (is_string_type) {
            // String comparisons go vectorized only when the whole column is
            // dictionary encoded, low-cardinality optimization is on, and this
            // is a plain query read.
            return config::enable_low_cardinality_optimize &&
                   _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
                   _column_iterators[_schema->unique_id(cid)]->is_all_dict_encoding();
        }
        return field_type != FieldType::OLAP_FIELD_TYPE_DECIMAL;
    }
    default:
        return false;
    }
}
// Records the block positions of CHAR columns -- including CHAR nested inside
// arbitrarily deep ARRAY types -- so their padding can be handled later.
// Positions other than 0 are additionally collected in _char_type_idx_no_0.
void SegmentIterator::_vec_init_char_column_id() {
    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        auto desc = _schema->column(cid);
        // Walk down through nested ARRAY types: Array<Char>, Array<Array<Char>>, ...
        while (desc != nullptr) {
            if (desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
                _char_type_idx.emplace_back(i);
                if (i != 0) {
                    _char_type_idx_no_0.emplace_back(i);
                }
                break;
            }
            if (desc->type() != FieldType::OLAP_FIELD_TYPE_ARRAY) {
                break; // neither CHAR nor a wrapper around one
            }
            desc = desc->get_sub_field(0);
        }
    }
}
// Returns true when column `cid` does not need to be materialized. When
// `fill_defaults` is set, `column` is padded with `num_of_defaults` default
// values so downstream code still sees a column of the expected size.
bool SegmentIterator::_prune_column(ColumnId cid, vectorized::MutableColumnPtr& column,
                                    bool fill_defaults, size_t num_of_defaults) {
    if (_need_read_data(cid)) {
        return false; // the column must be read normally
    }
    if (!fill_defaults) {
        return true;
    }
    if (!column->is_nullable()) {
        // assert(column->is_const());
        column->insert_many_defaults(num_of_defaults);
        return true;
    }
    // Nullable columns must pad both the null map and the nested column.
    auto* nullable = reinterpret_cast<vectorized::ColumnNullable*>(column.get());
    nullable->get_null_map_column().insert_many_defaults(num_of_defaults);
    nullable->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
    return true;
}
// Reads exactly `nrows` rows for every listed column into `column_block`.
// Columns whose data is not needed are filled with defaults instead of being
// read; a short read from the iterator is treated as an internal error.
Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
                                      vectorized::MutableColumns& column_block, size_t nrows) {
    for (auto cid : column_ids) {
        auto& dst = column_block[cid];
        if (_prune_column(cid, dst, true, nrows)) {
            continue;
        }
        size_t rows_read = nrows;
        RETURN_IF_ERROR(_column_iterators[_schema->unique_id(cid)]->next_batch(&rows_read, dst));
        if (rows_read != nrows) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows,
                                                            rows_read);
        }
    }
    return Status::OK();
}
// Prepares `current_columns` for the next batch: predicate columns (and
// columns located beyond the block, i.e. delete-condition-only columns) are
// cleared for reuse, while non-predicate columns take over the block's own
// column storage so batch data is written straight into the output block.
void SegmentIterator::_init_current_block(
        vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) {
    block->clear_column_data(_schema->num_column_ids());

    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        auto column_desc = _schema->column(cid);

        // the column in block must clear() here to insert new data
        if (_is_pred_column[cid] ||
            i >= block->columns()) { //todo(wb) maybe we can release it after output block
            current_columns[cid]->clear();
        } else { // non-predicate column
            // Take ownership of the block's column storage for this batch.
            current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();

            // Tag the column with the storage type's date/datetime/decimal
            // semantics before any rows are inserted.
            if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DATE) {
                current_columns[cid]->set_date_type();
            } else if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DATETIME) {
                current_columns[cid]->set_datetime_type();
            } else if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DECIMAL) {
                current_columns[cid]->set_decimalv2_type();
            }
            current_columns[cid]->reserve(_opts.block_row_max);
        }
    }
}
// Moves every non-predicate column out of _current_return_columns into its
// position inside the output block.
void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
    for (auto cid : _non_predicate_columns) {
        auto block_pos = _schema_block_id_map[cid];
        // A position beyond the block means this is a delete-condition column
        // that must not be output; skip it.
        if (block_pos >= block->columns()) {
            continue;
        }
        block->replace_by_position(block_pos, std::move(_current_return_columns[cid]));
    }
}
// Reads up to `nrows_read_limit` rows of the first-read columns by walking
// the filtered row ranges from _range_iter. `nrows_read` is incremented by
// the number of rows actually read. When `set_block_rowid` is true the
// absolute row ids of the read rows are recorded in _block_rowids (needed for
// lazy materialization and rowid recording).
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
                                               bool set_block_rowid) {
    SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
    do {
        uint32_t range_from;
        uint32_t range_to;
        bool has_next_range =
                _range_iter->next_range(nrows_read_limit - nrows_read, &range_from, &range_to);
        if (!has_next_range) {
            break;
        }
        // Seek only when the next range is not contiguous with the current
        // position (the `== 0` test forces an initial seek at row 0).
        if (_cur_rowid == 0 || _cur_rowid != range_from) {
            _cur_rowid = range_from;
            _opts.stats->block_first_read_seek_num += 1;
            SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
            RETURN_IF_ERROR(_seek_columns(_first_read_column_ids, _cur_rowid));
        }
        size_t rows_to_read = range_to - range_from;
        RETURN_IF_ERROR(
                _read_columns(_first_read_column_ids, _current_return_columns, rows_to_read));
        _cur_rowid += rows_to_read;
        if (set_block_rowid) {
            // Here use std::iota is better performance than for-loop, maybe for-loop is not vectorized
            auto start = _block_rowids.data() + nrows_read;
            auto end = start + rows_to_read;
            std::iota(start, end, range_from);
            nrows_read += rows_to_read;
        } else {
            nrows_read += rows_to_read;
        }
        _split_row_ranges.emplace_back(std::pair {range_from, range_to});
        // if _opts.read_orderby_key_reverse is true, only read one range for fast reverse purpose
    } while (nrows_read < nrows_read_limit && !_opts.read_orderby_key_reverse);
    return Status::OK();
}
void SegmentIterator::_replace_version_col(size_t num_rows) {
    // Only the rowset with single version need to replace the version column.
    // Doris can't determine the version before publish_version finished, so
    // we can't write data to __DORIS_VERSION_COL__ in segment writer, the value
    // is 0 by default.
    // So we need to replace the value to real version while reading.
    if (_opts.version.first != _opts.version.second) {
        return;
    }
    const auto& cids = _schema->column_ids();
    const int32_t version_idx = _schema->version_col_idx();
    if (std::find(cids.begin(), cids.end(), version_idx) == cids.end()) {
        return; // the version column is not part of this read
    }

    auto column_desc = _schema->column(version_idx);
    DCHECK(_schema->column(version_idx)->type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
    auto* col_ptr = reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(column.get());
    // Fill every row with the real (published) version of this rowset.
    for (size_t row = 0; row < num_rows; ++row) {
        col_ptr->insert_value(_opts.version.second);
    }
    _current_return_columns[version_idx] = std::move(column);
    VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx;
}
// Runs all vectorized predicates over the current batch and fills
// `sel_rowid_idx` with the indices of the rows that pass; returns the number
// of selected rows. When no vectorized predicate exists, every row passes.
uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
                                                            uint16_t selected_size) {
    SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
    if (!_is_need_vec_eval) {
        // Identity selection: each row index selects itself.
        for (uint32_t i = 0; i < selected_size; ++i) {
            sel_rowid_idx[i] = i;
        }
        return selected_size;
    }

    uint16_t original_size = selected_size;
    // NOTE(review): variable-length array is a GCC/Clang extension, not
    // standard C++; selected_size is a uint16_t so stack usage is bounded.
    bool ret_flags[original_size];
    DCHECK(_pre_eval_block_predicate.size() > 0);
    // The first predicate writes the flags; subsequent ones AND into them.
    auto column_id = _pre_eval_block_predicate[0]->column_id();
    auto& column = _current_return_columns[column_id];
    _pre_eval_block_predicate[0]->evaluate_vec(*column, original_size, ret_flags);
    for (int i = 1; i < _pre_eval_block_predicate.size(); i++) {
        auto column_id2 = _pre_eval_block_predicate[i]->column_id();
        auto& column2 = _current_return_columns[column_id2];
        _pre_eval_block_predicate[i]->evaluate_and_vec(*column2, original_size, ret_flags);
    }

    uint16_t new_size = 0;

    // Compact the boolean flags into an index list, 32 flags at a time.
    uint32_t sel_pos = 0;
    const uint32_t sel_end = sel_pos + selected_size;
    static constexpr size_t SIMD_BYTES = 32;
    const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;

    while (sel_pos < sel_end_simd) {
        auto mask = simd::bytes32_mask_to_bits32_mask(ret_flags + sel_pos);
        if (0 == mask) {
            //pass: none of these 32 rows selected
        } else if (0xffffffff == mask) {
            // all 32 rows selected
            for (uint32_t i = 0; i < SIMD_BYTES; i++) {
                sel_rowid_idx[new_size++] = sel_pos + i;
            }
        } else {
            // mixed: extract the position of each set bit
            while (mask) {
                const size_t bit_pos = __builtin_ctzll(mask);
                sel_rowid_idx[new_size++] = sel_pos + bit_pos;
                mask = mask & (mask - 1); // clear the lowest set bit
            }
        }
        sel_pos += SIMD_BYTES;
    }

    // Scalar tail for the remaining (< 32) rows.
    for (; sel_pos < sel_end; sel_pos++) {
        if (ret_flags[sel_pos]) {
            sel_rowid_idx[new_size++] = sel_pos;
        }
    }

    _opts.stats->vec_cond_input_rows += original_size;
    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
    return new_size;
}
// Evaluates all short-circuit predicates, then delete conditions, against the
// selection vector, compacting it in place. Returns the surviving row count.
uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx,
                                                            uint16_t selected_size) {
    SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns);
    if (!_is_need_short_eval) {
        return selected_size;
    }

    const uint16_t rows_before = selected_size;
    for (auto* predicate : _short_cir_eval_predicate) {
        auto& pred_column = _current_return_columns[predicate->column_id()];
        selected_size = predicate->evaluate(*pred_column, vec_sel_rowid_idx, selected_size);
    }

    // collect per-predicate filter statistics for the profile
    for (auto* p : _filter_info_id) {
        _opts.stats->filter_info[p->get_filter_id()] = p->get_filtered_info();
    }
    _opts.stats->short_circuit_cond_input_rows += rows_before;
    _opts.stats->rows_short_circuit_cond_filtered += rows_before - selected_size;

    // evaluate delete condition on whatever survived the predicates
    const uint16_t rows_before_delete = selected_size;
    selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns,
                                                                vec_sel_rowid_idx, selected_size);
    _opts.stats->rows_vec_del_cond_filtered += rows_before_delete - selected_size;
    return selected_size;
}
// Reads, for every column in `read_column_ids`, the rows selected by
// `sel_rowid_idx` (indices into `rowid_vector`) into `mutable_columns`.
// Used by lazy materialization to fetch non-predicate columns only for the
// rows that survived predicate evaluation.
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                                std::vector<rowid_t>& rowid_vector,
                                                uint16_t* sel_rowid_idx, size_t select_size,
                                                vectorized::MutableColumns* mutable_columns) {
    SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
    // Gather the absolute row ids of the selected rows.
    std::vector<rowid_t> rowids(select_size);
    for (size_t i = 0; i < select_size; ++i) {
        rowids[i] = rowid_vector[sel_rowid_idx[i]];
    }

    for (auto cid : read_column_ids) {
        if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) {
            continue;
        }
        // Write into the caller-provided column set — the same set the prune
        // check above inspected. (The previous code read into
        // _current_return_columns directly, which only worked because every
        // caller passes that member as `mutable_columns`.)
        RETURN_IF_ERROR(_column_iterators[_schema->unique_id(cid)]->read_by_rowids(
                rowids.data(), select_size, (*mutable_columns)[cid]));
    }
    return Status::OK();
}
// Public entry point: fetches the next batch of rows into `block`, converting
// any exception thrown by the internal implementation into a Status via
// RETURN_IF_CATCH_EXCEPTION.
Status SegmentIterator::next_batch(vectorized::Block* block) {
    RETURN_IF_CATCH_EXCEPTION({ return _next_batch_internal(block); });
    return Status::OK();
}
Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
if (UNLIKELY(!_lazy_inited)) {
RETURN_IF_ERROR(_lazy_init());
_lazy_inited = true;
if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
_block_rowids.resize(_opts.block_row_max);
}
_current_return_columns.resize(_schema->columns().size());
for (size_t i = 0; i < _schema->num_column_ids(); i++) {
auto cid = _schema->column_id(i);
auto column_desc = _schema->column(cid);
if (_is_pred_column[cid]) {
_current_return_columns[cid] =
Schema::get_predicate_column_ptr(*column_desc, _opts.io_ctx.reader_type);
_current_return_columns[cid]->set_rowset_segment_id(
{_segment->rowset_id(), _segment->id()});
_current_return_columns[cid]->reserve(_opts.block_row_max);
} else if (i >= block->columns()) {
// if i >= block->columns means the column and not the pred_column means `column i` is
// a delete condition column. but the column is not effective in the segment. so we just
// create a column to hold the data.
// a. origin data -> b. delete condition -> c. new load data
// the segment of c do not effective delete condition, but it still need read the column
// to match the schema.
// TODO: skip read the not effective delete column to speed up segment read.
_current_return_columns[cid] =
Schema::get_data_type_ptr(*column_desc)->create_column();
_current_return_columns[cid]->reserve(_opts.block_row_max);
}
}
}
_init_current_block(block, _current_return_columns);
_current_batch_rows_read = 0;
uint32_t nrows_read_limit = _opts.block_row_max;
if (_wait_times_estimate_row_size > 0) {
// first time, read 100 rows to estimate average row size, to avoid oom caused by a single batch being too large.
// If no valid data is read for the first time, block_row_max is read each time thereafter.
// Avoid low performance when valid data cannot be read all the time
nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
_wait_times_estimate_row_size--;
}
_split_row_ranges.clear();
_split_row_ranges.reserve(nrows_read_limit / 2);
RETURN_IF_ERROR(_read_columns_by_index(
nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
if (std::find(_first_read_column_ids.begin(), _first_read_column_ids.end(),
_schema->version_col_idx()) != _first_read_column_ids.end()) {
_replace_version_col(_current_batch_rows_read);
}
_opts.stats->blocks_load += 1;
_opts.stats->raw_rows_read += _current_batch_rows_read;
if (_current_batch_rows_read == 0) {
for (int i = 0; i < block->columns(); i++) {
auto cid = _schema->column_id(i);
// todo(wb) abstract make column where
if (!_is_pred_column[cid]) {
block->replace_by_position(i, std::move(_current_return_columns[cid]));
}
}
block->clear_column_data();
return Status::EndOfFile("no more data in segment");
}
if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
_output_non_pred_columns(block);
_output_index_result_column(nullptr, 0, block);
} else {
uint16_t selected_size = _current_batch_rows_read;
uint16_t sel_rowid_idx[selected_size];
if (_is_need_vec_eval || _is_need_short_eval) {
_convert_dict_code_for_predicate_if_necessary();
// step 1: evaluate vectorization predicate
selected_size = _evaluate_vectorization_predicate(sel_rowid_idx, selected_size);
// step 2: evaluate short circuit predicate
// todo(wb) research whether need to read short predicate after vectorization evaluation
// to reduce cost of read short circuit columns.
// In SSB test, it make no difference; So need more scenarios to test
selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx, selected_size);
if (selected_size > 0) {
// step 3.1: output short circuit and predicate column
// when lazy materialization enables, _first_read_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
// see _vec_init_lazy_materialization
// todo(wb) need to tell input columnids from output columnids
RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids,
sel_rowid_idx, selected_size));
// step 3.2: read remaining expr column and evaluate it.
if (_is_need_expr_eval) {
// The predicate column contains the remaining expr column, no need second read.
if (!_second_read_column_ids.empty()) {
SCOPED_RAW_TIMER(&_opts.stats->second_read_ns);
RETURN_IF_ERROR(_read_columns_by_rowids(
_second_read_column_ids, _block_rowids, sel_rowid_idx,
selected_size, &_current_return_columns));
if (std::find(_second_read_column_ids.begin(),
_second_read_column_ids.end(), _schema->version_col_idx()) !=
_second_read_column_ids.end()) {
_replace_version_col(selected_size);
}
for (auto cid : _second_read_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
}
}
DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
// block->rows() takes the size of the first column by default. If the first column is no predicate column,
// it has not been read yet. add a const column that has been read to calculate rows().
if (block->rows() == 0) {
vectorized::MutableColumnPtr col0 =
std::move(*block->get_by_position(0).column).mutate();
auto res_column = vectorized::ColumnString::create();
res_column->insert_data("", 0);
auto col_const = vectorized::ColumnConst::create(std::move(res_column),
selected_size);
block->replace_by_position(0, std::move(col_const));
_output_index_result_column(sel_rowid_idx, selected_size, block);
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
block->replace_by_position(0, std::move(col0));
} else {
_output_index_result_column(sel_rowid_idx, selected_size, block);
block->shrink_char_type_column_suffix_zero(_char_type_idx);
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
}
}
} else if (_is_need_expr_eval) {
for (auto cid : _second_read_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc, std::move(_current_return_columns[cid]));
}
}
} else if (_is_need_expr_eval) {
DCHECK(!_first_read_column_ids.empty());
// first read all rows are insert block, initialize sel_rowid_idx to all rows.
for (auto cid : _first_read_column_ids) {
auto loc = _schema_block_id_map[cid];
block->replace_by_position(loc, std::move(_current_return_columns[cid]));
}
for (uint32_t i = 0; i < selected_size; ++i) {
sel_rowid_idx[i] = i;
}
if (block->rows() == 0) {
vectorized::MutableColumnPtr col0 =
std::move(*block->get_by_position(0).column).mutate();
auto res_column = vectorized::ColumnString::create();
res_column->insert_data("", 0);
auto col_const =
vectorized::ColumnConst::create(std::move(res_column), selected_size);
block->replace_by_position(0, std::move(col_const));
_output_index_result_column(sel_rowid_idx, selected_size, block);
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
block->replace_by_position(0, std::move(col0));
} else {
_output_index_result_column(sel_rowid_idx, selected_size, block);
block->shrink_char_type_column_suffix_zero(_char_type_idx);
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
}
}
if (UNLIKELY(_opts.record_rowids)) {
_sel_rowid_idx.resize(selected_size);
_selected_size = selected_size;
for (auto i = 0; i < _selected_size; i++) {
_sel_rowid_idx[i] = sel_rowid_idx[i];
}
}
if (_non_predicate_columns.empty()) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
_update_max_row(block);
}
return Status::OK();
}
// step4: read non_predicate column
if (selected_size > 0) {
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
sel_rowid_idx, selected_size,
&_current_return_columns));
if (std::find(_non_predicate_columns.begin(), _non_predicate_columns.end(),
_schema->version_col_idx()) != _non_predicate_columns.end()) {
_replace_version_col(selected_size);
}
}
// step5: output columns
_output_non_pred_columns(block);
if (!_is_need_expr_eval) {
_output_index_result_column(sel_rowid_idx, selected_size, block);
}
}
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
_update_max_row(block);
}
// reverse block row order
if (_opts.read_orderby_key_reverse) {
size_t num_rows = block->rows();
size_t num_columns = block->columns();
vectorized::IColumn::Permutation permutation;
for (size_t i = 0; i < num_rows; ++i) permutation.emplace_back(num_rows - 1 - i);
for (size_t i = 0; i < num_columns; ++i)
block->get_by_position(i).column =
block->get_by_position(i).column->permute(permutation, num_rows);
}
return Status::OK();
}
Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
                                             vectorized::Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
    DCHECK(!_remaining_conjunct_roots.empty());
    DCHECK(block->rows() != 0);
    // Remember how many columns the block had before expression evaluation:
    // conjunct execution may append temporary result columns after this point.
    const size_t columns_before_eval = block->columns();
    vectorized::IColumn::Filter filter;
    // Run the pushed-down common expressions and filter the block in place,
    // collecting the row-level filter so the selection vector can be updated.
    RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts_and_filter_block(
            _common_expr_ctxs_push_down, block, _columns_to_filter, columns_before_eval, filter));
    // Compact sel_rowid_idx so it only keeps rows that passed the filter.
    selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter);
    return Status::OK();
}
uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
                                                       uint16_t selected_size,
                                                       const vectorized::IColumn::Filter& filter) {
    // Compacts sel_rowid_idx in place, keeping only entries whose position in
    // `filter` is non-zero, and returns the new selection count.
    // `filter` has one byte per currently-selected row (same order as sel_rowid_idx).
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    if (count == 0) {
        // Nothing survived the filter.
        return 0;
    } else {
        const vectorized::UInt8* filt_pos = filter.data();
        uint16_t new_size = 0;
        uint32_t sel_pos = 0;
        const uint32_t sel_end = selected_size;
        // Process 32 filter bytes at a time; the tail is handled by the scalar
        // loop below.
        static constexpr size_t SIMD_BYTES = 32;
        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;
        while (sel_pos < sel_end_simd) {
            // One bit per filter byte in this 32-byte chunk.
            auto mask = simd::bytes32_mask_to_bits32_mask(filt_pos + sel_pos);
            if (0 == mask) {
                // All 32 rows filtered out — skip the chunk entirely.
                //pass
            } else if (0xffffffff == mask) {
                // All 32 rows kept — copy the whole chunk.
                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
                }
            } else {
                // Mixed chunk: iterate only over the set bits, lowest first.
                while (mask) {
                    const size_t bit_pos = __builtin_ctzll(mask);
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos];
                    // Clear the lowest set bit.
                    mask = mask & (mask - 1);
                }
            }
            sel_pos += SIMD_BYTES;
        }
        // Scalar tail: remaining rows that did not fill a full SIMD chunk.
        for (; sel_pos < sel_end; sel_pos++) {
            if (filt_pos[sel_pos]) {
                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
            }
        }
        return new_size;
    }
}
void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
                                                  vectorized::Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
    if (block->rows() == 0) {
        return;
    }
    // Append one UInt8 result column per index-evaluated predicate so later
    // filtering can consume the index results directly.
    for (auto& entry : _rowid_result_for_index) {
        const std::string& pred_result_sign = entry.first;
        _columns_to_filter.push_back(block->columns());
        block->insert({vectorized::ColumnUInt8::create(),
                       std::make_shared<vectorized::DataTypeUInt8>(), pred_result_sign});
        if (entry.second.first) {
            // Predicate participates in a compound query: materialize the
            // per-row match flags from the index bitmap.
            _build_index_result_column(sel_rowid_idx, select_size, block, pred_result_sign,
                                       entry.second.second);
        } else {
            // Predicate is not in a compound query — every surviving row
            // already satisfies it, so emit a constant `true` column.
            block->get_by_name(pred_result_sign).column =
                    vectorized::DataTypeUInt8().create_column_const(block->rows(), (uint8_t)1);
        }
    }
}
void SegmentIterator::_build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
                                                 vectorized::Block* block,
                                                 const std::string& pred_result_sign,
                                                 const roaring::Roaring& index_result) {
    // Builds a UInt8 column (one value per block row) that records, for each
    // row surviving predicate selection, whether the inverted-index predicate
    // identified by `pred_result_sign` matched it, then installs that column
    // into `block` at the position previously inserted under that name.
    auto index_result_column = vectorized::ColumnUInt8::create();
    vectorized::ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
    vec_match_pred.resize(block->rows());
    size_t idx_in_block = 0;     // write cursor into vec_match_pred
    size_t idx_in_row_range = 0; // ordinal of the row within this batch's ranges
    size_t idx_in_selected = 0;  // cursor into sel_rowid_idx (selected rows only)
    // _split_row_ranges stores multiple ranges split in function _read_columns_by_index();
    // index_result is a column predicate's apply result over the whole segment,
    // but a scanner thread can read at most block_row_max rows at a time,
    // so split _row_bitmap by one scan range, in order to match the number of rows
    // one scanner thread reads.
    for (auto origin_row_range : _split_row_ranges) {
        for (size_t rowid = origin_row_range.first; rowid < origin_row_range.second; ++rowid) {
            // With no selection vector, every row is emitted; otherwise only
            // rows whose batch ordinal appears (in order) in sel_rowid_idx.
            if (sel_rowid_idx == nullptr || (idx_in_selected < select_size &&
                                             idx_in_row_range == sel_rowid_idx[idx_in_selected])) {
                if (index_result.contains(rowid)) {
                    vec_match_pred[idx_in_block++] = true;
                } else {
                    vec_match_pred[idx_in_block++] = false;
                }
                idx_in_selected++;
            }
            idx_in_row_range++;
        }
    }
    assert(block->rows() == vec_match_pred.size());
    auto index_result_position = block->get_position_by_name(pred_result_sign);
    block->replace_by_position(index_result_position, std::move(index_result_column));
}
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary() {
    // Predicates evaluated on the short-circuit path.
    for (auto* pred : _short_cir_eval_predicate) {
        _convert_dict_code_for_predicate_if_necessary_impl(pred);
    }
    // Predicates evaluated in the vectorized pre-evaluation pass.
    for (auto* pred : _pre_eval_block_predicate) {
        _convert_dict_code_for_predicate_if_necessary_impl(pred);
    }
    // Columns referenced by delete-condition range predicates must expose
    // real values rather than dictionary codes.
    for (auto cid : _delete_range_column_ids) {
        _current_return_columns[cid]->convert_dict_codes_if_necessary();
    }
    // Columns referenced by delete-condition bloom-filter predicates need
    // their hash values prepared for runtime-filter evaluation.
    for (auto cid : _delete_bloom_filter_column_ids) {
        _current_return_columns[cid]->initialize_hash_values_for_runtime_filter();
    }
}
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
        ColumnPredicate* predicate) {
    // Prepare the predicate's target column for evaluation: range predicates
    // compare real values (decode dict codes), bloom-filter predicates probe
    // by hash (pre-compute hash values). Other predicate types need nothing.
    auto* col_ptr = _current_return_columns[predicate->column_id()].get();
    const auto pred_type = predicate->type();
    if (PredicateTypeTraits::is_range(pred_type)) {
        col_ptr->convert_dict_codes_if_necessary();
    } else if (PredicateTypeTraits::is_bloom_filter(pred_type)) {
        col_ptr->initialize_hash_values_for_runtime_filter();
    }
}
void SegmentIterator::_update_max_row(const vectorized::Block* block) {
    // One-shot estimation: after the first populated block we stop estimating.
    _estimate_row_size = false;
    // Callers only invoke this with block->rows() > 0, but an all-empty block
    // can still report bytes() == 0; clamp the average to at least 1 byte to
    // avoid integer division by zero below.
    auto avg_row_size = std::max<size_t>(block->bytes() / block->rows(), 1);
    // Shrink the batch size so one batch stays within the configured byte budget.
    int block_row_max = config::doris_scan_block_max_mb / avg_row_size;
    _opts.block_row_max = std::min(block_row_max, _opts.block_row_max);
}
Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
    // Reports the (segment_id, rowid) location of every row returned by the
    // most recent batch, in block order. Only valid when record_rowids is set.
    DCHECK(_opts.record_rowids);
    DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
    const uint32_t sid = segment_id();
    if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
        // No predicate filtering: all rows read this batch are in the block,
        // in the order they were read. Use unsigned indices to avoid the
        // signed/unsigned comparison of the previous `auto i = 0` loops.
        block_row_locations->resize(_current_batch_rows_read);
        for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
            (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
        }
    } else {
        // Predicates filtered the batch: map each surviving position back to
        // its original rowid through the saved selection vector.
        block_row_locations->resize(_selected_size);
        for (uint16_t i = 0; i < _selected_size; i++) {
            (*block_row_locations)[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]);
        }
    }
    return Status::OK();
}
/**
 * Solution 1: all nodes in the where clause are `and` leaf nodes,
 * so each predicate is pushed down and removed from vconjunct.
 * For example: where A = 1 and B = 'test' and B like '%he%';
 * column A : `A = 1` is pushed down; all of this column's predicates are pushed down,
 * so _check_column_pred_all_push_down returns true.
 * column B : `B = 'test'` is pushed down, but `B like '%he%'` remains in vconjunct,
 * so _check_column_pred_all_push_down returns false.
 *
 * Solution 2: the where clause contains compound or other complex conditions,
 * so predicates are pushed down but still remain in vconjunct.
 * For example: where (A = 1 and B = 'test') or B = 'hi' or (C like '%ye%' and C > 'aa');
 * column A : `A = 1` is pushed down; it is applied by the index,
 * so _check_column_pred_all_push_down returns true.
 * column B : `B = 'test'` and `B = 'hi'` are both pushed down and applied by the index,
 * so _check_column_pred_all_push_down returns true.
 * column C : `C like '%ye%'` is not pushed down; `C > 'aa'` is pushed down, so only
 * `C > 'aa'` is applied by the index,
 * and _check_column_pred_all_push_down returns false.
 */
bool SegmentIterator::_check_column_pred_all_push_down(const std::string& column_name,
bool in_compound) {
if (_remaining_conjunct_roots.empty()) {
return true;
}
if (in_compound) {
auto preds_in_remaining_vconjuct = _column_pred_in_remaining_vconjunct[column_name];
for (auto pred_info : preds_in_remaining_vconjuct) {
auto column_sign = _gen_predicate_result_sign(&pred_info);
if (_rowid_result_for_index.count(column_sign) < 1) {
return false;
}
}
} else {
if (_column_pred_in_remaining_vconjunct[column_name].size() != 0) {
return false;
}
}
return true;
}
void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
        const vectorized::VExprSPtr& expr) {
    // Walks the remaining conjunct expression tree and records, per column,
    // every predicate (column name + op + literal value) that still lives in
    // the vconjunct, into _column_pred_in_remaining_vconjunct.
    if (expr == nullptr) {
        return;
    }
    // Post-order traversal (range-for instead of the old signed-index loop):
    // children fill in the column name and literal value before the parent
    // predicate node is examined.
    for (const auto& child : expr->children()) {
        _calculate_pred_in_remaining_conjunct_root(child);
    }
    auto node_type = expr->node_type();
    if (node_type == TExprNodeType::SLOT_REF) {
        _column_predicate_info->column_name = expr->expr_name();
    } else if (_is_literal_node(node_type)) {
        auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
        _column_predicate_info->query_value = v_literal_expr->value();
    } else {
        if (node_type == TExprNodeType::MATCH_PRED) {
            _column_predicate_info->query_op = "match";
        } else if (node_type != TExprNodeType::COMPOUND_PRED) {
            // Ordinary function predicate: record the function name as the op.
            _column_predicate_info->query_op = expr->fn().name.function_name;
        }
        // A complete predicate has been assembled; file it under its column
        // and start a fresh accumulator for the next predicate.
        if (!_column_predicate_info->is_empty()) {
            _column_pred_in_remaining_vconjunct[_column_predicate_info->column_name].push_back(
                    *_column_predicate_info);
            _column_predicate_info.reset(new ColumnPredicateInfo());
        }
    }
}
} // namespace segment_v2
} // namespace doris