blob: b6f168ac58238b5969a9acd9aa4feb21a808ce46 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import unittest
import uuid
import pyarrow.fs as pafs
from pyarrow.fs import PyFileSystem
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE
class JindoFileSystemTest(unittest.TestCase):
    """Integration tests for JindoFileSystemHandler wrapped in a pyarrow PyFileSystem.

    The tests talk to a real OSS bucket. They are skipped unless pyjindo is
    importable (``JINDO_AVAILABLE``) and the ``OSS_TEST_BUCKET``,
    ``OSS_ACCESS_KEY_ID``, ``OSS_ACCESS_KEY_SECRET`` and ``OSS_ENDPOINT``
    environment variables are all set. Every test writes under a unique
    ``test-<hex>/`` prefix, which ``tearDown`` removes on a best-effort basis.
    """

    def setUp(self):
        """Build the filesystem under test, or skip when it cannot be reached."""
        if not JINDO_AVAILABLE:
            self.skipTest("pyjindo is not available")

        # OSS connection settings come only from the environment; there are
        # no built-in fallback values.
        bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")

        # skipTest() raises unittest.SkipTest, so the first missing setting
        # ends setUp here; no explicit return is needed.
        for value, reason in (
            (bucket, "test bucket is not configured"),
            (access_key_id, "test access key id is not configured"),
            (access_key_secret, "test access key secret is not configured"),
            (endpoint, "test endpoint is not configured"),
        ):
            if not value:
                self.skipTest(reason)

        self.root_path = f"oss://{bucket}/"
        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
        })

        fs_handler = JindoFileSystemHandler(self.root_path, self.catalog_options)
        self.fs = PyFileSystem(fs_handler)

        # Unique per-test prefix so runs sharing the bucket never collide.
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Best-effort removal of everything written under ``test_prefix``.

        Cleanup failures are deliberately ignored so that they never mask
        the real outcome of the test.
        """
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            file_info = self.fs.get_file_info(test_dir)
            if file_info.type == pafs.FileType.Directory:
                self.fs.delete_dir(test_dir)
        except Exception:
            pass  # Ignore cleanup errors

    def _get_test_path(self, name: str) -> str:
        """Return the absolute URI for ``name`` under this test's prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def _write_file(self, path: str, data: bytes) -> None:
        """Write ``data`` to ``path``, closing the stream even if the write fails."""
        with self.fs.open_output_stream(path) as out_stream:
            out_stream.write(data)

    def _read_file(self, path: str) -> bytes:
        """Read and return the full contents of ``path``."""
        with self.fs.open_input_file(path) as in_file:
            return in_file.read()

    def _assert_file_not_found(self, path: str) -> None:
        """Assert that opening and reading ``path`` raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(path) as in_file:
                in_file.read()

    def test_get_root_file_info(self):
        """The bucket root is reported as a directory."""
        file_info = self.fs.get_file_info(self.root_path)
        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_create_dir_recursive(self):
        """create_dir(recursive=True) creates all missing parent directories."""
        test_dir = self._get_test_path("nested/deep/dir/")
        self.fs.create_dir(test_dir, recursive=True)

        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_write_and_read_small_file(self):
        """Write 10 random bytes with an explicit close(), then read them back."""
        test_file = self._get_test_path("small-file.txt")
        test_data = os.urandom(10)

        # Deliberately exercise the explicit close() path (the *_with_context
        # test covers the context-manager path); try/finally keeps the
        # stream from leaking if write() fails.
        out_stream = self.fs.open_output_stream(test_file)
        try:
            out_stream.write(test_data)
        finally:
            out_stream.close()

        read_data = self._read_file(test_file)
        self.assertEqual(test_data, read_data)
        self.assertEqual(len(read_data), 10)

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, 10)

    def test_write_and_read_small_file_with_context(self):
        """Write 10 random bytes via a context-managed stream, then read them back."""
        test_file = self._get_test_path("small-file-with-context.txt")
        test_data = os.urandom(10)

        # The output stream is written inline (not via _write_file) because
        # the context-manager protocol is exactly what this test covers.
        with self.fs.open_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        read_data = self._read_file(test_file)
        self.assertEqual(test_data, read_data)
        self.assertEqual(len(read_data), 10)

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, 10)

    def test_write_and_read_large_file(self):
        """Round-trip 20MB of random data and exercise the input-file API.

        Covers chunked writes plus tell(), read(n), seek(), read_at(), a read
        at end of file, and a full read after seeking back to the start.
        """
        test_file = self._get_test_path("large-file.bin")
        file_size = 20 * 1024 * 1024  # 20MB
        test_data = os.urandom(file_size)

        # Write in 1MB chunks to exercise many write() calls on one stream.
        chunk_size = 1024 * 1024
        with self.fs.open_output_stream(test_file) as out_stream:
            for start in range(0, file_size, chunk_size):
                out_stream.write(test_data[start:start + chunk_size])

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, file_size)

        with self.fs.open_input_file(test_file) as in_file:
            # A freshly opened file starts at position 0.
            self.assertEqual(in_file.tell(), 0)

            # read(n) returns n bytes and advances the position.
            chunk1 = in_file.read(1024)
            self.assertEqual(len(chunk1), 1024)
            self.assertEqual(in_file.tell(), 1024)
            self.assertEqual(chunk1, test_data[0:1024])

            # seek() to the middle of the file.
            middle_pos = file_size // 2
            in_file.seek(middle_pos)
            self.assertEqual(in_file.tell(), middle_pos)

            # read_at() reads at an absolute offset.
            read_at_data = in_file.read_at(1024, 0)
            self.assertEqual(len(read_at_data), 1024)
            self.assertEqual(read_at_data, test_data[0:1024])

            offset = file_size - 2048
            read_at_data2 = in_file.read_at(1024, offset)
            self.assertEqual(len(read_at_data2), 1024)
            self.assertEqual(read_at_data2, test_data[offset:offset + 1024])

            # Seeking to EOF, then reading, yields no bytes.
            in_file.seek(file_size)
            self.assertEqual(in_file.tell(), file_size)
            empty_data = in_file.read(100)
            self.assertEqual(len(empty_data), 0)

            # read() with no size after rewinding returns the whole file.
            in_file.seek(0)
            all_data = in_file.read()
            self.assertEqual(len(all_data), file_size)
            self.assertEqual(all_data, test_data)

        # A second, independent handle also sees the complete contents.
        complete_data = self._read_file(test_file)
        self.assertEqual(complete_data, test_data)
        self.assertEqual(len(complete_data), file_size)

    def test_get_file_info_single_path(self):
        """get_file_info(path) reports type and size of a single file."""
        test_file = self._get_test_path("info-test.txt")
        test_data = b"test content"
        self._write_file(test_file, test_data)

        file_info = self.fs.get_file_info(test_file)
        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, len(test_data))

    def test_get_file_info_list(self):
        """get_file_info([paths]) returns one FileInfo per path, in order."""
        test_file = self._get_test_path("info-test1.txt")
        test_dir = self._get_test_path("dir1")

        self._write_file(test_file, b"content1")
        self.fs.create_dir(test_dir)

        file_infos = self.fs.get_file_info([test_file, test_dir])
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)
        self.assertEqual(file_infos[0].type, pafs.FileType.File)
        self.assertIn(test_file, file_infos[0].path)
        self.assertEqual(file_infos[1].type, pafs.FileType.Directory)
        self.assertIn(test_dir, file_infos[1].path)

    def test_get_file_info_with_selector(self):
        """A non-recursive FileSelector lists the direct children of a directory."""
        test_dir = self._get_test_path("selector-test/")
        test_file1 = self._get_test_path("selector-test/file1.txt")
        test_file2 = self._get_test_path("selector-test/file2.txt")

        self.fs.create_dir(test_dir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")

        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)

        file_names = [info.path for info in file_infos]
        self.assertTrue(any("file1.txt" in name for name in file_names))
        self.assertTrue(any("file2.txt" in name for name in file_names))

    def test_get_file_info_with_selector_recursive(self):
        """A recursive FileSelector also lists nested subdirectories and files."""
        test_dir = self._get_test_path("selector-recursive/")
        test_subdir = self._get_test_path("selector-recursive/subdir/")
        test_file1 = self._get_test_path("selector-recursive/file1.txt")
        test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt")

        self.fs.create_dir(test_dir)
        self.fs.create_dir(test_subdir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")

        selector = pafs.FileSelector(test_dir, recursive=True, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        # file1.txt, subdir/ and subdir/file2.txt
        self.assertEqual(len(file_infos), 3)

    def test_get_file_info_not_found(self):
        """A missing path is reported as NotFound and cannot be opened."""
        non_existent = self._get_test_path("non-existent-file.txt")

        file_info = self.fs.get_file_info(non_existent)
        self.assertEqual(file_info.type, pafs.FileType.NotFound)

        self._assert_file_not_found(non_existent)

    def test_get_file_info_selector_allow_not_found(self):
        """A FileSelector with allow_not_found=True yields [] for a missing dir."""
        non_existent_dir = self._get_test_path("non-existent-dir/")

        selector = pafs.FileSelector(non_existent_dir, recursive=False, allow_not_found=True)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 0)

    def test_delete_file(self):
        """delete_file removes an existing file."""
        test_file = self._get_test_path("delete-test.txt")
        self._write_file(test_file, b"test content")

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)

        self.fs.delete_file(test_file)
        self._assert_file_not_found(test_file)

    def test_delete_dir(self):
        """delete_dir removes a directory so that listing it fails."""
        test_dir = self._get_test_path("delete-dir/")
        self.fs.create_dir(test_dir)

        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

        self.fs.delete_dir(test_dir)

        # Listing a deleted directory with allow_not_found=False must fail.
        with self.assertRaises(FileNotFoundError):
            selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
            self.fs.get_file_info(selector)

    def test_delete_dir_contents(self):
        """delete_dir_contents empties a directory (recursively) but keeps it."""
        test_dir = self._get_test_path("delete-contents/")
        test_file1 = self._get_test_path("delete-contents/file1.txt")
        test_file2 = self._get_test_path("delete-contents/file2.txt")
        test_subdir = self._get_test_path("delete-contents/subdir/")
        test_file3 = self._get_test_path("delete-contents/subdir/file3.txt")

        self.fs.create_dir(test_dir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")
        self.fs.create_dir(test_subdir)
        self._write_file(test_file3, b"content3")

        self.fs.delete_dir_contents(test_dir)

        # The directory itself survives, but is empty.
        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)
        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertEqual(len(file_infos), 0)

        for deleted in (test_file1, test_file2, test_file3):
            self._assert_file_not_found(deleted)

    def test_move_file(self):
        """move() on a file removes the source and preserves the content."""
        src_file = self._get_test_path("move-src.txt")
        dst_file = self._get_test_path("move-dst.txt")
        test_data = b"move test content"
        self._write_file(src_file, test_data)

        self.fs.move(src_file, dst_file)

        self._assert_file_not_found(src_file)

        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)
        self.assertEqual(self._read_file(dst_file), test_data)

    def test_move_directory(self):
        """move() on a directory removes the source and carries its contents."""
        src_dir = self._get_test_path("move-src-dir/")
        dst_dir = self._get_test_path("move-dst-dir/")
        src_file = self._get_test_path("move-src-dir/file.txt")
        dst_file = self._get_test_path("move-dst-dir/file.txt")
        test_data = b"test content"

        self.fs.create_dir(src_dir)
        self._write_file(src_file, test_data)

        self.fs.move(src_dir, dst_dir)

        # Listing the old location with allow_not_found=False must fail.
        with self.assertRaises(FileNotFoundError):
            selector = pafs.FileSelector(src_dir, recursive=False, allow_not_found=False)
            self.fs.get_file_info(selector)

        dst_info = self.fs.get_file_info(dst_dir)
        self.assertEqual(dst_info.type, pafs.FileType.Directory)
        # The file inside the directory must have moved with it.
        self.assertEqual(self._read_file(dst_file), test_data)

    def test_copy_file(self):
        """copy_file duplicates 1MB of data and leaves the source in place."""
        src_file = self._get_test_path("copy-src.txt")
        dst_file = self._get_test_path("copy-dst.txt")
        test_data = os.urandom(1024 * 1024)  # 1MB random data
        self._write_file(src_file, test_data)

        self.fs.copy_file(src_file, dst_file)

        src_info = self.fs.get_file_info(src_file)
        self.assertEqual(src_info.type, pafs.FileType.File)
        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)

        read_data = self._read_file(dst_file)
        self.assertEqual(read_data, test_data)
        self.assertEqual(len(read_data), len(test_data))

    def test_delete_nonexistent_file(self):
        """Deleting a missing file raises an OSError (e.g. FileNotFoundError)."""
        non_existent = self._get_test_path("non-existent-delete.txt")
        # IOError is an alias of OSError on Python 3.
        with self.assertRaises(OSError):
            self.fs.delete_file(non_existent)

    def test_multiple_sequential_reads(self):
        """Consecutive read(n) calls on one handle return adjacent slices."""
        test_file = self._get_test_path("sequential-read.txt")
        test_data = os.urandom(10000)  # 10KB
        self._write_file(test_file, test_data)

        with self.fs.open_input_file(test_file) as in_file:
            chunk1 = in_file.read(1000)
            chunk2 = in_file.read(2000)
            chunk3 = in_file.read(3000)
            chunk4 = in_file.read()  # Read the remainder

        self.assertEqual(chunk1, test_data[0:1000])
        self.assertEqual(chunk2, test_data[1000:3000])
        self.assertEqual(chunk3, test_data[3000:6000])
        self.assertEqual(chunk4, test_data[6000:])

        total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4)
        self.assertEqual(total_read, len(test_data))
# Allow running this module directly: discover and run the tests defined above.
if __name__ == '__main__':
    unittest.main()