| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/scan/file_scanner.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/Opcodes_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| #include <boost/iterator/iterator_facade.hpp> |
| #include <map> |
| #include <ranges> |
| #include <tuple> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "common/consts.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "core/block/column_with_type_and_name.h" |
| #include "core/block/columns_with_type_and_name.h" |
| #include "core/column/column.h" |
| #include "core/column/column_nullable.h" |
| #include "core/column/column_vector.h" |
| #include "core/data_type/data_type.h" |
| #include "core/data_type/data_type_nullable.h" |
| #include "core/data_type/data_type_string.h" |
| #include "core/string_ref.h" |
| #include "exec/common/stringop_substring.h" |
| #include "exec/rowid_fetcher.h" |
| #include "exec/scan/scan_node.h" |
| #include "exprs/aggregate/aggregate_function.h" |
| #include "exprs/function/function.h" |
| #include "exprs/function/simple_function_factory.h" |
| #include "exprs/vexpr.h" |
| #include "exprs/vexpr_context.h" |
| #include "exprs/vexpr_fwd.h" |
| #include "exprs/vslot_ref.h" |
| #include "format/arrow/arrow_stream_reader.h" |
| #include "format/count_reader.h" |
| #include "format/csv/csv_reader.h" |
| #include "format/json/new_json_reader.h" |
| #include "format/native/native_reader.h" |
| #include "format/orc/vorc_reader.h" |
| #include "format/parquet/vparquet_reader.h" |
| #include "format/table/es/es_http_reader.h" |
| #include "format/table/hive_reader.h" |
| #include "format/table/hudi_jni_reader.h" |
| #include "format/table/hudi_reader.h" |
| #include "format/table/iceberg_reader.h" |
| #include "format/table/iceberg_sys_table_jni_reader.h" |
| #include "format/table/jdbc_jni_reader.h" |
| #include "format/table/max_compute_jni_reader.h" |
| #include "format/table/paimon_cpp_reader.h" |
| #include "format/table/paimon_jni_reader.h" |
| #include "format/table/paimon_predicate_converter.h" |
| #include "format/table/paimon_reader.h" |
| #include "format/table/remote_doris_reader.h" |
| #include "format/table/transactional_hive_reader.h" |
| #include "format/table/trino_connector_jni_reader.h" |
| #include "format/text/text_reader.h" |
| #ifdef BUILD_RUST_READERS |
| #include "format/lance/lance_rust_reader.h" |
| #endif |
| #include "io/cache/block_file_cache_profile.h" |
| #include "load/group_commit/wal/wal_reader.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/runtime_state.h" |
| |
| namespace cctz { |
| class time_zone; |
| } // namespace cctz |
| namespace doris { |
| class ShardedKVCache; |
| } // namespace doris |
| |
| namespace doris { |
| using namespace ErrorCode; |
| |
// Profile counter key names. Defined here so other components can reference
// the exact same strings when looking these counters up in the profile.
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
| |
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
                         std::shared_ptr<SplitSourceConnector> split_source,
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
        : Scanner(state, local_state, limit, profile),
          _split_source(split_source),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false),
          _col_name_to_slot_id(colname_to_slot_id) {
    // Prefer the per-node scan range params stored on the query context (new FE
    // protocol); otherwise fall back to the params carried by the split source.
    if (state->get_query_ctx() != nullptr &&
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
    } else {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    // The presence of an input tuple is what distinguishes a load job from a query.
    _is_load = (_input_tuple_desc != nullptr);
    // Bind the load-vs-query handler member-function pointers once, up front.
    _configure_file_scan_handlers();
}
| |
| void FileScanner::_configure_file_scan_handlers() { |
| if (_is_load) { |
| _init_src_block_handler = &FileScanner::_init_src_block_for_load; |
| _process_src_block_after_read_handler = |
| &FileScanner::_process_src_block_after_read_for_load; |
| _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_load; |
| _should_enable_condition_cache_handler = |
| &FileScanner::_should_enable_condition_cache_for_load; |
| } else { |
| _init_src_block_handler = &FileScanner::_init_src_block_for_query; |
| _process_src_block_after_read_handler = |
| &FileScanner::_process_src_block_after_read_for_query; |
| _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_query; |
| _should_enable_condition_cache_handler = |
| &FileScanner::_should_enable_condition_cache_for_query; |
| } |
| } |
| |
// Initializes the scanner: registers profile timers/counters, sets up the I/O
// context and, for load jobs, prepares pre-filter expressions and row
// descriptors. Returns the first error encountered.
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
    // --- Timers for each stage of the per-block pipeline. ---
    _get_block_timer =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      "FileScannerCastInputBlockTime", 1);
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                       "FileScannerFillMissingColumnTime", 1);
    _pre_filter_timer =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                          "FileScannerConvertOuputBlockTime", 1);
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
    // --- File-level counters. ---
    _empty_file_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
    _file_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);

    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      "FileReadCalls", TUnit::UNIT, 1);
    _file_read_time_counter =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);

    _runtime_filter_partition_pruned_range_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

    // --- I/O context wiring: stats objects are owned here, referenced by _io_ctx. ---
    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _file_reader_stats.reset(new io::FileReaderStats());

    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    // NOTE(review): marks the I/O as disposable when the query disables the
    // file cache — presumably so cached data is not retained; confirm semantics.
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;

    if (_is_load) {
        // Load jobs evaluate pre-filters against the *source* tuple layout.
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
        // prepare pre filters
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
                                                            _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            // Older protocol: a single pre-filter expression tree.
            VExprContextSPtr context;
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
            _pre_conjunct_ctxs.emplace_back(context);
        }

        for (auto& conjunct : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(conjunct->open(_state));
        }

        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
    }

    // Used when evaluating default-value expressions against the real tuple.
    _default_val_row_desc.reset(
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));

    return Status::OK();
}
| |
| // check if the expr is a partition pruning expr |
| bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) { |
| if (expr->is_slot_ref()) { |
| auto* slot_ref = static_cast<VSlotRef*>(expr.get()); |
| return _partition_slot_index_map.find(slot_ref->slot_id()) != |
| _partition_slot_index_map.end(); |
| } |
| if (expr->is_literal()) { |
| return true; |
| } |
| return std::ranges::all_of(expr->children(), [this](const auto& child) { |
| return _check_partition_prune_expr(child); |
| }); |
| } |
| |
| void FileScanner::_init_runtime_filter_partition_prune_ctxs() { |
| _runtime_filter_partition_prune_ctxs.clear(); |
| for (auto& conjunct : _conjuncts) { |
| auto impl = conjunct->root()->get_impl(); |
| // If impl is not null, which means this a conjuncts from runtime filter. |
| auto expr = impl ? impl : conjunct->root(); |
| if (_check_partition_prune_expr(expr)) { |
| _runtime_filter_partition_prune_ctxs.emplace_back(conjunct); |
| } |
| } |
| } |
| |
// Seeds the partition-prune block with one empty column per slot of the real
// tuple, so it matches the layout the prune conjuncts expect.
void FileScanner::_init_runtime_filter_partition_prune_block() {
    for (const auto* slot : _real_tuple_desc->slots()) {
        ColumnWithTypeAndName entry {slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
                                     slot->col_name()};
        _runtime_filter_partition_prune_block.insert(std::move(entry));
    }
}
| |
// Evaluates the runtime-filter conjuncts that reference only partition columns
// against the current range's partition values. Sets `can_filter_all` to true
// when the conjuncts reject the (single-row) partition value block, meaning
// the whole range can be skipped.
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // One row is enough: each range carries a single value per partition column.
    size_t partition_value_column_size = 1;

    // 1. Get partition key values to string columns.
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
    for (auto const& partition_col_desc : _partition_col_descs) {
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
        auto data_type = partition_slot_desc->get_data_type_ptr();
        auto test_serde = data_type->get_serde();
        auto partition_value_column = data_type->create_column();
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
        Slice slice(partition_value.data(), partition_value.size());
        uint64_t num_deserialized = 0;
        DataTypeSerDe::FormatOptions options {};
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
            // for iceberg/paimon table
            // NOTICE: column is always be nullable for iceberg/paimon table now
            DCHECK(data_type->is_nullable());
            // Deserialize through the nested (non-nullable) serde; the null map
            // is managed explicitly below.
            test_serde = test_serde->get_nested_serdes()[0];
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
                null_column->insert_many_defaults(partition_value_column_size);
            } else {
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
                        null_column->get_nested_column(), slice, partition_value_column_size,
                        &num_deserialized, options));
            }
        } else {
            // for hive/hudi table, the null value is set as "\\N"
            // TODO: this will be unified as iceberg/paimon table in the future
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
        }

        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
    size_t index = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
            partition_slot_id_to_column.end()) {
            auto data_type = slot_desc->get_data_type_ptr();
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
            if (data_type->is_nullable()) {
                // Wrap the deserialized (non-null) values in a nullable column
                // with an all-zero null map to match the slot's declared type.
                _runtime_filter_partition_prune_block.insert(
                        index, ColumnWithTypeAndName(
                                       ColumnNullable::create(
                                               std::move(partition_value_column),
                                               ColumnUInt8::create(partition_value_column_size, 0)),
                                       data_type, slot_desc->col_name()));
            } else {
                _runtime_filter_partition_prune_block.insert(
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
                                                     slot_desc->col_name()));
            }
            if (index == 0) {
                first_column_filled = true;
            }
        }
        index++;
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                partition_value_column_size);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}
| |
// Buckets the pushed-down conjuncts: conjuncts that reference exactly one slot
// go into _slot_id_to_filter_conjuncts keyed by that slot; everything else
// (multi-slot or slot-free) goes into _not_single_slot_filter_conjuncts.
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // A non-null impl indicates the conjunct originates from a runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);
        if (slot_ids.empty()) {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
            continue;
        }
        // "Single slot" means every referenced slot id is the same one.
        const int first_id = slot_ids.front();
        const bool single_slot = std::all_of(slot_ids.begin(), slot_ids.end(),
                                             [first_id](int id) { return id == first_id; });
        if (single_slot) {
            _slot_id_to_filter_conjuncts[first_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
| |
// Re-snapshots the conjunct list once late-arriving runtime filters have been
// merged into _conjuncts, and re-buckets them for push-down.
Status FileScanner::_process_late_arrival_conjuncts() {
    // A size difference means new runtime-filter conjuncts arrived since the
    // last snapshot.
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
        _push_down_conjuncts = _conjuncts;
        // Do not clear _conjuncts here!
        // We must keep it for fallback filtering, especially when mixing
        // Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
        // _conjuncts.clear();
        RETURN_IF_ERROR(_process_conjuncts());
    }
    // Record in the profile once every expected runtime filter has arrived.
    if (_applied_rf_num == _total_rf_num) {
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
    }
    return Status::OK();
}
| |
// Recursively collects the slot ids referenced anywhere under `expr` into
// `slot_ids`, and flags each referenced slot via set_is_predicate(true).
// Note: only children are inspected; a root-level slot ref contributes nothing,
// matching the callers' usage.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (child_expr->is_slot_ref()) {
            // static_cast is the appropriate downcast here (consistent with
            // _check_partition_prune_expr); reinterpret_cast was unnecessary.
            auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
            DCHECK(slot_desc != nullptr) << "unknown slot id " << slot_ref->slot_id();
            slot_desc->set_is_predicate(true);
            slot_ids->emplace_back(slot_ref->slot_id());
        } else {
            _get_slot_ids(child_expr.get(), slot_ids);
        }
    }
}
| |
// Opens the scanner: fetches the first scan range from the split source and,
// if one exists, prepares expression contexts and (optionally) the runtime
// filter partition-pruning machinery. With no ranges, the scanner stops.
Status FileScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    if (_local_state) {
        _condition_cache_digest = _local_state->get_condition_cache_digest();
    }
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
    if (_first_scan_range) {
        RETURN_IF_ERROR(_init_expr_ctxes());
        // Partition pruning by runtime filters only makes sense when there are
        // partition columns and the session option enables it.
        if (_state->query_options().enable_runtime_filter_partition_prune &&
            !_partition_slot_index_map.empty()) {
            _init_runtime_filter_partition_prune_ctxs();
            _init_runtime_filter_partition_prune_block();
        }
    } else {
        // there's no scan range in split source. stop scanner directly.
        _scanner_eof = true;
    }

    return Status::OK();
}
| |
// Thin wrapper around _get_block_wrapped that appends the current scan range
// path to any error, which makes failures much easier to trace to a file.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status st = _get_block_wrapped(state, block, eof);
    if (st.ok()) {
        return st;
    }
    // add cur path in error msg for easy debugging
    st.append(". cur path: " + get_current_scan_range_name());
    return st;
}
| |
| // For query: |
| // [exist cols] [non-exist cols] [col from path] input output |
| // A B C D E |
| // _init_src_block x x x x x - x |
| // get_next_block x x x - - - x |
| // _cast_to_input_block - - - - - - - |
| // _fill_columns_from_path - - - - x - x |
| // _fill_missing_columns - - - x - - x |
| // _convert_to_output_block - - - - - - - |
| // |
| // For load: |
| // [exist cols] [non-exist cols] [col from path] input output |
| // A B C D E |
| // _init_src_block x x x x x x - |
| // get_next_block x x x - - x - |
| // _cast_to_input_block x x x - - x - |
| // _fill_columns_from_path - - - - x x - |
| // _fill_missing_columns - - - x - x - |
| // _convert_to_output_block - - - - - - x |
// Core read loop: advances through readers until a non-empty batch is produced
// or the scanner reaches EOF. Missing files and fully-skipped files are
// tolerated (counted, then the loop moves to the next reader).
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
    do {
        RETURN_IF_CANCELLED(state);
        // Current reader exhausted (or none yet): move to the next one.
        if (_cur_reader == nullptr || _cur_reader_eof) {
            _finalize_reader_condition_cache();
            // The file may not exist because the file list is got from meta cache,
            // And the file may already be removed from storage.
            // Just ignore not found files.
            Status st = _get_next_reader();
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
                _cur_reader_eof = true;
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
                // The reader determined the whole file can be skipped.
                _cur_reader_eof = true;
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
                continue;
            } else if (!st) {
                return st;
            }
            _init_reader_condition_cache();
        }

        if (_scanner_eof) {
            *eof = true;
            return Status::OK();
        }

        // Init src block for load job based on the data file schema (e.g. parquet)
        // For query job, simply set _src_block_ptr to block.
        size_t read_rows = 0;
        RETURN_IF_ERROR(_init_src_block(block));
        {
            SCOPED_TIMER(_get_block_timer);

            // Read next block.
            // Some of column in block may not be filled (column not exist in file)
            RETURN_IF_ERROR(
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
        }
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
        if (read_rows > 0) {
            // Readers that already count rows internally must not be counted twice.
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
                _io_ctx->file_reader_stats->read_rows += read_rows;
            }
            // If the push_down_agg_type is COUNT, no need to do the rest,
            // because we only save a number in block.
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
                RETURN_IF_ERROR(_process_src_block_after_read(block));
            }
        }
        break;
    } while (true);

    // Update filtered rows and unselected rows for load, reset counter.
    // {
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
    //     _reset_counter();
    // }
    return Status::OK();
}
| |
| /** |
| * Check whether there are complex types in parquet/orc reader in broker/stream load. |
| * Broker/stream load will cast any type as string type, and complex types will be casted wrong. |
| * This is a temporary method, and will be replaced by tvf. |
| */ |
/**
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
 * This is a temporary method, and will be replaced by tvf.
 */
