blob: 9c4f7a02fb3411ec341aa75b125c721e9c2172e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include "test_utils.h"
class KvTableTest : public ::testing::Test {
protected:
fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); }
fluss::Connection& connection() {
return fluss_test::FlussTestEnvironment::Instance()->GetConnection();
}
};
TEST_F(KvTableTest, UpsertDeleteAndLookup) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp");
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("age", fluss::DataType::BigInt())
.SetPrimaryKeys({"id"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
// Create upsert writer
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
// Upsert 3 rows (fire-and-forget, then flush)
struct TestData {
int32_t id;
std::string name;
int64_t age;
};
std::vector<TestData> test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3, "Esquie", 35}};
for (const auto& d : test_data) {
fluss::GenericRow row(3);
row.SetInt32(0, d.id);
row.SetString(1, d.name);
row.SetInt64(2, d.age);
ASSERT_OK(upsert_writer.Upsert(row));
}
ASSERT_OK(upsert_writer.Flush());
// Create lookuper
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
// Verify lookup results
for (const auto& d : test_data) {
fluss::GenericRow key(3);
key.SetInt32(0, d.id);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should exist";
EXPECT_EQ(result.GetInt32(0), d.id) << "id mismatch";
EXPECT_EQ(result.GetString(1), d.name) << "name mismatch";
EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch";
}
// Update record with id=1 (await acknowledgment)
{
fluss::GenericRow updated_row(3);
updated_row.SetInt32(0, 1);
updated_row.SetString(1, "Verso");
updated_row.SetInt64(2, 33);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify the update
{
fluss::GenericRow key(3);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated";
EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain unchanged";
}
// Delete record with id=1 (await acknowledgment)
{
fluss::GenericRow delete_row(3);
delete_row.SetInt32(0, 1);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Delete(delete_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify deletion
{
fluss::GenericRow key(3);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_FALSE(result.Found()) << "Record 1 should not exist after delete";
}
// Verify other records still exist
for (int id : {2, 3}) {
fluss::GenericRow key(3);
key.SetInt32(0, id);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found()) << "Record " << id
<< " should still exist after deleting record 1";
}
// Lookup non-existent key
{
fluss::GenericRow key(3);
key.SetInt32(0, 999);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_FALSE(result.Found()) << "Non-existent key should return not found";
}
ASSERT_OK(adm.DropTable(table_path, false));
}
TEST_F(KvTableTest, CompositePrimaryKeys) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_composite_pk_cpp");
auto schema = fluss::Schema::NewBuilder()
.AddColumn("region", fluss::DataType::String())
.AddColumn("score", fluss::DataType::BigInt())
.AddColumn("user_id", fluss::DataType::Int())
.SetPrimaryKeys({"region", "user_id"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
// Insert records with composite keys
struct TestData {
std::string region;
int32_t user_id;
int64_t score;
};
std::vector<TestData> test_data = {
{"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}};
for (const auto& d : test_data) {
auto row = table.NewRow();
row.Set("region", d.region);
row.Set("score", d.score);
row.Set("user_id", d.user_id);
ASSERT_OK(upsert_writer.Upsert(row));
}
ASSERT_OK(upsert_writer.Flush());
// Create lookuper
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
// Lookup (US, 1) - should return score 100
{
auto key = table.NewRow();
key.Set("region", "US");
key.Set("user_id", 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should be 100";
}
// Lookup (EU, 2) - should return score 250
{
auto key = table.NewRow();
key.Set("region", "EU");
key.Set("user_id", 2);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should be 250";
}
// Update (US, 1) score (await acknowledgment)
{
auto update_row = table.NewRow();
update_row.Set("region", "US");
update_row.Set("user_id", 1);
update_row.Set("score", static_cast<int64_t>(500));
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(update_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify update
{
auto key = table.NewRow();
key.Set("region", "US");
key.Set("user_id", 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be updated";
}
ASSERT_OK(adm.DropTable(table_path, false));
}
TEST_F(KvTableTest, PartialUpdate) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_partial_update_cpp");
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("age", fluss::DataType::BigInt())
.AddColumn("score", fluss::DataType::BigInt())
.SetPrimaryKeys({"id"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
// Insert initial record with all columns
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
{
fluss::GenericRow row(4);
row.SetInt32(0, 1);
row.SetString(1, "Verso");
row.SetInt64(2, 32);
row.SetInt64(3, 6942);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(row, wr));
ASSERT_OK(wr.Wait());
}
// Verify initial record
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
{
fluss::GenericRow key(4);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt32(0), 1);
EXPECT_EQ(result.GetString(1), "Verso");
EXPECT_EQ(result.GetInt64(2), 32);
EXPECT_EQ(result.GetInt64(3), 6942);
}
// Create partial update writer to update only score column
auto partial_upsert = table.NewUpsert();
partial_upsert.PartialUpdateByName({"id", "score"});
fluss::UpsertWriter partial_writer;
ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
// Update only the score column (await acknowledgment)
{
fluss::GenericRow partial_row(4);
partial_row.SetInt32(0, 1);
partial_row.SetNull(1); // not in partial update
partial_row.SetNull(2); // not in partial update
partial_row.SetInt64(3, 420);
fluss::WriteResult wr;
ASSERT_OK(partial_writer.Upsert(partial_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify partial update - name and age should remain unchanged
{
fluss::GenericRow key(4);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged";
EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
}
ASSERT_OK(adm.DropTable(table_path, false));
}
TEST_F(KvTableTest, PartialUpdateByIndex) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp");
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("age", fluss::DataType::BigInt())
.AddColumn("score", fluss::DataType::BigInt())
.SetPrimaryKeys({"id"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
// Insert initial record with all columns
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
{
fluss::GenericRow row(4);
row.SetInt32(0, 1);
row.SetString(1, "Verso");
row.SetInt64(2, 32);
row.SetInt64(3, 6942);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(row, wr));
ASSERT_OK(wr.Wait());
}
// Verify initial record
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
{
fluss::GenericRow key(4);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt32(0), 1);
EXPECT_EQ(result.GetString(1), "Verso");
EXPECT_EQ(result.GetInt64(2), 32);
EXPECT_EQ(result.GetInt64(3), 6942);
}
// Create partial update writer using column indices: 0 (id) and 3 (score)
auto partial_upsert = table.NewUpsert();
partial_upsert.PartialUpdateByIndex({0, 3});
fluss::UpsertWriter partial_writer;
ASSERT_OK(partial_upsert.CreateWriter(partial_writer));
// Update only the score column (await acknowledgment)
{
fluss::GenericRow partial_row(4);
partial_row.SetInt32(0, 1);
partial_row.SetNull(1); // not in partial update
partial_row.SetNull(2); // not in partial update
partial_row.SetInt64(3, 420);
fluss::WriteResult wr;
ASSERT_OK(partial_writer.Upsert(partial_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify partial update - name and age should remain unchanged
{
fluss::GenericRow key(4);
key.SetInt32(0, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1";
EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged";
EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged";
EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420";
}
ASSERT_OK(adm.DropTable(table_path, false));
}
TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_partitioned_kv_table_cpp");
// Create a partitioned KV table with region as partition key
auto schema = fluss::Schema::NewBuilder()
.AddColumn("region", fluss::DataType::String())
.AddColumn("user_id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("score", fluss::DataType::BigInt())
.SetPrimaryKeys({"region", "user_id"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetPartitionKeys({"region"})
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
// Create partitions
fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU", "APAC"});
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
// Insert records with different partitions
struct TestData {
std::string region;
int32_t user_id;
std::string name;
int64_t score;
};
std::vector<TestData> test_data = {{"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200},
{"EU", 1, "Sciel", 150}, {"EU", 2, "Maelle", 250},
{"APAC", 1, "Noco", 300}};
for (const auto& d : test_data) {
fluss::GenericRow row(4);
row.SetString(0, d.region);
row.SetInt32(1, d.user_id);
row.SetString(2, d.name);
row.SetInt64(3, d.score);
ASSERT_OK(upsert_writer.Upsert(row));
}
ASSERT_OK(upsert_writer.Flush());
// Create lookuper
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
// Lookup records
for (const auto& d : test_data) {
fluss::GenericRow key(4);
key.SetString(0, d.region);
key.SetInt32(1, d.user_id);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(std::string(result.GetString(0)), d.region) << "region mismatch";
EXPECT_EQ(result.GetInt32(1), d.user_id) << "user_id mismatch";
EXPECT_EQ(std::string(result.GetString(2)), d.name) << "name mismatch";
EXPECT_EQ(result.GetInt64(3), d.score) << "score mismatch";
}
// Update within a partition (await acknowledgment)
{
fluss::GenericRow updated_row(4);
updated_row.SetString(0, "US");
updated_row.SetInt32(1, 1);
updated_row.SetString(2, "Gustave Updated");
updated_row.SetInt64(3, 999);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(updated_row, wr));
ASSERT_OK(wr.Wait());
}
// Verify the update
{
fluss::GenericRow key(4);
key.SetString(0, "US");
key.SetInt32(1, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(std::string(result.GetString(2)), "Gustave Updated");
EXPECT_EQ(result.GetInt64(3), 999);
}
// Lookup in non-existent partition should return not found
{
fluss::GenericRow key(4);
key.SetString(0, "UNKNOWN_REGION");
key.SetInt32(1, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_FALSE(result.Found()) << "Lookup in non-existent partition should return not found";
}
// Delete a record within a partition (await acknowledgment)
{
fluss::GenericRow delete_key(4);
delete_key.SetString(0, "EU");
delete_key.SetInt32(1, 1);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Delete(delete_key, wr));
ASSERT_OK(wr.Wait());
}
// Verify deletion
{
fluss::GenericRow key(4);
key.SetString(0, "EU");
key.SetInt32(1, 1);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_FALSE(result.Found()) << "Deleted record should not exist";
}
// Verify other records in same partition still exist
{
fluss::GenericRow key(4);
key.SetString(0, "EU");
key.SetInt32(1, 2);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(std::string(result.GetString(2)), "Maelle");
}
ASSERT_OK(adm.DropTable(table_path, false));
}
TEST_F(KvTableTest, AllSupportedDatatypes) {
auto& adm = admin();
auto& conn = connection();
fluss::TablePath table_path("fluss", "test_all_datatypes_cpp");
// Create a table with all supported datatypes
auto schema = fluss::Schema::NewBuilder()
.AddColumn("pk_int", fluss::DataType::Int())
.AddColumn("col_boolean", fluss::DataType::Boolean())
.AddColumn("col_tinyint", fluss::DataType::TinyInt())
.AddColumn("col_smallint", fluss::DataType::SmallInt())
.AddColumn("col_int", fluss::DataType::Int())
.AddColumn("col_bigint", fluss::DataType::BigInt())
.AddColumn("col_float", fluss::DataType::Float())
.AddColumn("col_double", fluss::DataType::Double())
.AddColumn("col_char", fluss::DataType::Char(10))
.AddColumn("col_string", fluss::DataType::String())
.AddColumn("col_decimal", fluss::DataType::Decimal(10, 2))
.AddColumn("col_date", fluss::DataType::Date())
.AddColumn("col_time", fluss::DataType::Time())
.AddColumn("col_timestamp", fluss::DataType::Timestamp())
.AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz())
.AddColumn("col_bytes", fluss::DataType::Bytes())
.AddColumn("col_binary", fluss::DataType::Binary(20))
.SetPrimaryKeys({"pk_int"})
.Build();
auto table_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetProperty("table.replication.factor", "1")
.Build();
fluss_test::CreateTable(adm, table_path, table_descriptor);
fluss::Table table;
ASSERT_OK(conn.GetTable(table_path, table));
auto table_upsert = table.NewUpsert();
fluss::UpsertWriter upsert_writer;
ASSERT_OK(table_upsert.CreateWriter(upsert_writer));
// Test data
int32_t pk_int = 1;
bool col_boolean = true;
int32_t col_tinyint = 127;
int32_t col_smallint = 32767;
int32_t col_int = 2147483647;
int64_t col_bigint = 9223372036854775807LL;
float col_float = 3.14f;
double col_double = 2.718281828459045;
std::string col_char = "hello";
std::string col_string = "world of fluss rust client";
std::string col_decimal = "123.45";
auto col_date = fluss::Date::FromDays(20476); // 2026-01-23
auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47
auto col_timestamp = fluss::Timestamp::FromMillis(1769163227123); // 2026-01-23 10:13:47.123
auto col_timestamp_ltz = fluss::Timestamp::FromMillis(1769163227123);
std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'};
std::vector<uint8_t> col_binary = {'f', 'i', 'x', 'e', 'd', ' ', 'b', 'i', 'n', 'a',
'r', 'y', ' ', 'd', 'a', 't', 'a', '!', '!', '!'};
// Upsert a row with all datatypes
{
fluss::GenericRow row(17);
row.SetInt32(0, pk_int);
row.SetBool(1, col_boolean);
row.SetInt32(2, col_tinyint);
row.SetInt32(3, col_smallint);
row.SetInt32(4, col_int);
row.SetInt64(5, col_bigint);
row.SetFloat32(6, col_float);
row.SetFloat64(7, col_double);
row.SetString(8, col_char);
row.SetString(9, col_string);
row.SetDecimal(10, col_decimal);
row.SetDate(11, col_date);
row.SetTime(12, col_time);
row.SetTimestampNtz(13, col_timestamp);
row.SetTimestampLtz(14, col_timestamp_ltz);
row.SetBytes(15, col_bytes);
row.SetBytes(16, col_binary);
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(row, wr));
ASSERT_OK(wr.Wait());
}
// Lookup the record
fluss::Lookuper lookuper;
ASSERT_OK(table.NewLookup().CreateLookuper(lookuper));
{
fluss::GenericRow key(17);
key.SetInt32(0, pk_int);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
// Verify all datatypes
EXPECT_EQ(result.GetInt32(0), pk_int) << "pk_int mismatch";
EXPECT_EQ(result.GetBool(1), col_boolean) << "col_boolean mismatch";
EXPECT_EQ(result.GetInt32(2), col_tinyint) << "col_tinyint mismatch";
EXPECT_EQ(result.GetInt32(3), col_smallint) << "col_smallint mismatch";
EXPECT_EQ(result.GetInt32(4), col_int) << "col_int mismatch";
EXPECT_EQ(result.GetInt64(5), col_bigint) << "col_bigint mismatch";
EXPECT_NEAR(result.GetFloat32(6), col_float, 1e-6f) << "col_float mismatch";
EXPECT_NEAR(result.GetFloat64(7), col_double, 1e-15) << "col_double mismatch";
EXPECT_EQ(result.GetString(8), col_char) << "col_char mismatch";
EXPECT_EQ(result.GetString(9), col_string) << "col_string mismatch";
EXPECT_EQ(result.GetDecimalString(10), col_decimal) << "col_decimal mismatch";
EXPECT_EQ(result.GetDate(11).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch";
EXPECT_EQ(result.GetTime(12).millis_since_midnight, col_time.millis_since_midnight) << "col_time mismatch";
EXPECT_EQ(result.GetTimestamp(13).epoch_millis, col_timestamp.epoch_millis)
<< "col_timestamp mismatch";
EXPECT_EQ(result.GetTimestamp(14).epoch_millis, col_timestamp_ltz.epoch_millis)
<< "col_timestamp_ltz mismatch";
auto [bytes_ptr, bytes_len] = result.GetBytes(15);
EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch";
EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0)
<< "col_bytes mismatch";
auto [binary_ptr, binary_len] = result.GetBytes(16);
EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch";
EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0)
<< "col_binary mismatch";
}
// Test with null values for nullable columns
{
fluss::GenericRow row_with_nulls(17);
row_with_nulls.SetInt32(0, 2); // pk_int = 2
for (size_t i = 1; i < 17; ++i) {
row_with_nulls.SetNull(i);
}
fluss::WriteResult wr;
ASSERT_OK(upsert_writer.Upsert(row_with_nulls, wr));
ASSERT_OK(wr.Wait());
}
// Lookup row with nulls
{
fluss::GenericRow key(17);
key.SetInt32(0, 2);
fluss::LookupResult result;
ASSERT_OK(lookuper.Lookup(key, result));
ASSERT_TRUE(result.Found());
EXPECT_EQ(result.GetInt32(0), 2) << "pk_int mismatch";
for (size_t i = 1; i < 17; ++i) {
EXPECT_TRUE(result.IsNull(i)) << "column " << i << " should be null";
}
}
ASSERT_OK(adm.DropTable(table_path, false));
}