| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/controller.h> |
| #include <bvar/window.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <google/protobuf/repeated_field.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <cstddef> |
| #include <cstdint> |
| #include <limits> |
| #include <memory> |
| #include <random> |
| #include <string> |
| #include <thread> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-service/meta_service_txn.cpp" |
| #include "meta-store/keys.h" |
| #include "meta-store/mem_txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "mock_resource_manager.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "recycler/recycler.h" |
| #include "resource-manager/resource_manager.h" |
| |
| using namespace doris::cloud; |
| |
| namespace doris::cloud { |
| void repair_tablet_index( |
| std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, int64_t db_id, int64_t txn_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta, |
| bool is_versioned_write); |
| }; |
| |
| static doris::cloud::RecyclerThreadPoolGroup thread_group; |
| |
| int main(int argc, char** argv) { |
| const std::string conf_file = "doris_cloud.conf"; |
| if (!doris::cloud::config::init(conf_file.c_str(), true)) { |
| std::cerr << "failed to init config file, conf=" << conf_file << std::endl; |
| return -1; |
| } |
| |
| config::enable_cloud_txn_lazy_commit = true; |
| config::enable_txn_store_retry = false; |
| |
| config::label_keep_max_second = 0; |
| config::force_immediate_recycle = true; |
| |
| if (!doris::cloud::init_glog("txn_lazy_commit_test")) { |
| std::cerr << "failed to init glog" << std::endl; |
| return -1; |
| } |
| ::testing::InitGoogleTest(&argc, argv); |
| |
| auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| s3_producer_pool->start(); |
| auto recycle_tablet_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| recycle_tablet_pool->start(); |
| auto group_recycle_function_pool = |
| std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| group_recycle_function_pool->start(); |
| thread_group = |
| RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), |
| std::move(group_recycle_function_pool)); |
| |
| return RUN_ALL_TESTS(); |
| } |
| namespace doris::cloud { |
| |
| std::unique_ptr<MetaServiceProxy> get_meta_service(std::shared_ptr<TxnKv> txn_kv, |
| bool mock_resource_mgr) { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| auto rs = mock_resource_mgr ? std::make_shared<MockResourceManager>(txn_kv) |
| : std::make_shared<ResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto snapshot = std::make_shared<SnapshotManager>(txn_kv); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| static std::string next_rowset_id() { |
| static int cnt = 0; |
| return std::to_string(++cnt); |
| } |
| |
| static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id) { |
| auto tablet = req.add_tablet_metas(); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| auto schema = tablet->mutable_schema(); |
| schema->set_schema_version(0); |
| auto first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(next_rowset_id()); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| } |
| |
| static void create_tablet_without_db_id(MetaServiceProxy* meta_service, int64_t table_id, |
| int64_t index_id, int64_t partition_id, int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_id, |
| int64_t table_id, int64_t index_id, int64_t partition_id, |
| int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(db_id); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int64_t index_id, |
| int64_t partition_id, int64_t version = -1, |
| int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_index_id(index_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(0); |
| rowset.set_num_rows(0); |
| rowset.set_data_disk_size(0); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, |
| int64_t index_id, int64_t partition_id, |
| int64_t version = -1, int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_index_id(index_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(600); |
| for (int i = 0; i < 600; i++) { |
| auto ptr = rowset.add_segments_key_bounds(); |
| ptr->set_min_key("xxsqewqeqweeqwewqeqeq"); |
| ptr->set_max_key("dase23452rr234ewdw534523"); |
| } |
| rowset.set_num_rows(0); |
| rowset.set_data_disk_size(0); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, |
| CreateRowsetResponse& res) { |
| brpc::Controller cntl; |
| auto arena = res.GetArena(); |
| auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); |
| req->mutable_rowset_meta()->CopyFrom(rowset); |
| meta_service->commit_rowset(&cntl, req, &res, nullptr); |
| if (!arena) { |
| delete req; |
| } |
| } |
| |
| static std::shared_ptr<TxnKv> get_mem_txn_kv() { |
| int ret = 0; |
| // MemKv |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| if (txn_kv != nullptr) { |
| ret = txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); |
| return txn_kv; |
| } |
| |
| static std::shared_ptr<TxnKv> get_fdb_txn_kv() { |
| int ret = 0; |
| cloud::config::fdb_cluster_file_path = "fdb.cluster"; |
| auto fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>()); |
| if (fdb_txn_kv != nullptr) { |
| ret = fdb_txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }(); |
| return fdb_txn_kv; |
| } |
| |
| static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t tablet_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_EQ(tablet_idx_pb.db_id(), db_id); |
| } |
| |
| static void check_tablet_idx_without_db_id(std::unique_ptr<Transaction>& txn, int64_t tablet_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_FALSE(tablet_idx_pb.has_db_id()); |
| } |
| |
| static void check_tmp_rowset_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t txn_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_rowset_tmp_key({mock_instance, txn_id, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| } |
| |
| static void check_tmp_rowset_not_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t txn_id) { |
| std::string mock_instance = "test_instance"; |
| std::string key = meta_rowset_tmp_key({mock_instance, txn_id, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| static void check_rowset_meta_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t end_version) { |
| std::string mock_instance = "test_instance"; |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, end_version}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| } |
| |
| static void check_rowset_meta_not_exist(std::unique_ptr<Transaction>& txn, int64_t tablet_id, |
| int64_t end_version) { |
| std::string mock_instance = "test_instance"; |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, end_version}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| [[maybe_unused]] static void check_txn_visible(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t txn_id, std::string label) { |
| std::string mock_instance = "test_instance"; |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.ParseFromString(info_val); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_VISIBLE); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_OK); |
| } |
| |
| [[maybe_unused]] static void check_txn_committed(std::unique_ptr<Transaction>& txn, int64_t db_id, |
| int64_t txn_id, std::string label) { |
| std::string mock_instance = "test_instance"; |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.ParseFromString(info_val); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_COMMITTED); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_OK); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| static void check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id, int64_t txn_id, |
| std::string label) { |
| std::string mock_instance = "test_instance"; |
| |
| std::string info_key, info_val; |
| TxnInfoKeyInfo txn_info_key_info {mock_instance, db_id, txn_id}; |
| txn_info_key(txn_info_key_info, &info_key); |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string label_key, label_val; |
| txn_label_key({mock_instance, db_id, label}, &label_key); |
| ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({mock_instance, txn_id}); |
| ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string running_key, running_value; |
| TxnRunningKeyInfo running_key_info {mock_instance, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| ASSERT_EQ(txn->get(running_key, &running_value), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rec_txn_key, rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {mock_instance, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| ASSERT_EQ(txn->get(rec_txn_key, &rec_txn_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| // Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager. |
| static void create_and_refresh_instance( |
| MetaServiceProxy* service, std::string instance_id, |
| MultiVersionStatus multi_version_status = MULTI_VERSION_READ_WRITE) { |
| // write instance |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(instance_id); |
| instance_info.set_multi_version_status(multi_version_status); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| service->resource_mgr()->refresh_instance(instance_id); |
| ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id)); |
| } |
| |
| TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int db_id = 1000313313; |
| int table_id = 1001414121; |
| int index_id = 1002316473; |
| int partition_id = 10035151; |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 11414703; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CreateTabletWithoutDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int table_id = 3131; |
| int index_id = 4131; |
| int partition_id = 3131; |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 42411890; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, RepairTabletIndexTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| auto meta_service = get_meta_service(txn_kv, true); |
| int64_t db_id = 21318977; |
| int64_t table_id = 46714; |
| int64_t index_id = 83645; |
| int64_t partition_id = 123131; |
| std::string label = "test_repair_tablet_index"; |
| int64_t txn_id = 0; |
| std::string mock_instance = "test_instance"; |
| |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| TxnInfoPB txn_info; |
| txn_info.set_db_id(db_id); |
| txn_info.set_label(label); |
| txn_info.add_table_ids(table_id); |
| txn_info.set_timeout_ms(6000); |
| req.mutable_txn_info()->CopyFrom(txn_info); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| ASSERT_GT(txn_id, 0); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 87134121; |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| tmp_rowsets_meta.push_back(std::make_pair("mock_tmp_rowset_key", tmp_rowset)); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| } |
| } |
| |
| MetaServiceCode code = MetaServiceCode::UNDEFINED_ERR; |
| std::string msg; |
| repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id, tmp_rowsets_meta, false); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 3131397513; |
| int64_t table_id = 3213867; |
| int64_t index_id = 123513; |
| int64_t partition_id = 113123; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| LOG(INFO) << "need_repair_tablet_idx:" << need_repair_tablet_idx; |
| if (repair_tablet_idx_count == 0) { |
| ASSERT_TRUE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| } else { |
| ASSERT_FALSE(need_repair_tablet_idx); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1103; |
| for (int i = 0; i < 2999; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| for (int i = 0; i < 2999; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| req.add_base_tablet_ids(tablet_id); |
| } |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 2999; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 55432134; |
| int64_t table_id = 326843; |
| int64_t index_id = 34345678; |
| int64_t partition_id = 212343; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 372323; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| req.add_base_tablet_ids(tablet_id); |
| } |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_ABORTED); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_not_exist(txn, tablet_id, 2); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 7651485414; |
| int64_t table_id = 31478952181; |
| int64_t index_id = 89894141; |
| int64_t partition_id = 1241241; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| ASSERT_FALSE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually2"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 2048; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 2048; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 7651485414; |
| int64_t table_id = 31478952181; |
| int64_t index_id = 89894141; |
| int64_t partition_id = 1241241; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| ASSERT_FALSE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, false); |
| std::string instance_id = "test_instance"; |
| std::string cloud_unique_id = "1:test_instance:1"; |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| SyncPoint::get_instance()->enable_processing(); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| int txn_id = 0; |
| { |
| // Begin transaction |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually2"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| |
| // Check versioned rowset meta exists. |
| std::string rowset_key = versioned::meta_rowset_load_key({mock_instance, tablet_id, 2}); |
| doris::RowsetMetaCloudPB rowset_val; |
| Versionstamp versionstamp; |
| ASSERT_EQ(versioned::document_get(txn.get(), rowset_key, &rowset_val, &versionstamp), |
| TxnErrorCode::TXN_OK); |
| } |
| } |
| |
| { |
| // Get the partition version |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| |
| GetVersionResponse resp; |
| meta_service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << " status is " << resp.status().DebugString(); |
| ASSERT_GT(resp.version(), 1); |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 3131397513; |
| int64_t table_id = 3213867; |
| int64_t index_id = 123513; |
| int64_t partition_id = 113123; |
| bool commit_txn_eventually_finish_hit = false; |
| bool last_pending_txn_id_hit = false; |
| int repair_tablet_idx_count = 0; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) { |
| bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]); |
| if (repair_tablet_idx_count == 0) { |
| ASSERT_TRUE(need_repair_tablet_idx); |
| repair_tablet_idx_count++; |
| } else { |
| ASSERT_FALSE(need_repair_tablet_idx); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, 0); |
| last_pending_txn_id_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, false); |
| std::string instance_id = "test_instance"; |
| std::string cloud_unique_id = "1:test_instance:1"; |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| SyncPoint::get_instance()->enable_processing(); |
| create_and_refresh_instance(meta_service.get(), instance_id, MULTI_VERSION_WRITE_ONLY); |
| |
| int64_t txn_id = 0; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1103; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| req.add_base_tablet_ids(tablet_id); |
| } |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_GE(repair_tablet_idx_count, 0); |
| ASSERT_TRUE(last_pending_txn_id_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| |
| { |
| std::string mock_instance = "test_instance"; |
| std::string key = versioned::tablet_index_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_EQ(tablet_idx_pb.db_id(), db_id); |
| } |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 983153141; |
| int64_t table_id = 71419093; |
| int64_t index_id = 80124; |
| int64_t partition_id = 8989313; |
| bool commit_txn_immediatelly_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_immediatelly_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_commit_txn_immediatelly"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 31311414; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_without_db_id(txn, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 415413556; |
| int64_t table_id = 34184234; |
| int64_t index_id = 9059444; |
| int64_t partition_id = 8934984; |
| bool commit_txn_immediatelly_hit = false; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::before_commit", [&](auto&& args) { |
| TxnErrorCode* err = try_any_cast<TxnErrorCode*>(args[0]); |
| *err = TxnErrorCode::TXN_BYTES_TOO_LARGE; |
| |
| MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[1]); |
| *code = cast_as<ErrCategory::COMMIT>(*err); |
| |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| commit_txn_immediatelly_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_not_fallthrough_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 783426908; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| ASSERT_FALSE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_not_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 903437831; |
| int64_t table_id = 13845693; |
| int64_t index_id = 2366023843; |
| int64_t partition_id = 210486436; |
| bool commit_txn_immediatelly_hit = false; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_immediately::before_commit", [&](auto&& args) { |
| TxnErrorCode* err = try_any_cast<TxnErrorCode*>(args[0]); |
| *err = TxnErrorCode::TXN_BYTES_TOO_LARGE; |
| |
| MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[1]); |
| *code = cast_as<ErrCategory::COMMIT>(*err); |
| |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| commit_txn_immediatelly_hit = true; |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_fallthrough_commit_txn_eventually"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString(); |
| ASSERT_TRUE(commit_txn_immediatelly_hit); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < config::txn_lazy_commit_rowsets_thresold; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin commit_txn_eventually begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 315477; |
| int64_t table_id = 31752134; |
| int64_t index_id = 3458532; |
| int64_t partition_id = 26328765; |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| |
| bool go = false; |
| std::atomic<int32_t> commit_txn_eventually_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_wait_count = {0}; |
| std::atomic<int32_t> finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t first_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| commit_txn_eventually_begin_count++; |
| if (commit_txn_eventually_begin_count == 1) { |
| first_txn_id = *try_any_cast<int64_t*>(args[0]); |
| } |
| if (commit_txn_eventually_begin_count == 2) { |
| { |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; }); |
| } |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, first_txn_id); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_wait_count++; |
| if (txn_lazy_committer_wait_count == 1) { |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return finish_count == 1; }); |
| } |
| }); |
| |
| int64_t second_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| int64_t txn_id = *try_any_cast<int64_t*>(args[1]); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| finish_count++; |
| if (finish_count == 1) { |
| second_txn_id = txn_id; |
| ASSERT_NE(second_txn_id, first_txn_id); |
| go_cv.notify_all(); |
| } |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually2313"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| int64_t txn_id2 = 0; |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually234142"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id2 = res.txn_id(); |
| ASSERT_GT(txn_id2, 0); |
| } |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id2); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_EQ(commit_txn_eventually_begin_count, 3); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(finish_count, 2); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); |
| std::string val; |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| TabletIndexPB tablet_idx_pb; |
| tablet_idx_pb.ParseFromString(val); |
| ASSERT_EQ(tablet_idx_pb.db_id(), db_id); |
| |
| std::string tmp_rowset_key = |
| meta_rowset_tmp_key({mock_instance, first_txn_id, tablet_id}); |
| std::string tmp_rowset_val; |
| ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| std::string rowset_key = meta_rowset_key({mock_instance, tablet_id, 2}); |
| std::string rowset_val; |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| |
| tmp_rowset_key = meta_rowset_tmp_key({mock_instance, second_txn_id, tablet_id}); |
| tmp_rowset_val.clear(); |
| ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| |
| rowset_key = meta_rowset_key({mock_instance, tablet_id, 3}); |
| rowset_val.clear(); |
| ASSERT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin commit_txn_immediately begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 134179141; |
| int64_t table_id = 3243234; |
| int64_t index_id = 8098324; |
| int64_t partition_id = 32895321; |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| bool go = false; |
| |
| std::atomic<int32_t> commit_txn_immediately_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_wait_count = {0}; |
| std::atomic<int32_t> immediately_finish_count = {0}; |
| std::atomic<int32_t> eventually_finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t first_txn_id = 0; |
| sp->set_call_back("commit_txn_immediately:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| commit_txn_immediately_begin_count++; |
| if (commit_txn_immediately_begin_count == 1) { |
| { |
| first_txn_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; }); |
| } |
| } |
| }); |
| |
| int64_t second_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_wait_count++; |
| if (txn_lazy_committer_wait_count == 1) { |
| int64_t txn_id = *try_any_cast<int64_t*>(args[0]); |
| second_txn_id = txn_id; |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return immediately_finish_count == 1; }); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, second_txn_id); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_immediately::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| immediately_finish_count++; |
| if (immediately_finish_count == 1) { |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| eventually_finish_count++; |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908462; |
| for (int i = 0; i < 1999; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3441"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| { |
| for (int i = 0; i < 1999; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| int64_t txn_id2 = 0; |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3245232"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id2 = res.txn_id(); |
| ASSERT_GT(txn_id2, 0); |
| } |
| { |
| for (int i = 0; i < 1999; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id2); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(false); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_EQ(commit_txn_immediately_begin_count, 2); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(immediately_finish_count, 1); |
| ASSERT_EQ(eventually_finish_count, 1); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 1999; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, first_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, second_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 3); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin get rowset begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 6544345; |
| int64_t table_id = 32431068334; |
| int64_t index_id = 132433; |
| int64_t partition_id = 956120248; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 19201262; |
| |
| for (int i = 0; i < 10001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| { |
| int tmp_txn_id = 0; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_32ae213dasf2"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| tmp_txn_id = res.txn_id(); |
| ASSERT_GT(res.txn_id(), 0); |
| } |
| |
| { |
| for (int i = 0; i < 10001; ++i) { |
| auto tmp_rowset = |
| create_rowset(tmp_txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(tmp_txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| bool go = false; |
| |
| std::atomic<int32_t> get_rowset_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_submit_count = {0}; |
| std::atomic<int32_t> get_rowset_finish_count = {0}; |
| std::atomic<int32_t> eventually_finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t get_rowset_tablet_id = 0; |
| sp->set_call_back("get_rowset:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| get_rowset_begin_count++; |
| if (get_rowset_begin_count == 1) { |
| { |
| get_rowset_tablet_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_submit_count == 1; }); |
| } |
| } |
| }); |
| |
| int64_t eventually_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_submit_count++; |
| if (txn_lazy_committer_submit_count == 1) { |
| eventually_txn_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.notify_all(); |
| go_cv.wait(_lock, [&] { return get_rowset_finish_count == 1; }); |
| } |
| }); |
| |
| sp->set_call_back("get_rowset::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| auto txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(txn_id, eventually_txn_id); |
| } |
| }); |
| |
| sp->set_call_back("get_rowset::finish", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| get_rowset_finish_count++; |
| if (get_rowset_finish_count == 1) { |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| eventually_finish_count++; |
| }); |
| |
| sp->enable_processing(); |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_315322242"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| |
| { |
| for (int i = 0; i < 10001; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| auto tablet_idx = req.mutable_idx(); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id_base); |
| req.set_start_version(0); |
| req.set_end_version(-1); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| |
| GetRowsetResponse res; |
| meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| |
| ASSERT_EQ(get_rowset_begin_count, 2); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(txn_lazy_committer_submit_count, 1); |
| ASSERT_EQ(get_rowset_finish_count, 1); |
| ASSERT_EQ(eventually_finish_count, 1); |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin | |
| // | | |
| // lazy commit submit | |
| // | | |
| // dead | |
| // | | |
| // | abort_timeout_txn begin |
| // | | |
| // | advance last txn |
| // | | |
| // | |
| // | finish |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 77623430234; |
| int64_t table_id = 96545043; |
| int64_t index_id = 2381203456; |
| int64_t partition_id = 450976544; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_6787230013"; |
| |
| bool commit_txn_eventullay_hit = false; |
| bool abort_timeout_txn_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| commit_txn_eventullay_hit = true; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| |
| TxnInfoPB txn_info_pb; |
| sp->set_call_back("abort_timeout_txn::advance_last_pending_txn_id", [&](auto&& args) { |
| abort_timeout_txn_hit = true; |
| txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label); |
| } |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 213430076554; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_visible(txn, db_id, txn_id, label); |
| } |
| sleep(1); |
| ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_not_exist(txn, db_id, txn_id, label); |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_TRUE(commit_txn_eventullay_hit); |
| ASSERT_TRUE(abort_timeout_txn_hit); |
| ASSERT_EQ(txn_id, txn_info_pb.txn_id()); |
| } |
| |
| TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 thread2 |
| // | | |
| // commit_txn_eventually begin commit_txn_with_sub_txn begin |
| // | | |
| // lazy commit wait | |
| // | | |
| // | advance last txn |
| // | | |
| // | finish |
| // | | |
| // finish | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 134179142; |
| int64_t table_id = 3243264; |
| int64_t index_id = 8098394; |
| int64_t partition_id = 32895361; |
| |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| bool go = false; |
| |
| std::atomic<int32_t> commit_txn_immediately_begin_count = {0}; |
| std::atomic<int32_t> last_pending_txn_id_count = {0}; |
| std::atomic<int32_t> txn_lazy_committer_wait_count = {0}; |
| std::atomic<int32_t> immediately_finish_count = {0}; |
| std::atomic<int32_t> eventually_finish_count = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| int64_t first_txn_id = 0; |
| sp->set_call_back("commit_txn_with_sub_txn:begin", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| commit_txn_immediately_begin_count++; |
| if (commit_txn_immediately_begin_count == 1) { |
| { |
| first_txn_id = *try_any_cast<int64_t*>(args[0]); |
| go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; }); |
| go_cv.notify_all(); |
| } |
| } |
| }); |
| |
| int64_t second_txn_id = 0; |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| txn_lazy_committer_wait_count++; |
| if (txn_lazy_committer_wait_count == 1) { |
| int64_t txn_id = *try_any_cast<int64_t*>(args[0]); |
| second_txn_id = txn_id; |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_with_sub_txn::advance_last_pending_txn_id", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| last_pending_txn_id_count++; |
| if (last_pending_txn_id_count == 1) { |
| int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]); |
| ASSERT_EQ(last_pending_txn_id, second_txn_id); |
| } |
| go_cv.notify_all(); |
| }); |
| |
| sp->set_call_back("commit_txn_with_sub_txn::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| immediately_finish_count++; |
| if (immediately_finish_count == 1) { |
| go_cv.notify_all(); |
| } |
| }); |
| |
| sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) { |
| MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| eventually_finish_count++; |
| }); |
| |
| sp->set_call_back("TxnLazyCommitter::commit", [&](auto&& args) { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return last_pending_txn_id_count == 1; }); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1908562; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| |
| int64_t txn_id1 = 0; |
| std::thread thread1([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3442"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id1 = res.txn_id(); |
| ASSERT_GT(txn_id1, 0); |
| } |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id1); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| int64_t txn_id2 = 0; |
| std::thread thread2([&] { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually5"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id2 = res.txn_id(); |
| ASSERT_GT(txn_id2, 0); |
| } |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| int64_t sub_txn_id1 = txn_id2; |
| |
| // begin sub_txn1 |
| int64_t sub_txn_id2 = -1; |
| { |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id2); |
| req.set_sub_txn_num(0); |
| req.set_db_id(db_id); |
| req.set_label("test_label_concurrent_commit_txn_eventually5_sub"); |
| req.mutable_table_ids()->Add(table_id); |
| req.mutable_table_ids()->Add(table_id); |
| BeginSubTxnResponse res; |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().table_ids().size(), 2); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1); |
| ASSERT_TRUE(res.has_sub_txn_id()); |
| sub_txn_id2 = res.sub_txn_id(); |
| ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); |
| } |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = |
| create_rowset(sub_txn_id2, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id2); |
| req.set_is_txn_load(true); |
| |
| SubTxnInfo sub_txn_info1; |
| sub_txn_info1.set_sub_txn_id(sub_txn_id1); |
| sub_txn_info1.set_table_id(table_id); |
| for (int i = 0; i < 2001; ++i) { |
| sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + i); |
| } |
| |
| SubTxnInfo sub_txn_info2; |
| sub_txn_info2.set_sub_txn_id(sub_txn_id2); |
| sub_txn_info2.set_table_id(table_id); |
| for (int i = 0; i < 2001; ++i) { |
| sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + i); |
| } |
| |
| req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1)); |
| req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2)); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_EQ(commit_txn_immediately_begin_count, 2); |
| ASSERT_EQ(last_pending_txn_id_count, 1); |
| ASSERT_EQ(immediately_finish_count, 1); |
| ASSERT_EQ(eventually_finish_count, 1); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, first_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| |
| check_tmp_rowset_not_exist(txn, tablet_id, second_txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 4); |
| } |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 5252025; |
| int64_t table_id = 35201043384; |
| int64_t index_id = 256439; |
| int64_t partition_id = 732536259; |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| int64_t tablet_id = 25910248; |
| |
| { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id); |
| } |
| { |
| int tmp_txn_id = 0; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_32ae213dasg3"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| tmp_txn_id = res.txn_id(); |
| ASSERT_GT(res.txn_id(), 0); |
| } |
| { |
| auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(tmp_txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) { |
| auto* byte_size = try_any_cast<size_t*>(args[0]); |
| *byte_size = std::numeric_limits<int32_t>::max(); |
| ++(*byte_size); |
| }); |
| |
| sp->enable_processing(); |
| { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| auto* tablet_idx = req.mutable_idx(); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id); |
| req.set_start_version(0); |
| req.set_end_version(-1); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| |
| GetRowsetResponse res; |
| meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR); |
| } |
| } |
| |
| TEST(TxnLazyCommitTest, ForceTxnLazyCommit) { |
| int counter = 0; |
| config::cloud_txn_lazy_commit_fuzzy_possibility = 0; |
| for (size_t i = 0; i < 100000; i++) { |
| if (force_txn_lazy_commit()) { |
| counter++; |
| } |
| } |
| LOG(INFO) << "force_txn_lazy_commit counter: " << counter; |
| ASSERT_EQ(counter, 0); |
| |
| config::cloud_txn_lazy_commit_fuzzy_possibility = 50; |
| counter = 0; |
| for (size_t i = 0; i < 100000; i++) { |
| if (force_txn_lazy_commit()) { |
| counter++; |
| } |
| } |
| LOG(INFO) << "force_txn_lazy_commit counter: " << counter; |
| ASSERT_GT(counter, 30000); |
| ASSERT_LT(counter, 70000); |
| config::cloud_txn_lazy_commit_fuzzy_possibility = 0; |
| } |
| |
| TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin | |
| // | | |
| // prepare/commit rowset | |
| // | | |
| // lazy commit submit | |
| // | | |
| // dead | |
| // | | |
| // | recyle_tmp_rowsets |
| // | | |
| // | abort_timeout_txn |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 988032131; |
| int64_t table_id = 5145043; |
| int64_t index_id = 273456; |
| int64_t partition_id = 44576544; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67ae2q1231"; |
| |
| bool commit_txn_eventullay_hit = false; |
| bool is_txn_finished_hit = false; |
| bool abort_timeout_txn_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| commit_txn_eventullay_hit = true; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| |
| TxnInfoPB txn_info_pb; |
| sp->set_call_back("is_txn_finished::txn_not_finished", [&](auto&& args) { |
| is_txn_finished_hit = true; |
| txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label); |
| } |
| }); |
| |
| txn_info_pb.Clear(); |
| sp->set_call_back("abort_timeout_txn::advance_last_pending_txn_id", [&](auto&& args) { |
| abort_timeout_txn_hit = true; |
| txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_committed(txn, db_id, txn_info_pb.txn_id(), label); |
| } |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| ASSERT_TRUE(is_txn_finished_hit); |
| |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_visible(txn, db_id, txn_id, label); |
| } |
| sleep(1); |
| ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_not_exist(txn, db_id, txn_id, label); |
| } |
| ASSERT_TRUE(abort_timeout_txn_hit); |
| |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| ASSERT_TRUE(commit_txn_eventullay_hit); |
| } |
| |
| TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin txn | |
| // | | |
| // abort txn | |
| // | | |
| // | recyle_tmp_rowsets |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 41414; |
| int64_t table_id = 5454146; |
| int64_t index_id = 27656; |
| int64_t partition_id = 4423544; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67ae2q1231"; |
| |
| bool txn_has_been_aborted = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| TxnInfoPB txn_info_pb; |
| sp->set_call_back("is_txn_finished::txn_has_been_aborted", [&](auto&& args) { |
| txn_has_been_aborted = true; |
| txn_info_pb = *try_any_cast<TxnInfoPB*>(args[0]); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| ASSERT_GT(txn_id, 0); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_FALSE(txn_has_been_aborted); |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| ASSERT_TRUE(txn_has_been_aborted); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin txn | |
| // | | |
| // abort txn | |
| // | | |
| // recycle_expired_txn_label | |
| // | | |
| // | recyle_tmp_rowsets |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 42345236; |
| int64_t table_id = 3165524; |
| int64_t index_id = 89089; |
| int64_t partition_id = 434154; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67ae2q1231"; |
| |
| bool txn_has_been_recycled = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| TxnInfoPB txn_info_pb; |
| sp->set_call_back("is_txn_finished::txn_has_been_recycled", |
| [&](auto&& args) { txn_has_been_recycled = true; }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| ASSERT_GT(txn_id, 0); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| |
| ASSERT_EQ(recycler.recycle_expired_txn_label(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_not_exist(txn, db_id, txn_id, label); |
| } |
| |
| ASSERT_FALSE(txn_has_been_recycled); |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| ASSERT_TRUE(txn_has_been_recycled); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| TEST(TxnLazyCommitTest, RecyclePartitions) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin | |
| // | | |
| // prepare/commit rowset | |
| // | | |
| // lazy commit submit | |
| // | | |
| // dead | |
| // | | |
| // drop partition | |
| // | | |
| // | recyle_partitions |
| // | | |
| // | | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 988032131; |
| int64_t table_id = 5145043; |
| int64_t index_id = 273456; |
| int64_t partition_id = 44576544; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67ae2q1231"; |
| |
| bool commit_txn_eventullay_hit = false; |
| bool txn_not_finished_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| commit_txn_eventullay_hit = true; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| |
| sp->set_call_back("check_lazy_txn_finished::txn_not_finished", |
| [&](auto&& args) { txn_not_finished_hit = true; }); |
| |
| sp->set_call_back("recycle_tablet::begin", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = 0; |
| ret->second = true; |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| // drop partition |
| brpc::Controller cntl; |
| PartitionRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| PartitionResponse res; |
| meta_service->drop_partition( |
| reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_FALSE(txn_not_finished_hit); |
| ASSERT_EQ(recycler.recycle_partitions(), -1); |
| ASSERT_TRUE(txn_not_finished_hit); |
| |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_visible(txn, db_id, txn_id, label); |
| } |
| |
| ASSERT_EQ(recycler.recycle_partitions(), 0); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, RecycleIndexes) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // meta-service recycler |
| // | | |
| // begin | |
| // | | |
| // prepare/commit rowset | |
| // | | |
| // lazy commit submit | |
| // | | |
| // dead | |
| // | | |
| // drop indexes | |
| // | | |
| // | recycle_indexes |
| // | | |
| // | | |
| // | | |
| // | | |
| // v v |
| |
| auto txn_kv = get_mem_txn_kv(); |
| |
| int64_t db_id = 4524364; |
| int64_t table_id = 65354; |
| int64_t index_id = 658432; |
| int64_t partition_id = 76553; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67a34e2q1231"; |
| |
| bool commit_txn_eventullay_hit = false; |
| bool txn_not_finished_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| |
| sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { |
| commit_txn_eventullay_hit = true; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| |
| sp->set_call_back("check_lazy_txn_finished::txn_not_finished", |
| [&](auto&& args) { txn_not_finished_hit = true; }); |
| |
| sp->set_call_back("recycle_tablet::begin", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = 0; |
| ret->second = true; |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| // mock rowset and tablet |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id = 0; |
| { |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| |
| { |
| for (int i = 0; i < 2001; ++i) { |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| // drop partition |
| brpc::Controller cntl; |
| IndexRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| IndexResponse res; |
| meta_service->drop_index(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| |
| ASSERT_EQ(recycler.init(), 0); |
| ASSERT_FALSE(txn_not_finished_hit); |
| ASSERT_EQ(recycler.recycle_indexes(), -1); |
| ASSERT_TRUE(txn_not_finished_hit); |
| |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| check_txn_visible(txn, db_id, txn_id, label); |
| } |
| |
| ASSERT_EQ(recycler.recycle_indexes(), 0); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 3132121; |
| int64_t table_id = 5452432; |
| int64_t index_id = 76763; |
| int64_t partition_id = 43432; |
| |
| int64_t table_id2 = 54524321231; |
| int64_t index_id2 = 543123; |
| int64_t partition_id2 = 214352; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) { |
| auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, std::string>*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_multi_table_commit_txn"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.add_table_ids(table_id2); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 2001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| for (int i = 2001; i < 4002; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2, |
| tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 4002; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { |
| auto txn_kv = get_fdb_txn_kv(); |
| int64_t db_id = 14135425; |
| int64_t table_id = 31245456; |
| int64_t index_id = 434324; |
| int64_t partition_id = 3215764; |
| |
| int64_t table_id2 = 213476; |
| int64_t index_id2 = 126765; |
| int64_t partition_id2 = 214567; |
| bool commit_txn_eventually_finish_hit = false; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) { |
| auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, std::string>*>(args[0]); |
| ASSERT_EQ(code, MetaServiceCode::OK); |
| commit_txn_eventually_finish_hit = true; |
| }); |
| |
| sp->set_call_back("TxnLazyCommitTask::commit::max_rowsets_per_batch", [&](auto&& args) { |
| size_t max_rowsets_per_batch = *try_any_cast<size_t*>(args[0]); |
| size_t max_rowset_meta_size = *try_any_cast<size_t*>(args[1]); |
| LOG(INFO) << "max_rowsets_per_batch:" << max_rowsets_per_batch |
| << " max_rowset_meta_size:" << max_rowset_meta_size; |
| ASSERT_EQ(max_rowsets_per_batch, 134); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label_with_huge_rowsetmeta_test"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.add_table_ids(table_id2); |
| txn_info_pb.set_timeout_ms(600000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 1001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| for (int i = 1001; i < 2002; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2, |
| tablet_id_base + i); |
| auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_TRUE(commit_txn_eventually_finish_hit); |
| } |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 2002; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_not_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) { |
| // =========================================================================== |
| // threads concurrent execution flow: |
| // |
| // thread1 lazy thread1 thread3 |
| // | | | |
| // commit_txn_eventually begin | | |
| // | | | |
| // lazy commit wait | | |
| // | | | |
| // | make_committed_txn_visible | |
| // | | | |
| // | inject TXN_TOO_OLD fdb error | |
| // | | sc create new tablet tmp rowset |
| // | | | |
| // | | | |
| // retry commit_txn | | |
| // v v |
| auto txn_kv = get_mem_txn_kv(); |
| int64_t db_id = 4534445675; |
| int64_t table_id = 4365676543; |
| int64_t index_id = 665453237; |
| int64_t partition_id = 2136776543678; |
| |
| bool go = false; |
| std::mutex go_mutex; |
| std::condition_variable go_cv; |
| std::atomic<int32_t> make_committed_txn_visible_count = {0}; |
| std::atomic<int32_t> sc_create_tmp_rowset_count = {0}; |
| std::atomic<int32_t> sc_create_tmp_rowset_finish_count = {0}; |
| std::atomic<int32_t> tmp_rowsets_been_already_converted = {0}; |
| |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", [&](auto&& args) { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| if (make_committed_txn_visible_count == 0) { |
| make_committed_txn_visible_count++; |
| if (sc_create_tmp_rowset_count == 0) { |
| go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count == 1; }); |
| } |
| MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[0]); |
| *code = MetaServiceCode::KV_TXN_CONFLICT; |
| bool* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| LOG(INFO) << "inject kv error KV_TXN_CONFLICT"; |
| go_cv.notify_all(); |
| } |
| } |
| }); |
| |
| sp->set_call_back("convert_tmp_rowsets::already_been_converted", [&](auto&& args) { |
| auto version_pb = *try_any_cast<VersionPB*>(args[0]); |
| LOG(INFO) << "version_pb:" << version_pb.ShortDebugString(); |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| tmp_rowsets_been_already_converted++; |
| go_cv.notify_all(); |
| }); |
| |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(txn_kv, true); |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_sc_with_commit_txn_label"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(600000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| int64_t txn_id = res.txn_id(); |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 3131124; |
| for (int i = 0; i < 1001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| std::thread thread1([&] { |
| { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(false); |
| req.set_enable_txn_lazy_commit(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| LOG(INFO) << "thread1 finish"; |
| }); |
| |
| std::thread thread2([&] { |
| { |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| go_cv.wait(_lock, [&] { return go; }); |
| } |
| |
| { |
| std::unique_lock<std::mutex> _lock(go_mutex); |
| sc_create_tmp_rowset_count++; |
| if (make_committed_txn_visible_count == 0) { |
| go_cv.wait(_lock, [&] { return make_committed_txn_visible_count > 0; }); |
| } |
| for (int i = 0; i < 1001; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| partition_id, tablet_id_base + i); |
| auto tmp_rowset = |
| create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| LOG(INFO) << "sc_create_tmp_rowset_finish_count finish"; |
| sc_create_tmp_rowset_finish_count++; |
| go_cv.notify_all(); |
| } |
| LOG(INFO) << "thread2 finish"; |
| } |
| }); |
| |
| std::unique_lock<std::mutex> go_lock(go_mutex); |
| go = true; |
| go_lock.unlock(); |
| go_cv.notify_all(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| ASSERT_GE(tmp_rowsets_been_already_converted, 1); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string mock_instance = "test_instance"; |
| for (int i = 0; i < 1001; ++i) { |
| int64_t tablet_id = tablet_id_base + i; |
| check_tablet_idx_db_id(txn, db_id, tablet_id); |
| check_tmp_rowset_exist(txn, tablet_id, txn_id); |
| check_rowset_meta_exist(txn, tablet_id, 2); |
| } |
| } |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| } // namespace doris::cloud |