The Fluss Rust client uses a unified Error type and a Result<T> alias for all fallible operations.
use fluss::error::{Error, Result}; // All operations return Result<T> let conn = FlussConnection::new(config).await?; let admin = conn.get_admin().await?; let table = conn.get_table(&table_path).await?;
Use the ? operator to propagate errors, or match on specific variants for fine-grained handling.
use fluss::error::Error; match result { Ok(val) => { // handle success } Err(Error::RpcError { message, .. }) => { eprintln!("RPC failure: {}", message); } Err(Error::UnsupportedOperation { message }) => { eprintln!("Unsupported: {}", message); } Err(Error::FlussAPIError { api_error }) => { eprintln!("Server error: {}", api_error); } Err(e) => { eprintln!("Unexpected error: {}", e); } }
| Variant | Description |
|---|---|
UnexpectedError | General unexpected errors with a message and optional source |
IoUnexpectedError | I/O errors (network, file system) |
RemoteStorageUnexpectedError | Remote storage errors (OpenDAL backend failures) |
RpcError | RPC communication failures (connection refused, timeout) |
RowConvertError | Row conversion failures (type mismatch, invalid data) |
ArrowError | Arrow data handling errors (schema mismatch, encoding) |
IllegalArgument | Invalid arguments passed to an API method |
UnsupportedOperation | Operation not supported on the table type |
FlussAPIError | Server-side API errors returned by the Fluss cluster |
Server side errors are represented as FlussAPIError with a specific error code. Use the api_error() helper to match them ergonomically:
use fluss::error::FlussError; match result { Err(ref e) if e.api_error() == Some(FlussError::InvalidTableException) => { eprintln!("Invalid table: {}", e); } Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => { eprintln!("Partition does not exist: {}", e); } Err(ref e) if e.api_error() == Some(FlussError::LeaderNotAvailableException) => { eprintln!("Leader not available: {}", e); } Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => { eprintln!("Authentication failed: {}", e); } _ => {} }
Some errors are transient, where the server may be temporarily unavailable, mid-election, or under load. is_retriable() can be used for deciding to retry an operation rather than treating the error as permanent.
Error::is_retriable() is available directly on any Error value. RpcError is always retriable; FlussAPIError delegates to the server error code; all other variants return false.
use fluss::error::Error; match writer.append(&row) { Ok(_) => {} Err(ref e) if e.is_retriable() => { // Transient failure — safe to retry } Err(e) => { // Permanent failure — log and abort eprintln!("Fatal error: {}", e); } }
| Variant / Error | Code | Reason |
|---|---|---|
Error::RpcError | — | Network-level failure, always retriable |
FlussError::NetworkException | 1 | Server disconnected |
FlussError::CorruptMessage | 3 | CRC or size error |
FlussError::SchemaNotExist | 9 | Schema may not exist |
FlussError::LogStorageException | 10 | Transient log storage error |
FlussError::KvStorageException | 11 | Transient KV storage error |
FlussError::NotLeaderOrFollower | 12 | Leader election in progress |
FlussError::CorruptRecordException | 14 | Corrupt record |
FlussError::UnknownTableOrBucketException | 21 | Metadata not yet available |
FlussError::RequestTimeOut | 25 | Request timed out |
FlussError::StorageException | 26 | Transient storage error |
FlussError::NotEnoughReplicasAfterAppendException | 28 | Wrote to server but with low ISR size |
FlussError::NotEnoughReplicasException | 29 | Low ISR size at write time |
FlussError::LeaderNotAvailableException | 44 | No leader available for partition |
All other Error variants (e.g. RowConvertError, IllegalArgument, UnsupportedOperation) always return false from is_retriable().
The Fluss cluster is not running or the address is incorrect.
let result = FlussConnection::new(config).await; match result { Err(Error::RpcError { message, .. }) => { eprintln!("Cannot connect to cluster: {}", message); } _ => {} }
The table does not exist or has been dropped.
use fluss::error::{Error, FlussError}; // Admin operations return FlussError::TableNotExist (code 7) let result = admin.drop_table(&table_path, false).await; match result { Err(ref e) if e.api_error() == Some(FlussError::TableNotExist) => { eprintln!("Table not found: {}", e); } _ => {} } // conn.get_table() wraps the error differently, match on FlussAPIError directly let result = conn.get_table(&table_path).await; match result { Err(Error::FlussAPIError { ref api_error }) => { eprintln!("Server error (code {}): {}", api_error.code, api_error.message); } _ => {} }
The partition does not exist on a partitioned table.
use fluss::error::FlussError; let result = admin.drop_partition(&table_path, &spec, false).await; match result { Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => { eprintln!("Partition does not exist: {}", e); } _ => {} }
SASL credentials are incorrect or the user does not exist.
use fluss::error::{Error, FlussError}; let result = FlussConnection::new(config).await; match result { Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => { eprintln!("Authentication failed: {}", e); } _ => {} }
Row data does not match the expected table schema.
let result = writer.append(&row); match result { Err(Error::RowConvertError { .. }) => { eprintln!("Row does not match table schema"); } _ => {} }
Result<T> in Application CodeThe fluss::error::Result<T> type alias makes it easy to use Fluss errors with the ? operator in your application functions:
use fluss::error::Result; async fn my_pipeline() -> Result<()> { let conn = FlussConnection::new(config).await?; let admin = conn.get_admin().await?; let table = conn.get_table(&table_path).await?; let writer = table.new_append()?.create_writer()?; writer.append(&row)?; writer.flush().await?; Ok(()) }
For applications that use other error types alongside Fluss errors, you can convert with standard From / Into traits or use crates like anyhow:
use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { let conn = FlussConnection::new(config).await?; // fluss::error::Error implements std::error::Error, // so it converts into anyhow::Error automatically Ok(()) }