blob: 76d7bc12ce609e1b472e376abd1abe22cae29852 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/vfile_scanner.h"
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
#include <ostream>
#include <tuple>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "io/cache/block/block_file_cache_profile.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/exec/format/arrow/arrow_stream_reader.h"
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hudi_jni_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
#include "vec/exec/format/table/paimon_reader.h"
#include "vec/exec/format/table/transactional_hive_reader.h"
#include "vec/exec/format/wal/wal_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/function.h"
#include "vec/functions/function_string.h"
#include "vec/functions/simple_function_factory.h"
namespace cctz {
class time_zone;
} // namespace cctz
namespace doris {
namespace vectorized {
class ShardedKVCache;
} // namespace vectorized
} // namespace doris
namespace doris::vectorized {
using namespace ErrorCode;
// Constructor for the volcano-model scan node.
// Resolves the scan parameters (preferring the per-plan-node copy in the query
// context over the per-range copy) and derives load-vs-query mode from the
// presence of a source tuple descriptor.
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
                           const TFileScanRange& scan_range, RuntimeProfile* profile,
                           ShardedKVCache* kv_cache)
        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
          _ranges(scan_range.ranges),
          _next_range(0),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false) {
    if (scan_range.params.__isset.strict_mode) {
        _strict_mode = scan_range.params.strict_mode;
    }
    // Prefer the params stored once per plan node in the query context; fall back
    // to the params carried by this scan range (which must then be set).
    if (state->get_query_ctx() != nullptr &&
        state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
        _params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
    } else {
        CHECK(scan_range.__isset.params);
        _params = &(scan_range.params);
    }
    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    _is_load = (_input_tuple_desc != nullptr);
}
// Constructor for the pipeline execution engine. Mirrors the volcano-model
// constructor above, but keys the query-context params lookup by the local
// state's parent id instead of the scan node's id.
VFileScanner::VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* local_state,
                           int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile,
                           ShardedKVCache* kv_cache)
        : VScanner(state, local_state, limit, profile),
          _ranges(scan_range.ranges),
          _next_range(0),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false) {
    if (scan_range.params.__isset.strict_mode) {
        _strict_mode = scan_range.params.strict_mode;
    }
    // Prefer the shared per-plan-node params in the query context; otherwise the
    // range itself must carry the params.
    if (state->get_query_ctx() != nullptr &&
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
    } else {
        CHECK(scan_range.__isset.params);
        _params = &(scan_range.params);
    }
    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    _is_load = (_input_tuple_desc != nullptr);
}
// Prepare the scanner: register profile metrics, set up IO context/statistics,
// and (for load jobs) build the source row descriptor and pre-filter
// expression contexts.
// @param conjuncts               filter conjuncts pushed to this scanner
// @param colname_to_value_range  per-column value ranges for predicate pushdown
// @param colname_to_slot_id      column name -> slot id mapping (may be null)
Status VFileScanner::prepare(
        const VExprContextSPtrs& conjuncts,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
        const std::unordered_map<std::string, int>* colname_to_slot_id) {
    RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
    _colname_to_value_range = colname_to_value_range;
    _col_name_to_slot_id = colname_to_slot_id;
    // Register the same set of timers/counters on either the scan node's profile
    // (volcano model) or the pipeline local state's profile.
    // NOTE: "ConvertOuputBlockTime" keeps its historical misspelling on purpose;
    // the counter name is an externally visible identifier.
    if (get_parent() != nullptr) {
        _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
        _open_reader_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerOpenReaderTime");
        _cast_to_input_block_timer =
                ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
        _fill_path_columns_timer =
                ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
        _fill_missing_columns_timer =
                ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
        _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
        _convert_to_output_block_timer =
                ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
        _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
        _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
        _has_fully_rf_file_counter =
                ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
    } else {
        _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
        _open_reader_timer =
                ADD_TIMER(_local_state->scanner_profile(), "FileScannerOpenReaderTime");
        _cast_to_input_block_timer =
                ADD_TIMER(_local_state->scanner_profile(), "FileScannerCastInputBlockTime");
        _fill_path_columns_timer =
                ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillPathColumnTime");
        _fill_missing_columns_timer =
                ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillMissingColumnTime");
        _pre_filter_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerPreFilterTimer");
        _convert_to_output_block_timer =
                ADD_TIMER(_local_state->scanner_profile(), "FileScannerConvertOuputBlockTime");
        _empty_file_counter =
                ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT);
        _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
        _has_fully_rf_file_counter =
                ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
    }
    // IO context used by the file readers; file-cache stats are owned here.
    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _io_ctx.reset(new io::IOContext());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->query_id = &_state->query_id();
    if (_is_load) {
        // Load path: build the row descriptor for the source (input) tuple and
        // prepare/open the pre-filter conjuncts evaluated on the src block.
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                              std::vector<TupleId>({_input_tuple_desc->id()}),
                                              std::vector<bool>({false})));
        // prepare pre filters
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
                    _params->pre_filter_exprs_list, _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            // Legacy single-expr form.
            VExprContextSPtr context;
            RETURN_IF_ERROR(
                    doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
            _pre_conjunct_ctxs.emplace_back(context);
        }
        for (auto& conjunct : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(conjunct->open(_state));
        }
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                               std::vector<TupleId>({_output_tuple_desc->id()}),
                                               std::vector<bool>({false})));
    }
    // Row descriptor used when evaluating default-value expressions for
    // missing columns (based on the "real" tuple: input for load, output for query).
    _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                                  std::vector<TupleId>({_real_tuple_desc->id()}),
                                                  std::vector<bool>({false})));
    return Status::OK();
}
// Classify the pushed-down conjuncts for dictionary filtering: a conjunct that
// references exactly one slot can be evaluated against that column's dictionary
// (_slot_id_to_filter_conjuncts); everything else must be evaluated row-wise
// (_not_single_slot_filter_conjuncts).
Status VFileScanner::_process_conjuncts_for_dict_filter() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, which means this a conjuncts from runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();
        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);
        if (slot_ids.empty()) {
            // No slot referenced -> cannot be dictionary-filtered.
            // BUG FIX: this used to `return Status::OK();`, which silently skipped
            // classification of every remaining conjunct in the list.
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
            continue;
        }
        // Single-slot means all referenced slot ids are identical.
        bool single_slot = true;
        for (size_t i = 1; i < slot_ids.size(); i++) {
            if (slot_ids[i] != slot_ids[0]) {
                single_slot = false;
                break;
            }
        }
        if (single_slot) {
            SlotId slot_id = slot_ids[0];
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
// Re-clone the full conjunct set into the push-down list when runtime filters
// arrived after the initial setup, then refresh the dict-filter classification.
// Also bumps the "fully runtime-filtered file" counter when every expected
// runtime filter has been applied.
Status VFileScanner::_process_late_arrival_conjuncts() {
    const size_t total = _conjuncts.size();
    if (_push_down_conjuncts.size() < total) {
        _push_down_conjuncts.clear();
        _push_down_conjuncts.resize(total);
        size_t pos = 0;
        for (auto& conjunct : _conjuncts) {
            RETURN_IF_ERROR(conjunct->clone(_state, _push_down_conjuncts[pos]));
            ++pos;
        }
        RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
        _discard_conjuncts();
    }
    if (_applied_rf_num == _total_rf_num) {
        COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
    }
    return Status::OK();
}
// Recursively collect the slot ids of every slot-ref appearing anywhere in the
// subtree below `expr` (the node itself is not inspected, only descendants).
void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child : expr->children()) {
        if (child->is_slot_ref()) {
            auto* slot_ref = reinterpret_cast<VSlotRef*>(child.get());
            slot_ids->emplace_back(slot_ref->slot_id());
        }
        // Descend regardless: slot-refs may appear at any depth.
        _get_slot_ids(child.get(), slot_ids);
    }
}
// Open the scanner: run the base-class open, then build the expression
// contexts this scanner needs (_init_expr_ctxes is defined elsewhere in
// this file). Bails out early if the query was already cancelled.
Status VFileScanner::open(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(VScanner::open(state));
    RETURN_IF_ERROR(_init_expr_ctxes());
    return Status::OK();
}
// For query:
// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x - x
// get_next_block x x x - - - x
// _cast_to_input_block - - - - - - -
// _fill_columns_from_path - - - - x - x
// _fill_missing_columns - - - x - - x
// _convert_to_output_block - - - - - - -
//
// For load:
// [exist cols] [non-exist cols] [col from path] input output
// A B C D E
// _init_src_block x x x x x x -
// get_next_block x x x - - x -
// _cast_to_input_block x x x - - x -
// _fill_columns_from_path - - - - x x -
// _fill_missing_columns - - - x - x -
// _convert_to_output_block - - - - - - x
// Read one batch into `block`, running the per-batch pipeline described in the
// table above: init src block -> reader read -> cast -> fill path/missing
// columns -> pre-filter -> convert to output -> truncate char/varchar.
// Loops to the next file range whenever the current reader yields no rows;
// sets *eof once every range is exhausted.
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    do {
        RETURN_IF_CANCELLED(state);
        // Advance to the next range when there is no reader yet or the current
        // one is exhausted.
        if (_cur_reader == nullptr || _cur_reader_eof) {
            RETURN_IF_ERROR(_get_next_reader());
        }
        if (_scanner_eof) {
            *eof = true;
            return Status::OK();
        }
        // Init src block for load job based on the data file schema (e.g. parquet)
        // For query job, simply set _src_block_ptr to block.
        size_t read_rows = 0;
        RETURN_IF_ERROR(_init_src_block(block));
        {
            SCOPED_TIMER(_get_block_timer);
            // Read next block.
            // Some of column in block may not be filled (column not exist in file)
            RETURN_IF_ERROR(
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
        }
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
        if (read_rows > 0) {
            // If the push_down_agg_type is COUNT, no need to do the rest,
            // because we only save a number in block.
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
                // Convert the src block columns type to string in-place.
                RETURN_IF_ERROR(_cast_to_input_block(block));
                // FileReader can fill partition and missing columns itself
                if (!_cur_reader->fill_all_columns()) {
                    // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
                    RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
                    // Fill columns not exist in file with null or default value
                    RETURN_IF_ERROR(_fill_missing_columns(read_rows));
                }
                // Apply _pre_conjunct_ctxs to filter src block.
                RETURN_IF_ERROR(_pre_filter_src_block());
                // Convert src block to output block (dest block), string to dest data type and apply filters.
                RETURN_IF_ERROR(_convert_to_output_block(block));
                // Truncate char columns or varchar columns if size is smaller than file columns
                // or not found in the file column schema.
                RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
            }
            // Got rows: hand this batch back to the caller.
            break;
        }
    } while (true);
    // Update filtered rows and unselected rows for load, reset counter.
    // {
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
    //     _reset_counter();
    // }
    return Status::OK();
}
/**
 * Reject complex types when loading parquet/orc via broker/stream load.
 * The load path casts every source column to string, which would corrupt
 * complex (nested) types. Temporary guard until tvf replaces this path.
 */
