blob: b30560c400af27e4719999850f46579b91f4f243 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Data evolution tests covering parquet + blob + vector (vortex) formats.
Each test writes data using different file format combinations and reads it
back, verifying correctness of the data evolution merge path across formats.
"""
import os
import shutil
import sys
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
class DataEvolutionFormatsTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
    """Create a temp directory, a catalog rooted inside it, and the 'default' database."""
    cls.tempdir = tempfile.mkdtemp()
    cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
    catalog_options = {'warehouse': cls.warehouse}
    cls.catalog = CatalogFactory.create(catalog_options)
    # The tests below name their tables 'default.<table>'.
    cls.catalog.create_database('default', False)
@classmethod
def tearDownClass(cls):
    """Delete the class-level temp directory; removal errors are ignored."""
    scratch_root = cls.tempdir
    shutil.rmtree(scratch_root, ignore_errors=True)
@staticmethod
def _file_path(file_meta):
return file_meta.external_path if file_meta.external_path else file_meta.file_path
# ------------------------------------------------------------------
# Parquet-format data evolution
# ------------------------------------------------------------------
def test_parquet_column_subset_write_and_merge_read(self):
    """Two writers emit disjoint parquet column groups in one commit; the read returns full rows."""
    full_schema = pa.schema([
        ('id', pa.int32()),
        ('name', pa.string()),
        ('score', pa.float64()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'parquet',
    }
    table_schema = Schema.from_pyarrow_schema(full_schema, options=table_options)
    self.catalog.create_table('default.fmt_parquet_subset', table_schema, False)
    table = self.catalog.get_table('default.fmt_parquet_subset')

    builder = table.new_batch_write_builder()
    # A single commit fed by two writers: (id, name) and (score).
    id_name_writer = builder.new_write().with_write_type(['id', 'name'])
    score_writer = builder.new_write().with_write_type(['score'])
    committer = builder.new_commit()

    id_name_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
    id_name_rows = pa.Table.from_pydict(
        {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, schema=id_name_schema)
    id_name_writer.write_arrow(id_name_rows)

    score_schema = pa.schema([('score', pa.float64())])
    score_rows = pa.Table.from_pydict({'score': [1.1, 2.2, 3.3]}, schema=score_schema)
    score_writer.write_arrow(score_rows)

    messages = id_name_writer.prepare_commit() + score_writer.prepare_commit()
    # Give every new file the same first_row_id before committing.
    for message in messages:
        for file_meta in message.new_files:
            file_meta.first_row_id = 0
    committer.commit(messages)
    id_name_writer.close()
    score_writer.close()
    committer.close()

    # Every committed file must carry the parquet suffix.
    for message in messages:
        for file_meta in message.new_files:
            self.assertTrue(file_meta.file_name.endswith('.parquet'),
                            f"Expected parquet file, got {file_meta.file_name}")

    # Read everything back and compare with the full-width expectation.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    expected = pa.Table.from_pydict(
        {'id': [1, 2, 3], 'name': ['a', 'b', 'c'], 'score': [1.1, 2.2, 3.3]},
        schema=full_schema)
    self.assertEqual(actual, expected)
def test_parquet_overwrite_column(self):
    """Commit full rows, then commit a fresh ``v`` column with first_row_id 0 and read it back."""
    kv_schema = pa.schema([
        ('k', pa.int64()),
        ('v', pa.string()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'parquet',
    }
    table_schema = Schema.from_pyarrow_schema(kv_schema, options=table_options)
    self.catalog.create_table('default.fmt_parquet_overwrite', table_schema, False)
    table = self.catalog.get_table('default.fmt_parquet_overwrite')
    builder = table.new_batch_write_builder()

    # First commit: both columns.
    writer = builder.new_write()
    committer = builder.new_commit()
    initial_rows = pa.Table.from_pydict(
        {'k': [10, 20], 'v': ['old1', 'old2']}, schema=kv_schema)
    writer.write_arrow(initial_rows)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    # Second commit: only ``v``, with first_row_id forced to 0 on the new files.
    writer = builder.new_write().with_write_type(['v'])
    committer = builder.new_commit()
    v_only_schema = pa.schema([('v', pa.string())])
    writer.write_arrow(pa.Table.from_pydict({'v': ['new1', 'new2']}, schema=v_only_schema))
    messages = writer.prepare_commit()
    for message in messages:
        for file_meta in message.new_files:
            file_meta.first_row_id = 0
    committer.commit(messages)
    writer.close()
    committer.close()

    # The read is expected to return the values from the second commit for ``v``.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    expected = pa.Table.from_pydict(
        {'k': [10, 20], 'v': ['new1', 'new2']}, schema=kv_schema)
    self.assertEqual(actual, expected)
def test_parquet_append_new_rows(self):
    """Commit two full rows, then two more rows as column subsets with first_row_id 2."""
    abc_schema = pa.schema([
        ('a', pa.int32()),
        ('b', pa.string()),
        ('c', pa.float32()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'parquet',
    }
    table_schema = Schema.from_pyarrow_schema(abc_schema, options=table_options)
    self.catalog.create_table('default.fmt_parquet_append', table_schema, False)
    table = self.catalog.get_table('default.fmt_parquet_append')
    builder = table.new_batch_write_builder()

    # First commit: two rows with all three columns.
    writer = builder.new_write()
    committer = builder.new_commit()
    first_rows = pa.Table.from_pydict(
        {'a': [1, 2], 'b': ['x', 'y'], 'c': [0.1, 0.2]}, schema=abc_schema)
    writer.write_arrow(first_rows)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    # Second commit: two further rows, split into (a, b) and (c) writers.
    ab_writer = builder.new_write().with_write_type(['a', 'b'])
    c_writer = builder.new_write().with_write_type(['c'])
    committer = builder.new_commit()
    ab_schema = pa.schema([('a', pa.int32()), ('b', pa.string())])
    ab_writer.write_arrow(pa.Table.from_pydict(
        {'a': [3, 4], 'b': ['z', 'w']}, schema=ab_schema))
    c_schema = pa.schema([('c', pa.float32())])
    c_writer.write_arrow(pa.Table.from_pydict({'c': [0.3, 0.4]}, schema=c_schema))
    messages = ab_writer.prepare_commit() + c_writer.prepare_commit()
    # The first commit wrote two rows, so the new files start at row id 2.
    for message in messages:
        for file_meta in message.new_files:
            file_meta.first_row_id = 2
    committer.commit(messages)
    ab_writer.close()
    c_writer.close()
    committer.close()

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 4)
    expected = pa.Table.from_pydict(
        {
            'a': [1, 2, 3, 4],
            'b': ['x', 'y', 'z', 'w'],
            'c': [0.1, 0.2, 0.3, 0.4],
        },
        schema=abc_schema)
    self.assertEqual(actual, expected)
# ------------------------------------------------------------------
# Blob-format data evolution
# ------------------------------------------------------------------
def test_blob_write_and_read(self):
    """Round-trip an int32 ``id`` column together with a large_binary ``payload`` column."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('payload', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_basic',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_basic')
    builder = table.new_batch_write_builder()

    payloads = [b'hello world', b'\x00\x01\x02\xff', b'paimon blob']
    writer = builder.new_write()
    committer = builder.new_commit()
    rows = pa.Table.from_pydict({'id': [1, 2, 3], 'payload': payloads}, schema=table_schema)
    writer.write_arrow(rows)
    messages = writer.prepare_commit()
    committer.commit(messages)
    writer.close()
    committer.close()

    # The commit must contain at least one .parquet file and one .blob file.
    file_names = [meta.file_name for message in messages for meta in message.new_files]
    parquet_names = [name for name in file_names if name.endswith('.parquet')]
    blob_names = [name for name in file_names if name.endswith('.blob')]
    self.assertGreater(len(parquet_names), 0)
    self.assertGreater(len(blob_names), 0)

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    self.assertEqual(actual.column('payload').to_pylist(), payloads)
def test_blob_abort_deletes_uncommitted_files(self):
    """Files reported by prepare_commit exist before ``abort()`` and are gone afterwards."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('payload', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_abort_cleanup',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_abort_cleanup')

    writer = table.new_batch_write_builder().new_write()
    rows = pa.Table.from_pydict(
        {
            'id': [1, 2, 3],
            'payload': [b'a', b'b', b'c'],
        },
        schema=table_schema)
    writer.write_arrow(rows)

    # Nothing is committed: the messages are only used to learn which files were written.
    pending_files = [meta for message in writer.prepare_commit() for meta in message.new_files]
    parquet_files = [meta for meta in pending_files if meta.file_name.endswith('.parquet')]
    blob_files = [meta for meta in pending_files if meta.file_name.endswith('.blob')]
    self.assertGreater(len(parquet_files), 0)
    self.assertGreater(len(blob_files), 0)

    file_io = table.file_io
    for meta in pending_files:
        self.assertTrue(file_io.exists(self._file_path(meta)))

    writer.abort()

    for meta in pending_files:
        still_there = table.file_io.exists(self._file_path(meta))
        self.assertFalse(still_there, f"Expected abort to delete {meta.file_name}")
def test_blob_column_subset_evolution(self):
    """Commit ``id`` + ``doc`` first, then ``name`` with first_row_id 0, and read all three columns."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('name', pa.string()),
        ('doc', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_evolution',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_evolution')
    builder = table.new_batch_write_builder()

    # First commit: the int column together with the large_binary column.
    writer = builder.new_write().with_write_type(['id', 'doc'])
    committer = builder.new_commit()
    id_doc_schema = pa.schema([('id', pa.int32()), ('doc', pa.large_binary())])
    writer.write_arrow(pa.Table.from_pydict(
        {'id': [1, 2], 'doc': [b'doc_alice', b'doc_bob']}, schema=id_doc_schema))
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    # Second commit: ``name`` only, with first_row_id forced to 0 on the new files.
    writer = builder.new_write().with_write_type(['name'])
    committer = builder.new_commit()
    name_schema = pa.schema([('name', pa.string())])
    writer.write_arrow(pa.Table.from_pydict({'name': ['Alice', 'Bob']}, schema=name_schema))
    messages = writer.prepare_commit()
    for message in messages:
        for file_meta in message.new_files:
            file_meta.first_row_id = 0
    committer.commit(messages)
    writer.close()
    committer.close()

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 2)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2])
    self.assertEqual(actual.column('name').to_pylist(), ['Alice', 'Bob'])
    self.assertEqual(actual.column('doc').to_pylist(), [b'doc_alice', b'doc_bob'])
