Partitioned tables distribute data across partitions based on partition column values, enabling efficient data organization and querying. Both log tables and primary key tables support partitioning.
use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath};

// Four-column event schema; `dt` and `region` are the columns the table is partitioned by.
let schema = Schema::builder()
    .column("event_id", DataTypes::int())
    .column("event_type", DataTypes::string())
    .column("dt", DataTypes::string())
    .column("region", DataTypes::string())
    .build()?;

// Log table descriptor: partitioned by (dt, region), using the ARROW log format.
let table_descriptor = TableDescriptor::builder()
    .schema(schema)
    .partitioned_by(vec!["dt", "region"])
    .log_format(LogFormat::ARROW)
    .build()?;

let table_path = TablePath::new("fluss", "partitioned_events");
// NOTE(review): the trailing `true` is presumably "ignore if exists" — confirm against the admin API.
admin
    .create_table(&table_path, &table_descriptor, true)
    .await?;
Partitions must exist before writing data; otherwise, by default, the client will retry indefinitely. Include the partition column values in each row; the client routes records to the correct partition automatically.
use fluss::metadata::PartitionSpec;
use std::collections::HashMap;

let table = conn.get_table(&table_path).await?;

// Create the partition before writing: the partition must exist first,
// otherwise the client will by default retry indefinitely.
let partition_values = HashMap::from([("dt", "2024-01-15"), ("region", "US")]);
admin
    .create_partition(&table_path, &PartitionSpec::new(partition_values), true)
    .await?;

let append_writer = table.new_append()?.create_writer()?;

// Each row carries its partition column values; the client routes the record
// to the matching partition automatically.
let mut row = GenericRow::new(4);
row.set_field(0, 1); // event_id
row.set_field(1, "user_login"); // event_type
row.set_field(2, "2024-01-15"); // dt (partition column)
row.set_field(3, "US"); // region (partition column)

append_writer.append(&row)?;
append_writer.flush().await?;
For partitioned tables, use partition-aware subscribe methods.
use std::time::Duration;

let table = conn.get_table(&table_path).await?;
let admin = conn.get_admin()?;
let partitions = admin.list_partition_infos(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;

// The bucket count comes from the table info, not from the partition,
// so it is read once instead of on every loop iteration.
let num_buckets = table.get_table_info().get_num_buckets();

// Subscribe to each partition's buckets.
// NOTE(review): the trailing `0` is presumably the starting offset — confirm.
for partition_info in &partitions {
    let partition_id = partition_info.get_partition_id();
    for bucket_id in 0..num_buckets {
        log_scanner
            .subscribe_partition(partition_id, bucket_id, 0)
            .await?;
    }
}

// Poll once (passing a 10-second duration) and print the row of every record returned.
let records = log_scanner.poll(Duration::from_secs(10)).await?;
for record in records {
    println!("Record: {:?}", record.row());
}
Subscribe to multiple partition-buckets at once:
use std::collections::HashMap;

// Map (partition id, bucket id) pairs to the value 0 for buckets 0 and 1,
// then hand the whole map to the scanner in a single call.
let mut partition_bucket_offsets = HashMap::new();
for bucket_id in [0, 1] {
    partition_bucket_offsets.insert((partition_id, bucket_id), 0i64);
}

log_scanner
    .subscribe_partition_buckets(&partition_bucket_offsets)
    .await?;
use fluss::metadata::PartitionSpec; use std::collections::HashMap; // Create a partition let mut partition_values = HashMap::new(); partition_values.insert("dt", "2024-01-15"); partition_values.insert("region", "EMEA"); let spec = PartitionSpec::new(partition_values); admin.create_partition(&table_path, &spec, true).await?; // List all partitions let partitions = admin.list_partition_infos(&table_path).await?; for partition in &partitions { println!( "Partition: id={}, name={}", partition.get_partition_id(), partition.get_partition_name() ); } // List with filter let mut partial_values = HashMap::new(); partial_values.insert("dt", "2024-01-15"); let partial_spec = PartitionSpec::new(partial_values); let filtered = admin.list_partition_infos_with_spec( &table_path, Some(&partial_spec), ).await?; // Drop a partition admin.drop_partition(&table_path, &spec, true).await?;
Partitioned KV tables combine partitioning with primary key operations. Partition columns must be part of the primary key.
use fluss::metadata::{DataTypes, KvFormat, Schema, TableDescriptor, TablePath};

// The primary key (user_id, region, zone) includes both partition columns,
// `region` and `zone`; `score` is the only non-key column.
let schema = Schema::builder()
    .column("user_id", DataTypes::int())
    .column("region", DataTypes::string())
    .column("zone", DataTypes::bigint())
    .column("score", DataTypes::bigint())
    .primary_key(vec!["user_id", "region", "zone"])
    .build()?;

// Primary key table descriptor: partitioned by (region, zone), COMPACTED KV format.
let table_descriptor = TableDescriptor::builder()
    .schema(schema)
    .partitioned_by(vec!["region", "zone"])
    .kv_format(KvFormat::COMPACTED)
    .build()?;

let table_path = TablePath::new("fluss", "partitioned_users");
admin
    .create_table(&table_path, &table_descriptor, true)
    .await?;
Partitions must exist before upserting data; otherwise, by default, the client will retry indefinitely.
use fluss::metadata::PartitionSpec;
use std::collections::HashMap;

let table = conn.get_table(&table_path).await?;

// Create partitions first: upserting into a partition that does not exist
// makes the client retry indefinitely by default.
// Partition values are given as strings here, including the `zone` value.
for (region, zone) in [("APAC", "1"), ("EMEA", "2"), ("US", "3")] {
    let values = HashMap::from([("region", region), ("zone", zone)]);
    admin
        .create_partition(&table_path, &PartitionSpec::new(values), true)
        .await?;
}

let table_upsert = table.new_upsert()?;
let upsert_writer = table_upsert.create_writer()?;

// One row per user; fields are set in order: user_id, region, zone, score.
for (user_id, region, zone, score) in [
    (1001, "APAC", 1i64, 1234i64),
    (1002, "EMEA", 2, 2234),
    (1003, "US", 3, 3234),
] {
    let mut row = GenericRow::new(4);
    row.set_field(0, user_id);
    row.set_field(1, region);
    row.set_field(2, zone);
    row.set_field(3, score);
    upsert_writer.upsert(&row)?;
}

upsert_writer.flush().await?;
Lookup requires all primary key columns, including the partition columns.
let mut lookuper = table.new_lookup()?.create_lookuper()?;

// The key row holds three fields — every primary key column, partition columns included.
let mut key = GenericRow::new(3);
key.set_field(0, 1001); // user_id
key.set_field(1, "APAC"); // region (partition column)
key.set_field(2, 1i64); // zone (partition column)

let result = lookuper.lookup(&key).await?;
if let Some(row) = result.get_single_row()? {
    // Field index 3 is read as a long and printed as the score.
    println!("Found: score={}", row.get_long(3)?);
}
Note: Scanning partitioned primary key tables is not supported. Use lookup operations instead.