// blob: 9f0313f3c02dd7022682e0da01ea0b00fef63978 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/olap_file.pb.h>
#include <stdint.h>
#include <memory>
#include <utility>
#include <vector>
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/schema.h"
#include "olap/segment_loader.h"
#include "util/once.h"
#include "vec/core/block.h"
namespace doris {
// Forward declarations.
// NOTE(review): the inline bodies of BetaRowsetReader dereference RowsetReaderContext
// and call into RuntimeState (which is not declared here), so the complete definitions
// presumably arrive through the includes above — confirm.
class RuntimeProfile;
class Schema;
struct RowLocation;
struct RowsetReaderContext;
/// RowsetReader implementation backed by a BetaRowset.
///
/// Every next_batch() overload funnels into _next_batch(), which first calls
/// _init_iterator_once() and then pulls data from `_iterator`.
class BetaRowsetReader : public RowsetReader {
public:
    BetaRowsetReader(BetaRowsetSharedPtr rowset);

    /// Calls release() on `_rowset`.
    ~BetaRowsetReader() override { _rowset->release(); }

    Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) override;

    Status get_segment_iterators(RowsetReaderContext* read_context,
                                 std::vector<RowwiseIteratorUPtr>* out_iters,
                                 bool use_cache = false) override;

    void reset_read_options() override;

    // The next_batch() overloads differ only in the output container type;
    // all of them forward to the shared _next_batch() template.
    Status next_batch(vectorized::Block* block) override { return _next_batch(block); }

    Status next_batch(vectorized::BlockView* block_view) override {
        return _next_batch(block_view);
    }

    Status next_batch(BlockWithSameBit* block_with_same_bit) override {
        return _next_batch(block_with_same_bit);
    }

    /// True only when all three hold, checked in this order: the read context
    /// requests ordered results, the rowset meta reports overlapping segments,
    /// and more than one segment is covered by this reader.
    bool is_merge_iterator() const override {
        if (!_read_context->need_ordered_result) {
            return false;
        }
        if (!_rowset->rowset_meta()->is_segments_overlapping()) {
            return false;
        }
        return _get_segment_num() > 1;
    }

    // Thin accessors forwarding to the underlying rowset.
    bool delete_flag() override { return _rowset->delete_flag(); }

    Version version() override { return _rowset->version(); }

    int64_t newest_write_timestamp() override { return _rowset->newest_write_timestamp(); }

    RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); }

    /// Sum of all "filtered rows" counters in `_stats`.
    /// Used for validation of schema change.
    int64_t filtered_rows() override {
        int64_t total = _stats->rows_del_filtered;
        total += _stats->rows_del_by_bitmap;
        total += _stats->rows_conditions_filtered;
        total += _stats->rows_vec_del_cond_filtered;
        total += _stats->rows_vec_cond_filtered;
        total += _stats->rows_short_circuit_cond_filtered;
        return total;
    }

    /// Reads the counter that the read context's `merged_rows` pointer refers to.
    uint64_t merged_rows() override { return *_read_context->merged_rows; }

    RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

    /// Delegates to `_iterator` to obtain the row locations of the current block.
    Status current_block_row_locations(std::vector<RowLocation>* locations) override {
        return _iterator->current_block_row_locations(locations);
    }

    void update_profile(RuntimeProfile* profile) override;

    RowsetReaderSharedPtr clone() override;

    void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; }

    /// Returns the raw `_stats` pointer (nullptr until it is assigned elsewhere).
    OlapReaderStatistics* get_stats() { return _stats; }

private:
    /// Shared implementation of the next_batch() overloads.
    ///
    /// Keeps asking `_iterator` for data until `block` is non-empty. Any non-OK
    /// status from the iterator is returned as-is; everything except END_OF_FILE
    /// is logged first. After each successful fetch the runtime state (if one is
    /// reachable through the read context) is checked for cancellation.
    template <typename T>
    Status _next_batch(T* block) {
        RETURN_IF_ERROR(_init_iterator_once());
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);

        if (_empty) {
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
        }

        // The read context is optional here, so the runtime state may be absent.
        RuntimeState* const state =
                (_read_context != nullptr) ? _read_context->runtime_state : nullptr;

        for (;;) {
            Status st = _iterator->next_batch(block);
            if (!st.ok()) {
                if (!st.is<ErrorCode::END_OF_FILE>()) {
                    LOG(WARNING) << "failed to read next block: " << st.to_string();
                }
                return st;
            }

            if (state != nullptr && state->is_cancelled()) [[unlikely]] {
                return state->cancel_reason();
            }

            // An empty block is not handed back to the caller; fetch again.
            if (!block->empty()) {
                break;
            }
        }
        return Status::OK();
    }

    [[nodiscard]] Status _init_iterator_once();

    [[nodiscard]] Status _init_iterator();

    bool _should_push_down_value_predicates() const;

    /// Number of segments covered by this reader: the width of `_segment_offsets`,
    /// or the rowset's full segment count when both offsets are equal.
    int64_t _get_segment_num() const {
        const int64_t first = _segment_offsets.first;
        const int64_t last = _segment_offsets.second;
        if (first != last) {
            return last - first;
        }
        return _rowset->num_segments();
    }

    DorisCallOnce<Status> _init_iter_once;

    // [first, second) segment range; equal values are treated as "all segments"
    // by _get_segment_num().
    std::pair<int64_t, int64_t> _segment_offsets;
    std::vector<RowRanges> _segment_row_ranges;

    SchemaSPtr _input_schema;
    RowsetReaderContext* _read_context = nullptr;
    BetaRowsetSharedPtr _rowset;

    OlapReaderStatistics _owned_stats;
    // NOTE(review): presumably points at `_owned_stats` or at externally supplied
    // statistics once the reader is initialised — confirm in the .cpp.
    OlapReaderStatistics* _stats = nullptr;

    std::unique_ptr<RowwiseIterator> _iterator;

    StorageReadOptions _read_options;

    // When set, _next_batch() reports END_OF_FILE without touching `_iterator`.
    bool _empty = false;
    size_t _topn_limit = 0;
    // NOTE(review): not referenced in this header; presumably used by the .cpp — confirm.
    uint64_t _merged_rows = 0;
};
} // namespace doris