| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """End-to-end tests for Paimon system tables via pypaimon SQL. |
| |
| Exercises the `<table>$<system_name>` syntax handled by paimon-rust |
| DataFusion integration. A non-partitioned table with one snapshot is |
| created in setUpClass and queried by each test. |
| """ |
| |
| import json |
| import os |
| import tempfile |
| import unittest |
| |
| import pyarrow as pa |
| |
| from pypaimon_rust.datafusion import SQLContext |
| |
| |
# Optional warehouse location taken from the environment; None when the
# variable is unset, in which case the test class falls back to a temporary
# directory.
WAREHOUSE = os.getenv("PAIMON_TEST_WAREHOUSE")

# Table created by the class fixture and queried by every test.
TABLE_NAME = "sql_system_test_table"

# Number of rows written by the fixture's single INSERT; compared against the
# snapshot's total_record_count.
ROW_COUNT = 3
| |
| |
class SQLSystemTableTest(unittest.TestCase):
    """Query the `<table>$<system_name>` system tables of a one-snapshot table.

    ``setUpClass`` creates ``TABLE_NAME`` and fills it with a single INSERT;
    each test then reads one system table through the shared ``SQLContext``.
    """

    @classmethod
    def setUpClass(cls):
        """Create the fixture table in ``WAREHOUSE`` or a temporary directory.

        unittest does not call ``tearDownClass`` when ``setUpClass`` raises, so
        a failure while building the fixture removes the temporary directory
        here before the exception is re-raised.
        """
        cls._tmpdir = None
        if WAREHOUSE:
            cls.warehouse = WAREHOUSE
        else:
            cls._tmpdir = tempfile.TemporaryDirectory(prefix="paimon-sql-systest-")
            cls.warehouse = cls._tmpdir.name

        try:
            ctx = SQLContext()
            ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
            ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
            ctx.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING)")
            ctx.sql(
                f"INSERT INTO {TABLE_NAME} VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')"
            )
        except BaseException:
            cls._remove_tmpdir()
            raise
        cls.ctx = ctx

    @classmethod
    def tearDownClass(cls):
        """Drop the fixture table, then remove the temporary directory.

        The directory is removed in a ``finally`` clause so that a failing
        DROP TABLE cannot leave it behind.
        """
        try:
            ctx = SQLContext()
            ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
            ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        finally:
            cls._remove_tmpdir()

    @classmethod
    def _remove_tmpdir(cls):
        """Delete the temporary warehouse directory, if this class created one."""
        if cls._tmpdir is not None:
            cls._tmpdir.cleanup()
            cls._tmpdir = None

    def _query(self, system_name: str) -> pa.Table:
        """Return every row of ``<TABLE_NAME>$<system_name>`` as one Arrow table."""
        batches = self.ctx.sql(f"SELECT * FROM {TABLE_NAME}${system_name}")
        return pa.Table.from_batches(batches)

    def _assert_has_columns(self, table, required):
        """Fail listing every column of ``required`` that ``table`` lacks."""
        names = table.schema.names
        missing = [column for column in required if column not in names]
        self.assertEqual(missing, [], f"missing columns; table has {names}")

    def test_options_system_table(self):
        # Only the column layout is checked; no option rows are asserted.
        table = self._query("options")
        self.assertListEqual(table.schema.names, ["key", "value"])

    def test_schemas_system_table(self):
        table = self._query("schemas")
        self.assertListEqual(
            table.schema.names,
            ["schema_id", "fields", "partition_keys", "primary_keys",
             "options", "comment", "update_time"],
        )
        self.assertGreaterEqual(table.num_rows, 1, "should have at least one schema")
        ids = table.column("schema_id").to_pylist()
        self.assertEqual(sorted(ids), sorted(set(ids)), "schema_id should be unique")
        # The first row's ``fields`` value is parsed as JSON and must list the
        # two columns declared by CREATE TABLE, in order.
        fields = json.loads(table.column("fields").to_pylist()[0])
        self.assertEqual([f["name"] for f in fields], ["id", "name"])

    def test_snapshots_system_table(self):
        table = self._query("snapshots")
        # Presence check only: columns beyond these are tolerated.
        self._assert_has_columns(
            table,
            (
                "snapshot_id", "schema_id", "commit_user", "commit_identifier",
                "commit_kind", "commit_time", "base_manifest_list",
                "delta_manifest_list", "total_record_count",
            ),
        )
        self.assertEqual(table.num_rows, 1, "single batch write should produce one snapshot")
        self.assertEqual(table.column("total_record_count").to_pylist()[0], ROW_COUNT)

    def test_tags_system_table_empty(self):
        table = self._query("tags")
        self.assertListEqual(
            table.schema.names,
            ["tag_name", "snapshot_id", "schema_id", "commit_time",
             "record_count", "create_time", "time_retained"],
        )
        self.assertEqual(table.num_rows, 0, "no tags created")

    def test_branches_system_table_empty(self):
        table = self._query("branches")
        self.assertListEqual(table.schema.names, ["branch_name", "create_time"])
        # A "main" row is ignored if present; any other branch is a failure.
        non_main = [b for b in table.column("branch_name").to_pylist() if b != "main"]
        self.assertEqual(non_main, [], "no user-created branches")

    def test_manifests_system_table(self):
        table = self._query("manifests")
        # Presence check only: columns beyond these are tolerated.
        self._assert_has_columns(
            table,
            (
                "file_name", "file_size", "num_added_files",
                "num_deleted_files", "schema_id",
            ),
        )
        self.assertGreaterEqual(table.num_rows, 1, "snapshot should have manifests")
        for size in table.column("file_size").to_pylist():
            self.assertGreater(size, 0)
        total_added = sum(table.column("num_added_files").to_pylist())
        self.assertGreaterEqual(total_added, 1, "single write should add at least one file")
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()