| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta-store/mem_txn_kv.h" |
| |
| #include <gtest/gtest-death-test.h> |
| #include <gtest/gtest.h> |
| |
| #include <chrono> |
| #include <memory> |
| #include <thread> |
| |
| #include "common/config.h" |
| #include "common/util.h" |
| #include "meta-service/doris_txn.h" |
| #include "meta-store/codec.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| |
| using namespace doris; |
| |
| std::shared_ptr<cloud::TxnKv> fdb_txn_kv; |
| |
| int main(int argc, char** argv) { |
| cloud::config::init(nullptr, true); |
| cloud::config::fdb_cluster_file_path = "fdb.cluster"; |
| fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>()); |
| if (!fdb_txn_kv.get()) { |
| std::cout << "exit get FdbTxnKv error" << std::endl; |
| return -1; |
| } |
| if (fdb_txn_kv->init() != 0) { |
| std::cout << "exit inti FdbTxnKv error" << std::endl; |
| return -1; |
| } |
| |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |
| |
| static void put_and_get_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::unique_ptr<Transaction> txn; |
| std::string key = "testkey1"; |
| std::string val = "testvalue1"; |
| { |
| // put |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(key, val); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| int64_t ver1; |
| ASSERT_EQ(txn->get_committed_version(&ver1), TxnErrorCode::TXN_OK); |
| |
| // get |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get(key, &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| int64_t ver2 = 0; |
| ASSERT_EQ(txn->get_read_version(&ver2), TxnErrorCode::TXN_OK); |
| ASSERT_GE(ver2, ver1) << txn_kv_class; |
| ASSERT_EQ(val, get_val) << txn_kv_class; |
| std::cout << "val:" << get_val << std::endl; |
| |
| // get not exist key |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get("NotExistKey", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, PutAndGetTest) { |
| using namespace doris::cloud; |
| |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| put_and_get_test(mem_txn_kv); |
| put_and_get_test(fdb_txn_kv); |
| } |
| |
| static void range_get_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::unique_ptr<Transaction> txn; |
| std::vector<std::pair<std::string, std::string>> put_kv = { |
| std::make_pair("key1", "val1"), std::make_pair("key2", "val2"), |
| std::make_pair("key3", "val3"), std::make_pair("key4", "val4"), |
| std::make_pair("key5", "val5"), |
| }; |
| |
| // put some kvs before test |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (const auto& [key, val] : put_kv) { |
| txn->put(key, val); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // normal range get |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key1", "key4", &iter), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(iter->size(), 3) << txn_kv_class; |
| ASSERT_EQ(iter->more(), false) << txn_kv_class; |
| int i = 0; |
| while (iter->has_next()) { |
| auto [key, val] = iter->next(); |
| ASSERT_EQ(key, put_kv[i].first) << txn_kv_class; |
| ASSERT_EQ(val, put_kv[i].second) << txn_kv_class; |
| ++i; |
| std::cout << "key:" << key << " val:" << val << std::endl; |
| } |
| } |
| |
| // range get with not exist end key |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key2", "key6", &iter), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 4) << txn_kv_class; |
| ASSERT_EQ(iter->more(), false) << txn_kv_class; |
| int i = 1; |
| while (iter->has_next()) { |
| auto [key, val] = iter->next(); |
| ASSERT_EQ(key, put_kv[i].first) << txn_kv_class; |
| ASSERT_EQ(val, put_kv[i].second) << txn_kv_class; |
| ++i; |
| std::cout << "key:" << key << " val:" << val << std::endl; |
| } |
| } |
| |
| // range get with limit |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key1", "key4", &iter, false, 1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 1) << txn_kv_class; |
| ASSERT_EQ(iter->more(), true) << txn_kv_class; |
| |
| auto [key, val] = iter->next(); |
| ASSERT_EQ(key, put_kv[0].first) << txn_kv_class; |
| ASSERT_EQ(val, put_kv[0].second) << txn_kv_class; |
| } |
| |
| // range get with begin key larger than end key |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key4", "key1", &iter), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 0) << txn_kv_class; |
| ASSERT_EQ(txn->get("key1", "key1", &iter), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 0) << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, RangeGetTest) { |
| using namespace doris::cloud; |
| |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| range_get_test(mem_txn_kv); |
| range_get_test(fdb_txn_kv); |
| } |
| |
| static void remove_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::unique_ptr<Transaction> txn; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::vector<std::pair<std::string, std::string>> put_kv = { |
| std::make_pair("key1", "val1"), std::make_pair("key2", "val2"), |
| std::make_pair("key3", "val3"), std::make_pair("key4", "val4"), |
| std::make_pair("key5", "val5"), |
| }; |
| |
| // put some kvs before test |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (const auto& [key, val] : put_kv) { |
| txn->put(key, val); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // remove single key |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("key1"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string get_val; |
| ASSERT_EQ(txn->get("key1", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND) << txn_kv_class; |
| } |
| |
| // range remove with begin key larger than end key |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("key5", "key1"); |
| ASSERT_NE(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key2", "key6", &iter), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 4) << txn_kv_class; |
| } |
| |
| // range remove |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| txn->remove("key2", "key6"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn->get("key2", "key6", &iter), TxnErrorCode::TXN_OK); |
| while (iter->has_next()) { |
| auto [key, value] = iter->next(); |
| std::cout << "key: " << key << ", value: " << value << std::endl; |
| } |
| ASSERT_EQ(iter->size(), 0) << txn_kv_class; |
| } |
| } |
| TEST(TxnMemKvTest, RemoveTest) { |
| using namespace doris::cloud; |
| |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| remove_test(mem_txn_kv); |
| remove_test(fdb_txn_kv); |
| } |
| |
| static void atomic_set_ver_value_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::unique_ptr<Transaction> txn; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| // txn_kv_test.cpp |
| { |
| std::string key; |
| std::string val; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| key.push_back('\xfe'); |
| key.append(" unit_test_prefix "); |
| key.append(" GetVersionTest "); |
| txn->atomic_set_ver_value(key, ""); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| int64_t ver0 = 0; |
| ASSERT_EQ(txn->get_committed_version(&ver0), TxnErrorCode::TXN_OK); |
| ASSERT_GT(ver0, 0) << txn_kv_class; |
| |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| err = txn->get(key, &val); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) << txn_kv_class; |
| int64_t ver1 = 0; |
| ASSERT_EQ(txn->get_read_version(&ver1), TxnErrorCode::TXN_OK); |
| ASSERT_GE(ver1, ver0) << txn_kv_class; |
| |
| int64_t ver2; |
| int64_t txn_id; |
| int ret = get_txn_id_from_fdb_ts(val, &txn_id); |
| ASSERT_EQ(ret, 0) << txn_kv_class; |
| ver2 = txn_id >> 10; |
| std::cout << "ver0=" << ver0 << " ver1=" << ver1 << " ver2=" << ver2 << std::endl; |
| |
| // ASSERT_EQ(ver0, ver2); |
| } |
| } |
| |
| TEST(TxnMemKvTest, AtomicSetVerValueTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| atomic_set_ver_value_test(mem_txn_kv); |
| atomic_set_ver_value_test(fdb_txn_kv); |
| } |
| |
| static void atomic_set_ver_key_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string key_prefix = "key_1"; |
| |
| std::string versionstamp_1; |
| { |
| // write key_1 |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_set_ver_key(key_prefix, "1"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // read key_1 |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string end_key = key_prefix + "\xFF"; |
| std::unique_ptr<RangeGetIterator> it; |
| ASSERT_EQ(txn->get(key_prefix, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(it->has_next()); |
| auto&& [key_1, _1] = it->next(); |
| ASSERT_EQ(key_1.length(), key_prefix.size() + 10); // versionstamp = 10bytes |
| key_1.remove_prefix(key_prefix.size()); |
| versionstamp_1 = key_1; |
| } |
| |
| std::string versionstamp_2; |
| { |
| // write key_2 |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| key_prefix = "key_2"; |
| txn->atomic_set_ver_key(key_prefix, "2"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // read key_2 |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string end_key = key_prefix + "\xFF"; |
| std::unique_ptr<RangeGetIterator> it; |
| ASSERT_EQ(txn->get(key_prefix, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(it->has_next()); |
| auto&& [key_2, _2] = it->next(); |
| ASSERT_EQ(key_2.length(), key_prefix.size() + 10); // versionstamp = 10bytes |
| key_2.remove_prefix(key_prefix.size()); |
| versionstamp_2 = key_2; |
| } |
| |
| ASSERT_LT(versionstamp_1, versionstamp_2); |
| |
| std::string versionstamp_3; |
| { |
| // write key_3, with offset |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string suffix = "_suffix"; |
| std::string prefix = "key_3_"; |
| std::string k(prefix); |
| uint32_t offset = k.size(); |
| k.append(10, '\0'); // reserve 10 bytes for versionstamp |
| k += suffix; |
| ASSERT_TRUE(txn->atomic_set_ver_key(k, offset, "3")); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| // read key_3 |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string end_key = prefix + "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; |
| std::unique_ptr<RangeGetIterator> it; |
| ASSERT_EQ(txn->get(prefix, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(it->has_next()); |
| auto&& [key_3, _3] = it->next(); |
| ASSERT_EQ(key_3.length(), k.size()); |
| key_3.remove_suffix(suffix.size()); |
| key_3.remove_prefix(prefix.size()); |
| versionstamp_3 = key_3; |
| } |
| |
| ASSERT_LT(versionstamp_2, versionstamp_3); |
| |
| { |
| // write key, but offset is invalid |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string prefix = "key_4_"; |
| std::string k(prefix); |
| k.append(10, '\0'); // reserve 10 bytes for versionstamp |
| uint32_t offset = k.size() + 1; // invalid offset |
| ASSERT_FALSE(txn->atomic_set_ver_key(k, offset, "4")); |
| |
| k = "fake"; |
| ASSERT_FALSE(txn->atomic_set_ver_key(k, 0, "4")); |
| } |
| } |
| |
| TEST(TxnMemKvTest, AtomicSetVerKeyTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| atomic_set_ver_key_test(mem_txn_kv); |
| atomic_set_ver_key_test(fdb_txn_kv); |
| } |
| |
| static void atomic_add_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::unique_ptr<Transaction> txn; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| // clear counter |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("counter"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| // add to uninitialized kv |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add("counter", 123); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string val; |
| ASSERT_EQ(txn->get("counter", &val), TxnErrorCode::TXN_OK); |
| int64_t val_int = *reinterpret_cast<const int64_t*>(val.data()); |
| ASSERT_EQ(val_int, 123) << txn_kv_class; |
| |
| txn->put("counter", "1"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // add |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add("counter", 10); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get("counter", &val), TxnErrorCode::TXN_OK); |
| val_int = *reinterpret_cast<const int64_t*>(val.data()); |
| std::cout << "atomic add: " << val_int << std::endl; |
| ASSERT_EQ(val_int, 59) << txn_kv_class; // "1" + 10 = ASCII("1") + 10 = 49 + 10 = 59 |
| |
| // sub |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add("counter", -5); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get("counter", &val), TxnErrorCode::TXN_OK); |
| val_int = *reinterpret_cast<const int64_t*>(val.data()); |
| std::cout << "atomic sub: " << val_int << std::endl; |
| ASSERT_EQ(val_int, 54) << txn_kv_class; |
| } |
| |
| TEST(TxnMemKvTest, AtomicAddTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| atomic_add_test(mem_txn_kv); |
| atomic_add_test(fdb_txn_kv); |
| } |
| |
| // modify identical key in one transcation |
| static void modify_identical_key_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::unique_ptr<Transaction> txn; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| // put after remove |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("test", "1"); |
| txn->remove("test"); |
| txn->put("test", "2"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get("test", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "2") << txn_kv_class; |
| } |
| |
| // remove after put |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("test", "1"); |
| txn->remove("test"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get("test", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND) << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, ModifyIdenticalKeyTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| modify_identical_key_test(mem_txn_kv); |
| modify_identical_key_test(fdb_txn_kv); |
| } |
| |
| static void modify_snapshot_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::unique_ptr<Transaction> txn_1; |
| std::unique_ptr<Transaction> txn_2; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| { |
| std::string get_val; |
| // txn_1: put <test, version1> and commit |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("test", "version1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn_2: get the snapshot of database, will see <test, version1> |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("test", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "version1") << txn_kv_class; |
| |
| // txn_1: modify <test, version1> to <test, version2> and commit |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("test", "version2"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn_2: should still see the <test, version1> |
| ASSERT_EQ(txn_2->get("test", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "version1") << txn_kv_class; |
| |
| // txn_2: modify <test, version1> to <test, version3> but not commit, |
| // txn_2 should get <test, version3> |
| txn_2->put("test", "version3"); |
| ASSERT_EQ(txn_2->get("test", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "version3") << txn_kv_class; |
| |
| // txn_2: remove <test, version3> bu not commit, |
| // txn_2 should not get <test, version3> |
| txn_2->remove("test"); |
| ASSERT_EQ(txn_2->get("test", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND) << txn_kv_class; |
| |
| // txn_1: will still see <test, version2> |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_1->get("test", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "version2") << txn_kv_class; |
| |
| // txn_2: commit all changes, should conflict |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| |
| // txn_1: should not get <test, version2> |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_1->get("test", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "version2") << txn_kv_class; |
| } |
| |
| { |
| std::string get_val; |
| |
| // txn_1: put <test, version1> and commit |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("test", "version1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn_2: read the key set by atomic_set_xxx before commit |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| txn_2->atomic_set_ver_value("test", ""); |
| TxnErrorCode err = txn_2->get("test", &get_val); |
| // can not read the unreadable key |
| ASSERT_TRUE(err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << txn_kv_class; |
| // after read the unreadable key, can not commit |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn_1: still see the <test version1> |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_1->get("test", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "version1") << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, ModifySnapshotTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| modify_snapshot_test(mem_txn_kv); |
| modify_snapshot_test(fdb_txn_kv); |
| } |
| |
| static void check_conflicts_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::unique_ptr<Transaction> txn_1; |
| std::unique_ptr<Transaction> txn_2; |
| |
| // txn1 change "key" after txn2 get "key", txn2 should conflict when change "key". |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "txn1_1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_2"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key", "txn2_1"); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "txn2_1"); |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 add "key" after txn2 get "key", txn2 should conflict when add "key2". |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->remove("key"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND) << txn_kv_class; |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key2", "txn2_1"); |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 change "key" after txn2 get "key", |
| // txn2 can read "key2" before commit, but commit conflict. |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "txn1_1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_2"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key2", "txn2_2"); |
| ASSERT_EQ(txn_2->get("key2", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "txn2_2") << txn_kv_class; |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 change "key" after txn2 get "key", txn2 should conflict when atomic_set "key". |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "txn1_1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "txn1_2"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->atomic_set_ver_value("key", "txn2_2"); |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 change "key1" after txn2 range get "key1~key5", txn2 should conflict when change "key2" |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key1", "v1"); |
| txn_1->put("key2", "v2"); |
| txn_1->put("key3", "v3"); |
| txn_1->put("key4", "v4"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn_2->get("key1", "key5", &iter), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 4) << txn_kv_class; |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key1", "v11"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key2", "v22"); |
| ASSERT_EQ(txn_2->get("key2", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "v22") << txn_kv_class; |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 change "key3" after txn2 limit range get "key1~key5", txn2 do not conflict when change "key4" |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key1", "v1"); |
| txn_1->put("key2", "v2"); |
| txn_1->put("key3", "v3"); |
| txn_1->put("key4", "v4"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> iter; |
| ASSERT_EQ(txn_2->get("key1", "key5", &iter, false, 1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(iter->size(), 1) << txn_kv_class; |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key3", "v33"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key4", "v44"); |
| ASSERT_EQ(txn_2->get("key4", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "v44") << txn_kv_class; |
| |
| // not conflicts |
| ASSERT_EQ(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 remove "key1" after txn2 get "key1", txn2 should conflict when change "key5". |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key1", "v1"); |
| txn_1->put("key2", "v2"); |
| txn_1->put("key3", "v3"); |
| txn_1->put("key4", "v4"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key1", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "v1") << txn_kv_class; |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->remove("key1"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_2->get("key1", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "v1") << txn_kv_class; |
| |
| txn_2->put("key5", "v5"); |
| ASSERT_EQ(txn_2->get("key5", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "v5") << txn_kv_class; |
| |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn1 range remove "key1~key4" after txn2 get "key1", txn2 should conflict when change "key1". |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key1", "v1"); |
| txn_1->put("key2", "v2"); |
| txn_1->put("key3", "v3"); |
| txn_1->put("key4", "v4"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key1", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "v1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->remove("key1", "key4"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_2->get("key1", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "v1"); |
| txn_2->put("key1", "v11"); |
| // conflicts |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, CheckConflictsTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| check_conflicts_test(mem_txn_kv); |
| check_conflicts_test(fdb_txn_kv); |
| } |
| |
| // ConflictTest of txn_kv_test.cpp |
| TEST(TxnMemKvTest, ConflictTest) { |
| using namespace doris::cloud; |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| std::unique_ptr<Transaction> txn, txn1, txn2; |
| std::string key = "unit_test"; |
| std::string val, val1, val2; |
| |
| // Historical data |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("unit_test", "xxxxxxxxxxxxx"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn1 begin |
| ASSERT_EQ(txn_kv->create_txn(&txn1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn1->get(key, &val), TxnErrorCode::TXN_OK); |
| std::cout << "val1=" << val1 << std::endl; |
| |
| // txn2 begin |
| ASSERT_EQ(txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn2->get(key, &val), TxnErrorCode::TXN_OK); |
| std::cout << "val2=" << val2 << std::endl; |
| |
| // txn2 commit |
| val2 = "zzzzzzzzzzzzzzz"; |
| txn2->put(key, val2); |
| ASSERT_EQ(txn2->commit(), TxnErrorCode::TXN_OK); |
| |
| // txn1 commit, intend to fail |
| val1 = "yyyyyyyyyyyyyyy"; |
| txn1->put(key, val1); |
| ASSERT_NE(txn1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(val, val2); // First wins |
| std::cout << "final val=" << val << std::endl; |
| } |
| |
| static void txn_behavior_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::unique_ptr<Transaction> txn_1; |
| std::unique_ptr<Transaction> txn_2; |
| // av: atomic_set_ver_value |
| // ak: atomic_set_ver_key |
| // ad: atomic_add |
| // c : commit |
| |
| // txn_1: --- put<key, v1> -- av<key, v1> --------------- c |
| // txn_2: ------------------------------- ad<key, v1> ----- put<key, v2> --- c |
| // result: <key v2> |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "v1"); |
| txn_1->atomic_set_ver_value("key", "v1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| txn_2->atomic_add("key", 1); |
| |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| txn_2->put("key", "v2"); |
| ASSERT_EQ(txn_2->commit(), TxnErrorCode::TXN_OK); |
| |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "v2"); |
| } |
| |
| // txn_1: --- ad<"key",1> --- av<"key", "v1"> ------ c |
| // txn_2: ------------------- av<"key", "v2"> ---- c |
| // result: <"key", "version"+"v1"> |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->atomic_add("key", 1); |
| txn_1->atomic_set_ver_value("key", "v1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| txn_2->atomic_set_ver_value("key", "v2"); |
| |
| ASSERT_EQ(txn_2->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("key", &get_val), TxnErrorCode::TXN_OK); |
| std::cout << get_val << std::endl; |
| } |
| |
| // txn_1: --- put<"key", "1"> --- get<"key"> --- c |
| // result: can get "1" and commit success |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->put("key", "1"); |
| ASSERT_EQ(txn_1->get("key", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(get_val, "1") << txn_kv_class; |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // txn_1: --- ad<"key",1> --- get<"key"> --- c |
| // result: commit success |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->atomic_add("key", 1); |
| ASSERT_EQ(txn_1->get("key", &get_val), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn_1: --- av<"key", "1"> --- get<"key"> --- c |
| // result: can not read the unreadable key and commit error |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->atomic_set_ver_value("key", "1"); |
| TxnErrorCode err = txn_1->get("key", &get_val); |
| // can not read the unreadable key |
| ASSERT_TRUE(err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << txn_kv_class; |
| ASSERT_NE(txn_1->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| // txn_1: --- get<"keyNotExit"> --- put<"keyNotExit", "1"> --- get<"keyNotExit"> --- c |
| // txn_2: --- get<"keyNotExit"> --- put<"keyNotExit", "1"> --- get<"keyNotExit"> ------ c |
| // result: txn_2 commit conflict |
| { |
| std::string get_val; |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| txn_1->remove("keyNotExit"); |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_1), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_1->get("keyNotExit", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| txn_1->put("keyNotExit", "1"); |
| ASSERT_EQ(txn_1->get("keyNotExit", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "1"); |
| |
| ASSERT_EQ(txn_kv->create_txn(&txn_2), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn_2->get("keyNotExit", &get_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| txn_2->put("keyNotExit", "1"); |
| ASSERT_EQ(txn_2->get("keyNotExit", &get_val), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(get_val, "1"); |
| |
| ASSERT_EQ(txn_1->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_NE(txn_2->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| } |
| |
| TEST(TxnMemKvTest, TxnBehaviorTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| txn_behavior_test(mem_txn_kv); |
| txn_behavior_test(fdb_txn_kv); |
| } |
| |
| TEST(TxnMemKvTest, MaybeUnusedFunctionTest) { |
| using namespace doris::cloud; |
| auto mem_txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(mem_txn_kv->init(), 0); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("key", "v1"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(mem_txn_kv->get_last_commited_version(), 1); |
| ASSERT_EQ(mem_txn_kv->get_last_read_version(), 1); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| auto t = dynamic_cast<memkv::Transaction*>(txn.get()); |
| ASSERT_EQ(t->init(), 0); |
| ASSERT_EQ(t->abort(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| auto new_mem_txn_kv = std::make_shared<MemTxnKv>(); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(new_mem_txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_set_ver_key("", "v2"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| std::unique_ptr<Transaction> txn2; |
| ASSERT_EQ(new_mem_txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK); |
| for (auto& t : new_mem_txn_kv->mem_kv_) { |
| int64_t txn_id; |
| ASSERT_EQ(get_txn_id_from_fdb_ts(t.first, &txn_id), 0); |
| auto ver = txn_id >> 10; |
| std::cout << "version: " << ver << std::endl; |
| ASSERT_EQ(ver, 1); |
| } |
| } |
| } |
| |
| TEST(TxnMemKvTest, RangeGetKeySelector) { |
| using namespace doris::cloud; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| constexpr std::string_view prefix = "range_get_key_selector_"; |
| |
| { |
| // Remove the existing keys and insert some new keys. |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| std::string last_key = fmt::format("{}{}", prefix, 9); |
| encode_int64(INT64_MAX, &last_key); |
| txn->remove(prefix, last_key); |
| for (int i = 0; i < 5; ++i) { |
| std::string key = fmt::format("{}{}", prefix, i); |
| txn->put(key, std::to_string(i)); |
| } |
| err = txn->commit(); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| struct TestCase { |
| RangeKeySelector begin_key_selector, end_key_selector; |
| std::vector<std::string> expected_keys; |
| }; |
| |
| std::string range_begin = fmt::format("{}{}", prefix, 1); |
| std::string range_end = fmt::format("{}{}", prefix, 3); |
| std::vector<TestCase> test_case { |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 1), fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| { |
| fmt::format("{}{}", prefix, 1), |
| fmt::format("{}{}", prefix, 2), |
| fmt::format("{}{}", prefix, 3), |
| }, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 1), fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::LAST_LESS_THAN, |
| {fmt::format("{}{}", prefix, 1)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| {fmt::format("{}{}", prefix, 2), fmt::format("{}{}", prefix, 3)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::LAST_LESS_THAN, |
| {}, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 1), fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| { |
| fmt::format("{}{}", prefix, 1), |
| fmt::format("{}{}", prefix, 2), |
| fmt::format("{}{}", prefix, 3), |
| }, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 1), fmt::format("{}{}", prefix, 2)}, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| RangeKeySelector::LAST_LESS_THAN, |
| {fmt::format("{}{}", prefix, 1)}, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_THAN, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| { |
| fmt::format("{}{}", prefix, 0), |
| fmt::format("{}{}", prefix, 1), |
| fmt::format("{}{}", prefix, 2), |
| }, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_THAN, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| { |
| fmt::format("{}{}", prefix, 0), |
| fmt::format("{}{}", prefix, 1), |
| fmt::format("{}{}", prefix, 2), |
| fmt::format("{}{}", prefix, 3), |
| }, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_THAN, |
| RangeKeySelector::LAST_LESS_OR_EQUAL, |
| { |
| fmt::format("{}{}", prefix, 0), |
| fmt::format("{}{}", prefix, 1), |
| fmt::format("{}{}", prefix, 2), |
| }, |
| }, |
| { |
| RangeKeySelector::LAST_LESS_THAN, |
| RangeKeySelector::LAST_LESS_THAN, |
| {fmt::format("{}{}", prefix, 0), fmt::format("{}{}", prefix, 1)}, |
| }, |
| }; |
| |
| // Scan range with different key selectors |
| for (const auto& tc : test_case) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RangeGetOptions options; |
| options.batch_limit = 1000; |
| options.begin_key_selector = tc.begin_key_selector; |
| options.end_key_selector = tc.end_key_selector; |
| std::unique_ptr<RangeGetIterator> it; |
| err = txn->get(range_begin, range_end, &it, options); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| std::vector<std::string> actual_keys; |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| actual_keys.emplace_back(k); |
| } |
| EXPECT_EQ(actual_keys, tc.expected_keys) |
| << "Failed for begin_key_selector=" << static_cast<int>(tc.begin_key_selector) |
| << ", end_key_selector=" << static_cast<int>(tc.end_key_selector); |
| } |
| } |
| |
| TEST(TxnMemKvTest, ReverseRangeGet) { |
| using namespace doris::cloud; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| constexpr std::string_view prefix = "reverse_range_get_"; |
| |
| { |
| // Remove the existing keys and insert some new keys. |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| std::string last_key = fmt::format("{}{}", prefix, 9); |
| encode_int64(INT64_MAX, &last_key); |
| txn->remove(prefix, last_key); |
| for (int i = 0; i < 5; ++i) { |
| std::string key = fmt::format("{}{}", prefix, i); |
| txn->put(key, std::to_string(i)); |
| } |
| err = txn->commit(); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| std::string range_begin = fmt::format("{}{}", prefix, 1); |
| std::string range_end = fmt::format("{}{}", prefix, 3); |
| |
| struct TestCase { |
| RangeKeySelector begin_key_selector, end_key_selector; |
| std::vector<std::string> expected_keys; |
| }; |
| |
| std::vector<TestCase> test_case { |
| // 1. [begin, end) |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 2), fmt::format("{}{}", prefix, 1)}, |
| }, |
| // 2. [begin, end] |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| { |
| fmt::format("{}{}", prefix, 3), |
| fmt::format("{}{}", prefix, 2), |
| fmt::format("{}{}", prefix, 1), |
| }, |
| }, |
| // 3. (begin, end) |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| {fmt::format("{}{}", prefix, 2)}, |
| }, |
| // 4. (begin, end] |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| {fmt::format("{}{}", prefix, 3), fmt::format("{}{}", prefix, 2)}, |
| }, |
| }; |
| for (const auto& tc : test_case) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RangeGetOptions options; |
| options.batch_limit = 1000; |
| options.begin_key_selector = tc.begin_key_selector; |
| options.end_key_selector = tc.end_key_selector; |
| options.reverse = true; // Reserve range get |
| std::unique_ptr<RangeGetIterator> it; |
| err = txn->get(range_begin, range_end, &it, options); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| std::vector<std::string> actual_keys; |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| actual_keys.emplace_back(k); |
| } |
| EXPECT_EQ(actual_keys, tc.expected_keys) |
| << "Failed for begin_key_selector=" << static_cast<int>(tc.begin_key_selector) |
| << ", end_key_selector=" << static_cast<int>(tc.end_key_selector); |
| } |
| } |
| |
| TEST(TxnMemKvTest, ReverseFullRangeGet) { |
| using namespace doris::cloud; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| constexpr std::string_view prefix = "reverse_full_range_get_"; |
| |
| { |
| // Remove the existing keys and insert some new keys. |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| std::string last_key = fmt::format("{}{:03}", prefix, 99); |
| encode_int64(INT64_MAX, &last_key); |
| txn->remove(prefix, last_key); |
| for (int i = 0; i < 100; ++i) { |
| std::string key = fmt::format("{}{:03}", prefix, i); |
| txn->put(key, std::to_string(i)); |
| } |
| err = txn->commit(); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| std::string range_begin = fmt::format("{}{:03}", prefix, 1); |
| std::string range_end = fmt::format("{}{:03}", prefix, 98); |
| |
| struct TestCase { |
| RangeKeySelector begin_key_selector, end_key_selector; |
| }; |
| |
| std::vector<TestCase> test_case { |
| // 1. [begin, end) |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| }, |
| // 2. [begin, end] |
| { |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| }, |
| // 3. (begin, end) |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_OR_EQUAL, |
| }, |
| // 4. (begin, end] |
| { |
| RangeKeySelector::FIRST_GREATER_THAN, |
| RangeKeySelector::FIRST_GREATER_THAN, |
| }, |
| }; |
| |
| for (const auto& tc : test_case) { |
| std::vector<std::string> expected_keys; |
| { |
| // Read the expected keys via range_get |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RangeGetOptions options; |
| options.batch_limit = 11; |
| options.begin_key_selector = tc.begin_key_selector; |
| options.end_key_selector = tc.end_key_selector; |
| options.reverse = true; // Reserve range get |
| std::string begin = range_begin, end = range_end; |
| |
| std::unique_ptr<RangeGetIterator> it; |
| do { |
| err = txn->get(begin, end, &it, options); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| while (it->has_next()) { |
| auto [k, v] = it->next(); |
| expected_keys.emplace_back(k); |
| } |
| // Get next begin key for reverse range get |
| end = it->last_key(); |
| options.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL; |
| } while (it->more()); |
| } |
| |
| std::vector<std::string> actual_keys; |
| { |
| // Read the actual keys via full_range_get |
| FullRangeGetOptions opts(txn_kv); |
| opts.batch_limit = 11; |
| opts.begin_key_selector = tc.begin_key_selector; |
| opts.end_key_selector = tc.end_key_selector; |
| opts.reverse = true; // Reserve full range get |
| |
| auto it = txn_kv->full_range_get(range_begin, range_end, opts); |
| ASSERT_TRUE(it->is_valid()); |
| |
| while (it->has_next()) { |
| auto kvp = it->next(); |
| ASSERT_TRUE(kvp.has_value()); |
| auto [k, v] = *kvp; |
| actual_keys.emplace_back(k); |
| } |
| } |
| |
| EXPECT_EQ(actual_keys, expected_keys) |
| << "Failed for begin_key_selector=" << static_cast<int>(tc.begin_key_selector) |
| << ", end_key_selector=" << static_cast<int>(tc.end_key_selector); |
| } |
| } |
| |
| TEST(TxnMemKvTest, BatchScan) { |
| using namespace doris::cloud; |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| |
| std::vector<std::pair<std::string, std::string>> test_data = { |
| {"different_key", "different_value"}, |
| {"prefix1", "value1"}, |
| {"prefix1_sub1", "sub_value1"}, |
| {"prefix1_sub2", "sub_value2"}, |
| {"prefix2", "value2"}, |
| {"prefix2_sub1", "sub_value3"}, |
| {"prefix3", "value3"}, |
| }; |
| |
| std::unique_ptr<Transaction> txn; |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (const auto& [key, val] : test_data) { |
| txn->put(key, val); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| struct TestCase { |
| bool reverse; |
| std::vector<std::string> scan_keys; |
| std::vector<std::optional<std::string>> expected_keys; |
| }; |
| |
| std::vector<TestCase> test_cases = { |
| {false, |
| {"prefix1", "prefix2", "prefix3", "different_key"}, |
| {"prefix1", "prefix2", "prefix3", "different_key"}}, |
| {false, {"prefix1_", "prefix2_"}, {"prefix1_sub1", "prefix2_sub1"}}, |
| {false, {"prefix5_"}, {std::nullopt}}, |
| {true, {"a"}, {std::nullopt}}, |
| {true, {"prefix1_", "prefix2_"}, {"prefix1_sub2", "prefix2_sub1"}}, |
| {true, |
| {"prefix1", "prefix2", "prefix3", "different_key"}, |
| {"prefix1_sub2", "prefix2_sub1", "prefix3", "different_key"}}, |
| }; |
| |
| size_t count = 0; |
| for (auto& tc : test_cases) { |
| auto ret = txn_kv->create_txn(&txn); |
| ASSERT_EQ(ret, TxnErrorCode::TXN_OK); |
| std::vector<std::optional<std::pair<std::string, std::string>>> results; |
| Transaction::BatchGetOptions opts; |
| opts.reverse = tc.reverse; // Reverse order |
| opts.snapshot = false; |
| |
| ret = txn->batch_scan(&results, tc.scan_keys, opts); |
| ASSERT_EQ(ret, TxnErrorCode::TXN_OK); |
| ASSERT_EQ(results.size(), tc.scan_keys.size()); |
| |
| for (size_t i = 0; i < results.size(); ++i) { |
| if (tc.expected_keys[i].has_value()) { |
| ASSERT_TRUE(results[i].has_value()) |
| << "count: " << count << ", expected key at index " << i |
| << " for scan key: " << tc.scan_keys[i] |
| << ", expected: " << tc.expected_keys[i].value() << ", got: empty"; |
| std::string& key = results[i].value().first; |
| std::string& expected_key = tc.expected_keys[i].value(); |
| ASSERT_EQ(key, expected_key) << "count: " << count << ", mismatch at index " << i |
| << " for scan key: " << tc.scan_keys[i] |
| << ", expected: " << expected_key << ", got: " << key; |
| } else { |
| ASSERT_FALSE(results[i].has_value()); |
| } |
| } |
| count += 1; |
| } |
| } |
| |
| static void versionstamp_test(std::shared_ptr<cloud::TxnKv> txn_kv) { |
| using namespace doris::cloud; |
| std::string txn_kv_class = dynamic_cast<MemTxnKv*>(txn_kv.get()) != nullptr ? " memkv" : " fdb"; |
| std::unique_ptr<Transaction> txn; |
| std::string key_prefix = "versionstamp_test_"; |
| |
| // Test without enabling versionstamp |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| Versionstamp versionstamp; |
| ASSERT_EQ(txn->get_versionstamp(&versionstamp), TxnErrorCode::TXN_INVALID_ARGUMENT) |
| << txn_kv_class; |
| } |
| |
| // Test with versionstamp enabled but no commit |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->enable_get_versionstamp(); |
| Versionstamp versionstamp; |
| ASSERT_EQ(txn->get_versionstamp(&versionstamp), TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << txn_kv_class; |
| } |
| |
| // Test with versionstamp enabled and commit |
| { |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Enable versionstamp and perform versioned operations |
| txn->enable_get_versionstamp(); |
| txn->atomic_set_ver_key(key_prefix + "key1", "value1"); |
| txn->atomic_set_ver_value(key_prefix + "key2", "value2"); |
| |
| // Commit transaction |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| |
| // Get versionstamp |
| Versionstamp versionstamp; |
| ASSERT_EQ(txn->get_versionstamp(&versionstamp), TxnErrorCode::TXN_OK) << txn_kv_class; |
| |
| std::cout << txn_kv_class << " Versionstamp: " << versionstamp.to_string() << std::endl; |
| } |
| |
| // Test multiple transactions get different versionstamps |
| Versionstamp versionstamp1, versionstamp2; |
| { |
| // First transaction |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->enable_get_versionstamp(); |
| txn->atomic_set_ver_key(key_prefix + "tx1", "value1"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(txn->get_versionstamp(&versionstamp1), TxnErrorCode::TXN_OK) << txn_kv_class; |
| |
| // Small delay to ensure different timestamps (mainly for FDB) |
| std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| |
| // Second transaction |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->enable_get_versionstamp(); |
| txn->atomic_set_ver_key(key_prefix + "tx2", "value2"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << txn_kv_class; |
| ASSERT_EQ(txn->get_versionstamp(&versionstamp2), TxnErrorCode::TXN_OK) << txn_kv_class; |
| } |
| |
| ASSERT_NE(versionstamp1, versionstamp2) << txn_kv_class; |
| ASSERT_LT(versionstamp1, versionstamp2) |
| << txn_kv_class; // Later transaction should have larger versionstamp |
| } |
| |
| TEST(TxnMemKvTest, GetVersionstampTest) { |
| using namespace doris::cloud; |
| |
| auto mem_txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(mem_txn_kv.get(), nullptr); |
| |
| versionstamp_test(mem_txn_kv); |
| versionstamp_test(fdb_txn_kv); |
| } |