blob: c33fdf9beb40faed0099cb6e3f5a06c6640c9d7d [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import warnings
from unittest.mock import Mock
from pypaimon.common.options.core_options import CoreOptions, StartupMode
from pypaimon.read.table_scan import TableScan
def _scan(options):
scan = TableScan.__new__(TableScan)
scan.table = Mock()
scan.table.options = CoreOptions.from_dict(options)
return scan
class TableScanModeTest(unittest.TestCase):
def test_from_timestamp_requires_timestamp_option(self):
scan = _scan({
CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
})
with self.assertRaisesRegex(
ValueError,
"neither scan.timestamp-millis nor scan.timestamp is set"):
scan._validate_scan_mode()
def test_latest_conflicts_with_snapshot_id(self):
scan = _scan({
CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value,
CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
})
with self.assertRaisesRegex(ValueError, "scan.snapshot-id"):
scan._validate_scan_mode()
def test_default_with_timestamp_millis_resolves_to_from_timestamp(self):
options = CoreOptions.from_dict({
CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
})
self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP)
_scan(options.options.to_map())._validate_scan_mode()
def test_default_with_snapshot_id_resolves_to_from_snapshot(self):
options = CoreOptions.from_dict({
CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
})
self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT)
_scan(options.options.to_map())._validate_scan_mode()
def test_unsupported_scan_modes_raise_value_error(self):
scan = _scan({
CoreOptions.SCAN_MODE.key(): StartupMode.COMPACTED_FULL.value,
})
with self.assertRaisesRegex(ValueError, "not yet supported"):
scan._validate_scan_mode()
def test_full_mode_maps_to_latest_full_with_deprecation_warning(self):
options = CoreOptions.from_dict({
CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value,
})
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)
self.assertEqual(len(caught), 1)
self.assertTrue(issubclass(caught[0].category, DeprecationWarning))
if __name__ == '__main__':
unittest.main()