    admin = await conn.get_admin()

    await admin.create_database("my_database", ignore_if_exists=True)
    databases = await admin.list_databases()
    exists = await admin.database_exists("my_database")
    await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True)

Schemas are defined using PyArrow and wrapped in fluss.Schema:

    import pyarrow as pa

    schema = fluss.Schema(pa.schema([
        pa.field("id", pa.int32()),
        pa.field("name", pa.string()),
        pa.field("amount", pa.int64()),
    ]))

    table_path = fluss.TablePath("my_database", "my_table")
    await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
    table_info = await admin.get_table_info(table_path)
    tables = await admin.list_tables("my_database")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

TableDescriptor accepts these optional parameters:

| Parameter | Description |
|---|---|
| `partition_keys` | Column names to partition by (e.g. `["region"]`) |
| `bucket_count` | Number of buckets (parallelism units) for the table |
| `bucket_keys` | Columns used to determine bucket assignment |
| `comment` | Table comment / description |
| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` |
| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` |
| `properties` | Table configuration properties as a dict (e.g. `{"table.replication.factor": "1"}`) |
| `custom_properties` | User-defined properties as a dict |
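
As a rough illustration of how these options might be assembled, the sketch below collects them in a plain dict and checks the two format values against the ones listed in the table. `descriptor_options` is a hypothetical helper, not part of the client, and passing the result as keyword arguments to `fluss.TableDescriptor` is an assumption based on the parameter names above.

```python
# Values taken from the parameter table above.
LOG_FORMATS = {"ARROW", "INDEXED"}
KV_FORMATS = {"INDEXED", "COMPACTED"}
KNOWN_OPTIONS = {
    "partition_keys", "bucket_count", "bucket_keys", "comment",
    "log_format", "kv_format", "properties", "custom_properties",
}


def descriptor_options(**options):
    """Hypothetical helper: validate and collect optional TableDescriptor settings."""
    unknown = set(options) - KNOWN_OPTIONS
    if unknown:
        raise TypeError(f"unknown options: {sorted(unknown)}")
    log_format = options.get("log_format")
    if log_format is not None and log_format not in LOG_FORMATS:
        raise ValueError(f"log_format must be one of {sorted(LOG_FORMATS)}")
    kv_format = options.get("kv_format")
    if kv_format is not None and kv_format not in KV_FORMATS:
        raise ValueError(f"kv_format must be one of {sorted(KV_FORMATS)}")
    # Drop unset entries so only explicit choices are forwarded.
    return {key: value for key, value in options.items() if value is not None}


options = descriptor_options(
    partition_keys=["region"],  # assumes the schema has a "region" column
    bucket_count=4,
    log_format="ARROW",
    properties={"table.replication.factor": "1"},
)

# Assumed usage with a real client (keyword passing is not confirmed here):
# descriptor = fluss.TableDescriptor(schema, **options)
```

Validating on the client side is only a convenience; the server remains the authority on which values it accepts.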

    # Latest offsets for buckets
    offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_spec=fluss.OffsetSpec.latest())

    # By timestamp
    offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.timestamp(1704067200000))

    # Per-partition offsets
    offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest())

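
The timestamp in the example above looks like milliseconds since the Unix epoch (1704067200000 corresponds to 2024-01-01T00:00:00Z under that reading), although the unit is not stated here. Assuming that is the expected unit, a small helper such as the hypothetical `to_epoch_millis` below can derive the value from a `datetime` instead of hard-coding it.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # Integer timedelta division avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


ts = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(ts)  # 1704067200000
```

The result could then be passed as `fluss.OffsetSpec.timestamp(ts)`, as in the snippet above.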

:::note
Lake snapshots require lake integration (e.g. Paimon or Iceberg) to be enabled on the server. Without it, `get_latest_lake_snapshot` will raise an error.
:::

    snapshot = await admin.get_latest_lake_snapshot(table_path)
    print(f"Snapshot ID: {snapshot.snapshot_id}")
    print(f"Table buckets: {snapshot.get_table_buckets()}")

    bucket = fluss.TableBucket(table_id=1, bucket=0)
    offset = snapshot.get_bucket_offset(bucket)
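
Because `get_latest_lake_snapshot` raises when lake integration is not enabled, callers that can run against either kind of cluster may want to guard the call. The sketch below is one way to do that. The exception type is not specified here, so it catches `Exception` broadly; `latest_lake_snapshot_or_none` and the `_NoLakeAdmin` stand-in are hypothetical names used only for illustration.

```python
import asyncio


async def latest_lake_snapshot_or_none(admin, table_path):
    """Return the latest lake snapshot, or None if the call fails."""
    try:
        return await admin.get_latest_lake_snapshot(table_path)
    except Exception as exc:  # exact exception type is an unknown; narrow it if you can
        print(f"Lake snapshot unavailable for {table_path}: {exc}")
        return None


class _NoLakeAdmin:
    """Stand-in for an admin client whose server has no lake integration."""

    async def get_latest_lake_snapshot(self, table_path):
        raise RuntimeError("lake integration is not enabled")


# Runs without a cluster; with a real client, pass the admin from conn.get_admin().
result = asyncio.run(latest_lake_snapshot_or_none(_NoLakeAdmin(), "my_database.my_table"))
print(result)  # None
```

Returning `None` keeps the calling code simple, at the cost of hiding the cause of the failure; re-raising a more specific error may be preferable where the distinction matters.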