blob: c3dec4ebe5c58fcf8d88973cdd729be1d5742ee1 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import unittest
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions
from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.filesystem.resolving_file_io import ResolvingFileIO
class ResolvingFileIOTest(unittest.TestCase):
    """Tests for ResolvingFileIO."""

    def _make_resolving(self, conf=None):
        """Build a ResolvingFileIO from ``conf`` and close it after the test.

        Args:
            conf: Plain dict of option key -> value; ``None`` means no options.

        Returns:
            The newly constructed ResolvingFileIO.
        """
        resolving = ResolvingFileIO(Options(conf or {}))
        # Registered cleanups run even when the test body fails, so the
        # instance (and whatever it caches) is always released.
        self.addCleanup(resolving.close)
        return resolving

    def test_fileio_get_returns_resolving_when_enabled(self):
        """FileIO.get returns a ResolvingFileIO when the option is 'true'."""
        opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'})
        file_io = FileIO.get("/tmp/test", opts)
        self.assertIsInstance(file_io, ResolvingFileIO)
        # Only register close() once the type is confirmed.
        self.addCleanup(file_io.close)

    def test_fileio_get_returns_local_when_disabled(self):
        """FileIO.get returns a LocalFileIO when the option is 'false'."""
        opts = Options({CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'false'})
        file_io = FileIO.get("/tmp/test", opts)
        self.assertIsInstance(file_io, LocalFileIO)

    def test_fileio_get_returns_local_when_not_set(self):
        """FileIO.get returns a LocalFileIO when the option is absent."""
        file_io = FileIO.get("/tmp/test", Options({}))
        self.assertIsInstance(file_io, LocalFileIO)

    def test_no_recursion(self):
        """ResolvingFileIO should not create another ResolvingFileIO internally."""
        resolving = self._make_resolving(
            {CatalogOptions.RESOLVING_FILE_IO_ENABLED.key(): 'true'}
        )
        inner = resolving._get_fileio("/tmp/test")
        self.assertNotIsInstance(inner, ResolvingFileIO)
        self.assertIsInstance(inner, LocalFileIO)

    def test_cache_hit_same_scheme_authority(self):
        """Two scheme-less local paths resolve to the very same FileIO object."""
        resolving = self._make_resolving()
        fio1 = resolving._get_fileio("/tmp/test/a.txt")
        fio2 = resolving._get_fileio("/tmp/test/b.txt")
        self.assertIs(fio1, fio2)

    def test_cache_key_uses_scheme_and_authority(self):
        """A scheme-less path and a ``file://`` URI both resolve to LocalFileIO.

        NOTE(review): only the resolved types are asserted here; whether the
        two paths share a cache entry is not checked -- confirm the intended
        cache-key contract before tightening this test.
        """
        resolving = self._make_resolving()
        fio_local = resolving._get_fileio("/tmp/test")
        fio_file = resolving._get_fileio("file:///tmp/test2")
        self.assertIsInstance(fio_local, LocalFileIO)
        self.assertIsInstance(fio_file, LocalFileIO)

    def test_is_object_store_with_oss_warehouse(self):
        """An ``oss://`` warehouse is reported as an object store."""
        resolving = self._make_resolving(
            {CatalogOptions.WAREHOUSE.key(): 'oss://bucket/warehouse'}
        )
        self.assertTrue(resolving.is_object_store())

    def test_is_object_store_with_s3_warehouse(self):
        """An ``s3://`` warehouse is reported as an object store."""
        resolving = self._make_resolving(
            {CatalogOptions.WAREHOUSE.key(): 's3://bucket/warehouse'}
        )
        self.assertTrue(resolving.is_object_store())

    def test_is_object_store_with_hdfs_warehouse(self):
        """An ``hdfs://`` warehouse is not reported as an object store."""
        resolving = self._make_resolving(
            {CatalogOptions.WAREHOUSE.key(): 'hdfs://cluster/warehouse'}
        )
        self.assertFalse(resolving.is_object_store())

    def test_is_object_store_with_local_warehouse(self):
        """A ``file://`` warehouse is not reported as an object store."""
        resolving = self._make_resolving(
            {CatalogOptions.WAREHOUSE.key(): 'file:///tmp/warehouse'}
        )
        self.assertFalse(resolving.is_object_store())
