blob: 3264f5e09eaf353b059071ae14137dc75bb335ba [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/PlanNodes_types.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/data_type/data_type.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "format/column_descriptor.h"
#include "format/table/table_schema_change_helper.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "storage/predicate/block_column_predicate.h"
#include "storage/segment/common.h"
#include "util/profile_collector.h"
namespace doris {
class ColumnPredicate;
} // namespace doris
namespace doris {
class Block;
class VSlotRef;
// Condition-cache state that FileScanner hands to a reader.
//  - Cache MISS: while evaluating predicates, the reader records, per granule, whether any
//    row survived by writing into filter_result.
//  - Cache HIT: the reader skips every granule whose filter_result entry is false.
struct ConditionCacheContext {
    // Whether this scan is served from the cache (HIT) or is populating it (MISS).
    bool is_hit {false};
    // One flag per granule; true means the granule still has at least one surviving row.
    std::shared_ptr<std::vector<bool>> filter_result;
    // Global granule index corresponding to filter_result[0].
    int64_t base_granule {0};
    // Granule width (presumably in rows — confirm against FileScanner).
    static constexpr int GRANULE_SIZE {2048};
};
/// Parameter object shared by every reader for GenericReader::init_reader(ReaderInitContext*).
/// Only the fields that all readers need live here. Each format derives its own context type
/// (ParquetInitContext, OrcInitContext, ...) to carry extra, format-specific fields.
/// FileScanner creates the matching subclass, fills in the shared fields below, and then
/// calls init_reader().
struct ReaderInitContext {
    virtual ~ReaderInitContext() = default;

    // ===== Borrowed from FileScanner (non-owning pointers), visible to every reader =====
    std::vector<ColumnDescriptor>* column_descs {nullptr};
    std::unordered_map<std::string, uint32_t>* col_name_to_block_idx {nullptr};
    RuntimeState* state {nullptr};
    const TupleDescriptor* tuple_descriptor {nullptr};
    const RowDescriptor* row_descriptor {nullptr};
    const TFileScanRangeParams* params {nullptr};
    const TFileRangeDesc* range {nullptr};
    TPushAggOp::type push_down_agg_type {TPushAggOp::type::NONE};

    // ===== Output slots: written by on_before_init_reader, read by _do_init_reader =====
    // Names of the file columns to read. on_before_init_reader derives them from
    // column_descs (slot -> name), and _do_init_reader feeds them to the format-specific
    // parser. Standalone readers run with column_descs == nullptr; their callers fill
    // column_names themselves before calling init_reader.
    std::vector<std::string> column_names;
    std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node =
            TableSchemaChangeHelper::ConstNode::get_instance();
    std::set<uint64_t> column_ids;
    std::set<uint64_t> filter_column_ids;
};
/// Checked downcast, intended for turning a ReaderInitContext* into the reader's own
/// context subclass.
/// The pointer is converted with dynamic_cast and the result is DORIS_CHECKed. A context of
/// the wrong dynamic type (or a null input, which dynamic_cast maps to null) crashes instead
/// of silently yielding nullptr, per Doris coding standards.
template <typename To, typename From>
To* checked_context_cast(From* ptr) {
    To* const downcast = dynamic_cast<To*>(ptr);
    DORIS_CHECK(downcast != nullptr);
    return downcast;
}
/// Base reader interface for all file readers.
/// A GenericReader is responsible for reading a file and returning
/// a set of blocks with a specified schema.
///
/// Provides hook virtual methods that implement the Template Method pattern:
///   init_reader:    _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader
///   get_next_block: on_before_read_block → _do_get_next_block → on_after_read_block
///
/// Column-filling logic (partition/missing/synthesized) lives in TableFormatReader.
class GenericReader : public ProfileCollector {
public:
    // _push_down_agg_type starts as NONE via its in-class initializer.
    GenericReader() = default;

    void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
        _push_down_agg_type = push_down_agg_type;
    }

    TPushAggOp::type get_push_down_agg_type() const { return _push_down_agg_type; }

    /// Template method for reading blocks.
    /// Calls: on_before_read_block → _do_get_next_block → on_after_read_block,
    /// returning the first non-OK status encountered.
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) {
        RETURN_IF_ERROR(on_before_read_block(block));
        RETURN_IF_ERROR(_do_get_next_block(block, read_rows, eof));
        RETURN_IF_ERROR(on_after_read_block(block, read_rows));
        return Status::OK();
    }

    /// Reports the file's column names and types in *name_to_type.
    /// Types are always nullable so that illegal values can be processed.
    /// The map produced by the first successful _get_columns_impl call is cached. Later calls
    /// overwrite *name_to_type with that cached copy without calling _get_columns_impl again.
    /// A failed call is not cached, so the next call retries.
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
        if (_get_columns_cached) {
            *name_to_type = _cached_name_to_type;
            return Status::OK();
        }
        RETURN_IF_ERROR(_get_columns_impl(name_to_type));
        _cached_name_to_type = *name_to_type;
        _get_columns_cached = true;
        return Status::OK();
    }

    /// Format-specific implementation behind get_columns(). The default returns NotSupported.
    virtual Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
        return Status::NotSupported("get_columns is not implemented");
    }

    // This method is responsible for initializing the resource for parsing schema.
    // It will be called before `get_parsed_schema`.
    virtual Status init_schema_reader() {
        return Status::NotSupported("init_schema_reader is not implemented for this reader.");
    }

    // `col_types` is always nullable to process illegal values.
    virtual Status get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<DataTypePtr>* col_types) {
        return Status::NotSupported("get_parsed_schema is not implemented for this reader.");
    }

    ~GenericReader() override = default;

    virtual Status close() { return Status::OK(); }

    /// Switches the reader to reading only the given row ids.
    /// _read_by_rows and _row_ids are updated before _set_read_one_line_impl runs (so the
    /// implementation can see them). They stay set even if that call fails; the default
    /// implementation returns NotSupported.
    Status read_by_rows(const std::list<int64_t>& row_ids) {
        _read_by_rows = true;
        _row_ids = row_ids;
        return _set_read_one_line_impl();
    }

    /// The reader is responsible for counting the number of rows read,
    /// because some readers, such as parquet/orc,
    /// can skip some pages/rowgroups through indexes.
    virtual bool count_read_rows() { return false; }

    /// Returns true if on_before_init_reader has already set _column_descs.
    bool has_column_descs() const { return _column_descs != nullptr; }

    /// Unified initialization entry point (NVI pattern).
    /// Enforces the template method sequence for ALL readers:
    ///   _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader
    /// Subclasses implement _open_file_reader and _do_init_reader(ReaderInitContext*).
    /// FileScanner constructs the appropriate ReaderInitContext subclass and calls this.
    /// When ctx->column_descs is nullptr (standalone readers), the before/after hooks are
    /// skipped. The first non-OK status from any step is returned.
    ///
    /// NOTE: During migration, readers not yet ported to this API still use their
    /// format-specific init_reader(...) methods. This method is non-virtual so it
    /// cannot be accidentally overridden.
    Status init_reader(ReaderInitContext* ctx) {
        // Apply push_down_agg_type early so _open_file_reader and _do_init_reader
        // can use it (e.g., PaimonCppReader skips full init on COUNT pushdown).
        // on_after_init_reader may reset this (e.g., Iceberg with equality deletes).
        set_push_down_agg_type(ctx->push_down_agg_type);
        RETURN_IF_ERROR(_open_file_reader(ctx));
        // Standalone readers (delete file readers, push handler) set column_descs=nullptr
        // and pre-populate column_names directly. Skip hooks for them.
        if (ctx->column_descs != nullptr) {
            RETURN_IF_ERROR(on_before_init_reader(ctx));
        }
        RETURN_IF_ERROR(_do_init_reader(ctx));
        if (ctx->column_descs != nullptr) {
            RETURN_IF_ERROR(on_after_init_reader(ctx));
        }
        return Status::OK();
    }

    /// Hook called before core init. Default just sets _column_descs.
    /// TableFormatReader overrides with partition/missing column computation.
    /// ORC/Parquet/Hive/Iceberg further override with format-specific schema matching.
    virtual Status on_before_init_reader(ReaderInitContext* ctx) {
        _column_descs = ctx->column_descs;
        return Status::OK();
    }