Status VFileScanner::_check_output_block_types() {
    if (!_is_load) {
        return Status::OK();
    }
    const TFileFormatType::type fmt = _params->format_type;
    if (fmt != TFileFormatType::FORMAT_PARQUET && fmt != TFileFormatType::FORMAT_ORC) {
        return Status::OK();
    }
    for (auto slot : _output_tuple_desc->slots()) {
        if (slot->type().is_complex_type()) {
            return Status::InternalError(
                    "Parquet/orc doesn't support complex types in broker/stream load, "
                    "please use tvf(table value function) to insert complex types.");
        }
    }
    return Status::OK();
}
// Prepare the source block for the next batch.
// Query path: the output block doubles as the source block.
// Load path: rebuild _src_block with one column per slot of _input_tuple_desc
// (all columns named in the load statement, including path-derived ones),
// typed by the file schema when the column exists in the file.
Status VFileScanner::_init_src_block(Block* block) {
    if (!_is_load) {
        _src_block_ptr = block;
        return Status::OK();
    }
    RETURN_IF_ERROR(_check_output_block_types());

    _src_block.clear();
    size_t col_idx = 0;
    for (auto& slot : _input_tuple_desc->slots()) {
        DataTypePtr data_type;
        auto found = _name_to_col_type.find(slot->col_name());
        if (found == _name_to_col_type.end()) {
            // Column absent from the file: fall back to the slot's declared type.
            RETURN_IF_CATCH_EXCEPTION(data_type = DataTypeFactory::instance().create_data_type(
                                              slot->type(), slot->is_nullable()));
        } else {
            // Column present in the file: use the file's type, always nullable.
            RETURN_IF_CATCH_EXCEPTION(
                    data_type = DataTypeFactory::instance().create_data_type(found->second, true));
        }
        MutableColumnPtr new_column = data_type->create_column();
        _src_block.insert(
                ColumnWithTypeAndName(std::move(new_column), data_type, slot->col_name()));
        _src_block_name_to_idx.emplace(slot->col_name(), col_idx);
        ++col_idx;
    }
    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
// Load path only: cast each file-provided source column, in place, from the
// file's type to the type declared by the corresponding input-tuple slot.
// Columns missing from the file and variant-typed columns are skipped.
Status VFileScanner::_cast_to_input_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    size_t idx = 0;
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) {
            // skip columns which does not exist in file
            continue;
        }
        if (slot_desc->type().is_variant_type()) {
            // skip variant type
            continue;
        }
        auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
                remove_nullable(return_type)->get_type_as_type_descriptor());
        ColumnsWithTypeAndName arguments {
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
        idx = _src_block_name_to_idx[slot_desc->col_name()];
        // Execute CAST writing the result back over position `idx`, then fix up
        // the block's recorded type to match the slot's type.
        RETURN_IF_ERROR(
                func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
// Fill partition columns parsed from the file path (e.g. Hive-style partition
// directories): each partition value is replicated `rows` times and
// deserialized into the source block through the column's text serde.
// @param rows number of rows in the current batch
Status VFileScanner::_fill_columns_from_path(size_t rows) {
    DataTypeSerDe::FormatOptions format_options;
    for (auto& kv : _partition_col_descs) {
        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
        auto& [value, slot_desc] = kv.second;
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
        // Every row of a partition column carries the same value from the path.
        // (Removed an unused local `Slice slice(...)` that duplicated this setup.)
        std::vector<Slice> slices(rows);
        for (size_t i = 0; i < rows; i++) {
            slices[i] = {value.data(), value.size()};
        }
        int num_deserialized = 0;
        Status st = text_serde->deserialize_column_from_json_vector(*col_ptr, slices,
                                                                    &num_deserialized,
                                                                    format_options);
        if (!st.ok()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (static_cast<size_t>(num_deserialized) != rows) {
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, num_deserialized, rows);
        }
    }
    return Status::OK();
}
// Fill columns that exist in the tuple but not in the file: with NULLs when no
// default-value expression exists, otherwise by evaluating the default-value
// expression and splicing the result into the source block.
// @param rows number of rows in the current batch
Status VFileScanner::_fill_missing_columns(size_t rows) {
    if (_missing_cols.empty()) {
        return Status::OK();
    }
    SCOPED_TIMER(_fill_missing_columns_timer);
    for (auto& kv : _missing_col_descs) {
        if (kv.second == nullptr) {
            // no default column, fill with null
            // insert_many_defaults on a nullable column inserts `rows` nulls.
            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
                    (*std::move(_src_block_ptr->get_by_name(kv.first).column)).mutate().get());
            nullable_column->insert_many_defaults(rows);
        } else {
            // fill with default value
            auto& ctx = kv.second;
            auto origin_column_num = _src_block_ptr->columns();
            int result_column_id = -1;
            // PT1 => dest primitive type
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
            // If the expression produced a brand-new column (appended past the
            // original columns), move it into the missing column's slot.
            bool is_origin_column = result_column_id < origin_column_num;
            if (!is_origin_column) {
                // call resize because the first column of _src_block_ptr may not be filled by reader,
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
                // has only one row.
                std::move(*_src_block_ptr->get_by_position(result_column_id).column)
                        .mutate()
                        ->resize(rows);
                auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
                // Preserve the nullability of the column being replaced.
                auto origin_column_type = _src_block_ptr->get_by_name(kv.first).type;
                bool is_nullable = origin_column_type->is_nullable();
                _src_block_ptr->replace_by_position(
                        _src_block_ptr->get_position_by_name(kv.first),
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
                // Drop the temporary column appended by ctx->execute().
                _src_block_ptr->erase(result_column_id);
            }
        }
    }
    return Status::OK();
}
// Load path only: evaluate the pre-filter conjuncts over the source block and
// account the rows they removed as "unselected".
Status VFileScanner::_pre_filter_src_block() {
    if (!_is_load) {
        return Status::OK();
    }
    if (_pre_conjunct_ctxs.empty()) {
        return Status::OK();
    }
    SCOPED_TIMER(_pre_filter_timer);
    const auto column_num = _src_block_ptr->columns();
    const auto rows_before = _src_block_ptr->rows();
    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr,
                                                           column_num));
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
// Load path only: materialize the destination block from the source block by
// evaluating the dest expressions per output slot, enforcing strict-mode and
// non-nullable constraints row by row (offending rows are reported and
// filtered out), and finally applying the accumulated row filter.
// FIX: the inner row-loop index used to be named `i`, shadowing the outer
// column index `i`; renamed to `row` (behavior unchanged).
Status VFileScanner::_convert_to_output_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();
    int ctx_idx = 0;
    size_t rows = _src_block_ptr->rows();
    // filter_map[row] == 1 keeps the row; rows failing validation are zeroed.
    auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();
    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
    for (size_t i = 0; i < mutable_output_columns.size(); ++i) {
        auto slot_desc = _output_tuple_desc->slots()[i];
        if (!slot_desc->is_materialized()) {
            continue;
        }
        int dest_index = ctx_idx;
        vectorized::ColumnPtr column_ptr;
        auto& ctx = _dest_vexpr_ctx[dest_index];
        int result_column_id = -1;
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
        column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
        // column_ptr maybe a ColumnConst, convert it to a normal column
        column_ptr = column_ptr->convert_to_full_column_if_const();
        DCHECK(column_ptr != nullptr);
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            const ColumnNullable* nullable_column =
                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
            for (size_t row = 0; row < rows; ++row) {
                if (filter_map[row] && nullable_column->is_null_at(row)) {
                    if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                        !_src_block_ptr
                                 ->get_by_position(_dest_slot_to_src_slot_index[dest_index])
                                 .column->is_null_at(row)) {
                        // Strict mode: the src value was non-null but the conversion
                        // produced null -> the value is invalid; report and drop the row.
                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(
                                            row, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    auto raw_value = _src_block_ptr->get_by_position(ctx_idx)
                                                             .column->get_data_at(row);
                                    std::string raw_string = raw_value.to_string();
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,
                                                   "column({}) value is incorrect while strict "
                                                   "mode is {}, "
                                                   "src value is {}",
                                                   slot_desc->col_name(), _strict_mode, raw_string);
                                    return fmt::to_string(error_msg);
                                },
                                &_scanner_eof));
                        filter_map[row] = false;
                    } else if (!slot_desc->is_nullable()) {
                        // Null value for a non-nullable dest column -> report and drop.
                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(
                                            row, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,
                                                   "column({}) values is null while columns is not "
                                                   "nullable",
                                                   slot_desc->col_name());
                                    return fmt::to_string(error_msg);
                                },
                                &_scanner_eof));
                        filter_map[row] = false;
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[i]->insert_range_from(*column_ptr, 0, rows);
        ctx_idx++;
    }
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();
    size_t dest_size = block->columns();
    // do filter
    block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
                                                    std::make_shared<vectorized::DataTypeUInt8>(),
                                                    "filter column"));
    RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));
    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
