The official Rust client library for Apache Fluss™ (Incubating) — a streaming storage built for real-time analytics, serving as the real-time data layer for Lakehouse architectures. This is a client SDK, not the Fluss server itself.
This repository contains:
- `fluss-rs`: the Rust core client (crates.io: `fluss-rs`)
- Python, C++, and Elixir bindings built on top of the Rust core (under `bindings/`)

Fluss bridges the gap between streaming data and the data Lakehouse by enabling low-latency, high-throughput data ingestion and processing while seamlessly integrating with popular compute engines (Flink, Spark, Trino).
Key concepts:
Features (`fluss-rs`):

| Category | Capabilities |
|---|---|
| Connection | Bootstrap to Fluss cluster, SASL authentication, graceful shutdown |
| Admin | Create/drop/list databases & tables, manage partitions, list offsets |
| Log Tables | Append (single-row + Arrow RecordBatch), scan with subscribe/poll |
| KV Tables | Upsert, delete, point lookup, prefix lookup, partitioned KV support |
| Data Types | Int, BigInt, String, Float, Double, Boolean, Bytes, Decimal, Date, Time, Timestamp, TimestampLTZ, Char, Binary |
| Config | Batch sizing, buffering, retries, compression, timeouts, prefetch, concurrency |
| Storage | Memory, Filesystem, S3, OSS (via OpenDAL) |
| Observability | Connection, writer, and scanner metrics via the metrics facade (Prometheus, StatsD, etc.) |
| WASM | Compiles for the `wasm32` target |

Language support:

| Language | Package / Build | Async Runtime | Data Format |
|---|---|---|---|
| Rust | fluss-rs (crates.io) | Tokio | Arrow RecordBatch / GenericRow |
| Python | Build from source (PyO3) | asyncio | PyArrow / Pandas / dict |
| C++ | CMake / Bazel (FFI) | Synchronous (Tokio internally) | Arrow RecordBatch / GenericRow |
| Elixir | Rustler NIFs | Erlang processes | Elixir values |
Repository layout:

```
fluss-rust/
├── crates/
│   ├── fluss/                  # Core Rust client (fluss-rs)
│   │   ├── src/client/         # Connection, Admin, Table, Scan, Upsert, Lookup
│   │   ├── src/metadata/       # Schema, TableDescriptor, DataTypes, Partitions
│   │   ├── src/row/            # GenericRow, InternalRow, Arrow integration
│   │   ├── src/rpc/            # gRPC transport layer
│   │   └── src/config.rs       # Client configuration
│   ├── examples/               # Runnable examples (log, KV, partitioned, prefix lookup, metrics)
│   └── fluss-test-cluster/     # Test harness for integration tests
├── bindings/
│   ├── python/                 # Python binding (PyO3)
│   ├── cpp/                    # C++ binding (FFI + header)
│   └── elixir/                 # Elixir binding (Rustler NIF)
├── website/                    # Docusaurus documentation site
├── docs/                       # Supplementary documentation
└── scripts/                    # Release & version management
```
Start a local Fluss cluster (Fluss 0.8.0 or later):

```bash
# Download and extract Fluss (0.8.0+)
curl -LO https://dlcdn.apache.org/incubator/fluss/0.8.0/fluss-0.8.0-incubating-bin.tgz
tar -xzf fluss-0.8.0-incubating-bin.tgz
cd fluss-0.8.0-incubating/

# Start a local cluster
./bin/local-cluster.sh start
```
Add `fluss-rs` to your project's `Cargo.toml`:

```toml
[dependencies]
fluss = { package = "fluss-rs", version = "0.2" }
tokio = { version = "1", features = ["full"] }
```
Log table example (append + scan):

```rust
use fluss::client::{EARLIEST_OFFSET, FlussConnection};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{DataGetters, GenericRow};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = Config::default();
    config.bootstrap_servers = "127.0.0.1:9123".to_string();
    let conn = FlussConnection::new(config).await?;
    let admin = conn.get_admin()?;

    // Create a log table (the final `true` ignores the call if the table already exists)
    let table_path = TablePath::new("fluss", "events");
    let schema = Schema::builder()
        .column("ts", DataTypes::bigint())
        .column("message", DataTypes::string())
        .build()?;
    let descriptor = TableDescriptor::builder().schema(schema).build()?;
    admin.create_table(&table_path, &descriptor, true).await?;

    // Append rows
    let table = conn.get_table(&table_path).await?;
    let writer = table.new_append()?.create_writer()?;
    let mut row = GenericRow::new(2);
    row.set_field(0, 1_700_000_000_000i64);
    row.set_field(1, "hello fluss");
    writer.append(&row)?;
    writer.flush().await?;

    // Scan logs: subscribe to bucket 0 from the earliest offset
    let scanner = table.new_scan().create_log_scanner()?;
    scanner.subscribe(0, EARLIEST_OFFSET).await?;

    // Poll forever; stop the process (e.g. Ctrl+C) to exit
    loop {
        let records = scanner.poll(Duration::from_secs(5)).await?;
        for record in records {
            let row = record.row();
            println!(
                "offset={}, ts={}, message={}",
                record.offset(),
                row.get_long(0)?,
                row.get_string(1)?
            );
        }
    }
}
```
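The `subscribe`/`poll` loop above reads a bucket by offset. The std-only toy below models just that bookkeeping, under simplifying assumptions: each append is assigned the next offset, a subscription starts at a given offset (0 stands in for the earliest offset here), and every poll returns up to a fixed number of records and advances the position. `ToyLog`, `ToyScanner`, and their methods are hypothetical names, not part of fluss-rs; real buckets live on the Fluss servers, and the actual value of `EARLIEST_OFFSET` is not assumed.

```rust
/// Hypothetical in-memory stand-in for a single log bucket (not the fluss-rs API).
struct ToyLog {
    // (ts, message) pairs; a record's index in the Vec is its offset.
    records: Vec<(i64, String)>,
}

impl ToyLog {
    fn new() -> Self {
        ToyLog { records: Vec::new() }
    }

    /// Appends a record and returns the offset assigned to it.
    fn append(&mut self, ts: i64, message: &str) -> u64 {
        self.records.push((ts, message.to_string()));
        (self.records.len() - 1) as u64
    }
}

/// Hypothetical scanner that tracks the next offset to read, like a subscription.
struct ToyScanner {
    next_offset: u64,
    max_records: usize,
}

impl ToyScanner {
    fn subscribe(start_offset: u64, max_records: usize) -> Self {
        ToyScanner { next_offset: start_offset, max_records }
    }

    /// Returns up to `max_records` records from the current position and
    /// advances past them, so the same record is never returned twice.
    fn poll<'a>(&mut self, log: &'a ToyLog) -> Vec<(u64, &'a (i64, String))> {
        let start = self.next_offset as usize;
        if start >= log.records.len() {
            return Vec::new(); // nothing new yet
        }
        let end = (start + self.max_records).min(log.records.len());
        let batch = log.records[start..end]
            .iter()
            .enumerate()
            .map(move |(j, rec)| ((start + j) as u64, rec))
            .collect();
        self.next_offset = end as u64;
        batch
    }
}

fn main() {
    let mut log = ToyLog::new();
    for (i, msg) in ["a", "b", "c"].into_iter().enumerate() {
        log.append(1_700_000_000_000 + i as i64, msg);
    }
    let mut scanner = ToyScanner::subscribe(0, 2);
    // Two polls drain the three records: offsets 0 and 1, then offset 2.
    for _ in 0..2 {
        for (offset, (ts, msg)) in scanner.poll(&log) {
            println!("offset={offset}, ts={ts}, message={msg}");
        }
    }
}
```

Because the scanner, not the log, owns the position, several scanners can read the same bucket independently from different offsets.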
KV table example (upsert + point lookup):

```rust
use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{DataGetters, GenericRow};

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = Config::default();
    config.bootstrap_servers = "127.0.0.1:9123".to_string();
    let conn = FlussConnection::new(config).await?;
    let admin = conn.get_admin()?;

    // Create a KV table: declaring a primary key makes it a KV table
    let table_path = TablePath::new("fluss", "users");
    let schema = Schema::builder()
        .column("id", DataTypes::int())
        .column("name", DataTypes::string())
        .column("score", DataTypes::bigint())
        .primary_key(vec!["id"])
        .build()?;
    let descriptor = TableDescriptor::builder().schema(schema).build()?;
    admin.create_table(&table_path, &descriptor, true).await?;

    // Upsert rows (field types match the schema: INT, STRING, BIGINT)
    let table = conn.get_table(&table_path).await?;
    let writer = table.new_upsert()?.create_writer()?;
    for (id, name, score) in [(1i32, "Alice", 95i64), (2, "Bob", 87)] {
        let mut row = GenericRow::new(3);
        row.set_field(0, id);
        row.set_field(1, name);
        row.set_field(2, score);
        writer.upsert(&row)?;
    }
    writer.flush().await?;

    // Point lookup by primary key
    let lookuper = table.new_lookup()?.create_lookuper()?;
    let mut key = GenericRow::new(1);
    key.set_field(0, 1i32);
    if let Some(row) = lookuper.lookup(&key).await?.get_single_row()? {
        println!(
            "id={}, name={}, score={}",
            row.get_int(0)?,
            row.get_string(1)?,
            row.get_long(2)?
        );
    }
    Ok(())
}
```
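The contract of a KV table can be summarized as insert-or-replace keyed by the primary key, with point lookups returning at most one row. The toy below sketches that contract with a `HashMap` keyed by `id`, assuming the `users` schema from the example. `User` and `ToyKvTable` are hypothetical and say nothing about how Fluss actually stores or replicates KV data.

```rust
use std::collections::HashMap;

/// Hypothetical row mirroring the `users` schema: (id, name, score).
#[derive(Debug, Clone, PartialEq)]
struct User {
    id: i32,
    name: String,
    score: i64,
}

/// Toy in-memory primary-key table (not the fluss-rs API).
#[derive(Default)]
struct ToyKvTable {
    rows: HashMap<i32, User>,
}

impl ToyKvTable {
    /// Insert-or-replace: a later upsert with the same key overwrites the row.
    fn upsert(&mut self, row: User) {
        self.rows.insert(row.id, row);
    }

    /// Removes the row for `id`, returning whether one existed.
    fn delete(&mut self, id: i32) -> bool {
        self.rows.remove(&id).is_some()
    }

    /// Point lookup: at most one row per primary key.
    fn lookup(&self, id: i32) -> Option<&User> {
        self.rows.get(&id)
    }
}

fn main() {
    let mut table = ToyKvTable::default();
    for (id, name, score) in [(1, "Alice", 95i64), (2, "Bob", 87)] {
        table.upsert(User { id, name: name.to_string(), score });
    }
    // Upserting id=1 again replaces the earlier row instead of adding a second one.
    table.upsert(User { id: 1, name: "Alice".to_string(), score: 99 });
    if let Some(u) = table.lookup(1) {
        println!("id={}, name={}, score={}", u.id, u.name, u.score); // prints id=1, name=Alice, score=99
    }
    table.delete(2);
    assert!(table.lookup(2).is_none());
}
```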
Examples (in `crates/examples/`):

| Example | Description |
|---|---|
| `example-table` | Log table: append + scan with Arrow batch |
| `example-upsert-lookup` | KV table: upsert + point lookup |
| `example-partitioned-upsert-lookup` | KV table with partitions |
| `example-prefix-lookup` | Prefix lookup on bucket keys |
| `example-partitioned-prefix-lookup` | Prefix lookup on partitioned tables |
| `example-prometheus-metrics` | Expose client metrics on a Prometheus endpoint |
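A prefix lookup returns every row whose primary key begins with the given key columns (in the examples, the bucket key), so one call can return many rows. As a hypothetical std-only illustration, a `BTreeMap` ordered by a composite key `(user_id, event_id)` answers such a query with a single range scan, because all keys sharing a prefix sit next to each other. `ToyPrefixTable` is an assumption for illustration only; fluss-rs routes prefix lookups to the Fluss servers rather than scanning a local map.

```rust
use std::collections::BTreeMap;

/// Toy table keyed by a composite primary key (user_id, event_id).
/// Hypothetical model only; it is not how fluss-rs stores or routes rows.
struct ToyPrefixTable {
    rows: BTreeMap<(i32, i64), String>,
}

impl ToyPrefixTable {
    fn new() -> Self {
        ToyPrefixTable { rows: BTreeMap::new() }
    }

    fn upsert(&mut self, user_id: i32, event_id: i64, payload: &str) {
        self.rows.insert((user_id, event_id), payload.to_string());
    }

    /// Returns all rows whose key starts with `user_id`, in key order.
    /// Because the map is sorted, the matching keys form one contiguous range.
    fn prefix_lookup(&self, user_id: i32) -> Vec<(i64, &str)> {
        self.rows
            .range((user_id, i64::MIN)..=(user_id, i64::MAX))
            .map(|(&(_, event_id), payload)| (event_id, payload.as_str()))
            .collect()
    }
}

fn main() {
    let mut t = ToyPrefixTable::new();
    t.upsert(1, 10, "login");
    t.upsert(1, 11, "click");
    t.upsert(2, 10, "login");
    // Only the two rows with user_id = 1 are returned.
    for (event_id, payload) in t.prefix_lookup(1) {
        println!("user_id=1, event_id={event_id}, payload={payload}");
    }
}
```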
Build and run any example:
```bash
cargo build --example example-table --release
./target/release/examples/example-table
```
Config supports the following key options (all with sensible defaults):
| Option | Default | Description |
|---|---|---|
| `bootstrap_servers` | `127.0.0.1:9123` | Fluss coordinator address |
| `writer_batch_size` | 2 MB | Max batch size before flushing |
| `writer_batch_timeout_ms` | 100 ms | Max time before auto-flush |
| `writer_buffer_memory_size` | 64 MB | Total buffer memory for pending writes |
| `writer_retries` | `i32::MAX` | Max write retries |
| `scanner_log_fetch_max_bytes` | 16 MB | Max bytes per fetch request |
| `scanner_log_fetch_wait_max_time_ms` | 500 ms | Max wait time for fetch |
| `scanner_remote_log_read_concurrency` | 4 | Concurrency for remote log reads |
| `connect_timeout_ms` | 120 s | Connection timeout |
| `security_sasl_username` / `security_sasl_password` | unset | SASL PLAIN authentication credentials |
Configuration can be set programmatically or via CLI flags (using clap).
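To show how the three writer options above could interact, here is a minimal sketch of a size-or-time batching policy with a bounded buffer, using the documented defaults. It assumes the writer behaves like a typical batching producer (send a batch when it is full or when the timeout expires, and stop accepting writes once pending data would exceed the buffer) and that "MB" means 1024 × 1024 bytes. `BatchPolicy`, `should_flush`, and `can_buffer` are hypothetical names, not fluss-rs types.

```rust
use std::time::Duration;

/// Hypothetical model of the writer's batching knobs (not the fluss-rs `Config`).
/// Defaults mirror the table: 2 MB batch, 100 ms timeout, 64 MB buffer,
/// assuming 1 MB = 1024 * 1024 bytes.
struct BatchPolicy {
    batch_size: usize,
    batch_timeout: Duration,
    buffer_memory: usize,
}

impl Default for BatchPolicy {
    fn default() -> Self {
        BatchPolicy {
            batch_size: 2 * 1024 * 1024,
            batch_timeout: Duration::from_millis(100),
            buffer_memory: 64 * 1024 * 1024,
        }
    }
}

impl BatchPolicy {
    /// A batch is sent once it is full or has waited long enough, whichever comes first.
    fn should_flush(&self, batch_bytes: usize, waited: Duration) -> bool {
        batch_bytes >= self.batch_size || waited >= self.batch_timeout
    }

    /// New writes fit only while total pending bytes stay within the buffer;
    /// otherwise the caller would have to wait for in-flight batches to drain.
    fn can_buffer(&self, pending_bytes: usize, new_bytes: usize) -> bool {
        pending_bytes.saturating_add(new_bytes) <= self.buffer_memory
    }
}

fn main() {
    let policy = BatchPolicy::default();
    // A small batch that has waited only 10 ms stays buffered.
    println!("{}", policy.should_flush(4096, Duration::from_millis(10))); // prints false
    // The same batch is flushed once the 100 ms timeout elapses.
    println!("{}", policy.should_flush(4096, Duration::from_millis(100))); // prints true
    // An empty buffer easily accepts a 4 KiB write.
    println!("{}", policy.can_buffer(0, 4096)); // prints true
}
```

Under this model, raising `writer_batch_size` trades latency for throughput only when traffic is heavy; at low volume the timeout dominates.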
Building and testing (run each command from the repository root):

```bash
# Build
cargo build

# Run tests
cargo test

# Run integration tests (requires Docker for test cluster)
cargo test --features integration_tests

# Build C++ bindings
cd bindings/cpp && mkdir build && cd build && cmake .. && cmake --build .

# Build Python bindings
cd bindings/python && maturin develop

# Elixir tests
cd bindings/elixir && mix test
```
This project is part of the Apache Fluss (Incubating) community. Contributions are welcome!
Licensed under the Apache License, Version 2.0.
Copyright 2025-2026 The Apache Software Foundation

This product includes software developed at The Apache Software Foundation (https://www.apache.org/).
Apache Fluss, Fluss, Apache, the Apache feather logo, and the Apache Fluss project logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.