// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/file_scanner.h"
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <map>
#include <ranges>
#include <tuple>
#include <unordered_map>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/rowid_fetcher.h"
#include "io/cache/block_file_cache_profile.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/exec/format/arrow/arrow_stream_reader.h"
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hive_reader.h"
#include "vec/exec/format/table/hudi_jni_reader.h"
#include "vec/exec/format/table/hudi_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/format/table/lakesoul_jni_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
#include "vec/exec/format/table/paimon_jni_reader.h"
#include "vec/exec/format/table/paimon_reader.h"
#include "vec/exec/format/table/transactional_hive_reader.h"
#include "vec/exec/format/table/trino_connector_jni_reader.h"
#include "vec/exec/format/text/text_reader.h"
#include "vec/exec/format/wal/wal_reader.h"
#include "vec/exec/scan/scan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/function.h"
#include "vec/functions/function_string.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/utils/stringop_substring.h"
namespace cctz {
class time_zone;
} // namespace cctz
namespace doris {
namespace vectorized {
class ShardedKVCache;
} // namespace vectorized
} // namespace doris
namespace doris::vectorized {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
FileScanner::FileScanner(
RuntimeState* state, pipeline::FileScanLocalState* local_state, int64_t limit,
std::shared_ptr<vectorized::SplitSourceConnector> split_source, RuntimeProfile* profile,
ShardedKVCache* kv_cache,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const std::unordered_map<std::string, int>* colname_to_slot_id)
: Scanner(state, local_state, limit, profile),
_split_source(split_source),
_cur_reader(nullptr),
_cur_reader_eof(false),
_colname_to_value_range(colname_to_value_range),
_kv_cache(kv_cache),
_strict_mode(false),
_col_name_to_slot_id(colname_to_slot_id) {
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
_params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
} else {
// old FE thrift protocol
_params = _split_source->get_params();
}
if (_params->__isset.strict_mode) {
_strict_mode = _params->strict_mode;
}
// For a load scanner, there are both input and output tuples.
// For a query scanner, there is only an output tuple.
_input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
_real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
_is_load = (_input_tuple_desc != nullptr);
}
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
RETURN_IF_ERROR(Scanner::init(state, conjuncts));
_get_block_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
_cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerCastInputBlockTime", 1);
_fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerFillMissingColumnTime", 1);
_pre_filter_timer =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"NotFoundFileNum", TUnit::UNIT, 1);
_fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"FullySkippedFileNum", TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
_file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
FileReadBytesProfile, TUnit::BYTES, 1);
_file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"FileReadCalls", TUnit::UNIT, 1);
_file_read_time_counter =
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
_file_cache_statistics.reset(new io::FileCacheStatistics());
_file_reader_stats.reset(new io::FileReaderStats());
RETURN_IF_ERROR(_init_io_ctx());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->file_reader_stats = _file_reader_stats.get();
_io_ctx->is_disposable = _state->query_options().disable_file_cache;
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_input_tuple_desc->id()}),
std::vector<bool>({false})));
// prepare pre filters
if (_params->__isset.pre_filter_exprs_list) {
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
_params->pre_filter_exprs_list, _pre_conjunct_ctxs));
} else if (_params->__isset.pre_filter_exprs) {
VExprContextSPtr context;
RETURN_IF_ERROR(
doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
_pre_conjunct_ctxs.emplace_back(context);
}
for (auto& conjunct : _pre_conjunct_ctxs) {
RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(conjunct->open(_state));
}
_dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_output_tuple_desc->id()}),
std::vector<bool>({false})));
}
_default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_real_tuple_desc->id()}),
std::vector<bool>({false})));
return Status::OK();
}
// Check whether the expr references only partition columns and literals,
// so that it can be evaluated against partition values alone during pruning.
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
if (expr->is_slot_ref()) {
auto* slot_ref = static_cast<VSlotRef*>(expr.get());
return _partition_slot_index_map.find(slot_ref->slot_id()) !=
_partition_slot_index_map.end();
}
if (expr->is_literal()) {
return true;
}
return std::ranges::all_of(expr->children(), [this](const auto& child) {
return _check_partition_prune_expr(child);
});
}
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, this conjunct comes from a runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}
void FileScanner::_init_runtime_filter_partition_prune_block() {
// init block with empty column
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
// non-materialized slots should be ignored when reading
continue;
}
_runtime_filter_partition_prune_block.insert(
ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
}
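// Evaluate the runtime-filter conjuncts against a one-row block built from the current
// range's partition values. If all rows are filtered out (`can_filter_all`), the caller
// can skip the whole scan range without opening the file.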
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
SCOPED_TIMER(_runtime_filter_partition_prune_timer);
if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
return Status::OK();
}
size_t partition_value_column_size = 1;
// 1. Deserialize the partition key values into columns.
std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
for (auto const& partition_col_desc : _partition_col_descs) {
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
auto data_type = partition_slot_desc->get_data_type_ptr();
auto test_serde = data_type->get_serde();
auto partition_value_column = data_type->create_column();
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
Slice slice(partition_value.data(), partition_value.size());
uint64_t num_deserialized = 0;
DataTypeSerDe::FormatOptions options {};
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
// for iceberg/paimon tables
// NOTICE: the column is always nullable for iceberg/paimon tables now
DCHECK(data_type->is_nullable());
test_serde = test_serde->get_nested_serdes()[0];
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
null_column->insert_many_defaults(partition_value_column_size);
} else {
// If the partition value is not null, we set null map to 0 and deserialize it normally.
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
null_column->get_nested_column(), slice, partition_value_column_size,
&num_deserialized, options));
}
} else {
// for hive/hudi tables, the null value is encoded as "\\N"
// TODO: this will be unified with the iceberg/paimon behavior in the future
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
}
partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}
// 2. Fill _runtime_filter_partition_prune_block from the partition columns, then execute the conjuncts and filter the block.
// 2.1 Fill _runtime_filter_partition_prune_block from the partition columns so the conjuncts can be executed against it.
size_t index = 0;
bool first_column_filled = false;
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
// non-materialized slots should be ignored when reading
continue;
}
if (partition_slot_id_to_column.find(slot_desc->id()) !=
partition_slot_id_to_column.end()) {
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
ColumnNullable::create(
std::move(partition_value_column),
ColumnUInt8::create(partition_value_column_size, 0)),
data_type, slot_desc->col_name()));
} else {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
slot_desc->col_name()));
}
if (index == 0) {
first_column_filled = true;
}
}
index++;
}
// 2.2 Execute conjuncts.
if (!first_column_filled) {
// VExprContext::execute has an optimization: the filtering is only executed when block->rows() > 0.
// The following step may be tricky and time-consuming, but we have no other way.
_runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
partition_value_column_size);
}
IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
&_runtime_filter_partition_prune_block,
&result_filter, &can_filter_all));
return Status::OK();
}
Status FileScanner::_process_conjuncts_for_dict_filter() {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
for (auto& conjunct : _push_down_conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, this conjunct comes from a runtime filter.
auto cur_expr = impl ? impl : conjunct->root();
std::vector<int> slot_ids;
_get_slot_ids(cur_expr.get(), &slot_ids);
if (slot_ids.empty()) {
// No slot reference in this conjunct; treat it as a non-single-slot conjunct
// and keep classifying the remaining conjuncts.
_not_single_slot_filter_conjuncts.emplace_back(conjunct);
continue;
}
bool single_slot = true;
for (int i = 1; i < slot_ids.size(); i++) {
if (slot_ids[i] != slot_ids[0]) {
single_slot = false;
break;
}
}
if (single_slot) {
SlotId slot_id = slot_ids[0];
_slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
} else {
_not_single_slot_filter_conjuncts.emplace_back(conjunct);
}
}
return Status::OK();
}
Status FileScanner::_process_late_arrival_conjuncts() {
if (_push_down_conjuncts.size() < _conjuncts.size()) {
_push_down_conjuncts.clear();
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
_discard_conjuncts();
}
if (_applied_rf_num == _total_rf_num) {
_local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
}
return Status::OK();
}
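// Recursively collect the slot ids referenced below `expr`.
// Note: only the children of `expr` are inspected, not `expr` itself.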
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
for (auto& child_expr : expr->children()) {
if (child_expr->is_slot_ref()) {
VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
slot_ids->emplace_back(slot_ref->slot_id());
} else {
_get_slot_ids(child_expr.get(), slot_ids);
}
}
}
Status FileScanner::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(Scanner::open(state));
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
}
return Status::OK();
}
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
Status st = _get_block_wrapped(state, block, eof);
if (!st.ok()) {
// append the current scan range path to the error msg for easier debugging
return std::move(st.append(". cur path: " + get_current_scan_range_name()));
}
return st;
}
// For query:
//                           [exist cols]    [non-exist cols]  [col from path]  input  output
//                           A    B    C     D                 E
// _init_src_block           x    x    x     x                 x                 -      x
// get_next_block            x    x    x     -                 -                 -      x
// _cast_to_input_block      -    -    -     -                 -                 -      -
// _fill_columns_from_path   -    -    -     -                 x                 -      x
// _fill_missing_columns     -    -    -     x                 -                 -      x
// _convert_to_output_block  -    -    -     -                 -                 -      -
//
// For load:
//                           [exist cols]    [non-exist cols]  [col from path]  input  output
//                           A    B    C     D                 E
// _init_src_block           x    x    x     x                 x                 x      -
// get_next_block            x    x    x     -                 -                 x      -
// _cast_to_input_block      x    x    x     -                 -                 x      -
// _fill_columns_from_path   -    -    -     -                 x                 x      -
// _fill_missing_columns     -    -    -     x                 -                 x      -
// _convert_to_output_block  -    -    -     -                 -                 -      x
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
do {
RETURN_IF_CANCELLED(state);
if (_cur_reader == nullptr || _cur_reader_eof) {
// The file may not exist because the file list is fetched from the meta cache,
// and the file may already have been removed from storage.
// Just ignore not-found files.
Status st = _get_next_reader();
if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
_cur_reader_eof = true;
COUNTER_UPDATE(_not_found_file_counter, 1);
continue;
} else if (st.is<ErrorCode::END_OF_FILE>()) {
_cur_reader_eof = true;
COUNTER_UPDATE(_fully_skipped_file_counter, 1);
continue;
} else if (!st) {
return st;
}
}
if (_scanner_eof) {
*eof = true;
return Status::OK();
}
// Init the src block for a load job based on the data file schema (e.g. parquet).
// For a query job, simply set _src_block_ptr to block.
size_t read_rows = 0;
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);
// Read the next block.
// Some columns in the block may not be filled (they do not exist in the file).
RETURN_IF_ERROR(
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
}
// Use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
// may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return a wrong result.
if (read_rows > 0) {
if ((!_cur_reader->count_read_rows()) && _io_ctx) {
_io_ctx->file_reader_stats->read_rows += read_rows;
}
// If the push_down_agg_type is COUNT, the rest can be skipped,
// because we only store a row count in the block.
if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
// Cast the src block columns from their file types to the input slot types in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// FileReader can fill partition and missing columns itself
if (!_cur_reader->fill_all_columns()) {
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
// Fill columns that do not exist in the file with null or default values
RETURN_IF_ERROR(_fill_missing_columns(read_rows));
}
// Apply _pre_conjunct_ctxs to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
// Convert the src block to the output block (dest block): cast to the dest data types and apply filters.
RETURN_IF_ERROR(_convert_to_output_block(block));
// Truncate char columns or varchar columns if size is smaller than file columns
// or not found in the file column schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
}
}
break;
} while (true);
// Update filtered rows and unselected rows for load, reset counter.
// {
// state->update_num_rows_load_filtered(_counter.num_rows_filtered);
// state->update_num_rows_load_unselected(_counter.num_rows_unselected);
// _reset_counter();
// }
return Status::OK();
}
/**
* Check whether there are complex types in the parquet/orc reader for broker/stream load.
* Broker/stream load casts every type to string, so complex types would be cast incorrectly.
* This is a temporary check and will be replaced by tvf.
*/
Status FileScanner::_check_output_block_types() {
if (_is_load) {
TFileFormatType::type format_type = _params->format_type;
if (format_type == TFileFormatType::FORMAT_PARQUET ||
format_type == TFileFormatType::FORMAT_ORC) {
for (auto slot : _output_tuple_desc->slots()) {
if (is_complex_type(slot->type()->get_primitive_type())) {
return Status::InternalError(
"Parquet/orc doesn't support complex types in broker/stream load, "
"please use tvf(table value function) to insert complex types.");
}
}
}
}
return Status::OK();
}
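// For a query, _src_block_ptr simply aliases the output block.
// For a load, build a separate _src_block whose columns use the (nullable) file-side types,
// so the data can be cast to the destination types later in _cast_to_input_block.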
Status FileScanner::_init_src_block(Block* block) {
if (!_is_load) {
_src_block_ptr = block;
return Status::OK();
}
RETURN_IF_ERROR(_check_output_block_types());
// if (_src_block_init) {
// _src_block.clear_column_data();
// _src_block_ptr = &_src_block;
// return Status::OK();
// }
_src_block.clear();
uint32_t idx = 0;
// slots in _input_tuple_desc contain all slots described in the load statement, e.g.:
// -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
// _input_tuple_desc will contain: k1, k2, tmp1.
// Some of them come from the file (such as k1 and k2), while others may not exist in the file (such as tmp1).
// _input_tuple_desc also contains columns from path.
for (auto& slot : _input_tuple_desc->slots()) {
DataTypePtr data_type;
auto it = _slot_lower_name_to_col_type.find(slot->col_name());
if (slot->is_skip_bitmap_col()) {
_skip_bitmap_col_idx = idx;
}
if (_params->__isset.sequence_map_col) {
if (_params->sequence_map_col == slot->col_name()) {
_sequence_map_col_uid = slot->col_unique_id();
}
}
data_type =
it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
_src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
if (_params->__isset.sequence_map_col) {
for (const auto& slot : _output_tuple_desc->slots()) {
// When the target table has a sequence map column, _input_tuple_desc will not contain __DORIS_SEQUENCE_COL__,
// so we should get its column unique id from _output_tuple_desc
if (slot->is_sequence_col()) {
_sequence_col_uid = slot->col_unique_id();
}
}
}
_src_block_ptr = &_src_block;
_src_block_init = true;
return Status::OK();
}
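// For a load, cast each _src_block column that exists in the file from its file type to
// the input slot's target type, in-place, via the CAST function.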
Status FileScanner::_cast_to_input_block(Block* block) {
if (!_is_load) {
return Status::OK();
}
SCOPED_TIMER(_cast_to_input_block_timer);
// cast primitive type(PT0) to primitive type(PT1)
uint32_t idx = 0;
for (auto& slot_desc : _input_tuple_desc->slots()) {
if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
_slot_lower_name_to_col_type.end()) {
// skip columns which do not exist in the file
continue;
}
if (slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
// skip variant type
continue;
}
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
auto return_type = slot_desc->get_data_type_ptr();
// remove nullable here and let get_function decide the nullability
auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
ColumnsWithTypeAndName arguments {
arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
auto func_cast = SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type,
{.enable_decimal256 = runtime_state()->enable_decimal256()});
if (!func_cast) {
return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
arg.type->get_name(), slot_desc->col_name(),
return_type->get_name());
}
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
return Status::OK();
}
Status FileScanner::_fill_columns_from_path(size_t rows) {
if (!_fill_partition_from_path) {
return Status::OK();
}
DataTypeSerDe::FormatOptions _text_formatOptions;
for (auto& kv : _partition_col_descs) {
auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
// _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
uint64_t num_deserialized = 0;
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
}
return Status::OK();
}
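// Fill the columns missing from the file: with NULL when the slot has no default value
// expression, otherwise by evaluating the default value expression and broadcasting the
// result to all rows.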
Status FileScanner::_fill_missing_columns(size_t rows) {
if (_missing_cols.empty()) {
return Status::OK();
}
SCOPED_TIMER(_fill_missing_columns_timer);
for (auto& kv : _missing_col_descs) {
if (kv.second == nullptr) {
// no default column, fill with null
auto mutable_column = _src_block_ptr->get_by_name(kv.first).column->assume_mutable();
auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
nullable_column->insert_many_defaults(rows);
} else {
// fill with default value
auto& ctx = kv.second;
auto origin_column_num = _src_block_ptr->columns();
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
// Call resize because the first column of _src_block_ptr may not be filled by the reader,
// so _src_block_ptr->rows() may return a wrong result, while the column created by
// `ctx->execute()` has only one row.
auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
auto mutable_column = result_column_ptr->assume_mutable();
mutable_column->resize(rows);
// result_column_ptr may be a ColumnConst; convert it to a normal column
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type;
bool is_nullable = origin_column_type->is_nullable();
_src_block_ptr->replace_by_position(
_src_block_ptr->get_position_by_name(kv.first),
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
_src_block_ptr->erase(result_column_id);
}
}
}
return Status::OK();
}
Status FileScanner::_pre_filter_src_block() {
if (!_is_load) {
return Status::OK();
}
if (!_pre_conjunct_ctxs.empty()) {
SCOPED_TIMER(_pre_filter_timer);
auto origin_column_num = _src_block_ptr->columns();
auto old_rows = _src_block_ptr->rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr,
origin_column_num));
_counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
}
return Status::OK();
}
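// For a load, evaluate the dest expressions over _src_block to build the output block,
// run strict-mode and NOT NULL checks row by row (appending failing rows to the error
// log), and filter the failing rows out of the final block.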
Status FileScanner::_convert_to_output_block(Block* block) {
if (!_is_load) {
return Status::OK();
}
SCOPED_TIMER(_convert_to_output_block_timer);
// The block is passed in from the scanner context's free blocks,
// which are initialized with the output columns, so there is no need to clear it.
// block->clear();
int ctx_idx = 0;
size_t rows = _src_block_ptr->rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
// After conversion, the column_ptr should be copied into the output block.
// We cannot use block->insert() because it may cause a non-zero use_count() bug.
MutableBlock mutable_output_block =
VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
auto& mutable_output_columns = mutable_output_block.mutable_columns();
std::vector<BitmapValue>* skip_bitmaps {nullptr};
if (_should_process_skip_bitmap_col()) {
auto* skip_bitmap_nullable_col_ptr =
assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
.column->assume_mutable()
.get());
skip_bitmaps = &(assert_cast<ColumnBitmap*>(
skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
->get_data());
// NOTE:
// - If the table has a sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
//   __DORIS_SEQUENCE_COL__ is marked in the skip bitmap depends on whether it's specified in that row.
// - If the table has a sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
//   so __DORIS_SEQUENCE_COL__ will be omitted if it's specified in a row and will not be marked in the skip bitmap.
//   Therefore we mark __DORIS_SEQUENCE_COL__ in the skip bitmap here if the corresponding sequence map column is marked.
if (_sequence_map_col_uid != -1) {
for (int j = 0; j < rows; ++j) {
if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
(*skip_bitmaps)[j].add(_sequence_col_uid);
}
}
}
}
// for (auto slot_desc : _output_tuple_desc->slots()) {
for (int j = 0; j < mutable_output_columns.size(); ++j) {
auto slot_desc = _output_tuple_desc->slots()[j];
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx;
vectorized::ColumnPtr column_ptr;
auto& ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
// column_ptr may be a ColumnConst; convert it to a normal column
column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr);
// Because src_slot_desc is always nullable, the column_ptr produced by the dest expr
// is likely to be nullable as well.
if (LIKELY(column_ptr->is_nullable())) {
const auto* nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
// skip checks for non-mentioned columns in flexible partial update
if (skip_bitmaps == nullptr ||
!skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
// clang-format off
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
auto raw_value =
_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
}));
} else if (!slot_desc->is_nullable()) {
filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
return fmt::to_string(error_msg);
}));
}
// clang-format on
}
}
}
if (!slot_desc->is_nullable()) {
column_ptr = remove_nullable(column_ptr);
}
} else if (slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
ctx_idx++;
}
// After the dest block insert operation, clear _src_block to drop references to the origin columns.
_src_block_ptr->clear();
size_t dest_size = block->columns();
// do filter
block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));
_counter.num_rows_filtered += rows - block->rows();
return Status::OK();
}
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
// Truncate char columns or varchar columns if size is smaller than file columns
// or not found in the file column schema.
if (!_state->query_options().truncate_char_or_varchar_columns) {
return Status::OK();
}
int idx = 0;
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
const auto& type = slot_desc->type();
if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
++idx;
continue;
}
auto iter = _source_file_col_name_types.find(slot_desc->col_name());
if (iter != _source_file_col_name_types.end()) {
const auto& file_type_desc = iter->second;
int l = -1;
if (auto* ftype = check_and_get_data_type<DataTypeString>(
remove_nullable(file_type_desc).get())) {
l = ftype->len();
}
if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
(assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
l < 0)) {
_truncate_char_or_varchar_column(
block, idx,
assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
}
} else {
_truncate_char_or_varchar_column(
block, idx,
assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
}
++idx;
}
return Status::OK();
}
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
auto int_type = std::make_shared<DataTypeInt32>();
uint32_t num_columns_without_result = block->columns();
const ColumnNullable* col_nullable =
assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
block->replace_by_position(idx, std::move(string_column_ptr));
block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
"const 1"}); // pos is 1
block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
fmt::format("const {}", len)}); // len
block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
ColumnNumbers temp_arguments(3);
temp_arguments[0] = idx; // str column
temp_arguments[1] = num_columns_without_result; // pos
temp_arguments[2] = num_columns_without_result + 1; // len
uint32_t result_column_id = num_columns_without_result + 2;
SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
null_map_column_ptr);
block->replace_by_position(idx, std::move(res));
Block::erase_useless_column(block, num_columns_without_result);
}
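// Register the current file range in the query's id file map and build a row id iterator
// encoding (id version, backend id, file mapping id), so that emitted row ids can later be
// traced back to their source file.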
Status FileScanner::_create_row_id_column_iterator() {
auto& id_file_map = _state->get_id_file_map();
auto file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
((pipeline::FileScanLocalState*)_local_state)->parent_id(), _current_range,
_should_enable_file_meta_cache()));
_row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
return Status::OK();
}
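// Close the current reader (if any), fetch the next scan range from the split source, and
// create a format-specific reader for it. Ranges pruned by runtime-filter partition
// pruning are skipped, as are empty files and (optionally) not-found files.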
Status FileScanner::_get_next_reader() {
while (true) {
if (_cur_reader) {
_cur_reader->collect_profile_before_close();
RETURN_IF_ERROR(_cur_reader->close());
_state->update_num_finished_scan_range(1);
}
_cur_reader.reset(nullptr);
_src_block_init = false;
bool has_next = _first_scan_range;
if (!_first_scan_range) {
RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
}
_first_scan_range = false;
if (!has_next || _should_stop) {
_scanner_eof = true;
return Status::OK();
}
const TFileRangeDesc& range = _current_range;
_current_range_path = range.path;
if (!_partition_slot_descs.empty()) {
// we need to get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_partition_columns());
if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
// by runtime filter partition prune
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_prune_ctxs();
}
bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}
}
// create reader for specific format
Status init_status = Status::OK();
TFileFormatType::type format_type = _get_current_format_type();
// for compatibility, this logic is deprecated in 3.1
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
if (range.table_format_params.table_format_type == "paimon" &&
!range.table_format_params.paimon_params.__isset.paimon_split) {
// use native reader
auto format = range.table_format_params.paimon_params.file_format;
if (format == "orc") {
format_type = TFileFormatType::FORMAT_ORC;
} else if (format == "parquet") {
format_type = TFileFormatType::FORMAT_PARQUET;
} else {
return Status::InternalError("Not supported paimon file format: {}", format);
}
}
}
// JNI readers can only push down column value ranges
bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI;
bool need_to_get_parsed_schema = false;
switch (format_type) {
case TFileFormatType::FORMAT_JNI: {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "max_compute") {
const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
_real_tuple_desc->table_desc());
if (!mc_desc->init_status()) {
return mc_desc->init_status();
}
std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
range, _state, _profile);
init_status = mc_reader->init_reader(_colname_to_value_range);
_cur_reader = std::move(mc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
_cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
range, _params);
init_status = ((PaimonJniReader*)(_cur_reader.get()))
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
_cur_reader = HudiJniReader::create_unique(*_params,
range.table_format_params.hudi_params,
_file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "lakesoul") {
_cur_reader =
LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
_file_slot_descs, _state, _profile);
init_status = ((LakeSoulJniReader*)_cur_reader.get())
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "trino_connector") {
_cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
_profile, range);
init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))
->init_reader(_colname_to_value_range);
}
break;
}
case TFileFormatType::FORMAT_PARQUET: {
auto file_meta_cache_ptr = _should_enable_file_meta_cache()
? ExecEnv::GetInstance()->file_meta_cache()
: nullptr;
std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
_profile, *_params, range, _state->query_options().batch_size,
&_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
if (_row_id_column_iterator_pair.second != -1) {
RETURN_IF_ERROR(_create_row_id_column_iterator());
parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
}
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
need_to_get_parsed_schema = true;
break;
}
case TFileFormatType::FORMAT_ORC: {
auto file_meta_cache_ptr = _should_enable_file_meta_cache()
? ExecEnv::GetInstance()->file_meta_cache()
: nullptr;
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
if (_row_id_column_iterator_pair.second != -1) {
RETURN_IF_ERROR(_create_row_id_column_iterator());
orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
}
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
need_to_get_parsed_schema = true;
break;
}
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_CSV_GZ:
case TFileFormatType::FORMAT_CSV_BZ2:
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
case TFileFormatType::FORMAT_PROTO: {
auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, _io_ctx.get());
init_status = reader->init_reader(_is_load);
_cur_reader = std::move(reader);
break;
}
case TFileFormatType::FORMAT_TEXT: {
auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, _io_ctx.get());
init_status = reader->init_reader(_is_load);
_cur_reader = std::move(reader);
break;
}
case TFileFormatType::FORMAT_JSON: {
_cur_reader =
NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, &_scanner_eof, _io_ctx.get());
init_status = ((NewJsonReader*)(_cur_reader.get()))
->init_reader(_col_default_value_ctx, _is_load);
break;
}
case TFileFormatType::FORMAT_AVRO: {
_cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
range);
init_status =
((AvroJNIReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_WAL: {
_cur_reader = WalReader::create_unique(_state);
init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
case TFileFormatType::FORMAT_ARROW: {
_cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
range, _file_slot_descs, _io_ctx.get());
init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
break;
}
default:
return Status::NotSupported("Not supported create reader for file format: {}.",
to_string(_params->format_type));
}
if (_cur_reader == nullptr) {
return Status::NotSupported(
"Not supported create reader for table format: {} / file format: {}.",
range.__isset.table_format_params ? range.table_format_params.table_format_type
: "NotSet",
to_string(_params->format_type));
}
COUNTER_UPDATE(_file_counter, 1);
// The FileScanner for external tables may try to open files that do not exist,
// because the FE file cache for external tables may be out of date.
// So NOT_FOUND is not a failure case for FileScanner.
// This will be removed after the file reader refactor.
if (init_status.is<END_OF_FILE>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (init_status.is<ErrorCode::NOT_FOUND>()) {
if (config::ignore_not_found_file_in_external_table) {
COUNTER_UPDATE(_not_found_file_counter, 1);
continue;
}
return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
} else if (!init_status.ok()) {
return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
}
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
range.__isset.table_format_params &&
range.table_format_params.table_level_row_count >= 0) {
// This is a table-level count push down operation, so there is no need to call
// _set_fill_or_truncate_columns.
// In _set_fill_or_truncate_columns, [range.start_offset, end offset] is used to filter
// row groups. But for count push down the offset is undefined, which would cause
// incorrect row group filtering and may return an empty result.
} else {
RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema));
}
_cur_reader_eof = false;
break;
}
return Status::OK();
}
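// Wrap the raw ParquetReader in the matching table-format reader (iceberg/paimon/hudi/hive)
// according to the range's table_format_params, or use it directly for tvf and load, then
// initialize it with the push-down conjuncts and schema-change info.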
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
FileMetaCache* file_meta_cache_ptr) {
const TFileRangeDesc& range = _current_range;
Status init_status = Status::OK();
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
_io_ctx.get(), file_meta_cache_ptr);
init_status = iceberg_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
file_meta_cache_ptr);
init_status = paimon_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
file_meta_cache_ptr);
init_status = hudi_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(hudi_reader);
} else if (range.table_format_params.table_format_type == "hive") {
auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _io_ctx.get(),
&_is_file_slot, file_meta_cache_ptr);
init_status = hive_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(hive_reader);
} else if (range.table_format_params.table_format_type == "tvf") {
const FieldDescriptor* parquet_meta = nullptr;
RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
DCHECK(parquet_meta != nullptr);
// TVF will first call `get_parsed_schema` to obtain file information from the BE, and the FE
// converts the column names to lowercase (because the query process is case-insensitive),
// so the lowercase file column names are used here to match the read columns.
std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
_real_tuple_desc, *parquet_meta, tvf_info_node));
init_status = parquet_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, tvf_info_node);
_cur_reader = std::move(parquet_reader);
} else if (_is_load) {
const FieldDescriptor* parquet_meta = nullptr;
RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
DCHECK(parquet_meta != nullptr);
// Load is case-insensitive, so lowercase names are used to match the columns in the file.
std::map<std::string, std::string> file_lower_name_to_native;
for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
parquet_field.name);
}
auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
for (const auto slot : _real_tuple_desc->slots()) {
if (file_lower_name_to_native.contains(slot->col_name())) {
load_info_node->add_children(slot->col_name(),
file_lower_name_to_native[slot->col_name()],
TableSchemaChangeHelper::ConstNode::get_instance());
// For Load, `file_scanner` creates block columns using the file types;
// there is no schema change when reading inside the struct,
// so `TableSchemaChangeHelper::ConstNode` is used.
} else {
load_info_node->add_not_exist_children(slot->col_name());
}
}
init_status = parquet_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, load_info_node);
_cur_reader = std::move(parquet_reader);
}
return init_status;
}
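// ORC counterpart of _init_parquet_reader: wrap the raw OrcReader in the matching
// table-format reader (transactional_hive/iceberg/paimon/hudi/hive) or use it directly
// for tvf and load, then initialize it.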
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
FileMetaCache* file_meta_cache_ptr) {
const TFileRangeDesc& range = _current_range;
Status init_status = Status::OK();
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "transactional_hive") {
std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
*_params, range, _io_ctx.get(),
file_meta_cache_ptr);
init_status = tran_orc_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
file_meta_cache_ptr);
init_status = iceberg_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
std::unique_ptr<PaimonOrcReader> paimon_reader =
PaimonOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
range, _io_ctx.get(), file_meta_cache_ptr);
init_status = paimon_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
std::unique_ptr<HudiOrcReader> hudi_reader =
HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
range, _io_ctx.get(), file_meta_cache_ptr);
init_status = hudi_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
_cur_reader = std::move(hudi_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hive") {
std::unique_ptr<HiveOrcReader> hive_reader = HiveOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
&_is_file_slot, file_meta_cache_ptr);
init_status = hive_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
_cur_reader = std::move(hive_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "tvf") {
const orc::Type* orc_type_ptr = nullptr;
RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
_real_tuple_desc, orc_type_ptr, tvf_info_node));
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts, tvf_info_node);
_cur_reader = std::move(orc_reader);
} else if (_is_load) {
const orc::Type* orc_type_ptr = nullptr;
RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
std::map<std::string, std::string> file_lower_name_to_native;
for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
file_lower_name_to_native.emplace(doris::to_lower(orc_type_ptr->getFieldName(idx)),
orc_type_ptr->getFieldName(idx));
}
auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
for (const auto slot : _real_tuple_desc->slots()) {
if (file_lower_name_to_native.contains(slot->col_name())) {
load_info_node->add_children(slot->col_name(),
file_lower_name_to_native[slot->col_name()],
TableSchemaChangeHelper::ConstNode::get_instance());
} else {
load_info_node->add_not_exist_children(slot->col_name());
}
}
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts, load_info_node);
_cur_reader = std::move(orc_reader);
}
return init_status;
}
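// Ask the reader which columns exist in the file, decide how the remaining columns are
// filled (from the path or with defaults), and collect the file schema needed to truncate
// char/varchar columns.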
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
_missing_cols.clear();
_slot_lower_name_to_col_type.clear();
std::unordered_map<std::string, DataTypePtr> name_to_col_type;
RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
for (const auto& [col_name, col_type] : name_to_col_type) {
_slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
}
if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
// check if the cols of _partition_col_descs are in _missing_cols
// if so, set _fill_partition_from_path to true and remove the col from _missing_cols
for (const auto& [col_name, col_type] : _partition_col_descs) {
if (_missing_cols.contains(col_name)) {
_fill_partition_from_path = true;
_missing_cols.erase(col_name);
}
}
}
RETURN_IF_ERROR(_generate_missing_columns());
if (_fill_partition_from_path) {
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
} else {
// If the partition columns are not from path, we only fill the missing columns.
RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
}
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
fmt::format_to(col_buf, " {}", col);
}
VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
_current_range.path);
}
RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
return Status::OK();
}
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
_source_file_col_name_types.clear();
// The col names and types of source file, such as parquet, orc files.
if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
std::vector<std::string> source_file_col_names;
std::vector<DataTypePtr> source_file_col_types;
Status status =
_cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
return status;
}
DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
for (int i = 0; i < source_file_col_names.size(); ++i) {
_source_file_col_name_types[to_lower(source_file_col_names[i])] =
source_file_col_types[i];
}
}
return Status::OK();
}
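// Minimal initialization for the read-lines path (see read_lines_from_range below):
// set up the IO context, counters and expr contexts, and disable all predicate push down,
// since only specific rows will be fetched.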
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
_current_range = range;
_file_cache_statistics.reset(new io::FileCacheStatistics());
_file_reader_stats.reset(new io::FileReaderStats());
_file_read_bytes_counter =
ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
_file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
RETURN_IF_ERROR(_init_io_ctx());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->file_reader_stats = _file_reader_stats.get();
    _default_val_row_desc.reset(
            new RowDescriptor(const_cast<TupleDescriptor*>(_real_tuple_desc), false));
RETURN_IF_ERROR(_init_expr_ctxes());
    // Since only one column is read from the file, there is no need to filter,
    // so leave the filter related variables empty.
    // The static map is shared by all scanners but never modified; it only serves
    // as a valid empty placeholder for _colname_to_value_range.
    static std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
    _colname_to_value_range = &colname_to_value_range;
_push_down_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
_slot_id_to_filter_conjuncts.clear();
_kv_cache = nullptr;
return Status::OK();
}
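// Reads the given rows from a parquet or orc file range: a format specific reader is
// created and restricted to `row_ids` via read_by_rows(), then blocks are pulled until
// the reader is exhausted and accumulated into `result_block`. `init_reader_ms` and
// `get_block_ms` receive the wall time spent in each phase.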
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
const std::list<int64_t>& row_ids, Block* result_block,
const ExternalFileMappingInfo& external_info,
int64_t* init_reader_ms, int64_t* get_block_ms) {
_current_range = range;
RETURN_IF_ERROR(_generate_partition_columns());
TFileFormatType::type format_type = _get_current_format_type();
Status init_status = Status::OK();
auto file_meta_cache_ptr = external_info.enable_file_meta_cache
? ExecEnv::GetInstance()->file_meta_cache()
: nullptr;
RETURN_IF_ERROR(scope_timer_run(
[&]() -> Status {
switch (format_type) {
case TFileFormatType::FORMAT_PARQUET: {
std::unique_ptr<vectorized::ParquetReader> parquet_reader =
vectorized::ParquetReader::create_unique(
_profile, *_params, range, 1, &_state->timezone_obj(),
_io_ctx.get(), _state, file_meta_cache_ptr, false);
RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
RETURN_IF_ERROR(
_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
break;
}
case TFileFormatType::FORMAT_ORC: {
std::unique_ptr<vectorized::OrcReader> orc_reader =
vectorized::OrcReader::create_unique(
_profile, _state, *_params, range, 1, _state->timezone(),
_io_ctx.get(), file_meta_cache_ptr, false);
RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
break;
}
                default: {
                    return Status::NotSupported(
                            "Unsupported file format for line reader: {}, "
                            "only parquet and orc are supported.",
                            to_string(_params->format_type));
                }
}
return Status::OK();
},
init_reader_ms));
RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
_cur_reader_eof = false;
RETURN_IF_ERROR(scope_timer_run(
[&]() -> Status {
                while (!_cur_reader_eof) {
                    // _get_block_impl() flips _cur_reader_eof once the underlying
                    // reader is exhausted; the scanner level eof flag is not
                    // consulted here.
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
}
return Status::OK();
},
get_block_ms));
_cur_reader->collect_profile_before_close();
RETURN_IF_ERROR(_cur_reader->close());
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
return Status::OK();
}
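// Extracts partition column values from the file path. `columns_from_path` holds the
// values parsed from the path (e.g. hive style key=value directories), indexed through
// _partition_slot_index_map; `columns_from_path_is_null` marks null partition values.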
Status FileScanner::_generate_partition_columns() {
_partition_col_descs.clear();
_partition_value_is_null.clear();
const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc) {
auto it = _partition_slot_index_map.find(slot_desc->id());
if (it == std::end(_partition_slot_index_map)) {
return Status::InternalError("Unknown source slot descriptor, slot_id={}",
slot_desc->id());
}
const std::string& column_from_path = range.columns_from_path[it->second];
_partition_col_descs.emplace(slot_desc->col_name(),
std::make_tuple(column_from_path, slot_desc));
if (range.__isset.columns_from_path_is_null) {
_partition_value_is_null.emplace(slot_desc->col_name(),
range.columns_from_path_is_null[it->second]);
}
}
}
}
return Status::OK();
}
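// For every materialized slot whose column is absent from the file, looks up the
// default-value expression context prepared in _init_expr_ctxes(); an entry holding a
// null context means the column will be filled with NULL.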
Status FileScanner::_generate_missing_columns() {
_missing_col_descs.clear();
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
continue;
}
auto it = _col_default_value_ctx.find(slot_desc->col_name());
if (it == _col_default_value_ctx.end()) {
return Status::InternalError("failed to find default value expr for slot: {}",
slot_desc->col_name());
}
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}
return Status::OK();
}
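// Builds all expression related state for this scanner:
//  - maps from slot id to slot descriptor / column index of the source tuple,
//  - the list of file slots and partition slots (with their index in the path),
//  - default-value expression contexts for missing columns,
//  - and, for load tasks, the dest slot expressions and the dest-to-src slot mapping.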
Status FileScanner::_init_expr_ctxes() {
std::map<SlotId, int> full_src_index_map;
std::map<SlotId, SlotDescriptor*> full_src_slot_map;
std::map<std::string, int> partition_name_to_key_index_map;
int index = 0;
for (const auto& slot_desc : _real_tuple_desc->slots()) {
full_src_slot_map.emplace(slot_desc->id(), slot_desc);
full_src_index_map.emplace(slot_desc->id(), index++);
}
    // For external table queries, find the index of each partition column in the path.
    // A query does not always select all columns of a table, and the order of the
    // selected columns is arbitrary. All ranges should have identical
    // columns_from_path_keys because they are all file splits of the same external
    // table, so _current_range is used to fill partition_name_to_key_index_map.
if (_current_range.__isset.columns_from_path_keys) {
std::vector<std::string> key_map = _current_range.columns_from_path_keys;
if (!key_map.empty()) {
for (size_t i = 0; i < key_map.size(); i++) {
partition_name_to_key_index_map.emplace(key_map[i], i);
}
}
}
_num_of_columns_from_file = _params->num_of_columns_from_file;
for (const auto& slot_info : _params->required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
                                         slot_id);
}
if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
_row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
continue;
}
if (slot_info.is_file_slot) {
_is_file_slot.emplace(slot_id);
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
}
if (partition_name_to_key_index_map.find(it->second->col_name()) !=
partition_name_to_key_index_map.end()) {
if (slot_info.is_file_slot) {
                // If there is a slot that is both a partition column and a file column,
                // the partition column must not be filled from the path.
_fill_partition_from_path = false;
} else if (!_fill_partition_from_path) {
// This should not happen
return Status::InternalError(
"Partition column {} is not a file column, but there is already a column "
"which is both a partition column and a file column.",
it->second->col_name());
}
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {
auto iti = full_src_index_map.find(slot_id);
_partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
} else {
auto kit = partition_name_to_key_index_map.find(it->second->col_name());
_partition_slot_index_map.emplace(slot_id, kit->second);
}
}
}
    // Build the map from column name to its default-value expression context.
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
vectorized::VExprContextSPtr ctx;
auto it = _params->default_value_of_src_slot.find(slot_desc->id());
if (it != std::end(_params->default_value_of_src_slot)) {
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
// if expr is empty, the default value will be null
_col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
}
}
if (_is_load) {
        // The following dest expr map is only used for load tasks.
bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
int idx = 0;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
auto it = _params->expr_of_dest_slot.find(slot_desc->id());
if (it == std::end(_params->expr_of_dest_slot)) {
return Status::InternalError("No expr for dest slot, id={}, name={}",
slot_desc->id(), slot_desc->col_name());
}
vectorized::VExprContextSPtr ctx;
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
_dest_vexpr_ctx.emplace_back(ctx);
_dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
if (has_slot_id_map) {
auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
                    auto src_slot_it = full_src_slot_map.find(it1->second);
                    if (src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(
                            _src_slot_descs_order_by_dest.size(),
                            full_src_index_map[src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(src_slot_it->second);
}
}
}
}
return Status::OK();
}
Status FileScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
if (_cur_reader) {
RETURN_IF_ERROR(_cur_reader->close());
}
RETURN_IF_ERROR(Scanner::close(state));
return Status::OK();
}
void FileScanner::try_stop() {
Scanner::try_stop();
if (_io_ctx) {
_io_ctx->should_stop = true;
}
}
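// Flushes the reader statistics accumulated since the last call into the query level
// counters and process wide metrics, then resets them so that each invocation only
// reports the delta.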
void FileScanner::update_realtime_counters() {
pipeline::FileScanLocalState* local_state =
static_cast<pipeline::FileScanLocalState*>(_local_state);
COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
_file_reader_stats->read_rows);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
_file_reader_stats->read_bytes);
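    // Split the scanned bytes between local reads (served by the file cache) and
    // remote storage. If the file cache recorded no traffic at all, the read bypassed
    // the cache, so all bytes are accounted as remote.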
if (_file_cache_statistics->bytes_read_from_local == 0 &&
_file_cache_statistics->bytes_read_from_remote == 0) {
_state->get_query_ctx()
->resource_ctx()
->io_context()
->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
                _file_reader_stats->read_bytes);
} else {
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
_file_cache_statistics->bytes_read_from_local);
_state->get_query_ctx()
->resource_ctx()
->io_context()
->update_scan_bytes_from_remote_storage(
_file_cache_statistics->bytes_read_from_remote);
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
_file_cache_statistics->bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
_file_cache_statistics->bytes_read_from_remote);
}
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
_file_reader_stats->read_bytes = 0;
_file_reader_stats->read_rows = 0;
_file_cache_statistics->bytes_read_from_local = 0;
_file_cache_statistics->bytes_read_from_remote = 0;
}
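// Final profile collection before the scanner is closed: reports file cache
// statistics, lets the current reader flush its own profile, and folds the remaining
// reader statistics into the scan counters and global metrics.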
void FileScanner::_collect_profile_before_close() {
Scanner::_collect_profile_before_close();
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
_profile != nullptr) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
_state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
_file_cache_statistics->bytes_write_into_cache);
}
if (_cur_reader != nullptr) {
_cur_reader->collect_profile_before_close();
}
pipeline::FileScanLocalState* local_state =
static_cast<pipeline::FileScanLocalState*>(_local_state);
COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
}
} // namespace vectorized
} // namespace doris