blob: 8fb60baa24ac48110afef20da8c8374952cbe93c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use clap::Parser;
use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
/// End-to-end example for a primary-key (KV) table: creates the table,
/// upserts three rows, looks them up, updates one row, deletes another and
/// confirms each step with a lookup.
///
/// # Errors
///
/// Returns the first error produced by any Fluss client call (`?` is used
/// throughout).
///
/// # Panics
///
/// Panics if a lookup for a key that was written (and acknowledged or
/// flushed) immediately beforehand yields no row.
#[tokio::main]
// NOTE(review): `main` is the entry point, so this allow looks unnecessary —
// confirm whether the examples build setup relies on it before removing.
#[allow(dead_code)]
pub async fn main() -> Result<()> {
    let mut config = Config::parse();
    // The example always targets a broker on the loopback address; this
    // overwrites whatever bootstrap address `Config::parse()` produced.
    config.bootstrap_servers = "127.0.0.1:9123".to_string();

    let conn = FlussConnection::new(config).await?;

    // Three columns with `id` as the primary key.
    let table_descriptor = TableDescriptor::builder()
        .schema(
            Schema::builder()
                .column("id", DataTypes::int())
                .column("name", DataTypes::string())
                .column("age", DataTypes::bigint())
                .primary_key(vec!["id"])
                .build()?,
        )
        .build()?;

    let table_path = TablePath::new("fluss", "rust_upsert_lookup_example");

    let admin = conn.get_admin().await?;
    // NOTE(review): the trailing `true` is presumably "ignore if exists",
    // which would make the example re-runnable — confirm against the admin API.
    admin
        .create_table(&table_path, &table_descriptor, true)
        .await?;
    println!(
        "Created KV Table:\n {}\n",
        admin.get_table_info(&table_path).await?
    );

    let table = conn.get_table(&table_path).await?;
    let table_upsert = table.new_upsert()?;
    let upsert_writer = table_upsert.create_writer()?;

    println!("\n=== Upserting ===");
    for (id, name, age) in [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)] {
        let mut row = GenericRow::new(3);
        row.set_field(0, id);
        row.set_field(1, name);
        row.set_field(2, age);
        // The per-row result is not awaited here; the single flush after the
        // loop is what waits for the batch.
        upsert_writer.upsert(&row)?;
        println!("Upserted: {row:?}");
    }
    upsert_writer.flush().await?;

    println!("\n=== Looking up ===");
    let mut lookuper = table.new_lookup()?.create_lookuper()?;
    for id in 1..=3 {
        let result = lookuper.lookup(&make_key(id)).await?;
        // Invariant: the row was upserted and flushed above, so it must exist.
        let row = result
            .get_single_row()?
            .unwrap_or_else(|| panic!("lookup found no row for id={id} right after upserting it"));
        println!(
            "Found id={id}: name={}, age={}",
            row.get_string(1)?,
            row.get_long(2)?
        );
    }

    println!("\n=== Updating ===");
    let mut row = GenericRow::new(3);
    row.set_field(0, 1);
    row.set_field(1, "Verso");
    row.set_field(2, 33i64);
    // Unlike the batch above, this write is awaited individually before the
    // verification lookup runs.
    upsert_writer.upsert(&row)?.await?;
    println!("Updated: {row:?}");

    let result = lookuper.lookup(&make_key(1)).await?;
    let row = result
        .get_single_row()?
        .expect("lookup found no row for id=1 right after updating it");
    println!(
        "Verified update: name={}, age={}",
        row.get_string(1)?,
        row.get_long(2)?
    );

    println!("\n=== Deleting ===");
    // For delete, only primary key field needs to be set; other fields can remain null
    let mut row = GenericRow::new(3);
    row.set_field(0, 2);
    upsert_writer.delete(&row)?.await?;
    println!("Deleted row with id=2");

    let result = lookuper.lookup(&make_key(2)).await?;
    if result.get_single_row()?.is_none() {
        println!("Verified deletion");
    } else {
        // Surface the failure instead of finishing silently as if all was well.
        eprintln!("Deletion NOT verified: row with id=2 is still present");
    }

    Ok(())
}
/// Builds the single-field row used as a lookup key: field 0 holds `id`,
/// the primary-key column of the example table.
fn make_key(id: i32) -> GenericRow<'static> {
    let mut key = GenericRow::new(1);
    key.set_field(0, id);
    key
}