---
sidebar_position: 3
---
# Admin Operations
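All admin operations go through an admin client obtained from an existing connection (`conn` below):
```python
admin = await conn.get_admin()
```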
## Databases
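```python
# Create the database; with ignore_if_exists=True an existing database is not an error.
await admin.create_database("my_database", ignore_if_exists=True)
databases = await admin.list_databases()
exists = await admin.database_exists("my_database")
# cascade=True also drops any tables the database still contains.
await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True)
```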
## Tables
Schemas are defined using PyArrow and wrapped in `fluss.Schema`:
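```python
import fluss
import pyarrow as pa

# Define the columns with PyArrow, then wrap them in a Fluss schema.
schema = fluss.Schema(pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.string()),
    pa.field("amount", pa.int64()),
]))

table_path = fluss.TablePath("my_database", "my_table")

# Create the table; with ignore_if_exists=True an existing table is not an error.
await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)

table_info = await admin.get_table_info(table_path)
tables = await admin.list_tables("my_database")

# Drop the table; with ignore_if_not_exists=True a missing table is not an error.
await admin.drop_table(table_path, ignore_if_not_exists=True)
```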
### TableDescriptor Options
`TableDescriptor` accepts these optional parameters:
| Parameter | Description |
|---------------------|-------------------------------------------------------------------------------------|
| `partition_keys` | Column names to partition by (e.g. `["region"]`) |
| `bucket_count` | Number of buckets (parallelism units) for the table |
| `bucket_keys` | Columns used to determine bucket assignment |
| `comment` | Table comment / description |
| `log_format` | Log storage format: `"ARROW"` or `"INDEXED"` |
| `kv_format` | KV storage format for primary key tables: `"INDEXED"` or `"COMPACTED"` |
| `properties` | Table configuration properties as a dict (e.g. `{"table.replication.factor": "1"}`) |
| `custom_properties` | User-defined properties as a dict |
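
As a rough sketch of how these options might be assembled, the snippet below collects them in a dict and checks the two format values against the choices listed in the table. `descriptor_options` is a hypothetical helper, not part of the `fluss` package, and passing the options to `TableDescriptor` as keyword arguments is an assumption based on the parameter names above. The example values (`"region"`, the comment text, the `owner` property) are placeholders.

```python
# Allowed values, taken from the parameter table above.
LOG_FORMATS = {"ARROW", "INDEXED"}
KV_FORMATS = {"INDEXED", "COMPACTED"}


def descriptor_options(**options):
    """Check the format options and return the options dict unchanged.

    Hypothetical helper for illustration; not part of the fluss package.
    """
    log_format = options.get("log_format")
    if log_format is not None and log_format not in LOG_FORMATS:
        raise ValueError(f"log_format must be one of {sorted(LOG_FORMATS)}, got {log_format!r}")
    kv_format = options.get("kv_format")
    if kv_format is not None and kv_format not in KV_FORMATS:
        raise ValueError(f"kv_format must be one of {sorted(KV_FORMATS)}, got {kv_format!r}")
    return options


options = descriptor_options(
    partition_keys=["region"],
    bucket_count=4,
    comment="Orders partitioned by region",
    log_format="ARROW",
    properties={"table.replication.factor": "1"},
    custom_properties={"owner": "data-platform"},
)

# Assumed usage, with a `schema` that contains a `region` column:
# descriptor = fluss.TableDescriptor(schema, **options)
```

Validating the format strings on the client side only catches typos early; the server remains the authority on which combinations of options are valid.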
## Offsets
```python
# Latest offsets for buckets
offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_spec=fluss.OffsetSpec.latest())
# By timestamp
offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.timestamp(1704067200000))
# Per-partition offsets
offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest())
```
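
The timestamp used above, `1704067200000`, corresponds to 2024-01-01T00:00:00Z when read as milliseconds since the Unix epoch, so `OffsetSpec.timestamp` presumably expects epoch milliseconds. Under that assumption, a value can be derived from a `datetime` as sketched below. `to_epoch_millis` is a hypothetical helper, not part of the `fluss` package.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


start_of_2024 = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(start_of_2024)  # 1704067200000
```

The result could then be passed as `fluss.OffsetSpec.timestamp(start_of_2024)`, matching the literal used in the example above.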
## Lake Snapshot
:::note
Lake snapshots require [lake integration](https://fluss.apache.org/docs/maintenance/tiered-storage/overview/) (e.g. Paimon or Iceberg) to be enabled on the server. Without it, `get_latest_lake_snapshot` will raise an error.
:::
```python
snapshot = await admin.get_latest_lake_snapshot(table_path)
print(f"Snapshot ID: {snapshot.snapshot_id}")
print(f"Table buckets: {snapshot.get_table_buckets()}")

# table_id=1 is a placeholder; use the ID of the table the snapshot belongs to.
bucket = fluss.TableBucket(table_id=1, bucket=0)
offset = snapshot.get_bucket_offset(bucket)
```
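
Because `get_latest_lake_snapshot` raises an error when lake integration is not enabled, callers that can work without a snapshot may want to treat the failure as "no snapshot". The sketch below shows one way to do that. The exception type is not documented here, so the helper catches `Exception` broadly as an assumption; narrow it once the actual type is known. `latest_snapshot_or_none` and `_StubAdmin` are hypothetical names, and the stub exists only so the example runs without a server.

```python
import asyncio


async def latest_snapshot_or_none(admin, table_path):
    """Return the latest lake snapshot, or None if the call fails."""
    try:
        return await admin.get_latest_lake_snapshot(table_path)
    except Exception as exc:  # assumed: exact exception type is not documented
        print(f"No lake snapshot available for {table_path}: {exc}")
        return None


class _StubAdmin:
    """Hypothetical stand-in that mimics a server without lake integration."""

    async def get_latest_lake_snapshot(self, table_path):
        raise RuntimeError("lake integration is not enabled")


result = asyncio.run(latest_snapshot_or_none(_StubAdmin(), "my_database.my_table"))
```

With a real admin client, the same helper would be awaited directly as `await latest_snapshot_or_none(admin, table_path)`.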