blob: b7e4b4543698ab01df2acb9b1bc3eba46bf6c408 [file] [log] [blame] [view]
---
sidebar_position: 4
---
# Error Handling
The Fluss Rust client uses a unified `Error` type and a `Result<T>` alias for all fallible operations.
## Basic Usage
```rust
use fluss::error::{Error, Result};
// All operations return Result<T>
let conn = FlussConnection::new(config).await?;
let admin = conn.get_admin().await?;
let table = conn.get_table(&table_path).await?;
```
Use the `?` operator to propagate errors, or `match` on specific variants for fine-grained handling.
## Matching Error Variants
```rust
use fluss::error::Error;
match result {
Ok(val) => {
// handle success
}
Err(Error::RpcError { message, .. }) => {
eprintln!("RPC failure: {}", message);
}
Err(Error::UnsupportedOperation { message }) => {
eprintln!("Unsupported: {}", message);
}
Err(Error::FlussAPIError { api_error }) => {
eprintln!("Server error: {}", api_error);
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
}
}
```
## Error Variants
| Variant | Description |
|--------------------------------|--------------------------------------------------------------|
| `UnexpectedError` | General unexpected errors with a message and optional source |
| `IoUnexpectedError` | I/O errors (network, file system) |
| `RemoteStorageUnexpectedError` | Remote storage errors (OpenDAL backend failures) |
| `RpcError` | RPC communication failures (connection refused, timeout) |
| `RowConvertError` | Row conversion failures (type mismatch, invalid data) |
| `ArrowError` | Arrow data handling errors (schema mismatch, encoding) |
| `IllegalArgument` | Invalid arguments passed to an API method |
| `UnsupportedOperation` | Operation not supported on the table type |
| `FlussAPIError` | Server-side API errors returned by the Fluss cluster |
Server side errors are represented as `FlussAPIError` with a specific error code. Use the `api_error()` helper to match them ergonomically:
```rust
use fluss::error::FlussError;
match result {
Err(ref e) if e.api_error() == Some(FlussError::InvalidTableException) => {
eprintln!("Invalid table: {}", e);
}
Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => {
eprintln!("Partition does not exist: {}", e);
}
Err(ref e) if e.api_error() == Some(FlussError::LeaderNotAvailableException) => {
eprintln!("Leader not available: {}", e);
}
Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
eprintln!("Authentication failed: {}", e);
}
_ => {}
}
```
## Retry Logic
Some errors are transient, where the server may be temporarily unavailable, mid-election, or under load. `is_retriable()` can be used for deciding to retry an operation rather than treating the error as permanent.
`Error::is_retriable()` is available directly on any `Error` value. `RpcError` is always retriable; `FlussAPIError` delegates to the server error code; all other variants return `false`.
```rust
use fluss::error::Error;
match writer.append(&row) {
Ok(_) => {}
Err(ref e) if e.is_retriable() => {
// Transient failure — safe to retry
}
Err(e) => {
// Permanent failure — log and abort
eprintln!("Fatal error: {}", e);
}
}
```
### Retriable Variants
| Variant / Error | Code | Reason |
|----------------------------------------------|------|-------------------------------------------|
| `Error::RpcError` | | Network-level failure, always retriable |
| `FlussError::NetworkException` | 1 | Server disconnected |
| `FlussError::CorruptMessage` | 3 | CRC or size error |
| `FlussError::SchemaNotExist` | 9 | Schema may not exist |
| `FlussError::LogStorageException` | 10 | Transient log storage error |
| `FlussError::KvStorageException` | 11 | Transient KV storage error |
| `FlussError::NotLeaderOrFollower` | 12 | Leader election in progress |
| `FlussError::CorruptRecordException` | 14 | Corrupt record |
| `FlussError::UnknownTableOrBucketException` | 21 | Metadata not yet available |
| `FlussError::RequestTimeOut` | 25 | Request timed out |
| `FlussError::StorageException` | 26 | Transient storage error |
| `FlussError::NotEnoughReplicasAfterAppendException` | 28 | Wrote to server but with low ISR size |
| `FlussError::NotEnoughReplicasException` | 29 | Low ISR size at write time |
| `FlussError::LeaderNotAvailableException` | 44 | No leader available for partition |
All other `Error` variants (e.g. `RowConvertError`, `IllegalArgument`, `UnsupportedOperation`) always return `false` from `is_retriable()`.
## Common Error Scenarios
### Connection Refused
The Fluss cluster is not running or the address is incorrect.
```rust
let result = FlussConnection::new(config).await;
match result {
Err(Error::RpcError { message, .. }) => {
eprintln!("Cannot connect to cluster: {}", message);
}
_ => {}
}
```
### Table Not Found
The table does not exist or has been dropped.
```rust
use fluss::error::{Error, FlussError};
// Admin operations return FlussError::TableNotExist (code 7)
let result = admin.drop_table(&table_path, false).await;
match result {
Err(ref e) if e.api_error() == Some(FlussError::TableNotExist) => {
eprintln!("Table not found: {}", e);
}
_ => {}
}
// conn.get_table() wraps the error differently, match on FlussAPIError directly
let result = conn.get_table(&table_path).await;
match result {
Err(Error::FlussAPIError { ref api_error }) => {
eprintln!("Server error (code {}): {}", api_error.code, api_error.message);
}
_ => {}
}
```
### Partition Not Found
The partition does not exist on a partitioned table.
```rust
use fluss::error::FlussError;
let result = admin.drop_partition(&table_path, &spec, false).await;
match result {
Err(ref e) if e.api_error() == Some(FlussError::PartitionNotExists) => {
eprintln!("Partition does not exist: {}", e);
}
_ => {}
}
```
### Authentication Failed
SASL credentials are incorrect or the user does not exist.
```rust
use fluss::error::{Error, FlussError};
let result = FlussConnection::new(config).await;
match result {
Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
eprintln!("Authentication failed: {}", e);
}
_ => {}
}
```
### Schema Mismatch
Row data does not match the expected table schema.
```rust
let result = writer.append(&row);
match result {
Err(Error::RowConvertError { .. }) => {
eprintln!("Row does not match table schema");
}
_ => {}
}
```
## Using `Result<T>` in Application Code
The `fluss::error::Result<T>` type alias makes it easy to use Fluss errors with the `?` operator in your application functions:
```rust
use fluss::error::Result;
async fn my_pipeline() -> Result<()> {
let conn = FlussConnection::new(config).await?;
let admin = conn.get_admin().await?;
let table = conn.get_table(&table_path).await?;
let writer = table.new_append()?.create_writer()?;
writer.append(&row)?;
writer.flush().await?;
Ok(())
}
```
For applications that use other error types alongside Fluss errors, you can convert with standard `From` / `Into` traits or use crates like `anyhow`:
```rust
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
let conn = FlussConnection::new(config).await?;
// fluss::error::Error implements std::error::Error,
// so it converts into anyhow::Error automatically
Ok(())
}
```