protected:
    // ---- Init-time hooks (Template Method for init_reader) ----

    /// Opens the file and prepares I/O resources before hooks run. Override in
    /// subclasses to open files, read metadata, set up decompressors, etc.
    /// For Parquet/ORC, opens the file and reads footer metadata.
    /// For CSV/JSON, opens the file, creates decompressors, and sets up line readers.
    /// Default is no-op (for JNI, Native, Arrow readers).
    virtual Status _open_file_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); }

    /// Core initialization (format-specific). Subclasses override to perform
    /// their actual parsing engine setup. The context should be downcast to
    /// the appropriate subclass using checked_context_cast<T>.
    /// Default returns NotSupported — readers not yet migrated to the unified
    /// init_reader(ReaderInitContext*) API still use their old init methods.
    virtual Status _do_init_reader(ReaderInitContext* /*ctx*/) {
        return Status::NotSupported(
                "_do_init_reader(ReaderInitContext*) not yet implemented for this reader");
    }

    // ---- Existing init-time hooks ----

    /// Called after core init completes. Subclasses override to process
    /// delete files, deletion vectors, etc.
    virtual Status on_after_init_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); }

    // ---- Read-time hooks ----

    /// Called before reading a block. Subclasses override to modify block
    /// structure (e.g. add ACID columns, expand for equality delete).
    virtual Status on_before_read_block(Block* block) { return Status::OK(); }

    /// Core block reading. Subclasses must override with actual read logic.
    virtual Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

    /// Called after reading a block. Subclasses override to post-process
    /// (e.g. remove ACID columns, apply equality delete filter).
    virtual Status on_after_read_block(Block* block, size_t* read_rows) { return Status::OK(); }

    /// Backend for read_by_rows(). The default returns NotSupported.
    virtual Status _set_read_one_line_impl() {
        return Status::NotSupported("read_by_rows is not implemented for this reader.");
    }

    // Compile-time constant: static so it takes no per-instance storage and does not
    // implicitly delete the class's copy/move assignment (a non-static const member would).
    static constexpr size_t _MIN_BATCH_SIZE = 4064; // 4096 - 32 (padding)

    TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;

