################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
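"""Writer for Paimon's blob file format."""
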
import struct
import zlib
from typing import BinaryIO, List

from pypaimon.table.row.blob import Blob, BlobData
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor


class BlobFormatWriter:
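    """Writes rows holding a single blob field to ``output_stream``.

    Each record is laid out as: a 4-byte magic number, the raw blob bytes,
    an 8-byte record length and a 4-byte CRC32 of the magic number and blob
    bytes (all little endian). ``close()`` appends a delta-varint-compressed
    index of record lengths, followed by the index length (4 bytes) and the
    format version (1 byte).
    """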
    VERSION = 1
    MAGIC_NUMBER = 1481511375
    BUFFER_SIZE = 4096
    METADATA_SIZE = 12  # 8-byte length + 4-byte CRC32

    def __init__(self, output_stream: BinaryIO):
        self.output_stream = output_stream
        self.lengths: List[int] = []  # per-record lengths for the trailing index
        self.position = 0  # number of bytes written so far

    def add_element(self, row) -> None:
        if not hasattr(row, 'values') or len(row.values) != 1:
            raise ValueError("BlobFormatWriter only supports one field")

        blob_value = row.values[0]
        if blob_value is None:
            raise ValueError("BlobFormatWriter only supports non-null blob")
        if not isinstance(blob_value, Blob):
            raise ValueError("Field must be Blob/BlobData instance")

        previous_pos = self.position
        crc32 = 0  # running CRC32 over the magic number and blob bytes

        # Write magic number (4 bytes, little endian)
        magic_bytes = struct.pack('<I', self.MAGIC_NUMBER)
        crc32 = self._write_with_crc(magic_bytes, crc32)

        # Write blob data
        if isinstance(blob_value, BlobData):
            # In-memory blob: write its bytes directly
            data = blob_value.to_data()
            crc32 = self._write_with_crc(data, crc32)
        else:
            # Stream from BlobRef/Blob in BUFFER_SIZE chunks
            stream = blob_value.new_input_stream()
            try:
                chunk = stream.read(self.BUFFER_SIZE)
                while chunk:
                    crc32 = self._write_with_crc(chunk, crc32)
                    chunk = stream.read(self.BUFFER_SIZE)
            finally:
                stream.close()

        # Total record length: magic + data + metadata (length + CRC)
        bin_length = self.position - previous_pos + self.METADATA_SIZE
        self.lengths.append(bin_length)

        # Write length (8 bytes, little endian)
        length_bytes = struct.pack('<Q', bin_length)
        self.output_stream.write(length_bytes)
        self.position += 8

        # Write CRC32 (4 bytes, little endian)
        crc_bytes = struct.pack('<I', crc32 & 0xffffffff)
        self.output_stream.write(crc_bytes)
        self.position += 4

    def _write_with_crc(self, data: bytes, crc32: int) -> int:
        # Update the running CRC32, write the bytes, and advance the write position
        crc32 = zlib.crc32(data, crc32)
        self.output_stream.write(data)
        self.position += len(data)
        return crc32

    def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
        # `suggested_check` is not used; the decision is based solely on bytes written
        return self.position >= target_size

    def close(self) -> None:
        # Write the delta-varint-compressed index of record lengths
        index_bytes = DeltaVarintCompressor.compress(self.lengths)
        self.output_stream.write(index_bytes)

        # Write the trailing header: index length (4 bytes, little endian) + version (1 byte)
        header = struct.pack('<I', len(index_bytes))
        header += struct.pack('<B', self.VERSION)
        self.output_stream.write(header)

        # Flush and close the underlying stream if it supports these operations
        if hasattr(self.output_stream, 'flush'):
            self.output_stream.flush()
        if hasattr(self.output_stream, 'close'):
            self.output_stream.close()
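

# A minimal usage sketch (illustrative, not part of this module's API): the writer
# only needs a binary output stream, and ``add_element`` accepts any row-like object
# whose ``values`` list holds exactly one Blob. The ``_Row`` stand-in, the output
# path, and the assumption that ``BlobData`` wraps raw bytes are all hypothetical.
#
#     class _Row:
#         def __init__(self, values):
#             self.values = values
#
#     with open("/tmp/example.blob", "wb") as out:
#         writer = BlobFormatWriter(out)
#         writer.add_element(_Row([BlobData(b"payload bytes")]))
#         writer.close()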