blob: afd061c97b97d1cb455d7eddfa92116aa13e0b11 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import uuid
import fastavro
from typing import List, Optional
from io import BytesIO
from pypaimon.pynative.table.snapshot import Snapshot
class ManifestListManager:
    """Read and write manifest list files (Avro) through the table's FileIO.

    A manifest list lives under ``<table_path>/manifest``. Each of its Avro
    records names one manifest file (``_FILE_NAME``) together with some
    per-file metadata.
    """

    def __init__(self, table):
        # Imported lazily; presumably to avoid a circular import with
        # file_store_table (TODO confirm).
        from pypaimon.pynative.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        # Directory holding the manifest list files read/written here.
        self.manifest_path = self.table.table_path / "manifest"
        self.file_io = self.table.file_io

    def read_all_manifest_files(self, snapshot: "Snapshot") -> List[str]:
        """Return every manifest file name referenced by ``snapshot``.

        The base manifest list is read first, then the delta manifest list.
        Duplicate names are dropped and the first occurrence is kept. This
        makes the result deterministic and keeps base manifests ahead of
        delta manifests.

        Args:
            snapshot: Snapshot whose ``base_manifest_list`` and
                ``delta_manifest_list`` file names are read.

        Returns:
            Unique manifest file names, in base-then-delta order.
        """
        base_manifests = self.read(snapshot.base_manifest_list)
        delta_manifests = self.read(snapshot.delta_manifest_list)
        # dict.fromkeys de-duplicates while preserving insertion order;
        # a set would return the names in arbitrary order.
        return list(dict.fromkeys(base_manifests + delta_manifests))

    def read(self, manifest_list_name: str) -> List[str]:
        """Return the ``_FILE_NAME`` of every record in one manifest list.

        Args:
            manifest_list_name: Manifest list file name, relative to the
                table's ``manifest`` directory.

        Returns:
            Manifest file names in the order they appear in the Avro file.
        """
        manifest_list_path = self.manifest_path / manifest_list_name
        with self.file_io.new_input_stream(manifest_list_path) as input_stream:
            avro_bytes = input_stream.read()
        # The whole file is loaded into memory, then parsed from a buffer.
        reader = fastavro.reader(BytesIO(avro_bytes))
        return [record['_FILE_NAME'] for record in reader]

    def write(self, manifest_file_names: List[str]) -> Optional[str]:
        """Write a new manifest list that references the given manifest files.

        Args:
            manifest_file_names: Manifest file names to record, in order.

        Returns:
            The generated manifest list file name, relative to the
            ``manifest`` directory. Returns ``None`` without writing
            anything when ``manifest_file_names`` is empty.

        Raises:
            RuntimeError: Serializing or writing the file failed. Before
                raising, ``file_io.delete_quietly`` is called on the target
                path.
        """
        if not manifest_file_names:
            return None

        # TODO: _FILE_SIZE, _NUM_ADDED_FILES, _NUM_DELETED_FILES,
        # _PARTITION_STATS and _SCHEMA_ID are written as placeholder zeros.
        avro_records = [
            {
                "_FILE_NAME": manifest_file_name,
                "_FILE_SIZE": 0,
                "_NUM_ADDED_FILES": 0,
                "_NUM_DELETED_FILES": 0,
                "_PARTITION_STATS": 0,
                "_SCHEMA_ID": 0,
            }
            for manifest_file_name in manifest_file_names
        ]

        list_filename = f"manifest-list-{uuid.uuid4()}.avro"
        list_path = self.manifest_path / list_filename
        try:
            buffer = BytesIO()
            fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
            with self.file_io.new_output_stream(list_path) as output_stream:
                output_stream.write(buffer.getvalue())
            return list_filename
        except Exception as e:
            # Clean up any partially written file before surfacing the error.
            self.file_io.delete_quietly(list_path)
            raise RuntimeError(f"Failed to write manifest list file: {e}") from e
# Avro record schema for each entry of a manifest list file; passed to
# fastavro.writer by ManifestListManager.write.
# NOTE(review): _PARTITION_STATS is a placeholder ``long`` (TODO) and is
# written as 0 — confirm the intended nested stats type before relying on it.
MANIFEST_FILE_META_SCHEMA = {
    "type": "record",
    "name": "ManifestFileMeta",
    "fields": [
        {"name": column, "type": avro_type}
        for column, avro_type in (
            ("_FILE_NAME", "string"),
            ("_FILE_SIZE", "long"),
            ("_NUM_ADDED_FILES", "long"),
            ("_NUM_DELETED_FILES", "long"),
            ("_PARTITION_STATS", "long"),  # TODO
            ("_SCHEMA_ID", "long"),
        )
    ],
}