blob: a9c94823739c0330516eedea024f647fa2bbc2d0 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import unittest
from unittest.mock import Mock, patch
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.offset_row import OffsetRow
from pypaimon.write.commit.commit_scanner import CommitScanner
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
# Fields of a partition row ('dt', 'region'); used for partition rows and
# partition predicates throughout this module.
PARTITION_FIELDS = [
    DataField(field_id, field_name, AtomicType(type_name))
    for field_id, field_name, type_name in (
        (0, 'dt', 'STRING'),
        (1, 'region', 'STRING'),
    )
]

# Fields of the full table row; 'dt' and 'region' are also the partition keys.
TABLE_FIELDS = [
    DataField(field_id, field_name, AtomicType(type_name))
    for field_id, field_name, type_name in (
        (0, 'dt', 'STRING'),
        (1, 'id', 'INT'),
        (2, 'name', 'STRING'),
        (3, 'region', 'STRING'),
    )
]

# Shared builder for predicates expressed over PARTITION_FIELDS.
_partition_builder = PredicateBuilder(PARTITION_FIELDS)
def _mock_table():
    """Return a Mock table carrying the field and partition-key attributes."""
    return Mock(
        field_names=['dt', 'id', 'name', 'region'],
        fields=TABLE_FIELDS,
        partition_keys=['dt', 'region'],
        partition_keys_fields=PARTITION_FIELDS,
    )
def _mock_scanner_table():
    """Return the base mock table with the extra attributes set for FileScanner tests.

    On top of ``_mock_table()`` this sets the primary-key attributes, stubs the
    return value of each option accessor, and attaches a schema and a schema
    manager whose ``get_schema`` returns an object exposing ``TABLE_FIELDS``.
    """
    scanner_table = _mock_table()
    scanner_table.trimmed_primary_keys = []
    scanner_table.is_primary_key_table = False

    # Options are stubbed as callables: each accessor's return value is fixed.
    option_values = {
        'source_split_target_size': 128 * 1024 * 1024,
        'source_split_open_file_cost': 4 * 1024 * 1024,
        'bucket': -1,
        'data_evolution_enabled': False,
        'deletion_vectors_enabled': False,
        'scan_manifest_parallelism': 1,
    }
    for option_name, value in option_values.items():
        getattr(scanner_table.options, option_name).return_value = value

    scanner_table.table_schema = Mock(id=0)
    scanner_table.schema_manager = Mock()
    scanner_table.schema_manager.get_schema.return_value = Mock(fields=TABLE_FIELDS)
    return scanner_table
def _manifest_file_meta(partition_min, partition_max):
    """Build a ManifestFileMeta whose partition stats use the given min/max values.

    Args:
        partition_min: values of the minimum partition row, one per PARTITION_FIELDS entry.
        partition_max: values of the maximum partition row, one per PARTITION_FIELDS entry.

    Returns:
        A ManifestFileMeta with fixed name, size, file counts and schema id.
    """
    min_row = GenericRow(partition_min, PARTITION_FIELDS)
    max_row = GenericRow(partition_max, PARTITION_FIELDS)
    stats = SimpleStats(min_values=min_row, max_values=max_row, null_counts=[0, 0])
    return ManifestFileMeta(
        file_name='manifest-test',
        file_size=1024,
        num_added_files=1,
        num_deleted_files=0,
        partition_stats=stats,
        schema_id=0,
    )
def _manifest_entry(partition_values):
    """Build a ManifestEntry for the given partition values with a mocked file.

    Args:
        partition_values: values of the partition row, one per PARTITION_FIELDS entry.
    """
    partition_row = GenericRow(partition_values, PARTITION_FIELDS)
    return ManifestEntry(
        kind=0,
        partition=partition_row,
        bucket=0,
        total_buckets=1,
        file=Mock(),
    )
