################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pathlib import Path

import pyarrow as pa

from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRow, RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter


class BlobFileWriter:
    """
    Single blob file writer.

    Writes rows one by one and tracks file size.
    """

    def __init__(self, file_io, file_path: Path, blob_as_descriptor: bool):
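        """
        Open an output stream for ``file_path`` via ``file_io`` and wrap it in a
        BlobFormatWriter. When ``blob_as_descriptor`` is True, incoming values are
        treated as serialized BlobDescriptors pointing at external blob files.
        """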
        self.file_io = file_io
        self.file_path = file_path
        self.blob_as_descriptor = blob_as_descriptor
        self.output_stream = file_io.new_output_stream(file_path)
        self.writer = BlobFormatWriter(self.output_stream)
        self.row_count = 0
        self.closed = False

    def write_row(self, row_data: pa.Table):
        """Write a single row to the blob file."""
        if row_data.num_rows != 1:
            raise ValueError(f"Expected 1 row, got {row_data.num_rows}")

        # Extract the single column value from the PyArrow row
        records_dict = row_data.to_pydict()
        field_name = row_data.schema[0].name
        col_data = records_dict[field_name][0]

        # Convert the value to a Blob
        if self.blob_as_descriptor:
            # In blob-as-descriptor mode the value is a serialized BlobDescriptor;
            # the external file is read so that rolling is based on the external file size
            if isinstance(col_data, bytes):
                blob_descriptor = BlobDescriptor.deserialize(col_data)
            else:
                # Handle PyArrow scalar and string values
                if hasattr(col_data, 'as_py'):
                    col_data = col_data.as_py()
                if isinstance(col_data, str):
                    col_data = col_data.encode('utf-8')
                blob_descriptor = BlobDescriptor.deserialize(col_data)
            # Read external file data for rolling size calculation
            uri_reader = self.file_io.uri_reader_factory.create(blob_descriptor.uri)
            blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
        elif isinstance(col_data, bytes):
            blob_data = BlobData(col_data)
        else:
            if hasattr(col_data, 'as_py'):
                col_data = col_data.as_py()
            if isinstance(col_data, str):
                col_data = col_data.encode('utf-8')
            blob_data = BlobData(col_data)

        # Wrap the blob in a GenericRow and hand it to the blob format writer
        fields = [DataField(
            0, field_name,
            PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
        row = GenericRow([blob_data], fields, RowKind.INSERT)
        self.writer.add_element(row)
        self.row_count += 1

    def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
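        """Return True if the written data has reached ``target_size``.

        Delegates the rolling-size check to the underlying BlobFormatWriter.
        """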
        return self.writer.reach_target_size(suggested_check, target_size)

    def close(self) -> int:
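        """Close the writer and return the size of the written file in bytes."""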
        if self.closed:
            return self.file_io.get_file_size(self.file_path)
        self.writer.close()
        self.closed = True
        # Get actual file size
        file_size = self.file_io.get_file_size(self.file_path)
        return file_size

    def abort(self):
        """Abort the writer and delete the file."""
        if not self.closed:
            try:
                if hasattr(self.output_stream, 'close'):
                    self.output_stream.close()
            except Exception:
                pass
            self.closed = True
        # Delete the file
        self.file_io.delete_quietly(self.file_path)