---
sidebar_position: 4
---
# Log Tables
Log tables are append-only tables without primary keys, suitable for event streaming.
## Creating a Log Table
```python
import pyarrow as pa
schema = fluss.Schema(pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.string()),
    pa.field("score", pa.float32()),
]))
table_path = fluss.TablePath("fluss", "events")
await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
```
## Writing
Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`.
Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write.
```python
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()
# Fire-and-forget: queue writes, flush at the end
writer.append({"id": 1, "name": "Alice", "score": 95.5})
writer.append([2, "Bob", 87.0])
await writer.flush()
# Per-record acknowledgment
handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0})
await handle.wait()
# Bulk writes
writer.write_arrow(pa_table) # PyArrow Table
writer.write_arrow_batch(record_batch) # PyArrow RecordBatch
writer.write_pandas(df) # Pandas DataFrame
await writer.flush()
```
## Reading
There are two scanner types:
- **Batch scanner** (`create_record_batch_log_scanner()`): returns Arrow Tables or DataFrames, best for analytics
- **Record scanner** (`create_log_scanner()`): returns individual records with metadata (offset, timestamp, change type), best for streaming
And two reading modes:
- **`to_arrow()` / `to_pandas()`**: reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads.
- **`poll_arrow()` / `poll()` / `poll_record_batch()`**: waits up to the given timeout and returns whatever data is available. Call in a loop for continuous streaming.
### Batch Read (One-Shot)
```python
num_buckets = (await admin.get_table_info(table_path)).num_buckets
scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
# Reads everything up to current latest offset, then returns
arrow_table = scanner.to_arrow()
df = scanner.to_pandas()
```
### Continuous Polling
Use `poll_arrow()` or `poll()` in a loop for streaming consumption:
```python
# Batch scanner: poll as Arrow Tables
scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
while True:
    result = scanner.poll_arrow(timeout_ms=5000)
    if result.num_rows > 0:
        print(result.to_pandas())
# Record scanner: poll individual records
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
while True:
    scan_records = scanner.poll(timeout_ms=5000)
    for record in scan_records:
        print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}")
    # Or per-bucket access (dict-like)
    for bucket, records in scan_records.items():
        for record in records:
            print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")
```
### Unsubscribing
To stop consuming from a bucket, use `unsubscribe()`:
```python
scanner.unsubscribe(bucket_id=0)
```
### Subscribe from Latest Offset
To only consume new records (skip existing data), first resolve the current latest offset via `list_offsets`, then subscribe at that offset:
```python
admin = await conn.get_admin()
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
latest = offsets[0]
scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=latest)
```
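The same pattern extends to every bucket by combining `list_offsets` with `subscribe_buckets`. A sketch, assuming (as in the single-bucket example above) that `list_offsets` returns a mapping from bucket id to offset; the `latest_offset_map` helper name is ours:

```python
def latest_offset_map(offsets, bucket_ids):
    """Build the {bucket_id: start_offset} dict that subscribe_buckets expects."""
    return {b: offsets[b] for b in bucket_ids}

# Usage (server required):
# num_buckets = (await admin.get_table_info(table_path)).num_buckets
# bucket_ids = list(range(num_buckets))
# offsets = await admin.list_offsets(table_path, bucket_ids, fluss.OffsetSpec.latest())
# scanner = await table.new_scan().create_record_batch_log_scanner()
# scanner.subscribe_buckets(latest_offset_map(offsets, bucket_ids))
```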
## Column Projection
```python
scanner = await table.new_scan().project([0, 2]).create_record_batch_log_scanner()
# or by name
scanner = await table.new_scan().project_by_name(["id", "score"]).create_record_batch_log_scanner()
```