blob: e221d0098f143bcef8a15664886ac21de8f7bab9 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""End-to-end tests for Paimon system tables via pypaimon SQL.
Exercises the `<table>$<system_name>` syntax handled by paimon-rust
DataFusion integration. A non-partitioned table with one snapshot is
created in setUpClass and queried by each test.
"""
import json
import os
import tempfile
import unittest
import pyarrow as pa
from pypaimon_rust.datafusion import SQLContext
# Optional externally managed warehouse location. When unset (or empty) the
# test class falls back to a throwaway temporary directory.
WAREHOUSE = os.getenv("PAIMON_TEST_WAREHOUSE")
# Table created in setUpClass and dropped again in tearDownClass.
TABLE_NAME = "sql_system_test_table"
# Number of rows written by the single INSERT in setUpClass.
ROW_COUNT = 3
class SQLSystemTableTest(unittest.TestCase):
    """Query every ``<table>$<system_name>`` system table of a one-snapshot table."""

    @classmethod
    def setUpClass(cls):
        """Pick a warehouse and create a table holding a single committed write.

        Uses ``PAIMON_TEST_WAREHOUSE`` when it is set and non-empty; otherwise
        a temporary directory is created and removed again on teardown.
        """
        cls._tmpdir = None
        if WAREHOUSE:
            cls.warehouse = WAREHOUSE
        else:
            cls._tmpdir = tempfile.TemporaryDirectory(prefix="paimon-sql-systest-")
            cls.warehouse = cls._tmpdir.name
        try:
            ctx = SQLContext()
            ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
            ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
            ctx.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING)")
            # One INSERT only: the snapshot test expects exactly one snapshot
            # holding ROW_COUNT records.
            ctx.sql(
                f"INSERT INTO {TABLE_NAME} VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')"
            )
        except BaseException:
            # unittest does not run tearDownClass when setUpClass raises, so
            # release the temporary warehouse here before propagating.
            if cls._tmpdir is not None:
                cls._tmpdir.cleanup()
            raise
        cls.ctx = ctx

    @classmethod
    def tearDownClass(cls):
        """Drop the test table, then remove the temporary warehouse (if any)."""
        try:
            cls.ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        finally:
            # Remove the temp directory even when the DROP itself fails.
            if cls._tmpdir is not None:
                cls._tmpdir.cleanup()

    def _query(self, system_name: str) -> pa.Table:
        """Return ``SELECT *`` over ``<TABLE_NAME>$<system_name>`` as a pyarrow Table.

        :param system_name: system table suffix, e.g. ``"snapshots"``.
        """
        batches = self.ctx.sql(f"SELECT * FROM {TABLE_NAME}${system_name}")
        # NOTE(review): pa.Table.from_batches needs at least one batch when no
        # schema is passed; the empty tags/branches tests assume the engine
        # returns an (empty) batch rather than an empty list -- confirm.
        return pa.Table.from_batches(batches)

    def test_options_system_table(self):
        """The options system table exposes key/value columns."""
        table = self._query("options")
        self.assertListEqual(table.schema.names, ["key", "value"])

    def test_schemas_system_table(self):
        """The schemas system table lists unique schemas with JSON field lists."""
        table = self._query("schemas")
        self.assertListEqual(
            table.schema.names,
            ["schema_id", "fields", "partition_keys", "primary_keys",
             "options", "comment", "update_time"],
        )
        self.assertGreaterEqual(table.num_rows, 1, "should have at least one schema")
        ids = table.column("schema_id").to_pylist()
        self.assertEqual(len(ids), len(set(ids)), "schema_id should be unique")
        # Row order is not guaranteed without ORDER BY, so inspect the row of
        # the latest schema explicitly instead of assuming it is first.
        latest_row = ids.index(max(ids))
        fields = json.loads(table.column("fields").to_pylist()[latest_row])
        self.assertEqual([f["name"] for f in fields], ["id", "name"])

    def test_snapshots_system_table(self):
        """The single INSERT yields exactly one snapshot covering all rows."""
        table = self._query("snapshots")
        names = table.schema.names
        for required in (
            "snapshot_id", "schema_id", "commit_user", "commit_identifier",
            "commit_kind", "commit_time", "base_manifest_list",
            "delta_manifest_list", "total_record_count",
        ):
            self.assertIn(required, names)
        self.assertEqual(table.num_rows, 1, "single batch write should produce one snapshot")
        self.assertEqual(table.column("total_record_count").to_pylist()[0], ROW_COUNT)

    def test_tags_system_table_empty(self):
        """No tags were created, so the tags table is empty but fully typed."""
        table = self._query("tags")
        self.assertListEqual(
            table.schema.names,
            ["tag_name", "snapshot_id", "schema_id", "commit_time",
             "record_count", "create_time", "time_retained"],
        )
        self.assertEqual(table.num_rows, 0, "no tags created")

    def test_branches_system_table_empty(self):
        """Apart from a possible ``main`` entry, no branches are listed."""
        table = self._query("branches")
        self.assertListEqual(table.schema.names, ["branch_name", "create_time"])
        non_main = [b for b in table.column("branch_name").to_pylist() if b != "main"]
        self.assertEqual(non_main, [], "no user-created branches")

    def test_manifests_system_table(self):
        """The snapshot references non-empty manifests that add data files."""
        table = self._query("manifests")
        for required in (
            "file_name", "file_size", "num_added_files",
            "num_deleted_files", "schema_id",
        ):
            self.assertIn(required, table.schema.names)
        self.assertGreaterEqual(table.num_rows, 1, "snapshot should have manifests")
        for size in table.column("file_size").to_pylist():
            self.assertGreater(size, 0)
        total_added = sum(table.column("num_added_files").to_pylist())
        self.assertGreaterEqual(total_added, 1, "single write should add at least one file")
if __name__ == "__main__":
    # Running this file directly discovers and runs the tests defined above.
    unittest.main(module="__main__")