blob: 5a04becde844aed87d0e50a4a6e6becc88b01d96 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import random
import sys
import unittest
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
class DeltaVarintCompressorTest(unittest.TestCase):
"""Tests for DeltaVarintCompressor following org.apache.paimon.utils.DeltaVarintCompressorTest."""
def test_normal_case_1(self):
"""Test case for normal compression and decompression."""
original = [80, 50, 90, 80, 70]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed) # Verify data integrity
self.assertEqual(6, len(compressed)) # Optimized size for small deltas
def test_normal_case_2(self):
"""Test case for normal compression and decompression."""
original = [100, 50, 150, 100, 200]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed) # Verify data integrity
self.assertEqual(8, len(compressed)) # Optimized size for small deltas
def test_random(self):
"""Test with random data to ensure robustness."""
# Run multiple iterations to test various random cases
for _ in range(100): # Reduced from 10000 for reasonable test time
original = []
for i in range(100):
# Use a smaller range than Java's Long.MAX_VALUE for Python compatibility
original.append(random.randint(-sys.maxsize, sys.maxsize))
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed) # Verify data integrity
def test_empty_array(self):
"""Test case for empty array."""
original = []
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
self.assertEqual(0, len(compressed))
def test_single_element(self):
"""Test case for single-element array."""
original = [42]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# Calculate expected size: Varint encoding for 42 (0x2A -> 1 byte)
self.assertEqual(1, len(compressed))
def test_extreme_values(self):
"""Test case for extreme values (sys.maxsize and -sys.maxsize)."""
original = [-sys.maxsize, sys.maxsize]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# The compressed size will depend on the platform's sys.maxsize
# but should be reasonable for the delta encoding
self.assertGreater(len(compressed), 0)
def test_negative_deltas(self):
"""Test case for negative deltas with ZigZag optimization."""
original = [100, -50, -150, -100] # Negative values
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# Verify ZigZag optimization: -1 → 1 (1 byte)
# Delta sequence: [100, -150, -100, 50] → ZigZag →
# Each encoded in 1-2 bytes
self.assertLessEqual(len(compressed), 8) # Optimized size
def test_unsorted_data(self):
"""Test case for unsorted data (worse compression ratio)."""
original = [1000, 5, 9999, 12345, 6789]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# Larger deltas → more bytes (e.g., 9994 → 3 bytes)
self.assertGreater(len(compressed), 5) # Worse than sorted case
def test_corrupted_input(self):
"""Test case for corrupted input (invalid Varint)."""
# Invalid Varint (all continuation flags)
corrupted = bytes([0x80, 0x80, 0x80])
try:
result = DeltaVarintCompressor.decompress(corrupted)
# If it doesn't raise an exception, the result should be reasonable
self.assertIsInstance(result, list)
except (IndexError, ValueError, RuntimeError):
# It's acceptable to raise an exception for corrupted data
pass
def test_zero_values(self):
"""Test case for arrays with zero values."""
original = [0, 0, 0, 0, 0]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# All deltas are 0, so should compress very well
self.assertLessEqual(len(compressed), 5)
def test_ascending_sequence(self):
"""Test case for ascending sequence (optimal for delta compression)."""
original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# All deltas are 1, so should compress very well
self.assertLessEqual(len(compressed), 15) # Much smaller than original
def test_descending_sequence(self):
"""Test case for descending sequence."""
original = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# All deltas are -1, should still compress well with ZigZag
self.assertLessEqual(len(compressed), 15)
def test_large_positive_values(self):
"""Test case for large positive values."""
original = [1000000, 2000000, 3000000, 4000000]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# Large values but consistent deltas should still compress reasonably
self.assertGreater(len(compressed), 4) # Will be larger due to big numbers
def test_mixed_positive_negative(self):
"""Test case for mixed positive and negative values."""
original = [100, -200, 300, -400, 500]
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# Mixed signs create larger deltas
self.assertGreater(len(compressed), 5)
def test_compression_efficiency(self):
"""Test that compression actually reduces size for suitable data."""
# Create a sequence with small deltas
original = []
base = 1000
for i in range(100):
base += random.randint(-10, 10) # Small deltas
original.append(base)
compressed = DeltaVarintCompressor.compress(original)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed)
# For small deltas, compression should be effective
# Original would need 8 bytes per int (800 bytes), compressed should be much smaller
self.assertLess(len(compressed), len(original) * 4) # At least 50% compression
def test_round_trip_consistency(self):
"""Test that multiple compress/decompress cycles are consistent."""
original = [1, 10, 100, 1000, 10000]
# First round trip
compressed1 = DeltaVarintCompressor.compress(original)
decompressed1 = DeltaVarintCompressor.decompress(compressed1)
# Second round trip
compressed2 = DeltaVarintCompressor.compress(decompressed1)
decompressed2 = DeltaVarintCompressor.decompress(compressed2)
# All should be identical
self.assertEqual(original, decompressed1)
self.assertEqual(original, decompressed2)
self.assertEqual(compressed1, compressed2)
def test_boundary_values(self):
"""Test boundary values for varint encoding."""
# Test values around varint boundaries (127, 16383, etc.)
boundary_values = [
0, 1, 127, 128, 255, 256,
16383, 16384, 32767, 32768,
-1, -127, -128, -255, -256,
-16383, -16384, -32767, -32768
]
compressed = DeltaVarintCompressor.compress(boundary_values)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(boundary_values, decompressed)
def test_java_compatibility_zigzag_encoding(self):
"""Test ZigZag encoding compatibility with Java implementation."""
# Test cases that verify ZigZag encoding matches Java's implementation
# ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc.
zigzag_test_cases = [
(0, 0), # 0 -> 0
(-1, 1), # -1 -> 1
(1, 2), # 1 -> 2
(-2, 3), # -2 -> 3
(2, 4), # 2 -> 4
(-3, 5), # -3 -> 5
(3, 6), # 3 -> 6
(-64, 127), # -64 -> 127
(64, 128), # 64 -> 128
(-65, 129), # -65 -> 129
]
for original_value, expected_zigzag in zigzag_test_cases:
# Test single value compression to verify ZigZag encoding
compressed = DeltaVarintCompressor.compress([original_value])
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual([original_value], decompressed,
f"ZigZag encoding failed for value {original_value}")
def test_java_compatibility_known_vectors(self):
"""Test with known test vectors that should match Java implementation."""
# Test vectors with expected compressed output (hexadecimal)
test_vectors = [
# Simple cases
([0], "00"), # 0 -> ZigZag(0) = 0 -> Varint(0) = 0x00
([1], "02"), # 1 -> ZigZag(1) = 2 -> Varint(2) = 0x02
([-1], "01"), # -1 -> ZigZag(-1) = 1 -> Varint(1) = 0x01
([2], "04"), # 2 -> ZigZag(2) = 4 -> Varint(4) = 0x04
([-2], "03"), # -2 -> ZigZag(-2) = 3 -> Varint(3) = 0x03
# Delta encoding cases
([0, 1], "0002"), # [0, 1] -> [0, delta=1] -> [0x00, 0x02]
([1, 2], "0202"), # [1, 2] -> [1, delta=1] -> [0x02, 0x02]
([0, -1], "0001"), # [0, -1] -> [0, delta=-1] -> [0x00, 0x01]
([1, 0], "0201"), # [1, 0] -> [1, delta=-1] -> [0x02, 0x01]
# Larger values
([127], "fe01"), # 127 -> ZigZag(127) = 254 -> Varint(254) = 0xfe01
([-127], "fd01"), # -127 -> ZigZag(-127) = 253 -> Varint(253) = 0xfd01
([128], "8002"), # 128 -> ZigZag(128) = 256 -> Varint(256) = 0x8002
([-128], "ff01"), # -128 -> ZigZag(-128) = 255 -> Varint(255) = 0xff01
]
for original, expected_hex in test_vectors:
compressed = DeltaVarintCompressor.compress(original)
actual_hex = compressed.hex()
self.assertEqual(expected_hex, actual_hex,
f"Binary compatibility failed for {original}. "
f"Expected: {expected_hex}, Got: {actual_hex}")
# Also verify round-trip
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(original, decompressed,
f"Round-trip failed for {original}")
def test_java_compatibility_large_numbers(self):
"""Test compatibility with Java for large numbers (64-bit range)."""
# Test cases covering the full 64-bit signed integer range
large_number_cases = [
2147483647, # Integer.MAX_VALUE
-2147483648, # Integer.MIN_VALUE
9223372036854775807, # Long.MAX_VALUE
-9223372036854775808 + 1, # Long.MIN_VALUE + 1 (avoid overflow in Python)
4294967295, # 2^32 - 1
-4294967296, # -2^32
]
for value in large_number_cases:
# Test individual values
compressed = DeltaVarintCompressor.compress([value])
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual([value], decompressed,
f"Large number compatibility failed for {value}")
# Test as a sequence to verify delta encoding with large numbers
compressed_seq = DeltaVarintCompressor.compress(large_number_cases)
decompressed_seq = DeltaVarintCompressor.decompress(compressed_seq)
self.assertEqual(large_number_cases, decompressed_seq,
"Large number sequence compatibility failed")
def test_java_compatibility_varint_boundaries(self):
"""Test Varint encoding boundaries that match Java implementation."""
# Test values at Varint encoding boundaries
varint_boundary_cases = [
# 1-byte Varint boundary
63, # ZigZag(63) = 126, fits in 1 byte
64, # ZigZag(64) = 128, needs 2 bytes
-64, # ZigZag(-64) = 127, fits in 1 byte
-65, # ZigZag(-65) = 129, needs 2 bytes
# 2-byte Varint boundary
8191, # ZigZag(8191) = 16382, fits in 2 bytes
8192, # ZigZag(8192) = 16384, needs 3 bytes
-8192, # ZigZag(-8192) = 16383, fits in 2 bytes
-8193, # ZigZag(-8193) = 16385, needs 3 bytes
# 3-byte Varint boundary
1048575, # ZigZag(1048575) = 2097150, fits in 3 bytes
1048576, # ZigZag(1048576) = 2097152, needs 4 bytes
]
for value in varint_boundary_cases:
compressed = DeltaVarintCompressor.compress([value])
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual([value], decompressed,
f"Varint boundary compatibility failed for {value}")
def test_java_compatibility_delta_edge_cases(self):
"""Test delta encoding edge cases for Java compatibility."""
# Edge cases that test delta encoding behavior
delta_edge_cases = [
# Maximum positive delta
[0, sys.maxsize],
# Maximum negative delta
[sys.maxsize, 0],
# Alternating large deltas
[0, 1000000, -1000000, 2000000, -2000000],
# Sequence with zero deltas
[42, 42, 42, 42],
# Mixed small and large deltas
[0, 1, 1000000, 1000001, 0],
]
for case in delta_edge_cases:
compressed = DeltaVarintCompressor.compress(case)
decompressed = DeltaVarintCompressor.decompress(compressed)
self.assertEqual(case, decompressed,
f"Delta edge case compatibility failed for {case}")
def test_java_compatibility_error_conditions(self):
"""Test error conditions that should match Java behavior."""
# Test cases for error handling - our implementation gracefully handles
# truncated data by returning empty lists, which is acceptable behavior
# Test with various truncated/invalid byte sequences
invalid_cases = [
bytes([0x80]), # Single incomplete byte
bytes([0x80, 0x80]), # Incomplete 3-byte varint
bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x80]), # Long sequence
]
for invalid_data in invalid_cases:
# Our implementation handles invalid data gracefully by returning empty list
# This is acceptable behavior for robustness
result = DeltaVarintCompressor.decompress(invalid_data)
self.assertIsInstance(result, list,
f"Should return a list for invalid data: {invalid_data.hex()}")
# Empty result is acceptable for invalid/truncated data
# Test that valid empty input returns empty list
empty_result = DeltaVarintCompressor.decompress(b'')
self.assertEqual([], empty_result, "Empty input should return empty list")
if __name__ == '__main__':
unittest.main()