| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/external_mini_cluster-itest-base.h" |
| #include "kudu/master/txn_manager.pb.h" |
| #include "kudu/master/txn_manager.proxy.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduTransaction; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::cluster::TabletIdAndTableName; |
| using kudu::itest::TServerDetails; |
| using kudu::itest::WaitForOpFromCurrentTerm; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::RpcController; |
| using kudu::transactions::CommitTransactionRequestPB; |
| using kudu::transactions::CommitTransactionResponsePB; |
| using kudu::transactions::BeginTransactionRequestPB; |
| using kudu::transactions::BeginTransactionResponsePB; |
| using kudu::transactions::GetTransactionStateRequestPB; |
| using kudu::transactions::GetTransactionStateResponsePB; |
| using kudu::transactions::KeepTransactionAliveRequestPB; |
| using kudu::transactions::KeepTransactionAliveResponsePB; |
| using kudu::transactions::TxnManagerServiceProxy; |
| using kudu::transactions::TxnStatePB; |
| using std::string; |
| using std::thread; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| class TxnStatusManagerITest : public ExternalMiniClusterITestBase { |
| public: |
| static const constexpr auto kNumTabletServers = 3; |
| static const constexpr auto kTxnTrackerIntervalMs = 100; |
| static const constexpr auto kTxnKeepaliveIntervalMs = |
| kTxnTrackerIntervalMs * 5; |
| static const constexpr auto kRaftHbIntervalMs = 50; |
| static const constexpr char* const kTxnTrackerIntervalFlag = |
| "txn_staleness_tracker_interval_ms"; |
| |
| TxnStatusManagerITest() { |
| cluster_opts_.num_tablet_servers = kNumTabletServers; |
| |
| // Enable TxnManager in Kudu masters. |
| // TODO(aserbin): remove this customization once the flag is 'on' by default |
| cluster_opts_.extra_master_flags.emplace_back( |
| "--txn_manager_enabled=true"); |
| // Scenarios based on this test fixture assume the txn status table |
| // is created at start, not on first transaction-related operation. |
| cluster_opts_.extra_master_flags.emplace_back( |
| "--txn_manager_lazily_initialized=false"); |
| |
| // To speed up test scenarios, set shorter intervals for the transaction |
| // keepalive interval ... |
| cluster_opts_.extra_tserver_flags.emplace_back(Substitute( |
| "--txn_keepalive_interval_ms=$0", kTxnKeepaliveIntervalMs)); |
| // ... and the polling interval of the txn staleness tracker. |
| cluster_opts_.extra_tserver_flags.emplace_back(Substitute( |
| "--$0=$1", kTxnTrackerIntervalFlag, kTxnTrackerIntervalMs)); |
| cluster_opts_.extra_tserver_flags.emplace_back(Substitute( |
| "--txn_staleness_tracker_disabled_interval_ms=$0", kTxnTrackerIntervalMs)); |
| // Speed up Raft re-elections in case of non-responsive leader replicas. |
| cluster_opts_.extra_tserver_flags.emplace_back(Substitute( |
| "--raft_heartbeat_interval_ms=$0", kRaftHbIntervalMs)); |
| cluster_opts_.extra_tserver_flags.emplace_back( |
| "--leader_failure_max_missed_heartbeat_periods=1.25"); |
| cluster_opts_.extra_tserver_flags.emplace_back( |
| "--enable_txn_system_client_init=true"); |
| |
| // Some of these tests rely on checking state assuming no background tasks. |
| // For simplicity, disable the background commits. |
| cluster_opts_.extra_tserver_flags.emplace_back( |
| "--txn_schedule_background_tasks=false"); |
| } |
| |
| void SetUp() override { |
| static constexpr const char* const kTxnStatusTableName = |
| "kudu_system.kudu_transactions"; |
| |
| // This assertion is an explicit statement that all scenarios in this test |
| // assume there is only one TxnManager instance in the cluster. |
| // |
| // TODO(aserbin): make this tests parameterized by the number of TxnManger |
| // instances (i.e. Kudu masters) and extend the scenarios |
| // as needed. |
| ASSERT_EQ(1, cluster_opts_.num_masters); |
| |
| NO_FATALS(StartClusterWithOpts(cluster_opts_)); |
| |
| // Wait for txn status tablets created at each of the tablet servers. |
| vector<TabletIdAndTableName> tablets_info; |
| for (auto idx = 0; idx < kNumTabletServers; ++idx) { |
| vector<TabletIdAndTableName> info; |
| ASSERT_OK(cluster_->WaitForTabletsRunning( |
| cluster_->tablet_server(idx), 1, kTimeout, &info)); |
| ASSERT_EQ(1, info.size()); |
| tablets_info.emplace_back(std::move(*info.begin())); |
| } |
| |
| const string& tablet_id = tablets_info.begin()->tablet_id; |
| for (const auto& elem : tablets_info) { |
| ASSERT_EQ(tablet_id, elem.tablet_id); |
| ASSERT_EQ(kTxnStatusTableName, elem.table_name); |
| } |
| txn_status_tablet_id_ = tablet_id; |
| |
| rpc::MessengerBuilder b("txn-keepalive"); |
| ASSERT_OK(b.Build(&messenger_)); |
| const auto rpc_addr = cluster_->master()->bound_rpc_addr(); |
| txn_manager_proxy_.reset(new TxnManagerServiceProxy( |
| messenger_, rpc_addr, rpc_addr.host())); |
| } |
| |
| Status GetTxnStatusTabletLeader(string* ts_uuid) { |
| CHECK(ts_uuid); |
| TServerDetails* leader; |
| RETURN_NOT_OK(FindTabletLeader( |
| ts_map_, txn_status_tablet_id_, kTimeout, &leader)); |
| *ts_uuid = leader->uuid(); |
| |
| return Status::OK(); |
| } |
| |
| Status BeginTransaction(int64_t* txn_id, uint32_t* keepalive_ms) { |
| CHECK(txn_id); |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| BeginTransactionRequestPB req; |
| BeginTransactionResponsePB resp; |
| RETURN_NOT_OK(txn_manager_proxy_->BeginTransaction(req, &resp, &ctx)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| CHECK(resp.has_txn_id()); |
| *txn_id = resp.txn_id(); |
| CHECK(resp.has_keepalive_millis()); |
| *keepalive_ms = resp.keepalive_millis(); |
| |
| return Status::OK(); |
| } |
| |
| Status CommitTransaction(int64_t txn_id) { |
| CHECK(txn_id); |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| CommitTransactionRequestPB req; |
| req.set_txn_id(txn_id); |
| CommitTransactionResponsePB resp; |
| RETURN_NOT_OK(txn_manager_proxy_->CommitTransaction(req, &resp, &ctx)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status GetTxnState(int64_t txn_id, TxnStatePB* state) { |
| CHECK(state); |
| CHECK(txn_manager_proxy_); |
| GetTransactionStateRequestPB req; |
| req.set_txn_id(txn_id); |
| GetTransactionStateResponsePB resp; |
| RpcController ctx; |
| PrepareRpcController(&ctx); |
| RETURN_NOT_OK(txn_manager_proxy_->GetTransactionState(req, &resp, &ctx)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| CHECK(resp.has_state()); |
| *state = resp.state(); |
| |
| return Status::OK(); |
| } |
| |
| void CheckTxnState(int64_t txn_id, TxnStatePB expected_state) { |
| TxnStatePB txn_state; |
| ASSERT_OK(GetTxnState(txn_id, &txn_state)); |
| ASSERT_EQ(expected_state, txn_state); |
| } |
| |
| protected: |
| const string& txn_tablet_id() const { |
| return txn_status_tablet_id_; |
| } |
| |
| static void PrepareRpcController(RpcController* ctx) { |
| static const MonoDelta kRpcTimeout = MonoDelta::FromSeconds(15); |
| CHECK_NOTNULL(ctx)->set_timeout(kRpcTimeout); |
| } |
| |
| static const MonoDelta kTimeout; |
| |
| ExternalMiniClusterOptions cluster_opts_; |
| string txn_status_tablet_id_; |
| |
| std::shared_ptr<rpc::Messenger> messenger_; |
| std::unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_; |
| }; |
| |
| const MonoDelta TxnStatusManagerITest::kTimeout = MonoDelta::FromSeconds(15); |
| |
| // The test to verify basic functionality of the transaction tracker: it should |
| // detect transactions that haven't received KeepTransactionAlive() requests |
| // for longer than the transaction's keepalive interval and automatically abort |
| // those. |
| TEST_F(TxnStatusManagerITest, StaleTransactionsCleanup) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Check that the transaction staleness is detected and the stale transaction |
| // is aborted while the transaction is in OPEN state. |
| { |
| int64_t txn_id; |
| uint32_t keepalive_interval_ms; |
| // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy, |
| // and we don't wait for TxnManager to initialize in SetUp(). |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms)); |
| }); |
| |
| // Wait for longer than the transaction keep-alive interval to allow |
| // the transaction tracker to detect the staleness of the transaction |
| // and abort it. An extra margin here is to avoid flakiness due to |
| // scheduling anomalies. |
| SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms)); |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS)); |
| } |
| |
| // Check that the transaction staleness is detected and the stale transaction |
| // is aborted while the transaction is in COMMIT_IN_PROGRESS state. |
| { |
| int64_t txn_id; |
| uint32_t keepalive_interval_ms; |
| ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms)); |
| ASSERT_OK(CommitTransaction(txn_id)); |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS)); |
| |
| // A transaction in COMMIT_IN_PROGRESS state isn't automatically aborted |
| // even if no txn keepalive messages are received. |
| SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms)); |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS)); |
| } |
| } |
| |
// Make sure it's possible to disable and then re-enable the transaction
// staleness tracking at run time without restarting the processes hosting
// TxnStatusManager instances (i.e. tablet servers).
| TEST_F(TxnStatusManagerITest, ToggleStaleTxnTrackerInRuntime) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Disable txn transaction tracking in run-time. |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| auto* ts = cluster_->tablet_server(i); |
| ASSERT_OK(cluster_->SetFlag(ts, kTxnTrackerIntervalFlag, "0")); |
| } |
| |
| int64_t txn_id; |
| uint32_t keepalive_interval_ms; |
| // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy, |
| // and we don't wait for TxnManager to initialize in SetUp(). |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms)); |
| }); |
| ASSERT_EQ(kTxnKeepaliveIntervalMs, keepalive_interval_ms); |
| |
| // Now, with no transaction staleness tracking, the transaction should |
| // not be aborted automatically even if not sending keepalive requests. |
| SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms)); |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN)); |
| |
| // Re-enable txn transaction tracking in run-time. |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| auto* ts = cluster_->tablet_server(i); |
| ASSERT_OK(cluster_->SetFlag( |
| ts, kTxnTrackerIntervalFlag, std::to_string(kTxnTrackerIntervalMs))); |
| } |
| |
| // Check that the transaction staleness is detected and the stale transaction |
| // is aborted once stale transaction tracking is re-enabled. |
| SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms)); |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS)); |
| } |
| |
| // Verify the functionality of the stale transaction tracker in TxnStatusManager |
| // in case of replicated txn status table. The crux of this scenario is to make |
| // sure that a transaction isn't aborted if keepalive requests are sent as |
| // required even in case of Raft leader re-elections and restarts |
| // of the TxnStatusManager instances. |
| TEST_F(TxnStatusManagerITest, TxnKeepAliveMultiTxnStatusManagerInstances) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| int64_t txn_id; |
| uint32_t keepalive_interval_ms; |
| // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy, |
| // and we don't wait for TxnManager to initialize in SetUp(). |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms)); |
| }); |
| |
| CountDownLatch latch(1); |
| Status keep_txn_alive_status; |
| thread txn_keepalive_sender([&] { |
| const auto period = MonoDelta::FromMilliseconds(keepalive_interval_ms / 5); |
| const auto timeout = MonoDelta::FromMilliseconds(keepalive_interval_ms / 10); |
| // Keepalive thread uses its own messenger and proxy. |
| std::shared_ptr<rpc::Messenger> m; |
| rpc::MessengerBuilder b("txn-keepalive"); |
| ASSERT_OK(b.Build(&m)); |
| const auto rpc_addr = cluster_->master()->bound_rpc_addr(); |
| TxnManagerServiceProxy txn_manager_proxy(m, rpc_addr, rpc_addr.host()); |
| do { |
| // The timeout for KeepTransactionAlive() requests should be short, |
| // otherwise this thread might miss sending the requests with proper |
| // timing. |
| RpcController ctx; |
| ctx.set_timeout(timeout); |
| KeepTransactionAliveRequestPB req; |
| req.set_txn_id(txn_id); |
| KeepTransactionAliveResponsePB resp; |
| auto s = txn_manager_proxy.KeepTransactionAlive(req, &resp, &ctx); |
| if (resp.has_error()) { |
| if (resp.error().has_status()) { |
| keep_txn_alive_status = StatusFromPB(resp.error().status()); |
| } else { |
| keep_txn_alive_status = Status::RemoteError("unspecified status"); |
| } |
| } else { |
| keep_txn_alive_status = s; |
| } |
| if (!keep_txn_alive_status.ok()) { |
| LOG(WARNING) << Substitute( |
| "KeepTransactionAlive() returned non-OK status: $0", |
| keep_txn_alive_status.ToString()); |
| } |
| } while (!latch.WaitFor(period)); |
| }); |
| auto cleanup = MakeScopedCleanup([&] { |
| latch.CountDown(); |
| txn_keepalive_sender.join(); |
| }); |
| |
| // Pause tserver processes. This is to check how the stale txn tracker works |
| // in case of 'frozen' and then 'thawed' processes. The essence is to make |
| // sure the former leader doesn't abort a transaction in case of scheduling |
| // anomalies, and TxnManager forwards txn keep-alive messages to proper |
| // TxnStatusMananger instance when leadership changes. |
| for (auto i = 0; i < 5; ++i) { |
| string old_leader_uuid; |
| ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid)); |
| auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid); |
| ASSERT_EVENTUALLY([&]{ |
| ASSERT_OK(ts->Pause()); |
| SleepFor(MonoDelta::FromMilliseconds( |
| std::max(3 * kRaftHbIntervalMs, 2 * kTxnKeepaliveIntervalMs))); |
| ASSERT_OK(ts->Resume()); |
| string new_leader_uuid; |
| ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid)); |
| ASSERT_NE(old_leader_uuid, new_leader_uuid); |
| auto* other_ts = FindOrDie(ts_map_, new_leader_uuid); |
| // Make sure the new leader has established itself up to the point that |
| // it can write into its backing txn status tablet: this is necessary to |
| // make sure it can abort the transaction, if it finds the transaction |
| // has stalled. |
| ASSERT_OK(WaitForOpFromCurrentTerm( |
| other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout)); |
| }); |
| // Give the TxnStatusManager instance running with the leader tablet replica |
| // a chance to detect and abort stale transactions, if any detected. |
| SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs)); |
| } |
| |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN)); |
| |
| // Restart tablet servers hosting TxnStatusManager instances. This is to |
| // check that starting TxnStatusManager doesn't abort an open transaction |
| // which sending txn keepalive messages as required. |
| for (auto i = 0; i < 5; ++i) { |
| string old_leader_uuid; |
| ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid)); |
| auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid); |
| ts->Shutdown(); |
| ASSERT_EVENTUALLY([&]{ |
| string new_leader_uuid; |
| ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid)); |
| ASSERT_NE(old_leader_uuid, new_leader_uuid); |
| auto* other_ts = FindOrDie(ts_map_, new_leader_uuid); |
| // Make sure the new txn status tablet leader has established itself. For |
| // the reasoning, see the comment in the 'pause scenario' scope above. |
| ASSERT_OK(WaitForOpFromCurrentTerm( |
| other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout)); |
| }); |
| ASSERT_OK(ts->Restart()); |
| SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs)); |
| } |
| |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN)); |
| |
| latch.CountDown(); |
| txn_keepalive_sender.join(); |
| cleanup.cancel(); |
| |
| // An extra sanity check: make sure the recent keepalive requests were sent |
| // successfully, as expected. |
| ASSERT_OK(keep_txn_alive_status); |
| |
| // Now, when no txn keepalive heartbeats are sent, the transaction |
| // should be automatically aborted by TxnStatusManager running with the |
| // leader replica of the txn status tablet. |
| ASSERT_EVENTUALLY([&]{ |
| NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS)); |
| }); |
| |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
// Check that the internal txn heartbeater in KuduClient keeps sending
// KeepTransactionAlive requests even if no TxnStatusManager instance is
// accessible for some time, and that the txn keepalive messages reach the
// destination after TxnStatusManager is back online. So, the txn should not be
// auto-aborted while its KuduTransaction object is kept in scope.
| TEST_F(TxnStatusManagerITest, TxnKeptAliveByClientIfStatusManagerRestarted) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| shared_ptr<KuduClient> c; |
| ASSERT_OK(cluster_->CreateClient(nullptr, &c)); |
| |
| shared_ptr<KuduTransaction> txn; |
| ASSERT_OK(c->NewTransaction(&txn)); |
| |
| for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) { |
| auto* ts = cluster_->tablet_server(idx); |
| ts->Shutdown(); |
| } |
| |
| SleepFor(MonoDelta::FromMilliseconds(3 * kTxnKeepaliveIntervalMs)); |
| |
| for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) { |
| auto* ts = cluster_->tablet_server(idx); |
| ASSERT_OK(ts->Restart()); |
| } |
| |
| SleepFor(MonoDelta::FromMilliseconds(5 * kTxnKeepaliveIntervalMs)); |
| |
| ASSERT_OK(txn->StartCommit()); |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
| } // namespace kudu |