blob: 50727b6ce37cfae9b970a89c2bd553e64c2d6aab [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional

from pypaimon.common.json_util import json_field
from pypaimon.snapshot.snapshot import Snapshot
@dataclass
class PartitionStatistics:
    """
    Represents partition statistics for snapshot commits.

    This class matches the Java org.apache.paimon.partition.PartitionStatistics
    structure for proper JSON serialization in REST API calls.
    """

    # NOTE(review): the string passed to json_field is presumably the JSON key
    # (camelCase, mirroring the Java class) -- confirm against json_util.
    # Field order is part of the positional constructor signature; keep it.
    spec: Dict[str, str] = json_field("spec", default_factory=dict)
    record_count: int = json_field("recordCount", default=0)
    file_size_in_bytes: int = json_field("fileSizeInBytes", default=0)
    file_count: int = json_field("fileCount", default=0)
    # Defaults to the current wall-clock time in epoch milliseconds.
    last_file_creation_time: int = json_field(
        "lastFileCreationTime",
        default_factory=lambda: int(time.time() * 1000),
    )

    @classmethod
    def create(cls, partition_spec: Optional[Dict[str, str]] = None, record_count: int = 0,
               file_count: int = 0, file_size_in_bytes: int = 0,
               last_file_creation_time: Optional[int] = None) -> 'PartitionStatistics':
        """
        Factory method to create PartitionStatistics with backward compatibility.

        Args:
            partition_spec: Partition specification dictionary; a missing or
                empty value yields a fresh empty dict
            record_count: Number of records
            file_count: Number of files
            file_size_in_bytes: Total file size in bytes
            last_file_creation_time: Last file creation time in milliseconds;
                when None, the current time is used

        Returns:
            PartitionStatistics instance
        """
        # Compare against None rather than relying on truthiness, so that an
        # explicitly supplied timestamp of 0 is preserved instead of being
        # silently replaced with "now".
        if last_file_creation_time is None:
            last_file_creation_time = int(time.time() * 1000)

        return cls(
            spec=partition_spec or {},
            record_count=record_count,
            file_count=file_count,
            file_size_in_bytes=file_size_in_bytes,
            last_file_creation_time=last_file_creation_time,
        )
class SnapshotCommit(ABC):
    """
    Abstract interface for committing a snapshot atomically.

    Implementations provide commit() and close(). Instances can be used as
    context managers: entering yields the instance itself and leaving the
    ``with`` block calls close().
    """

    @abstractmethod
    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
        """
        Commit the given snapshot.

        Args:
            snapshot: The snapshot to commit
            branch: The branch name to commit to
            statistics: List of partition statistics

        Returns:
            True if commit was successful, False otherwise

        Raises:
            Exception: If commit fails
        """

    @abstractmethod
    def close(self):
        """Close the snapshot commit and release any resources."""

    def __enter__(self):
        # The context-manager target is the committer itself.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close on exit; returning None lets any in-flight exception
        # propagate to the caller.
        self.close()