blob: a1a46d8565226bd03eb6766404bd4cd9fc5333f7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "format/orc/vorc_reader.h"
#include <cctz/civil_time_detail.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <cctype>
#include <limits>
#include <list>
#include "exprs/vdirect_in_predicate.h"
#include "exprs/vexpr.h"
#include "exprs/vruntimefilter_wrapper.h"
#include "exprs/vslot_ref.h"
#include "exprs/vtopn_pred.h"
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <exception>
#include <iterator>
#include <map>
#include <memory>
#include <ostream>
#include <tuple>
#include <utility>
#include "absl/strings/substitute.h"
#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
#include "common/consts.h"
#include "common/exception.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_array.h"
#include "core/column/column_const.h"
#include "core/column/column_map.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
#include "core/string_ref.h"
#include "core/types.h"
#include "core/value/decimalv2_value.h"
#include "core/value/vdatetime_value.h"
#include "exec/scan/file_scanner.h"
#include "exprs/create_predicate_function.h"
#include "exprs/hybrid_set.h"
#include "exprs/vbloom_predicate.h"
#include "exprs/vdirect_in_predicate.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "exprs/vin_predicate.h"
#include "exprs/vruntimefilter_wrapper.h"
#include "format/orc/orc_file_reader.h"
#include "format/table/iceberg_reader.h"
#include "format/table/transactional_hive_common.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "orc/Exceptions.hh"
#include "orc/Int128.hh"
#include "orc/MemoryPool.hh"
#include "orc/OrcFile.hh"
#include "orc/sargs/Literal.hh"
#include "orc/sargs/SearchArgument.hh"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "storage/id_manager.h"
#include "storage/segment/column_reader.h"
#include "storage/utils.h"
#include "util/slice.h"
#include "util/timezone_utils.h"
namespace doris {
class RuntimeState;
namespace io {
struct IOContext;
enum class FileCachePolicy : uint8_t;
} // namespace io
} // namespace doris
namespace doris {
#include "common/compile_check_begin.h"
namespace {
// Creates a column of `type` and fills it with `num_rows` iceberg row-id structs.
//
// The created column must be a struct (optionally wrapped in a nullable column) with at
// least four fields, which are filled per row as:
//   field 0: `file_path`
//   field 1: `start_row + row_index` (written as int64 bytes)
//   field 2: `partition_spec_id` (written as int32 bytes)
//   field 3: `partition_data_json`
// When the column is nullable, its null map is resized to `num_rows` and filled with 0.
//
// Returns InvalidArgument if `type` or `column_out` is null, InternalError if the created
// column does not have the expected structure; otherwise stores the column in `*column_out`.
Status build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path,
                                  int64_t start_row, size_t num_rows, int32_t partition_spec_id,
                                  const std::string& partition_data_json,
                                  MutableColumnPtr* column_out) {
    if (type == nullptr || column_out == nullptr) {
        return Status::InvalidArgument("Invalid iceberg rowid column type or output column");
    }

    MutableColumnPtr result = type->create_column();

    // Unwrap the nullable layer, if any, to reach the struct column.
    ColumnNullable* nullable_wrapper = check_and_get_column<ColumnNullable>(result.get());
    ColumnStruct* row_id_struct = nullptr;
    if (nullable_wrapper == nullptr) {
        row_id_struct = check_and_get_column<ColumnStruct>(result.get());
    } else {
        row_id_struct = check_and_get_column<ColumnStruct>(
                nullable_wrapper->get_nested_column_ptr().get());
    }
    if (row_id_struct == nullptr || row_id_struct->tuple_size() < 4) {
        return Status::InternalError("Invalid iceberg rowid column structure");
    }

    auto& path_field = row_id_struct->get_column(0);
    auto& position_field = row_id_struct->get_column(1);
    auto& spec_id_field = row_id_struct->get_column(2);
    auto& partition_field = row_id_struct->get_column(3);

    path_field.reserve(num_rows);
    position_field.reserve(num_rows);
    spec_id_field.reserve(num_rows);
    partition_field.reserve(num_rows);

    // The four sub-columns are independent, so they can be filled in a single pass.
    for (size_t row = 0; row < num_rows; ++row) {
        path_field.insert_data(file_path.data(), file_path.size());

        const int64_t row_position = start_row + static_cast<int64_t>(row);
        position_field.insert_data(reinterpret_cast<const char*>(&row_position),
                                   sizeof(row_position));

        const int32_t spec_id = partition_spec_id;
        spec_id_field.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id));

