blob: 1a44d06c77b6e8b097de5c80b45570dfe6e9f7d4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stddef.h>
#include <stdint.h>

#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "exec/olap_common.h"
#include "io/io_common.h"
#include "pipeline/exec/file_scan_operator.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_fwd.h"
// Forward declarations of types referenced only by pointer/reference in this header,
// so their full definitions do not have to be included here.
namespace doris {
class RuntimeState;
class TFileRangeDesc;
class TFileScanRange;
class TFileScanRangeParams;
} // namespace doris

namespace doris::vectorized {
class ShardedKVCache;
class VExpr;
class VExprContext;
} // namespace doris::vectorized
namespace doris::vectorized {
// Scanner over external file scan ranges.
//
// For each scan range (`_current_range`) a GenericReader (`_cur_reader`) reads the data,
// and `_get_block_impl` produces the scanner's output blocks. A second, lightweight
// constructor supports reading individual rows by row id (`read_lines_from_range`).
class FileScanner : public Scanner {
    ENABLE_FACTORY_CREATOR(FileScanner);

public:
    static constexpr const char* NAME = "FileScanner";

    // sub profile name (for parquet/orc)
    static const std::string FileReadBytesProfile;
    static const std::string FileReadTimeProfile;

    FileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit,
                std::shared_ptr<vectorized::SplitSourceConnector> split_source,
                RuntimeProfile* profile, ShardedKVCache* kv_cache,
                const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
                const std::unordered_map<std::string, int>* colname_to_slot_id);

    Status open(RuntimeState* state) override;

    Status close(RuntimeState* state) override;

    void try_stop() override;

    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;

    std::string get_name() override { return FileScanner::NAME; }

    std::string get_current_scan_range_name() override { return _current_range_path; }

    // Only used for reading individual lines (see read_lines_from_range).
    // Note: this constructor leaves _split_source empty.
    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params,
                const std::unordered_map<std::string, int>* colname_to_slot_id,
                TupleDescriptor* tuple_desc)
            : Scanner(state, profile),
              _params(params),
              _col_name_to_slot_id(colname_to_slot_id),
              _real_tuple_desc(tuple_desc) {}

    Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
                                 Block* result_block, const ExternalFileMappingInfo& external_info,
                                 int64_t* init_reader_ms, int64_t* get_block_ms);

    Status prepare_for_read_lines(const TFileRangeDesc& range);

    void update_realtime_counters() override;

protected:
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;

    Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof);

    Status _get_next_reader();

    // TODO: cast input block columns type to string.
    Status _cast_src_block(Block* block) { return Status::OK(); }

    void _collect_profile_before_close() override;

    // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col
    // and the current load is a flexible partial update
    bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; }

protected:
    const TFileScanRangeParams* _params = nullptr;
    std::shared_ptr<vectorized::SplitSourceConnector> _split_source;
    bool _first_scan_range = false;
    TFileRangeDesc _current_range;

    std::unique_ptr<GenericReader> _cur_reader;
    bool _cur_reader_eof = false;
    const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
    // File source slot descriptors
    std::vector<SlotDescriptor*> _file_slot_descs;
    // col names from _file_slot_descs
    std::vector<std::string> _file_col_names;

    // Partition source slot descriptors
    std::vector<SlotDescriptor*> _partition_slot_descs;
    // Partition slot id to index in _partition_slot_descs
    std::unordered_map<SlotId, int> _partition_slot_index_map;
    // created from param.expr_of_dest_slot
    // For query, it saves default value expr of all dest columns, or nullptr for NULL.
    // For load, it saves conversion expr/default value of all dest columns.
    VExprContextSPtrs _dest_vexpr_ctx;
    // dest slot name to index in _dest_vexpr_ctx;
    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
    // col name to default value expr
    std::unordered_map<std::string, vectorized::VExprContextSPtr> _col_default_value_ctx;
    // the map values of dest slot id to src slot desc
    // if there is no key for a dest slot id in dest_sid_to_src_sid_without_trans,
    // the entry is set to nullptr
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
    // dest slot desc index to src slot desc index
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;

    std::unordered_map<std::string, uint32_t> _src_block_name_to_idx;

    // Get from GenericReader, save the existing columns in file to their type.
    std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
    // Get from GenericReader, save columns that required by scan but not exist in file.
    // These columns will be filled by default value or null.
    std::unordered_set<std::string> _missing_cols;

    // The col lowercase name of source file to type of source file.
    std::map<std::string, DataTypePtr> _source_file_col_name_types;

    // For load task
    vectorized::VExprContextSPtrs _pre_conjunct_ctxs;
    std::unique_ptr<RowDescriptor> _src_row_desc;
    std::unique_ptr<RowDescriptor> _dest_row_desc;
    // row desc for default exprs
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
    // owned by scan node
    ShardedKVCache* _kv_cache = nullptr;

    std::set<TSlotId> _is_file_slot;
    bool _scanner_eof = false;
    int _rows = 0;
    // Default-initialized: the read-lines constructor does not set these.
    int _num_of_columns_from_file = 0;

    bool _src_block_mem_reuse = false;
    // Default-initialized: the read-lines constructor does not set this.
    bool _strict_mode = false;

    bool _src_block_init = false;
    Block* _src_block_ptr = nullptr;
    Block _src_block;

    VExprContextSPtrs _push_down_conjuncts;
    VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
    Block _runtime_filter_partition_prune_block;

    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
    std::unique_ptr<io::IOContext> _io_ctx;

    // Whether to fill partition columns from path, default is true.
    bool _fill_partition_from_path = true;
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            _partition_col_descs;
    std::unordered_map<std::string, bool> _partition_value_is_null;
    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

    // idx of skip_bitmap_col in _input_tuple_desc
    int32_t _skip_bitmap_col_idx {-1};
    int32_t _sequence_map_col_uid {-1};
    int32_t _sequence_col_uid {-1};

