| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| """ |
| |
| import pyarrow as pa |
| |
| from pypaimon import PaimonVirtualFileSystem, Schema |
| from pypaimon.table.object import ObjectTable |
| from pypaimon.tests.rest.rest_base_test import RESTBaseTest |
| |
| |
| class RESTObjectTableTest(RESTBaseTest): |
| |
| def setUp(self): |
| super().setUp() |
| pvfs_options = { |
| 'uri': self.options['uri'], |
| 'warehouse': self.options['warehouse'], |
| 'dlf.region': self.options['dlf.region'], |
| 'token.provider': self.options['token.provider'], |
| 'token': self.options['token'], |
| } |
| self.pvfs = PaimonVirtualFileSystem(pvfs_options) |
| |
| def _create_object_table(self, table_name, extra_options=None): |
| """Helper to create an object table with the given name. |
| |
| ObjectTable has a fixed schema and does not support custom fields. |
| Only options (including type=object-table) are needed. |
| """ |
| options = {"type": "object-table"} |
| if extra_options: |
| options.update(extra_options) |
| schema = Schema(options=options) |
| self.rest_catalog.drop_table(table_name, True) |
| self.rest_catalog.create_table(table_name, schema, False) |
| return self.rest_catalog.get_table(table_name) |
| |
| def _pvfs_table_path(self, table_name, sub_path=None): |
| """Build a pvfs:// path for the given table and optional sub-path.""" |
| warehouse = self.options['warehouse'] |
| base = "pvfs://{}/{}".format(warehouse, table_name.replace('.', '/')) |
| if sub_path: |
| return "{}/{}".format(base, sub_path) |
| return base |
| |
| def _write_file_via_pvfs(self, table_name, filename, content): |
| """Write a file into the table's location using pvfs.""" |
| path = self._pvfs_table_path(table_name, filename) |
| # Ensure parent directory exists when filename contains subdirectories |
| if '/' in filename: |
| parent_dir = self._pvfs_table_path( |
| table_name, filename.rsplit('/', 1)[0] |
| ) |
| self.pvfs.makedirs(parent_dir, exist_ok=True) |
| with self.pvfs.open(path, 'wb') as f: |
| f.write(content) |
| |
| def test_get_object_table(self): |
| table_name = "default.object_table_basic" |
| table = self._create_object_table(table_name) |
| |
| self.assertIsInstance(table, ObjectTable) |
| self.assertEqual(table.name(), "object_table_basic") |
| self.assertEqual(table.full_name(), table_name) |
| self.assertEqual(table.partition_keys, []) |
| self.assertEqual(table.primary_keys, []) |
| self.assertEqual(table.options().get("type"), "object-table") |
| |
| def test_object_table_read_files(self): |
| table_name = "default.object_table_read" |
| table = self._create_object_table(table_name) |
| |
| # Write some test files into the table's location via pvfs |
| test_files = { |
| "file_a.txt": b"hello world", |
| "file_b.dat": b"some binary data here", |
| } |
| for filename, content in test_files.items(): |
| self._write_file_via_pvfs(table_name, filename, content) |
| |
| # Read the object table |
| read_builder = table.new_read_builder() |
| scan = read_builder.new_scan() |
| plan = scan.plan() |
| splits = plan.splits() |
| |
| self.assertEqual(len(splits), 1) |
| |
| table_read = read_builder.new_read() |
| result = table_read.to_arrow(splits) |
| |
| self.assertIsInstance(result, pa.Table) |
| # Should contain the schema, snapshot and manifest files plus our test files |
| # Filter to only our test files |
| result_names = result.column("name").to_pylist() |
| for filename in test_files: |
| self.assertIn(filename, result_names) |
| |
| # Verify file sizes for our test files |
| result_dict = {} |
| for i in range(result.num_rows): |
| name = result.column("name")[i].as_py() |
| length = result.column("length")[i].as_py() |
| result_dict[name] = length |
| |
| for filename, content in test_files.items(): |
| self.assertEqual(result_dict[filename], len(content)) |
| |
| # Verify schema columns |
| self.assertIn("path", result.column_names) |
| self.assertIn("name", result.column_names) |
| self.assertIn("length", result.column_names) |
| self.assertIn("mtime", result.column_names) |
| self.assertIn("atime", result.column_names) |
| self.assertIn("owner", result.column_names) |
| |
| def test_object_table_read_with_subdirectories(self): |
| table_name = "default.object_table_subdir" |
| table = self._create_object_table(table_name) |
| |
| # Write files via pvfs, including a nested subdirectory file |
| self._write_file_via_pvfs(table_name, "root_file.txt", b"root content") |
| self._write_file_via_pvfs(table_name, "subdir/nested_file.txt", b"nested content") |
| |
| read_builder = table.new_read_builder() |
| splits = read_builder.new_scan().plan().splits() |
| result = read_builder.new_read().to_arrow(splits) |
| |
| result_names = result.column("name").to_pylist() |
| self.assertIn("root_file.txt", result_names) |
| self.assertIn("nested_file.txt", result_names) |
| |
| # Verify relative paths contain subdirectory |
| result_paths = result.column("path").to_pylist() |
| nested_paths = [p for p in result_paths if "nested_file.txt" in p] |
| self.assertTrue(len(nested_paths) > 0) |
| self.assertIn("subdir/", nested_paths[0]) |
| |
| def test_object_table_with_projection(self): |
| table_name = "default.object_table_projection" |
| table = self._create_object_table(table_name) |
| |
| self._write_file_via_pvfs(table_name, "proj_test.txt", b"test data") |
| |
| # Test projection with two columns |
| read_builder = table.new_read_builder() |
| read_builder.with_projection(["name", "length"]) |
| splits = read_builder.new_scan().plan().splits() |
| result = read_builder.new_read().to_arrow(splits) |
| |
| self.assertEqual(result.column_names, ["name", "length"]) |
| self.assertNotIn("path", result.column_names) |
| self.assertNotIn("mtime", result.column_names) |
| self.assertNotIn("atime", result.column_names) |
| self.assertNotIn("owner", result.column_names) |
| |
| # Verify data content for our test file |
| result_names = result.column("name").to_pylist() |
| self.assertIn("proj_test.txt", result_names) |
| idx = result_names.index("proj_test.txt") |
| self.assertEqual(result.column("length")[idx].as_py(), len(b"test data")) |
| |
| def test_object_table_with_limit(self): |
| table_name = "default.object_table_limit" |
| table = self._create_object_table(table_name) |
| |
| for i in range(5): |
| self._write_file_via_pvfs( |
| table_name, "file_{}.txt".format(i), "content {}".format(i).encode() |
| ) |
| |
| read_builder = table.new_read_builder() |
| read_builder.with_limit(2) |
| splits = read_builder.new_scan().plan().splits() |
| result = read_builder.new_read().to_arrow(splits) |
| |
| self.assertEqual(result.num_rows, 2) |
| |
| def test_object_table_options_and_copy(self): |
| table_name = "default.object_table_options" |
| table = self._create_object_table( |
| table_name, extra_options={"custom.key": "custom_value"} |
| ) |
| |
| self.assertEqual(table.options().get("custom.key"), "custom_value") |
| self.assertEqual(table.options().get("type"), "object-table") |
| |
| # Test copy with dynamic options |
| copied = table.copy({"new.key": "new_value"}) |
| self.assertIsInstance(copied, ObjectTable) |
| self.assertEqual(copied.options().get("custom.key"), "custom_value") |
| self.assertEqual(copied.options().get("new.key"), "new_value") |
| |
| # Original should not be modified |
| self.assertIsNone(table.options().get("new.key")) |
| |
| # Test copy with override |
| overridden = table.copy({"custom.key": "overridden"}) |
| self.assertEqual(overridden.options().get("custom.key"), "overridden") |
| self.assertEqual(table.options().get("custom.key"), "custom_value") |
| |
| def test_object_table_unsupported_write(self): |
| table_name = "default.object_table_no_write" |
| table = self._create_object_table(table_name) |
| |
| with self.assertRaises(NotImplementedError): |
| table.new_batch_write_builder() |
| with self.assertRaises(NotImplementedError): |
| table.new_stream_write_builder() |
| |
| def test_object_table_unsupported_drop_partitions(self): |
| table_name = "default.object_table_no_drop_partitions" |
| self._create_object_table(table_name) |
| |
| with self.assertRaisesRegex( |
| ValueError, |
| "drop_partitions is not supported for table type 'ObjectTable'", |
| ): |
| self.rest_catalog.drop_partitions( |
| table_name, |
| [{"dt": "20250101"}], |
| ) |
| |
| def test_object_table_to_pandas(self): |
| table_name = "default.object_table_pandas" |
| table = self._create_object_table(table_name) |
| |
| self._write_file_via_pvfs(table_name, "pandas_test.txt", b"pandas data") |
| |
| read_builder = table.new_read_builder() |
| splits = read_builder.new_scan().plan().splits() |
| result_df = read_builder.new_read().to_pandas(splits) |
| |
| self.assertIn("name", result_df.columns) |
| self.assertIn("pandas_test.txt", result_df["name"].values) |