---
sidebar_position: 2
---
# API Reference
Complete API reference for the Fluss Python client.
## Config

| Method / Property | Config Key | Description |
|---|---|---|
| `Config(properties: dict = None)` | | Create config from a dict of key-value pairs |
| `bootstrap_servers` | `bootstrap.servers` | Get/set coordinator server address |
| `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes |
| `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) |
| `writer_retries` | `writer.retries` | Get/set number of retries on failure |
| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes |
| `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending |
| `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) |
| `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `remote-file.download-thread-num` | Get/set number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `scanner.log.fetch.max-bytes` | Get/set maximum bytes per fetch response for `LogScanner` |
| `scanner_log_fetch_min_bytes` | `scanner.log.fetch.min-bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for `LogScanner` |
| `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds |
| `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) |
| `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) |
| `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) |
| `security_sasl_password` | `security.sasl.password` | Get/set SASL password (required when protocol is `"sasl"`) |
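For example, a config can be built from raw config keys and then tuned through the typed properties. A minimal sketch (the top-level `fluss.Config` export and the server address are assumptions):

```python
import fluss  # assumed top-level package name, matching fluss.OffsetSpec below

# Build from a dict of raw config keys...
config = fluss.Config({"bootstrap.servers": "localhost:9123"})  # address is a placeholder

# ...or adjust settings through the typed properties afterwards.
config.writer_acks = "all"                  # wait for all replicas to acknowledge
config.writer_batch_size = 2 * 1024 * 1024  # 2 MiB write batches
config.connect_timeout_ms = 30_000
```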
## FlussConnection

| Method | Description |
|---|---|
| `await FlussConnection.create(config) -> FlussConnection` | Connect to a Fluss cluster |
| `await conn.get_admin() -> FlussAdmin` | Get admin interface |
| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations |
| `conn.close()` | Close the connection |
Supports the `with` statement (context manager).
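A minimal connection sketch based on the calls above (the top-level `fluss` exports and the server address are assumptions):

```python
import asyncio
import fluss  # assumed top-level package name

async def main():
    config = fluss.Config({"bootstrap.servers": "localhost:9123"})
    # The connection is a context manager, so close() runs automatically on exit.
    with await fluss.FlussConnection.create(config) as conn:
        admin = await conn.get_admin()
        print(await admin.list_databases())

asyncio.run(main())
```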
## FlussAdmin

| Method | Description |
|---|---|
| `await create_database(name, database_descriptor=None, ignore_if_exists=False)` | Create a database |
| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database |
| `await list_databases() -> list[str]` | List all databases |
| `await database_exists(name) -> bool` | Check if a database exists |
| `await get_database_info(name) -> DatabaseInfo` | Get database metadata |
| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table |
| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table |
| `await get_table_info(table_path) -> TableInfo` | Get table metadata |
| `await list_tables(database_name) -> list[str]` | List tables in a database |
| `await table_exists(table_path) -> bool` | Check if a table exists |
| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for buckets |
| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition |
| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition |
| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions |
| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot |
| `await get_server_nodes() -> list[ServerNode]` | Get all alive server nodes |
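Typical admin calls, sketched under the assumption that `conn` is an open `FlussConnection`; the database and table names are placeholders:

```python
admin = await conn.get_admin()

# Create a database, ignoring the call if it already exists.
await admin.create_database("demo_db", ignore_if_exists=True)
print(await admin.list_databases())

# Inspect a table's metadata if it exists.
table_path = fluss.TablePath("demo_db", "events")  # placeholder names
if await admin.table_exists(table_path):
    info = await admin.get_table_info(table_path)
    print(info.num_buckets, info.get_primary_keys())
```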
## ServerNode

| Property | Description |
|---|---|
| `.id -> int` | Server node ID |
| `.host -> str` | Hostname of the server |
| `.port -> int` | Port number |
| `.server_type -> str` | Server type (`"CoordinatorServer"` or `"TabletServer"`) |
| `.uid -> str` | Unique identifier (e.g. `"cs-0"`, `"ts-1"`) |
## FlussTable

| Method | Description |
|---|---|
| `new_scan() -> TableScan` | Create a scan builder |
| `new_append() -> TableAppend` | Create an append builder for log tables |
| `new_upsert() -> TableUpsert` | Create an upsert builder for PK tables |
| `new_lookup() -> TableLookup` | Create a lookup builder for PK tables |
| `get_table_info() -> TableInfo` | Get table metadata |
| `get_table_path() -> TablePath` | Get table path |
| `has_primary_key() -> bool` | Check if table has a primary key |
## TableScan

| Method | Description |
|---|---|
| `.project(indices) -> TableScan` | Project columns by index |
| `.project_by_name(names) -> TableScan` | Project columns by name |
| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) |
| `await .create_record_batch_log_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) |
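For example (assuming `conn` is an open connection; the table and column names are placeholders):

```python
table = await conn.get_table(fluss.TablePath("demo_db", "events"))

# Project to two columns and create a batch-based scanner,
# suitable for poll_arrow() / to_arrow().
scan = table.new_scan().project_by_name(["user_id", "ts"])  # placeholder columns
scanner = await scan.create_record_batch_log_scanner()
```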
## TableAppend

Builder for creating an `AppendWriter`. Obtain via `FlussTable.new_append()`.

| Method | Description |
|---|---|
| `.create_writer() -> AppendWriter` | Create the append writer |
## TableUpsert

Builder for creating an `UpsertWriter`. Obtain via `FlussTable.new_upsert()`.

| Method | Description |
|---|---|
| `.partial_update_by_name(columns) -> TableUpsert` | Configure partial update by column names |
| `.partial_update_by_index(indices) -> TableUpsert` | Configure partial update by column indices |
| `.create_writer() -> UpsertWriter` | Create the upsert writer |
## TableLookup

Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.

| Method | Description |
|---|---|
| `.create_lookuper() -> Lookuper` | Create the lookuper |
## AppendWriter

| Method | Description |
|---|---|
| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) |
| `.write_arrow(table)` | Write a PyArrow Table |
| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch |
| `.write_pandas(df)` | Write a Pandas DataFrame |
| `await .flush()` | Flush all pending writes |
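A sketch of writing to a log table (table path and column names are placeholders):

```python
import pyarrow as pa

table = await conn.get_table(fluss.TablePath("demo_db", "events"))
writer = table.new_append().create_writer()

writer.append({"user_id": 1, "ts": 1700000000000})  # dict row
writer.append([2, 1700000000001])                   # positional row (list)

# Bulk-write a PyArrow Table; its schema must match the Fluss table's schema.
writer.write_arrow(pa.table({"user_id": [3, 4], "ts": [5, 6]}))

await writer.flush()  # block until all pending writes are acknowledged
```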
## UpsertWriter

| Method | Description |
|---|---|
| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) |
| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key |
| `await .flush()` | Flush all pending operations |
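A sketch of writing to a primary-key table, assuming rows and keys can be passed as dicts (names are placeholders):

```python
table = await conn.get_table(fluss.TablePath("demo_db", "users"))

# Optionally restrict writes to a subset of columns (partial update).
writer = table.new_upsert().partial_update_by_name(["id", "email"]).create_writer()

writer.upsert({"id": 1, "email": "a@example.com"})  # insert or update by PK
writer.delete({"id": 1})                            # delete by primary key (dict form assumed)
await writer.flush()
```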
## WriteResultHandle

| Method | Description |
|---|---|
| `await .wait()` | Wait for server acknowledgment of this write |
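When per-record acknowledgment matters more than throughput, await each handle instead of batching behind a single `flush()`. A sketch reusing the `writer` from the UpsertWriter example:

```python
handle = writer.upsert({"id": 2, "email": "b@example.com"})
await handle.wait()  # this single write is acknowledged before continuing
```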
## Lookuper

| Method | Description |
|---|---|
| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key |
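A point-lookup sketch reusing the primary-key `table` from the UpsertWriter example; passing the key as a dict (mirroring `upsert`) is an assumption:

```python
lookuper = table.new_lookup().create_lookuper()

row = await lookuper.lookup({"id": 1})  # dict key form assumed
if row is None:
    print("key not found")
else:
    print(row["email"])
```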
## LogScanner

| Method | Description |
|---|---|
| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket |
| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) |
| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket |
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket |
| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) |
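A read sketch tying the pieces together (the bucket ID and timeout are placeholders; `table` is a `FlussTable` from `conn.get_table`):

```python
# Record-based scanner: subscribe to bucket 0 from the earliest offset.
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)

records = scanner.poll(timeout_ms=5000)
for record in records:
    print(record.offset, record.row)
```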
## ScanRecords

Returned by `LogScanner.poll()`. Records are grouped by bucket.

**Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.
```python
scan_records = scanner.poll(timeout_ms=5000)

# Sequence access
scan_records[0]   # first record
scan_records[-1]  # last record
scan_records[:5]  # first 5 records

# Per-bucket access
for bucket, records in scan_records.items():
    for record in records:
        print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")

# Flat iteration
for record in scan_records:
    print(record.row)
```
### Methods

| Method | Description |
|---|---|
| `.buckets() -> list[TableBucket]` | List of distinct buckets |
| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if bucket not present) |
| `.count() -> int` | Total record count across all buckets |
| `.is_empty() -> bool` | Check if empty |
### Indexing

| Expression | Returns | Description |
|---|---|---|
| `scan_records[0]` | `ScanRecord` | Record by flat index |
| `scan_records[-1]` | `ScanRecord` | Negative indexing |
| `scan_records[1:5]` | `list[ScanRecord]` | Slice |
| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket |
### Mapping Protocol

| Method / Protocol | Description |
|---|---|
| `.keys()` | Same as `.buckets()` |
| `.values()` | Lazy iterator over record lists, one per bucket |
| `.items()` | Lazy iterator over `(bucket, records)` pairs |
| `len(scan_records)` | Same as `.count()` |
| `bucket in scan_records` | Membership test |
| `for record in scan_records` | Flat iteration over all records |
## ScanRecord

| Property | Description |
|---|---|
| `.offset -> int` | Record offset in the log |
| `.timestamp -> int` | Record timestamp |
| `.change_type -> ChangeType` | Change type (`AppendOnly`, `Insert`, `UpdateBefore`, `UpdateAfter`, `Delete`) |
| `.row -> dict` | Row data as `{column_name: value}` |
## RecordBatch

| Property | Description |
|---|---|
| `.batch -> pa.RecordBatch` | Arrow RecordBatch data |
| `.bucket -> TableBucket` | Bucket this batch belongs to |
| `.base_offset -> int` | First record offset |
| `.last_offset -> int` | Last record offset |
## Schema

| Method | Description |
|---|---|
| `Schema(schema: pa.Schema, primary_keys=None)` | Create from a PyArrow schema |
| `.get_column_names() -> list[str]` | Get column names |
| `.get_column_types() -> list[str]` | Get column type names |
| `.get_columns() -> list[tuple[str, str]]` | Get (name, type) pairs |
| `.get_primary_keys() -> list[str]` | Get primary key columns |
## TableDescriptor

| Method | Description |
|---|---|
| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create table descriptor |
| `.get_schema() -> Schema` | Get the schema |
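Putting `Schema` and `TableDescriptor` together to create a primary-key table; a sketch assuming `admin` is a `FlussAdmin` and the database, table, and column names are placeholders:

```python
import pyarrow as pa

pa_schema = pa.schema([
    ("id", pa.int64()),
    ("email", pa.string()),
])
schema = fluss.Schema(pa_schema, primary_keys=["id"])  # PK columns by name

descriptor = fluss.TableDescriptor(schema, bucket_count=3, comment="user table")
await admin.create_table(
    fluss.TablePath("demo_db", "users"), descriptor, ignore_if_exists=True
)
```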
## TablePath

| Method / Property | Description |
|---|---|
| `TablePath(database, table)` | Create a table path |
| `.database_name -> str` | Database name |
| `.table_name -> str` | Table name |
## TableInfo

| Property / Method | Description |
|---|---|
| `.table_id -> int` | Table ID |
| `.table_path -> TablePath` | Table path |
| `.num_buckets -> int` | Number of buckets |
| `.schema_id -> int` | Schema ID |
| `.comment -> str \| None` | Table comment |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_primary_keys() -> list[str]` | Primary key columns |
| `.get_partition_keys() -> list[str]` | Partition columns |
| `.get_bucket_keys() -> list[str]` | Bucket key columns |
| `.has_primary_key() -> bool` | Has primary key? |
| `.is_partitioned() -> bool` | Is partitioned? |
| `.get_schema() -> Schema` | Get table schema |
| `.get_column_names() -> list[str]` | Column names |
| `.get_column_count() -> int` | Number of columns |
| `.get_properties() -> dict` | All table properties |
| `.get_custom_properties() -> dict` | Custom properties only |
## PartitionInfo

| Property | Description |
|---|---|
| `.partition_id -> int` | Partition ID |
| `.partition_name -> str` | Partition name |
## DatabaseDescriptor

| Method / Property | Description |
|---|---|
| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor |
| `.comment -> str \| None` | Database comment |
| `.get_custom_properties() -> dict` | Custom properties |
## DatabaseInfo

| Property / Method | Description |
|---|---|
| `.database_name -> str` | Database name |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor |
## LakeSnapshot

| Property / Method | Description |
|---|---|
| `.snapshot_id -> int` | Snapshot ID |
| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets |
| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket |
| `.get_table_buckets() -> list[TableBucket]` | Get all buckets |
## TableBucket

| Method / Property | Description |
|---|---|
| `TableBucket(table_id, bucket)` | Create non-partitioned bucket |
| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket |
| `.table_id -> int` | Table ID |
| `.bucket_id -> int` | Bucket ID |
| `.partition_id -> int \| None` | Partition ID (`None` if non-partitioned) |
## FlussError

| Property | Description |
|---|---|
| `.message -> str` | Error message |
| `.error_code -> int` | Error code (`ErrorCode.CLIENT_ERROR` for client-side errors, server code otherwise) |

Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. See Error Handling for details on matching specific error codes.
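A sketch of catching and inspecting a Fluss error; the table name is a placeholder, and a top-level `fluss.FlussError` export is an assumption:

```python
try:
    info = await admin.get_table_info(fluss.TablePath("demo_db", "missing"))
except fluss.FlussError as e:
    # error_code distinguishes client-side from server-reported failures
    print(e.error_code, e.message)
```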
## Constants

| Constant | Value | Description |
|---|---|---|
| `fluss.EARLIEST_OFFSET` | -2 | Start reading from earliest available offset |
## OffsetSpec

| Method | Description |
|---|---|
| `OffsetSpec.earliest()` | Earliest available offset |
| `OffsetSpec.latest()` | Latest offset |
| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp (millis) |
To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing:

```python
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
scanner.subscribe(bucket_id=0, start_offset=offsets[0])
```
## ChangeType

| Value | Short String | Description |
|---|---|---|
| `ChangeType.AppendOnly` (0) | `+A` | Append-only |
| `ChangeType.Insert` (1) | `+I` | Insert |
| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row |
| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row |
| `ChangeType.Delete` (4) | `-D` | Delete |
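For example, splitting a changelog poll by change type; a sketch reusing the record scanner from the LogScanner section (a top-level `fluss.ChangeType` export is an assumption):

```python
records = scanner.poll(timeout_ms=5000)
for record in records:
    if record.change_type == fluss.ChangeType.UpdateAfter:
        print("new value:", record.row)   # row state after an update
    elif record.change_type == fluss.ChangeType.Delete:
        print("deleted:", record.row)     # row that was removed
```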