@patch('pypaimon.read.scanner.file_scanner.SnapshotManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestFileManager')
@patch('pypaimon.read.scanner.file_scanner.ManifestListManager')
class TestFileScannerPartitionPredicate(unittest.TestCase):
    """Checks how FileScanner stores and applies a partition predicate.

    The class-level patches replace SnapshotManager, ManifestFileManager and
    ManifestListManager in the file_scanner module while each test method runs;
    the injected mocks are collected by ``*_`` and not used.
    """

    def _scanner(self, predicate=None, partition_predicate=None):
        """Build a FileScanner over the mock scanner table with the given predicates."""
        table = _mock_scanner_table()

        def stub_supplier():
            return [], None

        return FileScanner(
            table, stub_supplier,
            predicate=predicate, partition_predicate=partition_predicate,
        )

    def test_partition_predicate_used_directly(self, *_):
        """An explicit partition predicate is kept as-is; the other predicates stay None."""
        dt_pred = _partition_builder.equal('dt', '2024-01-15')
        scanner = self._scanner(partition_predicate=dt_pred)

        self.assertIs(scanner.partition_key_predicate, dt_pred)
        for attr in ('predicate', 'predicate_for_stats', 'primary_key_predicate'):
            self.assertIsNone(getattr(scanner, attr))

    def test_no_partition_predicate_derives_from_predicate(self, *_):
        """With only a table-level predicate, a partition predicate on 'dt' is present."""
        table_builder = PredicateBuilder(TABLE_FIELDS)
        scanner = self._scanner(predicate=table_builder.equal('dt', '2024-01-15'))

        derived = scanner.partition_key_predicate
        self.assertIsNotNone(derived)
        self.assertEqual(derived.field, 'dt')

    def test_neither_predicate_means_no_filtering(self, *_):
        """Without any predicate, manifest entries pass the entry filter."""
        scanner = self._scanner()
        self.assertIsNone(scanner.partition_key_predicate)

        entry = _manifest_entry(['2024-01-15', 'us-east-1'])
        self.assertTrue(scanner._filter_manifest_entry(entry))

    def test_filters_manifest_file_by_stats(self, *_):
        """Manifest files are kept or dropped based on their partition min/max stats."""
        dt_pred = _partition_builder.equal('dt', '2024-01-15')
        scanner = self._scanner(partition_predicate=dt_pred)

        matching = _manifest_file_meta(
            ['2024-01-15', 'us-east-1'], ['2024-01-15', 'us-west-2'])
        outside = _manifest_file_meta(
            ['2024-01-16', 'us-east-1'], ['2024-01-16', 'us-west-2'])

        self.assertTrue(scanner._filter_manifest_file(matching))
        self.assertFalse(scanner._filter_manifest_file(outside))

    def test_filters_manifest_entry_by_partition(self, *_):
        """Only entries matching both partition keys pass the entry filter."""
        both_keys = _partition_builder.and_predicates([
            _partition_builder.equal('dt', '2024-01-15'),
            _partition_builder.equal('region', 'us-east-1'),
        ])
        scanner = self._scanner(partition_predicate=both_keys)
        accepts = scanner._filter_manifest_entry

        self.assertTrue(accepts(_manifest_entry(['2024-01-15', 'us-east-1'])))
        self.assertFalse(accepts(_manifest_entry(['2024-01-16', 'us-east-1'])))
        self.assertFalse(accepts(_manifest_entry(['2024-01-15', 'us-west-2'])))
@patch('pypaimon.write.file_store_commit.SnapshotManager')
@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestOverwritePartitionPredicate(unittest.TestCase):
    """Checks the partition predicate FileStoreCommit hands to FileScanner.

    The class-level patches replace SnapshotManager, ManifestFileManager and
    ManifestListManager in the file_store_commit module while each test method
    runs; the injected mocks are collected by ``*_`` and not used directly.
    """

    # Partition spec passed to ``overwrite`` in the overwrite tests.
    _TARGET = {'dt': '2024-01-15', 'region': 'us-east-1'}

    def setUp(self):
        self.table = _mock_table()

    def _create_commit(self, stub_commit=True):
        """Build a FileStoreCommit; by default ``_try_commit`` is replaced with a Mock."""
        commit = FileStoreCommit(Mock(), self.table, 'test_user')
        if not stub_commit:
            return commit
        commit._try_commit = Mock()
        return commit

    @staticmethod
    def _msg(partition):
        """Build a CommitMessage for ``partition`` holding one mocked new file."""
        data_file = Mock(row_count=10)
        return CommitMessage(partition=partition, bucket=0, new_files=[data_file])

    def _extract_partition_predicate(self, commit):
        """Run the captured entries plan and return the predicate given to FileScanner.

        Reads the ``commit_entries_plan`` keyword from the stubbed ``_try_commit``
        call, invokes it with FileScanner patched, and returns the
        ``partition_predicate`` keyword FileScanner was constructed with.
        """
        _, try_commit_kwargs = commit._try_commit.call_args
        entries_plan = try_commit_kwargs['commit_entries_plan']

        with patch('pypaimon.write.file_store_commit.FileScanner') as scanner_cls:
            scanner_cls.return_value.read_manifest_entries.return_value = []
            commit.manifest_list_manager.read_all.return_value = []
            entries_plan(Mock(id=1))

        _, scanner_kwargs = scanner_cls.call_args
        return scanner_kwargs['partition_predicate']

    def test_overwrite_rejects_mismatched_partition(self, *_):
        """Overwriting with a message from another partition raises RuntimeError."""
        commit = self._create_commit(stub_commit=False)

        with self.assertRaises(RuntimeError) as ctx:
            commit.overwrite(self._TARGET, [self._msg(('2024-01-15', 'us-west-2'))], 1)

        self.assertIn('does not belong to this partition', str(ctx.exception))

    def test_overwrite_passes_partition_scoped_predicate(self, *_):
        """The overwrite predicate matches the target partition only."""
        commit = self._create_commit()
        commit.overwrite(self._TARGET, [self._msg(('2024-01-15', 'us-east-1'))], 1)
        pred = self._extract_partition_predicate(commit)

        target_row = OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)
        other_row = OffsetRow(('2024-01-15', 'us-west-2'), 0, 2)
        self.assertTrue(pred.test(target_row))
        self.assertFalse(pred.test(other_row))

    def test_drop_partitions_passes_or_predicate(self, *_):
        """The drop predicate matches each dropped partition and nothing else."""
        dropped = [
            {'dt': '2024-01-15', 'region': 'us-east-1'},
            {'dt': '2024-01-16', 'region': 'us-west-2'},
        ]
        commit = self._create_commit()
        commit.drop_partitions(dropped, 1)
        pred = self._extract_partition_predicate(commit)

        first_dropped = OffsetRow(('2024-01-15', 'us-east-1'), 0, 2)
        second_dropped = OffsetRow(('2024-01-16', 'us-west-2'), 0, 2)
        untouched = OffsetRow(('2024-01-17', 'eu-west-1'), 0, 2)
        self.assertTrue(pred.test(first_dropped))
        self.assertTrue(pred.test(second_dropped))
        self.assertFalse(pred.test(untouched))
