| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| """Tests for LocalDiskCacheManager, CachingInputStream, and CachingFileIO.""" |
| |
| import io |
| import os |
| import shutil |
| import tempfile |
| import threading |
| import unittest |
| from unittest.mock import MagicMock |
| |
| from pypaimon.filesystem.caching_file_io import ( |
| LocalDiskCacheManager, |
| CachingFileIO, |
| CachingInputStream, |
| ) |
| |
| |
class LocalDiskCacheManagerTest(unittest.TestCase):
    """Tests for LocalDiskCacheManager block storage in a temporary directory.

    ``setUp`` creates a fresh directory for every test and ``tearDown``
    removes it again, so tests never share cached blocks.
    """

    def setUp(self):
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        # Removal errors are ignored so cleanup cannot fail the test.
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def test_put_and_get(self):
        # A stored block is returned unchanged.
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        data = b"hello world block"
        cache.put_block("file1.index", 0, data)
        result = cache.get_block("file1.index", 0)
        self.assertEqual(data, result)

    def test_cache_miss(self):
        # An unknown key yields None.
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        result = cache.get_block("nonexistent", 0)
        self.assertIsNone(result)

    def test_different_keys(self):
        # Blocks are addressed by (file key, block index) and do not collide.
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        cache.put_block("file1", 0, b"block0")
        cache.put_block("file1", 1, b"block1")
        cache.put_block("file2", 0, b"other0")

        self.assertEqual(b"block0", cache.get_block("file1", 0))
        self.assertEqual(b"block1", cache.get_block("file1", 1))
        self.assertEqual(b"other0", cache.get_block("file2", 0))

    def test_duplicate_put_is_noop(self):
        # A second put for the same key must keep the first payload.
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        cache.put_block("file1", 0, b"original")
        cache.put_block("file1", 0, b"duplicate")
        self.assertEqual(b"original", cache.get_block("file1", 0))

    def test_eviction(self):
        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=100, block_size=64)
        cache.put_block("f", 0, b"a" * 60)
        cache.put_block("f", 1, b"b" * 60)
        # Total 120 > 100, at least one block should be evicted
        remaining_0 = cache.get_block("f", 0)
        remaining_1 = cache.get_block("f", 1)
        self.assertTrue(remaining_0 is None or remaining_1 is None)

    def test_scan_size_on_restart(self):
        cache1 = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        cache1.put_block("f", 0, b"x" * 100)
        cache1.put_block("f", 1, b"y" * 200)

        # Simulate restart: new cache instance on same directory
        cache2 = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        # 100 + 200 bytes were written by the first instance.
        self.assertEqual(300, cache2._current_size)
        self.assertEqual(b"x" * 100, cache2.get_block("f", 0))
        self.assertEqual(b"y" * 200, cache2.get_block("f", 1))

    def test_cache_dir_created(self):
        # Constructing the manager must create a missing nested directory.
        new_dir = os.path.join(self.cache_dir, "sub", "deep")
        LocalDiskCacheManager(new_dir, 2 ** 63 - 1, block_size=64)
        self.assertTrue(os.path.isdir(new_dir))

    def test_concurrent_put_get(self):
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        # Failures raised inside worker threads do not propagate to the test
        # thread, so they are collected here and checked after the joins.
        errors = []

        def writer(idx):
            try:
                data = bytes([idx % 256]) * 100
                cache.put_block("concurrent", idx, data)
            except Exception as e:
                errors.append(e)

        def reader(idx):
            try:
                result = cache.get_block("concurrent", idx)
                if result is None:
                    # A miss is acceptable: the writer may not have run yet.
                    return
                expected = bytes([idx % 256]) * 100
                # Explicit comparison rather than a bare ``assert`` so the
                # check is not stripped when Python runs with ``-O``.
                if result != expected:
                    errors.append(AssertionError(f"Mismatch at {idx}"))
            except Exception as e:
                errors.append(e)

        threads = []
        for i in range(20):
            threads.append(threading.Thread(target=writer, args=(i,)))
            threads.append(threading.Thread(target=reader, args=(i,)))
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self.assertEqual([], errors)

    def test_unlimited_cache_skips_eviction(self):
        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=2 ** 63 - 1, block_size=64)
        for i in range(50):
            cache.put_block("f", i, b"x" * 100)
        # All 50 blocks should still be present
        for i in range(50):
            self.assertIsNotNone(cache.get_block("f", i))
