| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/controller.h> |
| #include <bvar/window.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <google/protobuf/repeated_field.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <cstddef> |
| #include <cstdint> |
| #include <limits> |
| #include <memory> |
| #include <random> |
| #include <string> |
| #include <thread> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/keys.h" |
| #include "meta-service/mem_txn_kv.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-service/meta_service_txn.cpp" |
| #include "meta-service/txn_kv_error.h" |
| #include "mock_resource_manager.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "recycler/recycler.h" |
| #include "resource-manager/resource_manager.h" |
| |
| using namespace doris::cloud; |
| |
| namespace doris::cloud { |
| void repair_tablet_index( |
| std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, int64_t db_id, int64_t txn_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta); |
| }; |
| |
| static doris::cloud::RecyclerThreadPoolGroup thread_group; |
| |
| int main(int argc, char** argv) { |
| const std::string conf_file = "doris_cloud.conf"; |
| if (!doris::cloud::config::init(conf_file.c_str(), true)) { |
| std::cerr << "failed to init config file, conf=" << conf_file << std::endl; |
| return -1; |
| } |
| |
| config::enable_cloud_txn_lazy_commit = true; |
| config::txn_lazy_commit_rowsets_thresold = 2; |
| config::txn_lazy_max_rowsets_per_batch = 2; |
| config::txn_lazy_commit_num_threads = 2; |
| config::max_tablet_index_num_per_batch = 2; |
| |
| config::enable_txn_store_retry = false; |
| |
| config::label_keep_max_second = 0; |
| |
| if (!doris::cloud::init_glog("txn_lazy_commit_test")) { |
| std::cerr << "failed to init glog" << std::endl; |
| return -1; |
| } |
| ::testing::InitGoogleTest(&argc, argv); |
| |
| auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| s3_producer_pool->start(); |
| auto recycle_tablet_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| recycle_tablet_pool->start(); |
| auto group_recycle_function_pool = |
| std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| group_recycle_function_pool->start(); |
| thread_group = |
| RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), |
| std::move(group_recycle_function_pool)); |
| |
| return RUN_ALL_TESTS(); |
| } |
| namespace doris::cloud { |
| |
| std::unique_ptr<MetaServiceProxy> get_meta_service(std::shared_ptr<TxnKv> txn_kv, |
| bool mock_resource_mgr) { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| auto rs = mock_resource_mgr ? std::make_shared<MockResourceManager>(txn_kv) |
| : std::make_shared<ResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| static std::string next_rowset_id() { |
| static int cnt = 0; |
| return std::to_string(++cnt); |
| } |
| |
| static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id) { |
| auto tablet = req.add_tablet_metas(); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| auto schema = tablet->mutable_schema(); |
| schema->set_schema_version(0); |
| auto first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(next_rowset_id()); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| } |
| |
| static void create_tablet_without_db_id(MetaServiceProxy* meta_service, int64_t table_id, |
| int64_t index_id, int64_t partition_id, int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_id, |
| int64_t table_id, int64_t index_id, int64_t partition_id, |
| int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(db_id); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id, |
| int64_t version = -1, int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(1); |
| rowset.set_num_rows(num_rows); |
| rowset.set_data_disk_size(num_rows * 100); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, |
| CreateRowsetResponse& res) { |
| brpc::Controller cntl; |
| auto arena = res.GetArena(); |
| auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); |
| req->mutable_rowset_meta()->CopyFrom(rowset); |
| meta_service->commit_rowset(&cntl, req, &res, nullptr); |
| if (!arena) { |
| delete req; |
| } |
| } |
| |
| static std::shared_ptr<TxnKv> get_mem_txn_kv() { |
| int ret = 0; |
| // MemKv |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| if (txn_kv != nullptr) { |
| ret = txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); |
| return txn_kv; |
| } |
| |
| static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t tablet_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_EQ(tablet_idx_pb.db_id(), db_id); |
| } |
| |
| static void check_tablet_idx_without_db_id(std::unique_ptr<Transaction>& txn, int64_t tablet_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_FALSE(tablet_idx_pb.has_db_id()); |
| } |
| |
| static void check_tmp_rowset_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t txn_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_rowset_tmp_key({mock_instance, txn_id, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| } |
| |
| static void check_tmp_rowset_not_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t txn_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_rowset_tmp_key({mock_instance, txn_id, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| static void check_rowset_meta_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t end_version) { |
| std::string mock_instance = "test_instance"; |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, end_version}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| } |
| |
| static void check_rowset_meta_not_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t end_version) { |
| std::string mock_instance = "test_instance"; |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, end_version}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| [[maybe_unused]] static void check_txn_visible(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t txn_id, std::string label) { |
| std::string mock_instance = "test_instance"; |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.ParseFromString(info_val); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_VISIBLE); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_OK); |
| } |
| |
| [[maybe_unused]] static void check_txn_committed(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t txn_id, std::string label) { |
| std::string mock_instance = "test_instance"; |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.ParseFromString(info_val); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_COMMITTED); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_OK); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| static void check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id, int64_t txn_id, |
| std::string label) { |
| std::string mock_instance = "test_instance"; |
| |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int db_id = 1000313313; |
| int table_id = 1001414121; |
| int index_id = 1002316473; |
| int partition_id = 10035151; |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 11414703; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CreateTabletWithoutDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int table_id = 3131; |
| int index_id = 4131; |
| int partition_id = 3131; |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 42411890; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, RepairTabletIndexTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int64_t db_id = 21318977; |
| int64_t table_id = 46714; |
| int64_t index_id = 83645; |
| int64_t partition_id = 123131; |
| std::string label = "test_repair_tablet_index"; |
| int64_t txn_id = 0; |
| std::string mock_instance = "test_instance"; |
| |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| TxnInfoPB txn_info; |
| txn_info.set_db_id(db_id); |
| txn_info.set_label(label); |
| txn_info.add_table_ids(table_id); |
| txn_info.set_timeout_ms(6000); |
| req.mutable_txn_info()->CopyFrom(txn_info); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| ASSERT_GT(txn_id, 0); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 87134121; |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| tmp_rowsets_meta.push_back(std::make_pair("mock_tmp_rowset_key", tmp_rowset)); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| } |
| } |
| |
| MetaServiceCode code = MetaServiceCode::UNDEFINED_ERR; |
| std::string msg; |
| repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id, tmp_rowsets_meta); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 3131397513; |
| int64_t table_id = 3213867; |
| int64_t index_id = 123513; |
| int64_t partition_id = 113123; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| LOG(INFO) << "zhangleixxx2" << need_repair_tablet_idx; |
| if (repair_tablet_idx_count == 0) { |
| ASSERT_TRUE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| } else { |
| ASSERT_FALSE(need_repair_tablet_idx); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1103; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| req.add_base_tablet_ids(tablet_id); |
| } |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 55432134; |
| int64_t table_id = 326843; |
| int64_t index_id = 34345678; |
| int64_t partition_id = 212343; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 372323; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| req.add_base_tablet_ids(tablet_id); |
| } |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_ABORTED); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_not_exist(txn, tablet_id, 2); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 7651485414; |
| int64_t table_id = 31478952181; |
| int64_t index_id = 89894141; |
| int64_t partition_id = 1241241; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| ASSERT_FALSE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually2"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 5; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 983153141; |
| int64_t table_id = 71419093; |
| int64_t index_id = 80124; |
| int64_t partition_id = 8989313; |
| bool commit_txn_immediatelly_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_immediatelly_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_commit_txn_immediatelly"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 31311414; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 415413556; |
| int64_t table_id = 34184234; |
| int64_t index_id = 9059444; |
| int64_t partition_id = 8934984; |
| bool commit_txn_immediatelly_hit = false; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::before_commit", [&](auto&& args) { |
| TxnErrorCode* err = try_any_cast<TxnErrorCode*>(args[0]); |
| *err = TxnErrorCode::TXN_BYTES_TOO_LARGE; |
| |
| MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[1]); |
| *code = cast_as<ErrCategory::COMMIT>(*err); |
| |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| commit_txn_immediatelly_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_not_fallthrough_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 783426908; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| ASSERT_FALSE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_not_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 903437831; |
| int64_t table_id = 13845693; |
| int64_t index_id = 2366023843; |
| int64_t partition_id = 210486436; |
| bool commit_txn_immediatelly_hit = false; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::before_commit", [&](auto&& args) { |
| TxnErrorCode* err = try_any_cast<TxnErrorCode*>(args[0]); |
| *err = TxnErrorCode::TXN_BYTES_TOO_LARGE; |
| |
| MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[1]); |
| *code = cast_as<ErrCategory::COMMIT>(*err); |
| |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| commit_txn_immediatelly_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_fallthrough_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin commit_txn_eventually begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 315477; |
| int64_t table_id = 31752134; |
| int64_t index_id = 3458532; |
| int64_t partition_id = 26328765; |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| |
| bool go = false; |
| std::atomic<int32_t> commit_txn_eventually_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_wait_count = {0}; |
| std::atomic<int32_t> finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t first_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| commit_txn_eventually_begin_count++; |
| if (commit_txn_eventually_begin_count == 1) { |
| first_txn_id = *try_any_cast<int64_t*>(args[0]); |
| } |
| if (commit_txn_eventually_begin_count == 2) { |
| { |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; }); |
| } |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, first_txn_id); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_wait_count++; |
| if (txn_lazy_committer_wait_count == 1) { |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return finish_count == 1; }); |
| } |
| }); |
| |
| int64_t second_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| int64_t txn_id = *try_any_cast<int64_t*>(args[1]); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| finish_count++; |
| if (finish_count == 1) { |
| second_txn_id = txn_id; |
| ASSERT_NE(second_txn_id, first_txn_id); |
| go_cv.notify_all(); |
| } |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < 10; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually2313"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| int64_t txn_id2 = 0; |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually234142"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id2 = res.txn_id(); |
| ASSERT_GT(txn_id2, 0); |
| } |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id2); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_EQ(commit_txn_eventually_begin_count, 3); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(finish_count, 2); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 10; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_EQ(tablet_idx_pb.db_id(), db_id); |
| |
| std::string tmp_rowset_key = |
| meta_rowset_tmp_key({mock_instance, first_txn_id, tablet_id}); |
| std::string tmp_rowset_val; |
| ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, 2}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| |
| tmp_rowset_key = meta_rowset_tmp_key({mock_instance, second_txn_id, tablet_id}); |
| tmp_rowset_val.clear(); |
| ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| rowset_key = meta_rowset_key({mock_instance, tablet_id, 3}); |
| rowset_val.clear(); |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin commit_txn_immediately begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 134179141; |
| int64_t table_id = 3243234; |
| int64_t index_id = 8098324; |
| int64_t partition_id = 32895321; |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| bool go = false; |
| |
| std::atomic<int32_t> commit_txn_immediately_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_wait_count = {0}; |
| std::atomic<int32_t> immediately_finish_count = {0}; |
| std::atomic<int32_t> eventually_finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t first_txn_id = 0; |
| sp->set_call_back("commit_txn_immediately:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| commit_txn_immediately_begin_count++; |
| if (commit_txn_immediately_begin_count == 1) { |
| { |
| first_txn_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; }); |
| } |
| } |
| }); |
| |
| int64_t second_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_wait_count++; |
| if (txn_lazy_committer_wait_count == 1) { |
| int64_t txn_id = *try_any_cast<int64_t*>(args[0]); |
| second_txn_id = txn_id; |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return immediately_finish_count == 1; }); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, second_txn_id); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_immediately::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| immediately_finish_count++; |
| if (immediately_finish_count == 1) { |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| eventually_finish_count++; |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < 10; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3441"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| int64_t txn_id2 = 0; |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3245232"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id2 = res.txn_id(); |
| ASSERT_GT(txn_id2, 0); |
| } |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id2); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(false); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_EQ(commit_txn_immediately_begin_count, 2); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(immediately_finish_count, 1); |
| ASSERT_EQ(eventually_finish_count, 1); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 10; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, first_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, second_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 3); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin get rowset begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 6544345; |
| int64_t table_id = 32431068334; |
| int64_t index_id = 132433; |
| int64_t partition_id = 956120248; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 19201262; |
| |
| for (int i = 0; i < 10; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| { |
| int tmp_txn_id = 0; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_32ae213dasf2"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| tmp_txn_id = res.txn_id(); |
| ASSERT_GT(res.txn_id(), 0); |
| } |
| |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(tmp_txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| bool go = false; |
| |
| std::atomic<int32_t> get_rowset_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_submit_count = {0}; |
| std::atomic<int32_t> get_rowset_finish_count = {0}; |
| std::atomic<int32_t> eventually_finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t get_rowset_tablet_id = 0; |
| sp->set_call_back("get_rowset:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| get_rowset_begin_count++; |
| if (get_rowset_begin_count == 1) { |
| { |
| get_rowset_tablet_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_submit_count == 1; }); |
| } |
| } |
| }); |
| |
| int64_t eventually_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_submit_count++; |
| if (txn_lazy_committer_submit_count == 1) { |
| eventually_txn_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return get_rowset_finish_count == 1; }); |
| } |
| }); |
| |
| sp->set_call_back("get_rowset::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| auto version_pb = *try_any_cast<VersionPB*>(args[0]); |
| ASSERT_EQ(version_pb.pending_txn_ids(0), eventually_txn_id); |
| ASSERT_GT(version_pb.pending_txn_ids(0), 0); |
| } |
| }); |
| |
| sp->set_call_back("get_rowset::finish", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| get_rowset_finish_count++; |
| if (get_rowset_finish_count == 1) { |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| eventually_finish_count++; |
| }); |
| |
| sp->enable_processing(); |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_315322242"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| auto tablet_idx = req.mutable_idx(); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id_base); |
| req.set_start_version(0); |
| req.set_end_version(-1); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| |
| GetRowsetResponse res; |
| meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| |
| ASSERT_EQ(get_rowset_begin_count, 2); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(txn_lazy_committer_submit_count, 1); |
| ASSERT_EQ(get_rowset_finish_count, 1); |
| ASSERT_EQ(eventually_finish_count, 1); |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin | |
| // | | |
| // lazy commit submit | |
| // | | |
| // dead | |
| // | | |
| // | abort_timeout_txn begin |
| // | | |
| // | advance last txn |
| // | | |
| // | |
| // | finish |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 77623430234; |
| int64_t table_id = 96545043; |
| int64_t index_id = 2381203456; |
| int64_t partition_id = 450976544; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_6787230013"; |
| |
| bool commit_txn_eventullay_hit = false; |
| bool abort_timeout_txn_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| commit_txn_eventullay_hit = true; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| |
| TxnInfoPB txn_info_pb; |
| sp->set_call_back("abort_timeout_txn::advance_last_pending_txn_id", [&](auto&& args) { |
| abort_timeout_txn_hit = true; |
| txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label); |
| } |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 213430076554; |
| for (int i = 0; i < 10; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_visible(txn, db_id, txn_id, label); |
| } |
| sleep(1); |
| ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_not_exist(txn, db_id, txn_id, label); |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_TRUE(commit_txn_eventullay_hit); |
| ASSERT_TRUE(abort_timeout_txn_hit); |
| ASSERT_EQ(txn_id, txn_info_pb.txn_id()); |
| } |
| |
| TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 5252025; |
| int64_t table_id = 35201043384; |
| int64_t index_id = 256439; |
| int64_t partition_id = 732536259; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| int64_t tablet_id = 25910248; |
| |
| { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id); |
| } |
| { |
| int tmp_txn_id = 0; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_32ae213dasg3"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| tmp_txn_id = res.txn_id(); |
| ASSERT_GT(res.txn_id(), 0); |
| } |
| { |
| auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(tmp_txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) { |
| auto* byte_size = try_any_cast<size_t*>(args[0]); |
| *byte_size = std::numeric_limits<int32_t>::max(); |
| ++(*byte_size); |
| }); |
| |
| sp->enable_processing(); |
| { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| auto* tablet_idx = req.mutable_idx(); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id); |
| req.set_start_version(0); |
| req.set_end_version(-1); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| |
| GetRowsetResponse res; |
| meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR); |
| } |
| } |
| TEST(TxnLazyCommitTest, FuzzyRandom) { |
| int counter = 0; |
| for (size_t i = 0; i < 100000; i++) { |
| if (fuzzy_random()) { |
| counter++; |
| } |
| } |
| LOG(INFO) << "fuzzy_random counter: " << counter; |
| ASSERT_GT(counter, 30000); |
| ASSERT_LT(counter, 70000); |
| } |
| |
| TEST(TxnLazyCommitTest, ForceTxnLazyCommit) { |
| int counter = 0; |
| config::enable_cloud_txn_lazy_commit_fuzzy_test = false; |
| for (size_t i = 0; i < 100000; i++) { |
| if (force_txn_lazy_commit()) { |
| counter++; |
| } |
| } |
| LOG(INFO) << "force_txn_lazy_commit counter: " << counter; |
| ASSERT_EQ(counter, 0); |
| |
| config::enable_cloud_txn_lazy_commit_fuzzy_test = true; |
| counter = 0; |
| for (size_t i = 0; i < 100000; i++) { |
| if (force_txn_lazy_commit()) { |
| counter++; |
| } |
| } |
| LOG(INFO) << "force_txn_lazy_commit counter: " << counter; |
| ASSERT_GT(counter, 30000); |
| ASSERT_LT(counter, 70000); |
| config::enable_cloud_txn_lazy_commit_fuzzy_test = false; |
| } |
| } // namespace doris::cloud |