blob: c7d30e1e7e76dc373786ab64cb1ef34bbcd4b8a0 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import pyarrow as pa
from pypaimon import PaimonVirtualFileSystem, Schema
from pypaimon.table.object import ObjectTable
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
class RESTObjectTableTest(RESTBaseTest):
    """Integration tests for ObjectTable behavior against a REST catalog.

    Files are staged into each table's storage location through the Paimon
    virtual filesystem, then read back through the table's read builder.
    """

    def setUp(self):
        """Start the REST test fixture and build a pvfs client from its options."""
        super().setUp()
        option_keys = ('uri', 'warehouse', 'dlf.region', 'token.provider', 'token')
        self.pvfs = PaimonVirtualFileSystem(
            {key: self.options[key] for key in option_keys}
        )

    def _create_object_table(self, table_name, extra_options=None):
        """Drop (if present) and recreate an object table, returning it.

        ObjectTable has a fixed schema and does not support custom fields;
        only options (including type=object-table) are supplied.
        """
        merged_options = {"type": "object-table", **(extra_options or {})}
        self.rest_catalog.drop_table(table_name, True)
        self.rest_catalog.create_table(table_name, Schema(options=merged_options), False)
        return self.rest_catalog.get_table(table_name)

    def _pvfs_table_path(self, table_name, sub_path=None):
        """Return the pvfs:// URI for a table, optionally extended by sub_path."""
        base = "pvfs://{}/{}".format(
            self.options['warehouse'], table_name.replace('.', '/')
        )
        return "{}/{}".format(base, sub_path) if sub_path else base

    def _write_file_via_pvfs(self, table_name, filename, content):
        """Write `content` to `filename` inside the table's location via pvfs."""
        target = self._pvfs_table_path(table_name, filename)
        # Create the parent directory first when the filename is nested.
        parent, separator, _ = filename.rpartition('/')
        if separator:
            self.pvfs.makedirs(
                self._pvfs_table_path(table_name, parent), exist_ok=True
            )
        with self.pvfs.open(target, 'wb') as handle:
            handle.write(content)

    def test_get_object_table(self):
        table_name = "default.object_table_basic"
        created = self._create_object_table(table_name)
        self.assertIsInstance(created, ObjectTable)
        self.assertEqual(created.name(), "object_table_basic")
        self.assertEqual(created.full_name(), table_name)
        self.assertEqual(created.partition_keys, [])
        self.assertEqual(created.primary_keys, [])
        self.assertEqual(created.options().get("type"), "object-table")

    def test_object_table_read_files(self):
        table_name = "default.object_table_read"
        table = self._create_object_table(table_name)
        # Stage fixture files into the table's location via pvfs.
        fixtures = {
            "file_a.txt": b"hello world",
            "file_b.dat": b"some binary data here",
        }
        for name, payload in fixtures.items():
            self._write_file_via_pvfs(table_name, name, payload)
        # Plan and read the object table.
        builder = table.new_read_builder()
        splits = builder.new_scan().plan().splits()
        self.assertEqual(len(splits), 1)
        arrow_table = builder.new_read().to_arrow(splits)
        self.assertIsInstance(arrow_table, pa.Table)
        # The listing includes schema/snapshot/manifest files too; only check
        # that our fixtures appear and that their reported sizes match.
        listed_names = arrow_table.column("name").to_pylist()
        for name in fixtures:
            self.assertIn(name, listed_names)
        listed_lengths = arrow_table.column("length").to_pylist()
        size_by_name = dict(zip(listed_names, listed_lengths))
        for name, payload in fixtures.items():
            self.assertEqual(size_by_name[name], len(payload))
        # The fixed object-table schema exposes these columns.
        for expected_column in ("path", "name", "length", "mtime", "atime", "owner"):
            self.assertIn(expected_column, arrow_table.column_names)

    def test_object_table_read_with_subdirectories(self):
        table_name = "default.object_table_subdir"
        table = self._create_object_table(table_name)
        # One file at the root and one inside a nested subdirectory.
        self._write_file_via_pvfs(table_name, "root_file.txt", b"root content")
        self._write_file_via_pvfs(table_name, "subdir/nested_file.txt", b"nested content")
        builder = table.new_read_builder()
        arrow_table = builder.new_read().to_arrow(
            builder.new_scan().plan().splits()
        )
        listed_names = arrow_table.column("name").to_pylist()
        self.assertIn("root_file.txt", listed_names)
        self.assertIn("nested_file.txt", listed_names)
        # The relative path of the nested file must carry its subdirectory.
        listed_paths = arrow_table.column("path").to_pylist()
        nested = [p for p in listed_paths if "nested_file.txt" in p]
        self.assertTrue(len(nested) > 0)
        self.assertIn("subdir/", nested[0])

    def test_object_table_with_projection(self):
        table_name = "default.object_table_projection"
        table = self._create_object_table(table_name)
        self._write_file_via_pvfs(table_name, "proj_test.txt", b"test data")
        # Project down to two columns and confirm the rest are absent.
        builder = table.new_read_builder()
        builder.with_projection(["name", "length"])
        arrow_table = builder.new_read().to_arrow(
            builder.new_scan().plan().splits()
        )
        self.assertEqual(arrow_table.column_names, ["name", "length"])
        for dropped_column in ("path", "mtime", "atime", "owner"):
            self.assertNotIn(dropped_column, arrow_table.column_names)
        # The projected length must still reflect the actual file size.
        listed_names = arrow_table.column("name").to_pylist()
        self.assertIn("proj_test.txt", listed_names)
        position = listed_names.index("proj_test.txt")
        self.assertEqual(
            arrow_table.column("length")[position].as_py(), len(b"test data")
        )

    def test_object_table_with_limit(self):
        table_name = "default.object_table_limit"
        table = self._create_object_table(table_name)
        for index in range(5):
            self._write_file_via_pvfs(
                table_name,
                "file_{}.txt".format(index),
                "content {}".format(index).encode(),
            )
        builder = table.new_read_builder()
        builder.with_limit(2)
        arrow_table = builder.new_read().to_arrow(
            builder.new_scan().plan().splits()
        )
        self.assertEqual(arrow_table.num_rows, 2)

    def test_object_table_options_and_copy(self):
        table_name = "default.object_table_options"
        original = self._create_object_table(
            table_name, extra_options={"custom.key": "custom_value"}
        )
        self.assertEqual(original.options().get("custom.key"), "custom_value")
        self.assertEqual(original.options().get("type"), "object-table")
        # copy() with dynamic options yields a new ObjectTable with merged options.
        with_extra = original.copy({"new.key": "new_value"})
        self.assertIsInstance(with_extra, ObjectTable)
        self.assertEqual(with_extra.options().get("custom.key"), "custom_value")
        self.assertEqual(with_extra.options().get("new.key"), "new_value")
        # The source table must be left untouched.
        self.assertIsNone(original.options().get("new.key"))
        # copy() may also override an existing option without mutating the source.
        with_override = original.copy({"custom.key": "overridden"})
        self.assertEqual(with_override.options().get("custom.key"), "overridden")
        self.assertEqual(original.options().get("custom.key"), "custom_value")

    def test_object_table_unsupported_write(self):
        table_name = "default.object_table_no_write"
        table = self._create_object_table(table_name)
        # Object tables are read-only: both write-builder factories must raise.
        for builder_factory in (
            table.new_batch_write_builder,
            table.new_stream_write_builder,
        ):
            with self.assertRaises(NotImplementedError):
                builder_factory()

    def test_object_table_unsupported_drop_partitions(self):
        table_name = "default.object_table_no_drop_partitions"
        self._create_object_table(table_name)
        with self.assertRaisesRegex(
            ValueError,
            "drop_partitions is not supported for table type 'ObjectTable'",
        ):
            self.rest_catalog.drop_partitions(
                table_name,
                [{"dt": "20250101"}],
            )

    def test_object_table_to_pandas(self):
        table_name = "default.object_table_pandas"
        table = self._create_object_table(table_name)
        self._write_file_via_pvfs(table_name, "pandas_test.txt", b"pandas data")
        builder = table.new_read_builder()
        frame = builder.new_read().to_pandas(
            builder.new_scan().plan().splits()
        )
        self.assertIn("name", frame.columns)
        self.assertIn("pandas_test.txt", frame["name"].values)