---
sidebar_position: 6
---
# Partitioned Tables
Partitioned tables distribute data across partitions based on partition column values, enabling efficient data organization and querying. Both log tables and primary key tables support partitioning.
## Partitioned Log Tables
### Creating a Partitioned Log Table
```cpp
auto schema = fluss::Schema::NewBuilder()
    .AddColumn("event_id", fluss::DataType::Int())
    .AddColumn("event_type", fluss::DataType::String())
    .AddColumn("dt", fluss::DataType::String())
    .AddColumn("region", fluss::DataType::String())
    .Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
    .SetSchema(schema)
    .SetPartitionKeys({"dt", "region"})
    .SetBucketCount(3)
    .Build();
fluss::TablePath table_path("fluss", "partitioned_events");
admin.CreateTable(table_path, descriptor, true);
```
### Writing to Partitioned Log Tables
**Partitions must exist before data is written; otherwise the client retries indefinitely by default.** Include the partition column values in each row; the client routes each record to the matching partition automatically.
```cpp
fluss::Table table;
conn.GetTable(table_path, table);
fluss::AppendWriter writer;
table.NewAppend().CreateWriter(writer);
fluss::GenericRow row;
row.SetInt32(0, 1);              // event_id
row.SetString(1, "user_login");  // event_type
row.SetString(2, "2024-01-15");  // dt (partition column)
row.SetString(3, "US");          // region (partition column)
writer.Append(row);
writer.Flush();
```
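Since a missing partition stalls the writer, one option is to work out which partitions a batch touches before appending anything. The sketch below is a minimal illustration and not part of the Fluss client: `PartitionSpec`, `RowValues`, and `DistinctPartitionSpecs` are hypothetical names, and rows are modeled as plain string maps rather than `fluss::GenericRow`.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; not part of the Fluss API.
using PartitionSpec = std::map<std::string, std::string>;  // partition column -> value
using RowValues = std::map<std::string, std::string>;      // column -> value

// Returns the distinct partition specs referenced by a batch of rows.
std::set<PartitionSpec> DistinctPartitionSpecs(
    const std::vector<RowValues>& rows,
    const std::vector<std::string>& partition_keys) {
    std::set<PartitionSpec> specs;
    for (const auto& row : rows) {
        PartitionSpec spec;
        for (const auto& key : partition_keys) {
            auto it = row.find(key);
            // Every row must carry a value for each partition column.
            assert(it != row.end());
            spec[key] = it->second;
        }
        specs.insert(spec);
    }
    return specs;
}
```

Each returned spec could then be handed to `admin.CreatePartition` (shown under Managing Partitions) before the first `Append`, assuming that call accepts a column-to-value map as its brace-initialized arguments suggest.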
### Reading from Partitioned Log Tables
For partitioned tables, use partition-aware subscribe methods.
```cpp
fluss::Table table;
conn.GetTable(table_path, table);
fluss::LogScanner scanner;
table.NewScan().CreateLogScanner(scanner);
// Subscribe to individual partitions (bucket 0, starting at offset 0).
// partition_infos is obtained via admin.ListPartitionInfos (see Managing Partitions).
for (const auto& pi : partition_infos) {
    scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0);
}

fluss::ScanRecords records;
scanner.Poll(5000, records);
for (const auto& rec : records) {
    std::cout << "bucket_id=" << rec.bucket_id
              << " offset=" << rec.offset << std::endl;
}

// Or batch-subscribe to all partitions at once
fluss::LogScanner batch_scanner;
table.NewScan().CreateLogScanner(batch_scanner);
std::vector<fluss::PartitionBucketSubscription> subs;
for (const auto& pi : partition_infos) {
    subs.push_back({pi.partition_id, 0, 0});
}
batch_scanner.SubscribePartitionBuckets(subs);
```
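The examples above subscribe only to bucket 0 of each partition, while the table was created with three buckets. A minimal sketch of building one subscription per partition and bucket follows. `Subscription` and `AllBucketSubscriptions` are hypothetical stand-ins, and the field order (partition id, bucket id, start offset) is an assumption read off the calls above rather than a confirmed definition of `fluss::PartitionBucketSubscription`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for fluss::PartitionBucketSubscription.
// Field names and order are assumptions for illustration.
struct Subscription {
    int64_t partition_id;
    int32_t bucket_id;
    int64_t offset;
};

// Builds one subscription per (partition, bucket) pair, all starting at start_offset.
std::vector<Subscription> AllBucketSubscriptions(
    const std::vector<int64_t>& partition_ids,
    int32_t bucket_count,
    int64_t start_offset) {
    assert(bucket_count > 0);
    std::vector<Subscription> subs;
    subs.reserve(partition_ids.size() * static_cast<std::size_t>(bucket_count));
    for (int64_t partition_id : partition_ids) {
        for (int32_t bucket = 0; bucket < bucket_count; ++bucket) {
            subs.push_back({partition_id, bucket, start_offset});
        }
    }
    return subs;
}
```

With two partitions and a bucket count of 3, this yields six entries, ordered by partition and then by bucket.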
**Unsubscribe from a partition bucket:**
```cpp
// Stop receiving records from a specific partition bucket
scanner.UnsubscribePartition(partition_infos[0].partition_id, 0);
```
### Managing Partitions
```cpp
// Create a partition
admin.CreatePartition(table_path, {{"dt", "2024-01-15"}, {"region", "US"}}, true);
// List partitions
std::vector<fluss::PartitionInfo> partition_infos;
admin.ListPartitionInfos(table_path, partition_infos);
// Query partition offsets
std::vector<int32_t> bucket_ids = {0, 1, 2};
std::unordered_map<int32_t, int64_t> offsets;
admin.ListPartitionOffsets(table_path, "2024-01-15$US",
bucket_ids, fluss::OffsetSpec::Latest(), offsets);
```
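The partition name `"2024-01-15$US"` passed to `ListPartitionOffsets` suggests that names are formed by joining the partition values with `$` in partition key order (`dt`, then `region`). The helper below is a sketch under that assumption; `BuildPartitionName` is a hypothetical name and not a Fluss API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Joins partition values with '$' in the order given by partition_keys.
// Assumption: this mirrors the naming seen in "2024-01-15$US".
std::string BuildPartitionName(
    const std::vector<std::string>& partition_keys,
    const std::unordered_map<std::string, std::string>& spec) {
    std::string name;
    for (std::size_t i = 0; i < partition_keys.size(); ++i) {
        auto it = spec.find(partition_keys[i]);
        // The spec must provide a value for every partition key.
        assert(it != spec.end());
        if (i > 0) {
            name += '$';
        }
        name += it->second;
    }
    return name;
}
```

Because the order comes from `partition_keys` rather than from the map, the result does not depend on how the spec was populated.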
## Partitioned Primary Key Tables
Partitioned primary key (KV) tables combine partitioning with primary key operations such as upsert and lookup. Every partition column must be part of the primary key.
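The primary key constraint can be checked before calling `CreateTable`. The following is a minimal sketch using a hypothetical helper, `PartitionKeysMissingFromPrimaryKey`, which is not part of the Fluss API; it simply reports partition columns that the primary key does not contain.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Returns the partition keys that are absent from the primary key.
// An empty result means the constraint is satisfied.
std::vector<std::string> PartitionKeysMissingFromPrimaryKey(
    const std::vector<std::string>& partition_keys,
    const std::vector<std::string>& primary_keys) {
    std::vector<std::string> missing;
    for (const auto& key : partition_keys) {
        bool found = std::find(primary_keys.begin(), primary_keys.end(), key) !=
                     primary_keys.end();
        if (!found) {
            missing.push_back(key);
        }
    }
    return missing;
}
```

For a primary key of `user_id`, `region`, `zone`, the partition keys `region` and `zone` pass the check, whereas a partition key such as `dt` would be reported as missing.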
### Creating a Partitioned Primary Key Table
```cpp
auto schema = fluss::Schema::NewBuilder()
    .AddColumn("user_id", fluss::DataType::Int())
    .AddColumn("region", fluss::DataType::String())
    .AddColumn("zone", fluss::DataType::BigInt())
    .AddColumn("score", fluss::DataType::BigInt())
    .SetPrimaryKeys({"user_id", "region", "zone"})
    .Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
    .SetSchema(schema)
    .SetPartitionKeys({"region", "zone"})
    .SetBucketCount(3)
    .Build();
fluss::TablePath table_path("fluss", "partitioned_users");
admin.CreateTable(table_path, descriptor, true);
```
### Writing to Partitioned Primary Key Tables
**Partitions must exist before data is upserted; otherwise the client retries indefinitely by default.**
```cpp
fluss::Table table;
conn.GetTable(table_path, table);
// Create partitions first. Partition values are given as strings,
// including the value for the BigInt zone column.
admin.CreatePartition(table_path, {{"region", "APAC"}, {"zone", "1"}}, true);
admin.CreatePartition(table_path, {{"region", "EMEA"}, {"zone", "2"}}, true);
admin.CreatePartition(table_path, {{"region", "US"}, {"zone", "3"}}, true);
fluss::UpsertWriter writer;
table.NewUpsert().CreateWriter(writer);
auto row = table.NewRow();
row.Set("user_id", 1001);
row.Set("region", "APAC");
row.Set("zone", static_cast<int64_t>(1));
row.Set("score", static_cast<int64_t>(1234));
writer.Upsert(row);
writer.Flush();
```
### Looking Up Records in Partitioned Tables
A lookup requires values for all primary key columns, including the partition columns.
> **Note:** Scanning partitioned primary key tables is not supported. Use lookup operations instead.
```cpp
fluss::Lookuper lookuper;
table.NewLookup().CreateLookuper(lookuper);
auto pk = table.NewRow();
pk.Set("user_id", 1001);
pk.Set("region", "APAC");
pk.Set("zone", static_cast<int64_t>(1));
fluss::LookupResult result;
lookuper.Lookup(pk, result);
if (result.Found()) {
    // Column index 3 is score
    std::cout << "score=" << result.GetInt64(3) << std::endl;
}
```