// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "beta_rowset_reader.h"
#include <stddef.h>
#include <algorithm>
#include <memory>
#include <ostream>
#include <roaring/roaring.hh>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include "common/logging.h"
#include "common/status.h"
#include "io/io_common.h"
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/schema_cache.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/olap/vgeneric_iterators.h"
namespace doris {
using namespace ErrorCode;
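
// Keep the rowset alive for the lifetime of this reader and point '_stats'
// at the reader's own statistics until a read context provides its own.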
BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset)
: _read_context(nullptr), _rowset(std::move(rowset)), _stats(&_owned_stats) {
_rowset->acquire();
}
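
// Clear per-read state in the cached StorageReadOptions so the reader can be
// reused: rebuild the delete-condition predicate tree and drop the column
// predicates, zone-map delete predicates and key ranges.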
void BetaRowsetReader::reset_read_options() {
_read_options.delete_condition_predicates = AndBlockColumnPredicate::create_shared();
_read_options.column_predicates.clear();
_read_options.col_id_to_predicates.clear();
_read_options.del_predicates_for_zone_map.clear();
_read_options.key_ranges.clear();
}
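
// Create a fresh, uninitialized reader over the same rowset.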
RowsetReaderSharedPtr BetaRowsetReader::clone() {
return RowsetReaderSharedPtr(new BetaRowsetReader(_rowset));
}
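
// Forward profile counters to the underlying iterator, if one has been created.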
void BetaRowsetReader::update_profile(RuntimeProfile* profile) {
if (_iterator != nullptr) {
_iterator->update_profile(profile);
}
}
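
// Translate the RowsetReaderContext into StorageReadOptions and create one
// lazily-initialized iterator per segment (or per segment split).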
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIteratorUPtr>* out_iters,
bool use_cache) {
_read_context = read_context;
// The rowset reader is created with its own statistics object, and the member
// variable '_stats' is initialized as '_stats(&_owned_stats)'.
// Which statistics are actually used depends on the workload of the rowset reader.
// For a query, get_segment_iterators receives a read_context carrying valid
// statistics, and we use those.
// For compaction or schema change workloads, the read_context passed in has
// null statistics, and in that case we fall back to the reader's own statistics.
if (_read_context->stats != nullptr) {
_stats = _read_context->stats;
}
SCOPED_RAW_TIMER(&_stats->rowset_reader_get_segment_iterators_timer_ns);
RETURN_IF_ERROR(_rowset->load());
// convert RowsetReaderContext to StorageReadOptions
_read_options.block_row_max = read_context->batch_size;
_read_options.stats = _stats;
_read_options.push_down_agg_type_opt = _read_context->push_down_agg_type_opt;
_read_options.remaining_conjunct_roots = _read_context->remaining_conjunct_roots;
_read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
_read_options.rowset_id = _rowset->rowset_id();
_read_options.version = _rowset->version();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
_read_options.topn_limit = _topn_limit;
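// Turn the caller's lower/upper bound keys into key ranges for the segment iterators.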
if (_read_context->lower_bound_keys != nullptr) {
for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) {
_read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i),
_read_context->is_lower_keys_included->at(i),
&_read_context->upper_bound_keys->at(i),
_read_context->is_upper_keys_included->at(i));
}
}
// delete_handler is always set, but it may not be initialized; in that case it
// returns empty conditions or predicates.
if (_read_context->delete_handler != nullptr) {
_read_context->delete_handler->get_delete_conditions_after_version(
_rowset->end_version(), _read_options.delete_condition_predicates.get(),
&_read_options.del_predicates_for_zone_map);
}
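// Collect the columns to read: the requested return columns plus any columns
// referenced by delete conditions.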
std::vector<uint32_t> read_columns;
std::set<uint32_t> read_columns_set;
std::set<uint32_t> delete_columns_set;
for (int i = 0; i < _read_context->return_columns->size(); ++i) {
read_columns.push_back(_read_context->return_columns->at(i));
read_columns_set.insert(_read_context->return_columns->at(i));
}
_read_options.delete_condition_predicates->get_all_column_ids(delete_columns_set);
for (auto cid : delete_columns_set) {
if (read_columns_set.find(cid) == read_columns_set.end()) {
read_columns.push_back(cid);
}
}
VLOG_NOTICE << "read columns size: " << read_columns.size();
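// Build the read schema ('_input_schema') covering all collected columns.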
_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
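// Register the pushed-down column predicates and group them by column id.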
if (_read_context->predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->predicates->begin(),
_read_context->predicates->end());
for (auto pred : *(_read_context->predicates)) {
if (_read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
_read_options.col_id_to_predicates.insert(
{pred->column_id(), AndBlockColumnPredicate::create_shared()});
}
_read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
SingleColumnBlockPredicate::create_unique(pred));
}
}
// Take a delete bitmap for each segment; the bitmap contains all deletes
// up to the max read version, which is read_context->version.second.
if (_read_context->delete_bitmap != nullptr) {
{
SCOPED_RAW_TIMER(&_stats->delete_bitmap_get_agg_ns);
RowsetId rowset_id = rowset()->rowset_id();
for (uint32_t seg_id = 0; seg_id < rowset()->num_segments(); ++seg_id) {
auto d = _read_context->delete_bitmap->get_agg(
{rowset_id, seg_id, _read_context->version.second});
if (d->isEmpty()) {
continue; // Empty delete bitmap for the segment
}
VLOG_TRACE << "Get the delete bitmap for rowset: " << rowset_id.to_string()
<< ", segment id:" << seg_id << ", size:" << d->cardinality();
_read_options.delete_bitmap.emplace(seg_id, std::move(d));
}
}
}
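// Value column predicates are only pushed down when it is safe for this rowset;
// see _should_push_down_value_predicates() for the conditions.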
if (_should_push_down_value_predicates()) {
if (_read_context->value_predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->value_predicates->begin(),
_read_context->value_predicates->end());
for (auto pred : *(_read_context->value_predicates)) {
if (_read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
_read_options.col_id_to_predicates.insert(
{pred->column_id(), AndBlockColumnPredicate::create_shared()});
}
_read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
SingleColumnBlockPredicate::create_unique(pred));
}
}
}
_read_options.use_page_cache = _read_context->use_page_cache;
_read_options.tablet_schema = _read_context->tablet_schema;
_read_options.enable_unique_key_merge_on_write =
_read_context->enable_unique_key_merge_on_write;
_read_options.record_rowids = _read_context->record_rowids;
_read_options.topn_filter_source_node_ids = _read_context->topn_filter_source_node_ids;
_read_options.topn_filter_target_node_id = _read_context->topn_filter_target_node_id;
_read_options.read_orderby_key_reverse = _read_context->read_orderby_key_reverse;
_read_options.read_orderby_key_columns = _read_context->read_orderby_key_columns;
_read_options.io_ctx.reader_type = _read_context->reader_type;
_read_options.io_ctx.file_cache_stats = &_stats->file_cache_stats;
_read_options.runtime_state = _read_context->runtime_state;
_read_options.output_columns = _read_context->output_columns;
_read_options.io_ctx.is_disposable = _read_context->reader_type != ReaderType::READER_QUERY;
_read_options.target_cast_type_for_variants = _read_context->target_cast_type_for_variants;
if (_read_context->runtime_state != nullptr) {
_read_options.io_ctx.query_id = &_read_context->runtime_state->query_id();
_read_options.io_ctx.read_file_cache =
_read_context->runtime_state->query_options().enable_file_cache;
_read_options.io_ctx.is_disposable =
_read_context->runtime_state->query_options().disable_file_cache;
}
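// File cache expiration: only set when the rowset has a write timestamp and a
// positive TTL; a time already in the past is reset to 0 (no expiration).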
_read_options.io_ctx.expiration_time =
read_context->ttl_seconds > 0 && _rowset->rowset_meta()->newest_write_timestamp() > 0
? _rowset->rowset_meta()->newest_write_timestamp() + read_context->ttl_seconds
: 0;
if (_read_options.io_ctx.expiration_time <= UnixSeconds()) {
_read_options.io_ctx.expiration_time = 0;
}
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
enable_segment_cache = state->query_options().__isset.enable_segment_cache
? state->query_options().enable_segment_cache
: true;
}
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
auto segment_count = _rowset->num_segments();
auto [seg_start, seg_end] = _segment_offsets;
// If seg_start == seg_end, the segments of this rowset are not split across
// multiple scanners, and the rowset reader reads the whole rowset.
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segment_count;
}
if (_read_context->record_rowids && _read_context->rowid_conversion) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_rows));
}
for (int64_t i = seg_start; i < seg_end; i++) {
SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
std::unique_ptr<RowwiseIterator> iter;
/// For iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, _read_options);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, local_options);
}
if (iter->empty()) {
continue;
}
out_iters->push_back(std::move(iter));
}
return Status::OK();
}
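
// Record the read context and the segment splits (offsets and row ranges);
// the actual iterator is built lazily on the first read.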
Status BetaRowsetReader::init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) {
_read_context = read_context;
_read_context->rowset_id = _rowset->rowset_id();
_segment_offsets = rs_splits.segment_offsets;
_segment_row_ranges = rs_splits.segment_row_ranges;
return Status::OK();
}
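
// Build the iterator at most once; subsequent calls reuse the first result.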
Status BetaRowsetReader::_init_iterator_once() {
return _init_iter_once.call([this] { return _init_iterator(); });
}
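
// Create the per-segment iterators and combine them into a single merge or
// union iterator, depending on _is_merge_iterator().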
Status BetaRowsetReader::_init_iterator() {
std::vector<RowwiseIteratorUPtr> iterators;
RETURN_IF_ERROR(get_segment_iterators(_read_context, &iterators));
SCOPED_RAW_TIMER(&_stats->rowset_reader_init_iterators_timer_ns);
if (_read_context->merged_rows == nullptr) {
_read_context->merged_rows = &_merged_rows;
}
// merge or union segment iterator
if (_is_merge_iterator()) {
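// Find the position of the sequence column within return_columns; the merge
// iterator identifies it by offset.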
auto sequence_loc = -1;
if (_read_context->sequence_id_idx != -1) {
for (size_t loc = 0; loc < _read_context->return_columns->size(); loc++) {
if (_read_context->return_columns->at(loc) == _read_context->sequence_id_idx) {
sequence_loc = loc;
break;
}
}
}
_iterator = vectorized::new_merge_iterator(
std::move(iterators), sequence_loc, _read_context->is_unique,
_read_context->read_orderby_key_reverse, _read_context->merged_rows);
} else {
if (_read_context->read_orderby_key_reverse) {
// reverse iterators to read backward for ORDER BY key DESC
std::reverse(iterators.begin(), iterators.end());
}
_iterator = vectorized::new_union_iterator(std::move(iterators));
}
auto s = _iterator->init(_read_options);
if (!s.ok()) {
LOG(WARNING) << "failed to init iterator: " << s.to_string();
_iterator.reset();
return Status::Error<ROWSET_READER_INIT>(s.to_string());
}
return Status::OK();
}
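
// Fetch the next non-empty block. Returns END_OF_FILE when the data is
// exhausted and propagates cancellation from the runtime state.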
Status BetaRowsetReader::next_block(vectorized::Block* block) {
RETURN_IF_ERROR(_init_iterator_once());
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
if (_empty) {
return Status::Error<END_OF_FILE>("BetaRowsetReader is empty");
}
RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}
do {
auto s = _iterator->next_batch(block);
if (!s.ok()) {
if (!s.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to read next block: " << s.to_string();
}
return s;
}
if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block->empty());
return Status::OK();
}
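
// Same as next_block(), but fetches a BlockView from the iterator instead.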
Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
RETURN_IF_ERROR(_init_iterator_once());
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}
do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
if (!s.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to read next block view: " << s.to_string();
}
return s;
}
if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block_view->empty());
return Status::OK();
}
bool BetaRowsetReader::_should_push_down_value_predicates() const {
// For a unique-keys table with rowsets [0-x] or [0-1] [2-y] [...],
// value column predicates can be pushed down to rowset [0-x] or [2-y];
// [2-y] must come from compaction, have no overlapping segments,
// and the table must not have a sequence column.
return _rowset->keys_type() == UNIQUE_KEYS &&
(((_rowset->start_version() == 0 || _rowset->start_version() == 2) &&
!_rowset->_rowset_meta->is_segments_overlapping() &&
_read_context->sequence_id_idx == -1) ||
_read_context->enable_unique_key_merge_on_write);
}
} // namespace doris