Partitioned tables distribute data across partitions based on partition column values, enabling efficient data organization and querying. Both log tables and primary key tables support partitioning.
// Location of the example table: database "fluss", table "partitioned_events".
fluss::TablePath table_path("fluss", "partitioned_events");

// Four columns; "dt" and "region" are the ones used as partition keys below.
auto event_schema = fluss::Schema::NewBuilder()
                        .AddColumn("event_id", fluss::DataType::Int())
                        .AddColumn("event_type", fluss::DataType::String())
                        .AddColumn("dt", fluss::DataType::String())
                        .AddColumn("region", fluss::DataType::String())
                        .Build();

// Partition by (dt, region) and use a bucket count of 3.
auto event_table_descriptor = fluss::TableDescriptor::NewBuilder()
                                  .SetSchema(event_schema)
                                  .SetPartitionKeys({"dt", "region"})
                                  .SetBucketCount(3)
                                  .Build();

// NOTE(review): the trailing `true` is presumably an ignore-if-exists flag -- confirm against the Admin API.
admin.CreateTable(table_path, event_table_descriptor, true);
Partitions must exist before writing data; otherwise, the client retries indefinitely by default. Include the partition column values in each row, and the client routes records to the correct partition automatically.
fluss::Table table; conn.GetTable(table_path, table); fluss::AppendWriter writer; table.NewAppend().CreateWriter(writer); fluss::GenericRow row; row.SetInt32(0, 1); row.SetString(1, "user_login"); row.SetString(2, "2024-01-15"); row.SetString(3, "US"); writer.Append(row); writer.Flush();
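For partitioned tables, use partition-aware subscribe methods. The `partition_infos` vector used in this example is populated with `admin.ListPartitionInfos(table_path, partition_infos)`, as shown in the partition management example further down.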
fluss::Table table; conn.GetTable(table_path, table); fluss::LogScanner scanner; table.NewScan().CreateLogScanner(scanner); // Subscribe to individual partitions for (const auto& pi : partition_infos) { scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0); } fluss::ScanRecords records; scanner.Poll(5000, records); for (const auto& rec : records) { std::cout << "bucket_id=" << rec.bucket_id << " offset=" << rec.offset << std::endl; } // Or batch-subscribe to all partitions at once fluss::LogScanner batch_scanner; table.NewScan().CreateLogScanner(batch_scanner); std::vector<fluss::PartitionBucketSubscription> subs; for (const auto& pi : partition_infos) { subs.push_back({pi.partition_id, 0, 0}); } batch_scanner.SubscribePartitionBuckets(subs);
Unsubscribe from a partition bucket:
// Stop receiving records from a specific partition bucket scanner.UnsubscribePartition(partition_infos[0].partition_id, 0);
// Create a partition admin.CreatePartition(table_path, {{"dt", "2024-01-15"}, {"region", "EMEA"}}, true); // List partitions std::vector<fluss::PartitionInfo> partition_infos; admin.ListPartitionInfos(table_path, partition_infos); // Query partition offsets std::vector<int32_t> bucket_ids = {0, 1, 2}; std::unordered_map<int32_t, int64_t> offsets; admin.ListPartitionOffsets(table_path, "2024-01-15$US", bucket_ids, fluss::OffsetSpec::Latest(), offsets);
Partitioned KV tables combine partitioning with primary key operations. Partition columns must be part of the primary key.
// Location of the example table: database "fluss", table "partitioned_users".
fluss::TablePath table_path("fluss", "partitioned_users");

// The primary key is (user_id, region, zone); it includes both partition
// columns, as partitioned primary key tables require.
auto user_schema = fluss::Schema::NewBuilder()
                       .AddColumn("user_id", fluss::DataType::Int())
                       .AddColumn("region", fluss::DataType::String())
                       .AddColumn("zone", fluss::DataType::BigInt())
                       .AddColumn("score", fluss::DataType::BigInt())
                       .SetPrimaryKeys({"user_id", "region", "zone"})
                       .Build();

// Partition by (region, zone) and use a bucket count of 3.
auto user_table_descriptor = fluss::TableDescriptor::NewBuilder()
                                 .SetSchema(user_schema)
                                 .SetPartitionKeys({"region", "zone"})
                                 .SetBucketCount(3)
                                 .Build();

// NOTE(review): the trailing `true` is presumably an ignore-if-exists flag -- confirm against the Admin API.
admin.CreateTable(table_path, user_table_descriptor, true);
Partitions must exist before upserting data; otherwise, the client retries indefinitely by default.
fluss::Table table; conn.GetTable(table_path, table); // Create partitions first admin.CreatePartition(table_path, {{"region", "APAC"}, {"zone", "1"}}, true); admin.CreatePartition(table_path, {{"region", "EMEA"}, {"zone", "2"}}, true); admin.CreatePartition(table_path, {{"region", "US"}, {"zone", "3"}}, true); fluss::UpsertWriter writer; table.NewUpsert().CreateWriter(writer); auto row = table.NewRow(); row.Set("user_id", 1001); row.Set("region", "APAC"); row.Set("zone", static_cast<int64_t>(1)); row.Set("score", static_cast<int64_t>(1234)); writer.Upsert(row); writer.Flush();
Lookup requires all primary key columns, including the partition columns.
Note: Scanning partitioned primary key tables is not supported. Use lookup operations instead.
fluss::Lookuper lookuper; table.NewLookup().CreateLookuper(lookuper); auto pk = table.NewRow(); pk.Set("user_id", 1001); pk.Set("region", "APAC"); pk.Set("zone", static_cast<int64_t>(1)); fluss::LookupResult result; lookuper.Lookup(pk, result); if (result.Found()) { std::cout << "score=" << result.GetInt64(3) << std::endl; }