| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| ################################################################################ |
| |
| """Shredded Parquet VARIANT support for pypaimon. |
| |
| Paimon stores VARIANT columns in Parquet using a "shredded" format that enables |
| efficient sub-field reading and predicate pushdown. This module provides: |
| |
| 1. VariantSchema / build_variant_schema() |
| Parse the PyArrow struct type of a shredded VARIANT column into a tree |
| that mirrors the Java ``VariantSchema`` class. |
| |
| 2. rebuild_value() / rebuild() |
| Reconstruct standard ``(value: bytes, metadata: bytes)`` VARIANT binary |
| from a shredded row dict. Mirrors ``ShreddingUtils.rebuild()`` in Java. |
| |
| 3. assemble_shredded_column() |
| High-level helper used by ``FormatPyArrowReader`` to post-process batches |
| that contain shredded VARIANT columns. |
| |
| 4. is_shredded_variant() |
| Detect shredded VARIANT columns in a file schema. |
| |
| Shredded column layout (Parquet GROUP → PyArrow struct): |
| - ``metadata``: binary — the top-level key dictionary |
| - ``value``: binary (optional) — overflow bytes for un-shredded fields |
| - ``typed_value``: struct — per-field typed sub-columns (shredded fields) |
| |
| Each field inside ``typed_value`` has the same ``{value, typed_value}`` |
| structure recursively. No ``field_`` prefix is used; sub-column names are |
| the exact variant key names. |
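
A hedged read-path sketch (the column variable and key names are illustrative,
not taken from a real file)::

    schema = build_variant_schema(column.type)           # shredded struct type
    row = column[0].as_py()
    # e.g. {'metadata': b'...', 'value': None,
    #       'typed_value': {'a': {'value': None, 'typed_value': 1}}}
    value_bytes, metadata_bytes = rebuild(row, schema)   # standard VARIANT pair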
| """ |
| |
| import datetime |
| import decimal as _decimal |
| from dataclasses import dataclass |
| from typing import Dict, List, Optional, Tuple |
| |
| import pyarrow as pa |
| |
| from pypaimon.data._variant_binary import ( |
| _OBJECT, |
| _U8_MAX, _U32_SIZE, |
| _VERSION, _VERSION_MASK, |
| _read_unsigned, _get_int_size, |
| _primitive_header, _object_header, _array_header, |
| ) |
| |
| # --------------------------------------------------------------------------- |
| # Local constants for null/true/false type IDs |
| # --------------------------------------------------------------------------- |
| |
| _NULL_TYPE_ID = 0 |
| _TRUE_TYPE_ID = 1 |
| _FALSE_TYPE_ID = 2 |
| |
| _NULL_VALUE_BYTES: bytes = bytes([_primitive_header(_NULL_TYPE_ID)]) |
| |
| |
| # --------------------------------------------------------------------------- |
| # Low-level binary helpers (shredding-specific) |
| # --------------------------------------------------------------------------- |
| |
def _append_le(buf: bytearray, value: int, n: int) -> None:
    """Append *value* to *buf* as an *n*-byte little-endian unsigned integer."""
    buf.extend(value.to_bytes(n, 'little'))
| |
| |
| # --------------------------------------------------------------------------- |
| # Metadata parsing |
| # --------------------------------------------------------------------------- |
| |
| def parse_metadata_dict(metadata: bytes) -> Dict[str, int]: |
| """Parse variant metadata bytes into a ``{key_name: key_id}`` mapping. |
| |
| The top-level metadata is shared across all shredded sub-fields. We parse |
| it once and pass the dict down to ``rebuild_value()`` so every recursive |
| call can look up key IDs without re-parsing. |
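
    Worked example, assuming the spec's v1 layout (version nibble ``1``,
    one-byte offsets) and two keys ``a`` and ``b``::

        parse_metadata_dict(bytes([0x01, 2, 0, 1, 2]) + b'ab')
        # header=0x01, dict_size=2, offsets=[0, 1, 2], strings=b'ab'
        # -> {'a': 0, 'b': 1}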
| """ |
| if not metadata or len(metadata) < 1: |
| return {} |
| if (metadata[0] & _VERSION_MASK) != _VERSION: |
| raise ValueError('MALFORMED_VARIANT: invalid metadata version') |
| offset_size = ((metadata[0] >> 6) & 0x3) + 1 |
| if len(metadata) < 1 + offset_size: |
| return {} |
| dict_size = _read_unsigned(metadata, 1, offset_size) |
| if dict_size == 0: |
| return {} |
| string_start = 1 + (dict_size + 2) * offset_size |
| result: Dict[str, int] = {} |
| for key_id in range(dict_size): |
| off = _read_unsigned(metadata, 1 + (key_id + 1) * offset_size, offset_size) |
| next_off = _read_unsigned(metadata, 1 + (key_id + 2) * offset_size, offset_size) |
| key = metadata[string_start + off:string_start + next_off].decode('utf-8') |
| result[key] = key_id |
| return result |
| |
| |
| # --------------------------------------------------------------------------- |
| # VariantSchema / build_variant_schema |
| # --------------------------------------------------------------------------- |
| |
| @dataclass |
| class ObjectField: |
| """One shredded field inside a ``typed_value`` object group.""" |
| field_name: str # exact variant key name, no "field_" prefix |
| schema: 'VariantSchema' |
| |
| |
| @dataclass |
| class VariantSchema: |
| """Describes the shredding layout of a VARIANT column or sub-column. |
| |
| Mirrors the Java ``VariantSchema`` class. Indices are positions within |
| the PyArrow struct type that was parsed via ``build_variant_schema()``. |
| |
| For a plain (un-shredded) VARIANT: |
| ``metadata_idx >= 0``, ``value_idx >= 0``, ``typed_idx < 0``. |
| """ |
| typed_idx: int = -1 |
| value_idx: int = -1 |
| metadata_idx: int = -1 |
| num_fields: int = 0 |
| scalar_arrow_type: Optional[pa.DataType] = None |
| object_fields: Optional[List[ObjectField]] = None |
| object_schema_map: Optional[Dict[str, int]] = None |
| array_schema: Optional['VariantSchema'] = None |
| |
| def is_unshredded(self) -> bool: |
| """Return True if this is a plain (non-shredded) VARIANT layout.""" |
| return self.metadata_idx >= 0 and self.typed_idx < 0 |
| |
| |
| def is_shredded_variant(pa_type: pa.DataType) -> bool: |
| """Return True if *pa_type* is a shredded Parquet VARIANT struct. |
| |
| A shredded VARIANT column has three top-level fields: ``metadata``, |
| ``value`` (overflow), and ``typed_value`` (shredded sub-columns). |
| """ |
| if not pa.types.is_struct(pa_type): |
| return False |
| names = {pa_type[i].name for i in range(pa_type.num_fields)} |
| return 'metadata' in names and 'value' in names and 'typed_value' in names |
| |
| |
| def build_variant_schema(pa_type: pa.StructType) -> VariantSchema: |
| """Parse a PyArrow struct type into a ``VariantSchema`` tree. |
| |
| Works for both the top-level VARIANT column struct (which has ``metadata`` |
| in addition to ``value`` / ``typed_value``) and sub-field structs (which |
| only have ``value`` / ``typed_value``). |
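
    A hedged sketch for a column that shreds a single BIGINT key ``a``
    (illustrative types, not tied to a particular table)::

        t = pa.struct([
            ('metadata', pa.binary()),
            ('value', pa.binary()),
            ('typed_value', pa.struct([
                ('a', pa.struct([('value', pa.binary()),
                                 ('typed_value', pa.int64())])),
            ])),
        ])
        s = build_variant_schema(t)
        # s.metadata_idx == 0, s.value_idx == 1, s.typed_idx == 2
        # s.object_fields[0].field_name == 'a'
        # s.object_fields[0].schema.scalar_arrow_type == pa.int64()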
| """ |
| schema = VariantSchema(num_fields=pa_type.num_fields) |
| for i in range(pa_type.num_fields): |
| f = pa_type[i] |
| if f.name == 'metadata': |
| schema.metadata_idx = i |
| elif f.name == 'value': |
| schema.value_idx = i |
| elif f.name == 'typed_value': |
| schema.typed_idx = i |
| schema = _parse_typed_value_field(schema, f.type) |
| return schema |
| |
| |
| def _parse_typed_value_field(schema: VariantSchema, tv_type: pa.DataType) -> VariantSchema: |
| """Fill in the shredding details for a ``typed_value`` field.""" |
| if pa.types.is_struct(tv_type): |
| object_fields: List[ObjectField] = [] |
| for j in range(tv_type.num_fields): |
| sub_f = tv_type.field(j) |
| if pa.types.is_struct(sub_f.type): |
| sub_schema = build_variant_schema(sub_f.type) |
| else: |
| # Scalar typed_value embedded directly (no surrounding struct) |
| sub_schema = VariantSchema( |
| typed_idx=0, num_fields=1, scalar_arrow_type=sub_f.type |
| ) |
| object_fields.append(ObjectField(sub_f.name, sub_schema)) |
| schema.object_fields = object_fields |
| schema.object_schema_map = { |
| of.field_name: idx for idx, of in enumerate(object_fields) |
| } |
| elif pa.types.is_list(tv_type) or pa.types.is_large_list(tv_type): |
| elem_type = tv_type.value_type |
| if pa.types.is_struct(elem_type): |
| schema.array_schema = build_variant_schema(elem_type) |
| else: |
| schema.array_schema = VariantSchema( |
| typed_idx=0, num_fields=1, scalar_arrow_type=elem_type |
| ) |
| else: |
| schema.scalar_arrow_type = tv_type |
| return schema |
| |
| |
| # --------------------------------------------------------------------------- |
| # Scalar encoding: Arrow typed value → variant binary bytes |
| # --------------------------------------------------------------------------- |
| |
| def _encode_scalar_to_value_bytes(typed_value, arrow_type: pa.DataType) -> bytes: |
| """Encode a typed Python scalar (from PyArrow .as_py()) to variant value bytes.""" |
| from pypaimon.data.generic_variant import _GenericVariantBuilder # local import avoids circular |
| builder = _GenericVariantBuilder() |
| _append_scalar(builder, typed_value, arrow_type) |
| gv = builder.result() |
    # The builder result's internal position is 0, so value() returns the full encoded bytes.
| return gv.value() |
| |
| |
| def _append_scalar(builder, value, arrow_type: pa.DataType) -> None: |
| """Dispatch a typed Python scalar into the appropriate builder method.""" |
| if value is None: |
| builder.append_null() |
| return |
| |
| if pa.types.is_boolean(arrow_type): |
| builder.append_boolean(bool(value)) |
| elif pa.types.is_integer(arrow_type): |
| builder.append_long(int(value)) |
| elif pa.types.is_float64(arrow_type): |
| builder.append_double(float(value)) |
| elif pa.types.is_float32(arrow_type): |
| builder.append_float(float(value)) |
| elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type): |
| builder.append_string(str(value)) |
| elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type): |
| builder.append_binary(bytes(value)) |
| elif pa.types.is_date32(arrow_type): |
| # PyArrow converts date32 to datetime.date |
| if isinstance(value, datetime.date): |
| days = (value - datetime.date(1970, 1, 1)).days |
| else: |
| days = int(value) |
| builder.append_date(days) |
    elif pa.types.is_timestamp(arrow_type):
        # PyArrow converts timestamp to datetime.datetime
        if isinstance(value, datetime.datetime):
            # Use integer timedelta arithmetic so microsecond precision is kept
            # even far from the epoch (total_seconds() rounds through a float).
            if value.tzinfo is not None:
                epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
                micros = (value - epoch) // datetime.timedelta(microseconds=1)
                builder.append_timestamp(micros)
            else:
                epoch = datetime.datetime(1970, 1, 1)
                micros = (value - epoch) // datetime.timedelta(microseconds=1)
                builder.append_timestamp_ntz(micros)
        else:
            builder.append_timestamp_ntz(int(value))
| elif pa.types.is_decimal(arrow_type): |
| if isinstance(value, _decimal.Decimal): |
| builder.append_decimal(value) |
| else: |
| builder.append_decimal(_decimal.Decimal(str(value))) |
| else: |
| # Fallback: encode as string |
| builder.append_string(str(value)) |
| |
| |
| # --------------------------------------------------------------------------- |
| # Object / array binary construction |
| # --------------------------------------------------------------------------- |
| |
| def _build_object_value(fields: List[Tuple[int, bytes]]) -> bytes: |
| """Build object variant value bytes from ``(key_id, value_bytes)`` pairs. |
| |
| The variant spec requires fields sorted by key_id. |
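
    Layout sketch for two fields (``b'A'`` / ``b'B'`` are placeholder payloads,
    not real encoded values; sizes, ids and offsets all fit in one byte)::

        _build_object_value([(1, b'B'), (0, b'A')])
        # after key_id sort: [(0, b'A'), (1, b'B')]
        # bytes: [object header][size=2][id=0][id=1][off=0][off=1][off=2 sentinel] b'AB'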
| """ |
| if not fields: |
| # Empty object: header + size=0 + one zero-offset sentinel |
| buf = bytearray() |
| buf.append(_object_header(False, 1, 1)) |
| buf.append(0) # size = 0 |
| buf.append(0) # offset[0] = 0 (sentinel) |
| return bytes(buf) |
| |
| fields = sorted(fields, key=lambda f: f[0]) |
| size = len(fields) |
| data = b''.join(vb for _, vb in fields) |
| data_size = len(data) |
| |
| large_size = size > _U8_MAX |
| size_bytes = _U32_SIZE if large_size else 1 |
| max_id = max(kid for kid, _ in fields) |
| id_size = _get_int_size(max_id) |
| offset_size = _get_int_size(data_size) if data_size > 0 else 1 |
| |
| buf = bytearray() |
| buf.append(_object_header(large_size, id_size, offset_size)) |
| _append_le(buf, size, size_bytes) |
| |
| for kid, _ in fields: |
| _append_le(buf, kid, id_size) |
| |
| offset = 0 |
| for _, vb in fields: |
| _append_le(buf, offset, offset_size) |
| offset += len(vb) |
| _append_le(buf, offset, offset_size) # sentinel = total data size |
| |
| buf.extend(data) |
| return bytes(buf) |
| |
| |
| def _build_array_value(element_bytes_list: List[bytes]) -> bytes: |
| """Build array variant value bytes from per-element value bytes.""" |
| size = len(element_bytes_list) |
| data = b''.join(element_bytes_list) |
| data_size = len(data) |
| |
| large_size = size > _U8_MAX |
| size_bytes = _U32_SIZE if large_size else 1 |
| offset_size = _get_int_size(data_size) if data_size > 0 else 1 |
| |
| buf = bytearray() |
| buf.append(_array_header(large_size, offset_size)) |
| _append_le(buf, size, size_bytes) |
| |
| offset = 0 |
| for eb in element_bytes_list: |
| _append_le(buf, offset, offset_size) |
| offset += len(eb) |
| _append_le(buf, offset, offset_size) # sentinel |
| |
| buf.extend(data) |
| return bytes(buf) |
| |
| |
| def _extract_overflow_fields(overflow_bytes: bytes) -> List[Tuple[int, bytes]]: |
| """Parse an overflow binary (a variant object) into ``(key_id, value_bytes)`` pairs. |
| |
| The overflow binary contains fields that were NOT shredded — they remain |
| encoded as a compact variant object. |
| """ |
| if not overflow_bytes: |
| return [] |
| |
| b = overflow_bytes[0] |
| basic_type = b & 0x3 |
| if basic_type != _OBJECT: |
| return [] |
| |
| type_info = (b >> 2) & 0x3F |
| large_size = bool((type_info >> 4) & 0x1) |
| size_bytes = _U32_SIZE if large_size else 1 |
| size = _read_unsigned(overflow_bytes, 1, size_bytes) |
| if size == 0: |
| return [] |
| |
| id_size = ((type_info >> 2) & 0x3) + 1 |
| offset_size = (type_info & 0x3) + 1 |
| id_start = 1 + size_bytes |
| offset_start = id_start + size * id_size |
| data_start = offset_start + (size + 1) * offset_size |
| |
| # The Parquet variant spec stores field offsets in the same order as the id_table, |
| # but the DATA section may be laid out in a different order (e.g. GenericVariantBuilder |
| # sorts the id/offset tables alphabetically while writing data in insertion order). |
| # We must sort by offset to determine each field's byte boundaries correctly. |
| # |
| # Example: variant {"b": 2, "a": 1} written by GenericVariantBuilder |
| # id_table: [id_a=0, id_b=1] (alphabetical order) |
| # offset_table: [off_a=2, off_b=0] (id-order: a first, then b) |
| # data section: [enc(2), enc(1)] (insertion order: b first, then a) |
| # So pairs = [(id_a=0, off=2), (id_b=1, off=0)] |
| # Sorted by offset: [(id_b=1, off=0), (id_a=0, off=2)] |
| # Boundaries: [0, 2, sentinel] → enc(2)=data[0:2], enc(1)=data[2:sentinel] |
| pairs: List[Tuple[int, int]] = [] |
| for i in range(size): |
| key_id = _read_unsigned(overflow_bytes, id_start + i * id_size, id_size) |
| off = _read_unsigned(overflow_bytes, offset_start + i * offset_size, offset_size) |
| pairs.append((key_id, off)) |
| |
| sentinel = _read_unsigned(overflow_bytes, offset_start + size * offset_size, offset_size) |
| |
| # Sort by offset so that adjacent entries define contiguous data boundaries. |
| # Track by original index (always unique) to avoid dict key collisions when |
| # two fields share the same offset (malformed data). |
| indexed_pairs = sorted(enumerate(pairs), key=lambda x: x[1][1]) |
| boundaries = [ip[1][1] for ip in indexed_pairs] + [sentinel] |
| |
| # end_by_orig[i] = end offset for pairs[i] |
| end_by_orig = [0] * size |
| for rank, (orig_idx, _) in enumerate(indexed_pairs): |
| end_by_orig[orig_idx] = boundaries[rank + 1] |
| |
| fields: List[Tuple[int, bytes]] = [] |
| for orig_idx, (key_id, off) in enumerate(pairs): |
| end = end_by_orig[orig_idx] |
| field_bytes = bytes(overflow_bytes[data_start + off:data_start + end]) |
| fields.append((key_id, field_bytes)) |
| return fields |
| |
| |
| # --------------------------------------------------------------------------- |
| # Core rebuild algorithm (mirrors ShreddingUtils.rebuild() in Java) |
| # --------------------------------------------------------------------------- |
| |
| def rebuild_value( |
| row: dict, |
| schema: VariantSchema, |
| key_dict: Dict[str, int], |
| ) -> Optional[bytes]: |
| """Reconstruct variant value bytes from a shredded sub-row dict. |
| |
| Args: |
| row: Python dict from ``PyArrow StructScalar.as_py()``. Keys are |
| ``'value'`` and/or ``'typed_value'``. |
| schema: VariantSchema for this level. |
| key_dict: ``{key_name: key_id}`` parsed from the top-level metadata. |
| |
| Returns: |
| Variant value bytes, or ``None`` if the field is absent (both |
| ``typed_value`` and ``value`` are null — "missing from this row"). |
| """ |
| typed_value = row.get('typed_value') if schema.typed_idx >= 0 else None |
| overflow = row.get('value') if schema.value_idx >= 0 else None |
| |
| # if both null → field is absent in this row |
| if typed_value is None and overflow is None: |
| return None |
| |
| # if typed_value is null → use overflow bytes directly |
| if typed_value is None: |
| return bytes(overflow) |
| |
| if schema.scalar_arrow_type is not None: |
| return _encode_scalar_to_value_bytes(typed_value, schema.scalar_arrow_type) |
| |
| if schema.object_fields is not None: |
| return _rebuild_object(typed_value, schema, key_dict, overflow) |
| |
| if schema.array_schema is not None: |
| return _rebuild_array(typed_value, schema.array_schema, key_dict) |
| |
| # No sub-schema for typed_value; fall back to overflow |
| return bytes(overflow) if overflow is not None else None |
| |
| |
| def _rebuild_object( |
| typed_value: dict, |
| schema: VariantSchema, |
| key_dict: Dict[str, int], |
| overflow_bytes: Optional[bytes], |
| ) -> bytes: |
| """Rebuild an object variant from shredded object sub-fields.""" |
| fields: List[Tuple[int, bytes]] = [] |
| |
| for obj_field in schema.object_fields: |
| fname = obj_field.field_name |
| sub_row = typed_value.get(fname) if isinstance(typed_value, dict) else None |
| if sub_row is None: |
| continue |
| |
| field_value = rebuild_value(sub_row, obj_field.schema, key_dict) |
| if field_value is None: |
| continue |
| |
| key_id = key_dict.get(fname) |
| if key_id is None: |
| # Key not found in metadata — data integrity issue, skip |
| continue |
| fields.append((key_id, field_value)) |
| |
| # Merge with overflow (fields not shredded) |
| if overflow_bytes: |
| fields.extend(_extract_overflow_fields(bytes(overflow_bytes))) |
| |
| return _build_object_value(fields) |
| |
| |
| def _rebuild_array( |
| typed_value, |
| element_schema: VariantSchema, |
| key_dict: Dict[str, int], |
| ) -> bytes: |
| """Rebuild an array variant from a shredded list.""" |
| if typed_value is None: |
| typed_value = [] |
| |
| element_bytes_list: List[bytes] = [] |
| for element_row in typed_value: |
| if element_row is None: |
| element_bytes_list.append(_NULL_VALUE_BYTES) |
| elif isinstance(element_row, dict): |
| eb = rebuild_value(element_row, element_schema, key_dict) |
| element_bytes_list.append(eb if eb is not None else _NULL_VALUE_BYTES) |
| else: |
| # Scalar element directly (no surrounding {value, typed_value} struct) |
| if element_schema.scalar_arrow_type is not None: |
| eb = _encode_scalar_to_value_bytes(element_row, element_schema.scalar_arrow_type) |
| else: |
| eb = _NULL_VALUE_BYTES |
| element_bytes_list.append(eb) |
| |
| return _build_array_value(element_bytes_list) |
| |
| |
| def rebuild( |
| row: dict, |
| schema: VariantSchema, |
| key_dict: Optional[Dict[str, int]] = None, |
| ) -> Tuple[bytes, bytes]: |
| """Reconstruct ``(value_bytes, metadata_bytes)`` from a top-level shredded row. |
| |
| Args: |
| row: Top-level shredded row dict (contains ``'metadata'``, |
| ``'value'``, and/or ``'typed_value'`` keys). |
| schema: Top-level VariantSchema (from ``build_variant_schema()``). |
| key_dict: Optional pre-parsed ``{key_name: key_id}`` mapping. If None, |
| it is parsed from ``row['metadata']`` automatically. |
| |
| Returns: |
| ``(value_bytes, metadata_bytes)`` suitable for constructing a standard |
| VARIANT ``struct<value: binary, metadata: binary>`` row. |
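
    A hedged sketch of the overflow merge (key names and the ``meta`` /
    ``overflow_obj_bytes`` / ``schema`` variables are illustrative): if the row
    shreds ``a`` under ``typed_value`` and keeps ``b`` inside the binary
    ``value`` overflow, the rebuilt ``value_bytes`` is a single variant object
    containing both ``a`` and ``b``, with fields sorted by key id::

        value_bytes, metadata_bytes = rebuild(
            {'metadata': meta,
             'value': overflow_obj_bytes,    # variant object holding only "b"
             'typed_value': {'a': {'value': None, 'typed_value': 1}}},
            schema)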
| """ |
| raw_metadata = row.get('metadata') |
| if raw_metadata is None: |
| raise ValueError("Shredded VARIANT row missing 'metadata' field") |
| metadata = bytes(raw_metadata) |
| |
| if key_dict is None: |
| key_dict = parse_metadata_dict(metadata) |
| |
| value_bytes = rebuild_value(row, schema, key_dict) |
| if value_bytes is None: |
| value_bytes = _NULL_VALUE_BYTES |
| |
| return value_bytes, metadata |
| |
| |
| # --------------------------------------------------------------------------- |
| # High-level column assembly |
| # --------------------------------------------------------------------------- |
| |
| #: The canonical VARIANT Arrow type (struct<value: binary NOT NULL, |
| #: metadata: binary NOT NULL>). |
| VARIANT_ARROW_TYPE = pa.struct([ |
| pa.field('value', pa.binary(), nullable=False), |
| pa.field('metadata', pa.binary(), nullable=False), |
| ]) |
| |
| |
| def assemble_shredded_column(column: pa.Array, schema: VariantSchema) -> pa.Array: |
| """Convert a shredded VARIANT column to standard ``struct<value, metadata>``. |
| |
| Args: |
| column: A PyArrow Array whose type is a shredded VARIANT struct. |
| schema: VariantSchema built from the column's type. |
| |
| Returns: |
| A ``pa.StructArray`` with type ``struct<value: binary, metadata: binary>``. |
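
    Reader-side usage sketch (``batch`` and the column name are illustrative)::

        col = batch.column('v')
        if is_shredded_variant(col.type):
            col = assemble_shredded_column(col, build_variant_schema(col.type))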
| """ |
| rows = column.to_pylist() |
| assembled = [] |
    # Cache parsed key dicts keyed by metadata bytes; rows in a file usually
    # share one metadata dictionary, so each distinct blob is parsed only once.
| key_dict_cache: Dict[bytes, Dict[str, int]] = {} |
| |
| for row in rows: |
| if row is None: |
| assembled.append(None) |
| continue |
| |
| raw_meta = row.get('metadata') |
| if raw_meta is None: |
| assembled.append(None) |
| continue |
| |
| metadata = bytes(raw_meta) |
| if metadata not in key_dict_cache: |
| key_dict_cache[metadata] = parse_metadata_dict(metadata) |
| |
| value_bytes = rebuild_value(row, schema, key_dict_cache[metadata]) |
| if value_bytes is None: |
| value_bytes = _NULL_VALUE_BYTES |
| |
| assembled.append({'value': value_bytes, 'metadata': metadata}) |
| |
| return pa.array(assembled, type=VARIANT_ARROW_TYPE) |
| |
| |
| # --------------------------------------------------------------------------- |
| # Write-side shredding |
| # --------------------------------------------------------------------------- |
| |
| def _paimon_type_str_to_arrow(type_str: str) -> pa.DataType: |
| """Map a Paimon SQL type string to the Arrow type used in ``typed_value``.""" |
| from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser |
| try: |
| return PyarrowFieldParser.from_paimon_type(AtomicType(type_str.upper())) |
| except Exception: |
| return pa.binary() |
| |
| |
| def _parse_sub_schema(type_def) -> VariantSchema: |
| """Recursively parse a Paimon type definition (string or dict) into a VariantSchema. |
| |
| The resulting schema is for a sub-field struct, so ``value_idx=0``, |
| ``typed_idx=1``, no ``metadata_idx``. |
| """ |
| if isinstance(type_def, str): |
| # Scalar leaf: e.g. "BIGINT", "VARCHAR", "DOUBLE" |
| arrow_type = _paimon_type_str_to_arrow(type_def) |
| return VariantSchema( |
| value_idx=0, |
| typed_idx=1, |
| num_fields=2, |
| scalar_arrow_type=arrow_type, |
| ) |
| |
| if not isinstance(type_def, dict): |
| return VariantSchema(value_idx=0, num_fields=1) |
| |
| kind = type_def.get('type', '').upper() |
| |
| if kind == 'ROW': |
| sub_fields_def = type_def.get('fields', []) |
| object_fields: List[ObjectField] = [] |
| for f in sub_fields_def: |
| fname = f.get('name', '') |
| ftype = f.get('type', 'BINARY') |
| sub_schema = _parse_sub_schema(ftype) |
| object_fields.append(ObjectField(fname, sub_schema)) |
| schema = VariantSchema( |
| value_idx=0, |
| typed_idx=1, |
| num_fields=2, |
| object_fields=object_fields, |
| object_schema_map={of.field_name: i for i, of in enumerate(object_fields)}, |
| ) |
| return schema |
| |
| if kind == 'ARRAY': |
| elem_type = type_def.get('element', 'BINARY') |
| elem_schema = _parse_sub_schema(elem_type) |
| return VariantSchema( |
| value_idx=0, |
| typed_idx=1, |
| num_fields=2, |
| array_schema=elem_schema, |
| ) |
| |
| # Fallback: treat as scalar using the type string |
| arrow_type = _paimon_type_str_to_arrow(kind) |
| return VariantSchema( |
| value_idx=0, |
| typed_idx=1, |
| num_fields=2, |
| scalar_arrow_type=arrow_type, |
| ) |
| |
| |
| def parse_shredding_schema_option(json_str: str) -> Dict[str, List[ObjectField]]: |
| """Parse the ``variant.shreddingSchema`` option value. |
| |
| Args: |
| json_str: JSON-encoded Paimon ROW type where each top-level field |
| corresponds to a VARIANT column name, and its type is a ROW |
| listing the sub-fields to shred. |
| |
| Returns: |
| ``{variant_col_name: [ObjectField, ...]}`` mapping. |
| |
| Raises: |
| ValueError: if the JSON is invalid or the top-level type is not ROW. |
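
    Example option value (a sketch; the column and field names are illustrative),
    shredding sub-fields ``a`` (BIGINT) and ``b`` (VARCHAR) of VARIANT column ``v``::

        {"type": "ROW",
         "fields": [
           {"name": "v",
            "type": {"type": "ROW",
                     "fields": [{"name": "a", "type": "BIGINT"},
                                {"name": "b", "type": "VARCHAR"}]}}]}

    Parsing this yields one entry ``'v'`` mapped to two ``ObjectField`` items
    (``a`` shredded as BIGINT, ``b`` as VARCHAR).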
| """ |
| import json as _json |
| data = _json.loads(json_str) |
| if data.get('type', '').upper() != 'ROW': |
| raise ValueError( |
| f"variant.shreddingSchema must be a JSON-encoded ROW type, got: {data.get('type')}" |
| ) |
| |
| result: Dict[str, List[ObjectField]] = {} |
| for field_def in data.get('fields', []): |
| col_name = field_def.get('name', '') |
| col_type = field_def.get('type', {}) |
| |
| if isinstance(col_type, dict) and col_type.get('type', '').upper() == 'ROW': |
| sub_fields_def = col_type.get('fields', []) |
| obj_fields: List[ObjectField] = [] |
| for sf in sub_fields_def: |
| fname = sf.get('name', '') |
| ftype = sf.get('type', 'BINARY') |
| sub_schema = _parse_sub_schema(ftype) |
| obj_fields.append(ObjectField(fname, sub_schema)) |
| result[col_name] = obj_fields |
| else: |
| # Top-level field type is not a ROW — skip (can't shred a non-object schema) |
| pass |
| |
| return result |
| |
| |
| def _fid(field_id: int) -> dict: |
| """Return PyArrow field metadata dict that sets the Parquet field ID. |
| |
| PyArrow respects the ``PARQUET:field_id`` key when writing Parquet, which |
| ensures ``parquetType.getId()`` returns a non-null value on the Java reader |
| side (``ParquetSchemaConverter.convertToPaimonField`` calls |
| ``parquetType.getId().intValue()`` unconditionally). |
| """ |
| return {b'PARQUET:field_id': str(field_id).encode()} |
| |
| |
| def _leaf_arrow_type_for_write(schema: VariantSchema) -> pa.DataType: |
| """Return the Arrow type for the ``typed_value`` leaf of a sub-field struct. |
| |
| Used by ``sub_field_output_type``; field IDs are NOT embedded here since the |
| result describes output column types, not Parquet-serialised fields. |
| """ |
| if schema.scalar_arrow_type is not None: |
| return schema.scalar_arrow_type |
| if schema.object_fields is not None: |
| return pa.struct([ |
| pa.field(of.field_name, pa.struct([ |
| pa.field('value', pa.binary(), nullable=True), |
| pa.field('typed_value', _leaf_arrow_type_for_write(of.schema), nullable=True), |
| ]), nullable=True) |
| for of in schema.object_fields |
| ]) |
| return pa.binary() |
| |
| |
| def _leaf_arrow_type_for_write_with_ids(schema: VariantSchema) -> pa.DataType: |
| """Like ``_leaf_arrow_type_for_write`` but embeds ``PARQUET:field_id`` metadata. |
| |
| Used by ``shredding_schema_to_arrow_type`` so that Parquet field IDs are |
| present in every nested field of the written file. |
| """ |
| if schema.scalar_arrow_type is not None: |
| return schema.scalar_arrow_type |
| if schema.object_fields is not None: |
| inner_fields = [] |
| for i, of in enumerate(schema.object_fields): |
| inner_sub = pa.struct([ |
| pa.field('value', pa.binary(), nullable=True, metadata=_fid(0)), |
| pa.field( |
| 'typed_value', |
| _leaf_arrow_type_for_write_with_ids(of.schema), |
| nullable=True, |
| metadata=_fid(1), |
| ), |
| ]) |
| inner_fields.append( |
| pa.field(of.field_name, inner_sub, nullable=False, metadata=_fid(i + 1)) |
| ) |
| return pa.struct(inner_fields) |
| return pa.binary() |
| |
| |
| def shredding_schema_to_arrow_type(obj_fields: List[ObjectField]) -> pa.StructType: |
| """Convert an ``[ObjectField]`` list into the PyArrow struct type for a shredded column. |
| |
| The produced type is the canonical Parquet shredding layout:: |
| |
| struct< |
| metadata: binary NOT NULL, (field_id=0) |
| value: binary, (field_id=1) |
| typed_value: struct< (field_id=2) |
| field_a: struct< (field_id=1, NOT NULL) |
| value: binary, (field_id=0) |
| typed_value: <type_a> (field_id=1) |
| >, |
| ... |
| > |
| > |
| |
| ``PARQUET:field_id`` metadata is embedded on every field so that the Java |
| ``ParquetSchemaConverter.convertToPaimonField`` can call |
| ``parquetType.getId().intValue()`` without a NullPointerException. |
| |
| Sub-field structs within ``typed_value`` are marked NOT NULL (``nullable=False``) |
| to match the Java shredding schema where each named sub-field carries a |
| ``.notNull()`` annotation. |
| """ |
| sub_field_defs = [] |
| for i, of in enumerate(obj_fields): |
| typed_val_type = _leaf_arrow_type_for_write_with_ids(of.schema) |
| sub_struct = pa.struct([ |
| pa.field('value', pa.binary(), nullable=True, metadata=_fid(0)), |
| pa.field('typed_value', typed_val_type, nullable=True, metadata=_fid(1)), |
| ]) |
| # Java's variantShreddingSchema marks each named sub-field as .notNull() |
| sub_field_defs.append(pa.field(of.field_name, sub_struct, nullable=False, metadata=_fid(i + 1))) |
| |
| return pa.struct([ |
| pa.field('metadata', pa.binary(), nullable=False, metadata=_fid(0)), |
| pa.field('value', pa.binary(), nullable=True, metadata=_fid(1)), |
| pa.field('typed_value', pa.struct(sub_field_defs), nullable=True, metadata=_fid(2)), |
| ]) |
| |
| |
| def _decompose_field_bytes( |
| field_bytes: bytes, |
| schema: VariantSchema, |
| metadata: bytes, |
| ) -> dict: |
| """Decompose a variant field's value bytes into a ``{value, typed_value}`` sub-struct dict. |
| |
| This is the write-direction counterpart of ``rebuild_value()``. |
| |
| Args: |
| field_bytes: Variant value bytes for a single field extracted from the |
| original variant binary. |
| schema: VariantSchema describing how this field should be shredded. |
| metadata: Top-level metadata bytes (used for key ID lookups in nested objects). |
| |
| Returns: |
| A Python dict ``{'value': bytes_or_none, 'typed_value': val_or_dict_or_none}`` |
| suitable for building a PyArrow struct array row. |
| """ |
| from pypaimon.data.generic_variant import GenericVariant |
| |
| if schema.scalar_arrow_type is not None: |
| try: |
| py_val = GenericVariant(field_bytes, metadata).to_python() |
| except Exception: |
| return {'value': field_bytes, 'typed_value': None} |
| return {'value': None, 'typed_value': py_val} |
| |
| if schema.object_fields is not None: |
| if not field_bytes or (field_bytes[0] & 0x3) != _OBJECT: |
| return {'value': field_bytes, 'typed_value': None} |
| |
| all_sub = _extract_overflow_fields(field_bytes) |
| key_dict = parse_metadata_dict(metadata) |
| id_to_name = {v: k for k, v in key_dict.items()} |
| shredded_names = {of.field_name for of in schema.object_fields} |
| |
| overflow_pairs = [ |
| (kid, fb) for kid, fb in all_sub |
| if id_to_name.get(kid) not in shredded_names |
| ] |
| shredded_by_name = { |
| id_to_name[kid]: fb for kid, fb in all_sub |
| if kid in id_to_name and id_to_name[kid] in shredded_names |
| } |
| |
| typed_value: Dict[str, dict] = {} |
| for of in schema.object_fields: |
| fname = of.field_name |
| if fname in shredded_by_name: |
| typed_value[fname] = _decompose_field_bytes( |
| shredded_by_name[fname], of.schema, metadata |
| ) |
| else: |
| typed_value[fname] = {'value': None, 'typed_value': None} |
| |
| overflow_bytes = _build_object_value(overflow_pairs) if overflow_pairs else None |
| return {'value': overflow_bytes, 'typed_value': typed_value} |
| |
| # No shredding sub-schema: treat field bytes as overflow |
| return {'value': field_bytes, 'typed_value': None} |
| |
| |
| def decompose_variant( |
| gv: 'GenericVariant', |
| obj_fields: List[ObjectField], |
| ) -> dict: |
| """Decompose a ``GenericVariant`` into a shredded row dict for writing. |
| |
| This is the inverse of ``rebuild()`` / ``rebuild_value()`` — it takes a |
| fully-encoded variant and splits it into the shredded ``{metadata, value, |
| typed_value}`` structure that Parquet shredding expects. |
| |
| Args: |
| gv: The ``GenericVariant`` to decompose. |
| obj_fields: List of ``ObjectField`` from ``parse_shredding_schema_option()``, |
| describing which top-level object keys to shred. |
| |
| Returns: |
| A Python dict matching the Arrow type produced by |
| ``shredding_schema_to_arrow_type(obj_fields)``. |
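
    Sketch (illustrative values): if ``gv`` encodes ``{"a": 1, "c": true}`` and
    ``obj_fields`` shreds only ``a``, the result looks like::

        {'metadata': gv.metadata(),
         'value': <variant object bytes holding only "c">,
         'typed_value': {'a': {'value': None, 'typed_value': 1}}}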
| """ |
| metadata = gv.metadata() |
| value_bytes = gv.value() |
| |
| # Non-object variants cannot be shredded: put everything in overflow and set |
| # typed_value to NULL so the Java reader falls through to the overflow path. |
| if not value_bytes or (value_bytes[0] & 0x3) != _OBJECT: |
| return {'metadata': metadata, 'value': value_bytes, 'typed_value': None} |
| |
| all_fields = _extract_overflow_fields(value_bytes) |
| key_dict = parse_metadata_dict(metadata) |
| id_to_name = {v: k for k, v in key_dict.items()} |
| shredded_names = {of.field_name for of in obj_fields} |
| |
| overflow_pairs = [ |
| (kid, fb) for kid, fb in all_fields |
| if id_to_name.get(kid) not in shredded_names |
| ] |
| shredded_by_name = { |
| id_to_name[kid]: fb for kid, fb in all_fields |
| if kid in id_to_name and id_to_name[kid] in shredded_names |
| } |
| |
| typed_value = {} |
| for of in obj_fields: |
| fname = of.field_name |
| if fname in shredded_by_name: |
| typed_value[fname] = _decompose_field_bytes( |
| shredded_by_name[fname], of.schema, metadata |
| ) |
| else: |
| typed_value[fname] = {'value': None, 'typed_value': None} |
| |
| overflow_bytes = _build_object_value(overflow_pairs) if overflow_pairs else None |
| return {'metadata': metadata, 'value': overflow_bytes, 'typed_value': typed_value} |
| |
| |
| def shred_variant_column( |
| column: pa.Array, |
| obj_fields: List[ObjectField], |
| target_type: pa.StructType, |
| ) -> pa.Array: |
| """Convert a standard VARIANT column to its shredded representation. |
| |
| Args: |
| column: A ``pa.Array`` of type ``struct<value: binary, metadata: binary>`` |
| (the standard Paimon VARIANT layout). |
| obj_fields: ``[ObjectField, ...]`` from ``parse_shredding_schema_option()``. |
| target_type: The Arrow struct type from ``shredding_schema_to_arrow_type(obj_fields)``. |
| |
| Returns: |
| A ``pa.StructArray`` with ``target_type`` suitable for writing to Parquet |
| in the shredded format. |
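
    Write-side usage sketch (the option dict and column name are illustrative)::

        obj_fields = parse_shredding_schema_option(
            options['variant.shreddingSchema'])['v']
        target = shredding_schema_to_arrow_type(obj_fields)
        shredded = shred_variant_column(variant_col, obj_fields, target)

    Feeding ``shredded`` back through ``assemble_shredded_column`` with
    ``build_variant_schema(target)`` should yield logically equivalent variants,
    though the rebuilt bytes may differ (object fields are re-encoded sorted by
    key id).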
| """ |
| from pypaimon.data.generic_variant import GenericVariant |
| |
| rows = column.to_pylist() |
| result = [] |
| for row in rows: |
| if row is None: |
| result.append(None) |
| else: |
| gv = GenericVariant.from_arrow_struct(row) |
| result.append(decompose_variant(gv, obj_fields)) |
| return pa.array(result, type=target_type) |