blob: c9147b823fc1d04ff7674b48f9e8931470eb000b [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Schema-evolution read tests for nested (struct/array/map) types.
Two layers are covered:
* Whole-column evolution of a top-level struct/array/map column
(add / drop / rename / projection) -- aligned by the column's field id.
* Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested
field via a dotted ``field_names`` path) -- this is NOT implemented:
``schema_manager`` only operates on the top-level ``field_names[-1]``.
``SchemaEvolutionNestedGapTest`` locks in the current behaviour with
explicit assertions so the gap is documented and any future fix is
noticed.
"""
import os
import shutil
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser
from pypaimon.schema.schema_change import SchemaChange
def _paimon_type(pa_type, nullable=True):
return PyarrowFieldParser.to_paimon_type(pa_type, nullable)
_MV_PA = pa.struct([('latest_version', pa.int64()), ('latest_value', pa.string())])
class _NestedBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tempdir = tempfile.mkdtemp()
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
cls.catalog.create_database('default', True)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)
def _create(self, name, pa_schema, primary_keys=None, bucket='-1'):
options = {'bucket': bucket}
schema = Schema.from_pyarrow_schema(
pa_schema,
primary_keys=list(primary_keys) if primary_keys else None,
options=options,
)
full = 'default.{}'.format(name)
self.catalog.create_table(full, schema, False)
return self.catalog.get_table(full)
def _write(self, table, pa_table):
wb = table.new_batch_write_builder()
w = wb.new_write()
c = wb.new_commit()
try:
w.write_arrow(pa_table)
c.commit(w.prepare_commit())
finally:
w.close()
c.close()
def _read_sorted(self, table, key='id', projection=None):
rb = table.new_read_builder()
if projection is not None:
rb = rb.with_projection(projection)
splits = rb.new_scan().plan().splits()
rows = rb.new_read().to_arrow(splits).to_pylist() if splits else []
return sorted(rows, key=lambda r: r[key])
class SchemaEvolutionNestedReadTest(_NestedBase):
"""Top-level struct/array/map column evolution (works)."""
# -- C1: add a new struct top-level column ---------------------------
def test_add_struct_column(self):
s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
table = self._create('nested_add_struct', s0)
self._write(table, pa.Table.from_pydict(
{'id': [1, 2], 'val': ['x', 'y']}, schema=s0))
self.catalog.alter_table(
'default.nested_add_struct',
[SchemaChange.add_column('mv', _paimon_type(_MV_PA))], False)
table = self.catalog.get_table('default.nested_add_struct')
s1 = pa.schema([('id', pa.int64()), ('val', pa.string()), ('mv', _MV_PA)])
self._write(table, pa.Table.from_pylist(
[{'id': 3, 'val': 'z',
'mv': {'latest_version': 300, 'latest_value': 'c'}}], schema=s1))
self.assertEqual(self._read_sorted(table), [
{'id': 1, 'val': 'x', 'mv': None},
{'id': 2, 'val': 'y', 'mv': None},
{'id': 3, 'val': 'z',
'mv': {'latest_version': 300, 'latest_value': 'c'}}])
# -- C2: drop a struct top-level column ------------------------------
def test_drop_struct_column(self):
s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
table = self._create('nested_drop_struct', s0)
self._write(table, pa.Table.from_pylist([
{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
{'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
], schema=s0))
self.catalog.alter_table(
'default.nested_drop_struct', [SchemaChange.drop_column('mv')], False)
table = self.catalog.get_table('default.nested_drop_struct')
s1 = pa.schema([('id', pa.int64()), ('val', pa.string())])
self._write(table, pa.Table.from_pydict(
{'id': [3], 'val': ['z']}, schema=s1))
self.assertEqual(self._read_sorted(table), [
{'id': 1, 'val': 'x'}, {'id': 2, 'val': 'y'}, {'id': 3, 'val': 'z'}])
# -- C4: nested leaf projection under evolution ----------------------
def test_nested_projection_after_add_column(self):
s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
table = self._create('nested_proj_evo', s0)
self._write(table, pa.Table.from_pylist([
{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
], schema=s0))
self.catalog.alter_table(
'default.nested_proj_evo',
[SchemaChange.add_column('extra', AtomicType('STRING'))], False)
table = self.catalog.get_table('default.nested_proj_evo')
s1 = pa.schema([('id', pa.int64()), ('mv', _MV_PA),
('val', pa.string()), ('extra', pa.string())])
self._write(table, pa.Table.from_pylist([
{'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'},
'val': 'y', 'extra': 'E'},
], schema=s1))
rows = self._read_sorted(
table, projection=['id', 'mv.latest_version', 'extra'])
self.assertEqual(rows, [
{'id': 1, 'mv_latest_version': 100, 'extra': None},
{'id': 2, 'mv_latest_version': 200, 'extra': 'E'}])
# -- C5: PK table struct column + add column merge read --------------
def test_pk_struct_add_column_merge(self):
s0 = pa.schema([pa.field('id', pa.int64(), nullable=False),
('mv', _MV_PA)])
table = self._create('nested_pk_struct', s0,
primary_keys=['id'], bucket='1')
rows0 = [{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}}]
self._write(table, pa.Table.from_pylist(rows0, schema=s0))
self.catalog.alter_table(
'default.nested_pk_struct',
[SchemaChange.add_column('w', AtomicType('STRING'))], False)
table = self.catalog.get_table('default.nested_pk_struct')
s1 = pa.schema([pa.field('id', pa.int64(), nullable=False),
('mv', _MV_PA), ('w', pa.string())])
self._write(table, pa.Table.from_pylist(
[{'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
'w': 'W'}], schema=s1))
self.assertEqual(self._read_sorted(table), [
{'id': 1, 'mv': {'latest_version': 101, 'latest_value': 'a2'},
'w': 'W'}])
# -- C6: array / map top-level column add ----------------------------
def test_add_array_and_map_columns(self):
s0 = pa.schema([('id', pa.int64()), ('val', pa.string())])
table = self._create('nested_add_arr_map', s0)
self._write(table, pa.Table.from_pydict(
{'id': [1], 'val': ['x']}, schema=s0))
arr_pa = pa.list_(pa.int64())
map_pa = pa.map_(pa.string(), pa.int64())
self.catalog.alter_table(
'default.nested_add_arr_map', [
SchemaChange.add_column('arr', _paimon_type(arr_pa)),
SchemaChange.add_column('m', _paimon_type(map_pa)),
], False)
table = self.catalog.get_table('default.nested_add_arr_map')
s1 = pa.schema([('id', pa.int64()), ('val', pa.string()),
('arr', arr_pa), ('m', map_pa)])
self._write(table, pa.Table.from_pylist(
[{'id': 2, 'val': 'y', 'arr': [1, 2, 3],
'm': [('k', 7)]}], schema=s1))
rows = self._read_sorted(table)
self.assertEqual(rows[0], {'id': 1, 'val': 'x', 'arr': None, 'm': None})
self.assertEqual(rows[1]['arr'], [1, 2, 3])
self.assertEqual(dict(rows[1]['m']), {'k': 7})
# -- C3: rename a struct top-level column ----------------------------
def test_rename_struct_column(self):
# Renaming the top-level struct column keeps its field id, so old rows
# read their original struct back under the new name.
s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
table = self._create('nested_rename_struct', s0)
self._write(table, pa.Table.from_pylist([
{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
], schema=s0))
self.catalog.alter_table(
'default.nested_rename_struct',
[SchemaChange.rename_column('mv', 'mv2')], False)
table = self.catalog.get_table('default.nested_rename_struct')
s1 = pa.schema([('id', pa.int64()), ('mv2', _MV_PA), ('val', pa.string())])
self._write(table, pa.Table.from_pylist([
{'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
], schema=s1))
self.assertEqual(self._read_sorted(table), [
{'id': 1, 'mv2': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
{'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}])
class SchemaEvolutionNestedGapTest(_NestedBase):
"""Sub-field-level evolution inside a struct is NOT implemented.
schema_manager handles only the top-level ``field_names[-1]``; a dotted
path like ``['mv', 'latest_value']`` never recurses into the RowType.
These tests assert the current behaviour (silent top-level mutation, or
ColumnNotExistException) so the gap is documented.
"""
def _create_struct_table(self, name):
s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())])
table = self._create(name, s0)
self._write(table, pa.Table.from_pylist([
{'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
], schema=s0))
return table
def _mv_subfield_names(self, table_name):
schema = self.catalog.get_table(
'default.{}'.format(table_name)).table_schema
mv = next(f for f in schema.fields if f.name == 'mv')
return [sf.name for sf in mv.type.fields]
def _top_level_names(self, table_name):
schema = self.catalog.get_table(
'default.{}'.format(table_name)).table_schema
return [f.name for f in schema.fields]
# -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ----
def test_nested_add_subfield_mutates_top_level(self):
# GAP: add_column(['mv','new_inner']) does NOT add new_inner inside
# mv; it silently appends a top-level column 'new_inner' instead.
self._create_struct_table('gap_add')
self.catalog.alter_table(
'default.gap_add',
[SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))],
False)
# mv's sub-fields are unchanged; a stray top-level column appeared.
self.assertEqual(self._mv_subfield_names('gap_add'),
['latest_version', 'latest_value'])
self.assertIn('new_inner', self._top_level_names('gap_add'))
# -- C8/C9/C10: rename / update-type / drop nested sub-field ---------
def test_nested_rename_subfield_raises(self):
# GAP: field_names[-1]='latest_value' is looked up at the TOP level,
# where it does not exist -> ColumnNotExistException (wrapped).
self._create_struct_table('gap_rename')
with self.assertRaises(RuntimeError) as cm:
self.catalog.alter_table(
'default.gap_rename',
[SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False)
self.assertIn('latest_value', str(cm.exception))
def test_nested_update_subfield_type_raises(self):
self._create_struct_table('gap_update')
with self.assertRaises(RuntimeError) as cm:
self.catalog.alter_table(
'default.gap_update',
[SchemaChange.update_column_type(
['mv', 'latest_version'], AtomicType('BIGINT'))], False)
self.assertIn('latest_version', str(cm.exception))
def test_nested_drop_subfield_raises(self):
self._create_struct_table('gap_drop')
with self.assertRaises(RuntimeError) as cm:
self.catalog.alter_table(
'default.gap_drop',
[SchemaChange.drop_column(['mv', 'latest_value'])], False)
self.assertIn('latest_value', str(cm.exception))
if __name__ == '__main__':
unittest.main()