Minimal working examples: connect to Fluss, create a table, write data, and read it back.
use fluss::client::FlussConnection; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::time::Duration; #[tokio::main] async fn main() -> Result<()> { // Connect let mut config = Config::default(); config.bootstrap_servers = "127.0.0.1:9123".to_string(); let conn = FlussConnection::new(config).await?; let admin = conn.get_admin().await?; // Create a log table let table_path = TablePath::new("fluss", "quickstart_rust"); let descriptor = TableDescriptor::builder() .schema( Schema::builder() .column("id", DataTypes::int()) .column("name", DataTypes::string()) .build()?, ) .build()?; admin.create_table(&table_path, &descriptor, true).await?; // Write let table = conn.get_table(&table_path).await?; let writer = table.new_append()?.create_writer()?; let mut row = GenericRow::new(2); row.set_field(0, 1); row.set_field(1, "hello"); writer.append(&row)?; writer.flush().await?; // Read let scanner = table.new_scan().create_log_scanner()?; scanner.subscribe(0, 0).await?; let records = scanner.poll(Duration::from_secs(5)).await?; for record in records { let row = record.row(); println!("id={}, name={}", row.get_int(0)?, row.get_string(1)?); } Ok(()) }