public:
    // Pass condition cache context to the reader for HIT/MISS tracking.
    // Default: ignored.
    virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}

    // Returns true if this reader can produce an accurate total row count from metadata
    // without reading actual data. Used to determine if CountReader decorator can be applied.
    // Only ORC and Parquet readers support this (via file footer metadata).
    virtual bool supports_count_pushdown() const { return false; }

    // Returns the total number of rows the reader will produce.
    // Used to pre-allocate condition cache with the correct number of granules.
    virtual int64_t get_total_rows() const { return 0; }

    // Returns true if this reader has delete operations (e.g. Iceberg position/equality deletes,
    // Hive ACID deletes). Used to disable condition cache when deletes are present, since cached
    // granule results may become stale if delete files change between queries.
    virtual bool has_delete_operations() const { return false; }

protected:
    // ---- Row-id driven reading (set by read_by_rows) ----
    bool _read_by_rows = false;
    std::list<int64_t> _row_ids;

    // Cache to save some common part such as file footer.
    // May be null if not used.
    FileMetaCache* _meta_cache = nullptr;

    // ---- Column descriptors (set by on_before_init_reader, owned by FileScanner) ----
    const std::vector<ColumnDescriptor>* _column_descs = nullptr;

    // ---- get_columns cache ----
    bool _get_columns_cached = false;
    std::unordered_map<std::string, DataTypePtr> _cached_name_to_type;
};
/// Provides an accessor for the current batch's row positions within the file.
/// Implemented by RowGroupReader (Parquet) and OrcReader.
class RowPositionProvider {
public:
virtual ~RowPositionProvider() = default;
virtual const std::vector<segment_v2::rowid_t>& current_batch_row_positions() const = 0;
};
} // namespace doris