Primary key tables support upsert, delete, and point lookup operations.
To create a primary key table, pass `primary_keys` to `fluss.Schema`:
import pyarrow as pa schema = fluss.Schema( pa.schema([ pa.field("id", pa.int32()), pa.field("name", pa.string()), pa.field("age", pa.int64()), ]), primary_keys=["id"], ) table_path = fluss.TablePath("fluss", "users") await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True)
table = await conn.get_table(table_path) # Upsert (fire-and-forget, flush at the end) writer = table.new_upsert().create_writer() writer.upsert({"id": 1, "name": "Alice", "age": 25}) writer.upsert({"id": 2, "name": "Bob", "age": 30}) await writer.flush() # Per-record acknowledgment (for read-after-write) handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35}) await handle.wait() # Delete by primary key handle = writer.delete({"id": 2}) await handle.wait() # Lookup lookuper = table.new_lookup().create_lookuper() result = await lookuper.lookup({"id": 1}) if result: print(f"Found: name={result['name']}, age={result['age']}")
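To update only specific columns while preserving the current values of all other columns, use a partial update. Pass the columns to write to `partial_update_by_name`, including the primary key column so the row can be identified: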
partial_writer = table.new_upsert().partial_update_by_name(["id", "age"]).create_writer() partial_writer.upsert({"id": 1, "age": 27}) # only updates age await partial_writer.flush()