blob: c78b79e701afe33c8e0e9be1dc44b6eb2348d506 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""End-to-end tests for the ``$options`` system table."""
import os
import shutil
import tempfile
import unittest
from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import DataField
from pypaimon.table.system.options_table import OptionsTable
def _read(table):
rb = table.new_read_builder()
return rb.new_read().to_arrow(rb.new_scan().plan().splits())
class OptionsTableTest(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp(prefix="opts_sys_")
warehouse = os.path.join(self.tmp, "warehouse")
self.catalog = CatalogFactory.create({"warehouse": warehouse})
self.catalog.create_database("db", False)
fields = [DataField.from_dict({"id": 0, "name": "v", "type": "INT"})]
self.catalog.create_table(
"db.t",
Schema(fields=fields, options={
"bucket": "2",
"user-flag": "alpha",
}),
False,
)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_options_table_is_loaded_via_catalog(self):
table = self.catalog.get_table("db.t$options")
self.assertIsInstance(table, OptionsTable)
def test_schema_column_layout(self):
table = self.catalog.get_table("db.t$options")
row_type = table.row_type()
self.assertEqual(["key", "value"], [f.name for f in row_type.fields])
for field in row_type.fields:
self.assertFalse(field.type.nullable,
"{} should be NOT NULL".format(field.name))
self.assertEqual(["key"], table.primary_keys())
def test_read_returns_every_committed_option(self):
table = self.catalog.get_table("db.t$options")
arrow_table = _read(table)
self.assertEqual(["key", "value"], arrow_table.schema.names)
rendered = dict(zip(
arrow_table.column("key").to_pylist(),
arrow_table.column("value").to_pylist(),
))
self.assertEqual("2", rendered["bucket"])
self.assertEqual("alpha", rendered["user-flag"])
def test_read_with_projection_keeps_requested_column_only(self):
table = self.catalog.get_table("db.t$options")
rb = table.new_read_builder().with_projection(["key"])
arrow_table = rb.new_read().to_arrow(rb.new_scan().plan().splits())
self.assertEqual(["key"], arrow_table.schema.names)
self.assertIn("bucket", arrow_table.column("key").to_pylist())
if __name__ == "__main__":
unittest.main()