blob: 156cda52bf33e655c0618515a7cd6a137f435ed8 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Utilities for deserializing Paimon BlobDescriptor bytes into FileReference arrays."""
from __future__ import annotations
import struct
import pyarrow as pa
# Arrow struct layout with four fields: url, io_config, offset and length.
# The field names and order match the struct assembled by
# ``blob_column_to_file_array`` in this module.
FILE_PHYSICAL_TYPE = pa.struct(
    [
        ("url", pa.large_utf8()),  # string location of the referenced data
        ("io_config", pa.large_binary()),  # binary payload; contents not defined here
        ("offset", pa.int64()),  # 64-bit signed integer
        ("length", pa.int64()),  # 64-bit signed integer
    ]
)
def _deserialize_one(data: bytes) -> tuple[str, int, int]:
    """Decode one serialized BlobDescriptor into ``(url, offset, length)``.

    Layout as read by this function (all integers little-endian):

    * 1 byte   -- format version
    * 8 bytes  -- magic, present only when version > 1 (skipped, not validated)
    * 4 bytes  -- unsigned length of the URI in bytes
    * N bytes  -- URI, UTF-8 encoded
    * 8 bytes  -- signed offset
    * 8 bytes  -- signed length

    Args:
        data: Raw bytes of a single serialized descriptor.

    Returns:
        A ``(uri, offset, length)`` tuple.

    Raises:
        IndexError: If ``data`` is empty.
        struct.error: If ``data`` is too short for the fixed-width fields.
        UnicodeDecodeError: If the URI bytes are not valid UTF-8.
    """
    # The URI length field follows the version byte directly, or follows the
    # 8-byte magic when the version is newer than 1.
    uri_len_at = 1 + 8 if data[0] > 1 else 1
    (uri_size,) = struct.unpack_from("<I", data, uri_len_at)

    uri_start = uri_len_at + 4
    uri_end = uri_start + uri_size
    uri_text = data[uri_start:uri_end].decode("utf-8")

    # Offset and length are two consecutive int64 values right after the URI.
    blob_offset, blob_length = struct.unpack_from("<qq", data, uri_end)
    return uri_text, blob_offset, blob_length
def blob_column_to_file_array(column: pa.Array) -> pa.Array:
    """Turn a column of serialized BlobDescriptors into a FileReference-style struct array.

    Every valid entry of ``column`` is decoded with ``_deserialize_one``; its
    URI, offset and length become the ``url``, ``offset`` and ``length``
    children of the result. Entries that are null yield null values in those
    three children. The ``io_config`` child is always entirely null.

    Args:
        column: Arrow array whose non-null elements are serialized
            BlobDescriptor bytes.

    Returns:
        A struct array with children ``url`` (large_utf8), ``io_config``
        (large_binary), ``offset`` (int64) and ``length`` (int64), one row per
        input element.
    """
    missing = (None, None, None)
    # One (url, offset, length) triple per input element, in input order.
    rows = [
        missing if (cell is None or not cell.is_valid) else _deserialize_one(cell.as_py())
        for cell in column
    ]

    children = {
        "url": pa.array([row[0] for row in rows], type=pa.large_utf8()),
        # No IO configuration is carried by a descriptor, so this child is all nulls.
        "io_config": pa.nulls(len(rows), type=pa.large_binary()),
        "offset": pa.array([row[1] for row in rows], type=pa.int64()),
        "length": pa.array([row[2] for row in rows], type=pa.int64()),
    }
    return pa.StructArray.from_arrays(
        list(children.values()),
        names=list(children),
    )