Rust Client for Apache Fluss (Incubating)

Clone this repo:

Branches

  1. 364b824 [github] Update GitHub issue template by Jark Wu · 21 hours ago main
  2. 5031d12 Initial Commit by Jark Wu · 21 hours ago
  3. 8f30d43 Initial Commit by Jark Wu · 21 hours ago

Apache Fluss™ Rust (Incubating)

Experimental

Rust implementation of Apache Fluss™.

Why Fluss?

Fluss is a streaming storage built for real-time analytics which can serve as the real-time data layer for Lakehouse architectures. It bridges the gap between streaming data and the data Lakehouse by enabling low-latency, high-throughput data ingestion and processing while seamlessly integrating with popular compute engines.

Why Fluss Rust Client

It's an unofficial experimental Rust client for interacting with Fluss. This client provides foundational capabilities for table management and log streaming operations, enabling developers to explore Fluss within Rust ecosystems.

Quick-Start

Step1 Start Fluss cluster

Requirements

Fluss runs on all UNIX-like environments, e.g. Linux, Mac OS X. Before you start to setup the system, make sure you have the following software installed on your test machine:

Java 17 or higher (Java 8 and Java 11 are not recommended) If your cluster does not fulfill these software requirements you will need to install/upgrade it.

Fluss requires the JAVA_HOME environment variable to be set on all nodes and point to the directory of your Java installation.

Fluss Setup

Go to the downloads page and download the Fluss-0.6.0. Make sure to pick the Fluss package matching your Java version. After downloading the latest release, extract it:

tar -xzf fluss-0.7-SNAPSHOT-bin.tgz
cd fluss-0.7-SNAPSHOT/

You can start Fluss local cluster by running the following command:

./bin/local-cluster.sh start

After that, the Fluss local cluster is started.

Run Provided Example

Only supports Linux or macOs. You will need to install Rust firstly.

After that, go the project directory, build it and run the example:

cargo build --example example-table --release
cd target/release/examples
./example-table

The example code is as follows:

#[tokio::main]
pub async fn main() -> Result<()> {
    // 1: create the table;
    let mut args = Args::default();
    args.bootstrap_server = "127.0.0.1:9123".to_string();
    let conn_config = ConnectionConfig::from_args(args);
    let conn = FlussConnection::new(conn_config).await;

    let admin = conn.get_admin();

    let table_descriptor = TableDescriptor::builder()
        .schema(
            Schema::builder()
                .column("c1", DataTypes::int())
                .column("c2", DataTypes::string())
                .build(),
        )
        .build();

    let table_path = TablePath::new("fluss".to_owned(), "rust_test".to_owned());

    admin
        .create_table(&table_path, &table_descriptor, true)
        .await
        .unwrap();

    // 2: get the table
    let table_info = admin.get_table(&table_path).await.unwrap();
    print!("Get created table:\n {}\n", table_info);

    // let's sleep 2 seconds to wait leader ready
    thread::sleep(Duration::from_secs(2));

    // 3: append log to the table
    let table = conn.get_table(&table_path).await;
    let append_writer = table.new_append().create_writer();
    let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
    append_writer.append(batch).await?;
    println!("Start to scan log records......");
    // 4: scan the records
    let log_scanner = table.new_scan().create_log_scanner();
    log_scanner.subscribe(0, 0).await;

    loop {
        let scan_records = log_scanner.poll(Duration::from_secs(10)).await?;
        println!("Start to poll records......");
        for record in scan_records {
            let row = record.row();
            println!(
                "{{{}, {}}}@{}",
                row.get_int(0),
                row.get_string(1),
                record.offset()
            );
        }
    }
    Ok(())
}

You can change it according to your needs, have fun!

Clear environment

Then, stop your Fluss cluster. Go to your Fluss home, stop it via the following commands:

./bin/local-cluster.sh stop

License

Licensed under the Apache License, Version 2.0