sidebar_position: 5

Primary Key Tables

Primary key tables (KV tables) support upsert, delete, and lookup operations.

Creating a Primary Key Table

auto schema = fluss::Schema::NewBuilder()
    .AddColumn("id", fluss::DataType::Int())
    .AddColumn("name", fluss::DataType::String())
    .AddColumn("age", fluss::DataType::BigInt())
    .SetPrimaryKeys({"id"})
    .Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
    .SetSchema(schema)
    .SetBucketCount(3)
    .Build();

fluss::TablePath table_path("fluss", "users");
admin.CreateTable(table_path, descriptor, true);

Upserting Records

fluss::Table table;
conn.GetTable(table_path, table);

fluss::UpsertWriter upsert_writer;
table.NewUpsert().CreateWriter(upsert_writer);

// Fire-and-forget upserts
{
    auto row = table.NewRow();
    row.Set("id", 1);
    row.Set("name", "Alice");
    row.Set("age", static_cast<int64_t>(25));
    upsert_writer.Upsert(row);
}
{
    auto row = table.NewRow();
    row.Set("id", 2);
    row.Set("name", "Bob");
    row.Set("age", static_cast<int64_t>(30));
    upsert_writer.Upsert(row);
}
upsert_writer.Flush();

// Per-record acknowledgment
{
    auto row = table.NewRow();
    row.Set("id", 3);
    row.Set("name", "Charlie");
    row.Set("age", static_cast<int64_t>(35));
    fluss::WriteResult wr;
    upsert_writer.Upsert(row, wr);
    wr.Wait();
}

Updating Records

Upsert with the same primary key to update an existing record.

auto row = table.NewRow();
row.Set("id", 1);
row.Set("name", "Alice Updated");
row.Set("age", static_cast<int64_t>(26));
fluss::WriteResult wr;
upsert_writer.Upsert(row, wr);
wr.Wait();

Deleting Records

auto pk_row = table.NewRow();
pk_row.Set("id", 2);
fluss::WriteResult wr;
upsert_writer.Delete(pk_row, wr);
wr.Wait();

Partial Updates

Update only specific columns while preserving others.

// By column names
fluss::UpsertWriter partial_writer;
table.NewUpsert()
    .PartialUpdateByName({"id", "age"})
    .CreateWriter(partial_writer);

auto row = table.NewRow();
row.Set("id", 1);
row.Set("age", static_cast<int64_t>(27));
fluss::WriteResult wr;
partial_writer.Upsert(row, wr);
wr.Wait();

// By column indices
fluss::UpsertWriter partial_writer_idx;
table.NewUpsert()
    .PartialUpdateByIndex({0, 2})
    .CreateWriter(partial_writer_idx);

Looking Up Records

fluss::Lookuper lookuper;
table.NewLookup().CreateLookuper(lookuper);

auto pk_row = table.NewRow();
pk_row.Set("id", 1);

fluss::LookupResult result;
lookuper.Lookup(pk_row, result);

if (result.Found()) {
    std::cout << "Found: name=" << result.GetString(1)
              << ", age=" << result.GetInt64(2) << std::endl;
} else {
    std::cout << "Not found" << std::endl;
}