blob: d5feba2b7de680b0965ad326c942a085f8c8b96b [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Tests for IncrementalDiffScanner."""
import unittest
from unittest.mock import Mock, patch
def _create_mock_table():
table = Mock()
table.is_primary_key_table = False
table.options = Mock()
table.options.source_split_target_size.return_value = 128 * 1024 * 1024
table.options.source_split_open_file_cost.return_value = 4 * 1024 * 1024
table.options.scan_manifest_parallelism.return_value = 8
table.partition_keys = []
return table
def _create_mock_entry(partition_values, bucket, filename):
entry = Mock()
entry.partition = Mock()
entry.partition.values = partition_values
entry.bucket = bucket
entry.total_buckets = 1
entry.file = Mock()
entry.file.file_name = filename
entry.file.file_size = 1024
entry.file.row_count = 100
entry.kind = 0 # ADD
return entry
def _create_mock_snapshot(snapshot_id, commit_kind="APPEND",
base_manifest="base", delta_manifest="delta"):
snapshot = Mock()
snapshot.id = snapshot_id
snapshot.commit_kind = commit_kind
snapshot.base_manifest_list = f"{base_manifest}-{snapshot_id}"
snapshot.delta_manifest_list = f"{delta_manifest}-{snapshot_id}"
return snapshot
class IncrementalDiffScannerTest(unittest.TestCase):
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_returns_only_added_files(self, MockManifestFileManager, MockManifestListManager):
"""Files in end but not in start should be returned."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
start_entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
end_entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
_create_mock_entry([], 0, "file3.parquet"),
_create_mock_entry([], 0, "file4.parquet"),
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries]
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(1)
end_snapshot = _create_mock_snapshot(5)
added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
added_filenames = {e.file.file_name for e in added_entries}
self.assertEqual(added_filenames, {"file3.parquet", "file4.parquet"})
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_handles_empty_start(self, MockManifestFileManager, MockManifestListManager):
"""When start is empty, all end files should be returned."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
start_entries = []
end_entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries]
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(0)
end_snapshot = _create_mock_snapshot(5)
added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
added_filenames = {e.file.file_name for e in added_entries}
self.assertEqual(added_filenames, {"file1.parquet", "file2.parquet"})
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_handles_same_snapshots(self, MockManifestFileManager, MockManifestListManager):
"""When start == end (same files), diff should return empty."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [entries, entries]
scanner = IncrementalDiffScanner(table)
snapshot = _create_mock_snapshot(5)
added_entries = scanner.compute_diff(snapshot, snapshot)
self.assertEqual(len(added_entries), 0)
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_groups_by_partition_bucket(self, MockManifestFileManager, MockManifestListManager):
"""Diff should be computed per (partition, bucket)."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
start_entries = [
_create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0
_create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1
]
end_entries = [
_create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0
_create_mock_entry([1], 0, "file2.parquet"), # p1, bucket 0 - NEW
_create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries]
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(1)
end_snapshot = _create_mock_snapshot(5)
added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
self.assertEqual(len(added_entries), 1)
self.assertEqual(added_entries[0].file.file_name, "file2.parquet")
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_handles_file_deletions(self, MockManifestFileManager, MockManifestListManager):
"""Files in start but not in end (deleted) should NOT be returned."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
start_entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
_create_mock_entry([], 0, "file3.parquet"),
]
end_entries = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries]
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(1)
end_snapshot = _create_mock_snapshot(5)
added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
self.assertEqual(len(added_entries), 0)
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_creates_correct_splits(self, MockManifestFileManager, MockManifestListManager):
"""scan() should return a Plan with correct DataSplits."""
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
start_entries = []
end_entries = [
_create_mock_entry([], 0, "file1.parquet"),
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
mock_mlm.read_base.side_effect = lambda s: [Mock(file_name=f"manifest-{s.id}")]
mock_mfm.read_entries_parallel.side_effect = [start_entries, end_entries]
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(0)
end_snapshot = _create_mock_snapshot(5)
plan = scanner.scan(start_snapshot, end_snapshot)
self.assertIsInstance(plan, Plan)
splits = plan.splits()
self.assertGreater(len(splits), 0)
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_entry_key_uniqueness(self, MockManifestFileManager, MockManifestListManager):
"""Entry key should uniquely identify a file by (partition, bucket, filename)."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
scanner = IncrementalDiffScanner(table)
# Same file in different partitions should have different keys
entry1 = _create_mock_entry([1], 0, "file.parquet")
entry2 = _create_mock_entry([2], 0, "file.parquet")
self.assertNotEqual(scanner._entry_key(entry1), scanner._entry_key(entry2))
# Same file in different buckets should have different keys
entry3 = _create_mock_entry([1], 0, "file.parquet")
entry4 = _create_mock_entry([1], 1, "file.parquet")
self.assertNotEqual(scanner._entry_key(entry3), scanner._entry_key(entry4))
# Same partition/bucket/filename should have same key
entry5 = _create_mock_entry([1], 0, "file.parquet")
entry6 = _create_mock_entry([1], 0, "file.parquet")
self.assertEqual(scanner._entry_key(entry5), scanner._entry_key(entry6))
class IncrementalDiffIntegrationTest(unittest.TestCase):
"""Integration tests comparing diff vs delta approaches."""
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_equals_delta_results(self, MockManifestFileManager, MockManifestListManager):
"""Diff and delta approaches should return the same added files."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
base_entries_1 = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
base_entries_5 = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
_create_mock_entry([], 0, "file3.parquet"),
_create_mock_entry([], 0, "file4.parquet"),
_create_mock_entry([], 0, "file5.parquet"),
]
# Delta manifests for each snapshot
delta_2 = [_create_mock_entry([], 0, "file3.parquet")]
delta_3 = [] # COMPACT - no new files
delta_4 = [_create_mock_entry([], 0, "file4.parquet")]
delta_5 = [_create_mock_entry([], 0, "file5.parquet")]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
def mock_read_all(snapshot):
if snapshot.id == 1:
return [Mock(file_name="all-manifest-1")]
elif snapshot.id == 5:
return [Mock(file_name="all-manifest-5")]
return []
mock_mlm.read_all.side_effect = mock_read_all
def mock_read_entries(manifests, *args, **kwargs):
if manifests and manifests[0].file_name == "all-manifest-1":
return base_entries_1
elif manifests and manifests[0].file_name == "all-manifest-5":
return base_entries_5
return []
mock_mfm.read_entries_parallel.side_effect = mock_read_entries
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(1)
end_snapshot = _create_mock_snapshot(5)
diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
diff_filenames = {e.file.file_name for e in diff_entries}
delta_filenames = {
e.file.file_name for entries in [delta_2, delta_3, delta_4, delta_5]
for e in entries
}
self.assertEqual(diff_filenames, delta_filenames)
self.assertEqual(diff_filenames, {"file3.parquet", "file4.parquet", "file5.parquet"})
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
def test_diff_handles_compaction_correctly(self, MockManifestFileManager, MockManifestListManager):
"""Compaction merging file1+file2 into file3 should return only file3."""
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
table = _create_mock_table()
base_entries_1 = [
_create_mock_entry([], 0, "file1.parquet"),
_create_mock_entry([], 0, "file2.parquet"),
]
base_entries_5 = [
_create_mock_entry([], 0, "file3.parquet"), # Compacted result
]
mock_mlm = MockManifestListManager.return_value
mock_mfm = MockManifestFileManager.return_value
def mock_read_all(snapshot):
if snapshot.id == 1:
return [Mock(file_name="all-manifest-1")]
elif snapshot.id == 5:
return [Mock(file_name="all-manifest-5")]
return []
mock_mlm.read_all.side_effect = mock_read_all
def mock_read_entries(manifests, *args, **kwargs):
if manifests and manifests[0].file_name == "all-manifest-1":
return base_entries_1
elif manifests and manifests[0].file_name == "all-manifest-5":
return base_entries_5
return []
mock_mfm.read_entries_parallel.side_effect = mock_read_entries
scanner = IncrementalDiffScanner(table)
start_snapshot = _create_mock_snapshot(1)
end_snapshot = _create_mock_snapshot(5)
diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
diff_filenames = {e.file.file_name for e in diff_entries}
self.assertEqual(diff_filenames, {"file3.parquet"})
if __name__ == '__main__':
unittest.main()