| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/master/txn_manager.h" |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/gutil/sysinfo.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/master/txn_manager.pb.h" |
| #include "kudu/master/txn_manager.proxy.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/util/barrier.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::rpc::RpcController; |
| using kudu::transactions::TxnStatePB; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| |
| DECLARE_bool(txn_manager_enabled); |
| DECLARE_bool(txn_manager_lazily_initialized); |
| DECLARE_int32(rpc_service_queue_length); |
| DECLARE_int64(txn_manager_status_table_range_partition_span); |
| DECLARE_uint32(txn_manager_status_table_num_replicas); |
| DECLARE_uint32(transaction_keepalive_interval_ms); |
| |
| namespace kudu { |
| namespace transactions { |
| |
| class TxnManagerTest : public KuduTest { |
| protected: |
| TxnManagerTest() |
| : master_(nullptr) { |
| // Master is necessary since it hosts the TxnManager RPC service. |
| opts_.num_masters = 1; |
| // At least one tablet server is necessary to host transaction status |
| // tablets. |
| opts_.num_tablet_servers = 1; |
| } |
| |
| void SetUp() override { |
| // Explicitly setting the flags just for better readability. |
| FLAGS_txn_manager_enabled = true; |
| FLAGS_txn_manager_lazily_initialized = true; |
| |
| // In this test, there is just a single tablet servers in the cluster. |
| FLAGS_txn_manager_status_table_num_replicas = 1; |
| |
| // Make TxnManager creating new ranges in the transaction status table more |
| // often, so it's not necessary to start too many transactions to see it |
| // switching to a new tablet of the transaction status table. |
| FLAGS_txn_manager_status_table_range_partition_span = 1024; |
| |
| // A few scenarios (e.g. LazyInitializationConcurrentCalls) might require |
| // an insanely high capacity of the service RPC queue since their workload |
| // depends on the number of CPU cores available. They can send many requests |
| // at once to stress the system while verifying important invariants. |
| FLAGS_rpc_service_queue_length = 10000; |
| |
| KuduTest::SetUp(); |
| cluster_.reset(new InternalMiniCluster(env_, opts_)); |
| ASSERT_OK(Start()); |
| } |
| |
| void TearDown() override { |
| cluster_->Shutdown(); |
| KuduTest::TearDown(); |
| } |
| |
| // Prepare the cluster to run test scenarios: start the cluster, wait for |
| // TxnManager initialization, and setup TxnManager's proxy. |
| Status Start() { |
| RETURN_NOT_OK(cluster_->Start()); |
| // InternalMiniCluster::Start() resets the set of mini-masters: need to |
| // update the shortcut pointer. |
| master_ = cluster_->mini_master()->master(); |
| if (!FLAGS_txn_manager_lazily_initialized) { |
| RETURN_NOT_OK(cluster_->mini_master()->master()->WaitForTxnManagerInit()); |
| } |
| |
| proxy_.reset(new TxnManagerServiceProxy( |
| cluster_->messenger(), |
| cluster_->mini_master()->bound_rpc_addr(), |
| cluster_->mini_master()->bound_rpc_addr().host())); |
| return Status::OK(); |
| } |
| |
| static void PrepareRpcController(RpcController* ctx) { |
| static const MonoDelta kRpcTimeout = MonoDelta::FromSeconds(30); |
| ASSERT_NE(nullptr, ctx); |
| ctx->set_timeout(kRpcTimeout); |
| } |
| |
| InternalMiniClusterOptions opts_; |
| unique_ptr<InternalMiniCluster> cluster_; |
| unique_ptr<TxnManagerServiceProxy> proxy_; |
| // A shortcut to the Master object. |
| master::Master* master_; |
| }; |
| |
| // Verify the basic functionality when TxnManager is lazily initialized. |
| TEST_F(TxnManagerTest, LazyInitialization) { |
| // The lazy initialization mode is on by default. |
| ASSERT_TRUE(FLAGS_txn_manager_lazily_initialized); |
| ASSERT_TRUE(master_->txn_manager()->is_lazily_initialized_); |
| ASSERT_FALSE(master_->txn_manager()->initialized_); |
| |
| // Timeout is not very relevant here, it only limits the amount of time to |
| // wait. By default, the lazy initialization mode is on, so TxnManager |
| // should not be initialized. |
| { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(1); |
| auto s = master_->WaitForTxnManagerInit(kTimeout); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| "timed out waiting for TxnManager to initialize"); |
| } |
| |
| // Since the lazy initialization mode is on and there haven't been any calls |
| // to the TxnManager so far, the TxnManager should not be initialized. |
| ASSERT_FALSE(master_->txn_manager()->initialized_); |
| |
| // Make a call to TxnManager using a not-yet-seen txn_id. The result should |
| // be an error, but after this TxnManager should become initialized. |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| GetTransactionStateRequestPB req; |
| GetTransactionStateResponsePB resp; |
| req.set_txn_id(0); |
| ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx)); |
| ASSERT_TRUE(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found"); |
| } |
| |
| ASSERT_OK(master_->WaitForTxnManagerInit()); |
| ASSERT_TRUE(master_->txn_manager()->initialized_); |
| |
| // Current implementation starts assigning transaction identifiers with 0, |
| // and the very first range partition created upon initialization covers |
| // 0+ range. If making a call to TxnManager using a negative txn_id, then |
| // the result shows the fact that the corresponding tablet doesn't exist. |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| AbortTransactionRequestPB req; |
| AbortTransactionResponsePB resp; |
| req.set_txn_id(-1); |
| ASSERT_OK(proxy_->AbortTransaction(req, &resp, &ctx)); |
| ASSERT_TRUE(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| "No tablet covering the requested range partition"); |
| } |
| |
| // Shutdown and start the cluster again. This is to verify that initialization |
| // code works as expected when the transaction status table already exists. |
| cluster_->Shutdown(); |
| ASSERT_OK(Start()); |
| ASSERT_FALSE(master_->txn_manager()->initialized_); |
| |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| CommitTransactionRequestPB req; |
| CommitTransactionResponsePB resp; |
| req.set_txn_id(0); |
| ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx)); |
| ASSERT_TRUE(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found"); |
| } |
| ASSERT_OK(master_->WaitForTxnManagerInit()); |
| ASSERT_TRUE(master_->txn_manager()->initialized_); |
| } |
| |
| // Scenario to verify that the lazy initialization of the TxnManager works as |
| // expected in the presence of multiple concurrent calls. |
| TEST_F(TxnManagerTest, LazyInitializationConcurrentCalls) { |
| // In this functor CHECK_ is used instead of ASSERT_ because it's targeted |
| // for multi-thread use. |
| const auto txn_initiator = [this](size_t txn_num, Barrier* b) { |
| // Create its own proxy: this is important if trying to create more |
| // concurrency since a proxy serializes RPC calls. |
| TxnManagerServiceProxy p( |
| cluster_->messenger(), |
| cluster_->mini_master()->bound_rpc_addr(), |
| cluster_->mini_master()->bound_rpc_addr().host()); |
| for (auto id = 0; id < txn_num; ++id) { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| GetTransactionStateRequestPB req; |
| GetTransactionStateResponsePB resp; |
| req.set_txn_id(id); |
| b->Wait(); |
| CHECK_OK(proxy_->GetTransactionState(req, &resp, &ctx)); |
| CHECK(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| CHECK(s.IsNotFound()) << s.ToString(); |
| } |
| }; |
| |
| static constexpr int64_t kNumCallsPerThread = 8; |
| const int kNumCPUs = base::NumCPUs(); |
| const size_t kNumThreads = 2 * kNumCPUs; |
| vector<thread> threads; |
| threads.reserve(kNumThreads); |
| Barrier barrier(kNumThreads); |
| for (auto idx = 0; idx < kNumThreads; ++idx) { |
| threads.emplace_back(txn_initiator, kNumCallsPerThread, &barrier); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| |
| // TxnManager should be initialized, of course. |
| ASSERT_OK(master_->WaitForTxnManagerInit()); |
| ASSERT_TRUE(master_->txn_manager()->initialized_); |
| } |
| |
| // Verify the basic functionality when TxnManager is initialized in a |
| // non-lazy manner. |
| TEST_F(TxnManagerTest, NonlazyInitialization) { |
| FLAGS_txn_manager_lazily_initialized = false; |
| cluster_.reset(new InternalMiniCluster(env_, opts_)); |
| ASSERT_OK(Start()); |
| ASSERT_FALSE(master_->txn_manager()->is_lazily_initialized_); |
| // Eventually, TxnManager should come up initialized: master initializes |
| // it on startup in case of non-lazy initialization mode. |
| ASSERT_OK(master_->WaitForTxnManagerInit()); |
| ASSERT_TRUE(master_->txn_manager()->initialized_); |
| |
| // Shutdown and start the cluster again. This is to verify that initialization |
| // code works as expected when the transaction status table already exists. |
| cluster_->Shutdown(); |
| ASSERT_OK(Start()); |
| ASSERT_FALSE(master_->txn_manager()->is_lazily_initialized_); |
| ASSERT_OK(master_->WaitForTxnManagerInit()); |
| ASSERT_TRUE(master_->txn_manager()->initialized_); |
| |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| CommitTransactionRequestPB req; |
| CommitTransactionResponsePB resp; |
| req.set_txn_id(0); |
| ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx)); |
| ASSERT_TRUE(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found"); |
| } |
| } |
| |
| // This is scenario calls almost all methods of the TxnManager. |
| TEST_F(TxnManagerTest, AbortedTransactionLifecycle) { |
| const auto fetch_txn_status = [this] (int64_t txn_id, TxnStatePB* state) { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| GetTransactionStateRequestPB req; |
| GetTransactionStateResponsePB resp; |
| req.set_txn_id(txn_id); |
| ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx)); |
| ASSERT_FALSE(resp.has_error()) |
| << StatusFromPB(resp.error().status()).ToString(); |
| ASSERT_TRUE(resp.has_state()); |
| *state = resp.state(); |
| }; |
| |
| int64_t txn_id = -1; |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| BeginTransactionRequestPB req; |
| BeginTransactionResponsePB resp; |
| ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx)); |
| ASSERT_FALSE(resp.has_error()) |
| << StatusFromPB(resp.error().status()).ToString(); |
| ASSERT_TRUE(resp.has_txn_id()); |
| txn_id = resp.txn_id(); |
| ASSERT_LE(0, txn_id); |
| ASSERT_TRUE(resp.has_keepalive_millis()); |
| ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis()); |
| TxnStatePB txn_state; |
| NO_FATALS(fetch_txn_status(txn_id, &txn_state)); |
| ASSERT_EQ(TxnStatePB::OPEN, txn_state); |
| } |
| |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| CommitTransactionRequestPB req; |
| CommitTransactionResponsePB resp; |
| req.set_txn_id(txn_id); |
| ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx)); |
| ASSERT_FALSE(resp.has_error()) |
| << StatusFromPB(resp.error().status()).ToString(); |
| TxnStatePB txn_state; |
| NO_FATALS(fetch_txn_status(txn_id, &txn_state)); |
| ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_state); |
| } |
| |
| // TODO(aserbin): add call to KeepTransactionAlive() when TxnStatusManager |
| // has the functionality implemented. |
| |
| { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| AbortTransactionRequestPB req; |
| AbortTransactionResponsePB resp; |
| req.set_txn_id(txn_id); |
| ASSERT_OK(proxy_->AbortTransaction(req, &resp, &ctx)); |
| ASSERT_FALSE(resp.has_error()) |
| << StatusFromPB(resp.error().status()).ToString(); |
| TxnStatePB txn_state; |
| NO_FATALS(fetch_txn_status(txn_id, &txn_state)); |
| ASSERT_EQ(TxnStatePB::ABORTED, txn_state); |
| } |
| } |
| |
| TEST_F(TxnManagerTest, BeginManyTransactions) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // In this functor CHECK_ is used instead of ASSERT_ because it's targeted |
| // for multi-thread use, and ASSERT_ macros do not seem working as expected |
| // in such case. |
| const auto txn_initiator = [this]( |
| size_t txn_num, |
| vector<int64_t>* txn_ids) { |
| // Create its own proxy: this is important if trying to create more |
| // concurrency since a proxy serializes RPC calls. |
| TxnManagerServiceProxy p( |
| cluster_->messenger(), |
| cluster_->mini_master()->bound_rpc_addr(), |
| cluster_->mini_master()->bound_rpc_addr().host()); |
| int64_t max_txn_id = -1; |
| for (auto id = 0; id < txn_num; ++id) { |
| BeginTransactionResponsePB resp; |
| while (true) { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| BeginTransactionRequestPB req; |
| resp.Clear(); |
| auto s = p.BeginTransaction(req, &resp, &ctx); |
| // The only acceptable non-OK status here is Status::ServiceUnavailable. |
| if (s.IsServiceUnavailable() || |
| (resp.has_error() && |
| StatusFromPB(resp.error().status()).IsServiceUnavailable())) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| continue; |
| } |
| break; |
| } |
| CHECK(!resp.has_error()) << StatusFromPB(resp.error().status()).ToString(); |
| CHECK(resp.has_txn_id()); |
| int64_t txn_id = resp.txn_id(); |
| CHECK_GT(txn_id, max_txn_id); |
| max_txn_id = txn_id; |
| if (txn_ids) { |
| txn_ids->emplace_back(txn_id); |
| } |
| CHECK(resp.has_keepalive_millis()); |
| CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis()); |
| } |
| }; |
| |
| // First, a simple sequential case: start many transactions one after another. |
| // The point here is to make sure the TxnManager: |
| // * takes care adding new range partitions to the transaction status table |
| // * transaction identifiers assigned to the newly started transactions are |
| // ** unique |
| // ** increase monotonically |
| { |
| const int64_t kNumTransactions = |
| FLAGS_txn_manager_status_table_range_partition_span * 3; |
| |
| // TxnManager is lazily initialized, so no tablets of the transaction |
| // status tablet should be created yet. |
| const auto txn_tablets_before = |
| cluster_->mini_tablet_server(0)->ListTablets(); |
| ASSERT_EQ(0, txn_tablets_before.size()); |
| |
| vector<int64_t> txn_ids; |
| txn_initiator(kNumTransactions, &txn_ids); |
| ASSERT_EQ(kNumTransactions, txn_ids.size()); |
| int64_t prev_txn_id = -1; |
| for (const auto& txn_id : txn_ids) { |
| ASSERT_GT(txn_id, prev_txn_id); |
| prev_txn_id = txn_id; |
| } |
| |
| // Check that corresponding tablets have been created for the transaction |
| // status table. |
| const auto txn_tablets_after = |
| cluster_->mini_tablet_server(0)->ListTablets(); |
| auto expected_tablets_num = 1 + |
| prev_txn_id / FLAGS_txn_manager_status_table_range_partition_span; |
| ASSERT_EQ(expected_tablets_num, txn_tablets_after.size()); |
| } |
| |
| // A more complex case: run multiple threads, each starting many transactions. |
| // Make sure the generated transaction identifiers are unique. |
| { |
| const int64_t kNumTransactionsPerThread = |
| FLAGS_txn_manager_status_table_range_partition_span * 2; |
| const int kNumCPUs = base::NumCPUs(); |
| const size_t kNumThreads = 2 * kNumCPUs; |
| vector<thread> threads; |
| threads.reserve(kNumThreads); |
| vector<vector<int64_t>> txn_ids_per_thread; |
| txn_ids_per_thread.resize(kNumThreads); |
| for (auto& slice : txn_ids_per_thread) { |
| slice.reserve(kNumTransactionsPerThread); |
| } |
| for (auto idx = 0; idx < kNumThreads; ++idx) { |
| threads.emplace_back(txn_initiator, |
| kNumTransactionsPerThread, |
| &txn_ids_per_thread[idx]); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| |
| // Verify the uniqueness of the identifiers across all the threads. Instead |
| // of sort/unique, use std::unordered_set. |
| unordered_set<int64_t> txn_ids; |
| size_t total_size = 0; |
| for (const auto& slice: txn_ids_per_thread) { |
| EXPECT_EQ(kNumTransactionsPerThread, slice.size()); |
| txn_ids.insert(slice.begin(), slice.end()); |
| total_size += slice.size(); |
| } |
| ASSERT_EQ(kNumTransactionsPerThread * kNumThreads, total_size); |
| ASSERT_EQ(total_size, txn_ids.size()); |
| } |
| |
| // Now start a single transaction to get the highest assigned txn_id so far. |
| // This is to check for the number of tablets in the transaction status table |
| // after all this activity. |
| { |
| vector<int64_t> txn_ids; |
| txn_initiator(1, &txn_ids); |
| ASSERT_EQ(1, txn_ids.size()); |
| const auto txn_tablets = cluster_->mini_tablet_server(0)->ListTablets(); |
| auto expected_tablets_num = 1 + |
| txn_ids[0] / FLAGS_txn_manager_status_table_range_partition_span; |
| ASSERT_EQ(expected_tablets_num, txn_tablets.size()); |
| } |
| } |
| |
| // TODO(aserbin): add test scenarios involving a multi-master Kudu cluster |
| // (hence there will be multiple TxnManager instances) and verify |
| // how all this works in case of frequent master re-elections. |
| |
| // KeepTransactionAlive is not yet supported. |
| // TODO(aserbin): update this scenario once KeepTransactionAlive is implemented |
| TEST_F(TxnManagerTest, KeepTransactionAliveRpc) { |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| KeepTransactionAliveRequestPB req; |
| req.set_txn_id(0); |
| KeepTransactionAliveResponsePB resp; |
| ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx)); |
| ASSERT_TRUE(resp.has_error()); |
| auto s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); |
| ASSERT_STR_CONTAINS( |
| s.ToString(), "Not implemented: KeepTransactionAlive is not supported yet"); |
| } |
| |
| } // namespace transactions |
| } // namespace kudu |