# blob: ef09066f90f485dbcb47dd0c772bd25fb3dee98b [file]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import struct
import unittest
from datetime import datetime
from decimal import Decimal
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
GenericRowSerializer)
from pypaimon.table.row.row_kind import RowKind
# Java-golden epoch values, hand-derived from Timestamp.fromLocalDateTime:
#   millisecond       = epochDay(date) * 86_400_000 + nanoOfDay(time) / 1_000_000
#   nanoOfMillisecond = nanoOfDay(time) % 1_000_000
# These are deterministic, independent of host timezone, and match Java's
# BinaryRowWriter wire format exactly. Do NOT compute them via any pypaimon
# helper — the whole point is to have an external oracle.
#
# Worked derivation for 2025-04-08 10:30:00.123456:
#   epochDay = 20_186, nanoOfDay = 37_800_123_456_000
#   millisecond       = 20_186 * 86_400_000 + 37_800_123 = 1_744_108_200_123
#   nanoOfMillisecond = 37_800_123_456_000 % 1_000_000   = 456_000
TS_2025_04_08_10_30_00_123456 = datetime(2025, 4, 8, 10, 30, 0, 123456)
TS_2025_04_08_10_30_00_123456_MILLIS = 1_744_108_200_123
TS_2025_04_08_10_30_00_123456_NANO_OF_MS = 456_000
# epochDay(2025-01-01) = 20_089 -> 20_089 * 86_400_000 ms, time-of-day part is zero.
TS_2025_01_01_MILLIS = 1_735_689_600_000  # datetime(2025, 1, 1, 0, 0, 0)
# epochDay(1969-07-20) = -165 -> -165 * 86_400_000 + 73_020_000 ms (20:17:00 into the day).
TS_1969_07_20_20_17_00_MILLIS = -14_182_980_000  # datetime(1969, 7, 20, 20, 17, 0)
class TimestampTest(unittest.TestCase):
    """Serialization/deserialization of timestamp columns in GenericRow.

    Contract under test: a naive datetime is encoded as if it were UTC, the
    same way Java's Timestamp.fromLocalDateTime (Timestamp.java:195) does.
    Every expected number is a hand-computed golden value derived from the
    Java algorithm, never from a pypaimon helper.
    """

    # Byte counts the assertions rely on.
    # The tests skip the first 4 bytes of to_bytes() output before inspecting
    # the row; what those bytes hold is not visible from this file
    # (presumably a row-arity prefix — TODO confirm in GenericRowSerializer).
    _PREFIX_LEN = 4
    # Size of the region preceding the first fixed slot in a one-field row.
    _NULL_BITS_LEN = 8
    # Every field owns one 8-byte slot in the fixed-length part.
    _SLOT_LEN = 8
    # Fixed-length part of a row that has exactly one field.
    _ONE_FIELD_FIXED_LEN = _NULL_BITS_LEN + _SLOT_LEN

    @staticmethod
    def _ts_schema(type_name):
        """Return a one-column schema whose field "ts" has type *type_name*."""
        return [DataField(0, "ts", AtomicType(type_name))]

    @staticmethod
    def _encode(values, schema):
        """Serialize *values* as a single INSERT row described by *schema*."""
        return GenericRowSerializer.to_bytes(GenericRow(values, schema, RowKind.INSERT))

    @classmethod
    def _row_body(cls, blob):
        """Drop the leading prefix bytes so offsets are relative to the row."""
        return blob[cls._PREFIX_LEN:]

    @classmethod
    def _first_slot(cls, body):
        """Read the first fixed slot of *body* as a little-endian signed 64-bit int."""
        begin = cls._NULL_BITS_LEN
        (value,) = struct.unpack('<q', body[begin:begin + cls._SLOT_LEN])
        return value

    def test_timestamp_compact_binary_format(self):
        """Compact layout: raw epoch millis in the fixed slot, no variable area."""
        schema = self._ts_schema("TIMESTAMP(3)")
        body = self._row_body(self._encode([TS_2025_04_08_10_30_00_123456], schema))

        self.assertEqual(len(body), self._ONE_FIELD_FIXED_LEN)
        self.assertEqual(self._first_slot(body), TS_2025_04_08_10_30_00_123456_MILLIS)

    def test_timestamp_compact_round_trip(self):
        """Compact round-trip keeps the datetime down to milliseconds."""
        schema = self._ts_schema("TIMESTAMP(3)")
        # Microsecond component is a whole number of milliseconds on purpose.
        original = datetime(2025, 4, 8, 10, 30, 0, 123000)

        blob = self._encode([original], schema)
        decoded = GenericRowDeserializer.from_bytes(blob, schema)

        self.assertEqual(decoded.values[0], original)

    def test_timestamp_non_compact_binary_format(self):
        """Non-compact layout: (cursor << 32) | nanoOfMilli in the fixed slot,
        epoch millis as a long in the variable area. Both oracles come from Java."""
        schema = self._ts_schema("TIMESTAMP(6)")
        body = self._row_body(self._encode([TS_2025_04_08_10_30_00_123456], schema))
        fixed_len = self._ONE_FIELD_FIXED_LEN

        # One extra 8-byte long is expected after the fixed-length part.
        self.assertEqual(len(body), fixed_len + 8)

        packed = self._first_slot(body)
        var_offset = (packed >> 32) & 0xFFFFFFFF
        nanos = packed & 0xFFFFFFFF
        self.assertEqual(var_offset, fixed_len)
        self.assertEqual(nanos, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)

        (stored_millis,) = struct.unpack('<q', body[var_offset:var_offset + 8])
        self.assertEqual(stored_millis, TS_2025_04_08_10_30_00_123456_MILLIS)

    def test_timestamp_non_compact_round_trip(self):
        """Non-compact round-trip keeps microseconds."""
        for type_str in ("TIMESTAMP(6)", "TIMESTAMP(9)"):
            with self.subTest(type=type_str):
                schema = self._ts_schema(type_str)
                blob = self._encode([TS_2025_04_08_10_30_00_123456], schema)
                decoded = GenericRowDeserializer.from_bytes(blob, schema)
                self.assertEqual(decoded.values[0], TS_2025_04_08_10_30_00_123456)

    def test_timestamp_non_compact_null(self):
        """A None value in a TIMESTAMP(6) column round-trips as None."""
        schema = self._ts_schema("TIMESTAMP(6)")
        decoded = GenericRowDeserializer.from_bytes(self._encode([None], schema), schema)
        self.assertIsNone(decoded.values[0])

    def test_timestamp_boundary_precision(self):
        """Precision 3 is the last compact one, precision 4 the first non-compact."""
        fixed_len = self._ONE_FIELD_FIXED_LEN
        expectations = (
            ("TIMESTAMP(3)", fixed_len),
            ("TIMESTAMP(4)", fixed_len + 8),
        )
        for type_name, expected_len in expectations:
            blob = self._encode([TS_2025_04_08_10_30_00_123456], self._ts_schema(type_name))
            self.assertEqual(len(self._row_body(blob)), expected_len)

    def test_timestamp_mixed_with_other_types(self):
        """Both timestamp layouts round-trip next to INT, STRING and DECIMAL columns."""
        schema = [
            DataField(0, "id", AtomicType("INT")),
            DataField(1, "name", AtomicType("STRING")),
            DataField(2, "ts_compact", AtomicType("TIMESTAMP(3)")),
            DataField(3, "ts_non_compact", AtomicType("TIMESTAMP(6)")),
            DataField(4, "dec", AtomicType("DECIMAL(38, 10)")),
        ]
        ts_compact = datetime(2025, 1, 1, 0, 0, 0)
        dec_val = Decimal("12345678901234567890.1234567890")
        expected = [42, "hello", ts_compact, TS_2025_04_08_10_30_00_123456, dec_val]

        decoded = GenericRowDeserializer.from_bytes(self._encode(expected, schema), schema)

        self.assertEqual(decoded.values[0], 42)
        self.assertEqual(decoded.values[1], "hello")
        self.assertEqual(decoded.values[2], ts_compact)
        self.assertEqual(decoded.values[3], TS_2025_04_08_10_30_00_123456)
        self.assertEqual(decoded.values[4], dec_val)

    def test_timestamp_default_precision_is_six(self):
        """Bare TIMESTAMP must match Java's TimestampType.DEFAULT_PRECISION = 6,
        i.e. use the non-compact layout and preserve microseconds."""
        schema = self._ts_schema("TIMESTAMP")
        blob = self._encode([TS_2025_04_08_10_30_00_123456], schema)
        body = self._row_body(blob)

        # The non-compact layout must emit an 8-byte variable area.
        self.assertEqual(len(body), self._ONE_FIELD_FIXED_LEN + 8)

        nanos = self._first_slot(body) & 0xFFFFFFFF
        self.assertEqual(nanos, TS_2025_04_08_10_30_00_123456_NANO_OF_MS)

        decoded = GenericRowDeserializer.from_bytes(blob, schema)
        self.assertEqual(decoded.values[0], TS_2025_04_08_10_30_00_123456)

    def test_timestamp_ltz_default_precision_is_six(self):
        """Bare TIMESTAMP_LTZ must also default to precision 6 (non-compact)."""
        schema = self._ts_schema("TIMESTAMP_LTZ")
        blob = self._encode([TS_2025_04_08_10_30_00_123456], schema)

        self.assertEqual(len(self._row_body(blob)), self._ONE_FIELD_FIXED_LEN + 8)

        decoded = GenericRowDeserializer.from_bytes(blob, schema)
        self.assertEqual(decoded.values[0], TS_2025_04_08_10_30_00_123456)

    def test_timestamp_golden_2025_01_01(self):
        """Hard-coded Java golden: 2025-01-01 00:00:00 UTC == 1_735_689_600_000 ms.

        This is what catches a regression into local-time semantics, since the
        value is picked so every tz offset produces a different number."""
        schema = self._ts_schema("TIMESTAMP(3)")
        midnight = datetime(2025, 1, 1, 0, 0, 0)

        body = self._row_body(self._encode([midnight], schema))

        self.assertEqual(self._first_slot(body), TS_2025_01_01_MILLIS)

    def test_timestamp_pre_epoch(self):
        """Negative epoch millis round-trip correctly.

        Golden: 1969-07-20 20:17:00 UTC == -14_182_980_000 ms (Java-computed)."""
        # Compact layout: check the raw slot against the golden, then round-trip.
        compact_schema = self._ts_schema("TIMESTAMP(3)")
        before_epoch = datetime(1969, 7, 20, 20, 17, 0)
        compact_blob = self._encode([before_epoch], compact_schema)
        self.assertEqual(
            self._first_slot(self._row_body(compact_blob)),
            TS_1969_07_20_20_17_00_MILLIS,
        )
        compact_decoded = GenericRowDeserializer.from_bytes(compact_blob, compact_schema)
        self.assertEqual(compact_decoded.values[0], before_epoch)

        # Non-compact layout: round-trip only, with a microsecond component.
        wide_schema = self._ts_schema("TIMESTAMP(6)")
        before_epoch_us = datetime(1969, 7, 20, 20, 17, 0, 123456)
        wide_blob = self._encode([before_epoch_us], wide_schema)
        wide_decoded = GenericRowDeserializer.from_bytes(wide_blob, wide_schema)
        self.assertEqual(wide_decoded.values[0], before_epoch_us)

    def test_timestamp_ltz_non_compact(self):
        """TIMESTAMP_LTZ(6) round-trips a datetime including its microseconds."""
        schema = self._ts_schema("TIMESTAMP_LTZ(6)")
        original = datetime(2025, 4, 8, 10, 30, 0, 654321)

        decoded = GenericRowDeserializer.from_bytes(self._encode([original], schema), schema)

        self.assertEqual(decoded.values[0], original)
# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()