Status FileScanner::_check_output_block_types() {
    // Only called from _init_src_block_for_load, so _is_load is always true.
    const TFileFormatType::type format_type = _params->format_type;
    if (format_type != TFileFormatType::FORMAT_PARQUET &&
        format_type != TFileFormatType::FORMAT_ORC) {
        return Status::OK();
    }
    const bool has_complex_type =
            std::ranges::any_of(_output_tuple_desc->slots(), [](const auto* slot) {
                return is_complex_type(slot->type()->get_primitive_type());
            });
    if (has_complex_type) {
        return Status::InternalError(
                "Parquet/orc doesn't support complex types in broker/stream load, "
                "please use tvf(table value function) to insert complex types.");
    }
    return Status::OK();
}
| |
| Status FileScanner::_init_src_block(Block* block) { |
| DCHECK(_init_src_block_handler != nullptr); |
| return (this->*_init_src_block_handler)(block); |
| } |
| |
| Status FileScanner::_init_src_block_for_query(Block* block) { |
| _src_block_ptr = block; |
| |
| // Build name to index map only once on first call. |
| if (_src_block_name_to_idx.empty()) { |
| _src_block_name_to_idx = block->get_name_to_pos_map(); |
| } |
| return Status::OK(); |
| } |
| |
// Builds the intermediate source block for load jobs from the input tuple's
// slots, and records the positions of special columns (skip bitmap, sequence
// column/map column unique ids) used later by _convert_to_output_block.
Status FileScanner::_init_src_block_for_load(Block* block) {
    static_cast<void>(block);
    RETURN_IF_ERROR(_check_output_block_types());

    // if (_src_block_init) {
    //     _src_block.clear_column_data();
    //     _src_block_ptr = &_src_block;
    //     return Status::OK();
    // }

    _src_block.clear();
    uint32_t idx = 0;
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
    // _input_tuple_desc will contains: k1, k2, tmp1
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
    // _input_tuple_desc also contains columns from path
    for (auto& slot : _input_tuple_desc->slots()) {
        DataTypePtr data_type;
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
        if (slot->is_skip_bitmap_col()) {
            // Remember where the skip bitmap column lives for flexible partial update.
            _skip_bitmap_col_idx = idx;
        }
        if (_params->__isset.sequence_map_col) {
            if (_params->sequence_map_col == slot->col_name()) {
                _sequence_map_col_uid = slot->col_unique_id();
            }
        }
        // Columns present in the file use the file's type (made nullable);
        // others fall back to the slot's declared type.
        data_type =
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
        MutableColumnPtr data_column = data_type->create_column();
        _src_block.insert(
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
    }
    if (_params->__isset.sequence_map_col) {
        for (const auto& slot : _output_tuple_desc->slots()) {
            // When the target table has sequence map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
            // so we should get its column unique id from _output_tuple_desc
            if (slot->is_sequence_col()) {
                _sequence_col_uid = slot->col_unique_id();
            }
        }
    }
    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
| |
// Casts, in place, each file-sourced column of the src block from the file's
// type (PT0) to the type declared by the input tuple (PT1), using the CAST
// function machinery. Columns absent from the file are skipped.
Status FileScanner::_cast_to_input_block(Block* block) {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
            _slot_lower_name_to_col_type.end()) {
            // skip columns which does not exist in file
            continue;
        }
        // Resolve the column position once. The previous code used operator[]
        // twice, which costs an extra hash lookup and would silently insert a
        // default entry (position 0) for an unknown column name.
        auto pos_it = _src_block_name_to_idx.find(slot_desc->col_name());
        if (pos_it == _src_block_name_to_idx.end()) {
            return Status::InternalError("Column {} not found in src block while casting",
                                         slot_desc->col_name());
        }
        const uint32_t idx = pos_it->second;
        auto& arg = _src_block_ptr->get_by_position(idx);
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
        ColumnsWithTypeAndName arguments {
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
        if (!func_cast) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         arg.type->get_name(), slot_desc->col_name(),
                                         return_type->get_name());
        }
        DCHECK(_state != nullptr);
        auto ctx = FunctionContext::create_context(_state, {}, {});
        // Execute the cast writing the result back into position idx.
        RETURN_IF_ERROR(
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
| |
| Status FileScanner::_pre_filter_src_block() { |
| // Only called from _process_src_block_after_read_for_load, so _is_load is always true. |
| if (!_pre_conjunct_ctxs.empty()) { |
| SCOPED_TIMER(_pre_filter_timer); |
| auto origin_column_num = _src_block_ptr->columns(); |
| auto old_rows = _src_block_ptr->rows(); |
| RETURN_IF_ERROR( |
| VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num)); |
| _counter.num_rows_unselected += old_rows - _src_block_ptr->rows(); |
| } |
| return Status::OK(); |
| } |
| |
// Converts the src block into the destination (output) block for load jobs:
// evaluates each destination expression, enforces nullability / strict-mode
// rules row by row (recording bad rows to the error file and dropping them via
// a filter column), and accounts the dropped rows as "filtered".
Status FileScanner::_convert_to_output_block(Block* block) {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();

    int ctx_idx = 0;
    size_t rows = _src_block_ptr->rows();
    // One flag per row; rows flipped to 0 below are dropped at the end.
    auto filter_column = ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    std::vector<BitmapValue>* skip_bitmaps {nullptr};
    if (_should_process_skip_bitmap_col()) {
        // Flexible partial update: fetch the per-row bitmaps of columns the row
        // did not mention, so null checks can be skipped for those columns.
        auto* skip_bitmap_nullable_col_ptr =
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
                                                     .column->assume_mutable()
                                                     .get());
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
                                 ->get_data());
        // NOTE:
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
        //   so __DORIS_SEQUENCE_COL__ will be omitted if it's specified in a row and will not be marked in skip bitmap.
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column is marked
        if (_sequence_map_col_uid != -1) {
            for (int j = 0; j < rows; ++j) {
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
                }
            }
        }
    }

    // for (auto slot_desc : _output_tuple_desc->slots()) {
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
        auto* slot_desc = _output_tuple_desc->slots()[j];
        int dest_index = ctx_idx;
        ColumnPtr column_ptr;

        auto& ctx = _dest_vexpr_ctx[dest_index];
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
        // column_ptr maybe a ColumnConst, convert it to a normal column
        column_ptr = column_ptr->convert_to_full_column_if_const();
        DCHECK(column_ptr);

        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
            for (int i = 0; i < rows; ++i) {
                if (filter_map[i] && nullable_column->is_null_at(i)) {
                    // skip checks for non-mentioned columns in flexible partial update
                    if (skip_bitmaps == nullptr ||
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
                        // Strict mode: a non-null source value that became null after
                        // conversion means the value was invalid -> reject the row.
                        // clang-format off
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                    [&]() -> std::string {
                                        return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                    },
                                    [&]() -> std::string {
                                        auto raw_value =
                                                _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
                                        std::string raw_string = raw_value.to_string();
                                        fmt::memory_buffer error_msg;
                                        fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
                                                       slot_desc->col_name(), _strict_mode, raw_string);
                                        return fmt::to_string(error_msg);
                                    }));
                        } else if (!slot_desc->is_nullable()) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                    [&]() -> std::string {
                                        return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                    },
                                    [&]() -> std::string {
                                        fmt::memory_buffer error_msg;
                                        fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
                                        return fmt::to_string(error_msg);
                                    }));
                        }
                        // clang-format on
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
        ctx_idx++;
    }

    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();

    size_t dest_size = block->columns();
    // do filter
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
                                        "filter column"));
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
| |
| Status FileScanner::_process_src_block_after_read(Block* block) { |
| DCHECK(_process_src_block_after_read_handler != nullptr); |
| return (this->*_process_src_block_after_read_handler)(block); |
| } |
| |
| Status FileScanner::_process_src_block_after_read_for_query(Block* block) { |
| // Truncate CHAR/VARCHAR columns when target size is smaller than file schema. |
| // This is needed for external table queries with truncate_char_or_varchar_columns=true. |
| RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); |
| return Status::OK(); |
| } |
| |
| Status FileScanner::_process_src_block_after_read_for_load(Block* block) { |
| // Convert the src block columns type in-place. |
| RETURN_IF_ERROR(_cast_to_input_block(block)); |
| // All readers fill partition and missing columns inside get_next_block: |
| // ORC/Parquet: in _do_get_next_block via on_fill_partition_columns/on_fill_missing_columns |
| // CSV/JSON/others: in on_after_read_block via fill_remaining_columns |
| // Assert all columns have consistent row counts. |
| if (_src_block_ptr->columns() > 0) { |
| size_t rows = _src_block_ptr->get_by_position(0).column->size(); |
| for (size_t i = 1; i < _src_block_ptr->columns(); ++i) { |
| DCHECK_EQ(_src_block_ptr->get_by_position(i).column->size(), rows) |
| << "Column " << _src_block_ptr->get_by_position(i).name << " has " |
| << _src_block_ptr->get_by_position(i).column->size() << " rows, expected " |
| << rows; |
| } |
| } |
| // Apply _pre_conjunct_ctxs to filter src block. |
| RETURN_IF_ERROR(_pre_filter_src_block()); |
| |
| // Convert src block to output block (dest block), then apply filters. |
| RETURN_IF_ERROR(_convert_to_output_block(block)); |
| // Truncate CHAR/VARCHAR columns when target size is smaller than file schema. |
| RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); |
| return Status::OK(); |
| } |
| |
| Status FileScanner::_truncate_char_or_varchar_columns(Block* block) { |
| // Truncate char columns or varchar columns if size is smaller than file columns |
| // or not found in the file column schema. |
| if (!_state->query_options().truncate_char_or_varchar_columns) { |
| return Status::OK(); |
| } |
| int idx = 0; |
| for (auto* slot_desc : _real_tuple_desc->slots()) { |
| const auto& type = slot_desc->type(); |
| if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) { |
| ++idx; |
| continue; |
| } |
| auto iter = _source_file_col_name_types.find(slot_desc->col_name()); |
| if (iter != _source_file_col_name_types.end()) { |
| const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()]; |
| int l = -1; |
| if (auto* ftype = check_and_get_data_type<DataTypeString>( |
| remove_nullable(file_type_desc).get())) { |
| l = ftype->len(); |
| } |
| if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) && |
| (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l || |
| l < 0)) { |
| _truncate_char_or_varchar_column( |
| block, idx, |
| assert_cast<const DataTypeString*>(remove_nullable(type).get())->len()); |
| } |
| } else { |
| _truncate_char_or_varchar_column( |
| block, idx, |
| assert_cast<const DataTypeString*>(remove_nullable(type).get())->len()); |
| } |
| ++idx; |
| } |
| return Status::OK(); |
| } |
| |
| // VARCHAR substring(VARCHAR str, INT pos[, INT len]) |
| void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) { |
| auto int_type = std::make_shared<DataTypeInt32>(); |
| uint32_t num_columns_without_result = block->columns(); |
| const ColumnNullable* col_nullable = |
| assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get()); |
| const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr(); |
| ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr(); |
| block->replace_by_position(idx, std::move(string_column_ptr)); |
| block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type, |
| "const 1"}); // pos is 1 |
| block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type, |
| fmt::format("const {}", len)}); // len |
| block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column |
| ColumnNumbers temp_arguments(3); |
| temp_arguments[0] = idx; // str column |
| temp_arguments[1] = num_columns_without_result; // pos |
| temp_arguments[2] = num_columns_without_result + 1; // len |
| uint32_t result_column_id = num_columns_without_result + 2; |
| |
| SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows()); |
| auto res = ColumnNullable::create(block->get_by_position(result_column_id).column, |
| null_map_column_ptr); |
| block->replace_by_position(idx, std::move(res)); |
| Block::erase_useless_column(block, num_columns_without_result); |
| } |
| |
| std::shared_ptr<segment_v2::RowIdColumnIteratorV2> FileScanner::_create_row_id_column_iterator() { |
| auto& id_file_map = _state->get_id_file_map(); |
| auto file_id = id_file_map->get_file_mapping_id( |
| std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(), |
| _current_range, _should_enable_file_meta_cache())); |
| return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION, |
| BackendOptions::get_backend_id(), file_id); |
| } |
| |
| void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) { |
| ctx->column_descs = &_column_descs; |
| ctx->col_name_to_block_idx = &_src_block_name_to_idx; |
| ctx->state = _state; |
| ctx->tuple_descriptor = _real_tuple_desc; |
| ctx->row_descriptor = _default_val_row_desc.get(); |
| ctx->params = _params; |
| ctx->range = &_current_range; |
| ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance(); |
| ctx->push_down_agg_type = _get_push_down_agg_type(); |
| } |
| |
// Advance to the next scan range and construct the matching reader.
// Loops until either a usable reader is created (_cur_reader set,
// _cur_reader_eof = false) or the ranges are exhausted / the scanner is
// stopped (_scanner_eof set). Ranges may be skipped inside the loop:
// pruned by runtime-filter partition pruning, empty files (END_OF_FILE on
// init), or files missing from a stale FE cache (NOT_FOUND, if configured).
Status FileScanner::_get_next_reader() {
    while (true) {
        if (_cur_reader) {
            // Close the previous range's reader before opening the next one.
            _cur_reader->collect_profile_before_close();
            RETURN_IF_ERROR(_cur_reader->close());
            _state->update_num_finished_scan_range(1);
        }
        _cur_reader.reset(nullptr);
        _src_block_init = false;
        // The first range was already delivered to _current_range before this
        // call; only subsequent iterations pull from the split source.
        bool has_next = _first_scan_range;
        if (!_first_scan_range) {
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
        }
        _first_scan_range = false;
        if (!has_next || _should_stop) {
            _scanner_eof = true;
            return Status::OK();
        }

        const TFileRangeDesc& range = _current_range;
        _current_range_path = range.path;

        if (!_partition_slot_index_map.empty()) {
            // we need get partition columns first for runtime filter partition pruning
            RETURN_IF_ERROR(_generate_partition_columns());

            if (_state->query_options().enable_runtime_filter_partition_prune) {
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
                // by runtime filter partition prune
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
                    _init_runtime_filter_partition_prune_ctxs();
                }

                bool can_filter_all = false;
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
                if (can_filter_all) {
                    // this range can be filtered out by runtime filter partition pruning
                    // so we need to skip this range
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
                    continue;
                }
            }
        }

        // create reader for specific format
        Status init_status = Status::OK();
        TFileFormatType::type format_type = _get_current_format_type();
        // for compatibility, this logic is deprecated in 3.1
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
            if (range.table_format_params.table_format_type == "paimon" &&
                !range.table_format_params.paimon_params.__isset.paimon_split) {
                // use native reader
                auto format = range.table_format_params.paimon_params.file_format;
                if (format == "orc") {
                    format_type = TFileFormatType::FORMAT_ORC;
                } else if (format == "parquet") {
                    format_type = TFileFormatType::FORMAT_PARQUET;
                } else {
                    return Status::InternalError("Not supported paimon file format: {}", format);
                }
            }
        }

        bool push_down_predicates = _should_push_down_predicates(format_type);
        bool need_to_get_parsed_schema = false;
        switch (format_type) {
        // JNI-backed table formats: pick the concrete reader by table_format_type.
        case TFileFormatType::FORMAT_JNI: {
            ReaderInitContext jni_ctx;
            _fill_base_init_context(&jni_ctx);

            if (range.__isset.table_format_params &&
                range.table_format_params.table_format_type == "max_compute") {
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                        _real_tuple_desc->table_desc());
                if (!mc_desc->init_status()) {
                    return mc_desc->init_status();
                }
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                        range, _state, _profile);
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(mc_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "paimon") {
                // Paimon: prefer the native C++ reader when the session enables it.
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
                    _state->query_options().enable_paimon_cpp_reader) {
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
                                                                     _profile, range, _params);
                    if (!_is_load && !_push_down_conjuncts.empty()) {
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
                        if (predicate) {
                            cpp_reader->set_predicate(std::move(predicate));
                        }
                    }
                    init_status =
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
                    _cur_reader = std::move(cpp_reader);
                } else {
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
                                                                        _profile, range, _params);
                    init_status =
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
                    _cur_reader = std::move(paimon_reader);
                }
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "hudi") {
                auto hudi_reader = HudiJniReader::create_unique(
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
                        _profile);
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(hudi_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "trino_connector") {
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
                                                                           _profile, range);
                init_status =
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(trino_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "jdbc") {
                // Extract jdbc params from table_format_params
                std::map<std::string, std::string> jdbc_params(
                        range.table_format_params.jdbc_params.begin(),
                        range.table_format_params.jdbc_params.end());
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
                                                                jdbc_params);
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(jdbc_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "iceberg") {
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
                        _file_slot_descs, _state, _profile, range, _params);
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
                                      ->init_reader(&jni_ctx);
                _cur_reader = std::move(iceberg_sys_reader);
            }
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
            if (_cur_reader) {
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
                }
            }
            break;
        }
        case TFileFormatType::FORMAT_PARQUET: {
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
                                               ? ExecEnv::GetInstance()->file_meta_cache()
                                               : nullptr;
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));

            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_ORC: {
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
                                               ? ExecEnv::GetInstance()->file_meta_cache()
                                               : nullptr;
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));

            need_to_get_parsed_schema = true;
            break;
        }
        // All CSV compression variants (and PROTO) share the CsvReader.
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO: {
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                   _file_slot_descs, _io_ctx.get());
            CsvInitContext csv_ctx;
            _fill_base_init_context(&csv_ctx);
            csv_ctx.is_load = _is_load;
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
            _cur_reader = std::move(reader);
            break;
        }
        case TFileFormatType::FORMAT_TEXT: {
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                    _file_slot_descs, _io_ctx.get());
            CsvInitContext text_ctx;
            _fill_base_init_context(&text_ctx);
            text_ctx.is_load = _is_load;
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
            _cur_reader = std::move(reader);
            break;
        }
        case TFileFormatType::FORMAT_JSON: {
            _cur_reader =
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
            JsonInitContext json_ctx;
            _fill_base_init_context(&json_ctx);
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
            json_ctx.is_load = _is_load;
            init_status = _cur_reader->init_reader(&json_ctx);
            break;
        }

        case TFileFormatType::FORMAT_WAL: {
            _cur_reader = WalReader::create_unique(_state);
            WalInitContext wal_ctx;
            _fill_base_init_context(&wal_ctx);
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
            init_status = _cur_reader->init_reader(&wal_ctx);
            break;
        }
        case TFileFormatType::FORMAT_NATIVE: {
            auto reader =
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
            ReaderInitContext native_ctx;
            _fill_base_init_context(&native_ctx);
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
            _cur_reader = std::move(reader);
            need_to_get_parsed_schema = false;
            break;
        }
        case TFileFormatType::FORMAT_ARROW: {
            ReaderInitContext arrow_ctx;
            _fill_base_init_context(&arrow_ctx);

            if (range.__isset.table_format_params &&
                range.table_format_params.table_format_type == "remote_doris") {
                auto doris_reader =
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
                init_status =
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
                if (doris_reader) {
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
                }
                _cur_reader = std::move(doris_reader);
            } else {
                auto arrow_reader =
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
                                                         range, _file_slot_descs, _io_ctx.get());
                init_status =
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
                _cur_reader = std::move(arrow_reader);
            }
            break;
        }
