---
sidebar_position: 2
---

API Reference

Complete API reference for the Fluss Rust client.

Config

| Field | Type | Default | Description |
|---|---|---|---|
| `bootstrap_servers` | `String` | `"127.0.0.1:9123"` | Coordinator server address |
| `writer_request_max_size` | `i32` | `10485760` (10 MB) | Maximum request size in bytes |
| `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) |
| `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure |
| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes |
| `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending |
| `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` |
| `scanner_remote_log_prefetch_num` | `usize` | `4` | Number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `usize` | `3` | Number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `i32` | `16777216` (16 MB) | Maximum bytes per fetch response for `LogScanner` |
| `scanner_log_fetch_min_bytes` | `i32` | `1` | Minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `i32` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `i32` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for `LogScanner` |
| `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds |
| `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth |
| `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) |
| `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) |
| `security_sasl_password` | `String` | (empty) | SASL password (required when protocol is `sasl`) |

FlussConnection

MethodDescription
async fn new(config: Config) -> Result<Self>Create a new connection to a Fluss cluster
async fn get_admin(&self) -> Result<FlussAdmin>Get the admin interface for cluster management
async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>>Get a table for read/write operations
fn config(&self) -> &ConfigGet a reference to the connection config

FlussAdmin

Database Operations

MethodDescription
async fn create_database(&self, name: &str, descriptor: Option<&DatabaseDescriptor>, ignore_if_exists: bool) -> Result<()>Create a database
async fn drop_database(&self, name: &str, ignore_if_not_exists: bool, cascade: bool) -> Result<()>Drop a database
async fn list_databases(&self) -> Result<Vec<String>>List all databases
async fn database_exists(&self, name: &str) -> Result<bool>Check if a database exists
async fn get_database_info(&self, name: &str) -> Result<DatabaseInfo>Get database metadata

Table Operations

MethodDescription
async fn create_table(&self, table_path: &TablePath, descriptor: &TableDescriptor, ignore_if_exists: bool) -> Result<()>Create a table
async fn drop_table(&self, table_path: &TablePath, ignore_if_not_exists: bool) -> Result<()>Drop a table
async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo>Get table metadata
async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>List tables in a database
async fn table_exists(&self, table_path: &TablePath) -> Result<bool>Check if a table exists

Partition Operations

MethodDescription
async fn list_partition_infos(&self, table_path: &TablePath) -> Result<Vec<PartitionInfo>>List all partitions
async fn list_partition_infos_with_spec(&self, table_path: &TablePath, spec: Option<&PartitionSpec>) -> Result<Vec<PartitionInfo>>List partitions matching a spec
async fn create_partition(&self, table_path: &TablePath, spec: &PartitionSpec, ignore_if_exists: bool) -> Result<()>Create a partition
async fn drop_partition(&self, table_path: &TablePath, spec: &PartitionSpec, ignore_if_not_exists: bool) -> Result<()>Drop a partition

Offset Operations

MethodDescription
async fn list_offsets(&self, table_path: &TablePath, bucket_ids: &[i32], offset_spec: OffsetSpec) -> Result<HashMap<i32, i64>>Get offsets for buckets
async fn list_partition_offsets(&self, table_path: &TablePath, partition_name: &str, bucket_ids: &[i32], offset_spec: OffsetSpec) -> Result<HashMap<i32, i64>>Get offsets for a partition's buckets

Lake Operations

MethodDescription
async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot>Get the latest lake snapshot

Cluster Operations

MethodDescription
async fn get_server_nodes(&self) -> Result<Vec<ServerNode>>Get all alive server nodes (coordinator + tablets)

ServerNode

MethodDescription
fn id(&self) -> i32Server node ID
fn host(&self) -> &strHostname of the server
fn port(&self) -> u32Port number
fn server_type(&self) -> &ServerTypeServer type (CoordinatorServer or TabletServer)
fn uid(&self) -> &strUnique identifier (e.g. "cs-0", "ts-1")

FlussTable<'a>

MethodDescription
fn get_table_info(&self) -> &TableInfoGet table metadata
fn new_append(&self) -> Result<TableAppend>Create an append builder for log tables
fn new_scan(&self) -> TableScan<'_>Create a scan builder
fn new_lookup(&self) -> Result<TableLookup>Create a lookup builder for PK tables
fn new_upsert(&self) -> Result<TableUpsert>Create an upsert builder for PK tables
fn has_primary_key(&self) -> boolCheck if the table has a primary key
fn table_path(&self) -> &TablePathGet the table path

TableAppend

MethodDescription
fn create_writer(&self) -> Result<AppendWriter>Create an append writer

AppendWriter

MethodDescription
fn append(&self, row: &impl InternalRow) -> Result<WriteResultFuture>Append a row; returns a future for acknowledgment
fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture>Append an Arrow RecordBatch
async fn flush(&self) -> Result<()>Flush all pending writes to the server

TableScan<'a>

MethodDescription
fn project(self, indices: &[usize]) -> Result<Self>Project columns by index
fn project_by_name(self, names: &[&str]) -> Result<Self>Project columns by name
fn create_log_scanner(self) -> Result<LogScanner>Create a record-based log scanner
fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner>Create an Arrow batch-based log scanner

LogScanner

MethodDescription
async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>Subscribe to a bucket
async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()>Subscribe to multiple buckets
async fn subscribe_partition(&self, partition_id: i64, bucket_id: i32, start_offset: i64) -> Result<()>Subscribe to a partition bucket
async fn subscribe_partition_buckets(&self, offsets: &HashMap<(i64, i32), i64>) -> Result<()>Subscribe to multiple partition-bucket pairs
async fn unsubscribe(&self, bucket_id: i32) -> Result<()>Unsubscribe from a bucket (non-partitioned tables)
async fn unsubscribe_partition(&self, partition_id: i64, bucket_id: i32) -> Result<()>Unsubscribe from a partition bucket (partitioned tables)
async fn poll(&self, timeout: Duration) -> Result<ScanRecords>Poll for records

RecordBatchLogScanner

MethodDescription
async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>Subscribe to a bucket
async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()>Subscribe to multiple buckets
async fn subscribe_partition(&self, partition_id: i64, bucket_id: i32, start_offset: i64) -> Result<()>Subscribe to a partition bucket
async fn subscribe_partition_buckets(&self, offsets: &HashMap<(i64, i32), i64>) -> Result<()>Subscribe to multiple partition-bucket pairs
async fn unsubscribe(&self, bucket_id: i32) -> Result<()>Unsubscribe from a bucket (non-partitioned tables)
async fn unsubscribe_partition(&self, partition_id: i64, bucket_id: i32) -> Result<()>Unsubscribe from a partition bucket (partitioned tables)
async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>>Poll for Arrow record batches
fn is_partitioned(&self) -> boolCheck if the table is partitioned
fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)>Get all current subscriptions as (bucket, offset) pairs

ScanRecord

MethodDescription
fn row(&self) -> &dyn InternalRowGet the row data
fn offset(&self) -> i64Record offset in the log
fn timestamp(&self) -> i64Record timestamp
fn change_type(&self) -> &ChangeTypeChange type (AppendOnly, Insert, etc.)

ScanRecords

MethodDescription
fn count(&self) -> usizeNumber of records
fn is_empty(&self) -> boolWhether the result set is empty
fn records(&self, bucket: &TableBucket) -> &[ScanRecord]Get records for a specific bucket
fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>>Get all records grouped by bucket

ScanRecords also implements IntoIterator, so you can iterate over all records directly:

```rust
for record in records {
    println!("offset={}", record.offset());
}
```

ScanBatch

MethodDescription
fn bucket(&self) -> &TableBucketBucket this batch belongs to
fn batch(&self) -> &RecordBatchArrow RecordBatch data
fn base_offset(&self) -> i64First record offset
fn last_offset(&self) -> i64Last record offset
fn num_records(&self) -> usizeNumber of records in the batch

TableUpsert

MethodDescription
fn create_writer(&self) -> Result<UpsertWriter>Create an upsert writer
fn partial_update(&self, column_indices: Option<Vec<usize>>) -> Result<TableUpsert>Create a partial update builder by column indices
fn partial_update_with_column_names(&self, names: &[&str]) -> Result<TableUpsert>Create a partial update builder by column names

UpsertWriter

MethodDescription
fn upsert(&self, row: &impl InternalRow) -> Result<WriteResultFuture>Upsert a row (insert or update by PK)
fn delete(&self, row: &impl InternalRow) -> Result<WriteResultFuture>Delete a row by primary key
async fn flush(&self) -> Result<()>Flush all pending operations

TableLookup

MethodDescription
fn create_lookuper(&self) -> Result<Lookuper>Create a lookuper for point lookups

Lookuper

MethodDescription
async fn lookup(&mut self, key: &impl InternalRow) -> Result<LookupResult>Lookup a row by primary key

LookupResult

MethodDescription
fn get_single_row(&self) -> Result<Option<impl InternalRow>>Get a single row from the result
fn get_rows(&self) -> Vec<impl InternalRow>Get all rows from the result

WriteResultFuture

Description
Implements Future<Output = Result<(), Error>>. Await to wait for server acknowledgment. Returned by append(), upsert(), and delete().

Usage:

```rust
// Fire-and-forget (batched)
writer.append(&row)?;
writer.flush().await?;

