blob: 4ba82bd80e39229a60d8c4d70518cffdc1294faf [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
FieldBunch classes for organizing files by field in data evolution.
These classes help organize DataFileMeta objects into groups based on their field content,
supporting both regular data files and blob files.
"""
from abc import ABC
from typing import List
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
class FieldBunch(ABC):
"""Interface for files organized by field."""
def row_count(self) -> int:
"""Return the total row count for this bunch."""
...
def files(self) -> List[DataFileMeta]:
"""Return the list of files in this bunch."""
...
class DataBunch(FieldBunch):
"""Files for a single data file."""
def __init__(self, data_file: DataFileMeta):
self.data_file = data_file
def row_count(self) -> int:
return self.data_file.row_count
def files(self) -> List[DataFileMeta]:
return [self.data_file]
class BlobBunch(FieldBunch):
"""Files for partial field (blob files)."""
def __init__(self, expected_row_count: int):
self._files: List[DataFileMeta] = []
self.expected_row_count = expected_row_count
self.latest_first_row_id = -1
self.expected_next_first_row_id = -1
self.latest_max_sequence_number = -1
self._row_count = 0
def add(self, file: DataFileMeta) -> None:
"""Add a blob file to this bunch."""
if not self._is_blob_file(file.file_name):
raise ValueError("Only blob file can be added to a blob bunch.")
if file.first_row_id == self.latest_first_row_id:
if file.max_sequence_number >= self.latest_max_sequence_number:
raise ValueError(
"Blob file with same first row id should have decreasing sequence number."
)
return
if self._files:
first_row_id = file.first_row_id
if first_row_id < self.expected_next_first_row_id:
if file.max_sequence_number >= self.latest_max_sequence_number:
raise ValueError(
"Blob file with overlapping row id should have decreasing sequence number."
)
return
elif first_row_id > self.expected_next_first_row_id:
raise ValueError(
f"Blob file first row id should be continuous, expect "
f"{self.expected_next_first_row_id} but got {first_row_id}"
)
if file.schema_id != self._files[0].schema_id:
raise ValueError(
"All files in a blob bunch should have the same schema id."
)
if file.write_cols != self._files[0].write_cols:
raise ValueError(
"All files in a blob bunch should have the same write columns."
)
self._files.append(file)
self._row_count += file.row_count
if self._row_count > self.expected_row_count:
raise ValueError(
f"Blob files row count exceed the expect {self.expected_row_count}"
)
self.latest_max_sequence_number = file.max_sequence_number
self.latest_first_row_id = file.first_row_id
self.expected_next_first_row_id = self.latest_first_row_id + file.row_count
def row_count(self) -> int:
return self._row_count
def files(self) -> List[DataFileMeta]:
return self._files
@staticmethod
def _is_blob_file(file_name: str) -> bool:
"""Check if a file is a blob file based on its extension."""
return file_name.endswith('.blob')