Partitioned tables distribute data across partitions based on column values. Partitions must exist before writing data; otherwise the client retries indefinitely by default.
import pyarrow as pa

# Log-table schema (no primary key); "region" will be the partition column.
schema = fluss.Schema(
    pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("region", pa.string()),
            pa.field("value", pa.int64()),
        ]
    )
)

table_path = fluss.TablePath("fluss", "partitioned_events")
await admin.create_table(
    table_path,
    fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1),
    ignore_if_exists=True,
)

# Create partitions
# Partitions must be created explicitly before any data is written to them.
await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)
await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True)

# List partitions
# Enumerates the partitions that now exist; reused by the scanner examples below.
partition_infos = await admin.list_partition_infos(table_path)
Writing works the same as for non-partitioned tables — include the partition column values in each row. Partitions must exist before writing data; otherwise the client retries indefinitely by default.
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()

# The partition column ("region") is supplied as an ordinary field of the row.
writer.append({"id": 1, "region": "US", "value": 100})
writer.append({"id": 2, "region": "EU", "value": 200})

# Ensure buffered rows are sent to the server before continuing.
await writer.flush()
Use subscribe_partition() or subscribe_partition_buckets() instead of subscribe():
scanner = await table.new_scan().create_record_batch_log_scanner()

# Subscribe to individual partitions
# Option 1: one subscribe call per (partition, bucket) pair.
for p in partition_infos:
    scanner.subscribe_partition(
        partition_id=p.partition_id,
        bucket_id=0,
        start_offset=fluss.EARLIEST_OFFSET,
    )

# Or batch-subscribe
# Option 2: a single call with a {(partition_id, bucket_id): start_offset} map.
scanner.subscribe_partition_buckets(
    {(p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos}
)

print(scanner.to_pandas())
To stop consuming from a specific partition bucket, use unsubscribe_partition():
# Stop consuming bucket 0 of the first listed partition.
scanner.unsubscribe_partition(
    partition_id=partition_infos[0].partition_id,
    bucket_id=0,
)
Partition columns must be part of the primary key. Partitions must exist before upserting data; otherwise the client retries indefinitely by default.
# Primary-key schema: the partition column ("region") is part of the primary key.
schema = fluss.Schema(
    pa.schema(
        [
            pa.field("user_id", pa.int32()),
            pa.field("region", pa.string()),
            pa.field("score", pa.int64()),
        ]
    ),
    primary_keys=["user_id", "region"],
)

table_path = fluss.TablePath("fluss", "partitioned_users")
await admin.create_table(
    table_path,
    fluss.TableDescriptor(schema, partition_keys=["region"]),
    ignore_if_exists=True,
)

# The partition must exist before the upsert below.
await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)

table = await conn.get_table(table_path)
writer = table.new_upsert().create_writer()
writer.upsert({"user_id": 1, "region": "US", "score": 1234})
await writer.flush()

# Lookup includes partition columns
# The lookup key must contain every primary-key column, including "region".
lookuper = table.new_lookup().create_lookuper()
result = await lookuper.lookup({"user_id": 1, "region": "US"})