blob: cd57dc5b42e0114c795a7f4938c5b1ea492387ff [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import time
from pathlib import Path
from typing import List
from pypaimon.pynative.table.snapshot import Snapshot
from pypaimon.pynative.write.commit_message_impl import CommitMessageImpl
from pypaimon.pynative.table.manifest_file_manager import ManifestFileManager
from pypaimon.pynative.table.manifest_list_manager import ManifestListManager
from pypaimon.pynative.table.snapshot_manager import SnapshotManager
class FileStoreCommit:
    """Core commit logic for file store operations.

    Turns a batch of commit messages into manifest files, a manifest list and
    finally a new snapshot, using the table's manifest and snapshot managers.
    """

    def __init__(self, table, commit_user: str):
        """Bind the committer to ``table`` and create its managers.

        Args:
            table: The table to commit into; it is handed unchanged to the
                snapshot, manifest-file and manifest-list managers.
            commit_user: Identifier recorded as ``commit_user`` on every
                snapshot produced by this instance.
        """
        # Imported locally rather than at module level -- presumably to avoid
        # a circular import with file_store_table; confirm before hoisting.
        from pypaimon.pynative.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self.commit_user = commit_user

        self.snapshot_manager = SnapshotManager(table)
        self.manifest_file_manager = ManifestFileManager(table)
        self.manifest_list_manager = ManifestListManager(table)

        # Manifest tuning knobs (8 * 1024 * 1024 bytes and 30 entries). They
        # are only stored here; nothing else in this class reads them.
        self.manifest_target_size = 8 * 1024 * 1024
        self.manifest_merge_min_count = 30

    def commit(self, commit_messages: List[CommitMessageImpl], commit_identifier: int):
        """Commit the given commit messages in normal append mode.

        The manifest files written for ``commit_messages`` are placed in front
        of the manifest files of the latest snapshot (if one exists), and the
        combined list is published as a new ``APPEND`` snapshot.

        Args:
            commit_messages: Messages describing the files to commit. An empty
                list makes this call a no-op.
            commit_identifier: Value stored as ``commit_identifier`` on the
                new snapshot.
        """
        if not commit_messages:
            return

        new_manifest_files = self.manifest_file_manager.write(commit_messages)
        if not new_manifest_files:
            return

        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
        if latest_snapshot:
            new_manifest_files.extend(
                self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
            )

        manifest_list = self.manifest_list_manager.write(new_manifest_files)

        # Derive the id from the very snapshot whose manifests were merged
        # above. Looking up the latest snapshot a second time could observe a
        # newer snapshot and yield an id that skips over data this commit
        # never incorporated.
        new_snapshot_id = self._next_snapshot_id(latest_snapshot)
        self._commit_snapshot(new_snapshot_id, manifest_list, commit_identifier, "APPEND")

    def overwrite(self, partition, commit_messages: List[CommitMessageImpl], commit_identifier: int):
        """Commit the given commit messages as an ``OVERWRITE`` snapshot.

        Only the manifest files written for ``commit_messages`` are referenced
        by the new snapshot; manifests of earlier snapshots are not carried
        over.

        Args:
            partition: Currently unused by this method.
                NOTE(review): because existing manifests are dropped
                wholesale, the overwrite is not restricted to this partition
                -- confirm that this is intended.
            commit_messages: Messages describing the replacement files. An
                empty list makes this call a no-op.
            commit_identifier: Value stored as ``commit_identifier`` on the
                new snapshot.
        """
        if not commit_messages:
            return

        new_manifest_files = self.manifest_file_manager.write(commit_messages)
        if not new_manifest_files:
            return

        # In overwrite mode, we don't merge with existing manifests
        manifest_list = self.manifest_list_manager.write(new_manifest_files)

        new_snapshot_id = self._generate_snapshot_id()
        self._commit_snapshot(new_snapshot_id, manifest_list, commit_identifier, "OVERWRITE")

    def abort(self, commit_messages: List[CommitMessageImpl]):
        """Best-effort removal of the new files referenced by ``commit_messages``.

        Each entry of ``message.new_files()`` is deleted through its
        ``file_path``. A file that is already gone is skipped silently; any
        other failure is reported as a warning on stdout and does not stop the
        remaining deletions.

        Args:
            commit_messages: Messages whose new files should be deleted.
        """
        for message in commit_messages:
            for file in message.new_files():
                try:
                    # Delete directly instead of checking exists() first: the
                    # file may vanish between the check and the unlink.
                    Path(file.file_path).unlink()
                except FileNotFoundError:
                    # Nothing left to clean up for this file.
                    continue
                except Exception as e:
                    # Deliberately broad: cleanup must never raise.
                    print(f"Warning: Failed to clean up file {file.file_path}: {e}")

    def close(self):
        """Release resources held by the committer (currently nothing)."""
        pass

    def _commit_snapshot(self, snapshot_id: int, manifest_list, commit_identifier: int, commit_kind: str):
        """Build the snapshot record for ``manifest_list`` and publish it."""
        snapshot_data = Snapshot(
            version=3,
            id=snapshot_id,
            # NOTE(review): the schema id is hard-coded to 0 -- confirm
            # whether it should follow the table's current schema.
            schema_id=0,
            # Base and delta both reference the same manifest list.
            base_manifest_list=manifest_list,
            delta_manifest_list=manifest_list,
            commit_user=self.commit_user,
            commit_identifier=commit_identifier,
            commit_kind=commit_kind,
            time_millis=int(time.time() * 1000),
            log_offsets={},
        )
        self.snapshot_manager.commit_snapshot(snapshot_id, snapshot_data)

    @staticmethod
    def _next_snapshot_id(latest_snapshot) -> int:
        """Return the id following ``latest_snapshot``, or 1 if there is none."""
        return latest_snapshot.id + 1 if latest_snapshot else 1

    def _generate_snapshot_id(self) -> int:
        """Return the id for the next snapshot based on the current latest one."""
        return self._next_snapshot_id(self.snapshot_manager.get_latest_snapshot())