def test_blob_append_with_subset_evolution(self):
    """Commit ``id`` + ``picture`` first, then add ``tag`` with first_row_id 0, and read all columns."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('tag', pa.string()),
        ('picture', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_append_evo',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_append_evo')
    builder = table.new_batch_write_builder()

    # First commit: the int column together with the large_binary column.
    writer = builder.new_write().with_write_type(['id', 'picture'])
    committer = builder.new_commit()
    id_picture_schema = pa.schema([('id', pa.int32()), ('picture', pa.large_binary())])
    writer.write_arrow(pa.Table.from_pydict(
        {'id': [1, 2], 'picture': [b'pic1', b'pic2']}, schema=id_picture_schema))
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    # Second commit: ``tag`` only, with first_row_id forced to 0 on the new files.
    writer = builder.new_write().with_write_type(['tag'])
    committer = builder.new_commit()
    tag_schema = pa.schema([('tag', pa.string())])
    writer.write_arrow(pa.Table.from_pydict({'tag': ['t1', 't2']}, schema=tag_schema))
    messages = writer.prepare_commit()
    for message in messages:
        for file_meta in message.new_files:
            file_meta.first_row_id = 0
    committer.commit(messages)
    writer.close()
    committer.close()

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 2)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2])
    self.assertEqual(actual.column('tag').to_pylist(), ['t1', 't2'])
    self.assertEqual(actual.column('picture').to_pylist(), [b'pic1', b'pic2'])
def test_blob_multiple_blob_columns(self):
    """Write and read back a table that has two large_binary columns."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('audio', pa.large_binary()),
        ('video', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_multi',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_multi')
    builder = table.new_batch_write_builder()

    writer = builder.new_write()
    committer = builder.new_commit()
    rows = pa.Table.from_pydict(
        {
            'id': [1, 2],
            'audio': [b'audio_1', b'audio_2'],
            'video': [b'video_1', b'video_2'],
        },
        schema=table_schema)
    writer.write_arrow(rows)
    messages = writer.prepare_commit()
    committer.commit(messages)
    writer.close()
    committer.close()

    # Expect at least two .blob files among the committed files.
    file_names = [meta.file_name for message in messages for meta in message.new_files]
    blob_names = [name for name in file_names if name.endswith('.blob')]
    self.assertGreaterEqual(len(blob_names), 2)

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 2)
    self.assertEqual(actual.column('audio').to_pylist(), [b'audio_1', b'audio_2'])
    self.assertEqual(actual.column('video').to_pylist(), [b'video_1', b'video_2'])
