################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
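"""Reader for Paimon blob-format files.

FormatBlobReader exposes a blob file as Arrow record batches; BlobRecordIterator
yields each stored blob as a single-field GenericRow.
"""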
import struct
from typing import List, Optional, Any, Iterator
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatch
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
from pypaimon.table.row.blob import Blob
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind


class FormatBlobReader(RecordBatchReader):
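    """Reads a Paimon blob-format file and exposes it as Arrow record batches.

    A blob file stores a sequence of blob records followed by a compressed
    index of blob lengths and a 5-byte footer (see _read_index for the exact
    layout). Only a single projected field of type BLOB is supported.
    """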
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
batch_size: int = 4096):
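        """Create a reader over a single blob file.

        read_fields may contain only one field name (the blob field);
        full_fields supplies the corresponding DataField definitions. When
        blob_as_descriptor is True, each row carries the serialized blob
        descriptor instead of the raw blob bytes. push_down_predicate, if
        given, is a pyarrow filter expression applied to each produced batch.
        """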
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
self._blob_as_descriptor = blob_as_descriptor
self._batch_size = batch_size
# Get file size
self._file_size = file_io.get_file_size(file_path)
# Initialize the low-level blob format reader
self.file_path = file_path
self.blob_lengths: List[int] = []
self.blob_offsets: List[int] = []
self.returned = False
self._read_index()
# Set up fields and schema
if len(read_fields) > 1:
raise RuntimeError("Blob reader only supports one field.")
self._fields = read_fields
full_fields_map = {field.name: field for field in full_fields}
projected_data_fields = [full_fields_map[name] for name in read_fields]
self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)
# Initialize iterator
self._blob_iterator = None
        self._current_batch = None

    def read_arrow_batch(self) -> Optional[RecordBatch]:
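        """Return the next RecordBatch of at most _batch_size rows, or None when exhausted."""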
if self._blob_iterator is None:
if self.returned:
return None
self.returned = True
batch_iterator = BlobRecordIterator(
self._file_io, self.file_path, self.blob_lengths,
self.blob_offsets, self._fields[0]
)
self._blob_iterator = iter(batch_iterator)
# Collect records for this batch
pydict_data = {name: [] for name in self._fields}
records_in_batch = 0
try:
while True:
blob_row = next(self._blob_iterator)
if blob_row is None:
break
                blob = blob_row.values[0]
                # Materialize the blob either as its serialized descriptor or
                # as its raw bytes, depending on blob_as_descriptor.
                if self._blob_as_descriptor:
                    blob_data = blob.to_descriptor().serialize()
                else:
                    blob_data = blob.to_data()
                for field_name in self._fields:
                    pydict_data[field_name].append(blob_data)
                records_in_batch += 1
if records_in_batch >= self._batch_size:
break
        except StopIteration:
            # Iterator exhausted; fall through and emit whatever was collected
            pass
if records_in_batch == 0:
return None
        # Build an Arrow table for this batch and apply the optional
        # push-down predicate via pyarrow.dataset filtering.
        table = pa.Table.from_pydict(pydict_data, self._schema)
        if self._push_down_predicate is not None:
            dataset = ds.InMemoryDataset(table)
            scanner = dataset.scanner(filter=self._push_down_predicate)
            table = scanner.to_table().combine_chunks()
        if table.num_rows > 0:
            return table.to_batches()[0]
        return None

    def close(self):
        self._blob_iterator = None

    def _read_index(self) -> None:
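        """Parse the blob index from the tail of the file.

        The last 5 bytes are a footer: a 4-byte little-endian index length
        followed by a 1-byte version (only version 1 is supported). The index
        immediately precedes the footer and holds the delta-varint-compressed
        blob lengths, from which per-blob offsets are accumulated.
        """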
with self._file_io.new_input_stream(self.file_path) as f:
# Seek to header: last 5 bytes
f.seek(self._file_size - 5)
header = f.read(5)
if len(header) != 5:
raise IOError("Invalid blob file: cannot read header")
# Parse header
index_length = struct.unpack('<I', header[:4])[0] # Little endian
version = header[4]
if version != 1:
raise IOError(f"Unsupported blob file version: {version}")
# Read index data
f.seek(self._file_size - 5 - index_length)
index_bytes = f.read(index_length)
if len(index_bytes) != index_length:
raise IOError("Invalid blob file: cannot read index")
# Decompress blob lengths and compute offsets
blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
blob_offsets = []
offset = 0
for length in blob_lengths:
blob_offsets.append(offset)
offset += length
self.blob_lengths = blob_lengths
        self.blob_offsets = blob_offsets


class BlobRecordIterator:
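    """Iterates over the blobs of one file, yielding each as a single-field GenericRow.

    Every stored blob record carries METADATA_OVERHEAD (16) bytes of framing:
    a leading magic number (4 bytes) plus length (8 bytes) and CRC (4 bytes)
    fields, which are excluded when the payload Blob reference is built.
    """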
    MAGIC_NUMBER_SIZE = 4  # size of the leading magic number in each blob record
    METADATA_OVERHEAD = 16  # magic (4) + length (8) + CRC (4) bytes of per-blob framing

    def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
                 blob_offsets: List[int], field_name: str):
self.file_io = file_io
self.file_path = file_path
self.field_name = field_name
self.blob_lengths = blob_lengths
self.blob_offsets = blob_offsets
        self.current_position = 0

    def __iter__(self) -> Iterator[GenericRow]:
        return self

    def __next__(self) -> GenericRow:
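        """Return the next blob as a GenericRow with a single BLOB field."""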
if self.current_position >= len(self.blob_lengths):
raise StopIteration
        # A stored blob record is framed as: magic number (4 bytes) + payload
        # + length (8 bytes) + CRC (4 bytes). Skip the leading magic number and
        # subtract the full METADATA_OVERHEAD (16 bytes) to get the payload length.
        blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE
        blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
self.current_position += 1
fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
        return GenericRow([blob], fields, RowKind.INSERT)

    def returned_position(self) -> int:
return self.current_position