---
sidebar_position: 1
---

# Example

Minimal working example: connect to Fluss, create a table, write data, and read it back.

import asyncio
import pyarrow as pa
import fluss

async def main():
    """Quick-start: connect, create a log table, append two rows, read them back, clean up.

    The table is dropped and the connection is closed even if an
    intermediate step raises, so a failed run does not leak resources.
    """
    # Connect
    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
    conn = await fluss.FlussConnection.create(config)
    try:
        admin = await conn.get_admin()

        # Create a log table (no-op if it already exists)
        schema = fluss.Schema(pa.schema([
            pa.field("id", pa.int32()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float32()),
        ]))
        table_path = fluss.TablePath("fluss", "quick_start")
        await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True)

        try:
            # Write: append rows, then flush so they are sent before reading
            table = await conn.get_table(table_path)
            writer = table.new_append().create_writer()
            writer.append({"id": 1, "name": "Alice", "score": 95.5})
            writer.append({"id": 2, "name": "Bob", "score": 87.0})
            await writer.flush()

            # Read: subscribe every bucket of the table from the earliest offset
            num_buckets = (await admin.get_table_info(table_path)).num_buckets
            scanner = await table.new_scan().create_record_batch_log_scanner()
            scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
            print(scanner.to_pandas())
        finally:
            # Cleanup: drop the table even if writing or reading failed
            await admin.drop_table(table_path, ignore_if_not_exists=True)
    finally:
        # Always release the connection, including on error paths
        conn.close()


if __name__ == "__main__":
    asyncio.run(main())