# ------------------------------------------------------------------
# Vortex-format data evolution
# ------------------------------------------------------------------
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vortex_column_subset_write_and_merge_read(self):
    """Write disjoint column subsets as vortex, merge-read via data evolution.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('tag', pa.string()),
        ('val', pa.float64()),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vortex_subset', schema, False)
    table = self.catalog.get_table('default.fmt_vortex_subset')
    wb = table.new_batch_write_builder()
    # one commit fed by two writers: (id, tag) and (val)
    w0 = wb.new_write().with_write_type(['id', 'tag'])
    w1 = wb.new_write().with_write_type(['val'])
    c = wb.new_commit()
    w0.write_arrow(pa.Table.from_pydict(
        {'id': [10, 20, 30], 'tag': ['p', 'q', 'r']},
        schema=pa.schema([('id', pa.int32()), ('tag', pa.string())])))
    w1.write_arrow(pa.Table.from_pydict(
        {'val': [1.5, 2.5, 3.5]},
        schema=pa.schema([('val', pa.float64())])))
    cmts = w0.prepare_commit() + w1.prepare_commit()
    # give every new file the same first_row_id before committing
    for m in cmts:
        for nf in m.new_files:
            nf.first_row_id = 0
    c.commit(cmts)
    w0.close()
    w1.close()
    c.close()
    # verify vortex files
    all_files = [nf for m in cmts for nf in m.new_files]
    for f in all_files:
        self.assertTrue(f.file_name.endswith('.vortex'),
                        f"Expected vortex file, got {f.file_name}")
    # read back and compare against the full-width expectation
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    expect = pa.Table.from_pydict(
        {'id': [10, 20, 30], 'tag': ['p', 'q', 'r'], 'val': [1.5, 2.5, 3.5]},
        schema=pa_schema)
    self.assertEqual(actual, expect)
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vortex_overwrite_column(self):
    """Full row write then overwrite one column, all in vortex format.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('k', pa.int64()),
        ('v', pa.string()),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vortex_overwrite', schema, False)
    table = self.catalog.get_table('default.fmt_vortex_overwrite')
    wb = table.new_batch_write_builder()
    # commit 1: full rows
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'k': [100, 200], 'v': ['old', 'old']}, schema=pa_schema))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()
    # commit 2: v only, with first_row_id forced to 0 on the new files
    tw = wb.new_write().with_write_type(['v'])
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'v': ['new', 'new']}, schema=pa.schema([('v', pa.string())])))
    cmts = tw.prepare_commit()
    for m in cmts:
        for nf in m.new_files:
            nf.first_row_id = 0
    tc.commit(cmts)
    tw.close()
    tc.close()
    # the read is expected to return the second commit's values for v
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual, pa.Table.from_pydict(
        {'k': [100, 200], 'v': ['new', 'new']}, schema=pa_schema))
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vortex_append_new_rows(self):
    """Append new rows with column subsets in vortex format.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('x', pa.int32()),
        ('y', pa.string()),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vortex_append', schema, False)
    table = self.catalog.get_table('default.fmt_vortex_append')
    wb = table.new_batch_write_builder()
    # commit 1: two full rows
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'x': [1, 2], 'y': ['a', 'b']}, schema=pa_schema))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()
    # commit 2: append with subsets, first_row_id=2 (commit 1 wrote two rows)
    w_x = wb.new_write().with_write_type(['x'])
    w_y = wb.new_write().with_write_type(['y'])
    tc = wb.new_commit()
    w_x.write_arrow(pa.Table.from_pydict(
        {'x': [3]}, schema=pa.schema([('x', pa.int32())])))
    w_y.write_arrow(pa.Table.from_pydict(
        {'y': ['c']}, schema=pa.schema([('y', pa.string())])))
    cmts = w_x.prepare_commit() + w_y.prepare_commit()
    for m in cmts:
        for nf in m.new_files:
            nf.first_row_id = 2
    tc.commit(cmts)
    w_x.close()
    w_y.close()
    tc.close()
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    expect = pa.Table.from_pydict(
        {'x': [1, 2, 3], 'y': ['a', 'b', 'c']}, schema=pa_schema)
    self.assertEqual(actual, expect)
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vortex_with_row_id_and_filter(self):
    """Write ten vortex rows, read them all, then read with a ``_ROW_ID == 5`` filter.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('val', pa.string()),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vortex_rowid_filter', schema, False)
    table = self.catalog.get_table('default.fmt_vortex_rowid_filter')
    wb = table.new_batch_write_builder()
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'id': list(range(10)), 'val': [f'v{i}' for i in range(10)]},
        schema=pa_schema))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()
    # full read
    rb = table.new_read_builder()
    full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(full.num_rows, 10)
    # filter by _ROW_ID
    # The projected builder is used only to obtain a predicate builder
    # (presumably so that '_ROW_ID' is a known field -- verify); the filtered
    # read itself uses a fresh builder without that projection.
    rb_rid = table.new_read_builder().with_projection(['id', 'val', '_ROW_ID'])
    pb = rb_rid.new_predicate_builder()
    rb_f = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 5))
    actual = rb_f.new_read().to_arrow(rb_f.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 1)
    self.assertEqual(actual.column('id')[0].as_py(), 5)
    self.assertEqual(actual.column('val')[0].as_py(), 'v5')
