sidebar_position: 4

Error Handling

The Fluss Rust client uses a unified Error type and a Result<T> alias for all fallible operations.

Basic Usage

use fluss::error::{Error, Result};

// All operations return Result<T>
let conn = FlussConnection::new(config).await?;
let admin = conn.get_admin().await?;
let table = conn.get_table(&table_path).await?;

Use the ? operator to propagate errors, or match on specific variants for fine-grained handling.

Matching Error Variants

use fluss::error::Error;

match result {
    Ok(val) => {
        // handle success
    }
    Err(Error::RpcError { message, .. }) => {
        eprintln!("RPC failure: {}", message);
    }
    Err(Error::UnsupportedOperation { message }) => {
        eprintln!("Unsupported: {}", message);
    }
    Err(Error::FlussAPIError { api_error }) => {
        eprintln!("Server error: {}", api_error);
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
    }
}

Error Variants

VariantDescription
UnexpectedErrorGeneral unexpected errors with a message and optional source
IoUnexpectedErrorI/O errors (network, file system)
RemoteStorageUnexpectedErrorRemote storage errors (OpenDAL backend failures)
RpcErrorRPC communication failures (connection refused, timeout)
RowConvertErrorRow conversion failures (type mismatch, invalid data)
ArrowErrorArrow data handling errors (schema mismatch, encoding)
IllegalArgumentInvalid arguments passed to an API method
UnsupportedOperationOperation not supported on the table type
FlussAPIErrorServer-side API errors returned by the Fluss cluster

Server side errors are represented as FlussAPIError with a specific error code. Use the api_error() helper to match them ergonomically:

use fluss::error::FlussError;

match result {
    Err(ref e) if e.api_error() == Some(FlussError::InvalidTableException) => {
        eprintln!("Invalid table: {}", e);
    }
    Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => {
        eprintln!("Partition does not exist: {}", e);
    }
    Err(ref e) if e.api_error() == Some(FlussError::LeaderNotAvailableException) => {
        eprintln!("Leader not available: {}", e);
    }
    Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
        eprintln!("Authentication failed: {}", e);
    }
    _ => {}
}

Retry Logic

Some errors are transient, where the server may be temporarily unavailable, mid-election, or under load. is_retriable() can be used for deciding to retry an operation rather than treating the error as permanent.

Error::is_retriable() is available directly on any Error value. RpcError is always retriable; FlussAPIError delegates to the server error code; all other variants return false.

use fluss::error::Error;

match writer.append(&row) {
    Ok(_) => {}
    Err(ref e) if e.is_retriable() => {
        // Transient failure — safe to retry
    }
    Err(e) => {
        // Permanent failure — log and abort
        eprintln!("Fatal error: {}", e);
    }
}

Retriable Variants

Variant / ErrorCodeReason
Error::RpcErrorNetwork-level failure, always retriable
FlussError::NetworkException1Server disconnected
FlussError::CorruptMessage3CRC or size error
FlussError::SchemaNotExist9Schema may not exist
FlussError::LogStorageException10Transient log storage error
FlussError::KvStorageException11Transient KV storage error
FlussError::NotLeaderOrFollower12Leader election in progress
FlussError::CorruptRecordException14Corrupt record
FlussError::UnknownTableOrBucketException21Metadata not yet available
FlussError::RequestTimeOut25Request timed out
FlussError::StorageException26Transient storage error
FlussError::NotEnoughReplicasAfterAppendException28Wrote to server but with low ISR size
FlussError::NotEnoughReplicasException29Low ISR size at write time
FlussError::LeaderNotAvailableException44No leader available for partition

All other Error variants (e.g. RowConvertError, IllegalArgument, UnsupportedOperation) always return false from is_retriable().

Common Error Scenarios

Connection Refused

The Fluss cluster is not running or the address is incorrect.

let result = FlussConnection::new(config).await;
match result {
    Err(Error::RpcError { message, .. }) => {
        eprintln!("Cannot connect to cluster: {}", message);
    }
    _ => {}
}

Table Not Found

The table does not exist or has been dropped.

use fluss::error::{Error, FlussError};

// Admin operations return FlussError::TableNotExist (code 7)
let result = admin.drop_table(&table_path, false).await;
match result {
    Err(ref e) if e.api_error() == Some(FlussError::TableNotExist) => {
        eprintln!("Table not found: {}", e);
    }
    _ => {}
}

// conn.get_table() wraps the error differently, match on FlussAPIError directly
let result = conn.get_table(&table_path).await;
match result {
    Err(Error::FlussAPIError { ref api_error }) => {
        eprintln!("Server error (code {}): {}", api_error.code, api_error.message);
    }
    _ => {}
}

Partition Not Found

The partition does not exist on a partitioned table.

use fluss::error::FlussError;

let result = admin.drop_partition(&table_path, &spec, false).await;
match result {
    Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => {
        eprintln!("Partition does not exist: {}", e);
    }
    _ => {}
}

Authentication Failed

SASL credentials are incorrect or the user does not exist.

use fluss::error::{Error, FlussError};

let result = FlussConnection::new(config).await;
match result {
    Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
        eprintln!("Authentication failed: {}", e);
    }
    _ => {}
}

Schema Mismatch

Row data does not match the expected table schema.

let result = writer.append(&row);
match result {
    Err(Error::RowConvertError { .. }) => {
        eprintln!("Row does not match table schema");
    }
    _ => {}
}

Using Result<T> in Application Code

The fluss::error::Result<T> type alias makes it easy to use Fluss errors with the ? operator in your application functions:

use fluss::error::Result;

async fn my_pipeline() -> Result<()> {
    let conn = FlussConnection::new(config).await?;
    let admin = conn.get_admin().await?;
    let table = conn.get_table(&table_path).await?;
    let writer = table.new_append()?.create_writer()?;
    writer.append(&row)?;
    writer.flush().await?;
    Ok(())
}

For applications that use other error types alongside Fluss errors, you can convert with standard From / Into traits or use crates like anyhow:

use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let conn = FlussConnection::new(config).await?;
    // fluss::error::Error implements std::error::Error,
    // so it converts into anyhow::Error automatically
    Ok(())
}