private:
    RuntimeProfile::Counter* _get_block_timer = nullptr;
    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
    RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
    RuntimeProfile::Counter* _empty_file_counter = nullptr;
    RuntimeProfile::Counter* _not_found_file_counter = nullptr;
    RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr;
    RuntimeProfile::Counter* _file_counter = nullptr;
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
    RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;

    const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
    // single slot filter conjuncts
    std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts;
    // not single(zero or multi) slot filter conjuncts
    VExprContextSPtrs _not_single_slot_filter_conjuncts;
    // save the path of current scan range
    std::string _current_range_path;

    // Only for load scan node.
    const TupleDescriptor* _input_tuple_desc = nullptr;
    // If _input_tuple_desc is set,
    // the _real_tuple_desc will point to _input_tuple_desc,
    // otherwise, point to _output_tuple_desc
    const TupleDescriptor* _real_tuple_desc = nullptr;

    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
                                                                                           -1};

private:
    Status _init_expr_ctxes();
    Status _init_src_block(Block* block);
    Status _check_output_block_types();
    Status _cast_to_input_block(Block* block);
    Status _fill_columns_from_path(size_t rows);
    Status _fill_missing_columns(size_t rows);
    Status _pre_filter_src_block();
    Status _convert_to_output_block(Block* block);
    Status _truncate_char_or_varchar_columns(Block* block);
    void _truncate_char_or_varchar_column(Block* block, int idx, int len);
    Status _generate_partition_columns();
    Status _generate_missing_columns();
    bool _check_partition_prune_expr(const VExprSPtr& expr);
    void _init_runtime_filter_partition_prune_ctxs();
    void _init_runtime_filter_partition_prune_block();
    Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
    Status _process_conjuncts_for_dict_filter();
    Status _process_late_arrival_conjuncts();
    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
    Status _generate_truncate_columns(bool need_to_get_parsed_schema);
    Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
    Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
                            FileMetaCache* file_meta_cache_ptr);
    Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
                                FileMetaCache* file_meta_cache_ptr);

    Status _create_row_id_column_iterator();

    // Format type of the current range.
    TFileFormatType::type _get_current_format_type() const {
        // for compatibility, if format_type is not set in range, use the format type of params
        const TFileRangeDesc& range = _current_range;
        return range.__isset.format_type ? range.format_type : _params->format_type;
    }

    // (Re)creates the IO context and tags it with the current query id.
    Status _init_io_ctx() {
        _io_ctx = std::make_unique<io::IOContext>();
        _io_ctx->query_id = &_state->query_id();
        return Status::OK();
    }

    // Clears the per-call unselected/filtered row counters.
    void _reset_counter() {
        _counter.num_rows_unselected = 0;
        _counter.num_rows_filtered = 0;
    }

    // Push-down aggregation type from the local state, or NONE when there is no local state.
    TPushAggOp::type _get_push_down_agg_type() const {
        return _local_state == nullptr ? TPushAggOp::type::NONE
                                       : _local_state->get_push_down_agg_type();
    }

    // NOTE(review): unlike _get_push_down_agg_type, this does not check _local_state for
    // null; callers presumably only use it when a push-down agg is active — confirm.
    int64_t _get_push_down_count() const { return _local_state->get_push_down_count(); }

    // Enable the file meta cache only when
    // 1. the global file meta cache is enabled, and
    // 2. the number of scan ranges is less than 1/3 of config::max_external_file_meta_cache_num.
    // Otherwise the cache miss rate would be high.
    // _split_source is not set by the read-lines constructor; in that case the cache is not
    // enabled instead of dereferencing a null pointer.
    bool _should_enable_file_meta_cache() const {
        return _split_source != nullptr && ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
               _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
    }
};
} // namespace doris::vectorized