---
sidebar_position: 2
---

# API Reference

Complete API reference for the Fluss Python client.

## Config

| Method / Property | Config Key | Description |
|---|---|---|
| `Config(properties: dict = None)` | | Create a config from a dict of key-value pairs |
| `bootstrap_servers` | `bootstrap.servers` | Get/set coordinator server address |
| `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes |
| `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) |
| `writer_retries` | `writer.retries` | Get/set number of retries on failure |
| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes |
| `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending |
| `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) |
| `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `remote-file.download-thread-num` | Get/set number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `scanner.log.fetch.max-bytes` | Get/set maximum bytes per fetch response for `LogScanner` |
| `scanner_log_fetch_min_bytes` | `scanner.log.fetch.min-bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for `LogScanner` |
| `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds |
| `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) |
| `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) |
| `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) |
| `security_sasl_password` | `security.sasl.password` | Get/set SASL password (required when protocol is `"sasl"`) |
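As a quick sketch, properties can be set either through the constructor dict or through the typed accessors. The `fluss` import name follows the usage in the Constants section below; the server address is illustrative.

```python
import fluss

# Build a config from raw key-value pairs; the address is illustrative.
config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})

# Equivalently, set values through the typed properties.
config.writer_acks = "all"                 # wait for all replicas
config.writer_retries = 3                  # retry failed writes
config.scanner_log_max_poll_records = 500  # cap records per poll()
```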

## FlussConnection

| Method | Description |
|---|---|
| `await FlussConnection.create(config) -> FlussConnection` | Connect to a Fluss cluster |
| `await conn.get_admin() -> FlussAdmin` | Get admin interface |
| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations |
| `conn.close()` | Close the connection |

Supports the `with` statement (context manager).
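A minimal connection lifecycle might look like the following sketch; the address is illustrative, and it assumes `FlussConnection` and `TablePath` are exported at the package top level.

```python
import asyncio
import fluss

async def main():
    # Address and table names are illustrative.
    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
    conn = await fluss.FlussConnection.create(config)
    try:
        admin = await conn.get_admin()
        table = await conn.get_table(fluss.TablePath("my_db", "my_table"))
    finally:
        conn.close()

asyncio.run(main())
```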

## FlussAdmin

| Method | Description |
|---|---|
| `await create_database(name, database_descriptor=None, ignore_if_exists=False)` | Create a database |
| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database |
| `await list_databases() -> list[str]` | List all databases |
| `await database_exists(name) -> bool` | Check if a database exists |
| `await get_database_info(name) -> DatabaseInfo` | Get database metadata |
| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table |
| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table |
| `await get_table_info(table_path) -> TableInfo` | Get table metadata |
| `await list_tables(database_name) -> list[str]` | List tables in a database |
| `await table_exists(table_path) -> bool` | Check if a table exists |
| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for buckets |
| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition |
| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition |
| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions |
| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get the latest lake snapshot |
| `await get_server_nodes() -> list[ServerNode]` | Get all alive server nodes |
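A sketch of typical admin usage, assuming `admin` was obtained via `conn.get_admin()` and that `Schema`, `TableDescriptor`, and `TablePath` are exported at the package top level; the database and column names are illustrative.

```python
import pyarrow as pa

await admin.create_database("analytics", ignore_if_exists=True)

schema = fluss.Schema(
    pa.schema([("id", pa.int64()), ("name", pa.string())]),
    primary_keys=["id"],
)
await admin.create_table(
    fluss.TablePath("analytics", "users"),
    fluss.TableDescriptor(schema, bucket_count=3),
    ignore_if_exists=True,
)

print(await admin.list_tables("analytics"))
```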

## ServerNode

| Property | Description |
|---|---|
| `.id -> int` | Server node ID |
| `.host -> str` | Hostname of the server |
| `.port -> int` | Port number |
| `.server_type -> str` | Server type (`"CoordinatorServer"` or `"TabletServer"`) |
| `.uid -> str` | Unique identifier (e.g. `"cs-0"`, `"ts-1"`) |

## FlussTable

| Method | Description |
|---|---|
| `new_scan() -> TableScan` | Create a scan builder |
| `new_append() -> TableAppend` | Create an append builder for log tables |
| `new_upsert() -> TableUpsert` | Create an upsert builder for primary-key tables |
| `new_lookup() -> TableLookup` | Create a lookup builder for primary-key tables |
| `get_table_info() -> TableInfo` | Get table metadata |
| `get_table_path() -> TablePath` | Get the table path |
| `has_primary_key() -> bool` | Check if the table has a primary key |

## TableScan

| Method | Description |
|---|---|
| `.project(indices) -> TableScan` | Project columns by index |
| `.project_by_name(names) -> TableScan` | Project columns by name |
| `await .create_log_scanner() -> LogScanner` | Create a record-based scanner (for `poll()`) |
| `await .create_record_batch_log_scanner() -> LogScanner` | Create a batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) |

## TableAppend

Builder for creating an `AppendWriter`. Obtain via `FlussTable.new_append()`.

| Method | Description |
|---|---|
| `.create_writer() -> AppendWriter` | Create the append writer |

## TableUpsert

Builder for creating an `UpsertWriter`. Obtain via `FlussTable.new_upsert()`.

| Method | Description |
|---|---|
| `.partial_update_by_name(columns) -> TableUpsert` | Configure partial update by column names |
| `.partial_update_by_index(indices) -> TableUpsert` | Configure partial update by column indices |
| `.create_writer() -> UpsertWriter` | Create the upsert writer |

## TableLookup

Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.

| Method | Description |
|---|---|
| `.create_lookuper() -> Lookuper` | Create the lookuper |

## AppendWriter

| Method | Description |
|---|---|
| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) |
| `.write_arrow(table)` | Write a PyArrow Table |
| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch |
| `.write_pandas(df)` | Write a Pandas DataFrame |
| `await .flush()` | Flush all pending writes |
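A minimal append sketch, assuming `table` is a log table obtained via `conn.get_table(...)` and that the column names shown are illustrative:

```python
import pyarrow as pa

writer = table.new_append().create_writer()

writer.append({"id": 1, "name": "alice"})  # single row as a dict

# Bulk write from a PyArrow RecordBatch.
batch = pa.RecordBatch.from_pydict({"id": [2, 3], "name": ["bob", "carol"]})
writer.write_arrow_batch(batch)

await writer.flush()  # block until all pending writes are acknowledged
```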

## UpsertWriter

| Method | Description |
|---|---|
| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) |
| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key |
| `await .flush()` | Flush all pending operations |
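A sketch of upsert and delete, assuming `table` is a primary-key table obtained via `conn.get_table(...)`; passing the row and key as dicts follows the `.row -> dict` convention elsewhere in this reference and is otherwise an assumption.

```python
writer = table.new_upsert().create_writer()

handle = writer.upsert({"id": 1, "name": "alice"})  # insert or update by PK
await handle.wait()                                 # ack for this write only

writer.delete({"id": 1})  # delete by primary key
await writer.flush()      # ack for everything still pending
```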

## WriteResultHandle

| Method | Description |
|---|---|
| `await .wait()` | Wait for server acknowledgment of this write |

## Lookuper

| Method | Description |
|---|---|
| `await .lookup(pk) -> dict \| None` | Look up a row by primary key |
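A point-lookup sketch, assuming `table` is a primary-key table; passing the key as a dict is an assumption here.

```python
lookuper = table.new_lookup().create_lookuper()

row = await lookuper.lookup({"id": 1})  # key shape is an assumption
if row is not None:
    print(row["name"])
```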

## LogScanner

| Method | Description |
|---|---|
| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket |
| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) |
| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket |
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket |
| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as an Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all subscribed data as an Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as a DataFrame (batch scanner only) |
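A minimal subscribe-and-poll sketch, assuming `table` was obtained via `conn.get_table(...)` and reading from the earliest offset via the `fluss.EARLIEST_OFFSET` constant:

```python
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)

records = scanner.poll(timeout_ms=5000)
for record in records:
    print(record.offset, record.change_type, record.row)
```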

## ScanRecords

Returned by `LogScanner.poll()`. Records are grouped by bucket.

> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.

```python
scan_records = scanner.poll(timeout_ms=5000)

# Sequence access
scan_records[0]                              # first record
scan_records[-1]                             # last record
scan_records[:5]                             # first 5 records

# Per-bucket access
for bucket, records in scan_records.items():
    for record in records:
        print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")

# Flat iteration
for record in scan_records:
    print(record.row)
```

### Methods

| Method | Description |
|---|---|
| `.buckets() -> list[TableBucket]` | List of distinct buckets |
| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if the bucket is not present) |
| `.count() -> int` | Total record count across all buckets |
| `.is_empty() -> bool` | Check if empty |

### Indexing

| Expression | Returns | Description |
|---|---|---|
| `scan_records[0]` | `ScanRecord` | Record by flat index |
| `scan_records[-1]` | `ScanRecord` | Negative indexing |
| `scan_records[1:5]` | `list[ScanRecord]` | Slice |
| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket |

### Mapping Protocol

| Method / Protocol | Description |
|---|---|
| `.keys()` | Same as `.buckets()` |
| `.values()` | Lazy iterator over record lists, one per bucket |
| `.items()` | Lazy iterator over `(bucket, records)` pairs |
| `len(scan_records)` | Same as `.count()` |
| `bucket in scan_records` | Membership test |
| `for record in scan_records` | Flat iteration over all records |

## ScanRecord

| Property | Description |
|---|---|
| `.offset -> int` | Record offset in the log |
| `.timestamp -> int` | Record timestamp |
| `.change_type -> ChangeType` | Change type (`AppendOnly`, `Insert`, `UpdateBefore`, `UpdateAfter`, `Delete`) |
| `.row -> dict` | Row data as `{column_name: value}` |

## RecordBatch

| Property | Description |
|---|---|
| `.batch -> pa.RecordBatch` | Arrow RecordBatch data |
| `.bucket -> TableBucket` | Bucket this batch belongs to |
| `.base_offset -> int` | First record offset |
| `.last_offset -> int` | Last record offset |

## Schema

| Method | Description |
|---|---|
| `Schema(schema: pa.Schema, primary_keys=None)` | Create from a PyArrow schema |
| `.get_column_names() -> list[str]` | Get column names |
| `.get_column_types() -> list[str]` | Get column type names |
| `.get_columns() -> list[tuple[str, str]]` | Get `(name, type)` pairs |
| `.get_primary_keys() -> list[str]` | Get primary key columns |

## TableDescriptor

| Method | Description |
|---|---|
| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create a table descriptor |
| `.get_schema() -> Schema` | Get the schema |
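A sketch of defining a partitioned primary-key table, assuming `Schema` and `TableDescriptor` are exported at the package top level; the column names are illustrative.

```python
import pyarrow as pa

schema = fluss.Schema(
    pa.schema([
        ("user_id", pa.int64()),
        ("event", pa.string()),
        ("dt", pa.string()),
    ]),
    primary_keys=["user_id", "dt"],
)

descriptor = fluss.TableDescriptor(
    schema,
    partition_keys=["dt"],  # partition column must be part of the PK
    bucket_count=4,
    comment="user events",
)
```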

## TablePath

| Method / Property | Description |
|---|---|
| `TablePath(database, table)` | Create a table path |
| `.database_name -> str` | Database name |
| `.table_name -> str` | Table name |

## TableInfo

| Property / Method | Description |
|---|---|
| `.table_id -> int` | Table ID |
| `.table_path -> TablePath` | Table path |
| `.num_buckets -> int` | Number of buckets |
| `.schema_id -> int` | Schema ID |
| `.comment -> str \| None` | Table comment |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_primary_keys() -> list[str]` | Primary key columns |
| `.get_partition_keys() -> list[str]` | Partition columns |
| `.get_bucket_keys() -> list[str]` | Bucket key columns |
| `.has_primary_key() -> bool` | Has primary key? |
| `.is_partitioned() -> bool` | Is partitioned? |
| `.get_schema() -> Schema` | Get table schema |
| `.get_column_names() -> list[str]` | Column names |
| `.get_column_count() -> int` | Number of columns |
| `.get_properties() -> dict` | All table properties |
| `.get_custom_properties() -> dict` | Custom properties only |

## PartitionInfo

| Property | Description |
|---|---|
| `.partition_id -> int` | Partition ID |
| `.partition_name -> str` | Partition name |

## DatabaseDescriptor

| Method / Property | Description |
|---|---|
| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create a descriptor |
| `.comment -> str \| None` | Database comment |
| `.get_custom_properties() -> dict` | Custom properties |

## DatabaseInfo

| Property / Method | Description |
|---|---|
| `.database_name -> str` | Database name |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_database_descriptor() -> DatabaseDescriptor` | Get the descriptor |

## LakeSnapshot

| Property / Method | Description |
|---|---|
| `.snapshot_id -> int` | Snapshot ID |
| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets |
| `.get_bucket_offset(bucket) -> int \| None` | Get the offset for a bucket |
| `.get_table_buckets() -> list[TableBucket]` | Get all buckets |

## TableBucket

| Method / Property | Description |
|---|---|
| `TableBucket(table_id, bucket)` | Create a non-partitioned bucket |
| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create a partitioned bucket |
| `.table_id -> int` | Table ID |
| `.bucket_id -> int` | Bucket ID |
| `.partition_id -> int \| None` | Partition ID (`None` if non-partitioned) |

## FlussError

| Property | Description |
|---|---|
| `.message -> str` | Error message |
| `.error_code -> int` | Error code (`ErrorCode.CLIENT_ERROR` for client-side errors, the server code otherwise) |

Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. See Error Handling for details on matching specific error codes.

## Constants

| Constant | Value | Description |
|---|---|---|
| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from the earliest available offset |

## OffsetSpec

| Method | Description |
|---|---|
| `OffsetSpec.earliest()` | Earliest available offset |
| `OffsetSpec.latest()` | Latest offset |
| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp (millis) |

To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing:

```python
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
scanner.subscribe(bucket_id=0, start_offset=offsets[0])
```

## ChangeType

| Value | Short String | Description |
|---|---|---|
| `ChangeType.AppendOnly` (0) | `+A` | Append-only |
| `ChangeType.Insert` (1) | `+I` | Insert |
| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of an updated row |
| `ChangeType.UpdateAfter` (3) | `+U` | New value of an updated row |
| `ChangeType.Delete` (4) | `-D` | Delete |
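A sketch of routing changelog records by change type when consuming a primary-key table's log; `apply_row` and `remove_row` are hypothetical downstream handlers, and exposing `ChangeType` at the package top level is an assumption.

```python
records = scanner.poll(timeout_ms=5000)
for record in records:
    if record.change_type in (fluss.ChangeType.Insert, fluss.ChangeType.UpdateAfter):
        apply_row(record.row)    # hypothetical handler: new/current value
    elif record.change_type == fluss.ChangeType.Delete:
        remove_row(record.row)   # hypothetical handler: row removed
    # UpdateBefore carries the old value and can often be skipped
```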