blob: 9ee2e162d8e678d7f2e11bf99600e46128bf2da2 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import time
from typing import List, Optional
from pypaimon.api import BatchTableCommit
from pypaimon.pynative.write.commit_message_impl import CommitMessageImpl
from pypaimon.pynative.write.file_store_commit import FileStoreCommit
class BatchTableCommitImpl(BatchTableCommit):
    """Python implementation of BatchTableCommit for batch writing scenarios.

    An instance accepts a single ``commit`` call. The work itself is
    delegated to a ``FileStoreCommit`` built from the table and commit user
    passed to the constructor.
    """

    def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
        """Bind the committer to a table.

        Args:
            table: Table to commit to; annotated as ``FileStoreTable`` below.
            commit_user: User identifier handed to ``FileStoreCommit``.
            static_partition: When not ``None``, ``commit`` goes through
                ``FileStoreCommit.overwrite`` with this value as the
                partition; when ``None`` it goes through
                ``FileStoreCommit.commit``.
        """
        # Function-scope import, used only for the attribute annotation below
        # (presumably to avoid a circular import at module load -- confirm).
        from pypaimon.pynative.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self.commit_user = commit_user
        self.overwrite_partition = static_partition
        self.file_store_commit = FileStoreCommit(table, commit_user)
        # Flipped to True by the first commit() call; see _check_committed().
        self.batch_committed = False

    def commit(self, commit_messages: List[CommitMessageImpl]) -> None:
        """Commit the given messages, either as an overwrite or a plain commit.

        Messages whose ``is_empty()`` returns true are dropped first; if
        nothing remains the method returns without touching the store. Note
        that the one-time guard is consumed before that check, so even an
        all-empty or failed call uses up this instance's single commit.

        Args:
            commit_messages: Messages produced by the writers.

        Raises:
            RuntimeError: If ``commit`` was already called on this instance,
                or if the underlying commit/overwrite raised (the original
                exception is chained as ``__cause__``).
        """
        self._check_committed()

        pending = [message for message in commit_messages if not message.is_empty()]
        if len(pending) == 0:
            return

        # Identifier is the current wall-clock time in milliseconds.
        identifier = int(time.time() * 1000)

        try:
            if self.overwrite_partition is None:
                self.file_store_commit.commit(
                    commit_messages=pending,
                    commit_identifier=identifier,
                )
            else:
                self.file_store_commit.overwrite(
                    partition=self.overwrite_partition,
                    commit_messages=pending,
                    commit_identifier=identifier,
                )
        except Exception as e:
            # Abort with the caller's original list (empty messages included),
            # not the filtered one. NOTE(review): if abort() itself raises,
            # that exception propagates instead of the RuntimeError below.
            self.file_store_commit.abort(commit_messages)
            raise RuntimeError(f"Failed to commit: {str(e)}") from e

    def abort(self, commit_messages: List[CommitMessageImpl]) -> None:
        """Forward the messages to ``FileStoreCommit.abort``."""
        self.file_store_commit.abort(commit_messages)

    def close(self) -> None:
        """Close the underlying ``FileStoreCommit``."""
        self.file_store_commit.close()

    def _check_committed(self) -> None:
        """Enforce the one-commit-per-instance rule.

        Raises:
            RuntimeError: If a commit has already been attempted.
        """
        if not self.batch_committed:
            self.batch_committed = True
            return
        raise RuntimeError("BatchTableCommit only supports one-time committing.")