# ------------------------------------------------------------------
# Dedicated vector file format (parquet or vortex) for embedding columns
# ------------------------------------------------------------------
def test_vector_abort_deletes_uncommitted_files(self):
    """Normal and vector files from prepare_commit exist before ``abort()`` and are gone after."""
    table_schema = pa.schema([
        ('id', pa.int64()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'vector.file.format': 'parquet',
    }
    self.catalog.create_table(
        'default.fmt_vector_abort_cleanup',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_vector_abort_cleanup')

    writer = table.new_batch_write_builder().new_write()
    ids = pa.array([1, 2, 3], type=pa.int64())
    # Nine floats grouped into three fixed-size lists of length 3.
    flat_values = pa.array([0.1, 0.2, 0.3,
                            0.4, 0.5, 0.6,
                            0.7, 0.8, 0.9], type=pa.float32())
    embeddings = pa.FixedSizeListArray.from_arrays(flat_values, 3)
    writer.write_arrow(pa.table({'id': ids, 'embed': embeddings}))

    # Nothing is committed: the messages are only used to learn which files were written.
    pending_files = [meta for message in writer.prepare_commit() for meta in message.new_files]
    normal_files = [meta for meta in pending_files
                    if not DataFileMeta.is_vector_file(meta.file_name)]
    vector_files = [meta for meta in pending_files
                    if DataFileMeta.is_vector_file(meta.file_name)]
    self.assertGreater(len(normal_files), 0)
    self.assertGreater(len(vector_files), 0)

    for meta in pending_files:
        self.assertTrue(table.file_io.exists(self._file_path(meta)))

    writer.abort()

    for meta in pending_files:
        still_there = table.file_io.exists(self._file_path(meta))
        self.assertFalse(still_there, f"Expected abort to delete {meta.file_name}")
def test_vector_close_failure_after_prepare_raises(self):
    """A failing ``_close_current_writers`` makes ``close()`` raise and the prepared files vanish."""
    from unittest.mock import patch

    table_schema = pa.schema([
        ('id', pa.int64()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'vector.file.format': 'parquet',
    }
    self.catalog.create_table(
        'default.fmt_vector_close_failure',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_vector_close_failure')

    writer = table.new_batch_write_builder().new_write()
    ids = pa.array([1, 2, 3], type=pa.int64())
    flat_values = pa.array([0.1, 0.2, 0.3,
                            0.4, 0.5, 0.6,
                            0.7, 0.8, 0.9], type=pa.float32())
    embeddings = pa.FixedSizeListArray.from_arrays(flat_values, 3)
    writer.write_arrow(pa.table({'id': ids, 'embed': embeddings}))

    pending_files = [meta for message in writer.prepare_commit() for meta in message.new_files]
    for meta in pending_files:
        self.assertTrue(table.file_io.exists(self._file_path(meta)))

    # Patch the first data writer so that closing it raises.
    first_data_writer = next(iter(writer.file_store_write.data_writers.values()))
    failing_close = patch.object(
        first_data_writer, '_close_current_writers',
        side_effect=RuntimeError("Close error"))
    with failing_close, self.assertRaisesRegex(RuntimeError, "Close error"):
        writer.close()

    # After the failed close none of the prepared files may remain.
    for meta in pending_files:
        still_there = table.file_io.exists(self._file_path(meta))
        self.assertFalse(still_there, f"Expected abort to delete {meta.file_name}")
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vector_vortex_write_and_read(self):
    """Write table with normal + vector columns using vortex vector format.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('id', pa.int64()),
        ('embed', pa.list_(pa.float32(), 4)),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
        'vector.file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vec_vortex', schema, False)
    table = self.catalog.get_table('default.fmt_vec_vortex')
    # twelve floats grouped into three fixed-size lists of length 4
    embeddings = [1.0, 0.0, 0.0, 0.0,
                  0.0, 1.0, 0.0, 0.0,
                  0.0, 0.0, 1.0, 0.0]
    test_data = pa.table({
        'id': pa.array([1, 2, 3], type=pa.int64()),
        'embed': pa.FixedSizeListArray.from_arrays(
            pa.array(embeddings, type=pa.float32()), 4),
    })
    wb = table.new_batch_write_builder()
    tw = wb.new_write()
    tw.write_arrow(test_data)
    cmts = tw.prepare_commit()
    # should produce both normal and vector files
    all_files = [nf for m in cmts for nf in m.new_files]
    normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)]
    vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)]
    self.assertGreater(len(normal_files), 0)
    self.assertGreater(len(vector_files), 0)
    for vf in vector_files:
        self.assertIn('.vector.vortex', vf.file_name)
    # Keep a handle on the commit so it can be closed, as the other tests do.
    tc = wb.new_commit()
    tc.commit(cmts)
    tw.close()
    tc.close()
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    embed_col = actual.column('embed')
    self.assertEqual(embed_col[0].as_py(), [1.0, 0.0, 0.0, 0.0])
    self.assertEqual(embed_col[1].as_py(), [0.0, 1.0, 0.0, 0.0])
    self.assertEqual(embed_col[2].as_py(), [0.0, 0.0, 1.0, 0.0])
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vector_vortex_multiple_appends(self):
    """Append multiple batches of normal+vector data and read all back.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('id', pa.int64()),
        ('label', pa.string()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
        'vector.file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vec_vortex_append', schema, False)
    table = self.catalog.get_table('default.fmt_vec_vortex_append')
    wb = table.new_batch_write_builder()
    # commit 1: two rows
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.table({
        'id': pa.array([1, 2], type=pa.int64()),
        'label': pa.array(['cat', 'dog']),
        'embed': pa.FixedSizeListArray.from_arrays(
            pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()), 3),
    }))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()
    # commit 2: append one more row
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.table({
        'id': pa.array([3], type=pa.int64()),
        'label': pa.array(['bird']),
        'embed': pa.FixedSizeListArray.from_arrays(
            pa.array([0.7, 0.8, 0.9], type=pa.float32()), 3),
    }))
    tc.commit(tw.prepare_commit())
    tw.close()
    tc.close()
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    self.assertEqual(actual.column('label').to_pylist(), ['cat', 'dog', 'bird'])
    embed_col = actual.column('embed')
    # values were stored as float32, so compare approximately
    self.assertAlmostEqual(embed_col[0].as_py()[0], 0.1, places=5)
    self.assertAlmostEqual(embed_col[2].as_py()[2], 0.9, places=5)
# ------------------------------------------------------------------
# Mixed formats: parquet + blob + vector in one table
# ------------------------------------------------------------------
def test_parquet_and_blob_mixed_append(self):
    """Commit two batches of full rows (int, string, large_binary) and read all four rows."""
    table_schema = pa.schema([
        ('id', pa.int32()),
        ('name', pa.string()),
        ('image', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_mixed_parquet_blob',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_mixed_parquet_blob')
    builder = table.new_batch_write_builder()

    # First batch.
    writer = builder.new_write()
    committer = builder.new_commit()
    first_batch = pa.Table.from_pydict(
        {
            'id': [1, 2],
            'name': ['a', 'b'],
            'image': [b'img1', b'img2'],
        },
        schema=table_schema)
    writer.write_arrow(first_batch)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    # Second batch, committed separately with a fresh writer and commit.
    writer = builder.new_write()
    committer = builder.new_commit()
    second_batch = pa.Table.from_pydict(
        {
            'id': [3, 4],
            'name': ['c', 'd'],
            'image': [b'img3', b'img4'],
        },
        schema=table_schema)
    writer.write_arrow(second_batch)
    committer.commit(writer.prepare_commit())
    writer.close()
    committer.close()

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 4)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3, 4])
    self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c', 'd'])
    expected_images = [b'img1', b'img2', b'img3', b'img4']
    self.assertEqual(actual.column('image').to_pylist(), expected_images)
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
    # Import the submodule explicitly: importing the bare ``importlib`` package
    # does not guarantee that ``importlib.util`` is available as an attribute.
    __import__('importlib.util').util.find_spec('vortex') is not None,
    "vortex not installed")
def test_vortex_and_vector_vortex_mixed(self):
    """Table with normal (vortex) + vector (vortex) columns, write and read.

    Verifies that the writer produces separate .vortex and .vector.vortex files,
    and the data evolution merge reader stitches them back together.

    Skipped on Python < 3.11 or when the ``vortex`` module cannot be found.
    """
    pa_schema = pa.schema([
        ('id', pa.int64()),
        ('name', pa.string()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'vortex',
        'vector.file.format': 'vortex',
    })
    self.catalog.create_table('default.fmt_vortex_vector', schema, False)
    table = self.catalog.get_table('default.fmt_vortex_vector')
    wb = table.new_batch_write_builder()
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.table({
        'id': pa.array([1, 2, 3], type=pa.int64()),
        'name': pa.array(['cat', 'dog', 'bird']),
        'embed': pa.FixedSizeListArray.from_arrays(
            pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                     type=pa.float32()), 3),
    }))
    cmts = tw.prepare_commit()
    tc.commit(cmts)
    tw.close()
    tc.close()
    # verify two file types: .vortex + .vector.vortex
    all_files = [nf for m in cmts for nf in m.new_files]
    normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)]
    vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)]
    self.assertGreater(len(normal_files), 0, "should produce normal vortex files")
    self.assertGreater(len(vector_files), 0, "should produce vector files")
    for nf in normal_files:
        self.assertTrue(nf.file_name.endswith('.vortex'))
    for vf in vector_files:
        self.assertIn('.vector.vortex', vf.file_name)
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    self.assertEqual(actual.column('name').to_pylist(), ['cat', 'dog', 'bird'])
    embed = actual.column('embed')
    # values were stored as float32, so compare approximately
    self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5)
    self.assertAlmostEqual(embed[2].as_py()[2], 0.9, places=5)
