blob: e3ef634c726fe4720412e4088183dfb0c5d06647 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/scan/file_scanner.h"
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <map>
#include <ranges>
#include <tuple>
#include <unordered_map>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/status.h"
#include "core/block/column_with_type_and_name.h"
#include "core/block/columns_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/string_ref.h"
#include "exec/common/stringop_substring.h"
#include "exec/rowid_fetcher.h"
#include "exec/scan/scan_node.h"
#include "exprs/aggregate/aggregate_function.h"
#include "exprs/function/function.h"
#include "exprs/function/simple_function_factory.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "exprs/vslot_ref.h"
#include "format/arrow/arrow_stream_reader.h"
#include "format/count_reader.h"
#include "format/csv/csv_reader.h"
#include "format/json/new_json_reader.h"
#include "format/native/native_reader.h"
#include "format/orc/vorc_reader.h"
#include "format/parquet/vparquet_reader.h"
#include "format/table/es/es_http_reader.h"
#include "format/table/hive_reader.h"
#include "format/table/hudi_jni_reader.h"
#include "format/table/hudi_reader.h"
#include "format/table/iceberg_reader.h"
#include "format/table/iceberg_sys_table_jni_reader.h"
#include "format/table/jdbc_jni_reader.h"
#include "format/table/max_compute_jni_reader.h"
#include "format/table/paimon_cpp_reader.h"
#include "format/table/paimon_jni_reader.h"
#include "format/table/paimon_predicate_converter.h"
#include "format/table/paimon_reader.h"
#include "format/table/remote_doris_reader.h"
#include "format/table/transactional_hive_reader.h"
#include "format/table/trino_connector_jni_reader.h"
#include "format/text/text_reader.h"
#ifdef BUILD_RUST_READERS
#include "format/lance/lance_rust_reader.h"
#endif
#include "io/cache/block_file_cache_profile.h"
#include "load/group_commit/wal/wal_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
namespace cctz {
class time_zone;
} // namespace cctz
namespace doris {
class ShardedKVCache;
} // namespace doris
namespace doris {
using namespace ErrorCode;
// Profile entry names. Exposed as public constants so other components can
// look up these counters by name in the runtime profile (registered in init()).
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
// Constructor.
// Resolves the scan-range params either from the query-level map keyed by the
// scan node id (newer FE protocol) or from the split source (old FE thrift
// protocol), determines whether this is a load or a query scanner, and binds
// the corresponding handler function pointers.
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
std::shared_ptr<SplitSourceConnector> split_source,
RuntimeProfile* profile, ShardedKVCache* kv_cache,
const std::unordered_map<std::string, int>* colname_to_slot_id)
: Scanner(state, local_state, limit, profile),
_split_source(split_source),
_cur_reader(nullptr),
_cur_reader_eof(false),
_kv_cache(kv_cache),
_strict_mode(false),
_col_name_to_slot_id(colname_to_slot_id) {
// Prefer the per-query params map when the query context carries an entry for
// this scan node; otherwise fall back to the params shipped with the split.
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
_params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
} else {
// old fe thrift protocol
_params = _split_source->get_params();
}
if (_params->__isset.strict_mode) {
_strict_mode = _params->strict_mode;
}
// For load scanner, there are input and output tuple.
// For query scanner, there is only output tuple
_input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
// _real_tuple_desc is the tuple the readers actually materialize: the input
// tuple for load, the output tuple for query.
_real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
// The presence of an input tuple is what distinguishes a load from a query.
_is_load = (_input_tuple_desc != nullptr);
_configure_file_scan_handlers();
}
// Bind the load- or query-specific member-function handlers once, so the hot
// paths can dispatch through a pointer without re-checking _is_load each time.
void FileScanner::_configure_file_scan_handlers() {
    const bool for_load = _is_load;
    _init_src_block_handler = for_load ? &FileScanner::_init_src_block_for_load
                                       : &FileScanner::_init_src_block_for_query;
    _process_src_block_after_read_handler =
            for_load ? &FileScanner::_process_src_block_after_read_for_load
                     : &FileScanner::_process_src_block_after_read_for_query;
    _should_push_down_predicates_handler =
            for_load ? &FileScanner::_should_push_down_predicates_for_load
                     : &FileScanner::_should_push_down_predicates_for_query;
    _should_enable_condition_cache_handler =
            for_load ? &FileScanner::_should_enable_condition_cache_for_load
                     : &FileScanner::_should_enable_condition_cache_for_query;
}
// Registers all profile timers/counters, sets up the IO context and file
// statistics, and (for load jobs) builds the source/destination row
// descriptors plus the pre-filter expression contexts.
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
RETURN_IF_ERROR(Scanner::init(state, conjuncts));
// --- profile timers/counters -------------------------------------------------
_get_block_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
_cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerCastInputBlockTime", 1);
_fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerFillMissingColumnTime", 1);
_pre_filter_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"NotFoundFileNum", TUnit::UNIT, 1);
_fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"FullySkippedFileNum", TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
_file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
FileReadBytesProfile, TUnit::BYTES, 1);
_file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"FileReadCalls", TUnit::UNIT, 1);
_file_read_time_counter =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
// --- IO context and per-scanner file statistics ------------------------------
_file_cache_statistics.reset(new io::FileCacheStatistics());
_file_reader_stats.reset(new io::FileReaderStats());
RETURN_IF_ERROR(_init_io_ctx());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->file_reader_stats = _file_reader_stats.get();
_io_ctx->is_disposable = _state->query_options().disable_file_cache;
// --- load-only setup: row descriptors and pre-filter conjuncts ---------------
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_input_tuple_desc->id()})));
// prepare pre filters
if (_params->__isset.pre_filter_exprs_list) {
RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
_pre_conjunct_ctxs));
} else if (_params->__isset.pre_filter_exprs) {
// Older protocol: a single pre-filter expr tree instead of a list.
VExprContextSPtr context;
RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
_pre_conjunct_ctxs.emplace_back(context);
}
for (auto& conjunct : _pre_conjunct_ctxs) {
RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(conjunct->open(_state));
}
_dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_output_tuple_desc->id()})));
}
// Row descriptor over the "real" tuple, used for default-value expressions.
_default_val_row_desc.reset(
new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
return Status::OK();
}
// Returns true if `expr` can be evaluated against partition columns alone:
// it is a slot ref on a partition column, a literal, or a composition of such
// expressions.
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
    if (expr->is_slot_ref()) {
        // Only slot refs that hit the partition-column index map are prunable.
        auto* slot = static_cast<VSlotRef*>(expr.get());
        return _partition_slot_index_map.contains(slot->slot_id());
    }
    if (expr->is_literal()) {
        return true;
    }
    // Composite expression: prunable only when every child is prunable.
    for (const auto& child : expr->children()) {
        if (!_check_partition_prune_expr(child)) {
            return false;
        }
    }
    return true;
}
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}
void FileScanner::_init_runtime_filter_partition_prune_block() {
// init block with empty column
for (auto const* slot_desc : _real_tuple_desc->slots()) {
_runtime_filter_partition_prune_block.insert(
ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
}
// Evaluates the partition-prune conjuncts against the current range's
// partition values. Sets `can_filter_all` to true when every row of this range
// would be filtered, allowing the caller to skip the range entirely.
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
SCOPED_TIMER(_runtime_filter_partition_prune_timer);
// Nothing to do when there are no prune conjuncts or no partition columns.
if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
return Status::OK();
}
// Each partition column contributes exactly one value (one logical row).
size_t partition_value_column_size = 1;
// 1. Get partition key values to string columns.
std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
for (auto const& partition_col_desc : _partition_col_descs) {
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
auto data_type = partition_slot_desc->get_data_type_ptr();
auto test_serde = data_type->get_serde();
auto partition_value_column = data_type->create_column();
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
Slice slice(partition_value.data(), partition_value.size());
uint64_t num_deserialized = 0;
DataTypeSerDe::FormatOptions options {};
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
// for iceberg/paimon table
// NOTICE: column is always be nullable for iceberg/paimon table now
DCHECK(data_type->is_nullable());
// Use the nested (non-nullable) serde and manage the null map manually.
test_serde = test_serde->get_nested_serdes()[0];
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
null_column->insert_many_defaults(partition_value_column_size);
} else {
// If the partition value is not null, we set null map to 0 and deserialize it normally.
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
null_column->get_nested_column(), slice, partition_value_column_size,
&num_deserialized, options));
}
} else {
// for hive/hudi table, the null value is set as "\\N"
// TODO: this will be unified as iceberg/paimon table in the future
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
}
partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}
// 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
// 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
size_t index = 0;
bool first_column_filled = false;
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (partition_slot_id_to_column.find(slot_desc->id()) !=
partition_slot_id_to_column.end()) {
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
// Wrap in a ColumnNullable with an all-zero (non-null) null map.
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
ColumnNullable::create(
std::move(partition_value_column),
ColumnUInt8::create(partition_value_column_size, 0)),
data_type, slot_desc->col_name()));
} else {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
slot_desc->col_name()));
}
if (index == 0) {
first_column_filled = true;
}
}
index++;
}
// 2.2 Execute conjuncts.
if (!first_column_filled) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
_runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
partition_value_column_size);
}
IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
&_runtime_filter_partition_prune_block,
&result_filter, &can_filter_all));
return Status::OK();
}
// Classify each pushed-down conjunct as either a single-slot filter (all slot
// references hit the same slot id) or a multi-slot filter. Single-slot filters
// can be pushed into readers per column; the rest are evaluated afterwards.
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, this conjunct originates from a runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();
        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);
        if (slot_ids.empty()) {
            // No slot references at all (e.g. constant expression).
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
            continue;
        }
        // std::all_of replaces the previous hand-rolled `int i` loop, which
        // mixed a signed index with the unsigned slot_ids.size().
        const bool single_slot = std::all_of(slot_ids.begin(), slot_ids.end(),
                                             [&](int id) { return id == slot_ids[0]; });
        if (single_slot) {
            _slot_id_to_filter_conjuncts[slot_ids[0]].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
// Called when runtime filters arrive after the scanner started: refresh the
// pushed-down conjunct set and re-run the single/multi-slot classification.
Status FileScanner::_process_late_arrival_conjuncts() {
// More conjuncts than we pushed down means new runtime filters arrived.
if (_push_down_conjuncts.size() < _conjuncts.size()) {
_push_down_conjuncts = _conjuncts;
// Do not clear _conjuncts here!
// We must keep it for fallback filtering, especially when mixing
// Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
// _conjuncts.clear();
RETURN_IF_ERROR(_process_conjuncts());
}
// Record in the profile once every expected runtime filter has been applied.
if (_applied_rf_num == _total_rf_num) {
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
}
return Status::OK();
}
// Recursively collect the slot ids referenced anywhere under `expr`, marking
// each referenced slot descriptor as a predicate column along the way.
// Duplicates are intentionally kept: callers compare all entries to detect
// single-slot conjuncts.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (child_expr->is_slot_ref()) {
            // is_slot_ref() guarantees the dynamic type, so use static_cast
            // (matching _check_partition_prune_expr) instead of the previous
            // needlessly-unsafe reinterpret_cast.
            auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
            slot_desc->set_is_predicate(true);
            slot_ids->emplace_back(slot_ref->slot_id());
        } else {
            _get_slot_ids(child_expr.get(), slot_ids);
        }
    }
}
// Opens the scanner: fetches the first scan range from the split source and,
// if one exists, initializes expression contexts and (optionally) the runtime
// filter partition-prune machinery. With no range the scanner stops at once.
Status FileScanner::_open_impl(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(Scanner::_open_impl(state));
if (_local_state) {
_condition_cache_digest = _local_state->get_condition_cache_digest();
}
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
// Partition pruning by runtime filters only makes sense when there are
// partition columns and the session option enables it.
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
}
return Status::OK();
}
// Thin wrapper around _get_block_wrapped that decorates any error with the
// current scan range path to ease debugging.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status status = _get_block_wrapped(state, block, eof);
    if (status.ok()) {
        return status;
    }
    // Append the current path so the failing file is visible in the error.
    status.append(". cur path: " + get_current_scan_range_name());
    return status;
}
// For query:
// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x - x
// get_next_block x x x - - - x
// _cast_to_input_block - - - - - - -
// _fill_columns_from_path - - - - x - x
// _fill_missing_columns - - - x - - x
// _convert_to_output_block - - - - - - -
//
// For load:
// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x x -
// get_next_block x x x - - x -
// _cast_to_input_block x x x - - x -
// _fill_columns_from_path - - - - x x -
// _fill_missing_columns - - - x - x -
// _convert_to_output_block - - - - - - x
// Core read loop: advances through readers (one per file/range), skipping
// missing or fully-pruned files, and reads the next batch into the src block.
// Sets *eof when the scanner is exhausted. See the table above for which
// columns each step fills in query vs. load mode.
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
do {
RETURN_IF_CANCELLED(state);
if (_cur_reader == nullptr || _cur_reader_eof) {
_finalize_reader_condition_cache();
// The file may not exist because the file list is got from meta cache,
// And the file may already be removed from storage.
// Just ignore not found files.
Status st = _get_next_reader();
if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
_cur_reader_eof = true;
COUNTER_UPDATE(_not_found_file_counter, 1);
continue;
} else if (st.is<ErrorCode::END_OF_FILE>()) {
// Whole file skipped (e.g. pruned by predicates/statistics).
_cur_reader_eof = true;
COUNTER_UPDATE(_fully_skipped_file_counter, 1);
continue;
} else if (!st) {
return st;
}
_init_reader_condition_cache();
}
if (_scanner_eof) {
*eof = true;
return Status::OK();
}
// Init src block for load job based on the data file schema (e.g. parquet)
// For query job, simply set _src_block_ptr to block.
size_t read_rows = 0;
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);
// Read next block.
// Some of column in block may not be filled (column not exist in file)
RETURN_IF_ERROR(
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
}
// use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
// may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
if (read_rows > 0) {
// Readers that count rows themselves must not be double-counted here.
if ((!_cur_reader->count_read_rows()) && _io_ctx) {
_io_ctx->file_reader_stats->read_rows += read_rows;
}
// If the push_down_agg_type is COUNT, no need to do the rest,
// because we only save a number in block.
if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
RETURN_IF_ERROR(_process_src_block_after_read(block));
}
}
break;
} while (true);
// Update filtered rows and unselected rows for load, reset counter.
// {
// state->update_num_rows_load_filtered(_counter.num_rows_filtered);
// state->update_num_rows_load_unselected(_counter.num_rows_unselected);
// _reset_counter();
// }
return Status::OK();
}
/**
* Check whether there are complex types in parquet/orc reader in broker/stream load.
* Broker/stream load will cast any type as string type, and complex types will be casted wrong.
* This is a temporary method, and will be replaced by tvf.
*/
// Reject complex-typed output slots for parquet/orc broker/stream load, since
// the load path casts everything to string and would mangle complex types.
Status FileScanner::_check_output_block_types() {
    // Only called from _init_src_block_for_load, so _is_load is always true.
    const auto format = _params->format_type;
    const bool needs_check = format == TFileFormatType::FORMAT_PARQUET ||
                             format == TFileFormatType::FORMAT_ORC;
    if (!needs_check) {
        return Status::OK();
    }
    for (const auto* slot : _output_tuple_desc->slots()) {
        if (is_complex_type(slot->type()->get_primitive_type())) {
            return Status::InternalError(
                    "Parquet/orc doesn't support complex types in broker/stream load, "
                    "please use tvf(table value function) to insert complex types.");
        }
    }
    return Status::OK();
}
// Dispatch to the load- or query-specific src block initializer that was
// bound in _configure_file_scan_handlers().
Status FileScanner::_init_src_block(Block* block) {
DCHECK(_init_src_block_handler != nullptr);
return (this->*_init_src_block_handler)(block);
}
// Query mode: the output block itself is used as the source block; only a
// name -> position map needs to be built, and only once.
Status FileScanner::_init_src_block_for_query(Block* block) {
    _src_block_ptr = block;
    if (!_src_block_name_to_idx.empty()) {
        // Map already built on an earlier call.
        return Status::OK();
    }
    _src_block_name_to_idx = block->get_name_to_pos_map();
    return Status::OK();
}
// Load mode: build a private source block whose columns follow the input
// tuple (file schema types where known), and record the positions/unique-ids
// of special columns (skip bitmap, sequence / sequence-map columns).
Status FileScanner::_init_src_block_for_load(Block* block) {
static_cast<void>(block);
RETURN_IF_ERROR(_check_output_block_types());
// if (_src_block_init) {
// _src_block.clear_column_data();
// _src_block_ptr = &_src_block;
// return Status::OK();
// }
_src_block.clear();
uint32_t idx = 0;
// slots in _input_tuple_desc contains all slots describe in load statement, eg:
// -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
// _input_tuple_desc will contains: k1, k2, tmp1
// and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
// _input_tuple_desc also contains columns from path
for (auto& slot : _input_tuple_desc->slots()) {
DataTypePtr data_type;
auto it = _slot_lower_name_to_col_type.find(slot->col_name());
if (slot->is_skip_bitmap_col()) {
// Remember the position of the skip-bitmap column for flexible partial update.
_skip_bitmap_col_idx = idx;
}
if (_params->__isset.sequence_map_col) {
if (_params->sequence_map_col == slot->col_name()) {
_sequence_map_col_uid = slot->col_unique_id();
}
}
// Prefer the file-schema type (forced nullable) when the column exists in
// the file; otherwise fall back to the slot's declared type.
data_type =
it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
_src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
if (_params->__isset.sequence_map_col) {
for (const auto& slot : _output_tuple_desc->slots()) {
// When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
// so we should get its column unique id from _output_tuple_desc
if (slot->is_sequence_col()) {
_sequence_col_uid = slot->col_unique_id();
}
}
}
_src_block_ptr = &_src_block;
_src_block_init = true;
return Status::OK();
}
// Cast each file-provided column of the src block, in place, from the file
// schema type (PT0) to the input slot's declared type (PT1) using the CAST
// function. Columns absent from the file are skipped. `block` is unused here;
// the signature matches the post-read pipeline.
Status FileScanner::_cast_to_input_block(Block* block) {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
            _slot_lower_name_to_col_type.end()) {
            // skip columns which does not exist in file
            continue;
        }
        // Look up the block position once; the previous code performed this
        // map lookup twice per column (once for `arg`, once for `idx`).
        const uint32_t idx = _src_block_name_to_idx[slot_desc->col_name()];
        auto& arg = _src_block_ptr->get_by_position(idx);
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
        ColumnsWithTypeAndName arguments {
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
        if (!func_cast) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         arg.type->get_name(), slot_desc->col_name(),
                                         return_type->get_name());
        }
        DCHECK(_state != nullptr);
        auto ctx = FunctionContext::create_context(_state, {}, {});
        // Execute the cast, writing the result back into the same position.
        RETURN_IF_ERROR(
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
// Apply the load pre-filter conjuncts to the src block and account the rows
// they remove as "unselected".
Status FileScanner::_pre_filter_src_block() {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    if (_pre_conjunct_ctxs.empty()) {
        return Status::OK();
    }
    SCOPED_TIMER(_pre_filter_timer);
    const auto column_count = _src_block_ptr->columns();
    const auto rows_before = _src_block_ptr->rows();
    RETURN_IF_ERROR(VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, column_count));
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
// Convert the src block into the destination (output) block by executing the
// per-slot dest expressions, applying strict-mode / non-nullable checks row by
// row (recording bad rows to the error log), then filtering rejected rows out.
Status FileScanner::_convert_to_output_block(Block* block) {
// Only called from _process_src_block_after_read_for_load, so _is_load is always true.
SCOPED_TIMER(_convert_to_output_block_timer);
// The block is passed from scanner context's free blocks,
// which is initialized by output columns
// so no need to clear it
// block->clear();
int ctx_idx = 0;
size_t rows = _src_block_ptr->rows();
// filter_map[i] == 1 keeps row i; checks below may flip entries to 0.
auto filter_column = ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
// After convert, the column_ptr should be copied into output block.
// Can not use block->insert() because it may cause use_count() non-zero bug
MutableBlock mutable_output_block =
VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
auto& mutable_output_columns = mutable_output_block.mutable_columns();
// Skip bitmaps (flexible partial update): per-row sets of column unique ids
// that were NOT specified in that row; such cells bypass the checks below.
std::vector<BitmapValue>* skip_bitmaps {nullptr};
if (_should_process_skip_bitmap_col()) {
auto* skip_bitmap_nullable_col_ptr =
assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
.column->assume_mutable()
.get());
skip_bitmaps = &(assert_cast<ColumnBitmap*>(
skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
->get_data());
// NOTE:
// - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
// __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
// - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
// so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
// So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
if (_sequence_map_col_uid != -1) {
for (int j = 0; j < rows; ++j) {
if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
(*skip_bitmaps)[j].add(_sequence_col_uid);
}
}
}
}
// for (auto slot_desc : _output_tuple_desc->slots()) {
for (int j = 0; j < mutable_output_columns.size(); ++j) {
auto* slot_desc = _output_tuple_desc->slots()[j];
int dest_index = ctx_idx;
ColumnPtr column_ptr;
auto& ctx = _dest_vexpr_ctx[dest_index];
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
// column_ptr maybe a ColumnConst, convert it to a normal column
column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr);
// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
if (LIKELY(column_ptr->is_nullable())) {
const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
// skip checks for non-mentioned columns in flexible partial update
if (skip_bitmaps == nullptr ||
!skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
// clang-format off
// Strict mode: dest is null but the corresponding src cell
// was non-null, i.e. the conversion failed -> reject the row.
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
auto raw_value =
_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
}));
} else if (!slot_desc->is_nullable()) {
// Null value heading for a NOT NULL column -> reject the row.
filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
return fmt::to_string(error_msg);
}));
}
// clang-format on
}
}
}
if (!slot_desc->is_nullable()) {
column_ptr = remove_nullable(column_ptr);
}
} else if (slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
ctx_idx++;
}
// after do the dest block insert operation, clear _src_block to remove the reference of origin column
_src_block_ptr->clear();
size_t dest_size = block->columns();
// do filter
block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
_counter.num_rows_filtered += rows - block->rows();
return Status::OK();
}
// Dispatch to the load- or query-specific post-read processing that was bound
// in _configure_file_scan_handlers().
Status FileScanner::_process_src_block_after_read(Block* block) {
DCHECK(_process_src_block_after_read_handler != nullptr);
return (this->*_process_src_block_after_read_handler)(block);
}
Status FileScanner::_process_src_block_after_read_for_query(Block* block) {
    // In query mode the reader already produced final columns; the only
    // post-processing required is CHAR/VARCHAR truncation when the target
    // size is smaller than the file schema
    // (needs truncate_char_or_varchar_columns=true).
    return _truncate_char_or_varchar_columns(block);
}
// Load-mode post-read pipeline, in order: cast file columns to input types,
// sanity-check row counts, run pre-filters, convert src -> output block, and
// finally truncate oversized CHAR/VARCHAR values.
Status FileScanner::_process_src_block_after_read_for_load(Block* block) {
// Convert the src block columns type in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// All readers fill partition and missing columns inside get_next_block:
// ORC/Parquet: in _do_get_next_block via on_fill_partition_columns/on_fill_missing_columns
// CSV/JSON/others: in on_after_read_block via fill_remaining_columns
// Assert all columns have consistent row counts.
if (_src_block_ptr->columns() > 0) {
size_t rows = _src_block_ptr->get_by_position(0).column->size();
for (size_t i = 1; i < _src_block_ptr->columns(); ++i) {
DCHECK_EQ(_src_block_ptr->get_by_position(i).column->size(), rows)
<< "Column " << _src_block_ptr->get_by_position(i).name << " has "
<< _src_block_ptr->get_by_position(i).column->size() << " rows, expected "
<< rows;
}
}
// Apply _pre_conjunct_ctxs to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
// Convert src block to output block (dest block), then apply filters.
RETURN_IF_ERROR(_convert_to_output_block(block));
// Truncate CHAR/VARCHAR columns when target size is smaller than file schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
return Status::OK();
}
// Truncate char columns or varchar columns if size is smaller than file columns
// or not found in the file column schema. Controlled by the session option
// truncate_char_or_varchar_columns.
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    int idx = 0;
    for (const auto* slot_desc : _real_tuple_desc->slots()) {
        const auto& type = slot_desc->type();
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
            ++idx;
            continue;
        }
        // Hoist the target length: the old code repeated this assert_cast up
        // to three times per column.
        const int target_len =
                assert_cast<const DataTypeString*>(remove_nullable(type).get())->len();
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
        if (iter != _source_file_col_name_types.end()) {
            // Reuse the iterator instead of performing a second map lookup.
            const auto& file_type_desc = iter->second;
            // file_len < 0 means the file column has unknown/unbounded length.
            int file_len = -1;
            if (const auto* ftype = check_and_get_data_type<DataTypeString>(
                        remove_nullable(file_type_desc).get())) {
                file_len = ftype->len();
            }
            // Truncate when the target has a bounded length and the file
            // column is longer or has unknown length.
            if (target_len > 0 && (target_len < file_len || file_len < 0)) {
                _truncate_char_or_varchar_column(block, idx, target_len);
            }
        } else {
            // Column not present in the file schema: truncate to target length.
            _truncate_char_or_varchar_column(block, idx, target_len);
        }
        ++idx;
    }
    return Status::OK();
}
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
// Truncates the (nullable) string column at position `idx` to `len` chars by
// temporarily appending pos/len constant columns and a result column, running
// substring, then re-wrapping with the original null map and erasing temps.
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
auto int_type = std::make_shared<DataTypeInt32>();
uint32_t num_columns_without_result = block->columns();
const ColumnNullable* col_nullable =
assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
// Unwrap: substring operates on the nested string column; keep the null map
// aside to restore it afterwards.
const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
block->replace_by_position(idx, std::move(string_column_ptr));
block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
"const 1"}); // pos is 1
block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
fmt::format("const {}", len)}); // len
block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
ColumnNumbers temp_arguments(3);
temp_arguments[0] = idx; // str column
temp_arguments[1] = num_columns_without_result; // pos
temp_arguments[2] = num_columns_without_result + 1; // len
uint32_t result_column_id = num_columns_without_result + 2;
SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
// Re-wrap the truncated strings with the original null map and drop the
// temporary pos/len/result columns.
auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
null_map_column_ptr);
block->replace_by_position(idx, std::move(res));
Block::erase_useless_column(block, num_columns_without_result);
}
std::shared_ptr<segment_v2::RowIdColumnIteratorV2> FileScanner::_create_row_id_column_iterator() {
auto& id_file_map = _state->get_id_file_map();
auto file_id = id_file_map->get_file_mapping_id(
std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
_current_range, _should_enable_file_meta_cache()));
return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
BackendOptions::get_backend_id(), file_id);
}
// Populates the reader-agnostic portion of a ReaderInitContext.
// Format-specific callers fill in their own extra fields afterwards.
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
    // Execution environment.
    ctx->state = _state;
    ctx->params = _params;
    ctx->range = &_current_range;
    // Schema / layout information.
    ctx->tuple_descriptor = _real_tuple_desc;
    ctx->row_descriptor = _default_val_row_desc.get();
    ctx->column_descs = &_column_descs;
    ctx->col_name_to_block_idx = &_src_block_name_to_idx;
    // Defaults shared by every format.
    ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance();
    ctx->push_down_agg_type = _get_push_down_agg_type();
}
// Advances to the next scan range and creates/initializes the matching file
// reader into `_cur_reader`. Loops until a usable reader is produced, the
// split source is exhausted (sets `_scanner_eof`), or an error occurs. A range
// may be skipped when pruned by runtime filters, when the file turns out to be
// empty (EOF on init), or — if configured — when the file is missing.
Status FileScanner::_get_next_reader() {
    while (true) {
        // Close the previous range's reader before switching to the next one.
        if (_cur_reader) {
            _cur_reader->collect_profile_before_close();
            RETURN_IF_ERROR(_cur_reader->close());
            _state->update_num_finished_scan_range(1);
        }
        _cur_reader.reset(nullptr);
        _src_block_init = false;
        // The first range is delivered up-front in _current_range; afterwards
        // pull the next one from the split source.
        bool has_next = _first_scan_range;
        if (!_first_scan_range) {
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
        }
        _first_scan_range = false;
        if (!has_next || _should_stop) {
            _scanner_eof = true;
            return Status::OK();
        }
        const TFileRangeDesc& range = _current_range;
        _current_range_path = range.path;
        if (!_partition_slot_index_map.empty()) {
            // we need get partition columns first for runtime filter partition pruning
            RETURN_IF_ERROR(_generate_partition_columns());
            if (_state->query_options().enable_runtime_filter_partition_prune) {
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
                // by runtime filter partition prune
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
                    _init_runtime_filter_partition_prune_ctxs();
                }
                bool can_filter_all = false;
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
                if (can_filter_all) {
                    // this range can be filtered out by runtime filter partition pruning
                    // so we need to skip this range
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
                    continue;
                }
            }
        }
        // create reader for specific format
        Status init_status = Status::OK();
        TFileFormatType::type format_type = _get_current_format_type();
        // for compatibility, this logic is deprecated in 3.1
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
            if (range.table_format_params.table_format_type == "paimon" &&
                !range.table_format_params.paimon_params.__isset.paimon_split) {
                // use native reader
                auto format = range.table_format_params.paimon_params.file_format;
                if (format == "orc") {
                    format_type = TFileFormatType::FORMAT_ORC;
                } else if (format == "parquet") {
                    format_type = TFileFormatType::FORMAT_PARQUET;
                } else {
                    return Status::InternalError("Not supported paimon file format: {}", format);
                }
            }
        }
        bool push_down_predicates = _should_push_down_predicates(format_type);
        bool need_to_get_parsed_schema = false;
        // Dispatch on the file format; each case constructs the matching
        // reader, initializes it, and moves it into `_cur_reader`.
        switch (format_type) {
        case TFileFormatType::FORMAT_JNI: {
            ReaderInitContext jni_ctx;
            _fill_base_init_context(&jni_ctx);
            if (range.__isset.table_format_params &&
                range.table_format_params.table_format_type == "max_compute") {
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                        _real_tuple_desc->table_desc());
                if (!mc_desc->init_status()) {
                    return mc_desc->init_status();
                }
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                        range, _state, _profile);
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(mc_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "paimon") {
                // Paimon: prefer the C++ reader when the session enables it.
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
                    _state->query_options().enable_paimon_cpp_reader) {
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
                                                                     _profile, range, _params);
                    if (!_is_load && !_push_down_conjuncts.empty()) {
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
                        if (predicate) {
                            cpp_reader->set_predicate(std::move(predicate));
                        }
                    }
                    init_status =
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
                    _cur_reader = std::move(cpp_reader);
                } else {
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
                                                                        _profile, range, _params);
                    init_status =
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
                    _cur_reader = std::move(paimon_reader);
                }
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "hudi") {
                auto hudi_reader = HudiJniReader::create_unique(
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
                        _profile);
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(hudi_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "trino_connector") {
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
                                                                           _profile, range);
                init_status =
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(trino_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "jdbc") {
                // Extract jdbc params from table_format_params
                std::map<std::string, std::string> jdbc_params(
                        range.table_format_params.jdbc_params.begin(),
                        range.table_format_params.jdbc_params.end());
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
                                                                jdbc_params);
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
                _cur_reader = std::move(jdbc_reader);
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "iceberg") {
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
                        _file_slot_descs, _state, _profile, range, _params);
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
                                      ->init_reader(&jni_ctx);
                _cur_reader = std::move(iceberg_sys_reader);
            }
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
            if (_cur_reader) {
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
                }
            }
            break;
        }
        case TFileFormatType::FORMAT_PARQUET: {
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
                                               ? ExecEnv::GetInstance()->file_meta_cache()
                                               : nullptr;
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_ORC: {
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
                                               ? ExecEnv::GetInstance()->file_meta_cache()
                                               : nullptr;
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));
            need_to_get_parsed_schema = true;
            break;
        }
        // All CSV-like compressed variants plus PROTO share the CsvReader.
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO: {
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                   _file_slot_descs, _io_ctx.get());
            CsvInitContext csv_ctx;
            _fill_base_init_context(&csv_ctx);
            csv_ctx.is_load = _is_load;
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
            _cur_reader = std::move(reader);
            break;
        }
        case TFileFormatType::FORMAT_TEXT: {
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                    _file_slot_descs, _io_ctx.get());
            CsvInitContext text_ctx;
            _fill_base_init_context(&text_ctx);
            text_ctx.is_load = _is_load;
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
            _cur_reader = std::move(reader);
            break;
        }
        case TFileFormatType::FORMAT_JSON: {
            _cur_reader =
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
            JsonInitContext json_ctx;
            _fill_base_init_context(&json_ctx);
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
            json_ctx.is_load = _is_load;
            init_status = _cur_reader->init_reader(&json_ctx);
            break;
        }
        case TFileFormatType::FORMAT_WAL: {
            _cur_reader = WalReader::create_unique(_state);
            WalInitContext wal_ctx;
            _fill_base_init_context(&wal_ctx);
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
            init_status = _cur_reader->init_reader(&wal_ctx);
            break;
        }
        case TFileFormatType::FORMAT_NATIVE: {
            auto reader =
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
            ReaderInitContext native_ctx;
            _fill_base_init_context(&native_ctx);
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
            _cur_reader = std::move(reader);
            need_to_get_parsed_schema = false;
            break;
        }
        case TFileFormatType::FORMAT_ARROW: {
            ReaderInitContext arrow_ctx;
            _fill_base_init_context(&arrow_ctx);
            if (range.__isset.table_format_params &&
                range.table_format_params.table_format_type == "remote_doris") {
                auto doris_reader =
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
                init_status =
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
                if (doris_reader) {
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
                }
                _cur_reader = std::move(doris_reader);
            } else {
                auto arrow_reader =
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
                                                         range, _file_slot_descs, _io_ctx.get());
                init_status =
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
                _cur_reader = std::move(arrow_reader);
            }
            break;
        }
