Log tables are append-only tables without primary keys, suitable for event streaming.
```python
import pyarrow as pa
import fluss

# Assumes admin was obtained from an open connection via: admin = await conn.get_admin()
schema = fluss.Schema(pa.schema([
    pa.field("id", pa.int32()),
    pa.field("name", pa.string()),
    pa.field("score", pa.float32()),
]))
table_path = fluss.TablePath("fluss", "events")
await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)
```
Rows can be appended as dicts, lists, or tuples. For bulk writes, use write_arrow(), write_arrow_batch(), or write_pandas().
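As a rough illustration of the three row forms, this sketch uses a hypothetical helper, normalize_row, that is not part of the Fluss client. A dict is matched to the schema by field name, while a list or tuple must give its values in schema order (id, name, score for the example table). Treating dicts by name and sequences by position is an assumption about the client's behaviour, not something documented here.

```python
from collections.abc import Mapping

# Field order of the example "events" table.
FIELD_NAMES = ("id", "name", "score")


def normalize_row(row, field_names=FIELD_NAMES):
    """Hypothetical helper: return a row as a tuple in schema field order.

    Dicts are matched by field name; lists and tuples are taken positionally.
    This illustrates the three accepted forms and is not Fluss client code.
    """
    if isinstance(row, Mapping):
        missing = [name for name in field_names if name not in row]
        if missing:
            raise ValueError(f"missing fields: {missing}")
        return tuple(row[name] for name in field_names)
    if isinstance(row, (list, tuple)):
        if len(row) != len(field_names):
            raise ValueError(f"expected {len(field_names)} values, got {len(row)}")
        return tuple(row)
    raise TypeError(f"unsupported row type: {type(row).__name__}")


rows = [
    {"id": 1, "name": "Alice", "score": 95.5},  # by name, any key order
    [2, "Bob", 87.0],                            # positional list
    (3, "Charlie", 91.0),                        # positional tuple
]
normalized = [normalize_row(r) for r in rows]
# normalized == [(1, 'Alice', 95.5), (2, 'Bob', 87.0), (3, 'Charlie', 91.0)]
```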
Write methods like append() and write_arrow_batch() return a WriteResultHandle. You can ignore it for fire-and-forget semantics (flush at the end), or await handle.wait() to block until the server acknowledges that specific write.
```python
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()

# Fire-and-forget: queue writes, flush at the end
writer.append({"id": 1, "name": "Alice", "score": 95.5})
writer.append([2, "Bob", 87.0])
await writer.flush()

# Per-record acknowledgment
handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0})
await handle.wait()

# Bulk writes: pa_table, record_batch and df are assumed to be defined
# elsewhere with columns matching the table schema
writer.write_arrow(pa_table)             # PyArrow Table
writer.write_arrow_batch(record_batch)   # PyArrow RecordBatch
writer.write_pandas(df)                  # Pandas DataFrame
await writer.flush()
```
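To make the two acknowledgment styles concrete, here is a self-contained asyncio mock. MockAppendWriter and MockWriteHandle are hypothetical stand-ins, not the Fluss classes, and a fixed ack_delay stands in for the server round trip: append() returns immediately, flush() waits for every outstanding write, and handle.wait() waits for one.

```python
import asyncio


class MockWriteHandle:
    """Hypothetical stand-in for WriteResultHandle; not the Fluss class."""

    def __init__(self):
        self._acked = asyncio.Event()

    def resolve(self):
        self._acked.set()

    async def wait(self):
        await self._acked.wait()


class MockAppendWriter:
    """Hypothetical in-memory writer whose 'server' acks after ack_delay seconds."""

    def __init__(self, ack_delay=0.01):
        self.ack_delay = ack_delay
        self.acked = []        # rows the simulated server has acknowledged
        self._unflushed = []   # handles issued since the last flush()

    def append(self, row):
        handle = MockWriteHandle()
        self._unflushed.append(handle)
        # Queue the write and return at once; the ack arrives later.
        asyncio.get_running_loop().call_later(self.ack_delay, self._ack, row, handle)
        return handle

    def _ack(self, row, handle):
        self.acked.append(row)
        handle.resolve()

    async def flush(self):
        # Wait until every write issued since the last flush is acknowledged.
        pending, self._unflushed = self._unflushed, []
        await asyncio.gather(*(h.wait() for h in pending))


async def main():
    writer = MockAppendWriter()

    # Fire-and-forget: ignore the handles, flush once at the end.
    writer.append({"id": 1, "name": "Alice", "score": 95.5})
    writer.append({"id": 2, "name": "Bob", "score": 87.0})
    print(len(writer.acked))   # 0: appends return before any ack
    await writer.flush()
    print(len(writer.acked))   # 2

    # Per-record acknowledgment: wait on one specific handle.
    handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0})
    await handle.wait()
    print(len(writer.acked))   # 3
    return writer.acked


acked = asyncio.run(main())
```

In this mock, awaiting each handle right after append() would serialize the writes, while a single flush() lets all pending acknowledgments overlap.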
There are two scanner types:
- Batch scanner (create_record_batch_log_scanner()): returns Arrow Tables or DataFrames; best for analytics.
- Record scanner (create_log_scanner()): returns individual records with metadata (offset, timestamp, change type); best for streaming.

And two reading modes:
- to_arrow() / to_pandas(): reads all data from the subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads.
- poll_arrow() / poll() / poll_record_batch(): waits up to the given timeout and returns whatever data is available, which may be empty. Call in a loop for continuous streaming.

```python
num_buckets = (await admin.get_table_info(table_path)).num_buckets
scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})

# Reads everything up to the current latest offset, then returns
arrow_table = scanner.to_arrow()
# Alternatively, return the result as a pandas DataFrame
df = scanner.to_pandas()
```
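The difference between the two reading modes can be sketched with an in-memory log for a single bucket. MockBucketLog, read_to_latest and poll are hypothetical names for illustration only; the sketch assumes offsets are consecutive positions and that the latest offset is the offset the next record will receive. The one-shot read snapshots that offset and stops there, while poll() returns as soon as data shows up or the timeout expires, possibly with nothing.

```python
import time


class MockBucketLog:
    """Hypothetical in-memory log for one bucket; an offset is a list index."""

    def __init__(self):
        self.records = []

    def append(self, value):
        self.records.append(value)

    def latest_offset(self):
        # Offset that the next appended record will receive.
        return len(self.records)


def read_to_latest(log, start_offset):
    """One-shot read: snapshot the latest offset, return everything before it."""
    end = log.latest_offset()
    return log.records[start_offset:end], end


def poll(log, position, timeout_s, max_records=100):
    """Wait up to timeout_s for records at or after position; may return []."""
    deadline = time.monotonic() + timeout_s
    while True:
        available = log.records[position:position + max_records]
        if available or time.monotonic() >= deadline:
            return available, position + len(available)
        time.sleep(0.001)  # the mock busy-waits; a real client would wait on I/O


log = MockBucketLog()
for value in ("e0", "e1", "e2"):
    log.append(value)

batch, pos = read_to_latest(log, 0)          # ['e0', 'e1', 'e2'], returns at once
log.append("e3")                             # arrives after the one-shot read
more, pos = poll(log, pos, timeout_s=0.05)   # ['e3']
empty, pos = poll(log, pos, timeout_s=0.05)  # [] once the 50 ms timeout expires
```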
Use poll_arrow() or poll() in a loop for streaming consumption:
```python
# Batch scanner: poll as Arrow Tables
scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
while True:
    result = scanner.poll_arrow(timeout_ms=5000)
    if result.num_rows > 0:
        print(result.to_pandas())
```

```python
# Record scanner: poll individual records
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
while True:
    scan_records = scanner.poll(timeout_ms=5000)
    for record in scan_records:
        print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}")

    # Or per-bucket access (dict-like)
    for bucket, records in scan_records.items():
        for record in records:
            print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")
```
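The per-bucket view is useful when a consumer tracks its position separately for each bucket. This sketch uses a hypothetical MockRecord type keyed by a plain integer bucket_id (the Fluss example reads bucket.bucket_id from a bucket object instead) and assumes, as in Kafka-style logs, that offsets are assigned per bucket, so records from different buckets can share an offset.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class MockRecord:
    # Hypothetical stand-in for a scanned record; bucket_id is an assumed field.
    bucket_id: int
    offset: int
    row: Any


def group_by_bucket(records: List[MockRecord]) -> Dict[int, List[MockRecord]]:
    """Group a flat poll result per bucket, preserving arrival order within each."""
    grouped: Dict[int, List[MockRecord]] = defaultdict(list)
    for record in records:
        grouped[record.bucket_id].append(record)
    return dict(grouped)


def next_positions(grouped: Dict[int, List[MockRecord]]) -> Dict[int, int]:
    """Offset to resume from in each bucket: one past the last record seen."""
    return {bucket: recs[-1].offset + 1 for bucket, recs in grouped.items()}


# Offsets are per bucket, so records from different buckets can share an offset.
polled = [
    MockRecord(bucket_id=0, offset=0, row=(1, "Alice", 95.5)),
    MockRecord(bucket_id=1, offset=0, row=(2, "Bob", 87.0)),
    MockRecord(bucket_id=0, offset=1, row=(3, "Charlie", 91.0)),
]
grouped = group_by_bucket(polled)
positions = next_positions(grouped)   # {0: 2, 1: 1}
```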
To stop consuming from a bucket, use unsubscribe():
```python
scanner.unsubscribe(bucket_id=0)
```
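A subscription can be thought of as a per-bucket read position. In this hypothetical MockScanner (not the Fluss scanner), unsubscribe() simply drops the bucket's position, so later polls skip that bucket even though new data keeps arriving in it. Whether the real client discards or retains the position is an assumption; in the mock, resubscribing requires an explicit start offset, just as subscribe() takes one in the Fluss examples.

```python
class MockScanner:
    """Hypothetical scanner that tracks one read position per subscribed bucket."""

    def __init__(self, logs):
        self.logs = logs        # bucket_id -> list of records (offset = index)
        self.positions = {}     # bucket_id -> next offset to read

    def subscribe(self, bucket_id, start_offset):
        self.positions[bucket_id] = start_offset

    def unsubscribe(self, bucket_id):
        # Forget the bucket's position; later polls skip it entirely.
        self.positions.pop(bucket_id, None)

    def poll(self):
        result = {}
        for bucket_id, pos in list(self.positions.items()):
            new = self.logs[bucket_id][pos:]
            if new:
                result[bucket_id] = new
                self.positions[bucket_id] = pos + len(new)
        return result


logs = {0: ["a0", "a1"], 1: ["b0"]}
scanner = MockScanner(logs)
scanner.subscribe(0, 0)
scanner.subscribe(1, 0)
first = scanner.poll()        # {0: ['a0', 'a1'], 1: ['b0']}

scanner.unsubscribe(0)
logs[0].append("a2")          # bucket 0 keeps receiving data...
logs[1].append("b1")
second = scanner.poll()       # ...but only bucket 1 is read: {1: ['b1']}
```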
To consume only new records (skipping existing data), first resolve the current latest offset with list_offsets, then subscribe at that offset:
```python
admin = await conn.get_admin()
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
latest = offsets[0]

scanner = await table.new_scan().create_record_batch_log_scanner()
scanner.subscribe(bucket_id=0, start_offset=latest)
```
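Why subscribing at the latest offset skips existing data can be shown with a plain list standing in for one bucket. The sketch assumes, as in Kafka-style logs, that the latest offset is the offset the next appended record will receive; resolve_start is a hypothetical helper, and 0 stands in for the earliest offset only because this mock never deletes records. Because the start position is a fixed offset, a record written after the offset is resolved is still read rather than lost.

```python
def resolve_start(log, spec):
    """Resolve a hypothetical 'earliest'/'latest' spec to a start offset."""
    if spec == "earliest":
        return 0  # valid only because this mock never deletes old records
    if spec == "latest":
        # The offset the next appended record will receive, so no existing
        # record sits at or after it.
        return len(log)
    raise ValueError(f"unknown offset spec: {spec!r}")


log = ["e0", "e1", "e2"]                # data written before we subscribe
latest = resolve_start(log, "latest")   # 3
log.append("e3")                        # written after the offset was resolved

print(log[latest:])                            # ['e3']
print(log[resolve_start(log, "earliest"):])    # ['e0', 'e1', 'e2', 'e3']
```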
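To read only a subset of columns, apply a projection by column index or by column name:

```python
scanner = await table.new_scan().project([0, 2]).create_record_batch_log_scanner()
# or by name
scanner = await table.new_scan().project_by_name(["id", "score"]).create_record_batch_log_scanner()
```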
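Indices 0 and 2 select the same columns as "id" and "score" because the example schema orders its fields id, name, score. The hypothetical helpers in this sketch show that name-to-index mapping and a row-level projection; they are illustrations, not Fluss functions, and keeping columns in the requested order is an assumption of the sketch.

```python
# Field order of the example "events" table (id=0, name=1, score=2).
FIELDS = ["id", "name", "score"]


def names_to_indices(fields, names):
    """Map column names to positions, e.g. ["id", "score"] -> [0, 2]."""
    position = {name: i for i, name in enumerate(fields)}
    unknown = [n for n in names if n not in position]
    if unknown:
        raise KeyError(f"unknown columns: {unknown}")
    return [position[n] for n in names]


def project_row(row, indices):
    """Keep only the selected columns, in the requested order."""
    return tuple(row[i] for i in indices)


indices = names_to_indices(FIELDS, ["id", "score"])   # [0, 2]
projected = project_row((1, "Alice", 95.5), indices)  # (1, 95.5)
```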