blob: e4d79b2ffd54c6df174cda3fd89a5957b01a09c8 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include "paimon/predicate/predicate.h"
#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/utils/read_ahead_cache.h"
#include "paimon/visibility.h"
namespace paimon {
class Executor;
class MemoryPool;
class Predicate;
class FileSystem;
/// `ReadContext` is some configuration for read operations.
///
/// Please do not use this class directly, use `ReadContextBuilder` to build a `ReadContext` which
/// has input validation.
/// @see ReadContextBuilder
class PAIMON_EXPORT ReadContext {
public:
ReadContext(const std::string& path, const std::string& branch,
const std::vector<std::string>& read_schema,
const std::vector<int32_t>& read_field_ids,
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
bool enable_prefetch, uint32_t prefetch_batch_count,
uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
uint32_t row_to_batch_thread_number, const std::optional<std::string>& table_schema,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
const std::map<std::string, std::string>& options,
PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config);
~ReadContext();
const std::string& GetPath() const {
return path_;
}
const std::string& GetBranch() const {
return branch_;
}
const std::map<std::string, std::string>& GetFileSystemSchemeToIdentifierMap() const {
return fs_scheme_to_identifier_map_;
}
const std::map<std::string, std::string>& GetOptions() const {
return options_;
}
const std::vector<std::string>& GetReadSchema() const {
return read_schema_;
}
const std::vector<int32_t>& GetReadFieldIds() const {
return read_field_ids_;
}
const std::shared_ptr<Predicate>& GetPredicate() const {
return predicate_;
}
bool EnablePredicateFilter() const {
return enable_predicate_filter_;
}
bool EnablePrefetch() const {
return enable_prefetch_;
}
uint32_t GetPrefetchBatchCount() const {
return prefetch_batch_count_;
}
uint32_t GetPrefetchMaxParallelNum() const {
return prefetch_max_parallel_num_;
}
bool EnableMultiThreadRowToBatch() const {
return enable_multi_thread_row_to_batch_;
}
uint32_t GetRowToBatchThreadNumber() const {
return row_to_batch_thread_number_;
}
const std::optional<std::string>& GetSpecificTableSchema() {
return table_schema_;
}
std::shared_ptr<MemoryPool> GetMemoryPool() const {
return memory_pool_;
}
std::shared_ptr<Executor> GetExecutor() const {
return executor_;
}
std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
return specific_file_system_;
}
PrefetchCacheMode GetPrefetchCacheMode() const {
return prefetch_cache_mode_;
}
const CacheConfig& GetCacheConfig() const {
return cache_config_;
}
private:
std::string path_;
std::string branch_;
std::vector<std::string> read_schema_;
std::vector<int32_t> read_field_ids_;
std::shared_ptr<Predicate> predicate_;
bool enable_predicate_filter_;
bool enable_prefetch_;
uint32_t prefetch_batch_count_;
uint32_t prefetch_max_parallel_num_;
bool enable_multi_thread_row_to_batch_;
uint32_t row_to_batch_thread_number_;
std::optional<std::string> table_schema_;
std::shared_ptr<MemoryPool> memory_pool_;
std::shared_ptr<Executor> executor_;
std::shared_ptr<FileSystem> specific_file_system_;
std::map<std::string, std::string> fs_scheme_to_identifier_map_;
std::map<std::string, std::string> options_;
PrefetchCacheMode prefetch_cache_mode_;
CacheConfig cache_config_;
};
/// `ReadContextBuilder` used to build a `ReadContext`, has input validation.
class PAIMON_EXPORT ReadContextBuilder {
public:
/// Constructs a `ReadContextBuilder` with required parameters.
/// @param path The root path of the table.
explicit ReadContextBuilder(const std::string& path);
~ReadContextBuilder();
ReadContextBuilder(ReadContextBuilder&&) noexcept;
ReadContextBuilder& operator=(ReadContextBuilder&&) noexcept;
/// Set the schema fields to read from the table.
///
/// If not set, all fields from the table schema will be read. This is useful for
/// projection pushdown to reduce I/O and improve performance by reading only
/// the required columns.
///
/// @param read_field_names Vector of field names to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection
ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
/// Set the schema fields to read from the table.
///
/// If not set, all fields from the table schema will be read. This is useful for
/// projection pushdown to reduce I/O and improve performance by reading only
/// the required columns.
///
/// @param read_field_ids Vector of field ids to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection.
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
/// Calling both will ignore the read schema set by SetReadSchema().
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);
/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
/// @note The options map will clear the options added by `AddOption()` before.
/// @param options The configuration options map.
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetOptions(const std::map<std::string, std::string>& options);
/// Add a single configuration option which is not defined in the table schema or whose value
/// you want to overwrite.
///
/// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()`
/// instead.
/// @param key The option key.
/// @param value The option value.
/// @return Reference to this builder for method chaining.
ReadContextBuilder& AddOption(const std::string& key, const std::string& value);
/// Set a predicate for filtering data during reading.
///
/// The predicate is used for both partition pruning and data filtering.
/// It can significantly improve performance by reducing the amount of data
/// that needs to be read and processed.
///
/// @param predicate Shared pointer to the predicate for data filtering.
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetPredicate(const std::shared_ptr<Predicate>& predicate);
/// Whether to perform precise filtering according to predicates for data read from format
/// reader.
/// @param enabled Whether to enable precise filtering (default: false)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& EnablePredicateFilter(bool enabled);
/// Enable or disable prefetching of data batches from individual files.
///
/// When enabled, the reader will prefetch multiple batches in parallel to
/// improve throughput by overlapping I/O with computation. This is particularly
/// beneficial for high-latency storage systems.
///
/// @param enabled Whether to enable prefetching (default: false)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& EnablePrefetch(bool enabled);
/// Set prefetch cache mode for read operations.
///
/// A prefetch cache is used to prebuffer data ranges before they are needed,
/// which can improve read performance by reducing redundant I/O operations.
/// @param mode (default: PrefetchCacheMode::ALWAYS)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetPrefetchCacheMode(PrefetchCacheMode mode);
/// Set the cache configuration for prefetch read operations.
///
/// @param config The cache configuration to use.
/// @return Reference to this builder for method chaining.
ReadContextBuilder& WithCacheConfig(const CacheConfig& config);
/// Set the total number of batches to prefetch across all files.
///
/// This controls the memory usage and parallelism of the prefetching mechanism.
/// Higher values can improve throughput but consume more memory.
///
/// @param batch_count Total number of batches to prefetch (default: 600)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetPrefetchBatchCount(uint32_t batch_count);
/// Set the maximum number of parallel prefetch operations.
///
/// This limits the number of concurrent I/O operations to prevent overwhelming
/// the storage system or consuming excessive system resources.
///
/// @param parallel_num Maximum parallel prefetch operations (default: 3)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetPrefetchMaxParallelNum(uint32_t parallel_num);
/// Enable or disable multi-threaded row-to-batch conversion in merge-on-read scenarios.
///
/// When enabled, multiple threads are used to convert row data to batch format
/// during merge operations, which can improve performance for CPU-intensive
/// merge operations.
///
/// @param enabled Whether to enable multi-threaded conversion (default: false)
/// @return Reference to this builder for method chaining.
ReadContextBuilder& EnableMultiThreadRowToBatch(bool enabled);
/// Set the number of threads for row-to-batch conversion in merge-on-read scenarios.
///
/// This controls the parallelism of row-to-batch conversion during merge operations.
/// Higher values can improve performance but may affect result ordering.
///
/// @param thread_number Number of conversion threads (default: 1)
/// @return Reference to this builder for method chaining.
/// @note If thread_number > 1, Arrow batches from the reader may not be in primary key order.
ReadContextBuilder& SetRowToBatchThreadNumber(uint32_t thread_number);
/// Set custom memory pool for memory management.
/// @param memory_pool The memory pool to use.
/// @return Reference to this builder for method chaining.
/// @note If not set, the default system memory pool will be used.
ReadContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& memory_pool);
/// Set custom executor for task execution.
/// @param executor The executor to use.
/// @return Reference to this builder for method chaining.
/// @note If not set, the default system executor will be used.
ReadContextBuilder& WithExecutor(const std::shared_ptr<Executor>& executor);
/// Set the table schema as a string to avoid schema loading I/O operations.
///
/// This optimization allows the reader to use a pre-loaded schema instead of
/// reading it from the table metadata, which can improve performance especially
/// in scenarios with many small read operations.
///
/// @param table_schema String representation of the table schema.
/// @return Reference to this builder for method chaining.
/// @note The user must ensure that the schema string is valid and matches the table.
/// @note If not set, the schema will be loaded from the table path.
ReadContextBuilder& SetTableSchema(const std::string& table_schema);
/// Set the specific branch to read from in a versioned table.
///
/// Paimon supports branching for data versioning and time travel queries.
/// This method allows reading from a specific branch instead of the main branch.
///
/// @param branch Name of the branch to read from.
/// @return Reference to this builder for method chaining.
/// @note Default branch is "main" if not specified.
ReadContextBuilder& WithBranch(const std::string& branch);
/// Sets a mapping from URI schemes (e.g., "file", "oss") to registered file system
/// identifiers. This allows selecting different pre-registered file system implementations
/// based on the URI scheme at runtime.
///
/// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to the corresponding
/// file system identifier.
/// @return Reference to this builder for method chaining.
/// @note
/// - This method is intended for environments where multiple file systems are pre-registered.
/// - The specified identifiers must correspond to file systems that have been registered at
/// compile time or initialization.
/// - Cannot be used together with `WithFileSystem()`.
/// - If not set, use default file system (configured in `Options::FILE_SYSTEM`).
/// Example:
/// builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, {"file", "local"}});
///
ReadContextBuilder& WithFileSystemSchemeToIdentifierMap(
const std::map<std::string, std::string>& fs_scheme_to_identifier_map);
/// Sets a custom file system instance to be used for all file operations in this read context.
/// This bypasses the global file system registry and uses the provided implementation directly.
///
/// @param file_system The file system to use.
/// @return Reference to this builder for method chaining.
/// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`)
ReadContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);
/// Build and return a `ReadContext` instance with input validation.
/// @return Result containing the constructed `ReadContext` or an error status.
Result<std::unique_ptr<ReadContext>> Finish();
private:
class Impl;
std::unique_ptr<Impl> impl_;
};
} // namespace paimon