| |
| |
class CachingInputStreamTest(unittest.TestCase):
    """Read and seek tests for CachingInputStream over a disk block cache.

    The remote side is a MagicMock that serves an in-memory byte string; the
    cache lives in a temporary directory that is recreated for every test.
    """

    def setUp(self):
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def _new_cache(self, block_size=8):
        """Create a LocalDiskCacheManager in the per-test directory."""
        return LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=block_size)

    def _make_file_io(self, data, path="test"):
        """Return a mock file IO that serves ``data`` for any requested path.

        ``path`` is accepted but not consulted by this helper.
        """
        def open_stream(p):
            return io.BytesIO(data)

        def size_of(p):
            return len(data)

        mock_io = MagicMock()
        mock_io.new_input_stream.side_effect = open_stream
        mock_io.get_file_size.side_effect = size_of
        return mock_io

    def test_read_entire_file(self):
        payload = b"abcdefghijklmnop"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        self.assertEqual(payload, reader.read())

    def test_read_with_size(self):
        payload = b"abcdefghijklmnop"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        # (requested size, bytes expected back); the last request overshoots
        # the 16-byte payload and must return only what is left.
        steps = ((5, b"abcde"), (5, b"fghij"), (10, b"klmnop"))
        for size, expected in steps:
            self.assertEqual(expected, reader.read(size))

    def test_seek_and_read(self):
        payload = b"0123456789abcdef"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        reader.seek(10)
        self.assertEqual(b"abcdef", reader.read())

    def test_seek_whence_1(self):
        payload = b"0123456789"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        reader.read(3)
        # Relative seek: 3 bytes already consumed, plus 2 more.
        reader.seek(2, io.SEEK_CUR)
        self.assertEqual(5, reader.tell())
        self.assertEqual(b"56789", reader.read())

    def test_seek_whence_2(self):
        payload = b"0123456789"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        # Position three bytes before the end of the file.
        reader.seek(-3, io.SEEK_END)
        self.assertEqual(b"789", reader.read())

    def test_read_spanning_multiple_blocks(self):
        payload = bytes(range(256)) * 4  # 1024 bytes
        block_cache = self._new_cache(block_size=100)
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        # Bytes 90..120 straddle the boundary between block 0 and block 1.
        reader.seek(90)
        chunk = reader.read(30)
        self.assertEqual(payload[90:120], chunk)

    def test_cache_hit_avoids_remote_read(self):
        payload = b"0123456789abcdef"
        block_cache = self._new_cache()

        # Warm-up pass: reading the whole file fills the cache.
        warm_io = self._make_file_io(payload)
        CachingInputStream(warm_io, "test", block_cache).read()

        # Second pass: this mock can report a size but has no stream
        # behaviour configured, and must never be asked to open one.
        def size_of(p):
            return len(payload)

        cold_io = MagicMock()
        cold_io.get_file_size.side_effect = size_of
        second = CachingInputStream(cold_io, "test", block_cache)
        self.assertEqual(payload, second.read())
        cold_io.new_input_stream.assert_not_called()

    def test_read_empty(self):
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(b""), "test", block_cache)
        self.assertEqual(b"", reader.read())

    def test_read_at_eof(self):
        payload = b"abc"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        reader.read()
        # Everything is consumed; a further read returns no bytes.
        self.assertEqual(b"", reader.read(10))

    def test_context_manager(self):
        # Only checks that entering and leaving the ``with`` block works.
        payload = b"test"
        block_cache = self._new_cache()
        mock_io = self._make_file_io(payload)
        with CachingInputStream(mock_io, "test", block_cache):
            pass

    def test_partial_last_block(self):
        # 10 bytes with block_size=8 -> block 0 holds 8 bytes, block 1 holds 2.
        payload = b"abcdefghij"
        block_cache = self._new_cache()
        reader = CachingInputStream(self._make_file_io(payload), "test", block_cache)
        self.assertEqual(payload, reader.read())
        # The trailing block must contain only the 2 leftover bytes.
        self.assertEqual(b"ij", block_cache.get_block("test", 1))
