| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| import uuid |
| import fastavro |
| from typing import List |
| from io import BytesIO |
| |
| from pypaimon.pynative.row.binary_row import BinaryRowDeserializer, BinaryRowSerializer, BinaryRow |
| from pypaimon.pynative.table.data_file_meta import DataFileMeta |
| from pypaimon.pynative.table.manifest_entry import ManifestEntry |
| from pypaimon.pynative.write.commit_message_impl import CommitMessageImpl |
| |
| |
class ManifestFileManager:
    """Reads and writes manifest files in Avro format through the table's FileIO.

    Manifest files are stored in the ``manifest`` directory under the table
    path. Each Avro record describes one data file together with the
    partition and bucket it belongs to.
    """

    def __init__(self, table):
        """Bind the manager to *table* and cache the schema fields it needs.

        Args:
            table: The table whose manifests are managed. Its ``table_path``,
                ``file_io`` and ``table_schema`` attributes are read here.
        """
        # Imported inside the method and used only for the annotation below
        # (presumably to avoid a circular import -- TODO confirm).
        from pypaimon.pynative.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self.manifest_path = table.table_path / "manifest"
        self.file_io = table.file_io

        # Field lists are resolved once; they drive (de)serialization of the
        # partition row and of the min/max key rows.
        schema = self.table.table_schema
        self.partition_key_fields = schema.get_partition_key_fields()
        self.primary_key_fields = schema.get_primary_key_fields()
        self.trimmed_primary_key_fields = schema.get_trimmed_primary_key_fields()

    def read(self, manifest_file_name: str) -> List[ManifestEntry]:
        """Load every entry stored in one manifest file.

        Args:
            manifest_file_name: File name relative to the manifest directory.

        Returns:
            One ``ManifestEntry`` per Avro record, in the order the records
            appear in the file.
        """
        manifest_file_path = self.manifest_path / manifest_file_name

        # The whole file is pulled into memory and decoded from a BytesIO.
        with self.file_io.new_input_stream(manifest_file_path) as input_stream:
            avro_bytes = input_stream.read()
        reader = fastavro.reader(BytesIO(avro_bytes))

        entries = []
        for record in reader:
            file_info = dict(record['_FILE'])
            file_meta = DataFileMeta(
                file_name=file_info['_FILE_NAME'],
                file_size=file_info['_FILE_SIZE'],
                row_count=file_info['_ROW_COUNT'],
                min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
                max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
                key_stats=None,  # TODO
                value_stats=None,  # TODO
                min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
                max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
                schema_id=file_info['_SCHEMA_ID'],
                level=file_info['_LEVEL'],
                extra_files=None,  # TODO
            )
            entries.append(ManifestEntry(
                kind=record['_KIND'],
                partition=BinaryRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields),
                bucket=record['_BUCKET'],
                total_buckets=record['_TOTAL_BUCKETS'],
                file=file_meta
            ))
        return entries

    def write(self, commit_messages: List[CommitMessageImpl]) -> List[str]:
        """Write the new files of *commit_messages* into one new manifest file.

        Args:
            commit_messages: Messages whose ``new_files()`` are recorded. Each
                file becomes one Avro record carrying the message's partition
                and bucket.

        Returns:
            A single-element list holding the generated manifest file name
            (``manifest-<uuid4>.avro``).

        Raises:
            RuntimeError: If encoding or writing fails. The target path is
                passed to ``file_io.delete_quietly`` before re-raising.
        """
        avro_records = []
        for message in commit_messages:
            # Use the field list cached in __init__. The previous code passed
            # the bound method ``get_partition_key_fields`` itself (no call),
            # so BinaryRow received a method object instead of the fields.
            partition_bytes = BinaryRowSerializer.to_bytes(
                BinaryRow(list(message.partition()), self.partition_key_fields))
            bucket = message.bucket()
            for data_file in message.new_files():
                avro_records.append({
                    "_KIND": 0,  # presumably the "add" kind -- TODO confirm
                    "_PARTITION": partition_bytes,
                    "_BUCKET": bucket,
                    "_TOTAL_BUCKETS": -1,  # TODO
                    "_FILE": {
                        "_FILE_NAME": data_file.file_name,
                        "_FILE_SIZE": data_file.file_size,
                        "_ROW_COUNT": data_file.row_count,
                        "_MIN_KEY": BinaryRowSerializer.to_bytes(data_file.min_key),
                        "_MAX_KEY": BinaryRowSerializer.to_bytes(data_file.max_key),
                        # Stats are placeholders matching the "long" type
                        # declared in DATA_FILE_META_SCHEMA.
                        "_KEY_STATS": 1,  # TODO
                        "_VALUE_STATS": 1,
                        # Sequence numbers, schema id and level are written
                        # as constant zeros, not taken from the file meta.
                        "_MIN_SEQUENCE_NUMBER": 0,
                        "_MAX_SEQUENCE_NUMBER": 0,
                        "_SCHEMA_ID": 0,
                        "_LEVEL": 0,
                        "_EXTRA_FILES": [],
                    }
                })

        manifest_filename = f"manifest-{uuid.uuid4()}.avro"
        manifest_path = self.manifest_path / manifest_filename
        try:
            # Encode fully in memory first, then push the bytes out in one write.
            buffer = BytesIO()
            fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
            with self.file_io.new_output_stream(manifest_path) as output_stream:
                output_stream.write(buffer.getvalue())
            return [manifest_filename]
        except Exception as e:
            # Best-effort cleanup so a failed write leaves no partial manifest.
            self.file_io.delete_quietly(manifest_path)
            raise RuntimeError(f"Failed to write manifest file: {e}") from e
