| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "agent/be_exec_version_manager.h" |
| #include "common/status.h" |
| #include "exprs/function_filter.h" |
| #include "io/io_common.h" |
| #include "olap/base_tablet.h" |
| #include "olap/delete_handler.h" |
| #include "olap/filter_olap_param.h" |
| #include "olap/iterators.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_tuple.h" |
| #include "olap/row_cursor.h" |
| #include "olap/rowid_conversion.h" |
| #include "olap/rowset/rowset.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/rowset/rowset_reader.h" |
| #include "olap/rowset/rowset_reader_context.h" |
| #include "olap/tablet_fwd.h" |
| |
| namespace doris { |
| |
// Forward declarations for types this header only refers to by pointer,
// reference or smart pointer. Kept in alphabetical order.
class BitmapFilterFuncBase;
class BloomFilterFuncBase;
class ColumnPredicate;
class DeleteBitmap;
class HybridSetBase;
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class Arena;
class Block;
class VCollectIterator;
class VExpr;
class VExprContext;
} // namespace vectorized
| |
| // Used to compare row with input scan key. Scan key only contains key columns, |
| // row contains all key columns, which is superset of key columns. |
| // So we should compare the common prefix columns of lhs and rhs. |
| // |
| // NOTE: if you are not sure if you can use it, please don't use this function. |
| inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) { |
| auto cmp_cids = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); |
| for (uint32_t cid = 0; cid < cmp_cids; ++cid) { |
| auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid)); |
| if (res != 0) { |
| return res; |
| } |
| } |
| return 0; |
| } |
| |
| class TabletReader { |
| struct KeysParam { |
| std::string to_string() const; |
| |
| std::vector<RowCursor> start_keys; |
| std::vector<RowCursor> end_keys; |
| bool start_key_include = false; |
| bool end_key_include = false; |
| }; |
| |
| public: |
| // Params for Reader, |
| // mainly include tablet, data version and fetch range. |
| struct ReaderParams { |
| bool has_single_version() const { |
| return (rs_splits.size() == 1 && |
| rs_splits[0].rs_reader->rowset()->start_version() == 0 && |
| !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) || |
| (rs_splits.size() == 2 && |
| rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 && |
| rs_splits[1].rs_reader->rowset()->start_version() == 2 && |
| !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()); |
| } |
| |
| int get_be_exec_version() const { |
| if (runtime_state) { |
| return runtime_state->be_exec_version(); |
| } |
| return BeExecVersionManager::get_newest_version(); |
| } |
| |
| void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { |
| rs_splits = std::move(read_source.rs_splits); |
| delete_predicates = std::move(read_source.delete_predicates); |
| #ifndef BE_TEST |
| if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) { |
| delete_bitmap = std::move(read_source.delete_bitmap); |
| } |
| #endif |
| } |
| |
| BaseTabletSPtr tablet; |
| TabletSchemaSPtr tablet_schema; |
| ReaderType reader_type = ReaderType::READER_QUERY; |
| bool direct_mode = false; |
| bool aggregation = false; |
| // for compaction, schema_change, check_sum: we don't use page cache |
| // for query and config::disable_storage_page_cache is false, we use page cache |
| bool use_page_cache = false; |
| Version version = Version(-1, 0); |
| |
| std::vector<OlapTuple> start_key; |
| std::vector<OlapTuple> end_key; |
| bool start_key_include = false; |
| bool end_key_include = false; |
| |
| std::vector<std::shared_ptr<ColumnPredicate>> predicates; |
| std::vector<FunctionFilter> function_filters; |
| std::vector<RowsetMetaSharedPtr> delete_predicates; |
| // slots that cast may be eliminated in storage layer |
| std::map<std::string, vectorized::DataTypePtr> target_cast_type_for_variants; |
| |
| std::map<int32_t, TColumnAccessPaths> all_access_paths; |
| std::map<int32_t, TColumnAccessPaths> predicate_access_paths; |
| |
| std::vector<RowSetSplits> rs_splits; |
| // For unique key table with merge-on-write |
| DeleteBitmapPtr delete_bitmap = nullptr; |
| |
| // return_columns is init from query schema |
| std::vector<ColumnId> return_columns; |
| // output_columns only contain columns in OrderByExprs and outputExprs |
| std::set<int32_t> output_columns; |
| RuntimeProfile* profile = nullptr; |
| RuntimeState* runtime_state = nullptr; |
| |
| // use only in vec exec engine |
| std::vector<ColumnId>* origin_return_columns = nullptr; |
| std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; |
| TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; |
| std::vector<vectorized::VExprSPtr> remaining_conjunct_roots; |
| vectorized::VExprContextSPtrs common_expr_ctxs_push_down; |
| |
| // used for compaction to record row ids |
| bool record_rowids = false; |
| RowIdConversion* rowid_conversion = nullptr; |
| std::vector<int> topn_filter_source_node_ids; |
| int topn_filter_target_node_id = -1; |
| // used for special optimization for query : ORDER BY key LIMIT n |
| bool read_orderby_key = false; |
| // used for special optimization for query : ORDER BY key DESC LIMIT n |
| bool read_orderby_key_reverse = false; |
| // num of columns for orderby key |
| size_t read_orderby_key_num_prefix_columns = 0; |
| // limit of rows for read_orderby_key |
| size_t read_orderby_key_limit = 0; |
| // filter_block arguments |
| vectorized::VExprContextSPtrs filter_block_conjuncts; |
| |
| // for vertical compaction |
| bool is_key_column_group = false; |
| std::vector<uint32_t> key_group_cluster_key_idxes; |
| |
| bool is_segcompaction = false; |
| |
| std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr; |
| |
| void check_validation() const; |
| |
| std::string to_string() const; |
| |
| int64_t batch_size = -1; |
| |
| std::map<ColumnId, vectorized::VExprContextSPtr> virtual_column_exprs; |
| std::map<ColumnId, size_t> vir_cid_to_idx_in_block; |
| std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type; |
| |
| std::shared_ptr<vectorized::ScoreRuntime> score_runtime; |
| CollectionStatisticsPtr collection_statistics; |
| std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime; |
| |
| uint64_t condition_cache_digest = 0; |
| }; |
| |
| TabletReader() = default; |
| |
| virtual ~TabletReader() = default; |
| |
| TabletReader(const TabletReader&) = delete; |
| void operator=(const TabletReader&) = delete; |
| |
| // Initialize TabletReader with tablet, data version and fetch range. |
| virtual Status init(const ReaderParams& read_params); |
| |
| // Read next block with aggregation. |
| // Return OK and set `*eof` to false when next block is read |
| // Return OK and set `*eof` to true when no more rows can be read. |
| // Return others when unexpected error happens. |
| virtual Status next_block_with_aggregation(vectorized::Block* block, bool* eof) { |
| return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>( |
| "TabletReader not support next_block_with_aggregation"); |
| } |
| |
| virtual uint64_t merged_rows() const { return _merged_rows; } |
| |
| uint64_t filtered_rows() const { |
| return _stats.rows_del_filtered + _stats.rows_del_by_bitmap + |
| _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered + |
| _stats.rows_vec_cond_filtered + _stats.rows_short_circuit_cond_filtered; |
| } |
| |
| void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; } |
| |
| int batch_size() const { return _reader_context.batch_size; } |
| |
| const OlapReaderStatistics& stats() const { return _stats; } |
| OlapReaderStatistics* mutable_stats() { return &_stats; } |
| |
| virtual void update_profile(RuntimeProfile* profile) {} |
| static Status init_reader_params_and_create_block( |
| TabletSharedPtr tablet, ReaderType reader_type, |
| const std::vector<RowsetSharedPtr>& input_rowsets, |
| TabletReader::ReaderParams* reader_params, vectorized::Block* block); |
| |
| protected: |
| friend class vectorized::VCollectIterator; |
| friend class DeleteHandler; |
| |
| Status _init_params(const ReaderParams& read_params); |
| |
| Status _capture_rs_readers(const ReaderParams& read_params); |
| |
| Status _init_keys_param(const ReaderParams& read_params); |
| |
| Status _init_orderby_keys_param(const ReaderParams& read_params); |
| |
| Status _init_conditions_param(const ReaderParams& read_params); |
| |
| virtual std::shared_ptr<ColumnPredicate> _parse_to_predicate( |
| const FunctionFilter& function_filter); |
| |
| Status _init_delete_condition(const ReaderParams& read_params); |
| |
| Status _init_return_columns(const ReaderParams& read_params); |
| |
| const BaseTabletSPtr& tablet() { return _tablet; } |
| // If original column is a variant type column, and it's predicate is normalized |
| // so in order to get the real type of column predicate, we need to reset type |
| // according to the related type in `target_cast_type_for_variants`.Since variant is not |
| // an predicate applicable type.Otherwise return the original tablet column. |
| // Eg. `where cast(v:a as bigint) > 1` will elimate cast, and materialize this variant column |
| // to type bigint |
| TabletColumn materialize_column(const TabletColumn& orig); |
| |
| const TabletSchema& tablet_schema() { return *_tablet_schema; } |
| |
| vectorized::Arena _predicate_arena; |
| std::vector<ColumnId> _return_columns; |
| |
| // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n |
| // columns for orderby keys |
| std::vector<uint32_t> _orderby_key_columns; |
| // only use in outer join which change the column nullable which must keep same in |
| // vec query engine |
| std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; |
| |
| BaseTabletSPtr _tablet; |
| RowsetReaderContext _reader_context; |
| TabletSchemaSPtr _tablet_schema; |
| KeysParam _keys_param; |
| std::vector<bool> _is_lower_keys_included; |
| std::vector<bool> _is_upper_keys_included; |
| std::vector<std::shared_ptr<ColumnPredicate>> _col_predicates; |
| std::vector<std::shared_ptr<ColumnPredicate>> _value_col_predicates; |
| DeleteHandler _delete_handler; |
| |
| // Indicates whether the tablets has do a aggregation in storage engine. |
| bool _aggregation = false; |
| // for agg query, we don't need to finalize when scan agg object data |
| ReaderType _reader_type = ReaderType::READER_QUERY; |
| bool _next_delete_flag = false; |
| bool _delete_sign_available = false; |
| bool _filter_delete = false; |
| int32_t _sequence_col_idx = -1; |
| bool _direct_mode = false; |
| |
| std::vector<uint32_t> _key_cids; |
| std::vector<uint32_t> _value_cids; |
| |
| uint64_t _merged_rows = 0; |
| OlapReaderStatistics _stats; |
| }; |
| |
| } // namespace doris |