#ifdef BUILD_RUST_READERS
        case TFileFormatType::FORMAT_LANCE: {
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
                                                               range, _params);
            init_status = lance_reader->init_reader();
            _cur_reader = std::move(lance_reader);
            need_to_get_parsed_schema = true;
            break;
        }
#endif
        case TFileFormatType::FORMAT_ES_HTTP: {
            _cur_reader = EsHttpReader::create_unique(_file_slot_descs, _state, _profile, range,
                                                      *_params, _real_tuple_desc);
            init_status = static_cast<EsHttpReader*>(_cur_reader.get())->init_reader();
            break;
        }
        default:
            return Status::NotSupported("Not supported create reader for file format: {}.",
                                        to_string(_params->format_type));
        }
        // A case may fall through without constructing a reader (e.g. an
        // unrecognized table format under FORMAT_JNI).
        if (_cur_reader == nullptr) {
            return Status::NotSupported(
                    "Not supported create reader for table format: {} / file format: {}.",
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
                                                      : "NotSet",
                    to_string(_params->format_type));
        }
        COUNTER_UPDATE(_file_counter, 1);
        // The FileScanner for external table may try to open not exist files,
        // Because FE file cache for external table may out of date.
        // So, NOT_FOUND for FileScanner is not a fail case.
        // Will remove this after file reader refactor.
        if (init_status.is<END_OF_FILE>()) {
            COUNTER_UPDATE(_empty_file_counter, 1);
            continue;
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
            if (config::ignore_not_found_file_in_external_table) {
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
        } else if (!init_status.ok()) {
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
        }
        // For table-level COUNT pushdown, offsets are undefined so we must skip
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
        // filter row groups, which would produce incorrect empty results).
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
                                    range.__isset.table_format_params &&
                                    range.table_format_params.table_level_row_count >= 0;
        if (!is_table_level_count) {
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
                continue;
            } else if (!status.ok()) {
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
                                             status.to_string());
            }
        }
        // Unified COUNT(*) pushdown: replace the real reader with CountReader
        // decorator if the reader accepts COUNT and can provide a total row count.
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
            int64_t total_rows = -1;
            if (is_table_level_count) {
                // FE-provided count (may account for table-format deletions)
                total_rows = range.table_format_params.table_level_row_count;
            } else if (_cur_reader->supports_count_pushdown()) {
                // File metadata count (ORC footer / Parquet row groups)
                total_rows = _cur_reader->get_total_rows();
            }
            if (total_rows >= 0) {
                auto batch_size = _state->query_options().batch_size;
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
                                                            std::move(_cur_reader));
            }
        }
        _cur_reader_eof = false;
        break;
    }
    return Status::OK();
}
// Creates and initializes the Parquet reader variant matching the current
// range's table format (iceberg / paimon / hudi / hive / tvf / load fallback).
// `parquet_reader` may be passed in pre-constructed (see read_lines_from_range);
// it is only consumed by the "tvf" and load fallback branches. On return
// `_cur_reader` owns the created reader (it stays null when no branch matches;
// the caller reports that as NotSupported). Returns the reader's init status.
Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
                                         std::unique_ptr<ParquetReader> parquet_reader) {
    const TFileRangeDesc& range = _current_range;
    Status init_status = Status::OK();
    // Copied so `pctx` can hold a stable pointer while init_reader() runs.
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
            _local_state
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
    // Build unified ParquetInitContext (shared by all Parquet reader variants)
    ParquetInitContext pctx;
    _fill_base_init_context(&pctx);
    pctx.conjuncts = &_push_down_conjuncts;
    pctx.slot_id_to_predicates = &slot_id_to_predicates;
    pctx.colname_to_slot_id = _col_name_to_slot_id;
    pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
    if (range.__isset.table_format_params &&
        range.table_format_params.table_format_type == "iceberg") {
        // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
                _kv_cache, _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(iceberg_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "paimon") {
        // PaimonParquetReader IS-A ParquetReader, no wrapping needed
        auto paimon_reader = PaimonParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state, file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(paimon_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hudi") {
        // HudiParquetReader IS-A ParquetReader, no wrapping needed
        auto hudi_reader = HudiParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(hudi_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hive") {
        // Guard __isset like the other branches (and _init_orc_reader); when
        // table_format_params is unset the Thrift string defaults to "", so
        // behavior is unchanged.
        auto hive_reader = HiveParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_parquet_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(hive_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "tvf") {
        if (!parquet_reader) {
            parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size,
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
                    _state->query_options().enable_parquet_lazy_mat);
        }
        parquet_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(parquet_reader);
    } else if (_is_load) {
        // Broker/stream load: plain ParquetReader, no table-format wrapper.
        if (!parquet_reader) {
            parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size,
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
                    _state->query_options().enable_parquet_lazy_mat);
        }
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(parquet_reader);
    }
    return init_status;
}
// Creates and initializes the ORC reader variant matching the current range's
// table format (transactional_hive / iceberg / paimon / hudi / hive / tvf /
// load fallback). `orc_reader` may be passed in pre-constructed (see
// read_lines_from_range); it is only consumed by the "tvf" and load fallback
// branches. On return `_cur_reader` owns the created reader (it stays null
// when no branch matches; the caller reports that as NotSupported). Returns
// the reader's init status.
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
                                     std::unique_ptr<OrcReader> orc_reader) {
    const TFileRangeDesc& range = _current_range;
    Status init_status = Status::OK();
    // Build unified OrcInitContext (shared by all ORC reader variants)
    OrcInitContext octx;
    _fill_base_init_context(&octx);
    octx.conjuncts = &_push_down_conjuncts;
    octx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    octx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
    if (range.__isset.table_format_params &&
        range.table_format_params.table_format_type == "transactional_hive") {
        // TransactionalHiveReader IS-A OrcReader, no wrapping needed
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        tran_orc_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);
        _cur_reader = std::move(tran_orc_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "iceberg") {
        // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
                _kv_cache, _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);
        _cur_reader = std::move(iceberg_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "paimon") {
        // PaimonOrcReader IS-A OrcReader, no wrapping needed
        auto paimon_reader = PaimonOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _kv_cache, _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
        _cur_reader = std::move(paimon_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hudi") {
        // HudiOrcReader IS-A OrcReader, no wrapping needed
        auto hudi_reader = HudiOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
        _cur_reader = std::move(hudi_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "hive") {
        auto hive_reader = HiveOrcReader::create_unique(
                _profile, _state, *_params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_orc_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);
        _cur_reader = std::move(hive_reader);
    } else if (range.__isset.table_format_params &&
               range.table_format_params.table_format_type == "tvf") {
        if (!orc_reader) {
            orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
                    _state->query_options().enable_orc_lazy_mat);
        }
        orc_reader->set_create_row_id_column_iterator_func(
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
                    return _create_row_id_column_iterator();
                });
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
        _cur_reader = std::move(orc_reader);
    } else if (_is_load) {
        // Broker/stream load: plain OrcReader, no table-format wrapper.
        if (!orc_reader) {
            orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
                    _state->query_options().enable_orc_lazy_mat);
        }
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
        _cur_reader = std::move(orc_reader);
    }
    return init_status;
}
// Rebuilds `_slot_lower_name_to_col_type` from the columns the current reader
// reports, then prepares the truncate-column metadata.
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
    _slot_lower_name_to_col_type.clear();
    std::unordered_map<std::string, DataTypePtr> reader_col_types;
    RETURN_IF_ERROR(_cur_reader->get_columns(&reader_col_types));
    for (const auto& [name, data_type] : reader_col_types) {
        auto lower_name = to_lower(name);
        /*
         * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
         * generate columns of the corresponding type, which records the columns existing in the file.
         *
         * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
         * not match the slot type in `_output_tuple_desc`, causing an error when
         * Serde `deserialize_one_cell_from_json` fills the partition values.
         *
         * So for partition column not need fill _slot_lower_name_to_col_type.
         */
        if (!_partition_col_descs.contains(lower_name)) {
            _slot_lower_name_to_col_type.emplace(lower_name, data_type);
        }
    }
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
    return Status::OK();
}
// Collects the source file's (lower-cased column name -> type) map into
// `_source_file_col_name_types`, used later to decide which CHAR/VARCHAR
// columns need truncation. Skipped entirely unless the session enables
// truncation and the format can report a parsed schema.
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
    _source_file_col_name_types.clear();
    // The col names and types of source file, such as parquet, orc files.
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
        std::vector<std::string> source_file_col_names;
        std::vector<DataTypePtr> source_file_col_types;
        Status status =
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
        // NOT_IMPLEMENTED_ERROR is tolerated: some readers cannot report a
        // parsed schema, in which case the map simply stays empty.
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
            return status;
        }
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
        // size_t loop index avoids the signed/unsigned comparison of the
        // previous `int i` version.
        for (size_t i = 0; i < source_file_col_names.size(); ++i) {
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
                    source_file_col_types[i];
        }
    }
    return Status::OK();
}
// Prepares this scanner for point reads of individual lines from `range`
// (used by RowIdFetcher): sets up IO statistics/counters and expression
// contexts, and clears all filter state since a single-row fetch needs no
// predicate evaluation.
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
    _current_range = range;
    // std::make_unique instead of reset(new ...) — exception-safe and idiomatic.
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    // RowDescriptor takes a non-const TupleDescriptor*; make the const removal
    // explicit with const_cast rather than a C-style cast.
    _default_val_row_desc =
            std::make_unique<RowDescriptor>(const_cast<TupleDescriptor*>(_real_tuple_desc));
    RETURN_IF_ERROR(_init_expr_ctxes());
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
    _push_down_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    _slot_id_to_filter_conjuncts.clear();
    _kv_cache = nullptr;
    return Status::OK();
}
// Reads the specified rows (`row_ids`) from a single file range into
// `result_block`. Used for row-id based point lookups on external tables:
// builds a parquet/orc reader with batch size 1, restricts it to the requested
// rows via read_by_rows, then drains it until EOF. Only parquet and orc are
// supported. `init_reader_ms` / `get_block_ms` receive the elapsed time of the
// reader-init and block-fetch phases respectively.
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
                                          const std::list<int64_t>& row_ids, Block* result_block,
                                          const ExternalFileMappingInfo& external_info,
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
    _current_range = range;
    RETURN_IF_ERROR(_generate_partition_columns());
    TFileFormatType::type format_type = _get_current_format_type();
    Status init_status = Status::OK();
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
                                       ? ExecEnv::GetInstance()->file_meta_cache()
                                       : nullptr;
    // Phase 1 (timed): create and initialize the reader, then restrict it to
    // the requested row ids.
    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                switch (format_type) {
                case TFileFormatType::FORMAT_PARQUET: {
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
                            _state, file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
                    // _init_parquet_reader may create a new table-format specific reader
                    // (e.g., HiveParquetReader) that replaces the original parquet_reader.
                    // We need to re-apply read_by_rows to the actual _cur_reader.
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
                    break;
                }
                case TFileFormatType::FORMAT_ORC: {
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
                            file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
                    // Same as above: re-apply read_by_rows to the actual _cur_reader.
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
                    break;
                }
                default: {
                    return Status::NotSupported(
                            "Not support create lines reader for file format: {},"
                            "only support parquet and orc.",
                            to_string(_params->format_type));
                }
                }
                return Status::OK();
            },
            init_reader_ms));
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
    _cur_reader_eof = false;
    // Phase 2 (timed): pull blocks until the reader reports EOF.
    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                while (!_cur_reader_eof) {
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
                return Status::OK();
            },
            get_block_ms));
    _cur_reader->collect_profile_before_close();
    RETURN_IF_ERROR(_cur_reader->close());
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    return Status::OK();
}
// Rebuilds `_partition_col_descs` (partition column name -> (path value,
// slot)) and `_partition_value_is_null` from the current range's
// columns-from-path information. No-op when the range carries no path keys.
Status FileScanner::_generate_partition_columns() {
    _partition_col_descs.clear();
    _partition_value_is_null.clear();
    const TFileRangeDesc& range = _current_range;
    if (!range.__isset.columns_from_path_keys) {
        return Status::OK();
    }
    // Map each path-key name to its position in columns_from_path.
    std::unordered_map<std::string, int> key_positions;
    int pos = 0;
    for (const auto& key_name : range.columns_from_path_keys) {
        key_positions.emplace(key_name, pos++);
    }
    // Walk _column_descs and pick out the PARTITION_KEY columns.
    for (const auto& col_desc : _column_descs) {
        if (col_desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        auto found = key_positions.find(col_desc.name);
        if (found == key_positions.end()) {
            continue;
        }
        const int value_idx = found->second;
        if (!range.__isset.columns_from_path || value_idx >= range.columns_from_path.size()) {
            continue;
        }
        _partition_col_descs.emplace(
                col_desc.name,
                std::make_tuple(range.columns_from_path[value_idx], col_desc.slot_desc));
        if (range.__isset.columns_from_path_is_null) {
            _partition_value_is_null.emplace(col_desc.name,
                                             range.columns_from_path_is_null[value_idx]);
        }
    }
    return Status::OK();
}
Status FileScanner::_init_expr_ctxes() {
std::map<SlotId, int> full_src_index_map;
std::map<SlotId, SlotDescriptor*> full_src_slot_map;
std::map<std::string, int> partition_name_to_key_index_map;
int index = 0;
for (const auto& slot_desc : _real_tuple_desc->slots()) {
full_src_slot_map.emplace(slot_desc->id(), slot_desc);
full_src_index_map.emplace(slot_desc->id(), index++);
}
// For external table query, find the index of column in path.
// Because query doesn't always search for all columns in a table
// and the order of selected columns is random.
// All ranges in _ranges vector should have identical columns_from_path_keys
// because they are all file splits for the same external table.
// So here use the first element of _ranges to fill the partition_name_to_key_index_map
if (_current_range.__isset.columns_from_path_keys) {
std::vector<std::string> key_map = _current_range.columns_from_path_keys;
if (!key_map.empty()) {
for (size_t i = 0; i < key_map.size(); i++) {
partition_name_to_key_index_map.emplace(key_map[i], i);
}
}
}
_num_of_columns_from_file = _params->num_of_columns_from_file;
for (const auto& slot_info : _params->required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
return Status::InternalError(
fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
}
ColumnDescriptor col_desc;
col_desc.name = it->second->col_name();
col_desc.slot_desc = it->second;
// Read category from Thrift if available (new FE), otherwise fall back
// to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
// where the FE does not set TColumnCategory.
if (slot_info.__isset.category) {
switch (slot_info.category) {
case TColumnCategory::REGULAR:
col_desc.category = ColumnCategory::REGULAR;
break;
case TColumnCategory::PARTITION_KEY:
col_desc.category = ColumnCategory::PARTITION_KEY;
break;
case TColumnCategory::SYNTHESIZED:
col_desc.category = ColumnCategory::SYNTHESIZED;
break;
case TColumnCategory::GENERATED:
col_desc.category = ColumnCategory::GENERATED;
break;
}
} else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
!slot_info.is_file_slot) {
col_desc.category = ColumnCategory::PARTITION_KEY;
}
// Derive is_file_slot from category
bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
col_desc.category == ColumnCategory::GENERATED);
if (partition_name_to_key_index_map.contains(it->second->col_name())) {
if (_is_load) {
auto iti = full_src_index_map.find(slot_id);
_partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
} else {
auto kit = partition_name_to_key_index_map.find(it->second->col_name());
_partition_slot_index_map.emplace(slot_id, kit->second);
}
}
if (is_file_slot) {
_is_file_slot.emplace(slot_id);
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
}
_column_descs.push_back(col_desc);
}
// set column name to default value expr map
// new inline TFileScanSlotInfo.default_value_expr (preferred)
for (const auto& slot_info : _params->required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
continue;
}
const std::string& col_name = it->second->col_name();
VExprContextSPtr ctx;
bool has_default = false;
// Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
has_default = true;
} else if (slot_info.__isset.default_value_expr) {
// Empty nodes means null default (same as legacy empty TExpr)
has_default = true;
}
// Fall back to legacy default_value_of_src_slot map for mixed-version FE/BE
// and callers that have not preserved inline default_value_expr yet.
if (!has_default) {
auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
if (legacy_it != std::end(_params->default_value_of_src_slot)) {
if (!legacy_it->second.nodes.empty()) {
RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
has_default = true;
}
}
if (has_default) {
// if expr is empty, the default value will be null
_col_default_value_ctx.emplace(col_name, ctx);
}
}
// Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
// This makes default values available to readers via column_descs, eliminating the
// need for the separate _generate_missing_columns roundtrip.
for (auto& col_desc : _column_descs) {
auto it = _col_default_value_ctx.find(col_desc.name);
if (it != _col_default_value_ctx.end()) {
col_desc.default_expr = it->second;
}
}
if (_is_load) {
// follow desc expr map is only for load task.
bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
int idx = 0;
for (auto* slot_desc : _output_tuple_desc->slots()) {
auto it = _params->expr_of_dest_slot.find(slot_desc->id());
if (it == std::end(_params->expr_of_dest_slot)) {
return Status::InternalError("No expr for dest slot, id={}, name={}",
slot_desc->id(), slot_desc->col_name());
}
VExprContextSPtr ctx;
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
_dest_vexpr_ctx.emplace_back(ctx);
_dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
if (has_slot_id_map) {
auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
auto _src_slot_it = full_src_slot_map.find(it1->second);
if (_src_slot_it == std::end(full_src_slot_map)) {
return Status::InternalError("No src slot {} in src slot descs",
it1->second);
}
_dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
full_src_index_map[_src_slot_it->first]);
_src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
}
}
}
}
return Status::OK();
}
bool FileScanner::_should_enable_condition_cache() {
DCHECK(_should_enable_condition_cache_handler != nullptr);
return _condition_cache_digest != 0 && (this->*_should_enable_condition_cache_handler)() &&
(!_conjuncts.empty() || !_push_down_conjuncts.empty());
}
// Load tasks never use the condition cache: ingested rows are not filtered
// repeatedly across queries, so cached granule results give no benefit.
bool FileScanner::_should_enable_condition_cache_for_load() const {
    return false;
}
// Queries may reuse cached granule filter results across executions, so the
// query-specific policy always allows the condition cache.
bool FileScanner::_should_enable_condition_cache_for_query() const {
    return true;
}
// Dispatch to the load- or query-specific push-down policy selected at init time.
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
    DCHECK(_should_push_down_predicates_handler != nullptr);
    auto handler = _should_push_down_predicates_handler;
    return (this->*handler)(format_type);
}
// Load tasks never push predicates down to the reader, regardless of format.
bool FileScanner::_should_push_down_predicates_for_load(
        TFileFormatType::type /*format_type*/) const {
    return false;
}
bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const {
    // JNI readers convert predicates inside their own code paths, so native
    // push-down is enabled for every format except FORMAT_JNI.
    if (format_type == TFileFormatType::FORMAT_JNI) {
        return false;
    }
    return true;
}
// Prepare the per-reader condition-cache state before scanning a range.
// On a cache HIT the previously computed granule filter bitmap is reused;
// on a MISS a fresh bitmap is allocated for the reader to fill in.
// Leaves _condition_cache/_condition_cache_ctx null when caching is not
// applicable for this reader.
void FileScanner::_init_reader_condition_cache() {
    // Start from a clean slate so stale state from a previous range never leaks in.
    _condition_cache = nullptr;
    _condition_cache_ctx = nullptr;
    if (!_should_enable_condition_cache() || !_cur_reader) {
        return;
    }
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
    // change between queries while the data file's cache key remains the same.
    if (_cur_reader->has_delete_operations()) {
        return;
    }
    auto* cache = segment_v2::ConditionCache::instance();
    // Key identifies the exact file slice plus the condition digest; missing
    // optional Thrift fields fall back to sentinels (0 / -1) so the key stays
    // deterministic across FE versions.
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
            _current_range.path,
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
            _current_range.__isset.file_size ? _current_range.file_size : -1,
            _condition_cache_digest,
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
            _current_range.__isset.size ? _current_range.size : -1);
    segment_v2::ConditionCacheHandle handle;
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
    if (condition_cache_hit) {
        _condition_cache = handle.get_filter_result();
        _condition_cache_hit_count++;
    } else {
        // Allocate cache pre-sized to total number of granules.
        // We add +1 as a safety margin: when a file is split across multiple scanners
        // and the first row of this scanner's range is not aligned to a granule boundary,
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
        // The extra element costs only 1 bit and never affects correctness (an extra
        // false-granule beyond the actual data range won't overlap any real row range).
        int64_t total_rows = _cur_reader->get_total_rows();
        // total_rows <= 0 means the reader cannot report a row count, so no
        // bitmap can be sized and caching is skipped for this reader.
        if (total_rows > 0) {
            // Ceiling division: ceil(total_rows / GRANULE_SIZE).
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
                                  ConditionCacheContext::GRANULE_SIZE;
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
        }
    }
    if (_condition_cache) {
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
        _condition_cache_ctx->is_hit = condition_cache_hit;
        _condition_cache_ctx->filter_result = _condition_cache;
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
    }
}
// Publish the freshly computed granule filter bitmap into the global
// condition cache when it is safe to do so, then drop all local references.
void FileScanner::_finalize_reader_condition_cache() {
    const bool cache_enabled = _should_enable_condition_cache();
    // A hit means the bitmap came FROM the cache — nothing new to store.
    const bool has_new_result =
            _condition_cache_ctx != nullptr && !_condition_cache_ctx->is_hit;
    // Only store the cache if the reader was fully consumed. If the scan was
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
    if (cache_enabled && has_new_result && _cur_reader_eof) {
        segment_v2::ConditionCache::instance()->insert(_condition_cache_key,
                                                       std::move(_condition_cache));
    }
    _condition_cache = nullptr;
    _condition_cache_ctx = nullptr;
}
// Release reader resources and delegate the remaining teardown to the base
// Scanner. _try_close() gating — presumably it returns false when close has
// already happened or must be deferred (TODO confirm against Scanner) — makes
// the whole body a no-op in that case.
Status FileScanner::close(RuntimeState* state) {
    if (_try_close()) {
        _finalize_reader_condition_cache();
        if (_cur_reader != nullptr) {
            RETURN_IF_ERROR(_cur_reader->close());
        }
        RETURN_IF_ERROR(Scanner::close(state));
    }
    return Status::OK();
}
void FileScanner::try_stop() {
    Scanner::try_stop();
    // Propagate the stop request to in-flight IO so readers can bail out early.
    if (_io_ctx != nullptr) {
        _io_ctx->should_stop = true;
    }
}
void FileScanner::update_realtime_counters() {
FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
_file_reader_stats->read_rows);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
_file_reader_stats->read_bytes);
int64_t delta_bytes_read_from_local =
_file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
int64_t delta_bytes_read_from_remote =
_file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
if (_file_cache_statistics->bytes_read_from_local == 0 &&
_file_cache_statistics->bytes_read_from_remote == 0) {
_state->get_query_ctx()
->resource_ctx()
->io_context()
->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
_file_reader_stats->read_bytes);
} else {
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
delta_bytes_read_from_local);
_state->get_query_ctx()
->resource_ctx()
->io_context()
->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
delta_bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
delta_bytes_read_from_remote);
}
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
_file_reader_stats->read_bytes = 0;
_file_reader_stats->read_rows = 0;
_last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
_last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
}
void FileScanner::_collect_profile_before_close() {
Scanner::_collect_profile_before_close();
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
_profile != nullptr) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
_state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
_file_cache_statistics->bytes_write_into_cache);
}
if (_cur_reader != nullptr) {
_cur_reader->collect_profile_before_close();
}
FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
if (_io_ctx) {
COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
_io_ctx->condition_cache_filtered_rows);
}
DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
}
} // namespace doris