---
sidebar_position: 3
---

# Admin Operations

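Admin operations go through an admin client obtained from an open connection (`conn`). Its methods are coroutines, so every call on this page must be awaited, for example inside an `async def` function:

```python
admin = await conn.get_admin()
```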

## Databases

```python
await admin.create_database("my_database", ignore_if_exists=True)  # no error if it already exists
databases = await admin.list_databases()
exists = await admin.database_exists("my_database")
# cascade=True also drops any tables in the database
await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True)
```
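Tests and demos often need to reset a database to an empty state. The sketch below combines the calls above into a helper called `recreate_database`. The helper name is hypothetical and not part of the fluss package. It takes the `admin` object as a parameter and uses only the methods and flags shown above.

```python
async def recreate_database(admin, name: str) -> list:
    """Drop `name` (including its tables) if present, create it fresh,
    and return the resulting list of databases."""
    # ignore_if_not_exists=True: no error if the database is absent.
    # cascade=True: also drop any tables it still contains.
    await admin.drop_database(name, ignore_if_not_exists=True, cascade=True)
    # ignore_if_exists=True: tolerate another client creating it first.
    await admin.create_database(name, ignore_if_exists=True)
    return await admin.list_databases()
```

Call it from async code that already holds an admin client, e.g. `databases = await recreate_database(admin, "my_database")`.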

## Tables

Schemas are defined using PyArrow and wrapped in `fluss.Schema`:

```python
import fluss
import pyarrow as pa

# Define the columns with PyArrow, then wrap the Arrow schema in fluss.Schema
schema = fluss.Schema(pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.string()),
    pa.field("amount", pa.int64()),
]))

# Tables are addressed by database name and table name
table_path = fluss.TablePath("my_database", "my_table")
await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)

# Inspect, list, and drop tables
table_info = await admin.get_table_info(table_path)
tables = await admin.list_tables("my_database")
await admin.drop_table(table_path, ignore_if_not_exists=True)
```

### TableDescriptor Options

In addition to the schema, `TableDescriptor` accepts these optional parameters:

| Parameter | Description |
|---|---|
| `partition_keys` | Column names to partition by (e.g. `["region"]`) |
| `bucket_count` | Number of buckets (parallelism units) for the table |
| `bucket_keys` | Columns used to determine bucket assignment |
| `comment` | Table comment / description |
| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` |
| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` |
| `properties` | Table configuration properties as a dict (e.g. `{"table.replication.factor": "1"}`) |
| `custom_properties` | User-defined properties as a dict |
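The `bucket_keys` parameter controls routing: rows with equal bucket-key values always land in the same bucket, one of `bucket_count` buckets. The sketch below illustrates that idea with a generic hash-modulo scheme based on `zlib.crc32`. The function name `illustrative_bucket` is hypothetical. The hash is not the one Fluss uses, so the sketch will not reproduce Fluss's actual bucket assignments.

```python
import zlib

def illustrative_bucket(key_values: tuple, bucket_count: int) -> int:
    """Map a row's bucket-key values to a bucket id in [0, bucket_count).

    Illustration only: Fluss uses its own hash function, not CRC32.
    """
    if bucket_count <= 0:
        raise ValueError("bucket_count must be positive")
    # repr() gives a deterministic encoding for simple values such as str and int.
    # Python's built-in hash() is randomized per process for strings, so it is avoided.
    encoded = repr(key_values).encode("utf-8")
    return zlib.crc32(encoded) % bucket_count
```

The bucket depends only on the key values and the bucket count. This is why equal keys always co-locate, and why changing `bucket_count` changes where keys land.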

## Offsets

```python
# Latest offsets for buckets 0 and 1
offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_spec=fluss.OffsetSpec.latest())

# Offsets by timestamp, given in epoch milliseconds (1704067200000 is 2024-01-01 00:00:00 UTC)
offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.timestamp(1704067200000))
```
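As in the example above, `OffsetSpec.timestamp` is given an epoch-millisecond value. The small helper below derives that value from a timezone-aware `datetime` using exact integer arithmetic. The helper `to_epoch_millis` is hypothetical and not part of the fluss package.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(dt: datetime) -> int:
    """Return milliseconds since the Unix epoch for a timezone-aware datetime."""
    if dt.utcoffset() is None:
        # A naive datetime does not identify a single instant; refuse rather than guess.
        raise ValueError("dt must be timezone-aware")
    # timedelta // timedelta is exact integer division, with no float rounding.
    return (dt - _EPOCH) // timedelta(milliseconds=1)

print(to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc)))  # 1704067200000
```

With it, the timestamp query above can be written as `fluss.OffsetSpec.timestamp(to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc)))`.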

For partitioned tables, query the offsets of a single partition with `list_partition_offsets`:

```python
offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest())
```
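Each `list_partition_offsets` call covers one partition. To query several partitions, the calls can be issued concurrently with `asyncio.gather`. The sketch below does that with a hypothetical helper, `offsets_for_partitions`. It assumes the admin client tolerates concurrent requests; if it does not, a plain `for` loop with `await` gives the same result sequentially. The helper returns whatever `list_partition_offsets` returns, keyed by partition name.

```python
import asyncio

async def offsets_for_partitions(admin, table_path, partition_names, bucket_ids, offset_spec):
    """Call admin.list_partition_offsets once per partition, concurrently,
    and return {partition_name: result}."""
    names = list(partition_names)  # materialize so the names can be iterated twice
    results = await asyncio.gather(*(
        admin.list_partition_offsets(
            table_path,
            partition_name=name,
            bucket_ids=bucket_ids,
            offset_spec=offset_spec,
        )
        for name in names
    ))
    # asyncio.gather returns results in the same order as its arguments.
    return dict(zip(names, results))
```

Usage might look like `await offsets_for_partitions(admin, table_path, ["US", "EU"], [0], fluss.OffsetSpec.latest())`. The partition name `"EU"` is only an example value.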

## Lake Snapshot

:::note
Lake snapshots require lake integration (e.g. Paimon or Iceberg) to be enabled on the server. Without it, `get_latest_lake_snapshot` will raise an error.
:::

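```python
snapshot = await admin.get_latest_lake_snapshot(table_path)
print(f"Snapshot ID: {snapshot.snapshot_id}")
print(f"Table buckets: {snapshot.get_table_buckets()}")

# Offset recorded in the snapshot for a single bucket.
# table_id=1 is a placeholder: use the ID of your own table.
bucket = fluss.TableBucket(table_id=1, bucket=0)
offset = snapshot.get_bucket_offset(bucket)
```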
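`get_latest_lake_snapshot` raises when lake integration is not enabled. Code that must also run against servers without lake integration can therefore treat the snapshot as optional. The sketch below does this with a hypothetical helper, `latest_lake_snapshot_or_none`. The exact exception type is not documented here, so the sketch catches `Exception`. That also hides unrelated failures such as network errors, so narrow the `except` clause once you know which error your client raises.

```python
async def latest_lake_snapshot_or_none(admin, table_path):
    """Return the latest lake snapshot, or None if it cannot be fetched
    (for example, when lake integration is not enabled on the server)."""
    try:
        return await admin.get_latest_lake_snapshot(table_path)
    except Exception:  # broad on purpose: the specific error type is an assumption
        return None
```

Callers then check for `None` before reading `snapshot.snapshot_id` or calling `snapshot.get_bucket_offset(...)`.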