# blob: cde282eff00556f1de814f66974c885823df08f6 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pathlib import Path
from typing import Optional
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.table import Table
from pypaimon.write.batch_write_builder import BatchWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
FixedBucketRowKeyExtractor,
PostponeBucketRowKeyExtractor,
RowKeyExtractor,
UnawareBucketRowKeyExtractor)
class FileStoreTable(Table):
    """Table bound to a file IO, a table path and a table schema.

    The constructor derives a number of convenience attributes (field lookup
    tables, primary/partition key fields, bucket count, ...) from the given
    ``TableSchema`` and exposes factory methods for readers, writers,
    snapshot helpers and row key extractors.
    """

    def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
                 table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
        """Initialise the table and cache schema-derived attributes.

        Args:
            file_io: File IO handed to the schema manager and kept on the table.
            identifier: Identifier of this table.
            table_path: Path of the table, also handed to the schema manager.
            table_schema: Schema providing fields, keys and options.
            catalog_environment: Optional catalog environment; when falsy,
                ``CatalogEnvironment.empty()`` is used instead.
        """
        self.file_io = file_io
        self.identifier = identifier
        self.table_path = table_path
        self.catalog_environment = catalog_environment or CatalogEnvironment.empty()

        self.table_schema = table_schema
        self.fields = table_schema.fields
        self.field_names = [field.name for field in table_schema.fields]
        self.field_dict = {field.name: field for field in self.fields}
        self.primary_keys = table_schema.primary_keys
        self.primary_keys_fields = [self.field_dict[name] for name in self.primary_keys]
        self.partition_keys = table_schema.partition_keys
        self.partition_keys_fields = [self.field_dict[name] for name in self.partition_keys]
        # Primary keys with the partition keys removed, original order kept.
        self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not in self.partition_keys]
        self.trimmed_primary_keys_fields = [self.field_dict[name] for name in self.trimmed_primary_keys]

        # NOTE: this is the schema's own options object, not a copy.
        self.options = table_schema.options
        self.cross_partition_update = self.table_schema.cross_partition_update()
        self.is_primary_key_table = bool(self.primary_keys)
        self.total_buckets = self._bucket_number(self.options)

        self.schema_manager = SchemaManager(file_io, table_path)

    @staticmethod
    def _bucket_number(options: dict) -> int:
        """Return the bucket option of ``options`` as an int, -1 when unset.

        Option values may arrive as ``str`` or ``int``; parsing them here keeps
        every comparison in this class on the same numeric footing.

        Raises:
            ValueError: If the configured value is not a valid integer.
        """
        raw_value = options.get(CoreOptions.BUCKET)
        return -1 if raw_value is None else int(raw_value)

    def current_branch(self) -> str:
        """Get the current branch name from options, defaulting to "main"."""
        return self.options.get(CoreOptions.BRANCH, "main")

    def snapshot_manager(self):
        """Get a new snapshot manager for this table."""
        # Imported lazily; presumably to avoid a circular import - TODO confirm.
        from pypaimon.snapshot.snapshot_manager import SnapshotManager
        return SnapshotManager(self)

    def new_snapshot_commit(self):
        """Create a new SnapshotCommit instance using the catalog environment."""
        return self.catalog_environment.snapshot_commit(self.snapshot_manager())

    def bucket_mode(self) -> BucketMode:
        """Derive the bucket mode from the table kind and the bucket option.

        Primary key tables:
            * bucket ``-2`` -> ``POSTPONE_MODE``
            * bucket ``-1`` -> ``CROSS_PARTITION`` when cross partition update
              is enabled, otherwise ``HASH_DYNAMIC``
            * anything else -> ``HASH_FIXED``

        Tables without primary keys:
            * bucket ``-1`` -> ``BUCKET_UNAWARE``
            * anything else -> ``HASH_FIXED``
        """
        # Read the live options (not the cached total_buckets) and parse once.
        bucket = self._bucket_number(self.options)

        if not self.is_primary_key_table:
            return BucketMode.BUCKET_UNAWARE if bucket == -1 else BucketMode.HASH_FIXED

        if bucket == -2:
            return BucketMode.POSTPONE_MODE
        if bucket == -1:
            if self.cross_partition_update:
                return BucketMode.CROSS_PARTITION
            return BucketMode.HASH_DYNAMIC
        return BucketMode.HASH_FIXED

    def new_read_builder(self) -> 'ReadBuilder':
        """Create a ReadBuilder for this table."""
        return ReadBuilder(self)

    def new_batch_write_builder(self) -> BatchWriteBuilder:
        """Create a BatchWriteBuilder for this table."""
        return BatchWriteBuilder(self)

    def create_row_key_extractor(self) -> RowKeyExtractor:
        """Create the row key extractor matching the current bucket mode.

        Raises:
            ValueError: If the bucket mode has no associated extractor.
        """
        bucket_mode = self.bucket_mode()
        if bucket_mode == BucketMode.HASH_FIXED:
            return FixedBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.BUCKET_UNAWARE:
            return UnawareBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.POSTPONE_MODE:
            return PostponeBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
            return DynamicBucketRowKeyExtractor(self.table_schema)
        else:
            raise ValueError(f"Unsupported bucket mode: {bucket_mode}")

    def copy(self, options: dict) -> 'FileStoreTable':
        """Return a new table whose options are overlaid with ``options``.

        A value of ``None`` removes the corresponding key from the copied
        options (removing a key that is not set is a no-op); any other value
        replaces or adds the key. This table is left untouched.

        Args:
            options: Option overrides to apply on top of the current options.

        Returns:
            A new ``FileStoreTable`` sharing file IO, identifier, path and
            catalog environment with this one, but using the merged options.

        Raises:
            ValueError: If ``options`` would change the bucket number.
        """
        # Compare parsed bucket numbers so that "4" vs 4, or an explicit -1 vs
        # an unset option, are not mistaken for a change of bucket number.
        if (CoreOptions.BUCKET in options
                and self._bucket_number(options) != self._bucket_number(self.options)):
            raise ValueError("Cannot change bucket number")

        new_options = self.options.copy()
        for key, value in options.items():
            if value is None:
                # Default avoids a KeyError when the key was never set.
                new_options.pop(key, None)
            else:
                new_options[key] = value

        new_table_schema = self.table_schema.copy(new_options=new_options)
        return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema,
                              self.catalog_environment)

    def add_options(self, options: dict):
        """Set the given options on this table in place.

        ``self.options`` is assigned directly from ``table_schema.options`` in
        ``__init__``, so (assuming that attribute is a plain dict) the schema
        observes the same update. Attributes computed in ``__init__`` such as
        ``total_buckets`` are not recomputed here.
        """
        for key, value in options.items():
            self.options[key] = value