blob: 7b8b139f120013e1048260ad07f6592360c30373 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from pypaimon.common.options.options import Options
from pypaimon.snapshot.time_travel_util import SCAN_KEYS, TimeTravelUtil
class _StubSnapshot:
def __init__(self, snapshot_id, schema_id=0, time_millis=0, watermark=None):
self.id = snapshot_id
self.schema_id = schema_id
self.time_millis = time_millis
self.watermark = watermark
class _StubSnapshotManager:
def __init__(self, snapshots):
self._snapshots = {s.id: s for s in snapshots}
def get_snapshot_by_id(self, snapshot_id):
return self._snapshots.get(snapshot_id)
def try_get_earliest_snapshot(self):
if not self._snapshots:
return None
return self._snapshots[min(self._snapshots.keys())]
def get_latest_snapshot(self):
if not self._snapshots:
return None
return self._snapshots[max(self._snapshots.keys())]
def earlier_or_equal_time_mills(self, timestamp):
result = None
for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
if snap.time_millis <= timestamp:
result = snap
else:
break
return result
def later_or_equal_watermark(self, watermark):
for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
if snap.watermark is not None and snap.watermark >= watermark:
return snap
return None
class _StubTagManager:
def __init__(self, tags):
self._tags = tags
def get(self, name):
return self._tags.get(name)
class TimeTravelUtilTest(unittest.TestCase):
def test_returns_none_when_no_scan_option_set(self):
result = TimeTravelUtil.try_travel_to_snapshot(
Options({}), _StubTagManager({}), _StubSnapshotManager([])
)
self.assertIsNone(result)
def test_resolves_snapshot_id(self):
snap2 = _StubSnapshot(2)
result = TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.snapshot-id': '2'}),
_StubTagManager({}),
_StubSnapshotManager([_StubSnapshot(1), snap2]),
)
self.assertIs(result, snap2)
def test_unknown_snapshot_id_raises(self):
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.snapshot-id': '99'}),
_StubTagManager({}),
_StubSnapshotManager([_StubSnapshot(1)]),
)
def test_snapshot_id_without_snapshot_manager_raises(self):
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.snapshot-id': '1'}),
_StubTagManager({}),
None,
)
def test_rejects_setting_snapshot_id_and_tag_name_together(self):
# Defence-in-depth: even if a caller bypasses read_paimon's check,
# the util-level guard still complains.
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.snapshot-id': '1', 'scan.tag-name': 'v1'}),
_StubTagManager({}),
_StubSnapshotManager([_StubSnapshot(1)]),
)
def test_scan_keys_contains_both_options(self):
# Sanity check: SCAN_KEYS must enumerate all time-travel modes,
# otherwise the mutual-exclusion guard above would not trigger.
self.assertIn('scan.snapshot-id', SCAN_KEYS)
self.assertIn('scan.tag-name', SCAN_KEYS)
self.assertIn('scan.timestamp-millis', SCAN_KEYS)
self.assertIn('scan.timestamp', SCAN_KEYS)
self.assertIn('scan.watermark', SCAN_KEYS)
def test_resolves_timestamp_millis(self):
snap1 = _StubSnapshot(1, time_millis=1000)
snap2 = _StubSnapshot(2, time_millis=2000)
snap3 = _StubSnapshot(3, time_millis=3000)
mgr = _StubSnapshotManager([snap1, snap2, snap3])
result = TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp-millis': '2500'}),
_StubTagManager({}),
mgr,
)
self.assertIs(result, snap2)
def test_resolves_timestamp_millis_exact_match(self):
snap1 = _StubSnapshot(1, time_millis=1000)
snap2 = _StubSnapshot(2, time_millis=2000)
mgr = _StubSnapshotManager([snap1, snap2])
result = TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp-millis': '2000'}),
_StubTagManager({}),
mgr,
)
self.assertIs(result, snap2)
def test_timestamp_millis_no_match_raises(self):
snap1 = _StubSnapshot(1, time_millis=5000)
mgr = _StubSnapshotManager([snap1])
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp-millis': '1000'}),
_StubTagManager({}),
mgr,
)
def test_timestamp_millis_without_snapshot_manager_raises(self):
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp-millis': '1000'}),
_StubTagManager({}),
None,
)
def test_resolves_timestamp_string(self):
# Compute expected millis in local timezone, consistent with Java behavior
from datetime import datetime
expected_millis = int(datetime(2023, 12, 1, 0, 0, 0).timestamp() * 1000)
snap1 = _StubSnapshot(1, time_millis=expected_millis)
snap2 = _StubSnapshot(2, time_millis=expected_millis + 100000)
mgr = _StubSnapshotManager([snap1, snap2])
result = TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp': '2023-12-01 00:00:00'}),
_StubTagManager({}),
mgr,
)
self.assertIs(result, snap1)
def test_timestamp_string_invalid_format_raises(self):
snap1 = _StubSnapshot(1, time_millis=1000)
mgr = _StubSnapshotManager([snap1])
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp': 'not-a-timestamp'}),
_StubTagManager({}),
mgr,
)
def test_resolves_watermark(self):
snap1 = _StubSnapshot(1, watermark=100)
snap2 = _StubSnapshot(2, watermark=200)
snap3 = _StubSnapshot(3, watermark=300)
mgr = _StubSnapshotManager([snap1, snap2, snap3])
result = TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.watermark': '200'}),
_StubTagManager({}),
mgr,
)
self.assertIs(result, snap2)
def test_watermark_no_match_raises(self):
snap1 = _StubSnapshot(1, watermark=100)
mgr = _StubSnapshotManager([snap1])
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.watermark': '500'}),
_StubTagManager({}),
mgr,
)
def test_watermark_without_snapshot_manager_raises(self):
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.watermark': '100'}),
_StubTagManager({}),
None,
)
def test_rejects_multiple_time_travel_options(self):
mgr = _StubSnapshotManager([_StubSnapshot(1, time_millis=1000)])
with self.assertRaises(ValueError):
TimeTravelUtil.try_travel_to_snapshot(
Options({'scan.timestamp-millis': '1000', 'scan.snapshot-id': '1'}),
_StubTagManager({}),
mgr,
)
if __name__ == '__main__':
unittest.main()