| |
| |
class CachingFileIOTest(unittest.TestCase):
    """Tests for CachingFileIO: whitelist routing, delegation and defaults.

    Most tests use a MagicMock delegate that serves byte strings from a
    ``{path: bytes}`` mapping; the block cache lives in a per-test temporary
    directory.
    """

    def setUp(self):
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def _make_delegate(self, path_to_data):
        """Return a mock file IO backed by ``path_to_data``.

        Paths missing from the mapping behave like empty files.
        """
        delegate = MagicMock()

        def new_input_stream(path):
            return io.BytesIO(path_to_data.get(path, b""))

        def get_file_size(path):
            return len(path_to_data.get(path, b""))

        delegate.new_input_stream.side_effect = new_input_stream
        delegate.get_file_size.side_effect = get_file_size
        return delegate

    def test_meta_file_is_cached(self):
        data = b"snapshot data"
        delegate = self._make_delegate({"snapshot-1": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        # First read
        with caching_io.new_input_stream("snapshot-1") as s:
            result1 = s.read()
        self.assertEqual(data, result1)

        # Second read should hit cache (returns CachingInputStream)
        with caching_io.new_input_stream("snapshot-1") as s:
            result2 = s.read()
        self.assertEqual(data, result2)
        # get_file_size called only once — second stream gets size from cache
        self.assertEqual(1, delegate.get_file_size.call_count)

    def test_manifest_file_is_cached(self):
        data = b"manifest data"
        delegate = self._make_delegate({"manifest-abc": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        with caching_io.new_input_stream("manifest-abc") as s:
            self.assertEqual(data, s.read())

    def test_global_index_file_is_cached(self):
        data = b"index data"
        delegate = self._make_delegate({"global-index-uuid.index": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        with caching_io.new_input_stream("global-index-uuid.index") as s:
            self.assertEqual(data, s.read())

    def test_bucket_index_file_not_cached_by_default(self):
        data = b"bucket index"
        delegate = self._make_delegate({"index-uuid-0": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        result = caching_io.new_input_stream("index-uuid-0")
        self.assertNotIsInstance(result, CachingInputStream)
        self.assertEqual(data, result.read())
        # The uncached path must not query the file size.
        delegate.get_file_size.assert_not_called()

    def test_bucket_index_cached_when_in_whitelist(self):
        from pypaimon.utils.file_type import FileType
        data = b"bucket index"
        delegate = self._make_delegate({"index-uuid-0": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        whitelist = {FileType.META, FileType.GLOBAL_INDEX, FileType.BUCKET_INDEX}
        caching_io = CachingFileIO(delegate, cache, whitelist)

        with caching_io.new_input_stream("index-uuid-0") as s:
            self.assertEqual(data, s.read())

    def test_custom_whitelist_meta_only(self):
        from pypaimon.utils.file_type import FileType
        delegate = self._make_delegate({
            "snapshot-1": b"snap",
            "global-index-uuid.index": b"idx",
        })
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache, {FileType.META})

        with caching_io.new_input_stream("snapshot-1") as s:
            self.assertIsInstance(s, CachingInputStream)

        # Global index is outside the META-only whitelist.
        result = caching_io.new_input_stream("global-index-uuid.index")
        self.assertNotIsInstance(result, CachingInputStream)

    def test_data_file_not_cached(self):
        data = b"data content"
        delegate = self._make_delegate({"data-abc.orc": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        result = caching_io.new_input_stream("data-abc.orc")
        # Should be raw BytesIO, not CachingInputStream
        self.assertNotIsInstance(result, CachingInputStream)
        self.assertEqual(data, result.read())
        # get_file_size should NOT be called for data files
        delegate.get_file_size.assert_not_called()

    def test_file_index_not_cached(self):
        data = b"file index content"
        delegate = self._make_delegate({"data-abc.orc.index": data})
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        result = caching_io.new_input_stream("data-abc.orc.index")
        self.assertNotIsInstance(result, CachingInputStream)
        self.assertEqual(data, result.read())
        delegate.get_file_size.assert_not_called()

    def test_delegate_methods_forwarded(self):
        # Each non-read operation must be passed straight to the delegate.
        delegate = MagicMock()
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        caching_io.exists("/some/path")
        delegate.exists.assert_called_once_with("/some/path")

        caching_io.mkdirs("/some/dir")
        delegate.mkdirs.assert_called_once_with("/some/dir")

        # ``recursive`` is given by keyword here and expected positionally
        # on the delegate.
        caching_io.delete("/some/file", recursive=True)
        delegate.delete.assert_called_once_with("/some/file", True)

        caching_io.rename("/src", "/dst")
        delegate.rename.assert_called_once_with("/src", "/dst")

        caching_io.get_file_status("/path")
        delegate.get_file_status.assert_called_once_with("/path")

        caching_io.list_status("/dir")
        delegate.list_status.assert_called_once_with("/dir")

        caching_io.new_output_stream("/out")
        delegate.new_output_stream.assert_called_once_with("/out")

    def test_to_filesystem_path_forwarded_to_delegate(self):
        delegate = MagicMock()
        delegate.to_filesystem_path.side_effect = lambda p: p.replace("oss://", "")
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        self.assertEqual("bucket/key", caching_io.to_filesystem_path("oss://bucket/key"))
        delegate.to_filesystem_path.assert_called_once_with("oss://bucket/key")

    def test_write_parquet_when_enable_local_cache(self):
        import pyarrow as pa
        # ``import pyarrow`` alone does not load the ``parquet`` submodule;
        # import it explicitly instead of relying on another module having
        # imported it first.
        import pyarrow.parquet as pq
        from pypaimon.filesystem.local_file_io import LocalFileIO

        delegate = LocalFileIO()
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
        caching_io = CachingFileIO(delegate, cache)

        out_path = os.path.join(self.cache_dir, "data.parquet")
        table = pa.table({"a": [1, 2, 3]})
        caching_io.write_parquet(out_path, table)

        self.assertTrue(os.path.exists(out_path))
        self.assertEqual(table, pq.read_table(out_path))

    def test_file_io_for_data_wraps_cache_when_data_token_enabled(self):
        from pypaimon.catalog.rest.rest_catalog import RESTCatalog
        from pypaimon.common.identifier import Identifier
        from pypaimon.common.options.options import Options

        # Build a spec'd mock catalog and bind the real ``file_io_for_data``
        # to it, so only that method's logic is exercised.
        catalog = MagicMock(spec=RESTCatalog)
        catalog.data_token_enabled = True
        catalog.fuse_enabled = False
        catalog._fuse_resolver = None
        catalog.context = MagicMock()
        catalog.context.options = Options({
            'local-cache.enabled': 'true',
            'local-cache.dir': self.cache_dir,
            'local-cache.whitelist': 'meta,global-index,data',
        })
        catalog._cache_manager = CachingFileIO.create_cache_manager(
            catalog.context.options)
        catalog.file_io_for_data = RESTCatalog.file_io_for_data.__get__(
            catalog, RESTCatalog)

        file_io = catalog.file_io_for_data(
            "oss://catalog/db1/table1", Identifier.create("db1", "table1"))

        self.assertIsInstance(
            file_io, CachingFileIO,
            msg="Cache wrap should apply even when data-token.enabled=true; "
                "currently bypassed in DLF mode (RESTTokenFileIO returned).")

    def test_default_memory_cache_max_size_capped(self):
        from pypaimon.common.options.options import Options
        from pypaimon.filesystem.caching_file_io import LocalMemoryCacheManager

        # No ``local-cache.dir``: a memory cache manager is expected.
        cache = CachingFileIO.create_cache_manager(Options({
            'local-cache.enabled': 'true',
        }))
        self.assertIsInstance(cache, LocalMemoryCacheManager)
        self.assertEqual(
            cache._max_size_bytes, 256 * 1024 * 1024,
            msg="Memory cache without explicit max-size should default to "
                "256 MB, not unlimited (OOM risk).")

    def test_default_disk_cache_max_size_capped(self):
        from pypaimon.common.options.options import Options

        # With ``local-cache.dir`` set, a disk cache manager is expected.
        # LocalDiskCacheManager comes from the module-level import.
        cache = CachingFileIO.create_cache_manager(Options({
            'local-cache.enabled': 'true',
            'local-cache.dir': self.cache_dir,
        }))
        self.assertIsInstance(cache, LocalDiskCacheManager)
        self.assertEqual(
            cache._max_size_bytes, 10 * 1024 * 1024 * 1024,
            msg="Disk cache without explicit max-size should default to "
                "10 GB, not unlimited (disk-full risk).")
| |
| |
class ConfigOptionsTest(unittest.TestCase):
    """Check the ``local-cache.*`` accessors exposed by CoreOptions."""

    def test_local_cache_options_defaults(self):
        from pypaimon.common.options import Options
        from pypaimon.common.options.core_options import CoreOptions

        # Nothing configured: every accessor must report its default.
        core_options = CoreOptions(Options({}))
        default_block_bytes = 1 << 20  # 1 MiB

        self.assertFalse(core_options.local_cache_enabled())
        self.assertIsNone(core_options.local_cache_dir())
        self.assertIsNone(core_options.local_cache_max_size())
        self.assertEqual(
            default_block_bytes,
            core_options.local_cache_block_size().get_bytes())
        self.assertEqual("meta,global-index", core_options.local_cache_whitelist())

    def test_local_cache_options_custom(self):
        from pypaimon.common.options import Options
        from pypaimon.common.options.core_options import CoreOptions

        # Every local-cache key is overridden with a non-default value.
        settings = {
            "local-cache.enabled": "true",
            "local-cache.dir": "/custom/cache",
            "local-cache.max-size": "2gb",
            "local-cache.block-size": "4mb",
            "local-cache.whitelist": "meta,global-index,bucket-index",
        }
        core_options = CoreOptions(Options(settings))
        max_bytes = 2 << 30    # "2gb"
        block_bytes = 4 << 20  # "4mb"

        self.assertTrue(core_options.local_cache_enabled())
        self.assertEqual("/custom/cache", core_options.local_cache_dir())
        self.assertEqual(max_bytes, core_options.local_cache_max_size().get_bytes())
        self.assertEqual(block_bytes, core_options.local_cache_block_size().get_bytes())
        self.assertEqual(
            "meta,global-index,bucket-index",
            core_options.local_cache_whitelist())
| |
| |
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()