class TestCommitScannerPartitionPredicate(unittest.TestCase):
    """Checks the partition filter CommitScanner builds and forwards to FileScanner."""

    def _scanner(self):
        """Build a CommitScanner over the partitioned mock table."""
        return CommitScanner(_mock_table(), Mock())

    def test_filter_uses_partition_key_index(self):
        """The filter built from entries accepts exactly those entries' partitions."""
        changed_entries = [
            _manifest_entry(['2024-01-15', 'us-east-1']),
            _manifest_entry(['2024-01-16', 'us-west-2']),
        ]
        pred = self._scanner()._build_partition_filter_from_entries(changed_entries)

        first_changed = GenericRow(['2024-01-15', 'us-east-1'], PARTITION_FIELDS)
        second_changed = GenericRow(['2024-01-16', 'us-west-2'], PARTITION_FIELDS)
        unchanged = GenericRow(['2024-01-17', 'eu-west-1'], PARTITION_FIELDS)
        self.assertTrue(pred.test(first_changed))
        self.assertTrue(pred.test(second_changed))
        self.assertFalse(pred.test(unchanged))

    def test_filter_none_without_partition_keys(self):
        """A table with no partition keys yields no partition filter."""
        unpartitioned_table = Mock(partition_keys=[])
        scanner = CommitScanner(unpartitioned_table, Mock())
        entries = [_manifest_entry(['2024-01-15', 'us-east-1'])]

        self.assertIsNone(scanner._build_partition_filter_from_entries(entries))

    @patch('pypaimon.write.commit.commit_scanner.FileScanner')
    def test_passes_partition_predicate_to_file_scanner(self, mock_scanner_cls):
        """Both read methods construct FileScanner with ``partition_predicate`` only."""
        mock_scanner_cls.return_value.read_manifest_entries.return_value = []
        entries = [_manifest_entry(['2024-01-15', 'us-east-1'])]

        # (CommitScanner method, manifest-list-manager method to stub, stubbed return value)
        cases = (
            ('read_all_entries_from_changed_partitions', 'read_all', []),
            ('read_incremental_entries_from_changed_partitions', 'read_delta', [Mock()]),
        )
        for method, setup_attr, setup_return in cases:
            with self.subTest(method=method):
                mock_scanner_cls.reset_mock()
                scanner = self._scanner()
                list_manager_method = getattr(scanner.manifest_list_manager, setup_attr)
                list_manager_method.return_value = setup_return

                read_entries = getattr(scanner, method)
                read_entries(Mock(), entries)

                _, kwargs = mock_scanner_cls.call_args
                self.assertIn('partition_predicate', kwargs)
                self.assertIsNotNone(kwargs['partition_predicate'])
                self.assertNotIn('predicate', kwargs)