#ifdef BUILD_RUST_READERS
        case TFileFormatType::FORMAT_LANCE: {
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
                                                               range, _params);
            init_status = lance_reader->init_reader();
            _cur_reader = std::move(lance_reader);
            need_to_get_parsed_schema = true;
            break;
        }
#endif
        case TFileFormatType::FORMAT_ES_HTTP: {
            _cur_reader = EsHttpReader::create_unique(_file_slot_descs, _state, _profile, range,
                                                      *_params, _real_tuple_desc);
            init_status = static_cast<EsHttpReader*>(_cur_reader.get())->init_reader();
            break;
        }
        default:
            return Status::NotSupported("Not supported create reader for file format: {}.",
                                        to_string(_params->format_type));
        }

        // A known format whose table_format branch didn't match leaves
        // _cur_reader null; report it rather than dereferencing below.
        if (_cur_reader == nullptr) {
            return Status::NotSupported(
                    "Not supported create reader for table format: {} / file format: {}.",
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
                                                      : "NotSet",
                    to_string(_params->format_type));
        }
        COUNTER_UPDATE(_file_counter, 1);
        // The FileScanner for external table may try to open not exist files,
        // Because FE file cache for external table may out of date.
        // So, NOT_FOUND for FileScanner is not a fail case.
        // Will remove this after file reader refactor.
        if (init_status.is<END_OF_FILE>()) {
            COUNTER_UPDATE(_empty_file_counter, 1);
            continue;
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
            if (config::ignore_not_found_file_in_external_table) {
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
        } else if (!init_status.ok()) {
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
        }

        // For table-level COUNT pushdown, offsets are undefined so we must skip
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
        // filter row groups, which would produce incorrect empty results).
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
                                    range.__isset.table_format_params &&
                                    range.table_format_params.table_level_row_count >= 0;
        if (!is_table_level_count) {
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
                continue;
            } else if (!status.ok()) {
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
                                             status.to_string());
            }
        }

        // Unified COUNT(*) pushdown: replace the real reader with CountReader
        // decorator if the reader accepts COUNT and can provide a total row count.
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
            int64_t total_rows = -1;
            if (is_table_level_count) {
                // FE-provided count (may account for table-format deletions)
                total_rows = range.table_format_params.table_level_row_count;
            } else if (_cur_reader->supports_count_pushdown()) {
                // File metadata count (ORC footer / Parquet row groups)
                total_rows = _cur_reader->get_total_rows();
            }
            if (total_rows >= 0) {
                auto batch_size = _state->query_options().batch_size;
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
                                                            std::move(_cur_reader));
            }
        }
        _cur_reader_eof = false;
        break;
    }
    return Status::OK();
}
| |
| Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, |
| std::unique_ptr<ParquetReader> parquet_reader) { |
| const TFileRangeDesc& range = _current_range; |
| Status init_status = Status::OK(); |
| |
| phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates = |
| _local_state |
| ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates |
| : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {}; |
| |
| // Build unified ParquetInitContext (shared by all Parquet reader variants) |
| ParquetInitContext pctx; |
| _fill_base_init_context(&pctx); |
| pctx.conjuncts = &_push_down_conjuncts; |
| pctx.slot_id_to_predicates = &slot_id_to_predicates; |
| pctx.colname_to_slot_id = _col_name_to_slot_id; |
| pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts; |
| pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts; |
| |
| if (range.__isset.table_format_params && |
| range.table_format_params.table_format_type == "iceberg") { |
| // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed |
| std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique( |
| _kv_cache, _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr); |
| iceberg_reader->set_create_row_id_column_iterator_func( |
| [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> { |
| return _create_row_id_column_iterator(); |
| }); |
| init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(iceberg_reader); |
| } else if (range.__isset.table_format_params && |
| range.table_format_params.table_format_type == "paimon") { |
| // PaimonParquetReader IS-A ParquetReader, no wrapping needed |
| auto paimon_reader = PaimonParquetReader::create_unique( |
| _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state, file_meta_cache_ptr); |
| init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(paimon_reader); |
| } else if (range.__isset.table_format_params && |
| range.table_format_params.table_format_type == "hudi") { |
| // HudiParquetReader IS-A ParquetReader, no wrapping needed |
| auto hudi_reader = HudiParquetReader::create_unique( |
| _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr); |
| init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(hudi_reader); |
| } else if (range.table_format_params.table_format_type == "hive") { |
| auto hive_reader = HiveParquetReader::create_unique( |
| _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr, |
| _state->query_options().enable_parquet_lazy_mat); |
| hive_reader->set_create_row_id_column_iterator_func( |
| [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> { |
| return _create_row_id_column_iterator(); |
| }); |
| init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(hive_reader); |
| } else if (range.table_format_params.table_format_type == "tvf") { |
| if (!parquet_reader) { |
| parquet_reader = ParquetReader::create_unique( |
| _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr, |
| _state->query_options().enable_parquet_lazy_mat); |
| } |
| parquet_reader->set_create_row_id_column_iterator_func( |
| [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> { |
| return _create_row_id_column_iterator(); |
| }); |
| init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(parquet_reader); |
| } else if (_is_load) { |
| if (!parquet_reader) { |
| parquet_reader = ParquetReader::create_unique( |
| _profile, *_params, range, _state->query_options().batch_size, |
| &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr, |
| _state->query_options().enable_parquet_lazy_mat); |
| } |
| init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx); |
| _cur_reader = std::move(parquet_reader); |
| } |
| |
| return init_status; |
| } |
| |
// Create and initialize the ORC reader variant matching the current range's
// table format (transactional_hive/iceberg/paimon/hudi/hive/tvf), or a plain
// OrcReader for load. `orc_reader` may be pre-built by the caller
// (read_lines_from_range) and is used only for the tvf/load paths.
// Returns the reader's init status; _cur_reader is set on each taken branch.
// If no branch matches, _cur_reader stays null and the caller reports it.
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
                                     std::unique_ptr<OrcReader> orc_reader) {
    const TFileRangeDesc& range = _current_range;
    Status init_status = Status::OK();

    // Build unified OrcInitContext (shared by all ORC reader variants)
    OrcInitContext octx;
    _fill_base_init_context(&octx);
    octx.conjuncts = &_push_down_conjuncts;
    octx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    octx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;

    if (range.__isset.table_format_params &&
        range.table_format_params.table_format_type == "transactional_hive") {
        // TransactionalHiveReader IS-A OrcReader, no wrapping needed
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        tran_orc_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);

        _cur_reader = std::move(tran_orc_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "iceberg") {
        // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
                _kv_cache, _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);

        _cur_reader = std::move(iceberg_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "paimon") {
        // PaimonOrcReader IS-A OrcReader, no wrapping needed
        auto paimon_reader = PaimonOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _kv_cache, _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);

        _cur_reader = std::move(paimon_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hudi") {
        // HudiOrcReader IS-A OrcReader, no wrapping needed
        auto hudi_reader = HudiOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);

        _cur_reader = std::move(hudi_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hive") {
        auto hive_reader = HiveOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_orc_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);

        _cur_reader = std::move(hive_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "tvf") {
        if (!orc_reader) {
            orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
                    _state->query_options().enable_orc_lazy_mat);
        }
        orc_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
        _cur_reader = std::move(orc_reader);
    } else if (_is_load) {
        if (!orc_reader) {
            orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
                    _state->query_options().enable_orc_lazy_mat);
        }
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
        _cur_reader = std::move(orc_reader);
    }

    return init_status;
}
| |
| Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) { |
| _slot_lower_name_to_col_type.clear(); |
| |
| std::unordered_map<std::string, DataTypePtr> name_to_col_type; |
| RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type)); |
| |
| for (const auto& [col_name, col_type] : name_to_col_type) { |
| auto col_name_lower = to_lower(col_name); |
| if (_partition_col_descs.contains(col_name_lower)) { |
| /* |
| * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to |
| * generate columns of the corresponding type, which records the columns existing in the file. |
| * |
| * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will |
| * not match the slot type in `_output_tuple_desc`, causing an error when |
| * Serde `deserialize_one_cell_from_json` fills the partition values. |
| * |
| * So for partition column not need fill _slot_lower_name_to_col_type. |
| */ |
| continue; |
| } |
| _slot_lower_name_to_col_type.emplace(col_name_lower, col_type); |
| } |
| |
| RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema)); |
| return Status::OK(); |
| } |
| |
| Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) { |
| _source_file_col_name_types.clear(); |
| // The col names and types of source file, such as parquet, orc files. |
| if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) { |
| std::vector<std::string> source_file_col_names; |
| std::vector<DataTypePtr> source_file_col_types; |
| Status status = |
| _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types); |
| if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) { |
| return status; |
| } |
| DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size()); |
| for (int i = 0; i < source_file_col_names.size(); ++i) { |
| _source_file_col_name_types[to_lower(source_file_col_names[i])] = |
| source_file_col_types[i]; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) { |
| _current_range = range; |
| |
| _file_cache_statistics.reset(new io::FileCacheStatistics()); |
| _file_reader_stats.reset(new io::FileReaderStats()); |
| |
| _file_read_bytes_counter = |
| ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1); |
| _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1); |
| |
| RETURN_IF_ERROR(_init_io_ctx()); |
| _io_ctx->file_cache_stats = _file_cache_statistics.get(); |
| _io_ctx->file_reader_stats = _file_reader_stats.get(); |
| _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc)); |
| RETURN_IF_ERROR(_init_expr_ctxes()); |
| |
| // Since only one column is read from the file, there is no need to filter, so set these variables to empty. |
| _push_down_conjuncts.clear(); |
| _not_single_slot_filter_conjuncts.clear(); |
| _slot_id_to_filter_conjuncts.clear(); |
| _kv_cache = nullptr; |
| return Status::OK(); |
| } |
| |
// Reads a specific set of rows (by row id) from a single file range into
// result_block. Only parquet and orc are supported. The time spent creating
// the reader and fetching blocks is reported via init_reader_ms/get_block_ms.
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
                                          const std::list<int64_t>& row_ids, Block* result_block,
                                          const ExternalFileMappingInfo& external_info,
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
    _current_range = range;
    RETURN_IF_ERROR(_generate_partition_columns());

    TFileFormatType::type format_type = _get_current_format_type();
    Status init_status = Status::OK();

    // Meta cache is optional; a null pointer disables it in the readers.
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
                                       ? ExecEnv::GetInstance()->file_meta_cache()
                                       : nullptr;

    // Reader construction + row selection, timed into init_reader_ms.
    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                switch (format_type) {
                case TFileFormatType::FORMAT_PARQUET: {
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
                            _state, file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
                    // _init_parquet_reader may create a new table-format specific reader
                    // (e.g., HiveParquetReader) that replaces the original parquet_reader.
                    // We need to re-apply read_by_rows to the actual _cur_reader.
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
                    break;
                }
                case TFileFormatType::FORMAT_ORC: {
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
                            file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
                    // Same as above: re-apply read_by_rows to the actual _cur_reader.
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
                    break;
                }
                default: {
                    return Status::NotSupported(
                            "Not support create lines reader for file format: {},"
                            "only support parquet and orc.",
                            to_string(_params->format_type));
                }
                }
                return Status::OK();
            },
            init_reader_ms));

    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
    _cur_reader_eof = false;

    // Drain the reader into result_block, timed into get_block_ms.
    // NOTE(review): the loop terminates via _cur_reader_eof; the local `eof`
    // out-param is never read here, so _get_block_impl presumably updates
    // _cur_reader_eof itself — TODO confirm.
    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                while (!_cur_reader_eof) {
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
                return Status::OK();
            },
            get_block_ms));

    // Collect reader-level profile data before closing it.
    _cur_reader->collect_profile_before_close();
    RETURN_IF_ERROR(_cur_reader->close());

    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    return Status::OK();
}
| |
| Status FileScanner::_generate_partition_columns() { |
| _partition_col_descs.clear(); |
| _partition_value_is_null.clear(); |
| const TFileRangeDesc& range = _current_range; |
| if (!range.__isset.columns_from_path_keys) { |
| return Status::OK(); |
| } |
| |
| std::unordered_map<std::string, int> partition_name_to_key_index; |
| int index = 0; |
| for (const auto& key : range.columns_from_path_keys) { |
| partition_name_to_key_index.emplace(key, index++); |
| } |
| |
| // Iterate _column_descs to find PARTITION_KEY columns instead of _partition_slot_descs. |
| for (const auto& col_desc : _column_descs) { |
| if (col_desc.category != ColumnCategory::PARTITION_KEY) { |
| continue; |
| } |
| auto pit = partition_name_to_key_index.find(col_desc.name); |
| if (pit != partition_name_to_key_index.end()) { |
| int values_index = pit->second; |
| if (range.__isset.columns_from_path && values_index < range.columns_from_path.size()) { |
| _partition_col_descs.emplace( |
| col_desc.name, |
| std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc)); |
| if (range.__isset.columns_from_path_is_null) { |
| _partition_value_is_null.emplace(col_desc.name, |
| range.columns_from_path_is_null[values_index]); |
| } |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
// Builds all per-slot bookkeeping used by the scanner:
//  - _column_descs: one ColumnDescriptor (name, slot, category, default expr)
//    per required slot, in required_slots order;
//  - _file_slot_descs / _file_col_names / _is_file_slot: slots actually read
//    from the file (REGULAR and GENERATED categories);
//  - _partition_slot_index_map: where each partition slot's value comes from;
//  - _col_default_value_ctx: prepared+opened default-value expr contexts;
//  - load-only: _dest_vexpr_ctx, _dest_slot_name_to_idx and the
//    dest-slot -> src-slot mapping used when no transform is applied.
Status FileScanner::_init_expr_ctxes() {
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    // Index every source slot by id, remembering its position in the tuple.
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }

    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges in _ranges vector should have identical columns_from_path_keys
    // because they are all file splits for the same external table.
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
    if (_current_range.__isset.columns_from_path_keys) {
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
        if (!key_map.empty()) {
            for (size_t i = 0; i < key_map.size(); i++) {
                partition_name_to_key_index_map.emplace(key_map[i], i);
            }
        }
    }

    _num_of_columns_from_file = _params->num_of_columns_from_file;
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError(
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
        }

        ColumnDescriptor col_desc;
        col_desc.name = it->second->col_name();
        col_desc.slot_desc = it->second;

        // Read category from Thrift if available (new FE), otherwise fall back
        // to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
        // where the FE does not set TColumnCategory.
        if (slot_info.__isset.category) {
            switch (slot_info.category) {
            case TColumnCategory::REGULAR:
                col_desc.category = ColumnCategory::REGULAR;
                break;
            case TColumnCategory::PARTITION_KEY:
                col_desc.category = ColumnCategory::PARTITION_KEY;
                break;
            case TColumnCategory::SYNTHESIZED:
                col_desc.category = ColumnCategory::SYNTHESIZED;
                break;
            case TColumnCategory::GENERATED:
                col_desc.category = ColumnCategory::GENERATED;
                break;
            }
        } else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
                   !slot_info.is_file_slot) {
            col_desc.category = ColumnCategory::PARTITION_KEY;
        }

        // Derive is_file_slot from category
        bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
                             col_desc.category == ColumnCategory::GENERATED);

        // Partition slots resolve their value either from the source tuple
        // (load: position relative to the non-file columns) or from the file
        // path keys (query: position within columns_from_path_keys).
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
            if (_is_load) {
                auto iti = full_src_index_map.find(slot_id);
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
            } else {
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
                _partition_slot_index_map.emplace(slot_id, kit->second);
            }
        }

        if (is_file_slot) {
            _is_file_slot.emplace(slot_id);
            _file_slot_descs.emplace_back(it->second);
            _file_col_names.push_back(it->second->col_name());
        }

        _column_descs.push_back(col_desc);
    }

    // set column name to default value expr map
    // new inline TFileScanSlotInfo.default_value_expr (preferred)
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            continue;
        }
        const std::string& col_name = it->second->col_name();

        VExprContextSPtr ctx;
        bool has_default = false;

        // Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
        if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
            RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
            RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
            RETURN_IF_ERROR(ctx->open(_state));
            has_default = true;
        } else if (slot_info.__isset.default_value_expr) {
            // Empty nodes means null default (same as legacy empty TExpr)
            has_default = true;
        }

        // Fall back to legacy default_value_of_src_slot map for mixed-version FE/BE
        // and callers that have not preserved inline default_value_expr yet.
        if (!has_default) {
            auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
            if (legacy_it != std::end(_params->default_value_of_src_slot)) {
                if (!legacy_it->second.nodes.empty()) {
                    RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
                    RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                    RETURN_IF_ERROR(ctx->open(_state));
                }
                has_default = true;
            }
        }

        if (has_default) {
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(col_name, ctx);
        }
    }

    // Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
    // This makes default values available to readers via column_descs, eliminating the
    // need for the separate _generate_missing_columns roundtrip.
    for (auto& col_desc : _column_descs) {
        auto it = _col_default_value_ctx.find(col_desc.name);
        if (it != _col_default_value_ctx.end()) {
            col_desc.default_expr = it->second;
        }
    }

    if (_is_load) {
        // follow desc expr map is only for load task.
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (auto* slot_desc : _output_tuple_desc->slots()) {
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }

            // An empty expr tree leaves ctx null; downstream treats that as
            // "no transform" for this destination slot.
            VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    // No direct src slot for this dest slot.
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
                    if (_src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[_src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
| |
| bool FileScanner::_should_enable_condition_cache() { |
| DCHECK(_should_enable_condition_cache_handler != nullptr); |
| return _condition_cache_digest != 0 && (this->*_should_enable_condition_cache_handler)() && |
| (!_conjuncts.empty() || !_push_down_conjuncts.empty()); |
| } |
| |
// Load tasks never use the condition cache (policy handler consulted by
// _should_enable_condition_cache).
bool FileScanner::_should_enable_condition_cache_for_load() const {
    return false;
}
| |
// Query tasks allow the condition cache; the final decision still depends on
// the other checks in _should_enable_condition_cache().
bool FileScanner::_should_enable_condition_cache_for_query() const {
    return true;
}
| |
// Dispatches to the load- or query-specific push-down policy installed in
// _should_push_down_predicates_handler (a pointer-to-member-function).
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
    DCHECK(_should_push_down_predicates_handler != nullptr);
    return (this->*_should_push_down_predicates_handler)(format_type);
}
| |
| bool FileScanner::_should_push_down_predicates_for_load(TFileFormatType::type format_type) const { |
| static_cast<void>(format_type); |
| return false; |
| } |
| |
| bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const { |
| // JNI readers handle predicate conversion in their own paths. |
| return format_type != TFileFormatType::FORMAT_JNI; |
| } |
| |
// Prepares the per-reader condition cache: on a cache hit, reuses the stored
// granule filter result; on a miss, allocates a fresh (all-false) result
// sized to the file's granule count for the reader to fill in. Either way the
// result is handed to _cur_reader via a ConditionCacheContext.
void FileScanner::_init_reader_condition_cache() {
    _condition_cache = nullptr;
    _condition_cache_ctx = nullptr;

    if (!_should_enable_condition_cache() || !_cur_reader) {
        return;
    }

    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
    // change between queries while the data file's cache key remains the same.
    if (_cur_reader->has_delete_operations()) {
        return;
    }

    // Cache key identifies the file (path + mtime + size), the predicate set
    // (digest), and the split (start offset + size). Unset Thrift fields fall
    // back to 0 / -1 sentinels.
    auto* cache = segment_v2::ConditionCache::instance();
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
            _current_range.path,
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
            _current_range.__isset.file_size ? _current_range.file_size : -1,
            _condition_cache_digest,
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
            _current_range.__isset.size ? _current_range.size : -1);

    segment_v2::ConditionCacheHandle handle;
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
    if (condition_cache_hit) {
        _condition_cache = handle.get_filter_result();
        _condition_cache_hit_count++;
    } else {
        // Allocate cache pre-sized to total number of granules.
        // We add +1 as a safety margin: when a file is split across multiple scanners
        // and the first row of this scanner's range is not aligned to a granule boundary,
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
        // The extra element costs only 1 bit and never affects correctness (an extra
        // false-granule beyond the actual data range won't overlap any real row range).
        int64_t total_rows = _cur_reader->get_total_rows();
        if (total_rows > 0) {
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
                                  ConditionCacheContext::GRANULE_SIZE;
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
        }
    }

    if (_condition_cache) {
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
        _condition_cache_ctx->is_hit = condition_cache_hit;
        _condition_cache_ctx->filter_result = _condition_cache;
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
    }
}
| |
| void FileScanner::_finalize_reader_condition_cache() { |
| if (!_should_enable_condition_cache() || !_condition_cache_ctx || |
| _condition_cache_ctx->is_hit) { |
| _condition_cache = nullptr; |
| _condition_cache_ctx = nullptr; |
| return; |
| } |
| // Only store the cache if the reader was fully consumed. If the scan was |
| // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules |
| // would remain false and cause surviving rows to be incorrectly skipped on HIT. |
| if (!_cur_reader_eof) { |
| _condition_cache = nullptr; |
| _condition_cache_ctx = nullptr; |
| return; |
| } |
| |
| auto* cache = segment_v2::ConditionCache::instance(); |
| cache->insert(_condition_cache_key, std::move(_condition_cache)); |
| _condition_cache = nullptr; |
| _condition_cache_ctx = nullptr; |
| } |
| |
// Closes the scanner: finalizes (and possibly stores) the condition cache,
// closes the current reader if any, then delegates to the base class.
// _try_close() guards against closing more than once.
Status FileScanner::close(RuntimeState* state) {
    if (!_try_close()) {
        return Status::OK();
    }

    _finalize_reader_condition_cache();

    if (_cur_reader) {
        RETURN_IF_ERROR(_cur_reader->close());
    }

    RETURN_IF_ERROR(Scanner::close(state));
    return Status::OK();
}
| |
// Requests cooperative cancellation: in addition to the base-class stop
// handling, flags the IO context so in-flight file reads can bail out early.
void FileScanner::try_stop() {
    Scanner::try_stop();
    if (_io_ctx) {
        _io_ctx->should_stop = true;
    }
}
| |
| void FileScanner::update_realtime_counters() { |
| FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state); |
| |
| COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes); |
| COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows); |
| |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows( |
| _file_reader_stats->read_rows); |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes( |
| _file_reader_stats->read_bytes); |
| |
| int64_t delta_bytes_read_from_local = |
| _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local; |
| int64_t delta_bytes_read_from_remote = |
| _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote; |
| if (_file_cache_statistics->bytes_read_from_local == 0 && |
| _file_cache_statistics->bytes_read_from_remote == 0) { |
| _state->get_query_ctx() |
| ->resource_ctx() |
| ->io_context() |
| ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes); |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| _file_reader_stats->read_bytes); |
| } else { |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( |
| delta_bytes_read_from_local); |
| _state->get_query_ctx() |
| ->resource_ctx() |
| ->io_context() |
| ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote); |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| delta_bytes_read_from_local); |
| DorisMetrics::instance()->query_scan_bytes_from_remote->increment( |
| delta_bytes_read_from_remote); |
| } |
| |
| COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes); |
| |
| DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes); |
| DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows); |
| |
| _file_reader_stats->read_bytes = 0; |
| _file_reader_stats->read_rows = 0; |
| |
| _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local; |
| _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote; |
| } |
| |
// Final profile flush before the scanner is closed: reports file-cache stats,
// lets the current reader contribute its profile, and pushes any remaining
// (not-yet-flushed) read statistics into counters and process metrics.
void FileScanner::_collect_profile_before_close() {
    Scanner::_collect_profile_before_close();
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
        _profile != nullptr) {
        io::FileCacheProfileReporter cache_profile(_profile);
        cache_profile.update(_file_cache_statistics.get());
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
                _file_cache_statistics->bytes_write_into_cache);
    }

    if (_cur_reader != nullptr) {
        _cur_reader->collect_profile_before_close();
    }

    // Remaining stats here are whatever update_realtime_counters() has not
    // already consumed and zeroed.
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);

    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
    if (_io_ctx) {
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
                       _io_ctx->condition_cache_filtered_rows);
    }

    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
}
| |
| } // namespace doris |