// Per-record acknowledgment
writer.append(&row)?.await?;
```

Schema

MethodDescription
fn builder() -> SchemaBuilderCreate a schema builder
fn columns(&self) -> &[Column]Get all columns
fn primary_key(&self) -> Option<&PrimaryKey>Get primary key (None if no primary key)
fn column_names(&self) -> Vec<&str>Get all column names
fn primary_key_indexes(&self) -> Vec<usize>Get primary key column indices

SchemaBuilder

MethodDescription
fn column(name: &str, data_type: DataType) -> SelfAdd a column
fn primary_key(keys: Vec<&str>) -> SelfSet primary key columns
fn build() -> Result<Schema>Build the schema

TableDescriptor

MethodDescription
fn builder() -> TableDescriptorBuilderCreate a table descriptor builder
fn schema(&self) -> &SchemaGet the table schema
fn partition_keys(&self) -> &[String]Get partition key column names
fn has_primary_key(&self) -> boolCheck if the table has a primary key
fn properties(&self) -> &HashMap<String, String>Get all table properties
fn custom_properties(&self) -> &HashMap<String, String>Get custom properties
fn comment(&self) -> Option<&str>Get table comment

TableDescriptorBuilder

MethodDescription
fn schema(schema: Schema) -> SelfSet the schema
fn log_format(format: LogFormat) -> SelfSet log format (e.g., LogFormat::ARROW)
fn kv_format(format: KvFormat) -> SelfSet KV format (e.g., KvFormat::COMPACTED)
fn property(key: &str, value: &str) -> SelfSet a table property
fn custom_property(key: impl Into<String>, value: impl Into<String>) -> SelfSet a single custom property
fn custom_properties(properties: HashMap<impl Into<String>, impl Into<String>>) -> SelfSet custom properties
fn partitioned_by(keys: Vec<&str>) -> SelfSet partition columns
fn distributed_by(bucket_count: Option<i32>, bucket_keys: Vec<String>) -> SelfSet bucket distribution
fn comment(comment: &str) -> SelfSet table comment
fn build() -> Result<TableDescriptor>Build the table descriptor

TablePath

MethodDescription
TablePath::new(database: &str, table: &str) -> SelfCreate a table path
fn database(&self) -> &strGet database name
fn table(&self) -> &strGet table name

TableInfo

| Field / Method | Description |
|---|---|
| `.table_path` | `TablePath` -- Table path |
| `.table_id` | `i64` -- Table ID |
| `.schema_id` | `i32` -- Schema ID |
| `.schema` | `Schema` -- Table schema |
| `.primary_keys` | `Vec<String>` -- Primary key column names |
| `.partition_keys` | `Vec<String>` -- Partition key column names |
| `.num_buckets` | `i32` -- Number of buckets |
| `.properties` | `HashMap<String, String>` -- All table properties |
| `.custom_properties` | `HashMap<String, String>` -- Custom properties only |
| `.comment` | `Option<String>` -- Table comment |
| `.created_time` | `i64` -- Creation timestamp |
| `.modified_time` | `i64` -- Last modification timestamp |

TableBucket

MethodDescription
TableBucket::new(table_id: i64, bucket_id: i32) -> SelfCreate a non-partitioned bucket
TableBucket::new_with_partition(table_id: i64, partition_id: Option<i64>, bucket_id: i32) -> SelfCreate a partitioned bucket
fn table_id(&self) -> i64Get table ID
fn partition_id(&self) -> Option<i64>Get partition ID (None if non-partitioned)
fn bucket_id(&self) -> i32Get bucket ID

PartitionSpec

MethodDescription
PartitionSpec::new(spec_map: HashMap<&str, &str>) -> SelfCreate from a map of partition column names to values
fn get_spec_map(&self) -> &HashMap<String, String>Get the partition spec map

PartitionInfo

MethodDescription
fn get_partition_id(&self) -> i64Get partition ID
fn get_partition_name(&self) -> StringGet partition name

DatabaseDescriptor

MethodDescription
fn builder() -> DatabaseDescriptorBuilderCreate a database descriptor builder
fn comment(&self) -> Option<&str>Get database comment
fn custom_properties(&self) -> &HashMap<String, String>Get custom properties

DatabaseDescriptorBuilder

MethodDescription
fn comment(comment: impl Into<String>) -> SelfSet database comment
fn custom_properties(properties: HashMap<impl Into<String>, impl Into<String>>) -> SelfSet custom properties
fn custom_property(key: impl Into<String>, value: impl Into<String>) -> SelfSet a single custom property
fn build() -> DatabaseDescriptorBuild the database descriptor

DatabaseInfo

MethodDescription
fn database_name(&self) -> &strGet database name
fn created_time(&self) -> i64Get creation timestamp
fn modified_time(&self) -> i64Get last modification timestamp
fn database_descriptor(&self) -> &DatabaseDescriptorGet the database descriptor

LakeSnapshot

| Field | Description |
|---|---|
| `.snapshot_id` | `i64` -- Snapshot ID |
| `.table_buckets_offset` | `HashMap<TableBucket, i64>` -- All bucket offsets |

GenericRow<'a>

MethodDescription
GenericRow::new(field_count: usize) -> SelfCreate a new row with the given number of fields
fn set_field(&mut self, pos: usize, value: impl Into<Datum<'a>>)Set a field value by position
GenericRow::from_data(data: Vec<impl Into<Datum<'a>>>) -> SelfCreate a row from existing field data

Implements the InternalRow trait (see below).

InternalRow trait

MethodDescription
fn is_null_at(&self, idx: usize) -> Result<bool>Check if a field is null
fn get_boolean(&self, idx: usize) -> Result<bool>Get boolean value
fn get_byte(&self, idx: usize) -> Result<i8>Get tinyint value
fn get_short(&self, idx: usize) -> Result<i16>Get smallint value
fn get_int(&self, idx: usize) -> Result<i32>Get int value
fn get_long(&self, idx: usize) -> Result<i64>Get bigint value
fn get_float(&self, idx: usize) -> Result<f32>Get float value
fn get_double(&self, idx: usize) -> Result<f64>Get double value
fn get_string(&self, idx: usize) -> Result<&str>Get string value
fn get_decimal(&self, idx: usize, precision: usize, scale: usize) -> Result<Decimal>Get decimal value
fn get_date(&self, idx: usize) -> Result<Date>Get date value
fn get_time(&self, idx: usize) -> Result<Time>Get time value
fn get_timestamp_ntz(&self, idx: usize, precision: u32) -> Result<TimestampNtz>Get timestamp without timezone value
fn get_timestamp_ltz(&self, idx: usize, precision: u32) -> Result<TimestampLtz>Get timestamp with local timezone value
fn get_bytes(&self, idx: usize) -> Result<&[u8]>Get bytes value
fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>Get fixed-length binary value
fn get_char(&self, idx: usize, length: usize) -> Result<&str>Get fixed-length char value

ChangeType

| Value | Short String | Description |
|---|---|---|
| `ChangeType::AppendOnly` | `+A` | Append-only record |
| `ChangeType::Insert` | `+I` | Inserted row |
| `ChangeType::UpdateBefore` | `-U` | Previous value of an updated row |
| `ChangeType::UpdateAfter` | `+U` | New value of an updated row |
| `ChangeType::Delete` | `-D` | Deleted row |

| Method | Description |
|---|---|
| `fn short_string(&self) -> &str` | Get the short string representation |

OffsetSpec

VariantDescription
OffsetSpec::EarliestStart from the earliest available offset
OffsetSpec::LatestStart from the latest offset (only new records)
OffsetSpec::Timestamp(i64)Start from a specific timestamp in milliseconds

Constants

| Constant | Value | Description |
|---|---|---|
| `fluss::client::EARLIEST_OFFSET` | `-2` | Start reading from the earliest available offset |

To start reading from the latest offset (only new records), resolve the current offset via list_offsets before subscribing:

```rust
use fluss::rpc::message::OffsetSpec;

