# blob: e2f43431d5ef832043ad7e4901c6d78aa0f88600 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Integration tests for FlussAdmin operations.
Mirrors the Rust integration tests in crates/fluss/tests/integration/admin.rs.
"""
import pyarrow as pa
import pytest
import fluss
async def test_create_database(admin):
    """Exercise the database lifecycle: create, exists, get_info, and drop."""
    name = "py_test_create_database"
    expected_properties = {"k1": "v1", "k2": "v2"}

    # An earlier aborted run may have left the database behind; start clean.
    await admin.drop_database(name, ignore_if_not_exists=True, cascade=True)
    present = await admin.database_exists(name)
    assert not present

    # Create with a comment and custom properties, passing a copy so the
    # expectation dict above stays independent of what the client receives.
    requested = fluss.DatabaseDescriptor(
        comment="test_db",
        custom_properties=dict(expected_properties),
    )
    await admin.create_database(name, requested, ignore_if_exists=False)
    present = await admin.database_exists(name)
    assert present

    # The stored metadata must round-trip what was requested.
    info = await admin.get_database_info(name)
    assert info.database_name == name
    stored = info.get_database_descriptor()
    assert stored.comment == "test_db"
    assert stored.get_custom_properties() == expected_properties

    # Strict drop (no ignore flag): it must succeed and the database must vanish.
    await admin.drop_database(name, ignore_if_not_exists=False, cascade=True)
    present = await admin.database_exists(name)
    assert not present
async def test_create_table(admin):
    """Walk a primary-key table through create, exists, list, get_info, and drop."""
    database = "py_test_create_table_db"

    # Remove leftovers from an earlier run so the strict create cannot collide.
    await admin.drop_database(database, ignore_if_not_exists=True, cascade=True)
    database_present = await admin.database_exists(database)
    assert not database_present

    database_descriptor = fluss.DatabaseDescriptor(
        comment="Database for test_create_table"
    )
    await admin.create_database(database, database_descriptor, ignore_if_exists=False)

    name = "test_user_table"
    path = fluss.TablePath(database, name)
    comment = "Test table for user data (id, name, age, email)"
    columns = [
        ("id", pa.int32()),
        ("name", pa.string()),
        ("age", pa.int32()),
        ("email", pa.string()),
    ]
    arrow_schema = pa.schema([pa.field(col, dtype) for col, dtype in columns])
    descriptor = fluss.TableDescriptor(
        fluss.Schema(arrow_schema, primary_keys=["id"]),
        bucket_count=3,
        bucket_keys=["id"],
        comment=comment,
        log_format="arrow",
        kv_format="indexed",
        properties={"table.replication.factor": "1"},
    )

    await admin.create_table(path, descriptor, ignore_if_exists=False)
    table_present = await admin.table_exists(path)
    assert table_present

    # The freshly created database should list exactly this one table.
    listed = await admin.list_tables(database)
    assert len(listed) == 1
    assert name in listed

    # Table metadata must reflect the descriptor used at creation time.
    info = await admin.get_table_info(path)
    assert info.comment == comment
    assert info.get_primary_keys() == ["id"]
    assert info.num_buckets == 3
    assert info.get_bucket_keys() == ["id"]
    assert info.get_column_names() == [col for col, _ in columns]

    # Strict drops (no ignore flags) are part of the lifecycle being verified.
    await admin.drop_table(path, ignore_if_not_exists=False)
    table_present = await admin.table_exists(path)
    assert not table_present

    await admin.drop_database(database, ignore_if_not_exists=False, cascade=True)
    database_present = await admin.database_exists(database)
    assert not database_present
async def test_partition_apis(admin):
    """Test partition create, list, and drop lifecycle.

    Creates a table partitioned by ``dt`` and ``region``, then checks that the
    partition listing goes from empty, to the single created partition, and
    back to empty after the drop. The table and database are removed in a
    ``finally`` block so a failed assertion does not leave them behind.
    """
    db_name = "py_test_partition_apis_db"
    # Cleanup in case of prior failed run
    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
    await admin.create_database(
        db_name,
        fluss.DatabaseDescriptor(comment="Database for test_partition_apis"),
        ignore_if_exists=True,
    )
    table_path = fluss.TablePath(db_name, "partitioned_table")
    # One spec shared by create and drop so both target the same partition.
    partition_spec = {"dt": "2024-01-15", "region": "EMEA"}

    try:
        schema = fluss.Schema(
            pa.schema(
                [
                    pa.field("id", pa.int32()),
                    pa.field("name", pa.string()),
                    pa.field("dt", pa.string()),
                    pa.field("region", pa.string()),
                ]
            ),
            primary_keys=["id", "dt", "region"],
        )
        table_descriptor = fluss.TableDescriptor(
            schema,
            partition_keys=["dt", "region"],
            bucket_count=3,
            bucket_keys=["id"],
            log_format="arrow",
            kv_format="compacted",
            properties={"table.replication.factor": "1"},
        )
        await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)

        # Initially no partitions
        partitions = await admin.list_partition_infos(table_path)
        assert len(partitions) == 0

        # Create a partition
        await admin.create_partition(
            table_path,
            partition_spec,
            ignore_if_exists=False,
        )
        partitions = await admin.list_partition_infos(table_path)
        assert len(partitions) == 1
        # Expected name is the two partition values joined by "$".
        assert partitions[0].partition_name == "2024-01-15$EMEA"

        # Drop the partition
        await admin.drop_partition(
            table_path,
            partition_spec,
            ignore_if_not_exists=False,
        )
        partitions = await admin.list_partition_infos(table_path)
        assert len(partitions) == 0
    finally:
        # Best-effort teardown; runs even when an assertion above fails.
        await admin.drop_table(table_path, ignore_if_not_exists=True)
        await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
async def test_fluss_error_response(admin):
    """Check that a failed API call surfaces as FlussError with its error code."""
    # Look up a table that is never created, inside the "fluss" database.
    missing_table = fluss.TablePath("fluss", "py_not_exist")

    with pytest.raises(fluss.FlussError) as raised:
        await admin.get_table_info(missing_table)

    # The exception must carry the specific code, not merely the right type.
    error = raised.value
    assert error.error_code == fluss.ErrorCode.TABLE_NOT_EXIST
async def test_error_database_not_exist(admin):
    """Operations on an unknown database must fail with DATABASE_NOT_EXIST."""
    missing_db = "py_no_such_db"

    # Fetching metadata for the unknown database.
    with pytest.raises(fluss.FlussError) as info_failure:
        await admin.get_database_info(missing_db)
    assert info_failure.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST

    # Dropping it while explicitly not ignoring the missing database.
    with pytest.raises(fluss.FlussError) as drop_failure:
        await admin.drop_database(missing_db, ignore_if_not_exists=False)
    assert drop_failure.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST

    # Listing the tables of the unknown database.
    with pytest.raises(fluss.FlussError) as list_failure:
        await admin.list_tables(missing_db)
    assert list_failure.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST
async def test_error_database_already_exist(admin):
    """Test error when creating a database that already exists.

    A second strict create (``ignore_if_exists=False``) must raise FlussError
    with DATABASE_ALREADY_EXIST, while the same call with
    ``ignore_if_exists=True`` must succeed. The database is dropped in a
    ``finally`` block so a failed assertion does not leave it behind.
    """
    db_name = "py_test_error_db_already_exist"
    # Cleanup in case of prior failed run
    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
    await admin.create_database(db_name, ignore_if_exists=False)

    try:
        # Create same database again without ignore flag
        with pytest.raises(fluss.FlussError) as exc_info:
            await admin.create_database(db_name, ignore_if_exists=False)
        assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_ALREADY_EXIST

        # With ignore flag should succeed
        await admin.create_database(db_name, ignore_if_exists=True)
    finally:
        # Best-effort teardown; runs even when an assertion above fails.
        await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
async def test_error_table_already_exist(admin):
    """Test error when creating a table that already exists.

    A second strict create (``ignore_if_exists=False``) of the same table must
    raise FlussError with TABLE_ALREADY_EXIST, while the same call with
    ``ignore_if_exists=True`` must succeed. The table and database are removed
    in a ``finally`` block so a failed assertion does not leave them behind.
    """
    db_name = "py_test_error_tbl_already_exist_db"
    # Cleanup in case of prior failed run
    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
    await admin.create_database(db_name, ignore_if_exists=True)
    table_path = fluss.TablePath(db_name, "my_table")

    try:
        schema = fluss.Schema(
            pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
        )
        table_descriptor = fluss.TableDescriptor(
            schema,
            bucket_count=1,
            properties={"table.replication.factor": "1"},
        )
        await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)

        # Create same table again without ignore flag
        with pytest.raises(fluss.FlussError) as exc_info:
            await admin.create_table(
                table_path, table_descriptor, ignore_if_exists=False
            )
        assert exc_info.value.error_code == fluss.ErrorCode.TABLE_ALREADY_EXIST

        # With ignore flag should succeed
        await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
    finally:
        # Best-effort teardown; runs even when an assertion above fails.
        await admin.drop_table(table_path, ignore_if_not_exists=True)
        await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
async def test_error_table_not_exist(admin):
    """Dropping an unknown table fails strictly and passes when ignored."""
    absent = fluss.TablePath("fluss", "py_no_such_table")

    # Strict drop: the missing table must be reported as TABLE_NOT_EXIST.
    with pytest.raises(fluss.FlussError) as drop_failure:
        await admin.drop_table(absent, ignore_if_not_exists=False)
    error = drop_failure.value
    assert error.error_code == fluss.ErrorCode.TABLE_NOT_EXIST

    # Lenient drop: the same call with the ignore flag must not raise.
    await admin.drop_table(absent, ignore_if_not_exists=True)
async def test_get_server_nodes(admin):
    """Verify get_server_nodes reports a coordinator and a tablet server."""
    server_nodes = await admin.get_server_nodes()
    assert len(server_nodes) > 0, "Expected at least one server node"

    # Both server roles must be present in the returned list.
    kinds = [entry.server_type for entry in server_nodes]
    assert "CoordinatorServer" in kinds, "Expected a coordinator server"
    assert "TabletServer" in kinds, "Expected at least one tablet server"

    # Every node must expose populated connection details and a useful repr.
    for entry in server_nodes:
        assert entry.host, "Server node host should not be empty"
        assert entry.port > 0, "Server node port should be > 0"
        assert entry.uid, "Server node uid should not be empty"
        rendered = repr(entry)
        assert rendered.startswith("ServerNode(")
async def test_error_table_not_partitioned(admin):
    """Test error when calling partition operations on non-partitioned table.

    Creates a table without partition keys and checks that listing its
    partitions raises FlussError with TABLE_NOT_PARTITIONED_EXCEPTION. The
    table and database are removed in a ``finally`` block so a failed
    assertion does not leave them behind.
    """
    db_name = "py_test_error_not_partitioned_db"
    # Cleanup in case of prior failed run
    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
    await admin.create_database(db_name, ignore_if_exists=True)
    table_path = fluss.TablePath(db_name, "non_partitioned_table")

    try:
        schema = fluss.Schema(
            pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())])
        )
        # No partition_keys are given, so the table is not partitioned.
        table_descriptor = fluss.TableDescriptor(
            schema,
            bucket_count=1,
            properties={"table.replication.factor": "1"},
        )
        await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)

        with pytest.raises(fluss.FlussError) as exc_info:
            await admin.list_partition_infos(table_path)
        assert (
            exc_info.value.error_code
            == fluss.ErrorCode.TABLE_NOT_PARTITIONED_EXCEPTION
        )
    finally:
        # Best-effort teardown; runs even when an assertion above fails.
        await admin.drop_table(table_path, ignore_if_not_exists=True)
        await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)