Complete API reference for the Fluss Rust client.
## Config

| Field | Type | Default | Description |
|---|---|---|---|
| `bootstrap_servers` | `String` | `"127.0.0.1:9123"` | Coordinator server address |
| `writer_request_max_size` | `i32` | `10485760` (10 MB) | Maximum request size in bytes |
| `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) |
| `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure |
| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes |
| `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill before sending |
| `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` |
| `scanner_remote_log_prefetch_num` | `usize` | `4` | Number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `usize` | `3` | Number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `i32` | `16777216` (16 MB) | Maximum bytes per fetch response for `LogScanner` |
| `scanner_log_fetch_min_bytes` | `i32` | `1` | Minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `i32` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `i32` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for `LogScanner` |
| `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds |
| `security_protocol` | `String` | `"PLAINTEXT"` | `"PLAINTEXT"` or `"sasl"` for SASL authentication |
| `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) |
| `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) |
| `security_sasl_password` | `String` | (empty) | SASL password (required when protocol is `sasl`) |
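A hedged sketch of overriding defaults, assuming `Config` implements `Default` and exposes the fields above as public struct fields (the module path `fluss::config` is also an assumption — check the crate for the actual construction API):

```rust
use fluss::config::Config; // assumed module path

// Start from the defaults and override only what differs.
let config = Config {
    bootstrap_servers: "coordinator-1:9123".to_string(),
    writer_batch_timeout_ms: 50, // send smaller batches sooner
    ..Config::default()
};
```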
## FlussConnection

| Method | Description |
|---|---|
| `async fn new(config: Config) -> Result<Self>` | Create a new connection to a Fluss cluster |
| `async fn get_admin(&self) -> Result<FlussAdmin>` | Get the admin interface for cluster management |
| `async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>>` | Get a table for read/write operations |
| `fn config(&self) -> &Config` | Get a reference to the connection config |
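A minimal connect-and-open flow based on the signatures above (the module paths and the tokio runtime are assumptions, not confirmed by this reference):

```rust
use fluss::client::FlussConnection; // assumed paths
use fluss::config::Config;
use fluss::metadata::TablePath;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conn = FlussConnection::new(Config::default()).await?;
    let admin = conn.get_admin().await?;
    println!("databases: {:?}", admin.list_databases().await?);

    let table = conn.get_table(&TablePath::new("my_db", "my_table")).await?;
    println!("has primary key: {}", table.has_primary_key());
    Ok(())
}
```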
## FlussAdmin

### Database operations

| Method | Description |
|---|---|
| `async fn create_database(&self, name: &str, descriptor: Option<&DatabaseDescriptor>, ignore_if_exists: bool) -> Result<()>` | Create a database |
| `async fn drop_database(&self, name: &str, ignore_if_not_exists: bool, cascade: bool) -> Result<()>` | Drop a database |
| `async fn list_databases(&self) -> Result<Vec<String>>` | List all databases |
| `async fn database_exists(&self, name: &str) -> Result<bool>` | Check if a database exists |
| `async fn get_database_info(&self, name: &str) -> Result<DatabaseInfo>` | Get database metadata |

### Table operations

| Method | Description |
|---|---|
| `async fn create_table(&self, table_path: &TablePath, descriptor: &TableDescriptor, ignore_if_exists: bool) -> Result<()>` | Create a table |
| `async fn drop_table(&self, table_path: &TablePath, ignore_if_not_exists: bool) -> Result<()>` | Drop a table |
| `async fn get_table_info(&self, table_path: &TablePath) -> Result<TableInfo>` | Get table metadata |
| `async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>` | List tables in a database |
| `async fn table_exists(&self, table_path: &TablePath) -> Result<bool>` | Check if a table exists |
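Combining the admin methods with the schema and descriptor builders documented later in this reference, table creation might look like the following sketch (the names `my_db`/`users` are illustrative; `admin` is assumed to come from `FlussConnection::get_admin()`):

```rust
// Sketch only — signatures taken from the tables in this reference.
let schema = Schema::builder()
    .column("id", DataTypes::bigint())
    .column("name", DataTypes::string())
    .primary_key(vec!["id"])
    .build()?;
let descriptor = TableDescriptor::builder().schema(schema).build()?;

let path = TablePath::new("my_db", "users");
admin.create_table(&path, &descriptor, /* ignore_if_exists */ true).await?;
assert!(admin.table_exists(&path).await?);
```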
### Partition operations

| Method | Description |
|---|---|
| `async fn list_partition_infos(&self, table_path: &TablePath) -> Result<Vec<PartitionInfo>>` | List all partitions |
| `async fn list_partition_infos_with_spec(&self, table_path: &TablePath, spec: Option<&PartitionSpec>) -> Result<Vec<PartitionInfo>>` | List partitions matching a spec |
| `async fn create_partition(&self, table_path: &TablePath, spec: &PartitionSpec, ignore_if_exists: bool) -> Result<()>` | Create a partition |
| `async fn drop_partition(&self, table_path: &TablePath, spec: &PartitionSpec, ignore_if_not_exists: bool) -> Result<()>` | Drop a partition |

### Offset operations

| Method | Description |
|---|---|
| `async fn list_offsets(&self, table_path: &TablePath, bucket_ids: &[i32], offset_spec: OffsetSpec) -> Result<HashMap<i32, i64>>` | Get offsets for buckets |
| `async fn list_partition_offsets(&self, table_path: &TablePath, partition_name: &str, bucket_ids: &[i32], offset_spec: OffsetSpec) -> Result<HashMap<i32, i64>>` | Get offsets for a partition's buckets |

### Lake operations

| Method | Description |
|---|---|
| `async fn get_latest_lake_snapshot(&self, table_path: &TablePath) -> Result<LakeSnapshot>` | Get the latest lake snapshot |

### Cluster operations

| Method | Description |
|---|---|
| `async fn get_server_nodes(&self) -> Result<Vec<ServerNode>>` | Get all alive server nodes (coordinator + tablets) |
## ServerNode

| Method | Description |
|---|---|
| `fn id(&self) -> i32` | Server node ID |
| `fn host(&self) -> &str` | Hostname of the server |
| `fn port(&self) -> u32` | Port number |
| `fn server_type(&self) -> &ServerType` | Server type (`CoordinatorServer` or `TabletServer`) |
| `fn uid(&self) -> &str` | Unique identifier (e.g. `"cs-0"`, `"ts-1"`) |
## FlussTable<'a>

| Method | Description |
|---|---|
| `fn get_table_info(&self) -> &TableInfo` | Get table metadata |
| `fn new_append(&self) -> Result<TableAppend>` | Create an append builder for log tables |
| `fn new_scan(&self) -> TableScan<'_>` | Create a scan builder |
| `fn new_lookup(&self) -> Result<TableLookup>` | Create a lookup builder for PK tables |
| `fn new_upsert(&self) -> Result<TableUpsert>` | Create an upsert builder for PK tables |
| `fn has_primary_key(&self) -> bool` | Check if the table has a primary key |
| `fn table_path(&self) -> &TablePath` | Get the table path |
## TableAppend

| Method | Description |
|---|---|
| `fn create_writer(&self) -> Result<AppendWriter>` | Create an append writer |
## AppendWriter

| Method | Description |
|---|---|
| `fn append(&self, row: &impl InternalRow) -> Result<WriteResultFuture>` | Append a row; returns a future for acknowledgment |
| `fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture>` | Append an Arrow `RecordBatch` |
| `async fn flush(&self) -> Result<()>` | Flush all pending writes to the server |
## TableScan<'a>

| Method | Description |
|---|---|
| `fn project(self, indices: &[usize]) -> Result<Self>` | Project columns by index |
| `fn project_by_name(self, names: &[&str]) -> Result<Self>` | Project columns by name |
| `fn create_log_scanner(self) -> Result<LogScanner>` | Create a record-based log scanner |
| `fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner>` | Create an Arrow batch-based log scanner |
## LogScanner

| Method | Description |
|---|---|
| `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket |
| `async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()>` | Subscribe to multiple buckets |
| `async fn subscribe_partition(&self, partition_id: i64, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a partition bucket |
| `async fn subscribe_partition_buckets(&self, offsets: &HashMap<(i64, i32), i64>) -> Result<()>` | Subscribe to multiple partition-bucket pairs |
| `async fn unsubscribe(&self, bucket_id: i32) -> Result<()>` | Unsubscribe from a bucket (non-partitioned tables) |
| `async fn unsubscribe_partition(&self, partition_id: i64, bucket_id: i32) -> Result<()>` | Unsubscribe from a partition bucket (partitioned tables) |
| `async fn poll(&self, timeout: Duration) -> Result<ScanRecords>` | Poll for records |
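The subscribe/poll methods compose into a simple tailing loop. This sketch subscribes every bucket from the earliest offset; `table` is assumed to be a `FlussTable`, and `EARLIEST_OFFSET` is the constant documented in the OffsetSpec section of this reference:

```rust
use std::time::Duration;

let scanner = table.new_scan().create_log_scanner()?;
for bucket_id in 0..table.get_table_info().num_buckets {
    scanner.subscribe(bucket_id, fluss::client::EARLIEST_OFFSET).await?;
}
loop {
    // poll() returns whatever arrived within the timeout (possibly nothing).
    let records = scanner.poll(Duration::from_secs(1)).await?;
    for record in records {
        println!("offset={} ts={}", record.offset(), record.timestamp());
    }
}
```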
## RecordBatchLogScanner

| Method | Description |
|---|---|
| `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket |
| `async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()>` | Subscribe to multiple buckets |
| `async fn subscribe_partition(&self, partition_id: i64, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a partition bucket |
| `async fn subscribe_partition_buckets(&self, offsets: &HashMap<(i64, i32), i64>) -> Result<()>` | Subscribe to multiple partition-bucket pairs |
| `async fn unsubscribe(&self, bucket_id: i32) -> Result<()>` | Unsubscribe from a bucket (non-partitioned tables) |
| `async fn unsubscribe_partition(&self, partition_id: i64, bucket_id: i32) -> Result<()>` | Unsubscribe from a partition bucket (partitioned tables) |
| `async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>>` | Poll for Arrow record batches |
| `fn is_partitioned(&self) -> bool` | Check if the table is partitioned |
| `fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)>` | Get all current subscriptions as (bucket, offset) pairs |
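For Arrow-based consumption the flow is the same, but `poll` yields `ScanBatch` values. A sketch using the accessors documented in this reference (`table` and `EARLIEST_OFFSET` as elsewhere):

```rust
use std::time::Duration;

let scanner = table.new_scan().create_record_batch_log_scanner()?;
scanner.subscribe(0, fluss::client::EARLIEST_OFFSET).await?;

let batches = scanner.poll(Duration::from_millis(500)).await?;
for b in &batches {
    // Each ScanBatch wraps one Arrow RecordBatch for one bucket.
    println!(
        "bucket={:?} rows={} offsets {}..={}",
        b.bucket(), b.num_records(), b.base_offset(), b.last_offset()
    );
}
```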
## ScanRecord

| Method | Description |
|---|---|
| `fn row(&self) -> &dyn InternalRow` | Get the row data |
| `fn offset(&self) -> i64` | Record offset in the log |
| `fn timestamp(&self) -> i64` | Record timestamp |
| `fn change_type(&self) -> &ChangeType` | Change type (`AppendOnly`, `Insert`, etc.) |
## ScanRecords

| Method | Description |
|---|---|
| `fn count(&self) -> usize` | Number of records |
| `fn is_empty(&self) -> bool` | Whether the result set is empty |
| `fn records(&self, bucket: &TableBucket) -> &[ScanRecord]` | Get records for a specific bucket |
| `fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>>` | Get all records grouped by bucket |

`ScanRecords` also implements `IntoIterator`, so you can iterate over all records directly:

```rust
for record in records {
    println!("offset={}", record.offset());
}
```
## ScanBatch

| Method | Description |
|---|---|
| `fn bucket(&self) -> &TableBucket` | Bucket this batch belongs to |
| `fn batch(&self) -> &RecordBatch` | Arrow `RecordBatch` data |
| `fn base_offset(&self) -> i64` | First record offset |
| `fn last_offset(&self) -> i64` | Last record offset |
| `fn num_records(&self) -> usize` | Number of records in the batch |
## TableUpsert

| Method | Description |
|---|---|
| `fn create_writer(&self) -> Result<UpsertWriter>` | Create an upsert writer |
| `fn partial_update(&self, column_indices: Option<Vec<usize>>) -> Result<TableUpsert>` | Create a partial update builder by column indices |
| `fn partial_update_with_column_names(&self, names: &[&str]) -> Result<TableUpsert>` | Create a partial update builder by column names |
## UpsertWriter

| Method | Description |
|---|---|
| `fn upsert(&self, row: &impl InternalRow) -> Result<WriteResultFuture>` | Upsert a row (insert or update by PK) |
| `fn delete(&self, row: &impl InternalRow) -> Result<WriteResultFuture>` | Delete a row by primary key |
| `async fn flush(&self) -> Result<()>` | Flush all pending operations |
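A hedged upsert/delete sketch for a primary-key table with an assumed schema of `(id BIGINT, name STRING)`; the `Datum` conversions for `i64` and `&str` are assumptions:

```rust
let writer = table.new_upsert()?.create_writer()?;

let mut row = GenericRow::new(2);
row.set_field(0, 42i64);   // primary key
row.set_field(1, "alice");
writer.upsert(&row)?;      // insert or update by PK

writer.delete(&row)?;      // delete the same PK
writer.flush().await?;     // wait until the server has acknowledged both
```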
## TableLookup

| Method | Description |
|---|---|
| `fn create_lookuper(&self) -> Result<Lookuper>` | Create a lookuper for point lookups |
## Lookuper

| Method | Description |
|---|---|
| `async fn lookup(&mut self, key: &impl InternalRow) -> Result<LookupResult>` | Lookup a row by primary key |
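A point-lookup sketch against the same assumed `(id BIGINT, name STRING)` primary-key table. Note that `lookup` takes `&mut self`, so the lookuper must be mutable:

```rust
let mut lookuper = table.new_lookup()?.create_lookuper()?;

// The key row contains only the primary-key column(s).
let mut key = GenericRow::new(1);
key.set_field(0, 42i64);

match lookuper.lookup(&key).await?.get_single_row()? {
    Some(row) => println!("name = {}", row.get_string(1)?),
    None => println!("no row for id 42"),
}
```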
## LookupResult

| Method | Description |
|---|---|
| `fn get_single_row(&self) -> Result<Option<impl InternalRow>>` | Get a single row from the result |
| `fn get_rows(&self) -> Vec<impl InternalRow>` | Get all rows from the result |
## WriteResultFuture

Implements `Future<Output = Result<(), Error>>`. Await it to wait for server acknowledgment. Returned by `append()`, `upsert()`, and `delete()`.

Usage:

```rust
// Fire-and-forget (batched)
writer.append(&row)?;
writer.flush().await?;

// Per-record acknowledgment
writer.append(&row)?.await?;
```
## Schema

| Method | Description |
|---|---|
| `fn builder() -> SchemaBuilder` | Create a schema builder |
| `fn columns(&self) -> &[Column]` | Get all columns |
| `fn primary_key(&self) -> Option<&PrimaryKey>` | Get primary key (`None` if no primary key) |
| `fn column_names(&self) -> Vec<&str>` | Get all column names |
| `fn primary_key_indexes(&self) -> Vec<usize>` | Get primary key column indices |
## SchemaBuilder

| Method | Description |
|---|---|
| `fn column(name: &str, data_type: DataType) -> Self` | Add a column |
| `fn primary_key(keys: Vec<&str>) -> Self` | Set primary key columns |
| `fn build() -> Result<Schema>` | Build the schema |
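The builder methods chain; a sketch using the `DataTypes` factory documented later in this reference (the accessor results follow from the column order):

```rust
let schema = Schema::builder()
    .column("id", DataTypes::bigint())
    .column("name", DataTypes::string())
    .column("score", DataTypes::double())
    .primary_key(vec!["id"])
    .build()?;

assert_eq!(schema.column_names(), vec!["id", "name", "score"]);
assert_eq!(schema.primary_key_indexes(), vec![0]);
```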
## TableDescriptor

| Method | Description |
|---|---|
| `fn builder() -> TableDescriptorBuilder` | Create a table descriptor builder |
| `fn schema(&self) -> &Schema` | Get the table schema |
| `fn partition_keys(&self) -> &[String]` | Get partition key column names |
| `fn has_primary_key(&self) -> bool` | Check if the table has a primary key |
| `fn properties(&self) -> &HashMap<String, String>` | Get all table properties |
| `fn custom_properties(&self) -> &HashMap<String, String>` | Get custom properties |
| `fn comment(&self) -> Option<&str>` | Get table comment |
## TableDescriptorBuilder

| Method | Description |
|---|---|
| `fn schema(schema: Schema) -> Self` | Set the schema |
| `fn log_format(format: LogFormat) -> Self` | Set log format (e.g., `LogFormat::ARROW`) |
| `fn kv_format(format: KvFormat) -> Self` | Set KV format (e.g., `KvFormat::COMPACTED`) |
| `fn property(key: &str, value: &str) -> Self` | Set a table property |
| `fn custom_property(key: impl Into<String>, value: impl Into<String>) -> Self` | Set a single custom property |
| `fn custom_properties(properties: HashMap<impl Into<String>, impl Into<String>>) -> Self` | Set custom properties |
| `fn partitioned_by(keys: Vec<&str>) -> Self` | Set partition columns |
| `fn distributed_by(bucket_count: Option<i32>, bucket_keys: Vec<String>) -> Self` | Set bucket distribution |
| `fn comment(comment: &str) -> Self` | Set table comment |
| `fn build() -> Result<TableDescriptor>` | Build the table descriptor |
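A hedged sketch of a partitioned, bucketed descriptor; `schema` is assumed to come from `Schema::builder()`, and the custom-property key/value pair is a placeholder, not a documented Fluss option:

```rust
let descriptor = TableDescriptor::builder()
    .schema(schema)                                  // built via Schema::builder()
    .partitioned_by(vec!["dt"])                      // "dt" must be a schema column
    .distributed_by(Some(4), vec!["id".to_string()]) // 4 buckets, keyed by "id"
    .custom_property("team", "data-platform")        // placeholder key/value
    .comment("user events")
    .build()?;

assert!(descriptor.partition_keys().contains(&"dt".to_string()));
```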
## TablePath

| Method | Description |
|---|---|
| `TablePath::new(database: &str, table: &str) -> Self` | Create a table path |
| `fn database(&self) -> &str` | Get database name |
| `fn table(&self) -> &str` | Get table name |
## TableInfo

| Field | Type | Description |
|---|---|---|
| `.table_path` | `TablePath` | Table path |
| `.table_id` | `i64` | Table ID |
| `.schema_id` | `i32` | Schema ID |
| `.schema` | `Schema` | Table schema |
| `.primary_keys` | `Vec<String>` | Primary key column names |
| `.partition_keys` | `Vec<String>` | Partition key column names |
| `.num_buckets` | `i32` | Number of buckets |
| `.properties` | `HashMap<String, String>` | All table properties |
| `.custom_properties` | `HashMap<String, String>` | Custom properties only |
| `.comment` | `Option<String>` | Table comment |
| `.created_time` | `i64` | Creation timestamp |
| `.modified_time` | `i64` | Last modification timestamp |
## TableBucket

| Method | Description |
|---|---|
| `TableBucket::new(table_id: i64, bucket_id: i32) -> Self` | Create a non-partitioned bucket |
| `TableBucket::new_with_partition(table_id: i64, partition_id: Option<i64>, bucket_id: i32) -> Self` | Create a partitioned bucket |
| `fn table_id(&self) -> i64` | Get table ID |
| `fn partition_id(&self) -> Option<i64>` | Get partition ID (`None` if non-partitioned) |
| `fn bucket_id(&self) -> i32` | Get bucket ID |
## PartitionSpec

| Method | Description |
|---|---|
| `PartitionSpec::new(spec_map: HashMap<&str, &str>) -> Self` | Create from a map of partition column names to values |
| `fn get_spec_map(&self) -> &HashMap<String, String>` | Get the partition spec map |
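Creating and listing partitions with a spec might look like this sketch; the partition column `dt`, its value, and the `admin`/`table_path` bindings are illustrative:

```rust
use std::collections::HashMap;

let spec = PartitionSpec::new(HashMap::from([("dt", "2024-01-01")]));
admin.create_partition(&table_path, &spec, /* ignore_if_exists */ true).await?;

for p in admin.list_partition_infos(&table_path).await? {
    println!("{} (id {})", p.get_partition_name(), p.get_partition_id());
}
```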
## PartitionInfo

| Method | Description |
|---|---|
| `fn get_partition_id(&self) -> i64` | Get partition ID |
| `fn get_partition_name(&self) -> String` | Get partition name |
## DatabaseDescriptor

| Method | Description |
|---|---|
| `fn builder() -> DatabaseDescriptorBuilder` | Create a database descriptor builder |
| `fn comment(&self) -> Option<&str>` | Get database comment |
| `fn custom_properties(&self) -> &HashMap<String, String>` | Get custom properties |
## DatabaseDescriptorBuilder

| Method | Description |
|---|---|
| `fn comment(comment: impl Into<String>) -> Self` | Set database comment |
| `fn custom_properties(properties: HashMap<impl Into<String>, impl Into<String>>) -> Self` | Set custom properties |
| `fn custom_property(key: impl Into<String>, value: impl Into<String>) -> Self` | Set a single custom property |
| `fn build() -> DatabaseDescriptor` | Build the database descriptor |
## DatabaseInfo

| Method | Description |
|---|---|
| `fn database_name(&self) -> &str` | Get database name |
| `fn created_time(&self) -> i64` | Get creation timestamp |
| `fn modified_time(&self) -> i64` | Get last modification timestamp |
| `fn database_descriptor(&self) -> &DatabaseDescriptor` | Get the database descriptor |
## LakeSnapshot

| Field | Type | Description |
|---|---|---|
| `.snapshot_id` | `i64` | Snapshot ID |
| `.table_buckets_offset` | `HashMap<TableBucket, i64>` | All bucket offsets |
## GenericRow<'a>

| Method | Description |
|---|---|
| `GenericRow::new(field_count: usize) -> Self` | Create a new row with the given number of fields |
| `fn set_field(&mut self, pos: usize, value: impl Into<Datum<'a>>)` | Set a field value by position |
| `GenericRow::from_data(data: Vec<impl Into<Datum<'a>>>) -> Self` | Create a row from existing field data |

Implements the `InternalRow` trait (see below).
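Rows can be built positionally and read back through the `InternalRow` accessors. A sketch; the `Datum` conversions for primitives and `&str` are assumptions:

```rust
let mut row = GenericRow::new(3);
row.set_field(0, 1i64);
row.set_field(1, "bob");
row.set_field(2, 98.5f64);

assert_eq!(row.get_long(0)?, 1);
assert_eq!(row.get_string(1)?, "bob");
assert!(!row.is_null_at(2)?);
```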
## InternalRow trait

| Method | Description |
|---|---|
| `fn is_null_at(&self, idx: usize) -> Result<bool>` | Check if a field is null |
| `fn get_boolean(&self, idx: usize) -> Result<bool>` | Get boolean value |
| `fn get_byte(&self, idx: usize) -> Result<i8>` | Get tinyint value |
| `fn get_short(&self, idx: usize) -> Result<i16>` | Get smallint value |
| `fn get_int(&self, idx: usize) -> Result<i32>` | Get int value |
| `fn get_long(&self, idx: usize) -> Result<i64>` | Get bigint value |
| `fn get_float(&self, idx: usize) -> Result<f32>` | Get float value |
| `fn get_double(&self, idx: usize) -> Result<f64>` | Get double value |
| `fn get_string(&self, idx: usize) -> Result<&str>` | Get string value |
| `fn get_decimal(&self, idx: usize, precision: usize, scale: usize) -> Result<Decimal>` | Get decimal value |
| `fn get_date(&self, idx: usize) -> Result<Date>` | Get date value |
| `fn get_time(&self, idx: usize) -> Result<Time>` | Get time value |
| `fn get_timestamp_ntz(&self, idx: usize, precision: u32) -> Result<TimestampNtz>` | Get timestamp value |
| `fn get_timestamp_ltz(&self, idx: usize, precision: u32) -> Result<TimestampLtz>` | Get timestamp with local timezone value |
| `fn get_bytes(&self, idx: usize) -> Result<&[u8]>` | Get bytes value |
| `fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>` | Get fixed-length binary value |
| `fn get_char(&self, idx: usize, length: usize) -> Result<&str>` | Get fixed-length char value |
## ChangeType

| Value | Short String | Description |
|---|---|---|
| `ChangeType::AppendOnly` | `+A` | Append-only record |
| `ChangeType::Insert` | `+I` | Inserted row |
| `ChangeType::UpdateBefore` | `-U` | Previous value of an updated row |
| `ChangeType::UpdateAfter` | `+U` | New value of an updated row |
| `ChangeType::Delete` | `-D` | Deleted row |

| Method | Description |
|---|---|
| `fn short_string(&self) -> &str` | Get the short string representation |
## OffsetSpec

| Variant | Description |
|---|---|
| `OffsetSpec::Earliest` | Start from the earliest available offset |
| `OffsetSpec::Latest` | Start from the latest offset (only new records) |
| `OffsetSpec::Timestamp(i64)` | Start from a specific timestamp in milliseconds |

| Constant | Value | Description |
|---|---|---|
| `fluss::client::EARLIEST_OFFSET` | `-2` | Start reading from the earliest available offset |

To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing:

```rust
use fluss::rpc::message::OffsetSpec;

let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
```
## DataTypes factory

| Method | Returns | Description |
|---|---|---|
| `DataTypes::boolean()` | `DataType` | Boolean type |
| `DataTypes::tinyint()` | `DataType` | 8-bit signed integer |
| `DataTypes::smallint()` | `DataType` | 16-bit signed integer |
| `DataTypes::int()` | `DataType` | 32-bit signed integer |
| `DataTypes::bigint()` | `DataType` | 64-bit signed integer |
| `DataTypes::float()` | `DataType` | 32-bit floating point |
| `DataTypes::double()` | `DataType` | 64-bit floating point |
| `DataTypes::string()` | `DataType` | Variable-length string |
| `DataTypes::bytes()` | `DataType` | Variable-length byte array |
| `DataTypes::date()` | `DataType` | Date (days since epoch) |
| `DataTypes::time()` | `DataType` | Time (milliseconds since midnight) |
| `DataTypes::timestamp()` | `DataType` | Timestamp without timezone |
| `DataTypes::timestamp_ltz()` | `DataType` | Timestamp with local timezone |
| `DataTypes::decimal(precision: u32, scale: u32)` | `DataType` | Fixed-point decimal |
| `DataTypes::char(length: u32)` | `DataType` | Fixed-length string |
| `DataTypes::binary(length: usize)` | `DataType` | Fixed-length byte array |
| `DataTypes::array(element: DataType)` | `DataType` | Array of elements |
| `DataTypes::map(key: DataType, value: DataType)` | `DataType` | Map of key-value pairs |
| `DataTypes::row(fields: Vec<DataField>)` | `DataType` | Nested row type |
## DataField

| Method | Description |
|---|---|
| `DataField::new(name: impl Into<String>, data_type: DataType, description: Option<String>) -> DataField` | Create a data field |
| `fn name(&self) -> &str` | Get the field name |