def test_blob_and_vector_inline_mixed(self):
    """Write and read a table with int, large_binary and fixed-size float list columns.

    No ``vector.file.format`` option is set here. The test asserts that the
    commit contains both .parquet and .blob files and that all three columns
    read back with the written values.
    """
    table_schema = pa.schema([
        ('id', pa.int64()),
        ('doc', pa.large_binary()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    self.catalog.create_table(
        'default.fmt_blob_vector_inline',
        Schema.from_pyarrow_schema(table_schema, options=table_options),
        False)
    table = self.catalog.get_table('default.fmt_blob_vector_inline')
    builder = table.new_batch_write_builder()

    writer = builder.new_write()
    committer = builder.new_commit()
    ids = pa.array([1, 2], type=pa.int64())
    docs = pa.array([b'doc1', b'doc2'], type=pa.large_binary())
    flat_values = pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32())
    embeddings = pa.FixedSizeListArray.from_arrays(flat_values, 3)
    writer.write_arrow(pa.table({'id': ids, 'doc': docs, 'embed': embeddings}))
    messages = writer.prepare_commit()
    committer.commit(messages)
    writer.close()
    committer.close()

    # Both .parquet and .blob files must be present in the commit.
    file_names = [meta.file_name for message in messages for meta in message.new_files]
    parquet_names = [name for name in file_names if name.endswith('.parquet')]
    blob_names = [name for name in file_names if name.endswith('.blob')]
    self.assertGreater(len(parquet_names), 0, "should produce parquet files")
    self.assertGreater(len(blob_names), 0, "should produce blob files")

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    actual = read_builder.new_read().to_arrow(splits)
    self.assertEqual(actual.num_rows, 2)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2])
    self.assertEqual(actual.column('doc').to_pylist(), [b'doc1', b'doc2'])
    embed_column = actual.column('embed')
    # Values were stored as float32, so compare approximately.
    self.assertAlmostEqual(embed_column[0].as_py()[0], 0.1, places=5)
    self.assertAlmostEqual(embed_column[1].as_py()[2], 0.6, places=5)
