---
sidebar_position: 4
---
# Log Tables
Log tables are append-only tables without primary keys, suitable for event streaming.
## Creating a Log Table
```rust
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("event_id", DataTypes::int())
            .column("event_type", DataTypes::string())
            .column("timestamp", DataTypes::bigint())
            .build()?,
    )
    .build()?;
let table_path = TablePath::new("fluss", "events");
admin.create_table(&table_path, &table_descriptor, true).await?;
```
## Writing to Log Tables
```rust
use fluss::row::{GenericRow, InternalRow};
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;
let mut row = GenericRow::new(3);
row.set_field(0, 1); // event_id
row.set_field(1, "user_login"); // event_type
row.set_field(2, 1704067200000i64); // timestamp
append_writer.append(&row)?;
append_writer.flush().await?;
```
Write operations use a **fire-and-forget** pattern for efficient batching: each `append` call queues the record and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.
For per-record acknowledgment:
```rust
append_writer.append(&row)?.await?;
```
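When a whole batch needs acknowledgment, awaiting each record inline defeats the batching. One option is to collect the returned futures, flush, and then await them all. A minimal sketch, assuming `append` returns an awaitable `WriteResultFuture` as described above:

```rust
// Sketch: queue several records, then await all acknowledgments at once.
let mut pending = Vec::new();
for i in 0..10 {
    let mut row = GenericRow::new(3);
    row.set_field(0, i);                       // event_id
    row.set_field(1, "user_login");            // event_type
    row.set_field(2, 1704067200000i64);        // timestamp
    pending.push(append_writer.append(&row)?); // queues the write, keeps the future
}
append_writer.flush().await?; // send all queued writes
for fut in pending {
    fut.await?; // resolves once the corresponding record is acknowledged
}
```

This keeps writes batched on the wire while still surfacing per-record failures after the flush.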
## Reading from Log Tables
```rust
use std::time::Duration;
let table = conn.get_table(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;
// Subscribe to bucket 0 starting from offset 0
log_scanner.subscribe(0, 0).await?;
// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;
// Per-bucket access
for (bucket, bucket_records) in records.records_by_buckets() {
    println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
    for record in bucket_records {
        let row = record.row();
        println!(
            "  event_id={}, event_type={} @ offset={}",
            row.get_int(0)?,
            row.get_string(1)?,
            record.offset()
        );
    }
}
// Or flat iteration (consumes ScanRecords)
for record in records {
    let row = record.row();
    println!(
        "event_id={}, event_type={}, timestamp={} @ offset={}",
        row.get_int(0)?,
        row.get_string(1)?,
        row.get_long(2)?,
        record.offset()
    );
}
```
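In a long-running consumer, `poll` is typically called in a loop: each call returns whatever records arrived within the timeout, possibly none. A minimal sketch using only the calls shown above (a real consumer would add a shutdown condition and error handling):

```rust
// Sketch of a continuous consumer loop.
loop {
    let records = log_scanner.poll(Duration::from_millis(500)).await?;
    for record in records {
        let row = record.row();
        // Replace with real processing logic.
        println!("event_type={} @ offset={}", row.get_string(1)?, record.offset());
    }
}
```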
**Subscribe from special offsets:**
```rust
use fluss::client::EARLIEST_OFFSET;
log_scanner.subscribe(0, EARLIEST_OFFSET).await?; // from earliest
log_scanner.subscribe(0, 42).await?; // from specific offset
```
**Subscribe from latest offset (only new records):**
To start reading only new records, first resolve the current latest offset via `list_offsets`, then subscribe at that offset:
```rust
use fluss::rpc::message::OffsetSpec;
let admin = conn.get_admin().await?;
let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
```
**Subscribe to all buckets:**
```rust
let num_buckets = table.get_table_info().get_num_buckets();
for bucket_id in 0..num_buckets {
log_scanner.subscribe(bucket_id, 0).await?;
}
```
**Subscribe to multiple buckets at once:**
```rust
use std::collections::HashMap;
let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0i64);
bucket_offsets.insert(1, 100i64);
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```
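Combining this with `list_offsets` from above gives a way to start every bucket at its current latest offset, i.e. to consume only new records across the whole table. A sketch, assuming the bucket-to-offset map returned by `list_offsets` has the shape `subscribe_buckets` expects:

```rust
// Sketch: resolve the latest offset of every bucket, then subscribe to all
// of them in one call. Assumes list_offsets returns a bucket -> offset map.
let bucket_ids: Vec<i32> = (0..table.get_table_info().get_num_buckets()).collect();
let offsets = admin
    .list_offsets(&table_path, &bucket_ids, OffsetSpec::Latest)
    .await?;
log_scanner.subscribe_buckets(&offsets).await?;
```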
**Unsubscribe from a bucket:**
```rust
// Non-partitioned tables
log_scanner.unsubscribe(bucket_id).await?;
// Partitioned tables
log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
```
## Column Projection
```rust
// Project by column index
let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;
// Project by column name
let scanner = table.new_scan()
.project_by_name(&["event_id", "timestamp"])?
.create_log_scanner()?;
```
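After projection, a scanned row exposes only the selected columns. Here it is assumed they are re-indexed in projection order, so `event_id` becomes column 0 and `timestamp` column 1 regardless of their positions in the original schema. A hedged usage sketch:

```rust
// Sketch: read rows through the projected scanner created above.
scanner.subscribe(0, 0).await?;
let records = scanner.poll(Duration::from_secs(5)).await?;
for record in records {
    let row = record.row();
    // Assumed: columns are addressed by their position in the projection
    // (&["event_id", "timestamp"]), not by their original schema index.
    println!("event_id={}, timestamp={}", row.get_int(0)?, row.get_long(1)?);
}
```

Projecting early reduces both network transfer and deserialization cost, since non-projected columns are never materialized on the client.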