blob: 99dc16d69f7df7af1be42ec8cccfb9e032345652 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import unittest
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.schema.schema_manager import SchemaManager
class SimpleTableTest(unittest.TestCase):
    """Tests for tag management (create/delete/rename), tag-based scans,
    schema-evolution reads through tags, and table-level rollback.

    All tests share one filesystem catalog created in ``setUpClass`` and
    removed in ``tearDownClass``; each test creates its own table.
    """

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({
            'warehouse': cls.warehouse
        })
        cls.catalog.create_database('default', True)
        # Schema used to declare the tables.
        cls.pa_schema = pa.schema([
            ('pt', pa.int32()),
            ('k', pa.int32()),
            ('v', pa.int64())
        ])
        # Same columns, but the primary-key fields are non-nullable;
        # used when building the arrow batches that get written.
        cls.pk_pa_schema = pa.schema([
            pa.field('pt', pa.int32(), nullable=False),
            pa.field('k', pa.int32(), nullable=False),
            ('v', pa.int64())
        ])

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _create_pk_table(self, name):
        """Create and return a primary-key table ``default.<name>``.

        Every tag/rollback test uses the same layout: primary key
        (pt, k), partitioned by pt, 3 buckets.
        """
        schema = Schema.from_pyarrow_schema(
            self.pa_schema,
            primary_keys=['pt', 'k'],
            partition_keys=['pt'],
            options={'bucket': '3'}
        )
        self.catalog.create_table('default.' + name, schema, False)
        return self.catalog.get_table('default.' + name)

    @staticmethod
    def _commit(write_builder, pydict, schema):
        """Write one batch of rows and commit it as a single snapshot."""
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        table_write.write_arrow(pa.Table.from_pydict(pydict, schema=schema))
        table_commit.commit(table_write.prepare_commit())
        table_write.close()
        table_commit.close()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_tag_scan(self):
        """
        Test reading from a specific tag.
        1. Write data in 3 commits
        2. Create a tag at snapshot 2
        3. Read from the tag and verify only data from snapshots 1 and 2 is returned
        """
        table = self._create_pk_table('test_tag_scan')
        write_builder = table.new_batch_write_builder()
        # First commit: pt=1, k=10, v=100 and pt=1, k=20, v=200
        self._commit(write_builder, {
            'pt': [1, 1],
            'k': [10, 20],
            'v': [100, 200]
        }, self.pk_pa_schema)
        # Second commit: pt=2, k=30, v=101 and pt=2, k=40, v=201
        self._commit(write_builder, {
            'pt': [2, 2],
            'k': [30, 40],
            'v': [101, 201]
        }, self.pk_pa_schema)
        # Third commit: pt=3, k=50, v=500 and pt=3, k=60, v=600
        self._commit(write_builder, {
            'pt': [3, 3],
            'k': [50, 60],
            'v': [500, 600]
        }, self.pk_pa_schema)
        # Create tag at snapshot 2
        table.create_tag("tag2", snapshot_id=2)
        # Read from tag2 using scan.tag-name option
        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
        read_builder = table_with_tag.new_read_builder()
        table_scan = read_builder.new_scan()
        table_read = read_builder.new_read()
        result = table_read.to_arrow(table_scan.plan().splits())
        # Verify: should only contain data from snapshot 1 and 2
        # (pt=1, k=10, v=100), (pt=1, k=20, v=200), (pt=2, k=30, v=101), (pt=2, k=40, v=201)
        result_sorted = result.sort_by([('pt', 'ascending'), ('k', 'ascending')])
        expected = pa.Table.from_pydict({
            'pt': [1, 1, 2, 2],
            'k': [10, 20, 30, 40],
            'v': [100, 200, 101, 201]
        }, schema=self.pk_pa_schema)
        self.assertEqual(result_sorted.num_rows, 4)
        self.assertEqual(result_sorted.column('pt').to_pylist(), expected.column('pt').to_pylist())
        self.assertEqual(result_sorted.column('k').to_pylist(), expected.column('k').to_pylist())
        self.assertEqual(result_sorted.column('v').to_pylist(), expected.column('v').to_pylist())

    def test_non_existing_tag(self):
        """
        Test that reading from a non-existing tag raises an error.
        """
        table = self._create_pk_table('test_non_existing_tag')
        # Try to read from a non-existing tag
        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "non-existing"})
        read_builder = table_with_tag.new_read_builder()
        table_scan = read_builder.new_scan()
        with self.assertRaises(ValueError) as context:
            table_scan.plan()
        self.assertIn("non-existing", str(context.exception))
        self.assertIn("doesn't exist", str(context.exception))

    def test_tag_create_and_delete(self):
        """Test creating and deleting tags."""
        table = self._create_pk_table('test_tag_create_delete')
        write_builder = table.new_batch_write_builder()
        # Write some data - snapshot 1
        self._commit(write_builder, {
            'pt': [1, 1],
            'k': [10, 20],
            'v': [100, 200]
        }, self.pk_pa_schema)
        # Create a tag (defaults to the latest snapshot, i.e. 1)
        table.create_tag("test_tag")
        # Verify tag exists
        tag_manager = table.tag_manager()
        self.assertTrue(tag_manager.tag_exists("test_tag"))
        # Get the tag
        tag = tag_manager.get("test_tag")
        self.assertIsNotNone(tag)
        self.assertEqual(tag.id, 1)
        # Second commit so that test_tag_2 points at snapshot 2
        self._commit(write_builder, {
            'pt': [1, 1],
            'k': [10, 20],
            'v': [100, 200]
        }, self.pk_pa_schema)
        table.create_tag("test_tag_2")
        all_tags = set(table.list_tags())
        self.assertEqual(all_tags, {"test_tag", "test_tag_2"})
        # Delete the tag
        result = table.delete_tag("test_tag")
        self.assertTrue(result)
        all_tags = table.list_tags()
        self.assertEqual(all_tags, ["test_tag_2"])
        # Verify tag no longer exists
        self.assertFalse(tag_manager.tag_exists("test_tag"))

    def test_tag_ignore_if_exists(self):
        """Test creating a tag with ignore_if_exists=True."""
        table = self._create_pk_table('test_tag_ignore_exists')
        write_builder = table.new_batch_write_builder()
        # Write some data
        self._commit(write_builder, {
            'pt': [1],
            'k': [10],
            'v': [100]
        }, self.pk_pa_schema)
        # Create a tag
        table.create_tag("duplicate_tag")
        # Try to create the same tag again without ignore_if_exists - should raise error
        with self.assertRaises(ValueError) as context:
            table.create_tag("duplicate_tag")
        self.assertIn("already exists", str(context.exception))
        # Create the same tag with ignore_if_exists=True - should not raise error
        table.create_tag("duplicate_tag", ignore_if_exists=True)

    def test_tag_rename(self):
        """Test renaming a tag."""
        table = self._create_pk_table('test_tag_rename')
        write_builder = table.new_batch_write_builder()
        # Write some data - snapshot 1
        self._commit(write_builder, {
            'pt': [1],
            'k': [10],
            'v': [100]
        }, self.pk_pa_schema)
        # Create a tag
        table.create_tag("old_tag")
        # Rename the tag
        table.rename_tag("old_tag", "new_tag")
        # Verify old tag no longer exists
        tags = table.list_tags()
        self.assertNotIn("old_tag", tags)
        self.assertIn("new_tag", tags)
        # Verify tag content is preserved
        tag_manager = table.tag_manager()
        tag = tag_manager.get("new_tag")
        self.assertIsNotNone(tag)
        self.assertEqual(tag.id, 1)

    def test_tag_rename_nonexistent(self):
        """Test renaming a nonexistent tag."""
        table = self._create_pk_table('test_tag_rename_nonexistent')
        write_builder = table.new_batch_write_builder()
        # Write some data
        self._commit(write_builder, {
            'pt': [1],
            'k': [10],
            'v': [100]
        }, self.pk_pa_schema)
        # Try to rename nonexistent tag
        with self.assertRaises(ValueError) as context:
            table.rename_tag("nonexistent", "new_tag")
        self.assertIn("doesn't exist", str(context.exception))

    def test_tag_rename_to_existing(self):
        """Test renaming a tag to a name that already exists."""
        table = self._create_pk_table('test_tag_rename_to_existing')
        write_builder = table.new_batch_write_builder()
        # Write some data - snapshot 1
        self._commit(write_builder, {
            'pt': [1],
            'k': [10],
            'v': [100]
        }, self.pk_pa_schema)
        # Create two tags
        table.create_tag("tag1")
        table.create_tag("tag2")
        # Try to rename tag1 to tag2 (which already exists)
        with self.assertRaises(ValueError) as context:
            table.rename_tag("tag1", "tag2")
        self.assertIn("already exists", str(context.exception))
        # Verify both original tags still exist
        tags = table.list_tags()
        self.assertIn("tag1", tags)
        self.assertIn("tag2", tags)

    def test_tag_rename_and_scan(self):
        """Test that renaming a tag and then scanning by new name works correctly."""
        table = self._create_pk_table('test_tag_rename_scan')
        write_builder = table.new_batch_write_builder()
        # First commit
        self._commit(write_builder, {
            'pt': [1, 1],
            'k': [10, 20],
            'v': [100, 200]
        }, self.pk_pa_schema)
        # Second commit
        self._commit(write_builder, {
            'pt': [2, 2],
            'k': [30, 40],
            'v': [101, 201]
        }, self.pk_pa_schema)
        # Create tag at snapshot 1
        table.create_tag("old_tag", snapshot_id=1)
        # Rename the tag
        table.rename_tag("old_tag", "new_tag")
        # Read from the renamed tag
        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "new_tag"})
        read_builder = table_with_tag.new_read_builder()
        table_scan = read_builder.new_scan()
        table_read = read_builder.new_read()
        result = table_read.to_arrow(table_scan.plan().splits())
        # Verify: should only contain data from snapshot 1
        result_sorted = result.sort_by([('pt', 'ascending'), ('k', 'ascending')])
        expected = pa.Table.from_pydict({
            'pt': [1, 1],
            'k': [10, 20],
            'v': [100, 200]
        }, schema=self.pk_pa_schema)
        self.assertEqual(result_sorted.num_rows, 2)
        self.assertEqual(result_sorted.column('pt').to_pylist(), expected.column('pt').to_pylist())
        self.assertEqual(result_sorted.column('k').to_pylist(), expected.column('k').to_pylist())
        self.assertEqual(result_sorted.column('v').to_pylist(), expected.column('v').to_pylist())

    def test_schema_evolution_tag_read(self):
        """Test reading a tag whose snapshot was written under an older schema."""
        # schema 0: no 'behavior' column
        pa_schema = pa.schema([
            ('user_id', pa.int64()),
            ('item_id', pa.int64()),
            ('dt', pa.string())
        ])
        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
        self.catalog.create_table('default.test_0', schema, False)
        table1 = self.catalog.get_table('default.test_0')
        write_builder = table1.new_batch_write_builder()
        # write 1
        self._commit(write_builder, {
            'user_id': [1, 2, 3, 4],
            'item_id': [1001, 1002, 1003, 1004],
            'dt': ['p1', 'p1', 'p2', 'p1'],
        }, pa_schema)
        # write 2
        self._commit(write_builder, {
            'user_id': [11, 22, 33, 44],
            'item_id': [1001, 1002, 1003, 1004],
            'dt': ['p1', 'p1', 'p2', 'p1'],
        }, pa_schema)
        # schema 1: adds the 'behavior' column
        pa_schema = pa.schema([
            ('user_id', pa.int64()),
            ('item_id', pa.int64()),
            ('dt', pa.string()),
            ('behavior', pa.string())
        ])
        schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
        self.catalog.create_table('default.test_1', schema2, False)
        table2 = self.catalog.get_table('default.test_1')
        write_builder = table2.new_batch_write_builder()
        # write 1
        self._commit(write_builder, {
            'user_id': [5, 6, 7, 8],
            'item_id': [1005, 1006, 1007, 1008],
            'dt': ['p2', 'p1', 'p2', 'p2'],
            'behavior': ['e', 'f', 'g', 'h'],
        }, pa_schema)
        # write 2
        self._commit(write_builder, {
            'user_id': [55, 66, 77, 88],
            'item_id': [1005, 1006, 1007, 1008],
            'dt': ['p2', 'p1', 'p2', 'p2'],
            'behavior': ['e', 'f', 'g', 'h'],
        }, pa_schema)
        # Create tag at snapshot 2 on both tables
        table1.create_tag("tag2", snapshot_id=2)
        table2.create_tag("tag2", snapshot_id=2)
        # When table2 reads tag2, point its schema manager at table1's path so
        # the read resolves against the older (pre-'behavior') schema.
        table2.schema_manager = SchemaManager(table2.file_io, table1.table_path)
        table_with_tag = table2.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
        read_builder = table_with_tag.new_read_builder()
        table_scan = read_builder.new_scan()
        table_read = read_builder.new_read()
        result = table_read.to_arrow(table_scan.plan().splits())
        self.assertEqual(result.num_rows, 8)
        # Result is projected to the old schema: 'behavior' is absent.
        expected_schema = pa.schema([
            ('user_id', pa.int64()),
            ('item_id', pa.int64()),
            ('dt', pa.string())
        ])
        expected = pa.Table.from_pydict({
            'user_id': [5, 6, 7, 8, 55, 66, 77, 88],
            'item_id': [1005, 1006, 1007, 1008, 1005, 1006, 1007, 1008],
            'dt': ["p2", "p1", "p2", "p2", "p2", "p1", "p2", "p2"]
        }, schema=expected_schema)
        self.assertEqual(expected, result.sort_by('user_id'))

    def test_table_rollback_to_snapshot(self):
        """Test table-level rollback to a specific snapshot."""
        table = self._create_pk_table('test_rollback_snapshot')
        write_builder = table.new_batch_write_builder()
        # Write 5 commits
        for i in range(5):
            self._commit(write_builder, {
                'pt': [1],
                'k': [i],
                'v': [i * 100]
            }, self.pk_pa_schema)
        snapshot_mgr = table.snapshot_manager()
        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
        # Rollback to snapshot 3; snapshots 4 and 5 must be gone afterwards
        table.rollback_to(3)
        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))

    def test_table_rollback_to_tag(self):
        """Test table-level rollback to a specific tag."""
        table = self._create_pk_table('test_rollback_tag')
        write_builder = table.new_batch_write_builder()
        # Write 5 commits, tagging each snapshot as v1..v5
        for i in range(5):
            self._commit(write_builder, {
                'pt': [1],
                'k': [i],
                'v': [i * 100]
            }, self.pk_pa_schema)
            table.create_tag("v{}".format(i + 1))
        snapshot_mgr = table.snapshot_manager()
        tag_mgr = table.tag_manager()
        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
        # Rollback to tag v3
        table.rollback_to("v3")
        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
        # Tags with snapshot > 3 should be cleaned
        self.assertFalse(tag_mgr.tag_exists("v4"))
        self.assertFalse(tag_mgr.tag_exists("v5"))
        # Tag v3 should still exist
        self.assertTrue(tag_mgr.tag_exists("v3"))

    def test_table_rollback_to_nonexistent_snapshot(self):
        """Test that rollback to a non-existent snapshot raises ValueError."""
        table = self._create_pk_table('test_rollback_no_snap')
        write_builder = table.new_batch_write_builder()
        # Write 2 commits
        for i in range(2):
            self._commit(write_builder, {
                'pt': [1],
                'k': [i],
                'v': [i * 100]
            }, self.pk_pa_schema)
        with self.assertRaises(ValueError) as context:
            table.rollback_to(99)
        self.assertIn("Rollback snapshot", str(context.exception))
        self.assertIn("99", str(context.exception))
        self.assertIn("doesn't exist", str(context.exception))

    def test_table_rollback_to_nonexistent_tag(self):
        """Test that rollback to a non-existent tag raises ValueError."""
        table = self._create_pk_table('test_rollback_no_tag')
        write_builder = table.new_batch_write_builder()
        # Write 1 commit
        self._commit(write_builder, {
            'pt': [1],
            'k': [10],
            'v': [100]
        }, self.pk_pa_schema)
        with self.assertRaises(ValueError) as context:
            table.rollback_to("no-such-tag")
        self.assertIn("no-such-tag", str(context.exception))
        self.assertIn("doesn't exist", str(context.exception))