Log tables are append-only tables without primary keys, suitable for event streaming.
```rust
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};

let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("event_id", DataTypes::int())
            .column("event_type", DataTypes::string())
            .column("timestamp", DataTypes::bigint())
            .build()?,
    )
    .build()?;

let table_path = TablePath::new("fluss", "events");
admin.create_table(&table_path, &table_descriptor, true).await?;
```
```rust
use fluss::row::{GenericRow, InternalRow};

let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;

let mut row = GenericRow::new(3);
row.set_field(0, 1);                 // event_id
row.set_field(1, "user_login");      // event_type
row.set_field(2, 1704067200000i64);  // timestamp

append_writer.append(&row)?;
append_writer.flush().await?;
```
Write operations use a fire-and-forget pattern for efficient batching: each `append()` call queues the write and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.
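This makes high-throughput producers straightforward: queue many rows, then flush once. A minimal sketch reusing the writer and row types from the snippet above:

```rust
// Queue many writes; each append returns immediately without waiting for the server
for i in 0..1000 {
    let mut row = GenericRow::new(3);
    row.set_field(0, i);                            // event_id
    row.set_field(1, "user_login");                 // event_type
    row.set_field(2, 1704067200000i64 + i as i64);  // timestamp
    append_writer.append(&row)?;
}
// A single flush drains the queue and sends all batched writes to the server
append_writer.flush().await?;
```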
For per-record acknowledgment:
```rust
append_writer.append(&row)?.await?;
```
```rust
use std::time::Duration;

let table = conn.get_table(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;

// Subscribe to bucket 0 starting from offset 0
log_scanner.subscribe(0, 0).await?;

// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;

// Per-bucket access
for (bucket, bucket_records) in records.records_by_buckets() {
    println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
    for record in bucket_records {
        let row = record.row();
        println!(
            "  event_id={}, event_type={} @ offset={}",
            row.get_int(0)?,
            row.get_string(1)?,
            record.offset()
        );
    }
}

// Or flat iteration (consumes ScanRecords)
for record in records {
    let row = record.row();
    println!(
        "event_id={}, event_type={}, timestamp={} @ offset={}",
        row.get_int(0)?,
        row.get_string(1)?,
        row.get_long(2)?,
        record.offset()
    );
}
```
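A long-running consumer typically wraps `poll` in a loop; the scanner tracks offsets across polls, so each call returns the next batch. A sketch using the scanner above, where `handle_event` is a hypothetical application callback, not part of the Fluss API:

```rust
loop {
    let records = log_scanner.poll(Duration::from_secs(1)).await?;
    for record in records {
        let row = record.row();
        // handle_event is a placeholder for application logic
        handle_event(row.get_int(0)?, row.get_string(1)?, record.offset());
    }
}
```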
Subscribe from special offsets:
```rust
use fluss::client::EARLIEST_OFFSET;

log_scanner.subscribe(0, EARLIEST_OFFSET).await?; // from earliest
log_scanner.subscribe(0, 42).await?;              // from a specific offset
```
Subscribe from the latest offset (only new records):

First resolve the current latest offset via `list_offsets`, then subscribe at that offset:
```rust
use fluss::rpc::message::OffsetSpec;

let admin = conn.get_admin().await?;
let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
```
Subscribe to all buckets:
```rust
let num_buckets = table.get_table_info().get_num_buckets();
for bucket_id in 0..num_buckets {
    log_scanner.subscribe(bucket_id, 0).await?;
}
```
Subscribe to multiple buckets at once:
```rust
use std::collections::HashMap;

let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0i64);
bucket_offsets.insert(1, 100i64);
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```
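These patterns compose: `list_offsets` can resolve every bucket's latest offset in one call, and `subscribe_buckets` starts them all at their current tail. A sketch, assuming `list_offsets` returns a map keyed by bucket id (as in the latest-offset example) and that bucket ids are `i32`:

```rust
use std::collections::HashMap;
use fluss::rpc::message::OffsetSpec;

let num_buckets = table.get_table_info().get_num_buckets();
let bucket_ids: Vec<i32> = (0..num_buckets).collect();
let latest = admin
    .list_offsets(&table_path, &bucket_ids, OffsetSpec::Latest)
    .await?;

// Start every bucket at its current latest offset (only new records)
let bucket_offsets: HashMap<i32, i64> =
    bucket_ids.iter().map(|&b| (b, latest[&b])).collect();
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```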
Unsubscribe from a bucket:
```rust
// Non-partitioned tables
log_scanner.unsubscribe(bucket_id).await?;

// Partitioned tables
log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
```
Column projection limits a scan to selected columns, reducing the data read and transferred:

```rust
// Project by column index
let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;

// Project by column name
let scanner = table
    .new_scan()
    .project_by_name(&["event_id", "timestamp"])?
    .create_log_scanner()?;
```