def test_blob_and_vector_with_vector_file_format(self):
    """Table with blob + vector columns and explicit vector.file.format.

    DedicatedFormatWriter splits data three ways: normal columns to .parquet,
    blob columns to .blob, and vector columns to .vector.<format> files.

    Two commits are written. After each one the table is read back and all
    three columns (including the vector column) are compared with the rows
    written so far.
    """
    pa_schema = pa.schema([
        ('id', pa.int64()),
        ('doc', pa.large_binary()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'vector.file.format': 'parquet',
    })
    self.catalog.create_table('default.fmt_blob_vec_format', schema, False)
    table = self.catalog.get_table('default.fmt_blob_vec_format')
    wb = table.new_batch_write_builder()

    # commit 1: write all columns
    tw = wb.new_write()
    tc = wb.new_commit()
    try:
        tw.write_arrow(pa.table({
            'id': pa.array([1, 2, 3], type=pa.int64()),
            'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
            'embed': pa.FixedSizeListArray.from_arrays(
                pa.array([1.0, 0.0, 0.0,
                          0.0, 1.0, 0.0,
                          0.0, 0.0, 1.0], type=pa.float32()), 3),
        }))
        cmts = tw.prepare_commit()
        tc.commit(cmts)
    finally:
        # Release the writer and committer even if the write or commit raised.
        tw.close()
        tc.close()

    # DedicatedFormatWriter produces parquet + blob + vector files
    all_files = [nf for m in cmts for nf in m.new_files]
    # Vector files also end in '.parquet' here, so exclude them explicitly
    # when counting the normal-column files.
    parquet_files = [f for f in all_files
                     if f.file_name.endswith('.parquet')
                     and not DataFileMeta.is_vector_file(f.file_name)]
    blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
    vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)]
    self.assertGreater(len(parquet_files), 0, "should produce normal parquet files")
    self.assertGreater(len(blob_files), 0, "should produce blob files")
    self.assertGreater(len(vector_files), 0, "should produce vector files")
    for vf in vector_files:
        self.assertIn('.vector.parquet', vf.file_name)

    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', b'ccc'])
    self.assertEqual(actual.column('embed')[0].as_py(), [1.0, 0.0, 0.0])
    self.assertEqual(actual.column('embed')[1].as_py(), [0.0, 1.0, 0.0])
    self.assertEqual(actual.column('embed')[2].as_py(), [0.0, 0.0, 1.0])

    # commit 2: append more rows
    tw = wb.new_write()
    tc = wb.new_commit()
    try:
        tw.write_arrow(pa.table({
            'id': pa.array([4, 5], type=pa.int64()),
            'doc': pa.array([b'ddd', b'eee'], type=pa.large_binary()),
            'embed': pa.FixedSizeListArray.from_arrays(
                pa.array([0.5, 0.5, 0.0,
                          0.0, 0.5, 0.5], type=pa.float32()), 3),
        }))
        tc.commit(tw.prepare_commit())
    finally:
        tw.close()
        tc.close()

    rb = table.new_read_builder()
    actual2 = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual2.num_rows, 5)
    self.assertEqual(actual2.column('id').to_pylist(), [1, 2, 3, 4, 5])
    self.assertEqual(actual2.column('doc').to_pylist(),
                     [b'aaa', b'bbb', b'ccc', b'ddd', b'eee'])
    # The vector column must survive the append as well; every value used
    # here is exactly representable in float32, so exact equality is safe.
    self.assertEqual(actual2.column('embed').to_pylist(), [
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
        [0.0, 0.0, 1.0],
        [0.5, 0.5, 0.0],
        [0.0, 0.5, 0.5],
    ])