class ResolvingFileIOReadWriteTest(unittest.TestCase):
"""End-to-end read/write tests using ResolvingFileIO with local filesystem."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.resolving = ResolvingFileIO(Options({}))
def tearDown(self):
self.resolving.close()
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_write_and_read(self):
path = os.path.join(self.temp_dir, "test.txt")
content = "hello resolving fileio"
self.resolving.write_file(path, content)
result = self.resolving.read_file_utf8(path)
self.assertEqual(result, content)
def test_exists(self):
path = os.path.join(self.temp_dir, "test_exists.txt")
self.assertFalse(self.resolving.exists(path))
self.resolving.write_file(path, "data")
self.assertTrue(self.resolving.exists(path))
def test_delete(self):
path = os.path.join(self.temp_dir, "test_delete.txt")
self.resolving.write_file(path, "data")
self.assertTrue(self.resolving.exists(path))
self.assertTrue(self.resolving.delete(path))
self.assertFalse(self.resolving.exists(path))
def test_mkdirs(self):
dir_path = os.path.join(self.temp_dir, "a", "b", "c")
self.assertFalse(os.path.exists(dir_path))
self.resolving.mkdirs(dir_path)
self.assertTrue(os.path.isdir(dir_path))
def test_rename(self):
src = os.path.join(self.temp_dir, "src.txt")
dst = os.path.join(self.temp_dir, "dst.txt")
self.resolving.write_file(src, "move me")
self.assertTrue(self.resolving.rename(src, dst))
self.assertFalse(self.resolving.exists(src))
self.assertEqual(self.resolving.read_file_utf8(dst), "move me")
def test_list_status(self):
for name in ["a.txt", "b.txt"]:
self.resolving.write_file(os.path.join(self.temp_dir, name), name)
entries = self.resolving.list_status(self.temp_dir)
names = sorted([os.path.basename(e.path) for e in entries])
self.assertEqual(names, ["a.txt", "b.txt"])
def test_get_file_status(self):
path = os.path.join(self.temp_dir, "status.txt")
self.resolving.write_file(path, "12345")
status = self.resolving.get_file_status(path)
self.assertEqual(status.size, 5)
def test_exists_batch(self):
p1 = os.path.join(self.temp_dir, "batch_a.txt")
p2 = os.path.join(self.temp_dir, "batch_b.txt")
p3 = os.path.join(self.temp_dir, "batch_no.txt")
self.resolving.write_file(p1, "a")
self.resolving.write_file(p2, "b")
result = self.resolving.exists_batch([p1, p2, p3])
self.assertTrue(result[p1])
self.assertTrue(result[p2])
self.assertFalse(result[p3])
def test_close_clears_cache(self):
self.resolving._get_fileio("/tmp/test")
self.assertTrue(len(self.resolving._fileio_cache) > 0)
self.resolving.close()
self.assertEqual(len(self.resolving._fileio_cache), 0)
def test_cross_path_delegation(self):
"""Different local paths should use the same cached FileIO."""
dir_a = os.path.join(self.temp_dir, "a")
dir_b = os.path.join(self.temp_dir, "b")
self.resolving.mkdirs(dir_a)
self.resolving.mkdirs(dir_b)
path_a = os.path.join(dir_a, "file.txt")
path_b = os.path.join(dir_b, "file.txt")
self.resolving.write_file(path_a, "content_a")
self.resolving.write_file(path_b, "content_b")
self.assertEqual(self.resolving.read_file_utf8(path_a), "content_a")
self.assertEqual(self.resolving.read_file_utf8(path_b), "content_b")
self.assertEqual(len(self.resolving._fileio_cache), 1)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()