blob: 4307c8ecffe472051c8fb9649f2f5e5a5661d34c [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Unit tests for VARIANT type support in pypaimon.
Covers the user-facing features described in the Python API documentation:
- Plain VARIANT: ordinary write (GenericVariant → Parquet) and read.
- Shredded VARIANT: write-side decomposition (shred_variant_column) and
read-side transparent assembly (assemble_shredded_column).
Sections
--------
1. Type-system layer – schema parsing, Paimon ↔ Arrow type mapping.
2. GenericVariant container – construction, inspection, JSON round-trip.
3. Plain VARIANT I/O – PyArrow-level Parquet write/read sanity check.
4. Shredded schema utils – is_shredded_variant, parse_metadata_dict,
build_variant_schema.
5. Binary encoding helpers – _encode_scalar_to_value_bytes, _build_object_value,
_build_array_value.
6. Read-path assembly – rebuild_value, rebuild, assemble_shredded_column.
7. Write-path shredding – parse_shredding_schema_option,
shredding_schema_to_arrow_type, decompose_variant.
8. Full-chain via Paimon API – write then read through CatalogFactory / Schema /
write_arrow / to_arrow for both plain VARIANT and
shredded VARIANT.
"""
import io
import json
import os
import shutil
import struct as _struct
import tempfile
import unittest
import pyarrow as pa
import pyarrow.parquet as pq
from pypaimon import CatalogFactory, Schema
from pypaimon.data.generic_variant import GenericVariant
from pypaimon.data.variant_shredding import (
VARIANT_ARROW_TYPE,
VariantSchema,
_NULL_VALUE_BYTES,
_build_array_value,
_build_object_value,
_encode_scalar_to_value_bytes,
assemble_shredded_column,
build_variant_schema,
decompose_variant,
is_shredded_variant,
parse_metadata_dict,
parse_shredding_schema_option,
rebuild,
rebuild_value,
shredding_schema_to_arrow_type,
)
from pypaimon.schema.data_types import (
AtomicType,
DataField,
DataTypeParser,
PyarrowFieldParser,
RowType,
is_variant_struct,
)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _variant_arrow_type() -> pa.StructType:
"""The canonical Arrow representation of a plain VARIANT column."""
return pa.struct([
pa.field('value', pa.binary(), nullable=False),
pa.field('metadata', pa.binary(), nullable=False),
])
def _make_metadata(*keys: str) -> bytes:
"""Build variant metadata bytes. With no keys returns the minimal header."""
if keys:
return GenericVariant.from_python({k: None for k in keys}).metadata()
return b'\x01\x00' # version=1, dict_size=0
def _scalar_sub_struct(arrow_type: pa.DataType) -> pa.StructType:
"""Return struct<value: binary, typed_value: arrow_type> for one scalar sub-field."""
return pa.struct([
pa.field('value', pa.binary(), nullable=True),
pa.field('typed_value', arrow_type, nullable=True),
])
def _object_shredded_type(*key_type_pairs) -> pa.StructType:
"""Build a top-level shredded VARIANT struct with the given (key, arrow_type) sub-fields."""
tv_fields = [
pa.field(name, _scalar_sub_struct(atype), nullable=True)
for name, atype in key_type_pairs
]
return pa.struct([
pa.field('metadata', pa.binary(), nullable=False),
pa.field('value', pa.binary(), nullable=True),
pa.field('typed_value', pa.struct(tv_fields), nullable=True),
])
def _schema_json(col_name: str, sub_fields) -> str:
"""Build a variant.shreddingSchema JSON string for the given column and sub-fields.
Args:
col_name: VARIANT column name.
sub_fields: list of (field_name, paimon_type_str) pairs, e.g.
[('age', 'BIGINT'), ('city', 'VARCHAR')].
"""
return json.dumps({
'type': 'ROW',
'fields': [
{
'id': 0,
'name': col_name,
'type': {
'type': 'ROW',
'fields': [
{'id': i, 'name': name, 'type': dtype}
for i, (name, dtype) in enumerate(sub_fields)
],
},
}
],
})
# ===========================================================================
# 1. Type-system layer
# ===========================================================================
class TestVariantSchemaParsing(unittest.TestCase):
def test_parse_variant_keyword(self):
dt = DataTypeParser.parse_atomic_type_sql_string('VARIANT')
self.assertIsInstance(dt, AtomicType)
self.assertEqual(dt.type, 'VARIANT')
self.assertTrue(dt.nullable)
def test_parse_variant_not_null(self):
dt = DataTypeParser.parse_atomic_type_sql_string('VARIANT NOT NULL')
self.assertIsInstance(dt, AtomicType)
self.assertFalse(dt.nullable)
def test_variant_to_dict_roundtrip(self):
dt = AtomicType('VARIANT')
restored = DataTypeParser.parse_data_type(dt.to_dict())
self.assertEqual(dt, restored)
def test_variant_str(self):
self.assertEqual(str(AtomicType('VARIANT')), 'VARIANT')
self.assertEqual(str(AtomicType('VARIANT', nullable=False)), 'VARIANT NOT NULL')
class TestVariantFromPaimonType(unittest.TestCase):
def _arrow_type(self):
return PyarrowFieldParser.from_paimon_type(AtomicType('VARIANT'))
def test_returns_struct(self):
self.assertTrue(pa.types.is_struct(self._arrow_type()))
self.assertEqual(self._arrow_type().num_fields, 2)
def test_field_names(self):
t = self._arrow_type()
self.assertEqual(t.field(0).name, 'value')
self.assertEqual(t.field(1).name, 'metadata')
def test_field_types_are_binary(self):
t = self._arrow_type()
self.assertTrue(pa.types.is_binary(t.field(0).type))
self.assertTrue(pa.types.is_binary(t.field(1).type))
def test_fields_not_nullable(self):
t = self._arrow_type()
self.assertFalse(t.field(0).nullable)
self.assertFalse(t.field(1).nullable)
def test_from_paimon_field(self):
df = DataField(id=0, name='payload', type=AtomicType('VARIANT'))
pa_field = PyarrowFieldParser.from_paimon_field(df)
self.assertEqual(pa_field.name, 'payload')
self.assertTrue(pa.types.is_struct(pa_field.type))
self.assertTrue(pa_field.nullable)
def test_from_paimon_schema(self):
fields = [
DataField(id=0, name='id', type=AtomicType('BIGINT')),
DataField(id=1, name='payload', type=AtomicType('VARIANT')),
]
schema = PyarrowFieldParser.from_paimon_schema(fields)
self.assertEqual(schema.field('payload').type, _variant_arrow_type())
class TestVariantToPaimonType(unittest.TestCase):
def test_is_variant_struct_positive(self):
self.assertTrue(is_variant_struct(_variant_arrow_type()))
def test_is_variant_struct_wrong_names(self):
st = pa.struct([
pa.field('val', pa.binary(), nullable=False),
pa.field('meta', pa.binary(), nullable=False),
])
self.assertFalse(is_variant_struct(st))
def test_is_variant_struct_nullable_fields(self):
st = pa.struct([
pa.field('value', pa.binary(), nullable=True),
pa.field('metadata', pa.binary(), nullable=False),
])
self.assertFalse(is_variant_struct(st))
def test_is_variant_struct_wrong_types(self):
st = pa.struct([
pa.field('value', pa.string(), nullable=False),
pa.field('metadata', pa.binary(), nullable=False),
])
self.assertFalse(is_variant_struct(st))
def test_is_variant_struct_extra_fields(self):
st = pa.struct([
pa.field('value', pa.binary(), nullable=False),
pa.field('metadata', pa.binary(), nullable=False),
pa.field('typed_value', pa.int64(), nullable=True),
])
self.assertFalse(is_variant_struct(st))
def test_to_paimon_type_variant(self):
result = PyarrowFieldParser.to_paimon_type(_variant_arrow_type(), nullable=True)
self.assertIsInstance(result, AtomicType)
self.assertEqual(result.type, 'VARIANT')
self.assertTrue(result.nullable)
def test_to_paimon_type_variant_not_null(self):
result = PyarrowFieldParser.to_paimon_type(_variant_arrow_type(), nullable=False)
self.assertFalse(result.nullable)
def test_ordinary_struct_maps_to_row_type(self):
st = pa.struct([pa.field('a', pa.int32()), pa.field('b', pa.string())])
result = PyarrowFieldParser.to_paimon_type(st, nullable=True)
self.assertIsInstance(result, RowType)
def test_struct_same_names_different_types_is_row_type(self):
st = pa.struct([
pa.field('value', pa.string(), nullable=False),
pa.field('metadata', pa.string(), nullable=False),
])
result = PyarrowFieldParser.to_paimon_type(st, nullable=True)
self.assertIsInstance(result, RowType)
class TestVariantSchemaRoundTrip(unittest.TestCase):
def test_paimon_to_arrow_to_paimon(self):
original = DataField(id=0, name='v', type=AtomicType('VARIANT'))
pa_field = PyarrowFieldParser.from_paimon_field(original)
restored = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable)
self.assertIsInstance(restored, AtomicType)
self.assertEqual(restored.type, 'VARIANT')
def test_mixed_schema_round_trip(self):
fields = [
DataField(id=0, name='id', type=AtomicType('BIGINT')),
DataField(id=1, name='payload', type=AtomicType('VARIANT')),
DataField(id=2, name='ts', type=AtomicType('TIMESTAMP(6)')),
]
pa_schema = PyarrowFieldParser.from_paimon_schema(fields)
restored = PyarrowFieldParser.to_paimon_schema(pa_schema)
self.assertEqual(restored[1].name, 'payload')
self.assertIsInstance(restored[1].type, AtomicType)
self.assertEqual(restored[1].type.type, 'VARIANT')
self.assertEqual(restored[2].name, 'ts')
# ===========================================================================
# 2. GenericVariant container
# ===========================================================================
class TestGenericVariantContainer(unittest.TestCase):
def test_from_python_returns_instance(self):
gv = GenericVariant.from_python({'a': 1, 'b': 'hello'})
self.assertIsInstance(gv, GenericVariant)
self.assertIsInstance(gv.value(), bytes)
def test_from_arrow_struct_roundtrip(self):
original = GenericVariant.from_python({'x': 1, 'y': 2})
restored = GenericVariant.from_arrow_struct(
{'value': original.value(), 'metadata': original.metadata()})
self.assertEqual(restored.value(), original.value())
self.assertEqual(restored.metadata(), original.metadata())
def test_to_python_object(self):
gv = GenericVariant.from_python({'age': 30, 'city': 'Beijing'})
result = gv.to_python()
self.assertEqual(result, {'age': 30, 'city': 'Beijing'})
def test_to_python_array(self):
gv = GenericVariant.from_python([1, 2, 3])
self.assertEqual(gv.to_python(), [1, 2, 3])
def test_to_python_null(self):
gv = GenericVariant.from_python(None)
self.assertIsNone(gv.to_python())
def test_to_python_string(self):
gv = GenericVariant.from_python('hello')
self.assertEqual(gv.to_python(), 'hello')
def test_to_python_number(self):
gv = GenericVariant.from_python(42)
self.assertEqual(gv.to_python(), 42)
def test_from_python_none(self):
gv = GenericVariant.from_python(None)
self.assertIsNone(gv.to_python())
def test_from_python_nested(self):
obj = {'user': {'name': 'Alice', 'scores': [10, 20, 30]}, 'active': True}
gv = GenericVariant.from_python(obj)
result = gv.to_python()
self.assertEqual(result['user']['name'], 'Alice')
self.assertEqual(result['user']['scores'], [10, 20, 30])
self.assertTrue(result['active'])
def test_equality(self):
gv1 = GenericVariant.from_python({'a': 1})
gv2 = GenericVariant.from_python({'a': 1})
self.assertEqual(gv1, gv2)
def test_repr_and_str(self):
gv = GenericVariant.from_python('hello')
self.assertIn('hello', repr(gv))
self.assertIn('hello', str(gv))
class TestToArrowArray(unittest.TestCase):
def test_basic(self):
gv1 = GenericVariant.from_python({'a': 1})
gv2 = GenericVariant.from_python([1, 2])
arr = GenericVariant.to_arrow_array([gv1, gv2])
self.assertIsInstance(arr, pa.StructArray)
self.assertEqual(len(arr), 2)
restored = GenericVariant.from_arrow_struct(arr[0].as_py())
self.assertEqual(restored.to_python(), {'a': 1})
def test_with_nulls(self):
arr = GenericVariant.to_arrow_array([GenericVariant.from_python(42), None])
self.assertEqual(len(arr), 2)
self.assertIsNotNone(arr[0].as_py())
self.assertIsNone(arr[1].as_py())
def test_empty(self):
self.assertEqual(len(GenericVariant.to_arrow_array([])), 0)
def test_arrow_type(self):
gv = GenericVariant.from_python(True)
arr = GenericVariant.to_arrow_array([gv])
self.assertEqual(arr.type, _variant_arrow_type())
class TestJsonRoundtrip(unittest.TestCase):
def _check(self, json_str):
gv = GenericVariant.from_python(json.loads(json_str))
self.assertEqual(gv.to_python(), json.loads(json_str))
def test_nested_object_array(self):
self._check('{"users":[{"name":"Alice","age":30},{"name":"Bob","age":25}]}')
def test_deep_nesting(self):
self._check('{"a":{"b":{"c":{"d":42}}}}')
def test_array_of_objects(self):
self._check('[{"x":1},{"x":2},{"x":3}]')
def test_all_primitive_types(self):
self._check('{"n":null,"b":true,"i":42,"s":"hello","f":1.5}')
def test_empty_object(self):
gv = GenericVariant.from_python({})
self.assertEqual(gv.to_python(), {})
def test_empty_array(self):
gv = GenericVariant.from_python([])
self.assertEqual(gv.to_python(), [])
# ===========================================================================
# 3. Plain VARIANT — PyArrow-level Parquet sanity check
# (verifies the physical struct<value, metadata> layout works in Parquet)
# ===========================================================================
def _make_variant_bytes(json_str: str) -> bytes:
"""Produce a minimal VARIANT value payload encoding a long-string primitive."""
payload = json_str.encode('utf-8')
return _struct.pack('<B', 0x15) + _struct.pack('<I', len(payload)) + payload
class TestVariantParquetCycle(unittest.TestCase):
def test_write_and_read_parquet_values(self):
"""Write struct<value, metadata> bytes to Parquet and verify schema + values."""
meta = _make_metadata()
payload_col = pa.array(
[{'value': _make_variant_bytes('{"key":"hello"}'), 'metadata': meta},
{'value': _make_variant_bytes('42'), 'metadata': meta}],
type=_variant_arrow_type(),
)
original = pa.table({'id': [1, 2], 'payload': payload_col})
buf = io.BytesIO()
pq.write_table(original, buf)
buf.seek(0)
restored = pq.read_table(buf)
self.assertEqual(restored.schema.field('payload').type, _variant_arrow_type())
self.assertEqual(restored.num_rows, 2)
row0 = restored.column('payload')[0].as_py()
self.assertEqual(row0['value'], _make_variant_bytes('{"key":"hello"}'))
self.assertEqual(row0['metadata'], meta)
def test_null_variant_row(self):
"""SQL-NULL VARIANT row survives Parquet round-trip as None."""
payload_col = pa.array(
[None, {'value': _make_variant_bytes('true'), 'metadata': _make_metadata()}],
type=_variant_arrow_type(),
)
buf = io.BytesIO()
pq.write_table(pa.table({'id': [1, 2], 'payload': payload_col}), buf)
buf.seek(0)
restored = pq.read_table(buf)
self.assertIsNone(restored.column('payload')[0].as_py())
self.assertIsNotNone(restored.column('payload')[1].as_py())
def test_to_arrow_array_roundtrip_parquet(self):
"""GenericVariant.to_arrow_array() produces bytes that survive a Parquet cycle."""
gvs = [
GenericVariant.from_python({'score': 1}),
GenericVariant.from_python({'score': 2}),
None,
]
col = GenericVariant.to_arrow_array(gvs)
buf = io.BytesIO()
pq.write_table(pa.table({'v': col}), buf)
buf.seek(0)
restored = pq.read_table(buf).column('v')
self.assertEqual(
GenericVariant.from_arrow_struct(restored[0].as_py()).to_python(), {'score': 1})
self.assertEqual(
GenericVariant.from_arrow_struct(restored[1].as_py()).to_python(), {'score': 2})
self.assertIsNone(restored[2].as_py())
# ===========================================================================
# 4. Shredded VARIANT — schema detection and parsing
# ===========================================================================
class TestIsShredded(unittest.TestCase):
def test_shredded_is_detected(self):
t = _object_shredded_type(('age', pa.int64()))
self.assertTrue(is_shredded_variant(t))
def test_plain_variant_not_shredded(self):
plain = pa.struct([
pa.field('value', pa.binary(), nullable=False),
pa.field('metadata', pa.binary(), nullable=False),
])
self.assertFalse(is_shredded_variant(plain))
def test_arbitrary_struct_not_shredded(self):
t = pa.struct([pa.field('a', pa.int32()), pa.field('b', pa.string())])
self.assertFalse(is_shredded_variant(t))
def test_non_struct_not_shredded(self):
self.assertFalse(is_shredded_variant(pa.binary()))
self.assertFalse(is_shredded_variant(pa.int64()))
def test_missing_typed_value_not_shredded(self):
t = pa.struct([
pa.field('metadata', pa.binary()),
pa.field('value', pa.binary()),
])
self.assertFalse(is_shredded_variant(t))
class TestParseMetadataDict(unittest.TestCase):
def test_single_key(self):
meta = _make_metadata('age')
d = parse_metadata_dict(meta)
self.assertIn('age', d)
self.assertEqual(d['age'], 0)
def test_multiple_keys(self):
meta = _make_metadata('age', 'name', 'city')
d = parse_metadata_dict(meta)
self.assertEqual(set(d.keys()), {'age', 'name', 'city'})
self.assertEqual(d['age'], 0)
self.assertEqual(d['name'], 1)
self.assertEqual(d['city'], 2)
def test_empty_metadata(self):
meta = b'\x01\x00'
d = parse_metadata_dict(meta)
self.assertEqual(d, {})
def test_empty_bytes(self):
d = parse_metadata_dict(b'')
self.assertEqual(d, {})
class TestBuildVariantSchema(unittest.TestCase):
def test_simple_scalar(self):
t = _object_shredded_type(('age', pa.int64()), ('name', pa.string()))
schema = build_variant_schema(t)
self.assertEqual(schema.metadata_idx, 0)
self.assertEqual(schema.value_idx, 1)
self.assertEqual(schema.typed_idx, 2)
self.assertIsNotNone(schema.object_fields)
self.assertEqual(len(schema.object_fields), 2)
self.assertEqual(schema.object_fields[0].field_name, 'age')
self.assertEqual(schema.object_fields[1].field_name, 'name')
def test_sub_field_scalar_type(self):
t = _object_shredded_type(('age', pa.int64()))
schema = build_variant_schema(t)
age_schema = schema.object_fields[0].schema
self.assertEqual(age_schema.scalar_arrow_type, pa.int64())
self.assertEqual(age_schema.value_idx, 0)
self.assertEqual(age_schema.typed_idx, 1)
def test_object_schema_map(self):
t = _object_shredded_type(('age', pa.int64()), ('name', pa.string()))
schema = build_variant_schema(t)
self.assertIn('age', schema.object_schema_map)
self.assertIn('name', schema.object_schema_map)
def test_is_unshredded_for_plain_variant(self):
schema = VariantSchema(metadata_idx=0, value_idx=1)
self.assertTrue(schema.is_unshredded())
def test_not_unshredded_for_shredded_schema(self):
t = _object_shredded_type(('age', pa.int64()))
schema = build_variant_schema(t)
self.assertFalse(schema.is_unshredded())
def test_no_typed_value_field(self):
plain = pa.struct([
pa.field('metadata', pa.binary(), nullable=False),
pa.field('value', pa.binary(), nullable=True),
])
schema = build_variant_schema(plain)
self.assertEqual(schema.metadata_idx, 0)
self.assertEqual(schema.value_idx, 1)
self.assertEqual(schema.typed_idx, -1)
self.assertTrue(schema.is_unshredded())
# ===========================================================================
# 5. Binary encoding helpers
# ===========================================================================
class TestEncodeScalar(unittest.TestCase):
def _roundtrip(self, json_str: str, arrow_type: pa.DataType):
"""Encode a scalar to bytes via the Arrow type, then decode via GenericVariant."""
gv_orig = GenericVariant.from_python(json.loads(json_str))
typed_value = pa.array([gv_orig.to_python()], type=arrow_type).to_pylist()[0]
value_bytes = _encode_scalar_to_value_bytes(typed_value, arrow_type)
gv = GenericVariant(value_bytes, b'\x01\x00')
return gv.to_python()
def test_int(self):
self.assertEqual(self._roundtrip('42', pa.int64()), 42)
def test_float(self):
value_bytes = _encode_scalar_to_value_bytes(3.14, pa.float64())
gv = GenericVariant(value_bytes, b'\x01\x00')
self.assertAlmostEqual(gv.to_python(), 3.14, places=5)
def test_bool_true(self):
self.assertEqual(self._roundtrip('true', pa.bool_()), True)
def test_bool_false(self):
self.assertEqual(self._roundtrip('false', pa.bool_()), False)
def test_string(self):
self.assertEqual(self._roundtrip('"hello"', pa.string()), 'hello')
def test_null(self):
value_bytes = _encode_scalar_to_value_bytes(None, pa.int64())
gv = GenericVariant(value_bytes, b'\x01\x00')
self.assertIsNone(gv.to_python())
class TestBuildBinary(unittest.TestCase):
def test_build_object_empty(self):
obj_bytes = _build_object_value([])
gv = GenericVariant(obj_bytes, b'\x01\x00')
self.assertEqual(gv.to_python(), {})
def test_build_object_one_field(self):
meta = _make_metadata('age')
key_dict = parse_metadata_dict(meta)
age_val = _encode_scalar_to_value_bytes(30, pa.int64())
obj_bytes = _build_object_value([(key_dict['age'], age_val)])
gv = GenericVariant(obj_bytes, meta)
self.assertEqual(gv.to_python(), {'age': 30})
def test_build_array_empty(self):
arr_bytes = _build_array_value([])
gv = GenericVariant(arr_bytes, b'\x01\x00')
self.assertEqual(gv.to_python(), [])
def test_build_array_three_ints(self):
elem1 = _encode_scalar_to_value_bytes(1, pa.int64())
elem2 = _encode_scalar_to_value_bytes(2, pa.int64())
elem3 = _encode_scalar_to_value_bytes(3, pa.int64())
arr_bytes = _build_array_value([elem1, elem2, elem3])
gv = GenericVariant(arr_bytes, b'\x01\x00')
self.assertEqual(gv.to_python(), [1, 2, 3])
def test_build_array_with_null(self):
elem1 = _encode_scalar_to_value_bytes(1, pa.int64())
arr_bytes = _build_array_value([elem1, _NULL_VALUE_BYTES])
gv = GenericVariant(arr_bytes, b'\x01\x00')
result = gv.to_python()
self.assertEqual(result[0], 1)
self.assertIsNone(result[1])
# ===========================================================================
# 6. Read-path assembly
# ===========================================================================
class TestRebuildValue(unittest.TestCase):
"""Test the core rebuild_value() function for various shredding scenarios."""
def _age_schema(self) -> VariantSchema:
"""Schema for a sub-field struct {value: binary, typed_value: int64}."""
return build_variant_schema(_scalar_sub_struct(pa.int64()))
def _metadata_and_dict(self, *keys: str):
meta = _make_metadata(*keys)
return meta, parse_metadata_dict(meta)
def test_scalar_typed_value(self):
schema = self._age_schema()
row = {'value': None, 'typed_value': 30}
result = rebuild_value(row, schema, {})
gv = GenericVariant(result, b'\x01\x00')
self.assertEqual(gv.to_python(), 30)
def test_scalar_overflow(self):
"""When typed_value is None, fall back to overflow value bytes."""
age_val = _encode_scalar_to_value_bytes(42, pa.int64())
schema = self._age_schema()
row = {'value': age_val, 'typed_value': None}
result = rebuild_value(row, schema, {})
gv = GenericVariant(result, b'\x01\x00')
self.assertEqual(gv.to_python(), 42)
def test_both_null_returns_none(self):
"""Both value and typed_value being None signals an absent field."""
schema = self._age_schema()
row = {'value': None, 'typed_value': None}
self.assertIsNone(rebuild_value(row, schema, {}))
def test_object_with_shredded_fields(self):
meta, key_dict = self._metadata_and_dict('age', 'name')
t = _object_shredded_type(('age', pa.int64()), ('name', pa.string()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': None,
'typed_value': {
'age': {'value': None, 'typed_value': 30},
'name': {'value': None, 'typed_value': 'Alice'},
},
}
value_bytes, _ = rebuild(row, schema, key_dict)
gv = GenericVariant(value_bytes, meta)
result = gv.to_python()
self.assertEqual(result['age'], 30)
self.assertEqual(result['name'], 'Alice')
def test_object_absent_field_skipped(self):
"""A sub-field with both value=None and typed_value=None is omitted from output."""
meta, key_dict = self._metadata_and_dict('age', 'name')
t = _object_shredded_type(('age', pa.int64()), ('name', pa.string()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': None,
'typed_value': {
'age': {'value': None, 'typed_value': 25},
'name': {'value': None, 'typed_value': None}, # absent
},
}
value_bytes, _ = rebuild(row, schema, key_dict)
gv = GenericVariant(value_bytes, meta)
result = gv.to_python()
self.assertEqual(result['age'], 25)
self.assertNotIn('name', result)
def test_object_with_overflow(self):
"""Fields not in typed_value are preserved from overflow bytes."""
original = GenericVariant.from_python({'age': 30, 'extra': 'overflow_val'})
overflow_bytes = original.value()
meta = original.metadata()
key_dict = parse_metadata_dict(meta)
t = _object_shredded_type(('age', pa.int64()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': overflow_bytes,
'typed_value': {
'age': {'value': None, 'typed_value': 30},
},
}
value_bytes = rebuild_value(row, schema, key_dict)
gv = GenericVariant(value_bytes, meta)
result = gv.to_python()
self.assertEqual(result['age'], 30)
self.assertIn('extra', result)
def test_typed_value_null_uses_overflow(self):
"""If typed_value struct is None for the whole row, full overflow bytes are used."""
original = GenericVariant.from_python({'age': 99})
meta = original.metadata()
key_dict = parse_metadata_dict(meta)
t = _object_shredded_type(('age', pa.int64()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': original.value(),
'typed_value': None, # typed_value absent for this row
}
value_bytes, _ = rebuild(row, schema, key_dict)
gv = GenericVariant(value_bytes, meta)
self.assertEqual(gv.to_python()['age'], 99)
def test_rebuild_matches_direct_variant(self):
"""Bytes rebuilt from shredded form must equal bytes from GenericVariant.from_python."""
original_json = '{"score": 42, "tag": "test"}'
original_gv = GenericVariant.from_python(json.loads(original_json))
meta = original_gv.metadata()
key_dict = parse_metadata_dict(meta)
t = _object_shredded_type(('score', pa.int64()), ('tag', pa.string()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': None,
'typed_value': {
'score': {'value': None, 'typed_value': 42},
'tag': {'value': None, 'typed_value': 'test'},
},
}
value_bytes, _ = rebuild(row, schema, key_dict)
gv_rebuilt = GenericVariant(value_bytes, meta)
self.assertEqual(gv_rebuilt.to_python(), original_gv.to_python())
class TestRebuild(unittest.TestCase):
def test_missing_metadata_raises(self):
schema = VariantSchema()
with self.assertRaises(ValueError):
rebuild({'value': None, 'typed_value': None}, schema)
def test_basic_roundtrip(self):
meta = _make_metadata('x')
key_dict = parse_metadata_dict(meta)
t = _object_shredded_type(('x', pa.int64()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': None,
'typed_value': {'x': {'value': None, 'typed_value': 7}},
}
value_bytes, ret_meta = rebuild(row, schema, key_dict)
self.assertEqual(ret_meta, meta)
gv = GenericVariant(value_bytes, ret_meta)
self.assertEqual(gv.to_python(), {'x': 7})
def test_auto_parse_key_dict(self):
"""When key_dict is None, rebuild() parses it automatically from the metadata field."""
meta = _make_metadata('y')
t = _object_shredded_type(('y', pa.string()))
schema = build_variant_schema(t)
row = {
'metadata': meta,
'value': None,
'typed_value': {'y': {'value': None, 'typed_value': 'hi'}},
}
value_bytes, _ = rebuild(row, schema)
gv = GenericVariant(value_bytes, meta)
self.assertEqual(gv.to_python(), {'y': 'hi'})
class TestAssembleShreddedColumn(unittest.TestCase):
def test_basic_two_rows(self):
"""assemble_shredded_column converts a shredded Arrow column to plain VARIANT."""
meta = _make_metadata('age', 'name')
t = _object_shredded_type(('age', pa.int64()), ('name', pa.string()))
rows = [
{'metadata': meta, 'value': None, 'typed_value': {
'age': {'value': None, 'typed_value': 30},
'name': {'value': None, 'typed_value': 'Alice'},
}},
{'metadata': meta, 'value': None, 'typed_value': {
'age': {'value': None, 'typed_value': 25},
'name': {'value': None, 'typed_value': 'Bob'},
}},
]
col = pa.array(rows, type=t)
schema = build_variant_schema(col.type)
result = assemble_shredded_column(col, schema)
self.assertEqual(result.type, VARIANT_ARROW_TYPE)
self.assertEqual(len(result), 2)
gv0 = GenericVariant.from_arrow_struct(result[0].as_py())
self.assertEqual(gv0.to_python(), {'age': 30, 'name': 'Alice'})
gv1 = GenericVariant.from_arrow_struct(result[1].as_py())
self.assertEqual(gv1.to_python(), {'age': 25, 'name': 'Bob'})
# ===========================================================================
# 7. Write-path shredding
# ===========================================================================
class TestShreddingWrite(unittest.TestCase):
"""Tests for the static-mode write shredding API."""
def _obj_fields_for(self, col_name, sub_fields):
"""Helper: build obj_fields for a single column from (name, type) pairs."""
return parse_shredding_schema_option(_schema_json(col_name, sub_fields))[col_name]
# -----------------------------------------------------------------------
# parse_shredding_schema_option
# -----------------------------------------------------------------------
def test_parse_single_scalar_field(self):
result = parse_shredding_schema_option(json.dumps({
'type': 'ROW',
'fields': [
{'id': 0, 'name': 'payload', 'type': {
'type': 'ROW',
'fields': [{'id': 0, 'name': 'age', 'type': 'INT'}],
}},
],
}))
self.assertIn('payload', result)
obj_fields = result['payload']
self.assertEqual(len(obj_fields), 1)
self.assertEqual(obj_fields[0].field_name, 'age')
self.assertIsNotNone(obj_fields[0].schema.scalar_arrow_type)
def test_parse_multiple_fields_multiple_cols(self):
result = parse_shredding_schema_option(json.dumps({
'type': 'ROW',
'fields': [
{'id': 0, 'name': 'col_a', 'type': {'type': 'ROW', 'fields': [
{'id': 0, 'name': 'x', 'type': 'BIGINT'},
{'id': 1, 'name': 'y', 'type': 'VARCHAR'},
]}},
{'id': 1, 'name': 'col_b', 'type': {'type': 'ROW', 'fields': [
{'id': 0, 'name': 'flag', 'type': 'BOOLEAN'},
]}},
],
}))
self.assertIn('col_a', result)
self.assertIn('col_b', result)
self.assertEqual(len(result['col_a']), 2)
self.assertEqual(result['col_a'][0].field_name, 'x')
self.assertEqual(result['col_a'][1].field_name, 'y')
self.assertEqual(result['col_b'][0].field_name, 'flag')
def test_parse_invalid_top_level_type_raises(self):
with self.assertRaises(ValueError):
parse_shredding_schema_option(json.dumps({'type': 'ARRAY', 'element': 'INT'}))
# -----------------------------------------------------------------------
# shredding_schema_to_arrow_type
# -----------------------------------------------------------------------
def test_arrow_type_structure_single_scalar(self):
obj_fields = self._obj_fields_for('col', [('score', 'BIGINT')])
arrow_type = shredding_schema_to_arrow_type(obj_fields)
self.assertTrue(pa.types.is_struct(arrow_type))
names = {arrow_type.field(i).name for i in range(arrow_type.num_fields)}
self.assertIn('metadata', names)
self.assertIn('value', names)
self.assertIn('typed_value', names)
tv = arrow_type.field('typed_value').type
self.assertTrue(pa.types.is_struct(tv))
self.assertEqual(tv.num_fields, 1)
self.assertEqual(tv.field(0).name, 'score')
sub = tv.field('score').type
self.assertTrue(pa.types.is_struct(sub))
sub_names = {sub.field(i).name for i in range(sub.num_fields)}
self.assertIn('value', sub_names)
self.assertIn('typed_value', sub_names)
# -----------------------------------------------------------------------
# decompose_variant
# -----------------------------------------------------------------------
def test_decompose_scalar_field_extracted(self):
"""A shredded scalar field ends up in typed_value; non-shredded fields go to overflow."""
obj_fields = self._obj_fields_for('col', [('age', 'BIGINT')])
gv = GenericVariant.from_python({'age': 25, 'name': 'alice'})
result = decompose_variant(gv, obj_fields)
self.assertIn('metadata', result)
self.assertIn('value', result)
self.assertIn('typed_value', result)
tv = result['typed_value']
self.assertIn('age', tv)
self.assertIsNone(tv['age']['value'])
self.assertEqual(tv['age']['typed_value'], 25)
# 'name' not in schema → must appear in overflow
overflow = result['value']
if overflow:
overflow_gv = GenericVariant(overflow, result['metadata'])
self.assertIn('name', overflow_gv.to_python())
def test_decompose_absent_field_is_null(self):
"""A shredded field absent from the variant yields {value: None, typed_value: None}."""
obj_fields = self._obj_fields_for('col', [('missing_field', 'BIGINT')])
gv = GenericVariant.from_python({'other': 99})
result = decompose_variant(gv, obj_fields)
tv = result['typed_value']
self.assertIsNone(tv['missing_field']['value'])
self.assertIsNone(tv['missing_field']['typed_value'])
def test_decompose_non_object_variant_all_overflow(self):
"""A non-object variant goes fully into overflow; typed_value is null (not a struct).
Java's ShreddingUtils.rebuild requires typed_value to be null (not a non-null struct
with all-null sub-fields) when the variant is not an object, so that the overflow
binary can be consumed directly without expecting it to be an OBJECT variant.
"""
obj_fields = self._obj_fields_for('col', [('x', 'BIGINT')])
gv = GenericVariant.from_python(42)
result = decompose_variant(gv, obj_fields)
self.assertIsNotNone(result['value'])
self.assertIsNone(result['typed_value'])
def test_decompose_multiple_typed_fields(self):
"""Multiple shredded fields are all correctly split from the variant."""
obj_fields = self._obj_fields_for('col', [('score', 'BIGINT'), ('tag', 'VARCHAR')])
gv = GenericVariant.from_python({'score': 100, 'tag': 'gold', 'extra': True})
result = decompose_variant(gv, obj_fields)
tv = result['typed_value']
self.assertEqual(tv['score']['typed_value'], 100)
self.assertEqual(tv['tag']['typed_value'], 'gold')
# 'extra' must be in overflow
overflow = result['value']
if overflow:
overflow_py = GenericVariant(overflow, result['metadata']).to_python()
self.assertIn('extra', overflow_py)
# ===========================================================================
# 8. Full-chain via Paimon API
# Write and read using CatalogFactory / Schema / write_arrow / to_arrow —
# the same code path used by real applications.
# ===========================================================================
class TestVariantPaimonTable(unittest.TestCase):
"""End-to-end tests that use the real Paimon Python API.
Each test creates its own table (unique name) within a shared in-process
warehouse so that tests are fully independent.
"""
@classmethod
def setUpClass(cls):
cls.tmpdir = tempfile.mkdtemp()
warehouse = os.path.join(cls.tmpdir, 'warehouse')
cls.catalog = CatalogFactory.create({'warehouse': warehouse})
cls.catalog.create_database('default', True)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir, ignore_errors=True)
def _write_and_read(self, table, pa_data: pa.Table) -> pa.Table:
"""Write pa_data to the table and read all rows back."""
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa_data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
return read_builder.new_read().to_arrow(splits)
def _pa_schema(self):
return pa.schema([
pa.field('id', pa.int64()),
pa.field('payload', _variant_arrow_type()),
])
def test_plain_variant_write_and_read(self):
"""Plain VARIANT: GenericVariant → write_arrow → to_arrow → GenericVariant."""
schema = Schema.from_pyarrow_schema(self._pa_schema())
self.catalog.create_table('default.plain_variant', schema, False)
table = self.catalog.get_table('default.plain_variant')
gvs = [
GenericVariant.from_python({'age': 30, 'city': 'Beijing'}),
GenericVariant.from_python({'score': 99, 'active': True}),
GenericVariant.from_python([1, 2, 3]),
]
data = pa.table(
{'id': [1, 2, 3], 'payload': GenericVariant.to_arrow_array(gvs)},
schema=self._pa_schema(),
)
result = self._write_and_read(table, data)
self.assertEqual(result.num_rows, 3)
payload_col = result.column('payload')
gv0 = GenericVariant.from_arrow_struct(payload_col[0].as_py())
self.assertEqual(gv0.to_python(), {'age': 30, 'city': 'Beijing'})
gv1 = GenericVariant.from_arrow_struct(payload_col[1].as_py())
self.assertEqual(gv1.to_python(), {'score': 99, 'active': True})
gv2 = GenericVariant.from_arrow_struct(payload_col[2].as_py())
self.assertEqual(gv2.to_python(), [1, 2, 3])
def test_plain_variant_null_row(self):
"""SQL-NULL VARIANT rows are stored and retrieved as None."""
schema = Schema.from_pyarrow_schema(self._pa_schema())
self.catalog.create_table('default.plain_variant_null', schema, False)
table = self.catalog.get_table('default.plain_variant_null')
gvs = [GenericVariant.from_python({'x': 1}), None, GenericVariant.from_python({'x': 3})]
data = pa.table(
{'id': [1, 2, 3], 'payload': GenericVariant.to_arrow_array(gvs)},
schema=self._pa_schema(),
)
result = self._write_and_read(table, data)
payload_col = result.column('payload')
self.assertIsNotNone(payload_col[0].as_py())
self.assertIsNone(payload_col[1].as_py())
self.assertIsNotNone(payload_col[2].as_py())
def test_shredded_variant_write_and_read(self):
"""Shredded VARIANT: writer shreds automatically, reader assembles transparently."""
shredding_json = _schema_json('payload', [('age', 'BIGINT'), ('city', 'VARCHAR')])
schema = Schema.from_pyarrow_schema(
self._pa_schema(),
options={'variant.shreddingSchema': shredding_json},
)
self.catalog.create_table('default.shredded_variant', schema, False)
table = self.catalog.get_table('default.shredded_variant')
gvs = [
GenericVariant.from_python({'age': 28, 'city': 'Beijing'}),
GenericVariant.from_python({'age': 35, 'city': 'Shanghai'}),
]
data = pa.table(
{'id': [1, 2], 'payload': GenericVariant.to_arrow_array(gvs)},
schema=self._pa_schema(),
)
result = self._write_and_read(table, data)
self.assertEqual(result.num_rows, 2)
payload_col = result.column('payload')
py0 = GenericVariant.from_arrow_struct(payload_col[0].as_py()).to_python()
self.assertEqual(py0['age'], 28)
self.assertEqual(py0['city'], 'Beijing')
py1 = GenericVariant.from_arrow_struct(payload_col[1].as_py()).to_python()
self.assertEqual(py1['age'], 35)
self.assertEqual(py1['city'], 'Shanghai')
def test_shredded_variant_overflow_preserved(self):
"""Fields outside the shredding schema survive in overflow bytes end-to-end."""
shredding_json = _schema_json('payload', [('age', 'BIGINT')])
schema = Schema.from_pyarrow_schema(
self._pa_schema(),
options={'variant.shreddingSchema': shredding_json},
)
self.catalog.create_table('default.shredded_overflow', schema, False)
table = self.catalog.get_table('default.shredded_overflow')
# 'city' and 'active' are NOT in the shredding schema → stored as overflow
gv = GenericVariant.from_python({'age': 30, 'city': 'Beijing', 'active': True})
data = pa.table(
{'id': [1], 'payload': GenericVariant.to_arrow_array([gv])},
schema=self._pa_schema(),
)
result = self._write_and_read(table, data)
py = GenericVariant.from_arrow_struct(result.column('payload')[0].as_py()).to_python()
self.assertEqual(py['age'], 30)
self.assertEqual(py['city'], 'Beijing')
self.assertEqual(py['active'], True)
def test_shredded_variant_null_row(self):
"""SQL-NULL VARIANT rows survive the shred → assemble cycle as None."""
shredding_json = _schema_json('payload', [('x', 'BIGINT')])
schema = Schema.from_pyarrow_schema(
self._pa_schema(),
options={'variant.shreddingSchema': shredding_json},
)
self.catalog.create_table('default.shredded_null', schema, False)
table = self.catalog.get_table('default.shredded_null')
gvs = [GenericVariant.from_python({'x': 7}), None]
data = pa.table(
{'id': [1, 2], 'payload': GenericVariant.to_arrow_array(gvs)},
schema=self._pa_schema(),
)
result = self._write_and_read(table, data)
payload_col = result.column('payload')
gv = GenericVariant.from_arrow_struct(payload_col[0].as_py())
self.assertEqual(gv.to_python(), {'x': 7})
self.assertIsNone(payload_col[1].as_py())
if __name__ == '__main__':
unittest.main()