def test_blob_vector_partial_write_vector_only(self):
    """Blob+vector table with with_write_type(['embed']) - vector-only partial write.

    When normal_column_names is empty, the writer must still flush vector
    metadata without crashing on an empty normal data path.

    Commit 1 writes all three columns; commit 2 writes only ``embed``. The
    second commit must consist solely of vector files, and the read-back
    must show the original ``id``/``doc`` values with the new ``embed``
    values.
    """
    pa_schema = pa.schema([
        ('id', pa.int64()),
        ('doc', pa.large_binary()),
        ('embed', pa.list_(pa.float32(), 3)),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'vector.file.format': 'parquet',
    })
    self.catalog.create_table('default.fmt_blob_vec_partial', schema, False)
    table = self.catalog.get_table('default.fmt_blob_vec_partial')
    wb = table.new_batch_write_builder()

    # commit 1: write all columns
    tw = wb.new_write()
    tc = wb.new_commit()
    try:
        tw.write_arrow(pa.table({
            'id': pa.array([1, 2, 3], type=pa.int64()),
            'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
            'embed': pa.FixedSizeListArray.from_arrays(
                pa.array([1.0, 0.0, 0.0,
                          0.0, 1.0, 0.0,
                          0.0, 0.0, 1.0], type=pa.float32()), 3),
        }))
        tc.commit(tw.prepare_commit())
    finally:
        # Release the writer and committer even if the write or commit raised.
        tw.close()
        tc.close()

    # commit 2: write only vector column - no normal columns
    tw = wb.new_write().with_write_type(['embed'])
    tc = wb.new_commit()
    try:
        tw.write_arrow(pa.table({
            'embed': pa.FixedSizeListArray.from_arrays(
                pa.array([0.5, 0.5, 0.0,
                          0.0, 0.5, 0.5,
                          0.5, 0.0, 0.5], type=pa.float32()), 3),
        }))
        cmts = tw.prepare_commit()

        # should produce only vector files, no normal or blob files
        all_files = [nf for m in cmts for nf in m.new_files]
        self.assertGreater(len(all_files), 0, "should produce vector files")
        for nf in all_files:
            self.assertTrue(DataFileMeta.is_vector_file(nf.file_name),
                            f"Expected vector file, got {nf.file_name}")
            # Start the new files at row id 0; the read-back below expects
            # them to replace 'embed' for the three rows of commit 1.
            nf.first_row_id = 0
        tc.commit(cmts)
    finally:
        # The assertions above run before close(); without the finally a
        # failing assertion would leave the writer and committer open.
        tw.close()
        tc.close()

    # read back and verify the vector column was updated
    rb = table.new_read_builder()
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 3)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
    self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', b'ccc'])
    embed = actual.column('embed')
    self.assertEqual(embed[0].as_py(), [0.5, 0.5, 0.0])
    self.assertEqual(embed[1].as_py(), [0.0, 0.5, 0.5])
    self.assertEqual(embed[2].as_py(), [0.5, 0.0, 0.5])