// Truncate char/varchar columns whose dest length is smaller than the file
// column's length, or which are absent from the file schema, so oversized
// values cannot leak into bounded-length dest columns. Controlled by the
// truncate_char_or_varchar_columns query option.
Status VFileScanner::_truncate_char_or_varchar_columns(Block* block) {
    // Truncate char columns or varchar columns if size is smaller than file columns
    // or not found in the file column schema.
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    int idx = 0;
    for (auto slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->is_materialized()) {
            continue;
        }
        const TypeDescriptor& type_desc = slot_desc->type();
        if (type_desc.type != TYPE_VARCHAR && type_desc.type != TYPE_CHAR) {
            ++idx;
            continue;
        }
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
        if (iter != _source_file_col_name_types.end()) {
            // Reuse the iterator instead of performing a second map lookup.
            const TypeDescriptor* file_type_desc = iter->second;
            // Truncate when the dest length is bounded (> 0) and either smaller
            // than the file column's length or the file length is unbounded (< 0).
            if ((type_desc.len > 0) &&
                (type_desc.len < file_type_desc->len || file_type_desc->len < 0)) {
                _truncate_char_or_varchar_column(block, idx, type_desc.len);
            }
        } else {
            // Column not present in the file schema: always truncate to dest length.
            _truncate_char_or_varchar_column(block, idx, type_desc.len);
        }
        ++idx;
    }
    return Status::OK();
}
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
// Truncate the nullable string column at `idx` to at most `len` characters by
// running the SUBSTRING function over its nested column, then re-wrapping the
// result with the original null map. Temporary argument/result columns are
// appended to the block and erased at the end.
void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
    auto int_type = std::make_shared<DataTypeInt32>();
    size_t num_columns_without_result = block->columns();
    const ColumnNullable* col_nullable =
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
    // FIX: take an owning copy so the std::move below actually moves. The old
    // code moved from a `const ColumnPtr&`, which silently degraded to a copy
    // (clang-tidy performance-move-const-arg).
    ColumnPtr string_column_ptr = col_nullable->get_nested_column_ptr();
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
    // Replace the nullable column with its nested (non-null) strings; this
    // invalidates col_nullable, which must not be used past this point.
    block->replace_by_position(idx, std::move(string_column_ptr));
    block->insert({int_type->create_column_const(block->rows(), to_field(1)), int_type,
                   "const 1"}); // pos is 1
    block->insert({int_type->create_column_const(block->rows(), to_field(len)), int_type,
                   fmt::format("const {}", len)}); // len
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
    ColumnNumbers temp_arguments(3);
    temp_arguments[0] = idx;                            // str column
    temp_arguments[1] = num_columns_without_result;     // pos
    temp_arguments[2] = num_columns_without_result + 1; // len
    size_t result_column_id = num_columns_without_result + 2;
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
    // Re-attach the original null map to the truncated strings.
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
                                      null_map_column_ptr);
    block->replace_by_position(idx, std::move(res));
    // Drop the temporary pos/len/result columns appended above.
    Block::erase_useless_column(block, num_columns_without_result);
}
Status VFileScanner::_get_next_reader() {
while (true) {
if (_cur_reader) {
RETURN_IF_ERROR(_cur_reader->close());
}
_cur_reader.reset(nullptr);
_src_block_init = false;
if (_next_range >= _ranges.size() || _should_stop) {
_scanner_eof = true;
_state->update_num_finished_scan_range(1);
return Status::OK();
}
if (_next_range != 0) {
_state->update_num_finished_scan_range(1);
}
const TFileRangeDesc& range = _ranges[_next_range++];
_current_range_path = range.path;
// create reader for specific format
Status init_status;
TFileFormatType::type format_type = _params->format_type;
// JNI reader can only push down column value range
bool push_down_predicates =
!_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
if (range.table_format_params.table_format_type == "hudi" &&
range.table_format_params.hudi_params.delta_logs.empty()) {
// fall back to native reader if there is no log file
format_type = TFileFormatType::FORMAT_PARQUET;
} else if (range.table_format_params.table_format_type == "paimon" &&
!range.table_format_params.paimon_params.__isset.paimon_split) {
// use native reader
auto format = range.table_format_params.paimon_params.file_format;
if (format == "orc") {
format_type = TFileFormatType::FORMAT_ORC;
} else if (format == "parquet") {
format_type = TFileFormatType::FORMAT_PARQUET;
} else {
return Status::InternalError("Not supported paimon file format: {}", format);
}
}
}
bool need_to_get_parsed_schema = false;
switch (format_type) {
case TFileFormatType::FORMAT_JNI: {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "max_compute") {
const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
_real_tuple_desc->table_desc());
std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
range, _state, _profile);
init_status = mc_reader->init_reader(_colname_to_value_range);
_cur_reader = std::move(mc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
_cur_reader =
PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
init_status = ((PaimonJniReader*)(_cur_reader.get()))
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
_cur_reader = HudiJniReader::create_unique(*_params,
range.table_format_params.hudi_params,
_file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
}
break;
}
case TFileFormatType::FORMAT_PARQUET: {
static const cctz::time_zone utc0 = cctz::utc_time_zone();
cctz::time_zone* tz;
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
// The timestmap generated by paimon does not carry metadata information (e.g., isAdjustToUTC, etc.),
// and the stored data is UTC0 by default, so it is directly set to the UTC time zone.
// In version 0.7, paimon fixed this issue and can remove the judgment here
tz = const_cast<cctz::time_zone*>(&utc0);
} else {
tz = const_cast<cctz::time_zone*>(&_state->timezone_obj());
}
std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
_profile, *_params, range, _state->query_options().batch_size, tz,
_io_ctx.get(), _state,
config::max_external_file_meta_cache_num <= 0
? nullptr
: ExecEnv::GetInstance()->file_meta_cache(),
_state->query_options().enable_parquet_lazy_mat);
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
}
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergTableReader> iceberg_reader =
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader = std::move(iceberg_reader);
} else {
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
_cur_reader = std::move(parquet_reader);
}
need_to_get_parsed_schema = true;
break;
}
case TFileFormatType::FORMAT_ORC: {
std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr;
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
static std::vector<orc::TypeKind> paimon_unsupport_type =
std::vector<orc::TypeKind> {orc::TypeKind::CHAR};
unsupported_pushdown_types = &paimon_unsupport_type;
}
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "transactional_hive") {
std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
_state, *_params, range,
_io_ctx.get());
init_status = tran_orc_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
_cur_reader = std::move(tran_orc_reader);
} else {
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(orc_reader);
}
need_to_get_parsed_schema = true;
break;
}
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_CSV_GZ:
case TFileFormatType::FORMAT_CSV_BZ2:
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
case TFileFormatType::FORMAT_PROTO: {
_cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, _io_ctx.get());
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
case TFileFormatType::FORMAT_JSON: {
_cur_reader =
NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, &_scanner_eof, _io_ctx.get());
init_status =
((NewJsonReader*)(_cur_reader.get()))->init_reader(_col_default_value_ctx);
break;
}
case TFileFormatType::FORMAT_AVRO: {
_cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
range);
init_status = ((AvroJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_WAL: {
_cur_reader.reset(new WalReader(_state));
init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
case TFileFormatType::FORMAT_ARROW: {
_cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
range, _file_slot_descs, _io_ctx.get());
init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
break;
}
default:
return Status::InternalError("Not supported file format: {}", _params->format_type);
}
if (init_status.is<END_OF_FILE>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
if (init_status.is<ErrorCode::NOT_FOUND>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
LOG(INFO) << "failed to find file: " << range.path;
return init_status;
}
return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
init_status.to_string());
}
COUNTER_UPDATE(_file_counter, 1);
_name_to_col_type.clear();
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
fmt::format_to(col_buf, " {}", col);
}
VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
range.path);
}
_source_file_col_names.clear();
_source_file_col_types.clear();
_source_file_col_name_types.clear();
if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
&_source_file_col_types);
if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
return status;
}
DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
for (int i = 0; i < _source_file_col_names.size(); ++i) {
_source_file_col_name_types[_source_file_col_names[i]] = &_source_file_col_types[i];
}
}
_cur_reader_eof = false;
break;
}
return Status::OK();
}
// Build the maps of partition columns (values parsed from the file path) and
// missing columns (columns absent from the file, filled with default values),
// then hand them to the current reader via set_fill_columns().
// Returns InternalError if a partition slot has no index mapping or a missing
// column has no default value expr.
Status VFileScanner::_generate_fill_columns() {
    _partition_col_descs.clear();
    _missing_col_descs.clear();
    // _next_range has already been advanced past the range being opened,
    // so the current range is at _next_range - 1.
    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
        for (const auto& slot_desc : _partition_slot_descs) {
            if (slot_desc) {
                auto it = _partition_slot_index_map.find(slot_desc->id());
                if (it == std::end(_partition_slot_index_map)) {
                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
                                                 slot_desc->id());
                }
                const std::string& column_from_path = range.columns_from_path[it->second];
                const char* data = column_from_path.c_str();
                // A literal "null" partition value is translated to the "\N"
                // marker, which downstream fill logic treats as NULL.
                // (No const_cast needed: `data` is a pointer-to-const.)
                if (column_from_path == "null") {
                    data = "\\N";
                }
                _partition_col_descs.emplace(slot_desc->col_name(),
                                             std::make_tuple(data, slot_desc));
            }
        }
    }
    if (!_missing_cols.empty()) {
        for (auto slot_desc : _real_tuple_desc->slots()) {
            if (!slot_desc->is_materialized()) {
                continue;
            }
            if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
                continue;
            }
            auto it = _col_default_value_ctx.find(slot_desc->col_name());
            if (it == _col_default_value_ctx.end()) {
                return Status::InternalError("failed to find default value expr for slot: {}",
                                             slot_desc->col_name());
            }
            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
        }
    }
    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
}
// Initialize the expression contexts used by this scanner:
//  1. Split required slots into file slots and partition slots.
//  2. Build the column-name -> default-value-expr map.
//  3. For load tasks only, build the dest-slot expr contexts and the
//     dest -> src slot index mapping.
// Returns InternalError on any inconsistent slot/partition metadata.
Status VFileScanner::_init_expr_ctxes() {
    DCHECK(!_ranges.empty());
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }
    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges in _ranges vector should have identical columns_from_path_keys
    // because they are all file splits for the same external table.
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
    if (_ranges[0].__isset.columns_from_path_keys) {
        // Reference, not copy: the key list is only read here.
        const std::vector<std::string>& key_map = _ranges[0].columns_from_path_keys;
        for (size_t i = 0; i < key_map.size(); i++) {
            partition_name_to_key_index_map.emplace(key_map[i], i);
        }
    }
    _num_of_columns_from_file = _params->num_of_columns_from_file;
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError(
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
        }
        if (slot_info.is_file_slot) {
            _file_slot_descs.emplace_back(it->second);
            _file_col_names.push_back(it->second->col_name());
            if (it->second->col_unique_id() > 0) {
                _col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
            }
        } else {
            _partition_slot_descs.emplace_back(it->second);
            if (_is_load) {
                auto iti = full_src_index_map.find(slot_id);
                if (iti == std::end(full_src_index_map)) {
                    // Should be impossible: every slot inserted into
                    // full_src_slot_map was also indexed above. Guarded anyway
                    // instead of dereferencing end() (undefined behavior).
                    return Status::InternalError("Unknown source slot index, slot_id={}",
                                                 slot_id);
                }
                _partition_slot_index_map.emplace(slot_id,
                                                  iti->second - _num_of_columns_from_file);
            } else {
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
                if (kit == std::end(partition_name_to_key_index_map)) {
                    // Previously dereferenced without a check, which is UB when
                    // the partition column is not in columns_from_path_keys.
                    return Status::InternalError(
                            "Partition column {} not found in columns_from_path_keys",
                            it->second->col_name());
                }
                _partition_slot_index_map.emplace(slot_id, kit->second);
            }
        }
    }
    // set column name to default value expr map
    for (auto slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->is_materialized()) {
            continue;
        }
        vectorized::VExprContextSPtr ctx;
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
        if (it != std::end(_params->default_value_of_src_slot)) {
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
        }
    }
    if (_is_load) {
        // follow desc expr map is only for load task.
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (auto slot_desc : _output_tuple_desc->slots()) {
            if (!slot_desc->is_materialized()) {
                continue;
            }
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }
            vectorized::VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    // Local variable: no leading underscore (reserved for members).
                    auto src_slot_it = full_src_slot_map.find(it1->second);
                    if (src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
// Close the scanner: flush file-cache statistics into the profile (when the
// cache is enabled), close the current reader, then delegate to VScanner.
// Safe to call more than once; subsequent calls are no-ops.
Status VFileScanner::close(RuntimeState* state) {
    if (_is_closed) {
        // Already closed (presumably by a previous call through VScanner::close
        // — confirm the base class sets _is_closed); nothing left to do.
        return Status::OK();
    }
    // Only report cache stats when both the global config switch and the
    // per-query option are on.
    if (config::enable_file_cache && _state->query_options().enable_file_cache) {
        io::FileCacheProfileReporter cache_profile(_profile);
        cache_profile.update(_file_cache_statistics.get());
    }
    if (_cur_reader != nullptr) {
        RETURN_IF_ERROR(_cur_reader->close());
    }
    return VScanner::close(state);
}
// Request cooperative cancellation: propagate the stop request to the base
// scanner, then flag the IO context so in-flight reads can bail out early.
void VFileScanner::try_stop() {
    VScanner::try_stop();
    if (_io_ctx != nullptr) {
        _io_ctx->should_stop = true;
    }
}
} // namespace doris::vectorized