| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch |
| |
| import pyarrow |
| from pyarrow.fs import S3FileSystem |
| |
| from pypaimon.common.options import Options |
| from pypaimon.filesystem.local_file_io import LocalFileIO |
| from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO |
| |
| |
| class FileIOTest(unittest.TestCase): |
| """Test cases for FileIO.to_filesystem_path method.""" |
| |
| def test_s3_filesystem_path_conversion(self): |
| """Test S3FileSystem path conversion with various formats.""" |
| file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) |
| self.assertIsInstance(file_io.filesystem, S3FileSystem) |
| |
| # Test bucket and path |
| self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"), |
| "my-bucket/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("oss://my-bucket/path/to/file.txt"), |
| "my-bucket/path/to/file.txt") |
| |
| # Test bucket only |
| self.assertEqual(file_io.to_filesystem_path("s3://my-bucket"), "my-bucket") |
| self.assertEqual(file_io.to_filesystem_path("oss://my-bucket"), "my-bucket") |
| |
| # Test scheme but no netloc |
| self.assertEqual(file_io.to_filesystem_path("oss:///path/to/file.txt"), "path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("s3:///path/to/file.txt"), "path/to/file.txt") |
| |
| # Test empty path |
| self.assertEqual(file_io.to_filesystem_path("oss:///"), ".") |
| |
| # Test path without scheme |
| self.assertEqual(file_io.to_filesystem_path("bucket/path/to/file.txt"), |
| "bucket/path/to/file.txt") |
| |
| # Test idempotency |
| converted_path = "my-bucket/path/to/file.txt" |
| self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path) |
| parent_str = str(Path(converted_path).parent) |
| self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) |
| |
| def test_local_filesystem_path_conversion(self): |
| file_io = LocalFileIO("file:///tmp/warehouse", Options({})) |
| self.assertIsInstance(file_io, LocalFileIO) |
| |
| # Test file:// scheme |
| self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"), |
| "/tmp/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("file:///path/to/file.txt"), |
| "/path/to/file.txt") |
| |
| # Test empty paths |
| self.assertEqual(file_io.to_filesystem_path("file://"), ".") |
| self.assertEqual(file_io.to_filesystem_path("file:///"), "/") |
| |
| # Test paths without scheme |
| self.assertEqual(file_io.to_filesystem_path("/tmp/path/to/file.txt"), |
| "/tmp/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("relative/path/to/file.txt"), |
| "relative/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("./relative/path/to/file.txt"), |
| "./relative/path/to/file.txt") |
| |
| # Test idempotency |
| converted_path = "/tmp/path/to/file.txt" |
| self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path) |
| parent_str = str(Path(converted_path).parent) |
| self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) |
| |
| def test_windows_path_handling(self): |
| file_io = LocalFileIO("file:///tmp/warehouse", Options({})) |
| self.assertIsInstance(file_io, LocalFileIO) |
| |
| self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"), |
| "C:\\path\\to\\file.txt") |
| self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"), |
| "C:/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("C:"), "C:") |
| |
| # file:// scheme with Windows drive |
| self.assertEqual(file_io.to_filesystem_path("file://C:/path/to/file.txt"), |
| "C:/path/to/file.txt") |
| self.assertEqual(file_io.to_filesystem_path("file://C:/path"), "C:/path") |
| self.assertEqual(file_io.to_filesystem_path("file://C:"), "C:") |
| self.assertEqual(file_io.to_filesystem_path("file:///C:/path/to/file.txt"), |
| "/C:/path/to/file.txt") |
| |
| # Windows path with S3FileSystem (should preserve) |
| s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) |
| self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"), |
| "C:\\path\\to\\file.txt") |
| |
| def test_path_normalization(self): |
| """Test path normalization (multiple slashes).""" |
| file_io = LocalFileIO("file:///tmp/warehouse", Options({})) |
| self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"), |
| "/tmp/path/file.txt") |
| |
| s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) |
| self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"), |
| "my-bucket/path/to/file.txt") |
| |
| def test_write_file_with_overwrite_flag(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_write_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| test_file_uri = f"file://{temp_dir}/overwrite_test.txt" |
| expected_path = os.path.join(temp_dir, "overwrite_test.txt") |
| |
| # 1) Write to a new file with default overwrite=False |
| file_io.write_file(test_file_uri, "first content") |
| self.assertTrue(os.path.exists(expected_path)) |
| with open(expected_path, "r", encoding="utf-8") as f: |
| self.assertEqual(f.read(), "first content") |
| |
| # 2) Attempt to write again with overwrite=False should raise FileExistsError |
| with self.assertRaises(FileExistsError): |
| file_io.write_file(test_file_uri, "second content", overwrite=False) |
| |
| # Ensure content is unchanged |
| with open(expected_path, "r", encoding="utf-8") as f: |
| self.assertEqual(f.read(), "first content") |
| |
| # 3) Write with overwrite=True should replace the content |
| file_io.write_file(test_file_uri, "overwritten content", overwrite=True) |
| with open(expected_path, "r", encoding="utf-8") as f: |
| self.assertEqual(f.read(), "overwritten content") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_exists_does_not_catch_exception(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_exists_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| test_file = os.path.join(temp_dir, "test_file.txt") |
| with open(test_file, "w") as f: |
| f.write("test") |
| self.assertTrue(file_io.exists(f"file://{test_file}")) |
| self.assertFalse(file_io.exists(f"file://{temp_dir}/nonexistent.txt")) |
| |
| mock_path = MagicMock(spec=Path) |
| mock_path.exists.side_effect = OSError("Permission denied") |
| with patch.object(file_io, '_to_file', return_value=mock_path): |
| with self.assertRaises(OSError) as context: |
| file_io.exists("file:///some/path") |
| self.assertIn("Permission denied", str(context.exception)) |
| |
| with patch('builtins.open', side_effect=OSError("Permission denied")): |
| with self.assertRaises(OSError): |
| file_io.new_output_stream("file:///some/path/file.txt") |
| |
| mock_path = MagicMock(spec=Path) |
| mock_path.is_dir.side_effect = OSError("Permission denied") |
| with patch.object(file_io, '_to_file', return_value=mock_path): |
| with self.assertRaises(OSError): |
| file_io.check_or_mkdirs("file:///some/path") |
| |
| with patch('builtins.open', side_effect=OSError("Permission denied")): |
| with self.assertRaises(OSError): |
| file_io.write_file("file:///some/path", "content", overwrite=False) |
| |
| with patch('builtins.open', side_effect=OSError("Permission denied")): |
| with self.assertRaises(OSError): |
| file_io.copy_file("file:///src", "file:///dst", overwrite=False) |
| |
| with patch.object(file_io, 'exists', return_value=True): |
| with patch.object(file_io, 'read_file_utf8', side_effect=OSError("Read error")): |
| with self.assertRaises(OSError) as context: |
| file_io.read_overwritten_file_utf8("file:///some/path") |
| self.assertIn("Read error", str(context.exception)) |
| |
| # rename() catches OSError and returns False (consistent with Java implementation) |
| mock_src_path = MagicMock(spec=Path) |
| mock_dst_path = MagicMock(spec=Path) |
| mock_dst_path.parent = MagicMock() |
| mock_dst_path.parent.exists.return_value = True |
| mock_dst_path.exists.return_value = False |
| mock_src_path.rename.side_effect = OSError("Network error") |
| with patch.object(file_io, '_to_file', side_effect=[mock_src_path, mock_dst_path]): |
| # rename() catches OSError and returns False, doesn't raise |
| result = file_io.rename("file:///src", "file:///dst") |
| self.assertFalse(result, "rename() should return False when OSError occurs") |
| |
| file_io.delete_quietly("file:///some/path") |
| file_io.delete_directory_quietly("file:///some/path") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_delete_non_empty_directory_raises_error(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| test_dir = os.path.join(temp_dir, "test_dir") |
| os.makedirs(test_dir) |
| test_file = os.path.join(test_dir, "test_file.txt") |
| with open(test_file, "w") as f: |
| f.write("test") |
| |
| with self.assertRaises(OSError) as context: |
| file_io.delete(f"file://{test_dir}", recursive=False) |
| self.assertIn("is not empty", str(context.exception)) |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_delete_returns_false_when_file_not_exists(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| result = file_io.delete(f"file://{temp_dir}/nonexistent.txt") |
| self.assertFalse(result, "delete() should return False when file does not exist") |
| |
| result = file_io.delete(f"file://{temp_dir}/nonexistent_dir", recursive=False) |
| self.assertFalse(result, "delete() should return False when directory does not exist") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_mkdirs_raises_error_when_path_is_file(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_mkdirs_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| test_file = os.path.join(temp_dir, "test_file.txt") |
| with open(test_file, "w") as f: |
| f.write("test") |
| |
| with self.assertRaises(FileExistsError) as context: |
| file_io.mkdirs(f"file://{test_file}") |
| self.assertIn("is not a directory", str(context.exception)) |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_rename_returns_false_when_dst_exists(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_rename_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| src_file = os.path.join(temp_dir, "src.txt") |
| dst_file = os.path.join(temp_dir, "dst.txt") |
| with open(src_file, "w") as f: |
| f.write("src") |
| with open(dst_file, "w") as f: |
| f.write("dst") |
| |
| result = file_io.rename(f"file://{src_file}", f"file://{dst_file}") |
| self.assertFalse(result) |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_get_file_status_raises_error_when_file_not_exists(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_get_file_status_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| with self.assertRaises(FileNotFoundError) as context: |
| file_io.get_file_status(f"file://{temp_dir}/nonexistent.txt") |
| self.assertIn("does not exist", str(context.exception)) |
| |
| test_file = os.path.join(temp_dir, "test_file.txt") |
| with open(test_file, "w") as f: |
| f.write("test content") |
| |
| file_info = file_io.get_file_status(f"file://{test_file}") |
| self.assertEqual(file_info.type, pyarrow.fs.FileType.File) |
| self.assertIsNotNone(file_info.size) |
| |
| with self.assertRaises(FileNotFoundError) as context: |
| file_io.get_file_size(f"file://{temp_dir}/nonexistent.txt") |
| self.assertIn("does not exist", str(context.exception)) |
| |
| with self.assertRaises(FileNotFoundError) as context: |
| file_io.is_dir(f"file://{temp_dir}/nonexistent_dir") |
| self.assertIn("does not exist", str(context.exception)) |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_copy_file(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_copy_test_") |
| try: |
| warehouse_path = f"file://{temp_dir}" |
| file_io = LocalFileIO(warehouse_path, Options({})) |
| |
| source_file = os.path.join(temp_dir, "source.txt") |
| target_file = os.path.join(temp_dir, "target.txt") |
| |
| with open(source_file, "w") as f: |
| f.write("source content") |
| |
| # Test 1: Raises FileExistsError when target exists and overwrite=False |
| with open(target_file, "w") as f: |
| f.write("target content") |
| |
| with self.assertRaises(FileExistsError) as context: |
| file_io.copy_file(f"file://{source_file}", f"file://{target_file}", overwrite=False) |
| self.assertIn("already exists", str(context.exception)) |
| |
| with open(target_file, "r") as f: |
| self.assertEqual(f.read(), "target content") |
| |
| # Test 2: Overwrites when overwrite=True |
| file_io.copy_file(f"file://{source_file}", f"file://{target_file}", overwrite=True) |
| with open(target_file, "r") as f: |
| self.assertEqual(f.read(), "source content") |
| |
| # Test 3: Creates parent directory if it doesn't exist |
| target_file_in_subdir = os.path.join(temp_dir, "subdir", "target.txt") |
| file_io.copy_file(f"file://{source_file}", f"file://{target_file_in_subdir}", overwrite=False) |
| self.assertTrue(os.path.exists(target_file_in_subdir)) |
| with open(target_file_in_subdir, "r") as f: |
| self.assertEqual(f.read(), "source content") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| def test_try_to_write_atomic(self): |
| temp_dir = tempfile.mkdtemp(prefix="file_io_try_write_atomic_test_") |
| try: |
| target_dir = os.path.join(temp_dir, "target_dir") |
| normal_file = os.path.join(temp_dir, "normal_file.txt") |
| |
| from pypaimon.filesystem.local_file_io import LocalFileIO |
| local_file_io = LocalFileIO(f"file://{temp_dir}", Options({})) |
| os.makedirs(target_dir) |
| self.assertFalse( |
| local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"), |
| "LocalFileIO should return False when target is a directory") |
| self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") |
| |
| self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content")) |
| with open(normal_file, "r") as f: |
| self.assertEqual(f.read(), "test content") |
| |
| os.remove(normal_file) |
| local_file_io = LocalFileIO(f"file://{temp_dir}", Options({})) |
| self.assertFalse( |
| local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"), |
| "LocalFileIO should return False when target is a directory") |
| self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory") |
| |
| self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content")) |
| with open(normal_file, "r") as f: |
| self.assertEqual(f.read(), "test content") |
| finally: |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| |
| if __name__ == '__main__': |
| unittest.main() |