# ------------------------------------------------------------------
# Projection and _ROW_ID across formats
# ------------------------------------------------------------------
def test_blob_with_row_id_projection(self):
    """Project _ROW_ID together with the data columns of a blob table."""
    table_name = 'default.fmt_blob_rowid'
    arrow_schema = pa.schema([
        ('id', pa.int32()),
        ('data', pa.large_binary()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    paimon_schema = Schema.from_pyarrow_schema(arrow_schema, options=table_options)
    self.catalog.create_table(table_name, paimon_schema, False)
    table = self.catalog.get_table(table_name)

    # One commit holding two rows.
    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    committer = write_builder.new_commit()
    rows = pa.Table.from_pydict(
        {'id': [10, 20], 'data': [b'aa', b'bb']}, schema=arrow_schema)
    writer.write_arrow(rows)
    commit_messages = writer.prepare_commit()
    committer.commit(commit_messages)
    writer.close()
    committer.close()

    # Read everything back, asking for the row id next to the data columns.
    read_builder = table.new_read_builder()
    read_builder.with_projection(['id', 'data', '_ROW_ID'])
    reader = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()
    result = reader.to_arrow(splits)

    self.assertEqual(result.num_rows, 2)
    expected_columns = {
        '_ROW_ID': [0, 1],
        'id': [10, 20],
        'data': [b'aa', b'bb'],
    }
    for column_name, expected_values in expected_columns.items():
        self.assertEqual(result.column(column_name).to_pylist(), expected_values)
def test_parquet_large_data_evolution(self):
    """Column-subset write and merge read over a larger, 1000-row dataset."""
    table_name = 'default.fmt_parquet_large'
    full_schema = pa.schema([
        ('id', pa.int32()),
        ('col_a', pa.string()),
        ('col_b', pa.float64()),
    ])
    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'file.format': 'parquet',
    }
    paimon_schema = Schema.from_pyarrow_schema(full_schema, options=table_options)
    self.catalog.create_table(table_name, paimon_schema, False)
    table = self.catalog.get_table(table_name)
    write_builder = table.new_batch_write_builder()

    # Expected column contents, used both for writing and for verification.
    row_count = 1000
    ids = list(range(row_count))
    names = [f's{i}' for i in ids]
    values = list(map(float, ids))

    # Two writers, each responsible for a disjoint subset of the columns.
    left_writer = write_builder.new_write().with_write_type(['id', 'col_a'])
    right_writer = write_builder.new_write().with_write_type(['col_b'])
    committer = write_builder.new_commit()

    left_schema = pa.schema([('id', pa.int32()), ('col_a', pa.string())])
    left_writer.write_arrow(
        pa.Table.from_pydict({'id': ids, 'col_a': names}, schema=left_schema))
    right_schema = pa.schema([('col_b', pa.float64())])
    right_writer.write_arrow(
        pa.Table.from_pydict({'col_b': values}, schema=right_schema))

    left_messages = left_writer.prepare_commit()
    right_messages = right_writer.prepare_commit()
    commit_messages = left_messages + right_messages
    # Every new file gets first_row_id 0 before the single combined commit.
    for new_file in (nf for msg in commit_messages for nf in msg.new_files):
        new_file.first_row_id = 0
    committer.commit(commit_messages)
    left_writer.close()
    right_writer.close()
    committer.close()

    read_builder = table.new_read_builder()
    reader = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()
    result = reader.to_arrow(splits)

    self.assertEqual(result.num_rows, row_count)
    expected_columns = {'id': ids, 'col_a': names, 'col_b': values}
    for column_name, expected_values in expected_columns.items():
        self.assertEqual(result.column(column_name).to_pylist(), expected_values)
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()