| |
| |
# Avro record schema "SimpleStats": three fields, each a nullable union
# (["null", <type>]) that defaults to None.
SIMPLE_STATS_SCHEMA = {
    "type": "record",
    "name": "SimpleStats",
    "namespace": "com.example.paimon",
    "fields": [
        {"name": stat_name, "type": ["null", stat_type], "default": None}
        for stat_name, stat_type in (
            ("null_count", "long"),
            ("min_value", "bytes"),
            ("max_value", "bytes"),
        )
    ],
}
| |
# Avro record schema "DataFileMeta". The first group of fields is required;
# the second group is nullable (["null", <type>]) with a None default.
# Field order is significant and is kept exactly as declared here.
DATA_FILE_META_SCHEMA = {
    "type": "record",
    "name": "DataFileMeta",
    "fields": [
        *(
            {"name": field_name, "type": field_type}
            for field_name, field_type in (
                ("_FILE_NAME", "string"),
                ("_FILE_SIZE", "long"),
                ("_ROW_COUNT", "long"),
                ("_MIN_KEY", "bytes"),
                ("_MAX_KEY", "bytes"),
                ("_KEY_STATS", "long"),  # TODO
                ("_VALUE_STATS", "long"),  # TODO
                ("_MIN_SEQUENCE_NUMBER", "long"),
                ("_MAX_SEQUENCE_NUMBER", "long"),
                ("_SCHEMA_ID", "long"),
                ("_LEVEL", "int"),
                ("_EXTRA_FILES", {"type": "array", "items": "string"}),
            )
        ),
        *(
            {"name": field_name, "type": ["null", field_type], "default": None}
            for field_name, field_type in (
                ("_CREATION_TIME", "long"),
                ("_DELETE_ROW_COUNT", "long"),
                ("_EMBEDDED_FILE_INDEX", "bytes"),
                ("_FILE_SOURCE", "int"),
                ("_VALUE_STATS_COLS", {"type": "array", "items": "string"}),
                ("_EXTERNAL_PATH", "string"),
            )
        ),
    ],
}
| |
# Avro record schema "ManifestEntry": four scalar fields followed by the
# nested data-file record described by DATA_FILE_META_SCHEMA.
MANIFEST_ENTRY_SCHEMA = {
    "type": "record",
    "name": "ManifestEntry",
    "fields": [
        {"name": field_name, "type": field_type}
        for field_name, field_type in (
            ("_KIND", "int"),
            ("_PARTITION", "bytes"),
            ("_BUCKET", "int"),
            ("_TOTAL_BUCKETS", "int"),
            ("_FILE", DATA_FILE_META_SCHEMA),
        )
    ],
}