################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
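"""Core configuration options for Paimon tables.

Defines the typed ConfigOption constants (file format, bucketing, scan, commit,
external-path, global-index, and vector-index settings) together with the
CoreOptions wrapper that exposes convenience accessors over an Options map.
"""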
import sys
from datetime import timedelta
from enum import Enum
from typing import Dict, Optional

from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
from pypaimon.common.options.config_option import ConfigOption
from pypaimon.common.options.config_options import ConfigOptions


class ExternalPathStrategy(str, Enum):
"""
Strategy for selecting external paths.
"""
NONE = "none"
ROUND_ROBIN = "round-robin"
SPECIFIC_FS = "specific-fs"


class MergeEngine(str, Enum):
"""
    Specifies the merge engine for tables with a primary key.
"""
DEDUPLICATE = "deduplicate"
PARTIAL_UPDATE = "partial-update"
AGGREGATE = "aggregation"
FIRST_ROW = "first-row"


class CoreOptions:
"""Core options for Paimon tables."""
# File format constants
FILE_FORMAT_ORC: str = "orc"
FILE_FORMAT_AVRO: str = "avro"
FILE_FORMAT_PARQUET: str = "parquet"
FILE_FORMAT_BLOB: str = "blob"
FILE_FORMAT_LANCE: str = "lance"
# Basic options
AUTO_CREATE: ConfigOption[bool] = (
ConfigOptions.key("auto-create")
.boolean_type()
.default_value(False)
.with_description("Whether to auto create table.")
)
PATH: ConfigOption[str] = (
ConfigOptions.key("path")
.string_type()
.no_default_value()
.with_description("The file path of this table in the filesystem.")
)
TYPE: ConfigOption[str] = (
ConfigOptions.key("type")
.string_type()
.default_value("primary-key")
.with_description("Specify what type of table this is.")
)
BRANCH: ConfigOption[str] = (
ConfigOptions.key("branch")
.string_type()
.default_value("main")
.with_description("The branch name of this table.")
)
BUCKET: ConfigOption[int] = (
ConfigOptions.key("bucket")
.int_type()
.default_value(-1)
.with_description(
"Bucket number for file store. If bucket is -1, the parallelism of "
"sink determines the bucket number: write a record to the file corresponding to the "
"hash value of one or more fields. When creating table, bucket number must be specified."
)
)
BUCKET_KEY: ConfigOption[str] = (
ConfigOptions.key("bucket-key")
.string_type()
.no_default_value()
.with_description(
"Specify the hash key by bucket-key. "
"By default, if there is a primary key, the primary key will be used; "
"if there is no primary key, the full row will be used. "
"In this case, the sink parallelism must be set to the bucket number."
)
)
SCAN_MANIFEST_PARALLELISM: ConfigOption[int] = (
ConfigOptions.key("scan.manifest.parallelism")
.int_type()
.default_value(16)
.with_description("The parallelism for scanning manifest files.")
)
# File format options
FILE_FORMAT: ConfigOption[str] = (
ConfigOptions.key("file.format")
.string_type()
.default_value(FILE_FORMAT_ORC)
.with_description("Specify the message format of data files.")
)
FILE_COMPRESSION: ConfigOption[str] = (
ConfigOptions.key("file.compression")
.string_type()
.default_value("zstd")
.with_description("Default file compression format. For faster read and write, it is recommended to use zstd.")
)
FILE_COMPRESSION_ZSTD_LEVEL: ConfigOption[int] = (
ConfigOptions.key("file.compression.zstd-level")
.int_type()
.default_value(1)
.with_description(
"Default file compression zstd level. For higher compression rates, it can be configured to 9, "
"but the read and write speed will significantly decrease."
)
)
FILE_COMPRESSION_PER_LEVEL: ConfigOption[Dict[str, str]] = (
ConfigOptions.key("file.compression.per.level")
.map_type()
.default_value({})
.with_description(
"Define different compression policies for different level LSM data files, "
"you can add the level and the corresponding compression type."
)
)
FILE_FORMAT_PER_LEVEL: ConfigOption[Dict[str, str]] = (
ConfigOptions.key("file.format.per.level")
.map_type()
.default_value({})
.with_description(
"Define different format types for different level LSM data files, "
"you can add the level and the corresponding format type."
)
)
FILE_BLOCK_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("file.block-size")
.memory_type()
.no_default_value()
.with_description("Define the data block size.")
)
METADATA_STATS_MODE: ConfigOption[str] = (
ConfigOptions.key("metadata.stats-mode")
.string_type()
.default_value("none")
.with_description("Stats Mode, Python by default is none. Java is truncate(16).")
)
BLOB_AS_DESCRIPTOR: ConfigOption[bool] = (
ConfigOptions.key("blob-as-descriptor")
.boolean_type()
.default_value(False)
.with_description("Whether to use blob as descriptor.")
)
TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("target-file-size")
.memory_type()
.no_default_value()
.with_description("The target file size for data files.")
)
BLOB_TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("blob.target-file-size")
.memory_type()
.default_value(MemorySize.of_mebi_bytes(256))
.with_description("The target file size for blob files.")
)
DATA_FILE_PREFIX: ConfigOption[str] = (
ConfigOptions.key("data-file.prefix")
.string_type()
.default_value("data-")
.with_description("Specify the file name prefix of data files.")
)
# Scan options
SCAN_FALLBACK_BRANCH: ConfigOption[str] = (
ConfigOptions.key("scan.fallback-branch")
.string_type()
.no_default_value()
.with_description("The fallback branch for scanning.")
)
INCREMENTAL_BETWEEN_TIMESTAMP: ConfigOption[str] = (
ConfigOptions.key("incremental-between-timestamp")
.string_type()
.no_default_value()
.with_description("The timestamp range for incremental reading.")
)
SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
.default_value(MemorySize.of_mebi_bytes(128))
.with_description("The target size of a source split when scanning a table.")
)
SOURCE_SPLIT_OPEN_FILE_COST: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.open-file-cost")
.memory_type()
.default_value(MemorySize.of_mebi_bytes(4))
.with_description(
"The estimated cost to open a file, used when scanning a table. "
"It is used to avoid opening too many small files."
)
)
DELETION_VECTORS_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("deletion-vectors.enabled")
.boolean_type()
.default_value(False)
.with_description("Whether to enable deletion vectors.")
)
MERGE_ENGINE: ConfigOption[MergeEngine] = (
ConfigOptions.key("merge-engine")
.enum_type(MergeEngine)
.default_value(MergeEngine.DEDUPLICATE)
.with_description("Specify the merge engine for table with primary key. "
"Options: deduplicate, partial-update, aggregation, first-row.")
)
# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
.string_type()
.no_default_value()
.with_description("The prefix for commit user.")
)
COMMIT_MAX_RETRIES: ConfigOption[int] = (
ConfigOptions.key("commit.max-retries")
.int_type()
.default_value(10)
.with_description("Maximum number of retries for commit operations.")
)
COMMIT_TIMEOUT: ConfigOption[timedelta] = (
ConfigOptions.key("commit.timeout")
.duration_type()
.no_default_value()
.with_description("Timeout for commit operations (e.g., '10s', '5m'). If not set, effectively unlimited.")
)
COMMIT_MIN_RETRY_WAIT: ConfigOption[timedelta] = (
ConfigOptions.key("commit.min-retry-wait")
.duration_type()
.default_value(timedelta(milliseconds=10))
.with_description("Minimum wait time between commit retries (e.g., '10ms', '100ms').")
)
COMMIT_MAX_RETRY_WAIT: ConfigOption[timedelta] = (
ConfigOptions.key("commit.max-retry-wait")
.duration_type()
.default_value(timedelta(seconds=10))
.with_description("Maximum wait time between commit retries (e.g., '1s', '10s').")
)
ROW_TRACKING_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("row-tracking.enabled")
.boolean_type()
.default_value(False)
.with_description("Whether to enable row tracking.")
)
DATA_EVOLUTION_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("data-evolution.enabled")
.boolean_type()
.default_value(False)
.with_description("Whether to enable data evolution.")
)
# External paths options
DATA_FILE_EXTERNAL_PATHS: ConfigOption[str] = (
ConfigOptions.key("data-file.external-paths")
.string_type()
.no_default_value()
.with_description("External paths for data files, separated by comma.")
)
DATA_FILE_EXTERNAL_PATHS_STRATEGY: ConfigOption[str] = (
ConfigOptions.key("data-file.external-paths.strategy")
.string_type()
.default_value(ExternalPathStrategy.NONE)
.with_description("Strategy for selecting external paths. Options: none, round-robin, specific-fs.")
)
DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: ConfigOption[str] = (
ConfigOptions.key("data-file.external-paths.specific-fs")
.string_type()
.no_default_value()
.with_description("Specific filesystem for external paths when using specific-fs strategy.")
)
# Global Index options
GLOBAL_INDEX_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("global-index.enabled")
.boolean_type()
.default_value(True)
.with_description("Whether to enable global index for scan.")
)
GLOBAL_INDEX_THREAD_NUM: ConfigOption[int] = (
ConfigOptions.key("global-index.thread-num")
.int_type()
.no_default_value()
.with_description(
"The maximum number of concurrent scanner for global index. "
"By default is the number of processors available."
)
)
# FAISS Vector Index options
VECTOR_DIM: ConfigOption[int] = (
ConfigOptions.key("vector.dim")
.int_type()
.default_value(128)
.with_description("The dimension of the vector.")
)
VECTOR_METRIC: ConfigOption[str] = (
ConfigOptions.key("vector.metric")
.string_type()
.default_value("L2")
.with_description("The similarity metric for vector search (L2, INNER_PRODUCT).")
)
VECTOR_INDEX_TYPE: ConfigOption[str] = (
ConfigOptions.key("vector.index-type")
.string_type()
.default_value("IVF_SQ8")
.with_description("The type of FAISS index (FLAT, HNSW, IVF, IVF_PQ, IVF_SQ8).")
)
VECTOR_M: ConfigOption[int] = (
ConfigOptions.key("vector.m")
.int_type()
.default_value(32)
.with_description("Maximum connections per element in HNSW index.")
)
VECTOR_EF_CONSTRUCTION: ConfigOption[int] = (
ConfigOptions.key("vector.ef-construction")
.int_type()
.default_value(40)
.with_description("Size of dynamic candidate list during HNSW construction.")
)
VECTOR_EF_SEARCH: ConfigOption[int] = (
ConfigOptions.key("vector.ef-search")
.int_type()
.default_value(16)
.with_description("Size of dynamic candidate list during HNSW search.")
)
VECTOR_NLIST: ConfigOption[int] = (
ConfigOptions.key("vector.nlist")
.int_type()
.default_value(100)
.with_description("Number of inverted lists (clusters) for IVF index.")
)
VECTOR_NPROBE: ConfigOption[int] = (
ConfigOptions.key("vector.nprobe")
.int_type()
.default_value(64)
.with_description("Number of clusters to visit during IVF search.")
)
VECTOR_SIZE_PER_INDEX: ConfigOption[int] = (
ConfigOptions.key("vector.size-per-index")
.int_type()
.default_value(2000000)
.with_description("Size of vectors stored in each vector index file.")
)
VECTOR_SEARCH_FACTOR: ConfigOption[int] = (
ConfigOptions.key("vector.search-factor")
.int_type()
.default_value(10)
.with_description("Multiplier for search limit when filtering is applied.")
)
VECTOR_NORMALIZE: ConfigOption[bool] = (
ConfigOptions.key("vector.normalize")
.boolean_type()
.default_value(False)
.with_description("Whether to L2 normalize vectors for cosine similarity.")
)
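    # Illustrative HNSW tuning sketch (hypothetical values, not defaults): for
    # 768-dim embeddings one could set "vector.dim" = 768, "vector.index-type" = "HNSW",
    # "vector.m" = 32, "vector.ef-construction" = 200, "vector.ef-search" = 64;
    # larger ef values generally trade search speed for recall.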
READ_BATCH_SIZE: ConfigOption[int] = (
ConfigOptions.key("read.batch-size")
.int_type()
.default_value(1024)
.with_description("Read batch size for any file format if it supports.")
)

    def __init__(self, options: Options):
self.options = options
def set(self, key: ConfigOption, value):
self.options.set(key, value)
@staticmethod
def copy(options: 'CoreOptions') -> 'CoreOptions':
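        """Return an independent CoreOptions backed by a copy of this instance's option map."""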
return CoreOptions.from_dict(dict(options.options.to_map()))
@staticmethod
def from_dict(options: dict) -> 'CoreOptions':
return CoreOptions(Options(options))
def path(self, default=None):
return self.options.get(CoreOptions.PATH, default)
def auto_create(self, default=None):
return self.options.get(CoreOptions.AUTO_CREATE, default)
def type(self, default=None):
return self.options.get(CoreOptions.TYPE, default)
def branch(self, default=None):
return self.options.get(CoreOptions.BRANCH, default)
def bucket(self, default=None):
return self.options.get(CoreOptions.BUCKET, default)
def bucket_key(self, default=None):
return self.options.get(CoreOptions.BUCKET_KEY, default)
def scan_manifest_parallelism(self, default=None):
return self.options.get(CoreOptions.SCAN_MANIFEST_PARALLELISM, default)
def file_format(self, default=None):
return self.options.get(CoreOptions.FILE_FORMAT, default)
def file_compression(self, default=None):
return self.options.get(CoreOptions.FILE_COMPRESSION, default)
def file_compression_zstd_level(self, default=None):
return self.options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL, default)
def file_compression_per_level(self, default=None):
return self.options.get(CoreOptions.FILE_COMPRESSION_PER_LEVEL, default)
def file_format_per_level(self, default=None):
return self.options.get(CoreOptions.FILE_FORMAT_PER_LEVEL, default)
def file_block_size(self, default=None):
return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default)
def metadata_stats_enabled(self, default=None):
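        """Return True only when the effective metadata.stats-mode is "full"."""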
return self.options.get(CoreOptions.METADATA_STATS_MODE, default) == "full"
def blob_as_descriptor(self, default=None):
return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default)
def target_file_size(self, has_primary_key, default=None):
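        """Target data file size in bytes; defaults to 128 MiB with a primary key, 256 MiB without."""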
        if default is None:
            fallback = MemorySize.of_mebi_bytes(128 if has_primary_key else 256)
        else:
            fallback = MemorySize.parse(default)
        return self.options.get(CoreOptions.TARGET_FILE_SIZE, fallback).get_bytes()
def blob_target_file_size(self, default=None):
"""
Args:
default: The standby value when the configuration item does not exist. If default is also None,
then use the default value of the configuration item itself.
"""
if self.options.contains(CoreOptions.BLOB_TARGET_FILE_SIZE):
return self.options.get(CoreOptions.BLOB_TARGET_FILE_SIZE, None).get_bytes()
elif default is not None:
return MemorySize.parse(default).get_bytes()
else:
return self.target_file_size(has_primary_key=False)
def data_file_prefix(self, default=None):
return self.options.get(CoreOptions.DATA_FILE_PREFIX, default)
def scan_fallback_branch(self, default=None):
return self.options.get(CoreOptions.SCAN_FALLBACK_BRANCH, default)
def incremental_between_timestamp(self, default=None):
return self.options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, default)
def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes()
def source_split_open_file_cost(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST, default).get_bytes()
def commit_user_prefix(self, default=None):
return self.options.get(CoreOptions.COMMIT_USER_PREFIX, default)
def row_tracking_enabled(self, default=None):
return self.options.get(CoreOptions.ROW_TRACKING_ENABLED, default)
def data_evolution_enabled(self, default=None):
return self.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, default)
def deletion_vectors_enabled(self, default=None):
return self.options.get(CoreOptions.DELETION_VECTORS_ENABLED, default)
def merge_engine(self, default=None):
return self.options.get(CoreOptions.MERGE_ENGINE, default)
def data_file_external_paths(self, default=None):
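        """Split the comma-separated external paths into a list of paths, or None if unset."""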
external_paths_str = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
if not external_paths_str:
return None
return [path.strip() for path in external_paths_str.split(",") if path.strip()]
def data_file_external_paths_strategy(self, default=None):
return self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, default)
def data_file_external_paths_specific_fs(self, default=None):
return self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS, default)
def commit_max_retries(self) -> int:
return self.options.get(CoreOptions.COMMIT_MAX_RETRIES)
def commit_timeout(self) -> int:
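        """Commit timeout in milliseconds; sys.maxsize (effectively unlimited) when unset."""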
timeout = self.options.get(CoreOptions.COMMIT_TIMEOUT)
if timeout is None:
return sys.maxsize
return int(timeout.total_seconds() * 1000)
def commit_min_retry_wait(self) -> int:
wait = self.options.get(CoreOptions.COMMIT_MIN_RETRY_WAIT)
return int(wait.total_seconds() * 1000)
def commit_max_retry_wait(self) -> int:
wait = self.options.get(CoreOptions.COMMIT_MAX_RETRY_WAIT)
return int(wait.total_seconds() * 1000)
def global_index_enabled(self, default=None):
return self.options.get(CoreOptions.GLOBAL_INDEX_ENABLED, default)
def global_index_thread_num(self) -> Optional[int]:
return self.options.get(CoreOptions.GLOBAL_INDEX_THREAD_NUM)
def vector_dim(self, default=None):
return self.options.get(CoreOptions.VECTOR_DIM, default)
def vector_metric(self, default=None):
return self.options.get(CoreOptions.VECTOR_METRIC, default)
def vector_index_type(self, default=None):
return self.options.get(CoreOptions.VECTOR_INDEX_TYPE, default)
def vector_m(self, default=None):
return self.options.get(CoreOptions.VECTOR_M, default)
def vector_ef_construction(self, default=None):
return self.options.get(CoreOptions.VECTOR_EF_CONSTRUCTION, default)
def vector_ef_search(self, default=None):
return self.options.get(CoreOptions.VECTOR_EF_SEARCH, default)
def vector_nlist(self, default=None):
return self.options.get(CoreOptions.VECTOR_NLIST, default)
def vector_nprobe(self, default=None):
return self.options.get(CoreOptions.VECTOR_NPROBE, default)
def vector_size_per_index(self, default=None):
return self.options.get(CoreOptions.VECTOR_SIZE_PER_INDEX, default)
def vector_search_factor(self, default=None):
return self.options.get(CoreOptions.VECTOR_SEARCH_FACTOR, default)
def vector_normalize(self, default=None):
return self.options.get(CoreOptions.VECTOR_NORMALIZE, default)
def read_batch_size(self, default=None) -> int:
return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
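

# Minimal usage sketch (illustrative; the values are hypothetical and assume
# Options preserves the native types passed in the dict):
#
#     opts = CoreOptions.from_dict({
#         "bucket": 4,
#         "file.format": CoreOptions.FILE_FORMAT_PARQUET,
#     })
#     opts.bucket()            # -> 4
#     opts.file_format()       # -> "parquet"
#     opts.file_compression()  # -> "zstd" (declared default; not set above)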