        partition_field.insert_data(partition_data_json.data(), partition_data_json.size());
    }

    if (nullable_wrapper != nullptr) {
        // Every generated row is non-null.
        nullable_wrapper->get_null_map_data().resize_fill(num_rows, 0);
    }

    *column_out = std::move(result);
    return Status::OK();
}
} // namespace
// Limit associated with rewriting predicates on dictionary codes; currently set to the
// largest uint32_t value.
// TODO: we need to determine it by test.
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
// Zero-initialized buffer of ColumnString::MAX_STRINGS_OVERFLOW_SIZE bytes.
// NOTE(review): presumably used as the backing data for empty strings so that reads past
// the end of a string stay inside valid memory - confirm against its users.
static constexpr char EMPTY_STRING_FOR_OVERFLOW[ColumnString::MAX_STRINGS_OVERFLOW_SIZE] = "";
// Hive 0.11 and 0.12 do not support precision and scale for decimals, so the decimal type
// in ORC files produced by those versions is DECIMAL(0,0).
// We therefore define a default precision and scale to use for such files.
static constexpr int decimal_precision_for_hive11 = BeConsts::MAX_DECIMAL128_PRECISION;
static constexpr int decimal_scale_for_hive11 = 10;
// X-macro over the flat numeric column kinds: invokes M once per entry with
// (Doris primitive type, value type name, ORC vector batch class).
// Integer-like and boolean entries use orc::LongVectorBatch; float/double entries use
// orc::DoubleVectorBatch.
// NOTE(review): TYPE_INT is not listed here - presumably it is handled by a dedicated code
// path elsewhere in this file; confirm before adding it.
#define FOR_FLAT_ORC_COLUMNS(M) \
M(PrimitiveType::TYPE_TINYINT, Int8, orc::LongVectorBatch) \
M(PrimitiveType::TYPE_BOOLEAN, UInt8, orc::LongVectorBatch) \
M(PrimitiveType::TYPE_SMALLINT, Int16, orc::LongVectorBatch) \
M(PrimitiveType::TYPE_BIGINT, Int64, orc::LongVectorBatch) \
M(PrimitiveType::TYPE_FLOAT, Float32, orc::DoubleVectorBatch) \
M(PrimitiveType::TYPE_DOUBLE, Float64, orc::DoubleVectorBatch)
// Reads exactly `length` bytes starting at file `offset` into `buf`.
//
// The underlying reader may return fewer bytes than requested, so the read is repeated
// until the buffer is full or the reader reports zero bytes.
//
// Throws orc::ParseError when:
//   - the IO context asks the scan to stop (message "stop"),
//   - the underlying read returns a non-OK status,
//   - fewer than `length` bytes could be read.
void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
    uint64_t has_read = 0;
    char* out = reinterpret_cast<char*>(buf);
    while (has_read < length) {
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
            throw orc::ParseError("stop");
        }
        // Initialized so that a reader which leaves it untouched is treated as "no progress".
        size_t loop_read = 0;
        Slice result(out + has_read, length - has_read);
        Status st = _tracing_file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
        if (!st.ok()) {
            throw orc::ParseError(
                    absl::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
        }
        if (loop_read == 0) {
            // No more data available; the short read is reported below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != length) {
        // Argument order matches the placeholders: $0 = requested, $1 = file, $2 = actual.
        throw orc::ParseError(absl::Substitute("Try to read $0 bytes from $1, actually read $2",
                                               length, _file_name, has_read));
    }
}
// Reads exactly `length` bytes starting at file `offset` into `buf` through the inner reader.
//
// The inner reader may return fewer bytes than requested, so the read is repeated until the
// buffer is full or the reader reports zero bytes.
//
// Throws orc::ParseError when:
//   - the IO context asks the scan to stop (message "stop"),
//   - the underlying read returns a non-OK status,
//   - fewer than `length` bytes could be read.
void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) {
    uint64_t has_read = 0;
    char* out = reinterpret_cast<char*>(buf);
    while (has_read < length) {
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
            throw orc::ParseError("stop");
        }
        // Initialized so that a reader which leaves it untouched is treated as "no progress".
        size_t loop_read = 0;
        Slice result(out + has_read, length - has_read);
        Status st = _inner_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
        if (!st.ok()) {
            throw orc::ParseError(
                    absl::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
        }
        if (loop_read == 0) {
            // No more data available; the short read is reported below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != length) {
        // Argument order matches the placeholders: $0 = requested, $1 = file, $2 = actual.
        throw orc::ParseError(absl::Substitute("Try to read $0 bytes from $1, actually read $2",
                                               length, _file_name, has_read));
    }
}
// Scan constructor taking a non-owning IO context pointer.
//
// The batch size is raised to at least _MIN_BATCH_SIZE. Min/max filtering defaults to
// enabled when no runtime state is supplied, otherwise it follows the query option
// `enable_orc_filter_by_min_max`. After member initialization the constructor resolves the
// time zone named by `ctz` and initializes the profile, system properties and file
// description.
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
                     FileMetaCache* meta_cache, bool enable_lazy_mat)
        : _profile(profile),
          _state(state),
          _scan_params(params),
          _scan_range(range),
          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
          _range_start_offset(range.start_offset),
          _range_size(range.size),
          _ctz(ctz),
          _io_ctx(io_ctx),
          _enable_lazy_mat(enable_lazy_mat),
          _enable_filter_by_min_max(state == nullptr ||
                                    state->query_options().enable_orc_filter_by_min_max),
          _dict_cols_has_converted(false) {
    _meta_cache = meta_cache;
    TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
    _init_profile();
    _init_system_properties();
    _init_file_description();
}
// Scan constructor taking shared ownership of the IO context.
//
// `_io_ctx` is set to the raw pointer held by `io_ctx_holder` (null when the holder is
// empty) and the holder itself is moved into `_io_ctx_holder`.
// NOTE(review): this relies on `_io_ctx` being declared before `_io_ctx_holder` in the
// class, so the raw pointer is read before the holder is moved from - confirm in the header.
//
// Apart from the IO context handling, the batch size is raised to at least _MIN_BATCH_SIZE
// and min/max filtering defaults to enabled when no runtime state is supplied.
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     size_t batch_size, const std::string& ctz,
                     std::shared_ptr<io::IOContext> io_ctx_holder, FileMetaCache* meta_cache,
                     bool enable_lazy_mat)
        : _profile(profile),
          _state(state),
          _scan_params(params),
          _scan_range(range),
          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
          _range_start_offset(range.start_offset),
          _range_size(range.size),
          _ctz(ctz),
          // shared_ptr::get() already yields nullptr for an empty holder.
          _io_ctx(io_ctx_holder.get()),
          _io_ctx_holder(std::move(io_ctx_holder)),
          _enable_lazy_mat(enable_lazy_mat),
          _enable_filter_by_min_max(state == nullptr ||
                                    state->query_options().enable_orc_filter_by_min_max),
          _dict_cols_has_converted(false) {
    _meta_cache = meta_cache;
    TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
    _init_profile();
    _init_system_properties();
    _init_file_description();
}
// Constructor without a runtime profile or runtime state, taking a non-owning IO context
// pointer.
// The profile is set to null, so no profile counters are registered, and min/max filtering
// is always enabled. Unlike the profile-taking constructors, this one does not resolve the
// time zone or set the batch size and range offsets in its initializer list.
// NOTE(review): presumably used for schema-only reads (see init_schema_reader) - confirm
// against callers.
OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache,
bool enable_lazy_mat)
: _profile(nullptr),
_scan_params(params),
_scan_range(range),
_ctz(ctz),
_file_system(nullptr),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_enable_filter_by_min_max(true),
_dict_cols_has_converted(false) {
_meta_cache = meta_cache;
_init_system_properties();
_init_file_description();
}
// Constructor without a runtime profile or runtime state, taking shared ownership of the
// IO context.
//
// `_io_ctx` is set to the raw pointer held by `io_ctx_holder` (null when the holder is
// empty) and the holder itself is moved into `_io_ctx_holder`.
// NOTE(review): this relies on `_io_ctx` being declared before `_io_ctx_holder` in the
// class - confirm in the header.
// The profile is set to null and min/max filtering is always enabled.
OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     const std::string& ctz, std::shared_ptr<io::IOContext> io_ctx_holder,
                     FileMetaCache* meta_cache, bool enable_lazy_mat)
        : _profile(nullptr),
          _scan_params(params),
          _scan_range(range),
          _ctz(ctz),
          _file_system(nullptr),
          // shared_ptr::get() already yields nullptr for an empty holder.
          _io_ctx(io_ctx_holder.get()),
          _io_ctx_holder(std::move(io_ctx_holder)),
          _enable_lazy_mat(enable_lazy_mat),
          _enable_filter_by_min_max(true),
          _dict_cols_has_converted(false) {
    _meta_cache = meta_cache;
    _init_system_properties();
    _init_file_description();
}
// Flushes the locally accumulated reader statistics into the runtime profile counters and
// lets the file input stream (if still held by this reader) report its own profile.
// Does nothing when the reader was created without a profile.
void OrcReader::_collect_profile_before_close() {
    if (_profile == nullptr) {
        return;
    }

    // Timers.
    COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
    COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
    COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
    COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time);
    COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
    COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
    COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
    COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
    COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);

    // Row and footer counters.
    COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
    COUNTER_UPDATE(_orc_profile.file_footer_read_calls, _statistics.file_footer_read_calls);
    COUNTER_UPDATE(_orc_profile.file_footer_hit_cache, _statistics.file_footer_hit_cache);

    if (_file_input_stream == nullptr) {
        return;
    }
    _file_input_stream->collect_profile_before_close();
}
// Returns the length reported by the file input stream.
// NOTE(review): _create_file_reader() releases _file_input_stream into the ORC reader, so
// this dereference looks valid only while the stream is still owned here - confirm callers.
int64_t OrcReader::size() const {
    const auto stream_length = _file_input_stream->getLength();
    return stream_length;
}
void OrcReader::_init_profile() {
if (_profile != nullptr) {
static const char* orc_profile = "OrcReader";
ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1);
_orc_profile.column_read_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1);
_orc_profile.get_batch_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "GetBatchTime", orc_profile, 1);
_orc_profile.create_reader_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CreateReaderTime", orc_profile, 1);
_orc_profile.init_column_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "InitColumnTime", orc_profile, 1);
_orc_profile.set_fill_column_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SetFillColumnTime", orc_profile, 1);
_orc_profile.decode_value_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
_orc_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
_orc_profile.dict_filter_rewrite_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
_orc_profile.lazy_read_filtered_rows =
ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
_orc_profile.selected_row_group_count =
ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", TUnit::UNIT, 1);
_orc_profile.evaluated_row_group_count =
ADD_COUNTER_WITH_LEVEL(_profile, "EvaluatedRowGroupCount", TUnit::UNIT, 1);
_orc_profile.file_footer_read_calls =
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", TUnit::UNIT, 1);
_orc_profile.file_footer_hit_cache =
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", TUnit::UNIT, 1);
}
}
// Creates the ORC reader (`_reader`) for the scan range, opening the file input stream
// first if it does not exist yet. Calling it again after `_reader` was created is a no-op.
//
// Returns:
//   - OK on success,
//   - EndOfFile if the file is empty, or if the read was interrupted by a stop request,
//   - NotFound if the ORC library reports a missing file/object,
//   - InternalError for any other failure while creating the ORC reader,
//   - the error from creating the underlying file reader.
//
// When a file meta cache is available, the serialized file tail (footer) is looked up in
// the cache and reused; on a miss it is read from the file and inserted into the cache.
Status OrcReader::_create_file_reader() {
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
if (_reader != nullptr) {
return Status::OK();
}
if (_file_input_stream == nullptr) {
// Use the modification time from the scan range when it is set, otherwise 0.
_file_description.mtime =
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
io::FileReaderSPtr inner_reader;
// Pass the shared IO context holder when this reader owns one, otherwise the raw pointer.
if (_io_ctx_holder != nullptr) {
inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx_holder));
} else {
inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
}
_file_input_stream = std::make_unique<ORCFileInputStream>(
_scan_range.path, std::move(inner_reader), _io_ctx, _profile,
_orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
}
if (_file_input_stream->getLength() == 0) {
return Status::EndOfFile("empty orc file: " + _scan_range.path);
}
// create orc reader
orc::ReaderOptions options;
options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
options.setReaderMetrics(&_reader_metrics);
// Creates `_reader` and translates exceptions into a Status.
// Note: ownership of `_file_input_stream` is released into the ORC reader here, so
// `_file_input_stream` is null once this lambda has run.
auto create_orc_reader = [&]() {
try {
_reader = orc::createReader(
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
// The caller may choose to skip Status::NotFound and continue, so that case must be
// distinguished from other kinds of errors.
std::string _err_msg = e.what();
// A "stop" error raised while the IO context requests a stop is reported as EndOfFile.
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
return Status::EndOfFile("stop");
}
// The first message is produced by file systems, the second one by OSS.
if (_err_msg.find("No such file or directory") != std::string::npos ||
_err_msg.find("NoSuchKey") != std::string::npos) {
return Status::NotFound(_err_msg);
}
return Status::InternalError("Init OrcReader failed. reason = {}", _err_msg);
}
return Status::OK();
};
if (_meta_cache == nullptr) {
// No cache: the footer is always read from the file.
_statistics.file_footer_read_calls++;
RETURN_IF_ERROR(create_orc_reader());
} else {
// The inner reader must be fetched before create_orc_reader() releases the stream.
auto inner_file_reader = _file_input_stream->get_inner_reader();
const auto& file_meta_cache_key =
FileMetaCache::get_key(inner_file_reader, _file_description);
// A local cache handle is sufficient here because setSerializedFileTail copies the
// string (it is an assignment) instead of keeping a reference to it.
ObjLRUCache::CacheHandle _meta_cache_handle;
if (_meta_cache->lookup(file_meta_cache_key, &_meta_cache_handle)) {
// Cache hit: hand the cached serialized file tail to the ORC reader.
const std::string* footer_ptr = _meta_cache_handle.data<String>();
options.setSerializedFileTail(*footer_ptr);
RETURN_IF_ERROR(create_orc_reader());
_statistics.file_footer_hit_cache++;
} else {
// Cache miss: read the footer from the file, then publish it to the cache.
_statistics.file_footer_read_calls++;
RETURN_IF_ERROR(create_orc_reader());
// NOTE(review): the raw pointer is handed to the cache, which presumably takes
// ownership of the string - confirm against ObjLRUCache::insert.
std::string* footer_ptr = new std::string {_reader->getSerializedFileTail()};
_meta_cache->insert(file_meta_cache_key, footer_ptr, &_meta_cache_handle);
}
}
return Status::OK();
}
// Stores the caller-provided read context (column names, conjuncts, descriptors, schema
// mapping and column id sets), picks up ORC IO tuning options from the query options when a
// runtime state exists, then creates the file reader and initializes the columns to read.
//
// Returns the first error from _create_file_reader() or _init_read_columns(), else OK.
Status OrcReader::init_reader(
        const std::vector<std::string>* column_names,
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
        std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
        const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
    // Table-side column information.
    _table_column_names = column_names;
    _col_name_to_block_idx = col_name_to_block_idx;
    _table_info_node_ptr = table_info_node_ptr;
    _column_ids = column_ids;
    _filter_column_ids = filter_column_ids;

    // Descriptors and ACID flag.
    _is_acid = is_acid;
    _tuple_descriptor = tuple_descriptor;
    _row_descriptor = row_descriptor;

    // Predicates.
    _lazy_read_ctx.conjuncts = conjuncts;
    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
    const bool has_multi_slot_conjuncts = not_single_slot_filter_conjuncts != nullptr &&
                                          !not_single_slot_filter_conjuncts->empty();
    if (has_multi_slot_conjuncts) {
        _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
                                                 not_single_slot_filter_conjuncts->begin(),
                                                 not_single_slot_filter_conjuncts->end());
    }

    _obj_pool = std::make_unique<ObjectPool>();

    // These options must be set before the file reader is created, because the input
    // stream is constructed with them.
    if (_state != nullptr) {
        const auto& query_options = _state->query_options();
        _orc_tiny_stripe_threshold_bytes = query_options.orc_tiny_stripe_threshold_bytes;
        _orc_once_max_read_bytes = query_options.orc_once_max_read_bytes;
        _orc_max_merge_distance_bytes = query_options.orc_max_merge_distance_bytes;
    }

    RETURN_IF_ERROR(_create_file_reader());
    RETURN_IF_ERROR(_init_read_columns());
    return Status::OK();
}
// Initializes the file reader for parsing the schema.
// Only creates the underlying ORC reader; no read columns are set up.
// Returns the status of _create_file_reader().
Status OrcReader::init_schema_reader() {
return _create_file_reader();
}
// Appends the name and Doris data type of every top-level field of the file schema to
// `col_names` / `col_types`. For ACID files whose schema matches the transactional layout,
// the fields of the nested row struct are reported instead of the ACID wrapper columns.
//
// Returns InternalError if the ORC reader has not been created yet, otherwise OK.
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                    std::vector<DataTypePtr>* col_types) {
    if (_reader == nullptr) {
        return Status::InternalError("OrcReader is not initialized, can not parse schema");
    }
    const auto& root_type = _is_acid ? remove_acid(_reader->getType()) : _reader->getType();
    // getSubtypeCount() is unsigned; use a matching index type to avoid a signed/unsigned
    // comparison.
    const uint64_t field_count = root_type.getSubtypeCount();
    for (uint64_t i = 0; i < field_count; ++i) {
        col_names->emplace_back(root_type.getFieldName(i));
        col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i)));
    }
    return Status::OK();
}
// Enables iceberg row-id generation and records the values needed to build the row-id
// column: the data file path, the partition spec id, the partition data (JSON string) and
// the position of the row-id column.
void OrcReader::set_iceberg_rowid_params(const std::string& file_path, int32_t partition_spec_id,
                                         const std::string& partition_data_json,
                                         int row_id_column_pos) {
    auto& rowid_params = _iceberg_rowid_params;
    rowid_params.enabled = true;
    rowid_params.file_path = file_path;
    rowid_params.partition_spec_id = partition_spec_id;
    rowid_params.partition_data_json = partition_data_json;
    rowid_params.row_id_column_pos = row_id_column_pos;
}
// Prepares the column lookup structures for reading:
//   1. rebuilds `_column_id_to_file_type` (ORC column id -> file type) for every type in
//      the file schema, including nested ones;
//   2. fills `_type_map` (file column name -> ORC type); for ACID files the fields of
//      top-level struct columns are registered as "<ROW>.<field>", other top-level
//      columns under their own name;
//   3. splits the requested table columns into `_missing_cols` (no matching file column)
//      and `_read_file_cols` / `_read_table_cols` (file name / table name pairs).
// Always returns OK.
Status OrcReader::_init_read_columns() {
    SCOPED_RAW_TIMER(&_statistics.init_column_time);
    const auto& root_type = _reader->getType();

    // Build column ID to file type mapping for all columns in file
    _column_id_to_file_type.clear();
    std::function<void(const orc::Type*, int)> build_column_id_map = [&](const orc::Type* type,
                                                                         int depth) {
        if (type == nullptr) {
            return;
        }
        uint64_t col_id = type->getColumnId();
        _column_id_to_file_type[col_id] = type;
        // Indentation only makes the debug log reflect the nesting depth.
        std::string indent(depth * 2, ' ');
        VLOG_DEBUG << indent << "[OrcReader] Mapping column_id=" << col_id
                   << ", kind=" << static_cast<int>(type->getKind())
                   << ", subtype_count=" << type->getSubtypeCount();
        for (uint64_t i = 0; i < type->getSubtypeCount(); ++i) {
            build_column_id_map(type->getSubtype(i), depth + 1);
        }
    };
    VLOG_DEBUG << "[OrcReader] Building column ID to file type mapping...";
    build_column_id_map(&root_type, 0);
    VLOG_DEBUG << "[OrcReader] Total mapped columns: " << _column_id_to_file_type.size();

    // getSubtypeCount() is unsigned; both branches below use a matching index type.
    const uint64_t top_level_count = root_type.getSubtypeCount();
    if (_is_acid) {
        for (uint64_t i = 0; i < top_level_count; ++i) {
            const auto* sub_type = root_type.getSubtype(i);
            if (sub_type->getKind() == orc::TypeKind::STRUCT) {
                // Struct column of an ACID file: expose its fields with the ROW prefix.
                for (uint64_t j = 0; j < sub_type->getSubtypeCount(); ++j) {
                    _type_map.emplace(TransactionalHive::ROW + "." + sub_type->getFieldName(j),
                                      sub_type->getSubtype(j));
                }
            } else {
                _type_map.emplace(root_type.getFieldName(i), sub_type);
            }
        }
    } else {
        for (uint64_t i = 0; i < top_level_count; ++i) {
            _type_map.emplace(root_type.getFieldName(i), root_type.getSubtype(i));
        }
    }

    for (size_t i = 0; i < _table_column_names->size(); ++i) {
        const auto& table_column_name = (*_table_column_names)[i];
        if (!_table_info_node_ptr->children_column_exists(table_column_name)) {
            // The table column has no counterpart in this file.
            _missing_cols.emplace_back(table_column_name);
            continue;
        }
        const auto file_column_name =
                _table_info_node_ptr->children_file_column_name(table_column_name);
        _read_file_cols.emplace_back(file_column_name);
        _read_table_cols.emplace_back(table_column_name);
    }
    return Status::OK();
}
// Returns true when `type` is a struct whose field count equals the number of ACID column
// names and whose field names, compared case-insensitively and in order, match
// TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE.
bool OrcReader::_check_acid_schema(const orc::Type& type) {
    if (type.getKind() != orc::TypeKind::STRUCT) {
        return false;
    }
    if (type.getSubtypeCount() != TransactionalHive::ACID_COLUMN_NAMES.size()) {
        return false;
    }
    for (uint64_t idx = 0; idx < type.getSubtypeCount(); ++idx) {
        // Lower-case a copy of the field name before comparing.
        std::string lowered_name = type.getFieldName(idx);
        for (char& ch : lowered_name) {
            ch = static_cast<char>(std::tolower(static_cast<unsigned char>(ch)));
        }
        if (lowered_name != TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE[idx]) {
            return false;
        }
    }
    return true;
}
// Returns the subtype at TransactionalHive::ROW_OFFSET when `type` has the ACID layout,
// otherwise returns `type` itself.
const orc::Type& OrcReader::remove_acid(const orc::Type& type) {
    return _check_acid_schema(type) ? *(type.getSubtype(TransactionalHive::ROW_OFFSET)) : type;
}
// ORC only supports pushing down predicates of type LONG, FLOAT, STRING, DATE, DECIMAL,
// TIMESTAMP and BOOLEAN. This table maps each ORC type kind that can be pushed down to the
// predicate data type used for it; kinds that are absent are not pushed down.
static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PREDICATE_TYPE = {
{orc::TypeKind::BYTE, orc::PredicateDataType::LONG},
{orc::TypeKind::SHORT, orc::PredicateDataType::LONG},
{orc::TypeKind::INT, orc::PredicateDataType::LONG},
{orc::TypeKind::LONG, orc::PredicateDataType::LONG},
{orc::TypeKind::FLOAT, orc::PredicateDataType::FLOAT},
{orc::TypeKind::DOUBLE, orc::PredicateDataType::FLOAT},
{orc::TypeKind::STRING, orc::PredicateDataType::STRING},
{orc::TypeKind::BINARY, orc::PredicateDataType::STRING},
// CHAR must not be pushed down, because CHAR values are fixed length and padded.
// {orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
{orc::TypeKind::VARCHAR, orc::PredicateDataType::STRING},
{orc::TypeKind::DATE, orc::PredicateDataType::DATE},
{orc::TypeKind::DECIMAL, orc::PredicateDataType::DECIMAL},
{orc::TypeKind::TIMESTAMP, orc::PredicateDataType::TIMESTAMP},
{orc::TypeKind::BOOLEAN, orc::PredicateDataType::BOOLEAN}};
// Converts a Doris literal value into an ORC search-argument literal.
//
// Template parameter:
//   primitive_type - Doris primitive type of the slot the literal is compared with; it
//                    decides how the raw bytes in `literal_data` are interpreted.
// Parameters:
//   type         - ORC type of the file column; selects the kind of literal to build.
//   literal_data - raw bytes of the literal value.
//   precision, scale - decimal precision/scale used for DECIMAL literals (overridden with
//                    the DecimalV2 constants for TYPE_DECIMALV2).
// Returns {true, literal} on success, or {false, orc::Literal(false)} when the combination
// of ORC type and Doris type is not supported, or when a doris Exception is thrown during
// the conversion.
template <PrimitiveType primitive_type>
std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
StringRef& literal_data, int precision,
int scale) {
const auto* value = literal_data.data;
try {
switch (type->getKind()) {
case orc::TypeKind::BOOLEAN: {
if (primitive_type != TYPE_BOOLEAN) {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(bool(*((uint8_t*)value))));
}
// All ORC integer kinds are represented as a 64-bit integer literal.
case orc::TypeKind::BYTE:
case orc::TypeKind::SHORT:
case orc::TypeKind::INT:
case orc::TypeKind::LONG: {
if constexpr (primitive_type == TYPE_TINYINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int8_t*)value))));
} else if constexpr (primitive_type == TYPE_SMALLINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int16_t*)value))));
} else if constexpr (primitive_type == TYPE_INT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int32_t*)value))));
} else if constexpr (primitive_type == TYPE_BIGINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int64_t*)value))));
}
return std::make_tuple(false, orc::Literal(false));
}
// An ORC FLOAT column accepts both FLOAT and DOUBLE Doris values; the literal is a double.
case orc::TypeKind::FLOAT: {
if constexpr (primitive_type == TYPE_FLOAT) {
return std::make_tuple(true, orc::Literal(double(*((float*)value))));
} else if constexpr (primitive_type == TYPE_DOUBLE) {
return std::make_tuple(true, orc::Literal(double(*((double*)value))));
}
return std::make_tuple(false, orc::Literal(false));
}
// An ORC DOUBLE column only accepts DOUBLE Doris values.
case orc::TypeKind::DOUBLE: {
if (primitive_type == TYPE_DOUBLE) {
return std::make_tuple(true, orc::Literal(*((double*)value)));
}
return std::make_tuple(false, orc::Literal(false));
}
case orc::TypeKind::STRING:
[[fallthrough]];
case orc::TypeKind::BINARY:
[[fallthrough]];
// CHAR must not be pushed down, because CHAR values are fixed length and padded.
// case orc::TypeKind::CHAR:
// [[fallthrough]];
case orc::TypeKind::VARCHAR: {
if (primitive_type == TYPE_STRING || primitive_type == TYPE_CHAR ||
primitive_type == TYPE_VARCHAR) {
return std::make_tuple(true, orc::Literal(literal_data.data, literal_data.size));
}
return std::make_tuple(false, orc::Literal(false));
}
case orc::TypeKind::DECIMAL: {
// Widen the stored decimal to 128 bits, then split it into high/low 64-bit halves.
int128_t decimal_value;
if constexpr (primitive_type == TYPE_DECIMALV2) {
decimal_value = *reinterpret_cast<const int128_t*>(value);
// DecimalV2 has a fixed precision and scale, which replace the passed-in values.
precision = DecimalV2Value::PRECISION;
scale = DecimalV2Value::SCALE;
} else if constexpr (primitive_type == TYPE_DECIMAL32) {
decimal_value = *((int32_t*)value);
} else if constexpr (primitive_type == TYPE_DECIMAL64) {
decimal_value = *((int64_t*)value);
} else if constexpr (primitive_type == TYPE_DECIMAL128I) {
decimal_value = *((int128_t*)value);
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(orc::Int128(uint64_t(decimal_value >> 64),
uint64_t(decimal_value)),
precision, scale));
}
case orc::TypeKind::DATE: {
// The literal is the number of days since the Unix epoch, computed in UTC.
int64_t day_offset;
static const cctz::time_zone utc0 = cctz::utc_time_zone();
if constexpr (primitive_type == TYPE_DATE) {
const VecDateTimeValue date_v1 = *reinterpret_cast<const VecDateTimeValue*>(value);
cctz::civil_day civil_date(date_v1.year(), date_v1.month(), date_v1.day());
day_offset =
cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
} else if (primitive_type == TYPE_DATEV2) {
const DateV2Value<DateV2ValueType> date_v2 =
*reinterpret_cast<const DateV2Value<DateV2ValueType>*>(value);
cctz::civil_day civil_date(date_v2.year(), date_v2.month(), date_v2.day());
day_offset =
cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(orc::PredicateDataType::DATE, day_offset));
}
case orc::TypeKind::TIMESTAMP: {
// The literal is (seconds since the Unix epoch in UTC, nanoseconds).
int64_t seconds;
int32_t nanos;
static const cctz::time_zone utc0 = cctz::utc_time_zone();
// TODO: ColumnValueRange has lost the precision of microsecond
if constexpr (primitive_type == TYPE_DATETIME) {
const VecDateTimeValue datetime_v1 =
*reinterpret_cast<const VecDateTimeValue*>(value);
cctz::civil_second civil_seconds(datetime_v1.year(), datetime_v1.month(),
datetime_v1.day(), datetime_v1.hour(),
datetime_v1.minute(), datetime_v1.second());
seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
nanos = 0;
} else if (primitive_type == TYPE_DATETIMEV2) {
const DateV2Value<DateTimeV2ValueType> datetime_v2 =
*reinterpret_cast<const DateV2Value<DateTimeV2ValueType>*>(value);
cctz::civil_second civil_seconds(datetime_v2.year(), datetime_v2.month(),
datetime_v2.day(), datetime_v2.hour(),
datetime_v2.minute(), datetime_v2.second());
seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
// Convert the microsecond part to nanoseconds.
nanos = datetime_v2.microsecond() * 1000;
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(seconds, nanos));
}
default:
return std::make_tuple(false, orc::Literal(false));
}
} catch (Exception& e) {
// This can happen when the table schema has changed and the new schema is used to read
// old data.
LOG(WARNING) << "Failed to convert doris value to orc predicate literal, error = "
<< e.what();
return std::make_tuple(false, orc::Literal(false));
}
}
// Resolves the ORC predicate data type for the column referenced by `slot_ref`.
//
// The slot name is mapped to its file column name, the file column's ORC type is looked up
// in `_type_map`, and the type kind is translated through TYPEKIND_TO_PREDICATE_TYPE.
// Returns {true, predicate type} on success. Returns {false, LONG} (the type is a
// placeholder) and logs a warning when the file column is unknown or its ORC type kind
// cannot be pushed down.
std::pair<bool, orc::PredicateDataType> OrcReader::_get_orc_predicate_type(
        const VSlotRef* slot_ref) {
    DCHECK(_table_info_node_ptr->children_column_exists(slot_ref->expr_name()));
    auto file_col_name = _table_info_node_ptr->children_file_column_name(slot_ref->expr_name());

    // Single lookup instead of contains() followed by operator[].
    const auto type_it = _type_map.find(file_col_name);
    if (type_it == _type_map.end()) {
        LOG(WARNING) << "Column " << slot_ref->expr_name() << " in file name " << file_col_name
                     << " not found in _type_map";
        return {false, orc::PredicateDataType::LONG};
    }
    const auto* orc_type = type_it->second;

    const auto predicate_it = TYPEKIND_TO_PREDICATE_TYPE.find(orc_type->getKind());
    if (predicate_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
        LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" << orc_type->getKind() << "]";
        return {false, orc::PredicateDataType::LONG};
    }
    return {true, predicate_it->second};
}
// Builds the ORC literal for `literal` when it is compared with the column of `slot_ref`.
//
// Returns {true, literal} on success. Returns {false, orc::Literal(false)} when:
//   - the column has no usable ORC predicate type,
//   - the literal value is null,
//   - the table column is a string type but the file column is a different, non-string type,
//   - the slot's primitive type is not handled, or the value conversion fails.
std::pair<bool, orc::Literal> OrcReader::_make_orc_literal(const VSlotRef* slot_ref,
const VLiteral* literal) {
// Check that the column can be pushed down at all; the predicate type itself is not used here.
auto [valid_pred_type, predicate_type] = _get_orc_predicate_type(slot_ref);
if (!valid_pred_type) {
return {false, orc::Literal(false)};
}
// Look up the orc_type again here because convert_to_orc_literal needs it.
auto file_col_name = _table_info_node_ptr->children_file_column_name(slot_ref->expr_name());
const auto* orc_type = _type_map[file_col_name];
DCHECK(literal != nullptr);
// This only happens when the literals of an in-predicate contain a null value, like in (1, null).
if (literal->get_column_ptr()->is_null_at(0)) {
return {false, orc::Literal(false)};
}
auto literal_data = literal->get_column_ptr()->get_data_at(0);
auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
auto slot_type = slot->type();
// primitive_type is the type of the table slot, src_type the Doris type of the file column.
auto primitive_type = slot_type->get_primitive_type();
auto src_type = convert_to_doris_type(orc_type)->get_primitive_type();
// Do not push down the predicate when the column was changed from a non-string type to a
// string type.
if (src_type != primitive_type && !is_string_type(src_type) && is_string_type(primitive_type)) {
LOG(WARNING) << "Unsupported Push Down Schema Changed Column " << primitive_type << " to "
<< src_type;
return {false, orc::Literal(false)};
}
// Dispatch on the slot's primitive type: each listed type gets a case that instantiates
// convert_to_orc_literal for it.
switch (primitive_type) {
#define M(NAME) \
case TYPE_##NAME: { \
auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>( \
orc_type, literal_data, slot_type->get_precision(), slot_type->get_scale()); \
return {valid, orc_literal}; \
}
#define APPLY_FOR_PRIMITIVE_TYPE(M) \
M(TINYINT) \
M(SMALLINT) \
M(INT) \
M(BIGINT) \
M(LARGEINT) \
M(DATE) \
M(DATETIME) \
M(DATEV2) \
M(DATETIMEV2) \
M(VARCHAR) \
M(STRING) \
M(HLL) \
M(DECIMAL32) \
M(DECIMAL64) \
M(DECIMAL128I) \
M(DECIMAL256) \
M(DECIMALV2) \
M(BOOLEAN) \
M(IPV4) \
M(IPV6)
APPLY_FOR_PRIMITIVE_TYPE(M)
#undef M
default: {
VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << slot->col_name() << "]";
return {false, orc::Literal(false)};
}
}
}
// Checks whether the first child of `expr` is a slot that can be pushed down to the ORC
// reader. On success the slot's ORC predicate data type is recorded in
// `_vslot_ref_to_orc_predicate_data_type`.
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
    auto* first_child = expr->children()[0].get();
    if (!first_child->is_slot_ref()) {
        return false;
    }
    const auto* slot = static_cast<const VSlotRef*>(first_child);

    // The slot must not be a partition column and must exist in the ORC file.
    if (_lazy_read_ctx.predicate_partition_columns.contains(slot->expr_name())) {
        return false;
    }
    if (!_table_info_node_ptr->children_column_exists(slot->expr_name())) {
        return false;
    }

    // Only the predicate type is needed here, so resolve it directly.
    auto [pushable, predicate_type] = _get_orc_predicate_type(slot);
    if (!pushable) {
        return false;
    }
    _vslot_ref_to_orc_predicate_data_type[slot] = predicate_type;
    return true;
}
// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, size_t child_id) {
if (!expr->children()[child_id]->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
auto [valid, orc_literal] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
return valid;
}
// Check children 1..N-1 of `expr` and report whether at least one of them is a literal that
// can be pushed down. Returns false when `expr` has fewer than two children.
bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
    const size_t child_count = expr->children().size();
    if (child_count < 2) {
        return false;
    }
    bool any_pushable = false;
    for (size_t idx = 1; idx < child_count; ++idx) {
        // Deliberately no short circuit: every successful check caches its ORC literal,
        // so all children have to be visited.
        const bool pushable = _check_literal_can_push_down(expr, idx);
        any_pushable = any_pushable || pushable;
    }
    return any_pushable;
}
// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
if (expr == nullptr) {
return false;
}
switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
return std::ranges::any_of(expr->children(), [this](const auto& child) {
return _check_expr_can_push_down(child);
});
case TExprOpcode::COMPOUND_OR:
// all children must be pushed down
return std::ranges::all_of(expr->children(), [this](const auto& child) {
return _check_expr_can_push_down(child);
});
case TExprOpcode::COMPOUND_NOT:
DCHECK_EQ(expr->children().size(), 1);
return _check_expr_can_push_down(expr->children()[0]);
case TExprOpcode::GE:
case TExprOpcode::GT:
case TExprOpcode::LE:
case TExprOpcode::LT:
case TExprOpcode::EQ:
case TExprOpcode::NE:
case TExprOpcode::FILTER_IN:
case TExprOpcode::FILTER_NOT_IN:
// can't push down if expr is null aware predicate
return expr->node_type() != TExprNodeType::NULL_AWARE_BINARY_PRED &&
expr->node_type() != TExprNodeType::NULL_AWARE_IN_PRED &&
_check_slot_can_push_down(expr) && _check_rest_children_can_push_down(expr);
case TExprOpcode::INVALID_OPCODE:
if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
auto fn_name = expr->fn().name.function_name;
// only support is_null_pred and is_not_null_pred
if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
return _check_slot_can_push_down(expr);
}
VLOG_CRITICAL << "Unsupported function [funciton=" << fn_name << "]";
}
return false;
default:
VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
return false;
}
}
void OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThan(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
predicate_type, orc_literal);
}
void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThanEquals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
predicate_type, orc_literal);
}
void OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->equals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
predicate_type, orc_literal);
}
void OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
std::vector<orc::Literal> literals;
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
for (size_t i = 1; i < expr->children().size(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
if (_vliteral_to_orc_literal.contains(literal)) {
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
literals.emplace_back(orc_literal);
}
}
DCHECK(!literals.empty());
if (literals.size() == 1) {
builder->equals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
predicate_type, literals[0]);
} else {
builder->in(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
predicate_type, literals);
}
}
// Append `column IS NULL` for the slot in the only child of `expr`, using the predicate
// type cached by the push-down checks.
void OrcReader::_build_is_null(const VExprSPtr& expr,
                               std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    const auto& operands = expr->children();
    DCHECK(operands.size() == 1);
    DCHECK(operands[0]->is_slot_ref());

    const auto* column = static_cast<const VSlotRef*>(operands[0].get());
    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(column));
    auto column_type = _vslot_ref_to_orc_predicate_data_type[column];

    builder->isNull(_table_info_node_ptr->children_file_column_name(column->expr_name()),
                    column_type);
}
bool OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
// OPTIMIZE: check expr only once
if (!_check_expr_can_push_down(expr)) {
return false;
}
switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
builder->startAnd();
bool at_least_one_can_push_down = false;
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
DCHECK(at_least_one_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR: {
builder->startOr();
bool all_can_push_down = true;
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
all_can_push_down = false;
}
}
DCHECK(all_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_NOT: {
DCHECK_EQ(expr->children().size(), 1);
builder->startNot();
auto res = _build_search_argument(expr->children()[0], builder);
DCHECK(res);
builder->end();
break;
}
case TExprOpcode::GE:
builder->startNot();
_build_less_than(expr, builder);
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
_build_less_than_equals(expr, builder);
builder->end();
break;
case TExprOpcode::LE:
_build_less_than_equals(expr, builder);
break;
case TExprOpcode::LT:
_build_less_than(expr, builder);
break;
case TExprOpcode::EQ:
_build_equals(expr, builder);
break;
case TExprOpcode::NE:
builder->startNot();
_build_equals(expr, builder);
builder->end();
break;
case TExprOpcode::FILTER_IN:
_build_filter_in(expr, builder);
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
_build_filter_in(expr, builder);
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE:
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
_build_is_null(expr, builder);
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
_build_is_null(expr, builder);
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;
default:
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
return true;
}
// Build one ORC search argument as the AND of every expr in `exprs` that can be pushed down
// and install it into the row reader options. Returns false, leaving the options untouched,
// when none of the exprs can be pushed down.
bool OrcReader::_init_search_argument(const VExprSPtrs& exprs) {
    auto builder = orc::SearchArgumentFactory::newBuilder();
    builder->startAnd();

    size_t pushed_count = 0;
    for (const auto& expr : exprs) {
        // the type/literal caches are rebuilt for every top-level expr
        _vslot_ref_to_orc_predicate_data_type.clear();
        _vliteral_to_orc_literal.clear();
        if (_build_search_argument(expr, builder)) {
            ++pushed_count;
        }
    }

    if (pushed_count == 0) {
        // if all exprs can not be pushed down, builder->end() will throw exception
        return false;
    }

    builder->end();
    auto sargs = builder->build();
    _profile->add_info_string("OrcReader SearchArgument: ", sargs->toString());
    _row_reader_options.searchArgument(std::move(sargs));
    return true;
}
// Classify the columns of this scan and create the ORC row reader.
//
// Steps, in order:
//  1. Walk the conjuncts to collect the slots used by predicates and the exprs that can be
//     pushed down as ORC search arguments.
//  2. Split read / partition / missing columns into predicate and non-predicate groups in
//     _lazy_read_ctx and decide whether lazy reading can be used.
//  3. Optionally build the search argument from the collected push-down exprs.
//  4. Configure the row reader options, create the row reader and the first batch, and
//     record name -> index / name -> type for the selected columns.
//  5. Append the remaining filter conjuncts.
//
// Returns InternalError when check_orc_init_sargs_success is set but no search argument
// could be built, or when creating the row reader throws (except for the "stop" exception
// raised while the io context asks to stop).
Status OrcReader::set_fill_columns(
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);

    // std::unordered_map<column_name, std::pair<col_id, slot_id>>
    std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_table_columns;
    // visit_slot for lazy mat.
    std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
        if (expr->is_slot_ref()) {
            VSlotRef* slot_ref = static_cast<VSlotRef*>(expr);
            auto expr_name = slot_ref->expr_name();
            predicate_table_columns.emplace(
                    expr_name, std::make_pair(slot_ref->column_id(), slot_ref->slot_id()));
            if (slot_ref->column_id() == 0) {
                _lazy_read_ctx.resize_first_column = false;
            }
            return;
        }
        for (auto& child : expr->children()) {
            visit_slot(child.get());
        }
    };

    for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
        auto expr = conjunct->root();

        if (expr->is_rf_wrapper()) {
            // REF: src/runtime_filter/runtime_filter_consumer.cpp
            auto* runtime_filter = static_cast<VRuntimeFilterWrapper*>(expr.get());

            auto filter_impl = runtime_filter->get_impl();
            visit_slot(filter_impl.get());

            // only support push down for filter row group : MIN_FILTER, MAX_FILTER, MINMAX_FILTER, IN_FILTER
            if ((runtime_filter->node_type() == TExprNodeType::BINARY_PRED) &&
                (runtime_filter->op() == TExprOpcode::GE ||
                 runtime_filter->op() == TExprOpcode::LE)) {
                expr = filter_impl;
            } else if (runtime_filter->node_type() == TExprNodeType::IN_PRED &&
                       runtime_filter->op() == TExprOpcode::FILTER_IN) {
                auto* direct_in_predicate = static_cast<VDirectInPredicate*>(filter_impl.get());

                int max_in_size =
                        _state->query_options().__isset.max_pushdown_conditions_per_column
                                ? _state->query_options().max_pushdown_conditions_per_column
                                : 1024;
                // empty or oversized IN sets are not pushed down
                if (direct_in_predicate->get_set_func()->size() == 0 ||
                    direct_in_predicate->get_set_func()->size() > max_in_size) {
                    continue;
                }

                VExprSPtr new_in_slot = nullptr;
                if (direct_in_predicate->get_slot_in_expr(new_in_slot)) {
                    expr = new_in_slot;
                } else {
                    continue;
                }
            } else {
                continue;
            }
        } else if (VTopNPred* topn_pred = typeid_cast<VTopNPred*>(expr.get())) {
            // top runtime filter : only le && ge.
            DCHECK(topn_pred->children().size() > 0);
            visit_slot(topn_pred->children()[0].get());

            VExprSPtr binary_expr;
            if (topn_pred->get_binary_expr(binary_expr)) {
                // for min-max filter.
                expr = binary_expr;
            } else {
                continue;
            }
        } else {
            visit_slot(expr.get());
        }

        if (_check_expr_can_push_down(expr)) {
            _push_down_exprs.emplace_back(expr);
        }
    }

    if (_is_acid) {
        _lazy_read_ctx.predicate_orc_columns.insert(
                _lazy_read_ctx.predicate_orc_columns.end(),
                TransactionalHive::READ_ROW_COLUMN_NAMES.begin(),
                TransactionalHive::READ_ROW_COLUMN_NAMES.end());
    }

    // Returns the block index of the iceberg row lineage column named `col_name`, or -1 when
    // `col_name` is not such a column (or row lineage is not in use).
    auto check_iceberg_row_lineage_column_idx = [&](const auto& col_name) -> int {
        if (_row_lineage_columns != nullptr) {
            if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
                return _row_lineage_columns->row_id_column_idx;
            } else if (col_name == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
                return _row_lineage_columns->last_updated_sequence_number_column_idx;
            }
        }
        return -1;
    };

    // Split the table columns to read into predicate columns and lazily read columns.
    for (auto& read_table_col : _read_table_cols) {
        _lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
        if (!predicate_table_columns.empty()) {
            auto iter = predicate_table_columns.find(read_table_col);
            if (iter == predicate_table_columns.end()) {
                // for acid tables the READ_ROW columns are never lazily read
                if (!_is_acid ||
                    std::find(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                              TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(),
                              read_table_col) ==
                            TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()) {
                    _lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
                }
            } else {
                _lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
                _lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
                _lazy_read_ctx.predicate_orc_columns.emplace_back(
                        _table_info_node_ptr->children_file_column_name(iter->first));
                if (check_iceberg_row_lineage_column_idx(read_table_col) != -1) {
                    // Todo : enable lazy mat where filter iceberg row lineage column.
                    _enable_lazy_mat = false;
                }
            }
        }
    }

    for (const auto& kv : partition_columns) {
        auto iter = predicate_table_columns.find(kv.first);
        if (iter == predicate_table_columns.end()) {
            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
        } else {
            _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
        }
    }

    for (const auto& kv : missing_columns) {
        auto iter = predicate_table_columns.find(kv.first);
        if (iter == predicate_table_columns.end()) {
            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
        } else {
            //For check missing column : missing column == xx, missing column is null,missing column is not null.
            // _slot_id_to_filter_conjuncts is null-checked before its use near the end of
            // this function, so it is guarded here as well instead of being dereferenced
            // unconditionally.
            if (_slot_id_to_filter_conjuncts != nullptr) {
                auto conjunct_iter = _slot_id_to_filter_conjuncts->find(iter->second.second);
                if (conjunct_iter != _slot_id_to_filter_conjuncts->end()) {
                    for (const auto& ctx : conjunct_iter->second) {
                        _filter_conjuncts.emplace_back(ctx); // todo ??????
                    }
                }
            }

            // predicate_missing_columns is VLiteral.To fill in default values for missing columns.
            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
            if (check_iceberg_row_lineage_column_idx(kv.first) != -1) {
                _enable_lazy_mat = false;
            }
        }
    }

    // Lazy read needs both predicate columns and columns left to be read lazily.
    if (_enable_lazy_mat && !_lazy_read_ctx.predicate_columns.first.empty() &&
        !_lazy_read_ctx.lazy_read_columns.empty()) {
        _lazy_read_ctx.can_lazy_read = true;
    }

    if (_lazy_read_ctx.conjuncts.empty()) {
        _lazy_read_ctx.can_lazy_read = false;
    } else if (_enable_filter_by_min_max) {
        auto res = _init_search_argument(_push_down_exprs);
        if (_state->query_options().check_orc_init_sargs_success && !res) {
            std::stringstream ss;
            for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
                ss << conjunct->root()->debug_string() << "\n";
            }
            std::string conjuncts_str = ss.str();
            return Status::InternalError(
                    "Session variable check_orc_init_sargs_success is set, but "
                    "_init_search_argument returns false because all exprs can not be pushed "
                    "down:\n " +
                    conjuncts_str);
        }
    }

    try {
        _row_reader_options.range(_range_start_offset, _range_size);
        _row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
        if (!_column_ids.empty()) {
            std::list<uint64_t> column_ids_list(_column_ids.begin(), _column_ids.end());
            _row_reader_options.includeTypes(column_ids_list);
        } else { // If column_ids is empty, include all top-level columns to be read.
            _row_reader_options.include(_read_file_cols);
        }
        _row_reader_options.setEnableLazyDecoding(true);

        //orc reader should not use the tiny stripe optimization when reading by row id.
        if (!_read_by_rows) {
            uint64_t number_of_stripes = _reader->getNumberOfStripes();
            auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);

            int64_t range_end_offset = _range_start_offset + _range_size;

            bool all_tiny_stripes = true;
            std::vector<io::PrefetchRange> tiny_stripe_ranges;

            for (uint64_t i = 0; i < number_of_stripes; i++) {
                std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
                uint64_t strip_start_offset = strip_info->getOffset();
                uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();

                // skip stripes outside of this scan range or not needed by the reader
                if (strip_start_offset >= range_end_offset ||
                    strip_end_offset < _range_start_offset || !all_stripes_needed[i]) {
                    continue;
                }
                if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) {
                    all_tiny_stripes = false;
                    break;
                }

                tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
            }
            if (all_tiny_stripes && number_of_stripes > 0) {
                std::vector<io::PrefetchRange> prefetch_merge_ranges =
                        io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
                                                                     _orc_max_merge_distance_bytes,
                                                                     _orc_once_max_read_bytes);
                auto range_finder = std::make_shared<io::LinearProbeRangeFinder>(
                        std::move(prefetch_merge_ranges));

                auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
                orc_input_stream_ptr->set_all_tiny_stripes();
                // replace the stream's file reader with a RangeCacheFileReader over the
                // merged ranges
                auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
                auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
                orc_file_reader = std::make_shared<io::RangeCacheFileReader>(
                        _profile, orc_inner_reader, range_finder);
            }
        }

        // Without lazy read, predicate partition / missing columns are filled like the
        // ordinary ones.
        if (!_lazy_read_ctx.can_lazy_read) {
            for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
                _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
            }
            for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
                _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
            }
        }

        _fill_all_columns = true;
        // create orc row reader
        if (_lazy_read_ctx.can_lazy_read) {
            _row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
            _orc_filter = std::make_unique<ORCFilterImpl>(this);
        }
        if (!_lazy_read_ctx.conjuncts.empty()) {
            _string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
        }
        _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get(),
                                               _string_dict_filter.get());
        _batch = _row_reader->createRowBatch(_batch_size);

        // Derive the first row in this scan range from ORC RowReader's initial state.
        // getRowNumber() returns firstRowOfStripe[firstStripe]-1, or uint64_max if firstStripe==0.
        uint64_t row_num = _row_reader->getRowNumber();
        _first_row_in_range = (row_num == std::numeric_limits<uint64_t>::max()) ? 0 : row_num + 1;
        _current_read_position = _first_row_in_range;

        // Record column name -> index in the selected type; for acid tables the fields of
        // struct subtypes are flattened under the "<ROW>." prefix.
        const auto& selected_type = _row_reader->getSelectedType();
        int idx = 0;
        if (_is_acid) {
            for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
                auto sub_type = selected_type.getSubtype(i);
                if (sub_type->getKind() == orc::TypeKind::STRUCT) {
                    for (int j = 0; j < sub_type->getSubtypeCount(); ++j) {
                        _colname_to_idx[TransactionalHive::ROW + "." + sub_type->getFieldName(j)] =
                                idx++;
                    }
                } else {
                    _colname_to_idx[selected_type.getFieldName(i)] = idx++;
                }
            }
        } else {
            for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
                _colname_to_idx[selected_type.getFieldName(i)] = idx++;
            }
        }

        // Record column name -> ORC type with the same flattening rule.
        _type_map.clear();
        if (_is_acid) {
            for (uint64_t i = 0; i < selected_type.getSubtypeCount(); ++i) {
                if (selected_type.getSubtype(i)->getKind() == orc::TypeKind::STRUCT) {
                    auto row_orc_type = selected_type.getSubtype(i);
                    for (uint64_t j = 0; j < row_orc_type->getSubtypeCount(); j++) {
                        std::string field_name =
                                TransactionalHive::ROW + "." + row_orc_type->getFieldName(j);
                        _type_map.emplace(field_name, row_orc_type->getSubtype(j));
                    }
                } else {
                    std::string field_name = selected_type.getFieldName(i);
                    _type_map.emplace(field_name, selected_type.getSubtype(i));
                }
            }
        } else {
            for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
                std::string field_name = selected_type.getFieldName(i);
                _type_map.emplace(field_name, selected_type.getSubtype(i));
            }
        }

        _remaining_rows = _row_reader->getNumberOfRows();

    } catch (const std::exception& e) {
        std::string _err_msg = e.what();
        // ignore stop exception
        if (!(_io_ctx && _io_ctx->should_stop && _err_msg == "stop")) {
            return Status::InternalError("Failed to create orc row reader. reason = {}", _err_msg);
        }
    }

    if (!_not_single_slot_filter_conjuncts.empty()) {
        _filter_conjuncts.insert(_filter_conjuncts.end(), _not_single_slot_filter_conjuncts.begin(),
                                 _not_single_slot_filter_conjuncts.end());
        _disable_dict_filter = true;
    }

    if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
        // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
        // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
            auto& [value, slot_desc] = kv.second;
            auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
            if (iter != _slot_id_to_filter_conjuncts->end()) {
                for (const auto& ctx : iter->second) {
                    _filter_conjuncts.push_back(ctx);
                }
            }
        }
    }
    return Status::OK();
}
// Fill every partition column of `block` with `rows` copies of its partition value.
//
// `partition_columns` maps a column name to (partition value as text, slot descriptor). The
// value is deserialized through the slot type's serde into the block column found via
// _col_name_to_block_idx.
// Returns InternalError when deserialization fails or writes a different number of rows
// than requested.
Status OrcReader::_fill_partition_columns(
        Block* block, uint64_t rows,
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns) {
    DataTypeSerDe::FormatOptions _text_formatOptions;
    for (const auto& kv : partition_columns) {
        // NOTE(review): operator[] is used for the index lookup here, while
        // _fill_missing_columns checks contains() first - confirm the name is always present.
        auto col_ptr = block->get_by_position((*_col_name_to_block_idx)[kv.first])
                               .column->assume_mutable();
        const auto& [value, slot_desc] = kv.second;
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        if (text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, &num_deserialized,
                                                           _text_formatOptions) != Status::OK()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // expected = requested `rows`, actual = `num_deserialized`
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually "
                    "written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}
// Fill the columns listed in `missing_columns` with `rows` values each.
// A null expr context means "no default": the column is treated as nullable and receives
// `rows` default entries. Otherwise the context is executed against the block and, when the
// resulting column is uniquely owned, it is resized to `rows`, materialized if const and put
// into the block (wrapped as nullable when the block column's type is nullable).
// Returns InternalError when a column name has no entry in _col_name_to_block_idx.
Status OrcReader::_fill_missing_columns(
        Block* block, uint64_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    for (const auto& entry : missing_columns) {
        const auto& col_name = entry.first;
        const auto& default_ctx = entry.second;

        if (!_col_name_to_block_idx->contains(col_name)) {
            return Status::InternalError("Failed to find missing column: {}, block: {}", col_name,
                                         block->dump_structure());
        }

        if (default_ctx == nullptr) {
            // no default column, fill with null
            auto target = block->get_by_position((*_col_name_to_block_idx)[col_name])
                                  .column->assume_mutable();
            static_cast<ColumnNullable*>(target.get())->insert_many_defaults(rows);
            continue;
        }

        // fill with default value
        // PT1 => dest primitive type
        ColumnPtr evaluated;
        RETURN_IF_ERROR(default_ctx->execute(block, evaluated));
        if (evaluated->use_count() != 1) {
            continue;
        }

        // call resize because the first column of _src_block_ptr may not be filled by reader,
        // so _src_block_ptr->rows() may return wrong result, cause the column created by
        // `ctx->execute()` has only one row.
        auto resizable = evaluated->assume_mutable();
        resizable->resize(rows);
        // the result may be a ColumnConst, convert it to a normal column
        evaluated = evaluated->convert_to_full_column_if_const();

        auto block_col_type = block->get_by_position((*_col_name_to_block_idx)[col_name]).type;
        if (block_col_type->is_nullable()) {
            block->replace_by_position((*_col_name_to_block_idx)[col_name],
                                       make_nullable(evaluated));
        } else {
            block->replace_by_position((*_col_name_to_block_idx)[col_name], evaluated);
        }
    }
    return Status::OK();
}
// Fill the row-id related columns of `block` for the current batch.
//  - When a row id column iterator is set, it is positioned at `start_row` and asked for the
//    next batch into its block column.
//  - For iceberg row lineage, entries that are still null in the row-id column become
//    first_row_id + start_row + i, and null entries in the last-updated-sequence-number
//    column become last_updated_sequence_number. Non-null entries are left untouched.
Status OrcReader::_fill_row_id_columns(Block* block, int64_t start_row) {
    size_t fill_size = _batch->numElements;

    if (_row_id_column_iterator_pair.first != nullptr) {
        RETURN_IF_ERROR(_row_id_column_iterator_pair.first->seek_to_ordinal(start_row));
        auto col = block->get_by_position(_row_id_column_iterator_pair.second)
                           .column->assume_mutable();
        RETURN_IF_ERROR(_row_id_column_iterator_pair.first->next_batch(&fill_size, col));
    }

    // Overwrite the null entries among the first `fill_size` rows of the nullable Int64
    // column at `column_idx` with value_for(i) and mark them as non-null.
    const auto fill_null_slots = [&](auto column_idx, auto&& value_for) {
        auto col = block->get_by_position(column_idx).column->assume_mutable();
        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
        auto& null_map = nullable_column->get_null_map_data();
        auto& data =
                assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
        for (size_t i = 0; i < fill_size; ++i) {
            if (null_map[i] == 0) {
                continue;
            }
            null_map[i] = 0;
            data[i] = value_for(i);
        }
    };

    if (_row_lineage_columns != nullptr && _row_lineage_columns->need_row_ids() &&
        _row_lineage_columns->first_row_id >= 0) {
        fill_null_slots(_row_lineage_columns->row_id_column_idx, [&](size_t i) {
            return _row_lineage_columns->first_row_id + start_row + static_cast<int64_t>(i);
        });
    }

    if (_row_lineage_columns != nullptr &&
        _row_lineage_columns->has_last_updated_sequence_number_column() &&
        _row_lineage_columns->last_updated_sequence_number >= 0) {
        fill_null_slots(_row_lineage_columns->last_updated_sequence_number_column_idx,
                        [&](size_t) { return _row_lineage_columns->last_updated_sequence_number; });
    }
    return Status::OK();
}
// Materialize the iceberg row-id column for `rows` rows starting at `start_row`.
// Does nothing unless _iceberg_rowid_params.enabled is set. When the block already has a
// column named BeConsts::ICEBERG_ROWID_COL its data is replaced; otherwise a struct column
// (file_path, row_position, partition_spec_id, partition_data) is inserted at the configured
// position, falling back to the end of the block when that position is out of range.
// Finally the name -> position map is refreshed from the block, if a map is set.
Status OrcReader::_append_iceberg_rowid_column(Block* block, size_t rows, int64_t start_row) {
    if (!_iceberg_rowid_params.enabled) {
        return Status::OK();
    }

    const int existing_pos = block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
    if (existing_pos < 0) {
        // The column is not in the block yet: describe the struct type and insert it.
        DataTypes member_types;
        member_types.push_back(std::make_shared<DataTypeString>());
        member_types.push_back(std::make_shared<DataTypeInt64>());
        member_types.push_back(std::make_shared<DataTypeInt32>());
        member_types.push_back(std::make_shared<DataTypeString>());
        std::vector<std::string> member_names = {"file_path", "row_position", "partition_spec_id",
                                                 "partition_data"};
        auto struct_type = std::make_shared<DataTypeStruct>(member_types, member_names);

        MutableColumnPtr built_column;
        RETURN_IF_ERROR(build_iceberg_rowid_column(
                struct_type, _iceberg_rowid_params.file_path, start_row, rows,
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
                &built_column));

        int target_pos = _iceberg_rowid_params.row_id_column_pos;
        if (target_pos < 0 || target_pos > static_cast<int>(block->columns())) {
            target_pos = static_cast<int>(block->columns());
        }
        block->insert(static_cast<size_t>(target_pos),
                      ColumnWithTypeAndName(std::move(built_column), struct_type,
                                            doris::BeConsts::ICEBERG_ROWID_COL));
    } else {
        // The column exists: rebuild its data with the type already present in the block.
        auto& existing = block->get_by_position(static_cast<size_t>(existing_pos));
        MutableColumnPtr built_column;
        RETURN_IF_ERROR(build_iceberg_rowid_column(
                existing.type, _iceberg_rowid_params.file_path, start_row, rows,
                _iceberg_rowid_params.partition_spec_id, _iceberg_rowid_params.partition_data_json,
                &built_column));
        existing.column = std::move(built_column);
    }

    if (_col_name_to_block_idx != nullptr) {
        *_col_name_to_block_idx = block->get_name_to_pos_map();
    }
    return Status::OK();
}
// Populate _system_properties from the scan range and scan params.
// The file type of the scan range wins when it is set (kept for compatibility); otherwise
// the one from the scan params is used.
void OrcReader::_init_system_properties() {
    // for compatibility
    _system_properties.system_type =
            _scan_range.__isset.file_type ? _scan_range.file_type : _scan_params.file_type;

    _system_properties.properties = _scan_params.properties;
    _system_properties.hdfs_params = _scan_params.hdfs_params;

    if (!_scan_params.__isset.broker_addresses) {
        return;
    }
    _system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
                                               _scan_params.broker_addresses.end());
}
// Populate _file_description from the scan range: the path always, the file size when set
// (-1 otherwise), and fs_name / file_cache_admission only when the scan range provides them.
void OrcReader::_init_file_description() {
    const auto& present = _scan_range.__isset;

    _file_description.path = _scan_range.path;
    if (present.file_size) {
        _file_description.file_size = _scan_range.file_size;
    } else {
        _file_description.file_size = -1;
    }
    if (present.fs_name) {
        _file_description.fs_name = _scan_range.fs_name;
    }
    if (present.file_cache_admission) {
        _file_description.file_cache_admission = _scan_range.file_cache_admission;
    }
}
DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) {
// Critical: check for nullptr BEFORE accessing any methods
if (orc_type == nullptr) {
LOG(WARNING) << "[OrcReader] ERROR: convert_to_doris_type called with nullptr orc_type! "
"Falling back to STRING";
// Return a safe default type instead of crashing
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
}
switch (orc_type->getKind()) {
case orc::TypeKind::BOOLEAN:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_BOOLEAN, true);
case orc::TypeKind::BYTE:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_TINYINT, true);
case orc::TypeKind::SHORT:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_SMALLINT, true);
case orc::TypeKind::INT:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true);
case orc::TypeKind::LONG:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_BIGINT, true);
case orc::TypeKind::FLOAT:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_FLOAT, true);
case orc::TypeKind::DOUBLE:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DOUBLE, true);
case orc::TypeKind::STRING:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
case orc::TypeKind::BINARY:
if (_scan_params.__isset.enable_mapping_varbinary &&
_scan_params.enable_mapping_varbinary) {
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARBINARY,
true);
} else {
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
}
case orc::TypeKind::TIMESTAMP:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATETIMEV2, true, 0,
6);
case orc::TypeKind::DECIMAL:
return DataTypeFactory::instance().create_data_type(
PrimitiveType::TYPE_DECIMAL128I, true,
orc_type->getPrecision() == 0 ? decimal_precision_for_hive11
: cast_set<int>(orc_type->getPrecision()),
orc_type->getPrecision() == 0 ? decimal_scale_for_hive11
: cast_set<int>(orc_type->getScale()));
case orc::TypeKind::DATE:
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATEV2, true);
case orc::TypeKind::VARCHAR:
return DataTypeFactory::instance().create_data_type(
PrimitiveType::TYPE_VARCHAR, true, 0, 0,
cast_set<int>(orc_type->getMaximumLength()));
case orc::TypeKind::CHAR:
return DataTypeFactory::instance().create_data_type(
PrimitiveType::TYPE_CHAR, true, 0, 0, cast_set<int>(orc_type->getMaximumLength()));
case orc::TypeKind::TIMESTAMP_INSTANT:
if (_scan_params.__isset.enable_mapping_timestamp_tz &&
_scan_params.enable_mapping_timestamp_tz) {
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_TIMESTAMPTZ,
true, 0, 6);
}
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATETIMEV2, true, 0,
6);
case orc::TypeKind::LIST: {
return make_nullable(
std::make_shared<DataTypeArray>(convert_to_doris_type(orc_type->getSubtype(0))));
}
case orc::TypeKind::MAP: {
// Handle incomplete MAP type due to column pruning
// If MAP doesn't have both key and value subtypes, try to find the complete type from file
if (orc_type->getSubtypeCount() < 2 || orc_type->getSubtype(0) == nullptr ||
orc_type->getSubtype(1) == nullptr) {
// Try to find the complete type from _column_id_to_file_type
uint64_t column_id = orc_type->getColumnId();
VLOG_DEBUG << "[OrcReader] Detected incomplete MAP type: column_id=" << column_id
<< ", subtype_count=" << orc_type->getSubtypeCount() << ", subtype(0)="
<< (orc_type->getSubtypeCount() > 0 && orc_type->getSubtype(0) != nullptr
? "not null"
: "null")
<< ", subtype(1)="
<< (orc_type->getSubtypeCount() > 1 && orc_type->getSubtype(1) != nullptr
? "not null"
: "null");
auto it = _column_id_to_file_type.find(column_id);
if (it != _column_id_to_file_type.end() && it->second != nullptr) {
const orc::Type* complete_type = it->second;
VLOG_DEBUG << "[OrcReader] Found complete type in mapping: column_id=" << column_id
<< ", complete_type_kind=" << static_cast<int>(complete_type->getKind())
<< ", complete_subtype_count=" << complete_type->getSubtypeCount();
// Print subtypes information
for (uint64_t i = 0; i < complete_type->getSubtypeCount(); ++i) {
const orc::Type* subtype = complete_type->getSubtype(i);
VLOG_DEBUG << "[OrcReader] complete_type->subtype[" << i << "]: "
<< (subtype != nullptr
? ("kind=" +
std::to_string(static_cast<int>(subtype->getKind())) +
", column_id=" +
std::to_string(subtype->getColumnId()))
: "nullptr");
}
if (complete_type->getKind() == orc::TypeKind::MAP &&
complete_type->getSubtypeCount() == 2) {
VLOG_DEBUG
<< "[OrcReader] Using complete MAP type from file schema for column_id="
<< column_id;
// Get subtypes with extra validation
const orc::Type* key_type = complete_type->getSubtype(0);
const orc::Type* value_type = complete_type->getSubtype(1);
VLOG_DEBUG << "[OrcReader] About to convert key_type: "
<< (key_type != nullptr ? "not null" : "NULL");
VLOG_DEBUG << "[OrcReader] About to convert value_type: "
<< (value_type != nullptr ? "not null" : "NULL");
// Use the complete type from file - with null checks
DataTypePtr key_doris_type = convert_to_doris_type(key_type);
VLOG_DEBUG << "[OrcReader] Successfully converted key_type";
DataTypePtr value_doris_type = convert_to_doris_type(value_type);
VLOG_DEBUG << "[OrcReader] Successfully converted value_type";
return make_nullable(
std::make_shared<DataTypeMap>(key_doris_type, value_doris_type));
} else {
LOG(WARNING) << "[OrcReader] Warning: Complete type is not a valid MAP or has "
"wrong subtype count";
}
} else {
LOG(WARNING) << "[OrcReader] Warning: Could not find complete type in mapping for "
"column_id="
<< column_id << ", mapping_size=" << _column_id_to_file_type.size();
}
}
return make_nullable(
std::make_shared<DataTypeMap>(convert_to_doris_type(orc_type->getSubtype(0)),
convert_to_doris_type(orc_type->getSubtype(1))));
}
case orc::TypeKind::STRUCT: {
DataTypes res_data_types;
std::vector<std::string> names;
for (int i = 0; i < orc_type->getSubtypeCount(); ++i) {
res_data_types.push_back(convert_to_doris_type(orc_type->getSubtype(i)));
names.push_back(get_field_name_lower_case(orc_type, i));
}
return make_nullable(std::make_shared<DataTypeStruct>(res_data_types, names));
}
default:
throw Exception(Status::InternalError("Orc type is not supported!"));
return nullptr;
}
}
// Reports the schema of the ORC file to the caller.
//
// For every direct child of the file's root type, `name_to_type` receives an entry keyed by
// the field name exactly as returned by the ORC schema, mapped to the Doris type produced by
// convert_to_doris_type(). `missing_cols` receives a copy of every entry in `_missing_cols`.
// Always returns Status::OK(); convert_to_doris_type() may still throw for unsupported types.
Status OrcReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                              std::unordered_set<std::string>* missing_cols) {
    const auto& file_schema = _reader->getType();
    const auto field_count = file_schema.getSubtypeCount();
    for (uint64_t field_idx = 0; field_idx < field_count; ++field_idx) {
        name_to_type->emplace(file_schema.getFieldName(field_idx),
                              convert_to_doris_type(file_schema.getSubtype(field_idx)));
    }
    missing_cols->insert(_missing_cols.begin(), _missing_cols.end());
    return Status::OK();
}
// Hive pads ORC CHAR values with trailing spaces.
// https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/impala_char.html
//
// Returns the number of leading bytes of `s` (which holds `size` bytes) that remain once
// every trailing ' ' byte is dropped. Only the space character is stripped; the buffer itself
// is never modified.
static inline size_t trim_right(const char* s, size_t size) {
    size_t kept = size;
    for (; kept != 0; --kept) {
        if (s[kept - 1] != ' ') {
            break;
        }
    }
    return kept;
}
// Entry point for decoding a string-like ORC batch into `data_column`.
//
// `cvb` must be an orc::EncodedStringVectorBatch; otherwise an InternalError naming `col_name`
// is returned. The batch's `isEncoded` flag selects the dictionary-encoded decoder or the
// plain one; `type_kind` and `num_values` are forwarded unchanged. Time spent here is added to
// `_statistics.decode_value_time`.
template <bool is_filter>
Status OrcReader::_decode_string_column(const std::string& col_name,
                                        const MutableColumnPtr& data_column,
                                        const orc::TypeKind& type_kind,
                                        const orc::ColumnVectorBatch* cvb, size_t num_values) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    const auto* string_batch = dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb);
    if (string_batch == nullptr) {
        return Status::InternalError(
                "Wrong data type for column '{}', expected EncodedStringVectorBatch", col_name);
    }
    if (!string_batch->isEncoded) {
        return _decode_string_non_dict_encoded_column<is_filter>(data_column, type_kind,
                                                                 string_batch, num_values);
    }
    return _decode_string_dict_encoded_column<is_filter>(data_column, type_kind, string_batch,
                                                         num_values);
}
// Decodes a plain (non dictionary-encoded) string batch into `data_column`.
//
// One StringRef is built per row and all of them are appended with a single
// insert_many_strings() call:
//   * rows flagged as null in `cvb->notNull` (only consulted when `cvb->hasNulls`) become an
//     empty reference, because ORC does not fill the data pointer of null rows;
//   * for orc::TypeKind::CHAR the trailing spaces are removed through trim_right();
//   * a value whose length is not positive points at a static empty string instead of the
//     batch buffer.
// The `is_filter` template argument is not consulted by this decoder.
template <bool is_filter>
Status OrcReader::_decode_string_non_dict_encoded_column(const MutableColumnPtr& data_column,
                                                         const orc::TypeKind& type_kind,
                                                         const orc::EncodedStringVectorBatch* cvb,
                                                         size_t num_values) {
    const static std::string empty_string;
    std::vector<StringRef> refs;
    refs.reserve(num_values);

    // Appends the reference for one non-null row. `strip_padding` enables CHAR handling.
    auto append_row = [&]<bool strip_padding>(size_t row) {
        if constexpr (strip_padding) {
            const size_t kept = trim_right(cvb->data[row], cvb->length[row]);
            refs.emplace_back((kept > 0) ? cvb->data[row] : empty_string.data(), kept);
        } else {
            // Keep the signed length for the emptiness test, as the batch stores it.
            const auto raw_length = cvb->length[row];
            refs.emplace_back((raw_length > 0) ? cvb->data[row] : empty_string.data(),
                              raw_length);
        }
    };

    // Walks all rows; the null-free batch takes a loop without the per-row null test.
    auto append_all = [&]<bool strip_padding>() {
        if (cvb->hasNulls) {
            for (size_t row = 0; row < num_values; ++row) {
                if (cvb->notNull[row]) {
                    append_row.template operator()<strip_padding>(row);
                } else {
                    // Orc doesn't fill null values in a new batch and the former batch has
                    // already been released, so the stale pointer must not be used. Flat
                    // types (int/long/timestamp...) hold no pointers and need no such care.
                    refs.emplace_back(empty_string.data(), 0);
                }
            }
        } else {
            for (size_t row = 0; row < num_values; ++row) {
                append_row.template operator()<strip_padding>(row);
            }
        }
    };

    if (type_kind == orc::TypeKind::CHAR) {
        append_all.template operator()<true>();
    } else {
        append_all.template operator()<false>();
    }

    if (!refs.empty()) {
        data_column->insert_many_strings(refs.data(), num_values);
    }
    return Status::OK();
}
// Decodes a dictionary-encoded string batch into `data_column`.
//
// Each row's dictionary entry is looked up through `cvb->index` / `cvb->dictionary` and turned
// into a StringRef; everything is appended at once with insert_many_strings_overflow(), which
// also receives the longest value length seen.
//   * rows flagged as null (when `cvb->hasNulls`) become an empty reference;
//   * when `is_filter` is set, rows whose `_filter` byte is zero become an empty reference
//     without touching the dictionary;
//   * for orc::TypeKind::CHAR the trailing spaces are removed through trim_right().
template <bool is_filter>
Status OrcReader::_decode_string_dict_encoded_column(const MutableColumnPtr& data_column,
                                                     const orc::TypeKind& type_kind,
                                                     const orc::EncodedStringVectorBatch* cvb,
                                                     size_t num_values) {
    std::vector<StringRef> refs;
    refs.reserve(num_values);
    size_t max_value_length = 0;

    UInt8* __restrict filter_data = nullptr;
    if constexpr (is_filter) {
        filter_data = _filter->data();
    }

    // Resolves one non-null row against the dictionary and appends its reference.
    auto append_dict_value = [&]<bool is_char>(size_t row) {
        if constexpr (is_filter) {
            if (!filter_data[row]) {
                refs.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
                return;
            }
        }
        char* val_ptr;
        int64_t length;
        cvb->dictionary->getValueByIndex(cvb->index.data()[row], val_ptr, length);
        if constexpr (is_char) {
            length = trim_right(val_ptr, length);
        }
        // Compared as unsigned values, then stored as the running maximum.
        if (static_cast<size_t>(length) > max_value_length) {
            max_value_length = static_cast<size_t>(length);
        }
        refs.emplace_back((length > 0) ? val_ptr : EMPTY_STRING_FOR_OVERFLOW, length);
    };

    // Walks all rows; the null-free batch takes a loop without the per-row null test.
    auto append_all = [&]<bool is_char>() {
        if (cvb->hasNulls) {
            for (size_t row = 0; row < num_values; ++row) {
                if (cvb->notNull[row]) {
                    append_dict_value.template operator()<is_char>(row);
                } else {
                    refs.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
                }
            }
        } else {
            for (size_t row = 0; row < num_values; ++row) {
                append_dict_value.template operator()<is_char>(row);
            }
        }
    };

    if (type_kind == orc::TypeKind::CHAR) {
        append_all.template operator()<true>();
    } else {
        append_all.template operator()<false>();
    }

    if (!refs.empty()) {
        data_column->insert_many_strings_overflow(refs.data(), refs.size(), max_value_length);
    }
    return Status::OK();
}
// Decodes an ORC batch into an Int32 Doris column.
//
// Two batch layouts are accepted:
//   * orc::LongVectorBatch - handled by the generic flat-column decoder;
//   * orc::EncodedStringVectorBatch - the entries of the batch's `index` buffer are appended,
//     narrowed to Int32 (presumably the dictionary codes of a dict-filter column - confirm
//     against the callers).
// Any other batch type trips a DCHECK and yields an InternalError. Time spent here is added to
// `_statistics.decode_value_time`.
template <bool is_filter>
Status OrcReader::_decode_int32_column(const std::string& col_name,
                                       const MutableColumnPtr& data_column,
                                       const orc::ColumnVectorBatch* cvb, size_t num_values) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    if (dynamic_cast<const orc::LongVectorBatch*>(cvb) != nullptr) {
        return _decode_flat_column<TYPE_INT, orc::LongVectorBatch>(col_name, data_column, cvb,
                                                                   num_values);
    }

    const auto* dict_batch = dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb);
    if (dict_batch == nullptr) {
        DCHECK(false) << "Bad ColumnVectorBatch type.";
        return Status::InternalError("Bad ColumnVectorBatch type.");
    }

    const auto* dict_codes = dict_batch->index.data();
    auto& dst = static_cast<ColumnInt32&>(*data_column).get_data();
    const auto first_new_row = dst.size();
    dst.resize(first_new_row + num_values);
    for (size_t row = 0; row < num_values; ++row) {
        dst[first_new_row + row] = static_cast<Int32>(dict_codes[row]);
    }
    return Status::OK();
}
// Appends `num_values` end offsets to `doris_offsets` from the ORC offset buffer.
//
// ORC stores `num_values + 1` boundaries; each appended Doris offset is the last offset already
// in `doris_offsets` plus the distance of the ORC boundary from `orc_offsets[0]`.
// `*element_size` receives the number of nested elements covered by these rows
// (`orc_offsets[num_values] - orc_offsets[0]`), or 0 when `num_values` is 0.
// Returns InternalError when the ORC buffer holds fewer than `num_values + 1` entries.
Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
                                            ColumnArray::Offsets64& doris_offsets,
                                            const orc::DataBuffer<int64_t>& orc_offsets,
                                            size_t num_values, size_t* element_size) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    if (num_values == 0) {
        *element_size = 0;
        return Status::OK();
    }

    // The const variable uses a non-const method from a third-party dependency
    // without modification, so const_cast can be used.
    auto& sizable_orc_offsets = const_cast<orc::DataBuffer<int64_t>&>(orc_offsets);
    if (sizable_orc_offsets.size() < num_values + 1) {
        return Status::InternalError("Wrong array offsets in orc file for column '{}'",
                                     col_name);
    }

    // Both anchors are captured once, before any new offset is appended.
    const auto prev_offset = doris_offsets.back();
    const auto base_offset = orc_offsets[0];
    for (size_t boundary = 1; boundary <= num_values; ++boundary) {
        doris_offsets.emplace_back(prev_offset + orc_offsets[boundary] - base_offset);
    }
    *element_size = orc_offsets[num_values] - base_offset;
    return Status::OK();
}
// Decodes `num_values` rows of the ORC batch `cvb` into `data_column`, dispatching on the
// primitive type of `data_type`.
//
// Parameters:
//   col_name        - column path used in error messages and to name nested children
//                     ("<col>.element", "<col>.key", "<col>.value", "<col>.<field>").
//   data_column     - destination column (already stripped of its nullable wrapper by the
//                     caller in this file).
//   data_type       - Doris type describing `data_column`.
//   root_node       - schema-change node used to resolve nested children.
//   orc_column_type - ORC type of the column being read.
//   cvb             - ORC batch holding the values.
//
// Returns InternalError when the ORC kind or the concrete batch class does not match what the
// Doris type requires, when a non-nullable struct child is missing from the file, or when the
// primitive type is not handled here.
template <bool is_filter>
Status OrcReader::_fill_doris_data_column(const std::string& col_name,
                                          MutableColumnPtr& data_column,
                                          const DataTypePtr& data_type,
                                          std::shared_ptr<TableSchemaChangeHelper::Node> root_node,
                                          const orc::Type* orc_column_type,
                                          const orc::ColumnVectorBatch* cvb, size_t num_values) {
    auto logical_type = data_type->get_primitive_type();
    switch (logical_type) {
#define DISPATCH(FlatType, CppType, OrcColumnType) \
    case FlatType:                                 \
        return _decode_flat_column<FlatType, OrcColumnType>(col_name, data_column, cvb, num_values);
        FOR_FLAT_ORC_COLUMNS(DISPATCH)
#undef DISPATCH
    case PrimitiveType::TYPE_INT:
        return _decode_int32_column<is_filter>(col_name, data_column, cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL32:
        return _decode_decimal_column<TYPE_DECIMAL32, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL64:
        return _decode_decimal_column<TYPE_DECIMAL64, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMALV2:
        return _decode_decimal_column<TYPE_DECIMALV2, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL128I:
        return _decode_decimal_column<TYPE_DECIMAL128I, is_filter>(col_name, data_column, data_type,
                                                                   cvb, num_values);
    case PrimitiveType::TYPE_DATEV2:
        return _decode_time_column<DateV2Value<DateV2ValueType>, TYPE_DATEV2, orc::LongVectorBatch,
                                   is_filter>(col_name, data_column, cvb, num_values);
    case PrimitiveType::TYPE_DATETIMEV2:
        return _decode_time_column<DateV2Value<DateTimeV2ValueType>, TYPE_DATETIMEV2,
                                   orc::TimestampVectorBatch, is_filter>(col_name, data_column, cvb,
                                                                         num_values);
    case PrimitiveType::TYPE_STRING:
    case PrimitiveType::TYPE_VARCHAR:
    case PrimitiveType::TYPE_CHAR:
        return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(),
                                                cvb, num_values);
    case PrimitiveType::TYPE_VARBINARY:
        // ORC BINARY columns are still delivered as (Encoded)StringVectorBatch, so the string
        // decoder is reused for them.
        return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(),
                                                cvb, num_values);
    case PrimitiveType::TYPE_TIMESTAMPTZ:
        return _decode_timestamp_tz_column<is_filter>(col_name, data_column, cvb, num_values);
    case PrimitiveType::TYPE_ARRAY: {
        if (orc_column_type->getKind() != orc::TypeKind::LIST) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected list, actual {}", col_name,
                    orc_column_type->getKind());
        }
        const auto* orc_list = dynamic_cast<const orc::ListVectorBatch*>(cvb);
        if (orc_list == nullptr) {
            // Fail cleanly instead of dereferencing a null batch below.
            return Status::InternalError(
                    "Wrong data type for column '{}', expected ListVectorBatch", col_name);
        }
        auto& doris_offsets = static_cast<ColumnArray&>(*data_column).get_offsets();
        const auto& orc_offsets = orc_list->offsets;
        size_t element_size = 0;
        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_offsets, orc_offsets, num_values,
                                                  &element_size));
        const DataTypePtr& nested_type =
                reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get())
                        ->get_nested_type();
        const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
        std::string element_name = col_name + ".element";
        // Nested elements are always decoded without the row filter.
        return _orc_column_to_doris_column<false>(
                element_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
                root_node->get_element_node(), nested_orc_type, orc_list->elements.get(),
                element_size);
    }
    case PrimitiveType::TYPE_MAP: {
        if (orc_column_type->getKind() != orc::TypeKind::MAP) {
            return Status::InternalError("Wrong data type for column '{}', expected map, actual {}",
                                         col_name, orc_column_type->getKind());
        }
        const auto* orc_map = dynamic_cast<const orc::MapVectorBatch*>(cvb);
        if (orc_map == nullptr) {
            // Fail cleanly instead of dereferencing a null batch below.
            return Status::InternalError(
                    "Wrong data type for column '{}', expected MapVectorBatch", col_name);
        }
        auto& doris_map = static_cast<ColumnMap&>(*data_column);
        size_t element_size = 0;
        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_map.get_offsets(),
                                                  orc_map->offsets, num_values, &element_size));
        const DataTypePtr& doris_key_type =
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_key_type();
        const DataTypePtr& doris_value_type =
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_value_type();
        // A MAP type may expose fewer than two subtypes (convert_to_doris_type() guards against
        // the same situation), so never index past the subtype count.
        const uint64_t orc_subtype_count = orc_column_type->getSubtypeCount();
        const orc::Type* orc_key_type =
                orc_subtype_count > 0 ? orc_column_type->getSubtype(0) : nullptr;
        const orc::Type* orc_value_type =
                orc_subtype_count > 1 ? orc_column_type->getSubtype(1) : nullptr;
        VLOG_DEBUG << "[OrcReader] MAP column '" << col_name
                   << "': orc_key_type=" << (orc_key_type != nullptr ? "not null" : "NULL")
                   << ", orc_value_type=" << (orc_value_type != nullptr ? "not null" : "NULL")
                   << ", element_size=" << element_size;
        // The "missing" flags describe the type handed to this function and are deliberately
        // NOT cleared by the recovery below: a child missing from `orc_column_type` is always
        // filled with defaults. The recovery only resolves the type from the file schema for
        // logging.
        // NOTE(review): presumably a pruned child has no data in the batch - confirm.
        bool key_is_missing = (orc_key_type == nullptr);
        bool value_is_missing = (orc_value_type == nullptr);
        if (key_is_missing || value_is_missing) {
            VLOG_DEBUG << "[OrcReader] Detected incomplete MAP subtypes for column '" << col_name
                       << "', attempting to recover from mapping...";
            uint64_t column_id = orc_column_type->getColumnId();
            auto it = _column_id_to_file_type.find(column_id);
            if (it != _column_id_to_file_type.end() && it->second != nullptr) {
                const orc::Type* complete_map_type = it->second;
                if (complete_map_type->getKind() == orc::TypeKind::MAP &&
                    complete_map_type->getSubtypeCount() == 2) {
                    if (key_is_missing) {
                        orc_key_type = complete_map_type->getSubtype(0);
                        if (orc_key_type != nullptr) {
                            VLOG_DEBUG << "[OrcReader] Recovered key type from mapping for column '"
                                       << col_name << "'";
                        }
                    }
                    if (value_is_missing) {
                        orc_value_type = complete_map_type->getSubtype(1);
                        if (orc_value_type != nullptr) {
                            VLOG_DEBUG
                                    << "[OrcReader] Recovered value type from mapping for column '"
                                    << col_name << "'";
                        }
                    }
                }
            }
        }
        ColumnPtr& doris_key_column = doris_map.get_keys_ptr();
        ColumnPtr& doris_value_column = doris_map.get_values_ptr();
        std::string key_col_name = col_name + ".key";
        std::string value_col_name = col_name + ".value";
        // Key column: defaults when the key subtype was absent, otherwise decode from ORC.
        if (key_is_missing) {
            auto mutable_key_column = doris_key_column->assume_mutable();
            if (mutable_key_column->is_nullable()) {
                auto* nullable_column = static_cast<ColumnNullable*>(mutable_key_column.get());
                nullable_column->insert_many_defaults(element_size);
            } else {
                mutable_key_column->insert_many_defaults(element_size);
            }
        } else {
            RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                    key_col_name, doris_key_column, doris_key_type, root_node->get_key_node(),
                    orc_key_type, orc_map->keys.get(), element_size));
        }
        // Value column: defaults when the value subtype was absent, otherwise decode from ORC.
        if (value_is_missing) {
            auto mutable_value_column = doris_value_column->assume_mutable();
            if (mutable_value_column->is_nullable()) {
                auto* nullable_column = static_cast<ColumnNullable*>(mutable_value_column.get());
                nullable_column->insert_many_defaults(element_size);
            } else {
                mutable_value_column->insert_many_defaults(element_size);
            }
        } else {
            RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                    value_col_name, doris_value_column, doris_value_type,
                    root_node->get_value_node(), orc_value_type, orc_map->elements.get(),
                    element_size));
        }
        return Status::OK();
    }
    case PrimitiveType::TYPE_STRUCT: {
        if (orc_column_type->getKind() != orc::TypeKind::STRUCT) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected struct, actual {}", col_name,
                    orc_column_type->getKind());
        }
        const auto* orc_struct = dynamic_cast<const orc::StructVectorBatch*>(cvb);
        if (orc_struct == nullptr) {
            // Fail cleanly instead of dereferencing a null batch below.
            return Status::InternalError(
                    "Wrong data type for column '{}', expected StructVectorBatch", col_name);
        }
        auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
        // Doris field position -> ORC field position, for fields present in the file.
        std::map<int, int> read_fields;
        // Doris field positions that have no counterpart in the file.
        std::set<int> missing_fields;
        const auto* doris_struct_type =
                assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
        // <cctype> functions are undefined for negative char values, so convert each byte
        // through unsigned char before lower-casing.
        auto to_lower_char = [](unsigned char c) { return static_cast<char>(std::tolower(c)); };
        // Build ORC field name to index map for faster lookup (names compared lower-cased).
        std::unordered_map<std::string, int> orc_field_name_to_idx;
        for (int j = 0; j < orc_column_type->getSubtypeCount(); ++j) {
            std::string field_name = orc_column_type->getFieldName(j);
            std::transform(field_name.begin(), field_name.end(), field_name.begin(),
                           to_lower_char);
            orc_field_name_to_idx[field_name] = j;
        }
        for (int i = 0; i < doris_struct.tuple_size(); ++i) {
            const auto& table_column_name = doris_struct_type->get_name_by_position(i);
            if (!root_node->children_column_exists(table_column_name)) {
                missing_fields.insert(i);
                continue;
            }
            const auto& file_column_name = root_node->children_file_column_name(table_column_name);
            std::string file_column_name_lower = file_column_name;
            std::transform(file_column_name_lower.begin(), file_column_name_lower.end(),
                           file_column_name_lower.begin(), to_lower_char);
            auto it = orc_field_name_to_idx.find(file_column_name_lower);
            if (it != orc_field_name_to_idx.end()) {
                read_fields[i] = it->second;
                VLOG_DEBUG << "[OrcReader] Found field mapping: doris_field[" << i
                           << "] -> orc_field[" << it->second
                           << "], table_column: " << table_column_name
                           << ", file_column: " << file_column_name_lower;
            } else {
                missing_fields.insert(i);
                VLOG_DEBUG << "[OrcReader] Missing field: doris_field[" << i
                           << "], table_column: " << table_column_name
                           << ", file_column: " << file_column_name_lower
                           << " (not found in ORC file)";
            }
        }
        // Missing children can only be represented as defaults of a nullable column.
        for (int missing_field : missing_fields) {
            ColumnPtr& doris_field = doris_struct.get_column_ptr(missing_field);
            if (!doris_field->is_nullable()) {
                return Status::InternalError(
                        "Child field of '{}' is not nullable, but is missing in orc file",
                        col_name);
            }
            reinterpret_cast<ColumnNullable*>(doris_field->assume_mutable().get())
                    ->insert_many_defaults(num_values);
        }
        // Struct children have one value per parent row, so the row filter setting is kept.
        for (const auto& [doris_idx, orc_idx] : read_fields) {
            orc::ColumnVectorBatch* orc_field = orc_struct->fields[orc_idx];
            const orc::Type* orc_type = orc_column_type->getSubtype(orc_idx);
            std::string field_name = col_name + "." + orc_column_type->getFieldName(orc_idx);
            ColumnPtr& doris_field = doris_struct.get_column_ptr(doris_idx);
            const DataTypePtr& doris_type = doris_struct_type->get_element(doris_idx);
            RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(
                    field_name, doris_field, doris_type,
                    root_node->get_children_node(doris_struct_type->get_name_by_position(doris_idx)),
                    orc_type, orc_field, num_values));
        }
        return Status::OK();
    }
    default:
        break;
    }
    return Status::InternalError("Unsupported type {} for column '{}'", data_type->get_name(),
                                 col_name);
}
// Converts one ORC column batch into the Doris column `doris_column`, applying a type
// conversion when the file type differs from the requested `data_type`.
//
// Steps when `orc_column_type` is not null:
//   1. Derive the source Doris type from the ORC type; columns listed in `_dict_filter_cols`
//      are read as nullable INT instead.
//   2. Fetch (or create and cache in `_converters`) the converter for this column. Dictionary
//      columns use a "$dict_" key prefix so both converters can coexist for one column.
//   3. Fill the null map from `cvb->notNull` (or zeros when the batch has no nulls) and decode
//      the values into the converter's source column.
//   4. Run the converter to produce the final values in `doris_column`.
// When `orc_column_type` is null, `num_values` default values are appended instead.
//
// Returns InternalError when the conversion is unsupported, when a non-nullable column meets
// null values, or when decoding fails.
template <bool is_filter>
Status OrcReader::_orc_column_to_doris_column(
        const std::string& col_name, ColumnPtr& doris_column, const DataTypePtr& data_type,
        std::shared_ptr<TableSchemaChangeHelper::Node> root_node, const orc::Type* orc_column_type,
        const orc::ColumnVectorBatch* cvb, size_t num_values) {
    DataTypePtr resolved_type;
    ColumnPtr resolved_column;
    MutableColumnPtr data_column;
    if (orc_column_type != nullptr) {
        auto src_type = convert_to_doris_type(orc_column_type);
        bool is_dict_filter_col = false;
        for (const auto& dict_col : _dict_filter_cols) {
            if (col_name == dict_col.first) {
                src_type =
                        DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true);
                is_dict_filter_col = true;
                break;
            }
        }
        // If the column can be dictionary filtered, there will be two types.
        // It may be plain or a dictionary, because the same field in different stripes may have different types.
        // Here we use the $dict_ prefix to represent the dictionary type converter.
        auto converter_key = !is_dict_filter_col ? col_name : fmt::format("$dict_{}", col_name);
        if (!_converters.contains(converter_key)) {
            std::unique_ptr<converter::ColumnTypeConverter> new_converter =
                    converter::ColumnTypeConverter::get_converter(src_type, data_type,
                                                                  converter::FileFormat::ORC);
            if (!new_converter->support()) {
                // Both arguments need a placeholder; otherwise the converter's reason is
                // dropped from the message.
                return Status::InternalError(
                        "The column type of '{}' has changed and is not supported: {}", col_name,
                        new_converter->get_error_msg());
            }
            // Cache the converter so later batches reuse it.
            _converters[converter_key] = std::move(new_converter);
        }
        converter::ColumnTypeConverter* converter = _converters[converter_key].get();
        resolved_column = converter->get_column(src_type, doris_column, data_type);
        resolved_type = converter->get_type();
        if (resolved_column->is_nullable()) {
            SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
            auto* nullable_column =
                    reinterpret_cast<ColumnNullable*>(resolved_column->assume_mutable().get());
            data_column = nullable_column->get_nested_column_ptr();
            NullMap& map_data_column = nullable_column->get_null_map_data();
            auto origin_size = map_data_column.size();
            map_data_column.resize(origin_size + num_values);
            if (cvb->hasNulls) {
                // ORC marks present rows; the Doris null map marks null rows, hence the negation.
                const auto* cvb_nulls = cvb->notNull.data();
                for (size_t i = 0; i < num_values; ++i) {
                    map_data_column[origin_size + i] = !cvb_nulls[i];
                }
            } else {
                memset(map_data_column.data() + origin_size, 0, num_values);
            }
        } else {
            if (cvb->hasNulls) {
                return Status::InternalError("Not nullable column {} has null values in orc file",
                                             col_name);
            }
            data_column = resolved_column->assume_mutable();
        }
        RETURN_IF_ERROR(_fill_doris_data_column<is_filter>(
                col_name, data_column, remove_nullable(resolved_type), root_node, orc_column_type,
                cvb, num_values));
        // resolve schema change
        auto converted_column = doris_column->assume_mutable();
        return converter->convert(resolved_column, converted_column);
    } else {
        // No ORC type for this column: pad with default values.
        auto mutable_column = doris_column->assume_mutable();
        if (mutable_column->is_nullable()) {
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(num_values);
        } else {
            mutable_column->insert_many_defaults(num_values);
        }
    }
    return Status::OK();
}
// Returns the name of the `pos`-th field of `orc_type`, lower-cased byte by byte with
// std::tolower (which follows the current C locale).
std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int pos) {
    std::string name = orc_type->getFieldName(pos);
    // <cctype> functions are undefined for negative char values, so every byte is converted
    // through unsigned char before being lower-cased.
    std::transform(name.begin(), name.end(), name.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return name;
}
// Reads the next block through _get_next_block_impl() and, once `*eof` is reported, publishes
// the reader metrics: the selected / evaluated row-group counters go to `_orc_profile` and the
// read row count is added to the file reader stats of `_io_ctx` (when an io context is set).
// Any error from the implementation is returned unchanged.
Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_get_next_block_impl(block, read_rows, eof));
    if (!*eof) {
        return Status::OK();
    }

    COUNTER_UPDATE(_orc_profile.selected_row_group_count, _reader_metrics.SelectedRowGroupCount);
    COUNTER_UPDATE(_orc_profile.evaluated_row_group_count,
                   _reader_metrics.EvaluatedRowGroupCount);
    if (_io_ctx) {
        _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount;
    }
    return Status::OK();
}
// On a condition-cache HIT, skips the run of granules whose cached result is false before
// the next batch is read. Does nothing when there is no cache context or the cache missed.
//
// Positions are taken from `_current_read_position`, which tracks where the *next* batch will
// start (unlike `_last_read_row_number`, the start of the most recently read batch).
//   * If no surviving granule remains, `*eof` is set, `*read_rows` is zeroed and the rest of
//     the range is counted as filtered.
//   * Otherwise the row reader seeks to the first row of the next surviving granule when that
//     row lies ahead of the current position, and the skipped rows are counted as filtered.
void OrcReader::_filter_rows_by_condition_cache(size_t* read_rows, bool* eof) {
    if (!_condition_cache_ctx || !_condition_cache_ctx->is_hit) {
        return;
    }

    auto& granule_survives = *_condition_cache_ctx->filter_result;
    const uint64_t base_granule = _first_row_in_range / ConditionCacheContext::GRANULE_SIZE;
    const uint64_t cur_granule = _current_read_position / ConditionCacheContext::GRANULE_SIZE;

    // Advance to the first granule, at or after the current one, that has surviving rows.
    uint64_t survivor_idx = cur_granule - base_granule;
    for (; survivor_idx < granule_survives.size() && !granule_survives[survivor_idx];
         ++survivor_idx) {
    }

    if (survivor_idx >= granule_survives.size()) {
        // No more surviving rows exist in this scan range.
        *eof = true;
        *read_rows = 0;
        if (_io_ctx) {
            _io_ctx->condition_cache_filtered_rows +=
                    (_first_row_in_range + get_total_rows()) - _current_read_position;
        }
        return;
    }

    const uint64_t target_row =
            (base_granule + survivor_idx) * ConditionCacheContext::GRANULE_SIZE;
    if (target_row <= _current_read_position) {
        // Already inside the surviving granule; nothing to skip.
        return;
    }
    _row_reader->seekToRow(target_row);
    if (_io_ctx) {
        _io_ctx->condition_cache_filtered_rows += target_row - _current_read_position;
    }
    _current_read_position = target_row;
}
Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eof) {
if (_io_ctx && _io_ctx->should_stop) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
set_remaining_rows(get_remaining_rows() - rows);
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (get_remaining_rows() == 0) {
*eof = true;
}
return Status::OK();
}
if (!_seek_to_read_one_line()) {
*eof = true;
return Status::OK();
}
if (_lazy_read_ctx.can_lazy_read) {
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
{
SCOPED_RAW_TIMER(&_statistics.get_batch_time);
// reset decimal_scale_params_index;
_decimal_scale_params_index = 0;
try {
_filter_rows_by_condition_cache(read_rows, eof);
if (*eof) {
return Status::OK();
}
rr = _row_reader->nextBatch(*_batch, block);
if (rr == 0 || _batch->numElements == 0) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
// After nextBatch(), getRowNumber() returns the start of the batch just read.
_last_read_row_number = _row_reader->getRowNumber();
// Use _batch->numElements (not rr) because ORC's nextBatch has an
// internal do-while loop: when the filter callback rejects an entire
// batch, the loop retries with the next batch. The return value (rr)
// accumulates rows across ALL iterations, but getRowNumber() returns
// the start of the LAST iteration's batch. _batch->numElements is set
// to that iteration's batch size (Reader.cc:1427), giving the correct
// next-read position.
_current_read_position = _last_read_row_number + _batch->numElements;
} catch (std::exception& e) {
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
block->clear_column_data();
*eof = true;
*read_rows = 0;
return Status::OK();
}
return Status::InternalError("Orc row reader nextBatch failed. reason = {}",
_err_msg);
}
}
// Condition cache MISS: mark granules with surviving rows (lazy-read path).
// This is done here (after nextBatch) instead of in the filter() callback because
// getRowNumber() only returns the correct batch-start row after nextBatch() returns.
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit && _filter) {
auto& cache = *_condition_cache_ctx->filter_result;
auto* filter_data = _filter->data();
size_t filter_size = _filter->size();
size_t base_granule = _first_row_in_range / ConditionCacheContext::GRANULE_SIZE;
for (size_t i = 0; i < filter_size; i++) {
if (filter_data[i]) {
size_t granule =
(_last_read_row_number + i) / ConditionCacheContext::GRANULE_SIZE;
size_t cache_idx = granule - base_granule;
if (cache_idx < cache.size()) {
cache[cache_idx] = true;
}
}
}
}
int64_t start_row = _row_reader->getRowNumber();
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
auto orc_col_idx = _colname_to_idx.find(file_column_name);
if (orc_col_idx == _colname_to_idx.end()) {
return Status::InternalError("Wrong read column '{}' in orc file", col_name);
}
RETURN_IF_ERROR(_orc_column_to_doris_column<true>(
col_name, column_ptr, column_type,
_table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
batch_vec[orc_col_idx->second], _batch->numElements));
#ifndef NDEBUG
column_ptr->sanity_check();
#endif
}
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
_lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(
_fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));
RETURN_IF_ERROR(_fill_row_id_columns(block, start_row));
RETURN_IF_ERROR(_append_iceberg_rowid_column(block, block->rows(), start_row));
if (block->rows() == 0) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
*eof = true;
*read_rows = 0;
return Status::OK();
}
{
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_execute_filter_position_delete_rowids(*_filter, start_row);
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
}
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
{
SCOPED_RAW_TIMER(&_statistics.get_batch_time);
// reset decimal_scale_params_index;
_decimal_scale_params_index = 0;
try {
_filter_rows_by_condition_cache(read_rows, eof);
if (*eof) {
return Status::OK();
}
rr = _row_reader->nextBatch(*_batch, block);
if (rr == 0 || _batch->numElements == 0) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
// After nextBatch(), getRowNumber() returns the start of the batch just read.
_last_read_row_number = _row_reader->getRowNumber();
_current_read_position = _last_read_row_number + _batch->numElements;
} catch (std::exception& e) {
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
block->clear_column_data();
*eof = true;
*read_rows = 0;
return Status::OK();
}
return Status::InternalError("Orc row reader nextBatch failed. reason = {}",
_err_msg);
}
}
int64_t start_row = _row_reader->getRowNumber();
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
return Status::InternalError(
"Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos,
ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
_dict_cols_has_converted = true;
}
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
for (auto& col_name : _lazy_read_ctx.all_read_columns) {
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
auto orc_col_idx = _colname_to_idx.find(file_column_name);
if (orc_col_idx == _colname_to_idx.end()) {
return Status::InternalError("Wrong read column '{}' in orc file", col_name);
}
RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
col_name, column_ptr, column_type,
_table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
batch_vec[orc_col_idx->second], _batch->numElements));
#ifndef NDEBUG
column_ptr->sanity_check();
#endif
}
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
_lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(
_fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));
RETURN_IF_ERROR(_fill_row_id_columns(block, start_row));
RETURN_IF_ERROR(_append_iceberg_rowid_column(block, block->rows(), start_row));
if (block->rows() == 0) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
*eof = true;
*read_rows = 0;
return Status::OK();
}
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
{
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_build_delete_row_filter(block, _batch->numElements);
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
// Condition cache MISS: mark granules with surviving rows (non-lazy path)
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit) {
auto& cache = *_condition_cache_ctx->filter_result;
auto* filter_data = result_filter.data();
size_t num_rows = block->rows();
size_t base_granule = _first_row_in_range / ConditionCacheContext::GRANULE_SIZE;
for (size_t i = 0; i < num_rows; i++) {
if (filter_data[i]) {
size_t granule = (_last_read_row_number + i) /
ConditionCacheContext::GRANULE_SIZE;
size_t cache_idx = granule - base_granule;
if (cache_idx < cache.size()) {
cache[cache_idx] = true;
}
}
}
}
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter, start_row);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr, start_row);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, columns_to_filter, (*_delete_rows_filter_ptr)));
} else if (_position_delete_ordered_rowids != nullptr) {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter, start_row);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}
Block::erase_useless_column(block, column_to_keep);
}
}
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
#ifndef NDEBUG
for (auto col : *block) {
col.column->sanity_check();
DCHECK(block->rows() == col.column->size())
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
block->rows(), col.column->size(), col.name);
}
#endif
*read_rows = block->rows();
}
return Status::OK();
}
// Appends the field batches of the top-level struct batch `batch` to `result`.
// When `_is_acid` is set, a field that is itself a struct batch is expanded one level:
// its own fields are appended in place of the struct. `idx` is not read here.
void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
                                orc::ColumnVectorBatch* batch, int idx) {
    const auto* root = dynamic_cast<const orc::StructVectorBatch*>(batch);
    for (auto* child : root->fields) {
        // Only ACID files get the one-level expansion of nested struct fields.
        const orc::StructVectorBatch* nested =
                _is_acid ? dynamic_cast<const orc::StructVectorBatch*>(child) : nullptr;
        if (nested == nullptr) {
            result.push_back(child);
            continue;
        }
        for (auto* grandchild : nested->fields) {
            result.push_back(grandchild);
        }
    }
}
void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
// transactional hive orc delete row
if (_delete_rows != nullptr) {
_delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_position((*_col_name_to_block_idx)
[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
.column));
const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
block->get_by_position(
(*_col_name_to_block_idx)[TransactionalHive::BUCKET_LOWER_CASE])
.column));
const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
block->get_by_position(
(*_col_name_to_block_idx)[TransactionalHive::ROW_ID_LOWER_CASE])
.column));
for (int i = 0; i < rows; ++i) {
auto original_transaction = original_transaction_column.get_int(i);
auto bucket_id = bucket_id_column.get_int(i);
auto row_id = row_id_column.get_int(i);
TransactionalHiveReader::AcidRowID transactional_row_id = {
.original_transaction = original_transaction,
.bucket = bucket_id,
.row_id = row_id};
if (_delete_rows->contains(transactional_row_id)) {
_pos_delete_filter_data[i] = 0;
}
}
}
}
// Row-filter callback for the lazy-read path (per the note near the end of this function,
// it fires during nextBatch()).
//
// Converts the predicate columns of the ORC batch `data` into the Doris block passed through
// `arg`, evaluates the filter conjuncts on that block, and reports the surviving rows.
//
// data: ORC batch being read; on return `data.numElements` is set to the number of surviving rows.
// sel:  output array; receives the indices (0..size-1) of the surviving rows, compacted at the front.
// size: number of rows to evaluate.
// arg:  opaque pointer that is cast to `Block*`.
// Returns a non-OK Status if a dict filter column or a read column cannot be found, or if
// column conversion / conjunct execution fails.
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
auto* block = (Block*)arg;
// Remembered so that columns appended during conjunct execution can be erased afterwards.
size_t origin_column_num = block->columns();
// While the dict filter columns have not been switched yet, replace each of them in the block
// with an empty Int32 (or Nullable(Int32)) column. _convert_dict_cols_to_string_cols() later
// turns them back into string columns and resets `_dict_cols_has_converted`.
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
if (!_col_name_to_block_idx->contains(dict_filter_cols.first)) {
return Status::InternalError("Failed to find dict filter column '{}' in block {}",
dict_filter_cols.first, block->dump_structure());
}
MutableColumnPtr dict_col_ptr = ColumnInt32::create();
auto pos = (*_col_name_to_block_idx)[dict_filter_cols.first];
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
// NOTE(review): `dict_col_ptr` appears in std::move() and is dereferenced in the same
// argument list; this relies on create() not moving from it before size() is evaluated.
block->replace_by_position(
pos, ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
_dict_cols_has_converted = true;
}
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, &data, 0);
// Columns to materialize for predicate evaluation: the predicate columns, plus the ACID row
// columns when reading a transactional table.
std::vector<std::string> table_col_names;
table_col_names.insert(table_col_names.end(), _lazy_read_ctx.predicate_columns.first.begin(),
_lazy_read_ctx.predicate_columns.first.end());
if (_is_acid) {
table_col_names.insert(table_col_names.end(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
}
for (auto& table_col_name : table_col_names) {
auto& column_with_type_and_name =
block->get_by_position((*_col_name_to_block_idx)[table_col_name]);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
// Map the table column name to the file column name, then to its index in `batch_vec`.
auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
auto orc_col_idx = _colname_to_idx.find(file_column_name);
if (orc_col_idx == _colname_to_idx.end()) {
return Status::InternalError("Wrong read column '{}' in orc file", table_col_name);
}
RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
table_col_name, column_ptr, column_type,
_table_info_node_ptr->get_children_node(table_col_name),
_type_map[file_column_name], batch_vec[orc_col_idx->second], data.numElements));
#ifndef NDEBUG
column_ptr->sanity_check();
#endif
}
RETURN_IF_ERROR(
_fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, size, _lazy_read_ctx.predicate_missing_columns));
if (_lazy_read_ctx.resize_first_column) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
block->get_by_position(0).column->assume_mutable()->resize(size);
}
// transactional hive orc delete row
_build_delete_row_filter(block, size);
// `_filter` starts with every row kept (1) and is narrowed by the conjuncts below.
_filter = std::make_unique<IColumn::Filter>(size, 1);
auto* __restrict result_filter_data = _filter->data();
bool can_filter_all = false;
// Evaluate, in this order: the plain filter conjuncts, the rewritten dict conjuncts, and the
// conjuncts of predicate columns that are not dict filtered.
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
// The ACID delete-row filter, when present, is passed along as an additional filter.
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, _filter.get(), &can_filter_all));
if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
block->get_by_position(0).column->assume_mutable()->clear();
}
// NOTE(review): this branch does not return; the selection loop below still runs over
// `_filter` — presumably it is all zeros when can_filter_all is set; confirm against
// VExprContext::execute_conjuncts.
if (can_filter_all) {
for (auto& col : table_col_names) {
// clean block to read predicate columns and acid columns
block->get_by_position((*_col_name_to_block_idx)[col])
.column->assume_mutable()
->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_position((*_col_name_to_block_idx)[col.first])
.column->assume_mutable()
->clear();
}
Block::erase_useless_column(block, origin_column_num);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
}
// Compact the indices of surviving rows into `sel`: index i is always written at the current
// output slot, but the slot only advances when row i passed the filter.
uint16_t new_size = 0;
for (uint16_t i = 0; i < size; i++) {
sel[new_size] = i;
new_size += result_filter_data[i] ? 1 : 0;
}
// NOTE: Condition cache MISS marking for the lazy-read path is done
// in _get_next_block_impl after nextBatch() returns, where
// _last_read_row_number has been correctly set via getRowNumber().
// We cannot do it here because this callback fires *during* nextBatch()
// when getRowNumber() still returns the previous batch's start row.
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
// Report the number of surviving rows back through the batch.
data.numElements = new_size;
return Status::OK();
}
// Decides, for each predicate column, whether it will be filtered through the string dictionary.
//
// Resets the per-stripe dict filter state (`_obj_pool`, `_dict_filter_cols`,
// `_dict_filter_conjuncts`, `_non_dict_filter_conjuncts`), then walks the predicate columns:
//  - columns accepted by _can_filter_by_dict() (and not disabled by `_disable_dict_filter`) are
//    recorded in `_dict_filter_cols` and their file column name is appended to `column_names`;
//  - for every other column, its conjuncts (if any) are appended to `_non_dict_filter_conjuncts`.
// Returns OK immediately, touching nothing, when `_slot_id_to_filter_conjuncts` is not set.
// `current_strip_information` is not read here.
Status OrcReader::fill_dict_filter_column_names(
        std::unique_ptr<orc::StripeInformation> current_strip_information,
        std::list<std::string>& column_names) {
    if (!_slot_id_to_filter_conjuncts) {
        return Status::OK();
    }
    _obj_pool->clear();
    _dict_filter_cols.clear();
    _dict_filter_conjuncts.clear();
    _non_dict_filter_conjuncts.clear();

    // Names and slot ids are parallel sequences: the n-th name belongs to the n-th slot id.
    const std::list<std::string>& names = _lazy_read_ctx.predicate_columns.first;
    const std::vector<int>& slot_ids = _lazy_read_ctx.predicate_columns.second;
    size_t ordinal = 0;
    for (auto name_it = names.begin(); name_it != names.end(); ++name_it, ++ordinal) {
        const std::string& table_col_name = *name_it;
        int slot_id = slot_ids[ordinal];

        if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
            _dict_filter_cols.emplace_back(table_col_name, slot_id);
            column_names.emplace_back(
                    _table_info_node_ptr->children_file_column_name(table_col_name));
            continue;
        }

        auto conjuncts_it = _slot_id_to_filter_conjuncts->find(slot_id);
        if (conjuncts_it == _slot_id_to_filter_conjuncts->end()) {
            continue;
        }
        for (const auto& ctx : conjuncts_it->second) {
            _non_dict_filter_conjuncts.push_back(ctx);
        }
    }
    return Status::OK();
}
bool OrcReader::_can_filter_by_dict(int slot_id) {
SlotDescriptor* slot = nullptr;
const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
for (auto* each : slots) {
if (each->id() == slot_id) {
slot = each;
break;
}
}
if (slot == nullptr) {
return false;
}
if (!is_string_type(slot->type()->get_primitive_type()) &&
!is_var_len_object(slot->type()->get_primitive_type())) {
return false;
}
if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
return false;
}
// TODO: The current implementation of dictionary filtering does not take into account
// the implementation of NULL values because the dictionary itself does not contain
// NULL value encoding. As a result, many NULL-related functions or expressions
// cannot work properly, such as is null, is not null, coalesce, etc.
// Here we check if the predicate expr is IN or BINARY_PRED.
// Implementation of NULL value dictionary filtering will be carried out later.
return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) {
return (ctx->root()->node_type() == TExprNodeType::IN_PRED ||
ctx->root()->node_type() == TExprNodeType::BINARY_PRED) &&
ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF;
});
}
// Called once the string dictionaries of a stripe are available. For every dict filter column
// it evaluates the column's conjuncts against the dictionary values and rewrites them into a
// predicate on dictionary codes.
//
// file_column_name_to_dict_map: dictionaries of the stripe, keyed by file column name.
// is_stripe_filtered: output; set to true when the conjuncts of some dict filter column reject
//                     every dictionary value, in which case the function returns immediately.
//
// A column is removed from `_dict_filter_cols` and its conjuncts are moved to
// `_non_dict_filter_conjuncts` when it has no dictionary in this stripe, when its dictionary
// is empty, or when more than MAX_DICT_CODE_PREDICATE_TO_REWRITE codes survive.
// Returns NotFound when a dict filter column has no entry in `_slot_id_to_filter_conjuncts`,
// or any error raised by conjunct execution / rewriting.
Status OrcReader::on_string_dicts_loaded(
        std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
        bool* is_stripe_filtered) {
    SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
    *is_stripe_filtered = false;
    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
        std::string& dict_filter_col_name = it->first;
        int slot_id = it->second;

        // Collect the conjuncts registered for this slot.
        VExprContextSPtrs ctxs;
        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
        if (iter == _slot_id_to_filter_conjuncts->end()) {
            std::stringstream msg;
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
            return Status::NotFound(msg.str());
        }
        for (const auto& ctx : iter->second) {
            ctxs.push_back(ctx);
        }

        // Drops the current column from dict filtering and evaluates its conjuncts as ordinary
        // (non-dict) conjuncts instead. Advances `it` to the next column.
        auto demote_to_non_dict_filter = [&]() {
            it = _dict_filter_cols.erase(it);
            for (auto& ctx : ctxs) {
                _non_dict_filter_conjuncts.emplace_back(ctx);
            }
        };

        // Can not dict filter col find because stripe is not dict encoded, then remove it.
        auto file_column_name_to_dict_map_iter = file_column_name_to_dict_map.find(
                _table_info_node_ptr->children_file_column_name(dict_filter_col_name));
        if (file_column_name_to_dict_map_iter == file_column_name_to_dict_map.end()) {
            demote_to_non_dict_filter();
            continue;
        }

        // 1. Get dictionary values to a string column.
        MutableColumnPtr dict_value_column = ColumnString::create();
        orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second;
        std::vector<StringRef> dict_values;
        size_t max_value_length = 0;
        // The offset buffer holds one more entry than there are values. Guard the subtraction
        // so that an empty buffer yields a count of 0 instead of wrapping around.
        const uint64_t offset_entries = dict->dictionaryOffset.size();
        const uint64_t dictionaryCount = offset_entries == 0 ? 0 : offset_entries - 1;
        if (dictionaryCount == 0) {
            demote_to_non_dict_filter();
            continue;
        }
        dict_values.reserve(dictionaryCount);
        for (int i = 0; i < dictionaryCount; ++i) {
            char* val_ptr;
            int64_t length;
            dict->getValueByIndex(i, val_ptr, length);
            StringRef dict_value((length > 0) ? val_ptr : "", length);
            if (length > max_value_length) {
                max_value_length = length;
            }
            dict_values.emplace_back(dict_value);
        }
        dict_value_column->insert_many_strings_overflow(dict_values.data(), dict_values.size(),
                                                        max_value_length);
        // Captured before the column is moved into the temp block below.
        size_t dict_value_column_size = dict_value_column->size();

        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
        Block temp_block;
        int dict_pos = -1;
        int index = 0;
        for (const auto* slot_desc : _tuple_descriptor->slots()) {
            if (slot_desc->id() == slot_id) {
                auto data_type = slot_desc->get_data_type_ptr();
                if (data_type->is_nullable()) {
                    temp_block.insert(
                            {ColumnNullable::create(
                                     std::move(
                                             dict_value_column), // NOLINT(bugprone-use-after-move)
                                     ColumnUInt8::create(dict_value_column_size, 0)),
                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
                             ""});
                } else {
                    temp_block.insert(
                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
                }
                dict_pos = index;
            } else {
                // Other slots only need a correctly typed, empty placeholder column.
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                        slot_desc->get_data_type_ptr(),
                                                        slot_desc->col_name()));
            }
            ++index;
        }

        // 2.2 Execute conjuncts.
        if (dict_pos != 0) {
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
            // The following process may be tricky and time-consuming, but we have no other way.
            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
        }
        IColumn::Filter result_filter(temp_block.rows(), 1);
        // Initialized so the check below never reads an indeterminate value, matching the other
        // execute_conjuncts call sites in this file.
        bool can_filter_all = false;
        RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, &result_filter,
                                                        &can_filter_all));
        if (dict_pos != 0) {
            // We have to clean the first column to insert right data.
            temp_block.get_by_position(0).column->assume_mutable()->clear();
        }

        // If can_filter_all = true, can filter this stripe.
        if (can_filter_all) {
            *is_stripe_filtered = true;
            return Status::OK();
        }

        // 3. Get dict codes.
        std::vector<int32_t> dict_codes;
        for (size_t i = 0; i < result_filter.size(); ++i) {
            if (result_filter[i]) {
                dict_codes.emplace_back(i);
            }
        }

        // About Performance: if dict_column size is too large, it will generate a large IN filter.
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
            demote_to_non_dict_filter();
            continue;
        }

        // 4. Rewrite conjuncts.
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(
                dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
        ++it;
    }
    return Status::OK();
}
// Builds a predicate on dictionary codes for `slot_id` and appends it to
// `_dict_filter_conjuncts`.
//
// dict_codes:  dictionary codes that satisfied the original conjuncts.
// slot_id:     slot whose column is being dict filtered.
// is_nullable: nullability applied to the generated `eq` predicate and its literal.
//
// With exactly one code the predicate is `slot = code`; otherwise it is a direct IN predicate
// over all codes. The new expression context is prepared and opened before being stored.
// Returns InternalError when `slot_id` is not present in `_tuple_descriptor`, or any error
// from prepare()/open().
Status OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
                                          bool is_nullable) {
    // Resolve the slot once, up front. Both predicate shapes need it, and a missing slot must
    // not be handed to VSlotRef as a null pointer.
    SlotDescriptor* slot = nullptr;
    for (auto* each : _tuple_descriptor->slots()) {
        if (each->id() == slot_id) {
            slot = each;
            break;
        }
    }
    if (slot == nullptr) {
        return Status::InternalError(
                "Failed to find slot {} when rewriting dict filter conjuncts", slot_id);
    }

    VExprSPtr root;
    if (dict_codes.size() == 1) {
        // Single surviving code: slot = code.
        {
            TFunction fn;
            TFunctionName fn_name;
            fn_name.__set_db_name("");
            fn_name.__set_function_name("eq");
            fn.__set_name(fn_name);
            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
            std::vector<TTypeDesc> arg_types;
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            fn.__set_arg_types(arg_types);
            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            fn.__set_has_var_args(false);

            TExprNode texpr_node;
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
            texpr_node.__set_opcode(TExprOpcode::EQ);
            texpr_node.__set_fn(fn);
            texpr_node.__set_num_children(2);
            texpr_node.__set_is_nullable(is_nullable);
            root = VectorizedFnCall::create_shared(texpr_node);
        }
        // Children are added in operand order: the slot reference, then the literal code.
        root->add_child(VSlotRef::create_shared(slot));
        {
            TExprNode texpr_node;
            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_INT));
            TIntLiteral int_literal;
            int_literal.__set_value(dict_codes[0]);
            texpr_node.__set_int_literal(int_literal);
            texpr_node.__set_is_nullable(is_nullable);
            root->add_child(VLiteral::create_shared(texpr_node));
        }
    } else {
        // Several surviving codes: slot IN (codes...).
        {
            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
            TExprNode node;
            node.__set_type(type_desc);
            node.__set_node_type(TExprNodeType::IN_PRED);
            node.in_predicate.__set_is_not_in(false);
            node.__set_opcode(TExprOpcode::FILTER_IN);
            // VdirectInPredicate assume is_nullable = false.
            node.__set_is_nullable(false);

            std::shared_ptr<HybridSetBase> hybrid_set(
                    create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false));
            for (int& dict_code : dict_codes) {
                hybrid_set->insert(&dict_code);
            }
            root = VDirectInPredicate::create_shared(node, hybrid_set);
        }
        root->add_child(VSlotRef::create_shared(slot));
    }

    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
    _dict_filter_conjuncts.emplace_back(rewritten_conjunct_ctx);
    return Status::OK();
}
// Turns the Int32 dict filter columns of `block` back into string columns.
//
// block:     block whose dict filter columns are replaced in place (column and type).
// batch_vec: ORC field batches used to decode the codes; when null, each dict filter column
//            is replaced by an empty string column instead of being decoded.
//
// Does nothing unless `_dict_cols_has_converted` is set and `_dict_filter_cols` is non-empty.
// For a nullable column the existing null map column is reused for the new column.
// Clears `_dict_cols_has_converted` once every column has been processed.
// Returns InternalError when a dict filter column is missing from the block index or from
// the ORC column index.
Status OrcReader::_convert_dict_cols_to_string_cols(
        Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
    if (!_dict_cols_has_converted) {
        return Status::OK();
    }
    if (_dict_filter_cols.empty()) {
        return Status::OK();
    }

    for (auto& dict_filter_col : _dict_filter_cols) {
        if (!_col_name_to_block_idx->contains(dict_filter_col.first)) {
            return Status::InternalError("Failed to find dict filter column '{}' in block {}",
                                         dict_filter_col.first, block->dump_structure());
        }
        auto block_pos = (*_col_name_to_block_idx)[dict_filter_col.first];
        ColumnWithTypeAndName& entry = block->get_by_position(block_pos);
        const ColumnPtr& current_column = entry.column;

        auto file_column_name =
                _table_info_node_ptr->children_file_column_name(dict_filter_col.first);
        auto orc_col_idx = _colname_to_idx.find(file_column_name);
        if (orc_col_idx == _colname_to_idx.end()) {
            return Status::InternalError("Wrong read column '{}' in orc file",
                                         dict_filter_col.first);
        }

        // Decodes `codes` through the ORC dictionary, or yields an empty string column when
        // there is no batch to decode from.
        auto decode = [&](const ColumnInt32* codes, const NullMap* nulls) -> MutableColumnPtr {
            if (batch_vec == nullptr) {
                return ColumnString::create();
            }
            return _convert_dict_column_to_string_column(
                    codes, nulls, (*batch_vec)[orc_col_idx->second], _type_map[file_column_name]);
        };

        const auto* nullable_column = check_and_get_column<ColumnNullable>(*current_column);
        if (nullable_column == nullptr) {
            const auto* dict_column = assert_cast<const ColumnInt32*>(current_column.get());
            MutableColumnPtr string_column = decode(dict_column, nullptr);
            entry.type = std::make_shared<DataTypeString>();
            block->replace_by_position(block_pos, std::move(string_column));
            continue;
        }

        const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
        const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
        DCHECK(dict_column);
        const NullMap& null_map = nullable_column->get_null_map_data();
        MutableColumnPtr string_column = decode(dict_column, &null_map);
        entry.type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
        block->replace_by_position(
                block_pos, ColumnNullable::create(std::move(string_column),
                                                  nullable_column->get_null_map_column_ptr()));
    }
    _dict_cols_has_converted = false;
    return Status::OK();
}
// TODO: Possible optimization.
// Once the dict column has been filtered, every remaining row of a nullable dict column should
// be non-null, so checking the null_map could be avoided. However, when lazy materialization
// ("inert materialization") is enabled, the filter column is currently not filtered first; it
// is filtered together with the other columns at the end.
// Decodes a column of dictionary codes into a new string column.
//
// dict_column:     codes; each one is used as an index into the dictionary of `cvb`.
// null_map:        optional; rows whose entry is non-zero are emitted as empty strings and
//                  their code is not looked up.
// cvb:             ORC batch, treated as an orc::EncodedStringVectorBatch.
// orc_column_type: ORC type of the column; CHAR values are passed through trim_right().
// Returns a ColumnString with one value per row of `dict_column`.
MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
        const ColumnInt32* dict_column, const NullMap* null_map, orc::ColumnVectorBatch* cvb,
        const orc::Type* orc_column_type) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    auto res = ColumnString::create();
    auto* encoded_batch = static_cast<orc::EncodedStringVectorBatch*>(cvb);
    DCHECK(encoded_batch);

    const size_t num_values = dict_column->size();
    const int* dict_codes = dict_column->get_data().data();
    std::vector<StringRef> string_values;
    string_values.reserve(num_values);
    size_t max_value_length = 0;

    // One loop body, specialized at compile time on the two row-invariant switches so that
    // each combination keeps its own branch-free inner loop.
    auto decode_rows = [&](auto strip_padding, auto check_nulls) {
        constexpr bool kStripPadding = decltype(strip_padding)::value;
        constexpr bool kCheckNulls = decltype(check_nulls)::value;
        [[maybe_unused]] const auto* null_flags = kCheckNulls ? null_map->data() : nullptr;
        for (size_t row = 0; row < num_values; ++row) {
            if constexpr (kCheckNulls) {
                if (null_flags[row]) {
                    // Orc doesn't fill null values in new batch, but the former batch has been release.
                    // Other types like int/long/timestamp... are flat types without pointer in them,
                    // so other types do not need to be handled separately like string.
                    string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
                    continue;
                }
            }
            char* val_ptr;
            int64_t length;
            encoded_batch->dictionary->getValueByIndex(dict_codes[row], val_ptr, length);
            if constexpr (kStripPadding) {
                // Possibly there are some zero padding characters in CHAR type, we have to strip them off.
                length = trim_right(val_ptr, length);
            }
            if (length > max_value_length) {
                max_value_length = length;
            }
            string_values.emplace_back((length > 0) ? val_ptr : EMPTY_STRING_FOR_OVERFLOW,
                                       length);
        }
    };

    const bool is_char_type = orc_column_type->getKind() == orc::TypeKind::CHAR;
    if (is_char_type && null_map) {
        decode_rows(std::true_type {}, std::true_type {});
    } else if (is_char_type) {
        decode_rows(std::true_type {}, std::false_type {});
    } else if (null_map) {
        decode_rows(std::false_type {}, std::true_type {});
    } else {
        decode_rows(std::false_type {}, std::false_type {});
    }

    if (!string_values.empty()) {
        res->insert_many_strings_overflow(string_values.data(), num_values, max_value_length);
    }
    return res;
}
// Prepares the per-stream input streams for the stripe described by
// `current_strip_information`.
//
// Returns immediately when `_is_all_tiny_stripes` is set. Otherwise collects the profile of
// the previous stripe's streams, drops them, computes the byte range of every stream whose
// column is marked in `selected_columns`, and hands those ranges to
// _build_input_stripe_streams(), which fills `streams`.
void ORCFileInputStream::beforeReadStripe(
        std::unique_ptr<orc::StripeInformation> current_strip_information,
        const std::vector<bool>& selected_columns,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    if (_is_all_tiny_stripes) {
        return;
    }
    _collect_profile_before_close_file_stripe();
    _stripe_streams.clear();

    std::unordered_map<orc::StreamId, io::PrefetchRange> wanted_ranges;
    // Streams are laid out back to back starting at the stripe offset, so the running cursor
    // must advance for every stream, selected or not.
    uint64_t cursor = current_strip_information->getOffset();
    for (uint64_t ordinal = 0; ordinal < current_strip_information->getNumberOfStreams();
         ++ordinal) {
        std::unique_ptr<orc::StreamInformation> info =
                current_strip_information->getStreamInformation(ordinal);
        const uint64_t column_id = info->getColumnId();
        const uint64_t stream_bytes = info->getLength();
        if (selected_columns[column_id]) {
            doris::io::PrefetchRange byte_range = {cursor, cursor + stream_bytes};
            orc::StreamId key(info->getColumnId(), info->getKind());
            wanted_ranges.emplace(std::move(key), std::move(byte_range));
        }
        cursor += stream_bytes;
    }
    _build_input_stripe_streams(wanted_ranges, streams);
}
// Splits `ranges` by size and builds the input streams for each group.
// A range whose length is at most `_orc_once_max_read_bytes` goes to the small-range builder;
// anything longer goes to the large-range builder. Does nothing when `ranges` is empty.
void ORCFileInputStream::_build_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    if (ranges.empty()) {
        return;
    }
    std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges;
    std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges;
    for (const auto& [stream_id, byte_range] : ranges) {
        const bool fits_in_one_read =
                byte_range.end_offset - byte_range.start_offset <= _orc_once_max_read_bytes;
        auto& bucket = fits_in_one_read ? small_ranges : large_ranges;
        bucket.emplace(stream_id, byte_range);
    }
    _build_small_ranges_input_stripe_streams(small_ranges, streams);
    _build_large_ranges_input_stripe_streams(large_ranges, streams);
}
// Builds input streams for the small stream ranges.
//
// The ranges are sorted by start offset and merged via
// io::PrefetchRange::merge_adjacent_seq_ranges(). One OrcMergeRangeFileReader is created per
// merged range (wrapped in a TracingFileReader when `_io_ctx` is set), and every stream whose
// range lies inside that merged range gets a StripeStreamInputStream sharing that reader.
// Each created stream is registered in `streams` and `_stripe_streams`.
void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    // Sort ranges by start_offset for efficient searching
    std::vector<std::pair<orc::StreamId, io::PrefetchRange>> by_offset(ranges.begin(),
                                                                       ranges.end());
    std::sort(by_offset.begin(), by_offset.end(), [](const auto& lhs, const auto& rhs) {
        return lhs.second.start_offset < rhs.second.start_offset;
    });

    std::vector<io::PrefetchRange> plain_ranges;
    plain_ranges.reserve(ranges.size());
    for (const auto& entry : by_offset) {
        plain_ranges.push_back(entry.second);
    }
    auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
            plain_ranges, _orc_max_merge_distance_bytes, _orc_once_max_read_bytes);

    for (const auto& merged : merged_ranges) {
        auto merge_reader =
                std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged);
        std::shared_ptr<io::FileReader> shared_reader;
        if (!_io_ctx) {
            shared_reader = std::move(merge_reader);
        } else {
            shared_reader = std::make_shared<io::TracingFileReader>(std::move(merge_reader),
                                                                    _io_ctx->file_reader_stats);
        }

        // Use binary search to find the first stream starting at or after the merged range.
        auto cursor = std::lower_bound(by_offset.begin(), by_offset.end(), merged.start_offset,
                                       [](const auto& entry, uint64_t offset) {
                                           return entry.second.start_offset < offset;
                                       });
        // Walk forward while streams still start inside the merged range.
        while (cursor != by_offset.end() && cursor->second.start_offset < merged.end_offset) {
            if (cursor->second.end_offset <= merged.end_offset) {
                auto stripe_stream = std::make_shared<StripeStreamInputStream>(
                        getName(), shared_reader, _io_ctx, _profile);
                streams.emplace(cursor->first, stripe_stream);
                _stripe_streams.emplace_back(stripe_stream);
            }
            ++cursor;
        }
    }
}
// Builds one StripeStreamInputStream per large stream range.
// Each stream reads through `_file_reader` directly, or through its own TracingFileReader
// wrapper around `_file_reader` when `_io_ctx` is set. Every created stream is registered in
// `streams` and `_stripe_streams`.
void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    for (const auto& entry : ranges) {
        // A fresh wrapper is created for every range rather than shared between streams.
        std::shared_ptr<io::FileReader> reader = _file_reader;
        if (_io_ctx) {
            reader = std::make_shared<io::TracingFileReader>(_file_reader,
                                                             _io_ctx->file_reader_stats);
        }
        auto stripe_stream = std::make_shared<StripeStreamInputStream>(
                getName(), std::move(reader), _io_ctx, _profile);
        streams.emplace(entry.first, stripe_stream);
        _stripe_streams.emplace_back(stripe_stream);
    }
}
// Clears the filter entries of rows removed by position deletes.
//
// filter:    per-row filter of the current batch; entry (row_id - start_row) is set to 0 for
//            every delete row id that falls inside the batch.
// start_row: row number of the first row of the current batch.
//
// Does nothing when `_position_delete_ordered_rowids` is null. The batch covers the half-open
// row range [start_row, start_row + _batch->numElements). The end is computed in signed
// arithmetic and searched with lower_bound, so an empty batch yields an empty range instead
// of wrapping around when `start_row` is 0.
void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter& filter, int64_t start_row) {
    if (_position_delete_ordered_rowids == nullptr) {
        return;
    }
    const int64_t end_row = start_row + static_cast<int64_t>(_batch->numElements);
    // The row ids are ordered, so both bounds are found by binary search; the second search
    // only needs to look at or after the first bound.
    const auto first = std::lower_bound(_position_delete_ordered_rowids->begin(),
                                        _position_delete_ordered_rowids->end(), start_row);
    const auto last =
            std::lower_bound(first, _position_delete_ordered_rowids->end(), end_row);
    for (auto pos = first; pos != last; ++pos) {
        filter[*pos - start_row] = 0;
    }
}
#include "common/compile_check_end.h"
} // namespace doris