let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
```

DataTypes factory

| Method | Returns | Description |
|---|---|---|
| `DataTypes::boolean()` | `DataType` | Boolean type |
| `DataTypes::tinyint()` | `DataType` | 8-bit signed integer |
| `DataTypes::smallint()` | `DataType` | 16-bit signed integer |
| `DataTypes::int()` | `DataType` | 32-bit signed integer |
| `DataTypes::bigint()` | `DataType` | 64-bit signed integer |
| `DataTypes::float()` | `DataType` | 32-bit floating point |
| `DataTypes::double()` | `DataType` | 64-bit floating point |
| `DataTypes::string()` | `DataType` | Variable-length string |
| `DataTypes::bytes()` | `DataType` | Variable-length byte array |
| `DataTypes::date()` | `DataType` | Date (days since epoch) |
| `DataTypes::time()` | `DataType` | Time (milliseconds since midnight) |
| `DataTypes::timestamp()` | `DataType` | Timestamp without timezone |
| `DataTypes::timestamp_ltz()` | `DataType` | Timestamp with local timezone |
| `DataTypes::decimal(precision: u32, scale: u32)` | `DataType` | Fixed-point decimal |
| `DataTypes::char(length: u32)` | `DataType` | Fixed-length string |
| `DataTypes::binary(length: usize)` | `DataType` | Fixed-length byte array |
| `DataTypes::array(element: DataType)` | `DataType` | Array of elements |
| `DataTypes::map(key: DataType, value: DataType)` | `DataType` | Map of key-value pairs |
| `DataTypes::row(fields: Vec<DataField>)` | `DataType` | Nested row type |

DataField

MethodDescription
DataField::new(name: impl Into<String>, data_type: DataType, description: Option<String>) -> DataFieldCreate a data field
fn name(&self) -> &strGet the field name