################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Acceptance tests for IncrementalDiffScanner.
These tests verify that the diff approach (reading 2 base_manifest_lists)
returns the same data as the delta approach (reading N delta_manifest_lists).
Uses real file I/O with local temp filesystem.
"""
import os
import shutil
import tempfile
import unittest

import pyarrow as pa

from pypaimon import CatalogFactory, Schema
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.scanner.append_table_split_generator import \
    AppendTableSplitGenerator
from pypaimon.read.scanner.incremental_diff_scanner import \
    IncrementalDiffScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager


class IncrementalDiffAcceptanceTest(unittest.TestCase):
"""Acceptance tests for diff vs delta equivalence with real data."""

    @classmethod
def setUpClass(cls):
cls.tempdir = tempfile.mkdtemp()
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
cls.catalog.create_database('default', True)
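        # Shared schema for all tests; partition_col is only used as a
        # partition key by the partitioned-table test below.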
cls.pa_schema = pa.schema([
('id', pa.int32()),
('value', pa.string()),
('partition_col', pa.string())
])

    @classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_table_with_snapshots(self, name, num_snapshots=5, partition_keys=None):
        """Create a table and commit num_snapshots batches of data.

        Returns:
            Tuple of (table, all_data), where all_data is the list of
            per-snapshot data dicts that were written.
        """
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=partition_keys)
self.catalog.create_table(f'default.{name}', schema, False)
table = self.catalog.get_table(f'default.{name}')
all_data = []
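        # Each loop iteration commits one snapshot of 5 rows: ids
        # snap_id * 10 .. snap_id * 10 + 4, values tagged with the snapshot
        # id, and partition_col alternating between p1 and p2.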
for snap_id in range(1, num_snapshots + 1):
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data = {
'id': [snap_id * 10 + i for i in range(5)],
'value': [f'snap{snap_id}_row{i}' for i in range(5)],
'partition_col': ['p1' if i % 2 == 0 else 'p2' for i in range(5)]
}
all_data.append(data)
pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
return table, all_data
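
    # The two read paths under comparison:
    #   * _read_via_diff drives IncrementalDiffScanner, which plans the
    #     incremental read from the two base_manifest_lists of the start
    #     and end snapshots.
    #   * _read_via_delta replays the delta_manifest_list of every snapshot
    #     in (start, end] and rebuilds splits from the collected entries.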

    def _read_via_diff(self, table, start_snap_id, end_snap_id):
"""Read data using IncrementalDiffScanner between two snapshots."""
snapshot_manager = SnapshotManager(table)
start_snapshot = snapshot_manager.get_snapshot_by_id(start_snap_id)
end_snapshot = snapshot_manager.get_snapshot_by_id(end_snap_id)
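        # scan() plans the incremental read by diffing the base manifest
        # lists of the two snapshots, regardless of how many snapshots lie
        # between them.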
scanner = IncrementalDiffScanner(table)
plan = scanner.scan(start_snapshot, end_snapshot)
table_read = table.new_read_builder().new_read()
return table_read.to_arrow(plan.splits())

    def _read_via_delta(self, table, start_snap_id, end_snap_id):
"""Read data by iterating delta_manifest_lists between two snapshots."""
snapshot_manager = SnapshotManager(table)
manifest_list_manager = ManifestListManager(table)
manifest_file_manager = ManifestFileManager(table)
all_entries = []
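        # The incremental range is (start, end]: the start snapshot itself is
        # excluded, so iteration begins at start_snap_id + 1. Only APPEND
        # commits contribute new data files; other commit kinds are skipped.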
for snap_id in range(start_snap_id + 1, end_snap_id + 1):
snapshot = snapshot_manager.get_snapshot_by_id(snap_id)
if snapshot and snapshot.commit_kind == 'APPEND':
manifest_files = manifest_list_manager.read_delta(snapshot)
if manifest_files:
entries = manifest_file_manager.read_entries_parallel(manifest_files)
all_entries.extend(entries)
# Create splits from entries
options = table.options
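        # Turn the collected entries into splits the same way an append-only
        # table scan would, then feed them through the normal read path.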
split_generator = AppendTableSplitGenerator(
table,
options.source_split_target_size(),
options.source_split_open_file_cost(),
{}
)
splits = split_generator.create_splits(all_entries)
table_read = table.new_read_builder().new_read()
return table_read.to_arrow(splits)

    def _rows_to_set(self, arrow_table):
"""Convert arrow table to set of (id, value, partition_col) tuples."""
rows = set()
for i in range(arrow_table.num_rows):
row = (
arrow_table.column('id')[i].as_py(),
arrow_table.column('value')[i].as_py(),
arrow_table.column('partition_col')[i].as_py()
)
rows.add(row)
return rows

    def test_diff_returns_same_rows_as_delta_simple(self):
"""
Basic case: 5 snapshots, verify row-level equivalence.
Creates a table with 5 snapshots, then reads data from snapshot 1 to 5
using both diff and delta approaches, verifying they return the same rows.
"""
table, all_data = self._create_table_with_snapshots(
'test_diff_delta_simple',
num_snapshots=5
)
# Read using both approaches (from snapshot 1 to 5, so we get snapshots 2-5)
diff_result = self._read_via_diff(table, 1, 5)
delta_result = self._read_via_delta(table, 1, 5)
# Convert to sets for order-independent comparison
diff_rows = self._rows_to_set(diff_result)
delta_rows = self._rows_to_set(delta_result)
self.assertEqual(diff_rows, delta_rows)
# Verify we got the expected number of rows (snapshots 2-5, 5 rows each = 20)
self.assertEqual(len(diff_rows), 20)
# Verify specific IDs are present (from snapshots 2-5)
expected_ids = set()
for snap_id in range(2, 6): # snapshots 2, 3, 4, 5
for i in range(5):
expected_ids.add(snap_id * 10 + i)
actual_ids = {row[0] for row in diff_rows}
self.assertEqual(actual_ids, expected_ids)

    def test_diff_returns_same_rows_as_delta_many_snapshots(self):
"""
Stress test: 20 snapshots, verify row-level equivalence.
This tests the catch-up scenario where there are many snapshots
between start and end.
"""
table, all_data = self._create_table_with_snapshots(
'test_diff_delta_many',
num_snapshots=20
)
# Read using both approaches (from snapshot 1 to 20)
diff_result = self._read_via_diff(table, 1, 20)
delta_result = self._read_via_delta(table, 1, 20)
# Convert to sets for order-independent comparison
diff_rows = self._rows_to_set(diff_result)
delta_rows = self._rows_to_set(delta_result)
self.assertEqual(diff_rows, delta_rows)
# Verify we got the expected number of rows (snapshots 2-20, 5 rows each = 95)
self.assertEqual(len(diff_rows), 95)

    def test_diff_returns_same_rows_with_mixed_partitions(self):
"""
Partitioned table: Verify diff handles multiple partitions correctly.
Creates a partitioned table and verifies diff and delta return
the same rows across all partitions.
"""
table, all_data = self._create_table_with_snapshots(
'test_diff_delta_partitioned',
num_snapshots=5,
partition_keys=['partition_col']
)
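        # Every snapshot writes rows to both p1 and p2 (row indices
        # alternate), so the diff result should contain data from both
        # partitions.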
# Read using both approaches
diff_result = self._read_via_diff(table, 1, 5)
delta_result = self._read_via_delta(table, 1, 5)
# Convert to sets for order-independent comparison
diff_rows = self._rows_to_set(diff_result)
delta_rows = self._rows_to_set(delta_result)
self.assertEqual(diff_rows, delta_rows)
# Verify both partitions have data
p1_rows = {r for r in diff_rows if r[2] == 'p1'}
p2_rows = {r for r in diff_rows if r[2] == 'p2'}
self.assertGreater(len(p1_rows), 0, "Should have rows in partition p1")
self.assertGreater(len(p2_rows), 0, "Should have rows in partition p2")


if __name__ == '__main__':
unittest.main()