| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/client.pb.h" |
| #include "kudu/client/scan_batch.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/txn_id.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tablet/txn_coordinator.h" |
| #include "kudu/tablet/txn_participant.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/transactions/txn_system_client.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(enable_txn_partition_lock); |
| DECLARE_bool(enable_txn_system_client_init); |
| DECLARE_bool(txn_manager_enabled); |
| DECLARE_bool(txn_manager_lazily_initialized); |
| DECLARE_bool(txn_schedule_background_tasks); |
| DECLARE_int32(txn_status_manager_inject_latency_finalize_commit_ms); |
| DECLARE_uint32(txn_background_rpc_timeout_ms); |
| DECLARE_uint32(txn_keepalive_interval_ms); |
| DECLARE_uint32(txn_manager_status_table_num_replicas); |
| DECLARE_uint32(txn_staleness_tracker_interval_ms); |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTransaction; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduScanBatch; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::tablet::TabletReplica; |
| using kudu::tablet::TxnParticipant; |
| using kudu::transactions::TxnStatePB; |
| using kudu::transactions::TxnSystemClient; |
| using kudu::transactions::TxnTokenPB; |
| using kudu::tserver::MiniTabletServer; |
| using kudu::tserver::ParticipantOpPB; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace itest { |
| |
| class TxnCommitITest : public KuduTest { |
| public: |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| const int kNumRowsPerTxn = 10; |
| |
| void SetUp() override { |
| KuduTest::SetUp(); |
| // Speed up the staleness checks to help stress cases where it might race |
| // with commits. |
| FLAGS_txn_keepalive_interval_ms = 300; |
| FLAGS_txn_staleness_tracker_interval_ms = 100; |
| NO_FATALS(SetUpClusterAndTable(1)); |
| } |
| |
| // Sets up a cluster with the given number of tservers, creating a |
| // single-replica transaction status table and user-defined table. |
| void SetUpClusterAndTable(int num_tservers, int num_replicas = 1) { |
| FLAGS_enable_txn_system_client_init = true; |
| FLAGS_txn_manager_enabled = true; |
| FLAGS_txn_manager_lazily_initialized = false; |
| FLAGS_txn_manager_status_table_num_replicas = num_replicas; |
| |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = num_tservers; |
| cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| ASSERT_EVENTUALLY([&] { |
| // Find the TxnStatusManager's tablet ID. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| auto* ts = cluster_->mini_tablet_server(i); |
| ASSERT_OK(ts->WaitStarted()); |
| const auto tablet_ids = ts->ListTablets(); |
| if (tablet_ids.empty()) continue; |
| tsm_id_ = tablet_ids[0]; |
| } |
| ASSERT_FALSE(tsm_id_.empty()); |
| }); |
| TxnSystemClient::Create(cluster_->master_rpc_addrs(), |
| cluster_->messenger()->sasl_proto_name(), |
| &txn_client_); |
| ASSERT_OK(txn_client_->OpenTxnStatusTable()); |
| |
| client::KuduClientBuilder builder; |
| builder.default_admin_operation_timeout(kTimeout); |
| ASSERT_OK(cluster_->CreateClient(&builder, &client_)); |
| string authn_creds; |
| ASSERT_OK(client_->ExportAuthenticationCredentials(&authn_creds)); |
| client::AuthenticationCredentialsPB pb; |
| ASSERT_TRUE(pb.ParseFromString(authn_creds)); |
| ASSERT_TRUE(pb.has_real_user()); |
| client_user_ = pb.real_user(); |
| |
| TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); |
| w.set_num_replicas(num_replicas); |
| w.set_num_tablets(2); |
| w.Setup(); |
| w.Start(); |
| while (w.rows_inserted() < 1) { |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| w.StopAndJoin(); |
| table_name_ = w.table_name(); |
| initial_row_count_ = w.rows_inserted(); |
| |
| // Since the test table uses hash partitioning, every tablet gets |
| // at least one write operation when inserting several rows into the test |
| // table. So, for every transaction inserting several rows into the test |
| // table, it's easy to build the list of transaction participants. |
| unordered_set<string> tablet_ids; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| auto* ts = cluster_->mini_tablet_server(i); |
| for (const auto& tablet_id : ts->ListTablets()) { |
| if (tablet_id != tsm_id_) { |
| tablet_ids.emplace(tablet_id); |
| } |
| } |
| } |
| for (auto& t : tablet_ids) { |
| participant_ids_.emplace_back(std::move(t)); |
| } |
| } |
| |
| // Start a transaction, manually registering the given participants, and |
| // returning the associated transaction and session handles. |
| Status BeginTransaction(shared_ptr<KuduTransaction>* txn, |
| shared_ptr<KuduSession>* session) { |
| shared_ptr<KuduTransaction> txn_local; |
| RETURN_NOT_OK(client_->NewTransaction(&txn_local)); |
| shared_ptr<KuduSession> txn_session_local; |
| RETURN_NOT_OK(txn_local->CreateSession(&txn_session_local)); |
| *txn = std::move(txn_local); |
| *session = std::move(txn_session_local); |
| return Status::OK(); |
| } |
| |
| // Insert 'num_rows' rows to the given session, starting with 'start_row', to |
| // every table in 'table_names' or 'table_name_' if not set. |
| Status InsertToSession( |
| const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows, |
| vector<string> table_names = {}) { |
| if (table_names.empty()) { |
| table_names = { table_name_ }; |
| } |
| for (const auto& table_name : table_names) { |
| shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(client_->OpenTable(table_name, &table)); |
| const int target_row_id = start_row + num_rows; |
| for (int i = start_row; i < target_row_id; i++) { |
| auto* insert = table->NewInsertIgnore(); |
| RETURN_NOT_OK(insert->mutable_row()->SetInt32(0, i)); |
| RETURN_NOT_OK(insert->mutable_row()->SetInt32(1, i)); |
| RETURN_NOT_OK(txn_session->Apply(insert)); |
| RETURN_NOT_OK(txn_session->Flush()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status CountRows(int* num_rows) { |
| shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(client_->OpenTable(table_name_, &table)); |
| KuduScanner scanner(table.get()); |
| RETURN_NOT_OK(scanner.Open()); |
| int rows = 0; |
| while (scanner.HasMoreRows()) { |
| KuduScanBatch batch; |
| RETURN_NOT_OK(scanner.NextBatch(&batch)); |
| rows += batch.NumRows(); |
| } |
| *num_rows = rows; |
| return Status::OK(); |
| } |
| |
| // Returns the transaction IDs participated in by 'tablet_id' that have been |
| // aborted on 'num_replicas' replicas. |
| Status GetAbortedTxnsForParticipant(const string& tablet_id, int num_replicas, |
| vector<TxnId>* aborted_txns) { |
| vector<TxnId> txn_ids; |
| unordered_map<int64_t, int> aborts_per_txn; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| scoped_refptr<TabletReplica> r; |
| Status s = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletReplica( |
| tablet_id, &r); |
| if (!s.ok()) { |
| continue; |
| } |
| for (const auto& e : r->tablet()->txn_participant()->GetTxnsForTests()) { |
| if (e.state == tablet::TxnState::kAborted) { |
| LookupOrEmplace(&aborts_per_txn, e.txn_id, 0)++; |
| } |
| } |
| } |
| for (const auto& txn_id_and_count : aborts_per_txn) { |
| if (txn_id_and_count.second == num_replicas) { |
| txn_ids.emplace_back(txn_id_and_count.first); |
| } |
| } |
| *aborted_txns = std::move(txn_ids); |
| return Status::OK(); |
| } |
| |
| protected: |
| unique_ptr<InternalMiniCluster> cluster_; |
| unique_ptr<TxnSystemClient> txn_client_; |
| |
| shared_ptr<KuduClient> client_; |
| string client_user_; |
| |
| string table_name_; |
| int initial_row_count_; |
| |
| // Needed for checking on internals of txn participants. |
| string tsm_id_; |
| vector<string> participant_ids_; |
| }; |
| |
| TEST_F(TxnCommitITest, TestBasicCommits) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| |
| // Even though we've inserted, we shouldn't be able to see any new rows until |
| // after we commit. |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| |
| ASSERT_OK(txn->Commit()); |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows); |
| |
| // IsCommitComplete() should verify that the transaction is in the right |
| // state. |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| } |
| |
| TEST_F(TxnCommitITest, TestBasicAborts) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| |
| ASSERT_OK(txn->Rollback()); |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| |
| // IsCommitComplete() should verify that the transaction is aborted. We need |
| // to wait for this to happen, since 'is_complete' is contingent on the abort |
| // tasks finishing. |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| |
| // On the participants, we should see the transaction as aborted. |
| ASSERT_EVENTUALLY([&] { |
| for (const auto& tablet_id : participant_ids_) { |
| vector<TxnId> aborted_txns; |
| ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns)); |
| ASSERT_EQ(1, aborted_txns.size()); |
| } |
| }); |
| } |
| |
| TEST_F(TxnCommitITest, TestAbortInProgress) { |
| FLAGS_txn_schedule_background_tasks = false; |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(txn->Rollback()); |
| |
| // When background tasks are disabled, we'll be left in ABORT_IN_PROGRESS, |
| // and we should be able to determine it isn't complete. |
| Status completion_status; |
| bool is_complete = true; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_STR_CONTAINS(completion_status.ToString(), "transaction is being aborted"); |
| ASSERT_FALSE(is_complete); |
| |
| // Once enabled, background tasks should take hold and the abort should |
| // complete. |
| FLAGS_txn_schedule_background_tasks = true; |
| auto* mts = cluster_->mini_tablet_server(0); |
| mts->Shutdown(); |
| ASSERT_OK(mts->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_STR_CONTAINS(completion_status.ToString(), "transaction has been aborted"); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| TEST_F(TxnCommitITest, TestBackgroundAborts) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| string serialized_txn; |
| { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| ASSERT_OK(txn->Serialize(&serialized_txn)); |
| } |
| // Wait a bit for a background abort to happen. |
| SleepFor(MonoDelta::FromMilliseconds(5 * FLAGS_txn_keepalive_interval_ms)); |
| |
| // IsCommitComplete() should verify that the transaction is aborted. |
| Status completion_status; |
| bool is_complete = false; |
| shared_ptr<KuduTransaction> txn; |
| ASSERT_OK(KuduTransaction::Deserialize(client_, serialized_txn, &txn)); |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| |
| // On the participants, we should see the transaction as aborted. |
| ASSERT_EVENTUALLY([&] { |
| for (const auto& tablet_id : participant_ids_) { |
| vector<TxnId> aborted_txns; |
| ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns)); |
| ASSERT_EQ(1, aborted_txns.size()); |
| } |
| }); |
| } |
| |
| // Test that if we delete the TxnStatusManager while tasks are on-going, |
| // nothing goes catastrophically wrong (i.e. no crashes). |
| TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| |
| ASSERT_OK(txn->StartCommit()); |
| ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->DeleteTablet( |
| tsm_id_, tablet::TABLET_DATA_TOMBSTONED, std::nullopt)); |
| |
| Status completion_status; |
| bool is_complete; |
| Status s = txn->IsCommitComplete(&is_complete, &completion_status); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| } |
| |
| TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(client_->DeleteTable(table_name_)); |
| ASSERT_OK(txn->StartCommit()); |
| |
| // The transaction should eventually fail, treating the deleted participant |
| // as a fatal error. |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(client_->DeleteTable(table_name_)); |
| const auto& schema = client::KuduSchema::FromSchema(GetSimpleTestSchema()); |
| unique_ptr<client::KuduTableAlterer> alterer(client_->NewTableAlterer(table_name_)); |
| alterer->DropRangePartition(schema.NewRow(), schema.NewRow()); |
| alterer.reset(); |
| |
| ASSERT_OK(txn->StartCommit()); |
| |
| // The transaction should eventually abort, treating the deleted participant |
| // as fatal, resulting in an aborted transaction. |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| TEST_F(TxnCommitITest, TestRestartingWhileCommitting) { |
| FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000; |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(txn->StartCommit()); |
| // Stop the tserver without allowing the finalize commit to complete. |
| cluster_->mini_tablet_server(0)->Shutdown(); |
| |
| FLAGS_txn_schedule_background_tasks = false; |
| ASSERT_OK(cluster_->mini_tablet_server(0)->Restart()); |
| |
| // The transaction should be incomplete, as background tasks are disabled, |
| // and since we shut down before allowing to finish committing. |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString(); |
| ASSERT_FALSE(is_complete); |
| |
| // If we re-enable background tasks, background tasks should be scheduled to |
| // commit the transaction. |
| FLAGS_txn_schedule_background_tasks = true; |
| cluster_->mini_tablet_server(0)->Shutdown(); |
| ASSERT_OK(cluster_->mini_tablet_server(0)->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| // Test aborting a botched commit mid-way by deleting some of its participants |
| // while committing. The result should be that the transaction gets aborted and |
| // all participants abort the local transactions. |
| TEST_F(TxnCommitITest, TestAbortRacingWithBotchedCommit) { |
| // First, create another table that we'll delete later on. |
| const string kSecondTableName = "default.second_table"; |
| TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH); |
| w.set_num_replicas(1); |
| w.set_num_tablets(2); |
| w.set_table_name(kSecondTableName); |
| w.Setup(); |
| w.Start(); |
| while (w.rows_inserted() < 1) { |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| w.StopAndJoin(); |
| unordered_set<string> participant_ids; |
| auto* mts = cluster_->mini_tablet_server(0); |
| for (const auto& tablet_id : mts->ListTablets()) { |
| if (tablet_id != tsm_id_) { |
| participant_ids.emplace(tablet_id); |
| } |
| } |
| ASSERT_EQ(4, participant_ids.size()); |
| vector<string> both_tables_participant_ids(participant_ids.begin(), participant_ids.end()); |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn, |
| { table_name_, kSecondTableName })); |
| ASSERT_OK(client_->DeleteTable(kSecondTableName)); |
| FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000; |
| ASSERT_OK(txn->StartCommit()); |
| ASSERT_OK(txn->Rollback()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| |
| // Let's confirm that all remaining participants see the same transaction |
| // metadata. |
| vector<scoped_refptr<TabletReplica>> replicas; |
| mts->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| vector<vector<TxnParticipant::TxnEntry>> txn_entries_per_replica; |
| for (const auto& r : replicas) { |
| if (r->tablet_id() != tsm_id_ && r->tablet_metadata()->table_name() != kSecondTableName) { |
| txn_entries_per_replica.emplace_back(r->tablet()->txn_participant()->GetTxnsForTests()); |
| } |
| } |
| ASSERT_GT(txn_entries_per_replica.size(), 1); |
| for (int i = 1; i < txn_entries_per_replica.size(); i++) { |
| const auto& txns = txn_entries_per_replica[i]; |
| ASSERT_FALSE(txns.empty()); |
| for (const auto& txn_entry : txns) { |
| ASSERT_EQ(tablet::kAborted, txn_entry.state); |
| } |
| EXPECT_EQ(txn_entries_per_replica[0], txns); |
| } |
| } |
| |
| // Test restarting while commit tasks are on-going, while at the same time, |
| // some participants are deleted. The transaction should be aborted on all |
| // participants. |
| TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) { |
| // First, create another table that we'll delete later on. |
| const string kSecondTableName = "default.second_table"; |
| TestWorkload w(cluster_.get()); |
| w.set_num_replicas(1); |
| w.set_num_tablets(2); |
| w.set_table_name(kSecondTableName); |
| w.Setup(); |
| w.Start(); |
| while (w.rows_inserted() < 1) { |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| w.StopAndJoin(); |
| |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn, |
| { table_name_, kSecondTableName })); |
| ASSERT_OK(client_->DeleteTable(kSecondTableName)); |
| FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000; |
| ASSERT_OK(txn->StartCommit()); |
| |
| // Shut down without giving time for the commit to complete. |
| auto* mts = cluster_->mini_tablet_server(0); |
| mts->Shutdown(); |
| ASSERT_OK(mts->Restart()); |
| |
| // Delete some of the participants. Upon completion, the commit process |
| // should result in an abort. |
| ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| |
| // Let's confirm that all remaining participants see the same transaction |
| // metadata. |
| vector<scoped_refptr<TabletReplica>> replicas; |
| mts->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| vector<vector<TxnParticipant::TxnEntry>> txn_entries_per_replica; |
| for (const auto& r : replicas) { |
| if (r->tablet_id() != tsm_id_ && r->tablet_metadata()->table_name() != kSecondTableName) { |
| txn_entries_per_replica.emplace_back(r->tablet()->txn_participant()->GetTxnsForTests()); |
| } |
| } |
| ASSERT_GT(txn_entries_per_replica.size(), 1); |
| for (int i = 1; i < txn_entries_per_replica.size(); i++) { |
| const auto& txns = txn_entries_per_replica[i]; |
| ASSERT_FALSE(txns.empty()); |
| for (const auto& txn_entry : txns) { |
| ASSERT_EQ(tablet::kAborted, txn_entry.state); |
| } |
| EXPECT_EQ(txn_entries_per_replica[0], txns); |
| } |
| } |
| |
| // Test that when loading the TxnStatusManagers, nothing catastrophic happens |
| // if we can't connect to the masters. |
| TEST_F(TxnCommitITest, TestLoadTxnStatusManagerWhenNoMasters) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| |
| cluster_->mini_master()->Shutdown(); |
| cluster_->mini_tablet_server(0)->Shutdown(); |
| ASSERT_OK(cluster_->mini_tablet_server(0)->Restart()); |
| |
| // While the master is down, we can't contact the TxnManager. |
| Status s = BeginTransaction(&txn, &txn_session); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // Once restarted, it should be business as usual. |
| ASSERT_OK(cluster_->mini_master()->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| }); |
| scoped_refptr<tablet::TabletReplica> tsm_replica; |
| auto* tablet_manager = cluster_->mini_tablet_server(0)->server()->tablet_manager(); |
| ASSERT_OK(tablet_manager->GetTabletReplica(tsm_id_, &tsm_replica)); |
| auto participants_by_txn_id = |
| DCHECK_NOTNULL(tsm_replica->txn_coordinator())->GetParticipantsByTxnIdForTests(); |
| ASSERT_EQ(2, participants_by_txn_id.size()); |
| } |
| |
| // Test what happens if a participant is aborted somehow, and we try to commit. |
| // We don't expect this to happen since aborts should start with writing an |
| // abort record to the TxnStatusManager, but let's at least make sure we |
| // understand what happens if this does occur. |
| TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| |
| // Send an ABORT_TXN op to the participant. |
| ParticipantOpPB op_pb; |
| op_pb.set_txn_id(0); |
| op_pb.set_type(ParticipantOpPB::ABORT_TXN); |
| ASSERT_OK(txn_client_->ParticipateInTransaction( |
| participant_ids_[0], op_pb, MonoTime::Now() + MonoDelta::FromSeconds(3))); |
| |
| // When we try to commit, we should end up not completing. |
| ASSERT_OK(txn->StartCommit()); |
| |
| SleepFor(MonoDelta::FromSeconds(3)); |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| } |
| |
| // Try concurrently beginning to commit a bunch of different transactions. |
| TEST_F(TxnCommitITest, TestConcurrentCommitCalls) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| constexpr const int kNumTxns = 4; |
| vector<shared_ptr<KuduTransaction>> txns(kNumTxns); |
| int row_start = initial_row_count_; |
| for (int i = 0; i < kNumTxns; i++) { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txns[i], &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, row_start, kNumRowsPerTxn)); |
| row_start += kNumRowsPerTxn; |
| } |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| |
| vector<thread> threads; |
| vector<Status> results(kNumTxns); |
| for (int i = 0; i < kNumTxns; i++) { |
| threads.emplace_back([&, i] { |
| results[i] = txns[i]->StartCommit(); |
| }); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| for (const auto& s : results) { |
| EXPECT_OK(s); |
| } |
| ASSERT_EVENTUALLY([&] { |
| for (const auto& txn : txns) { |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| } |
| }); |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn * kNumTxns, num_rows); |
| } |
| |
| TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| constexpr const int kNumTxns = 10; |
| vector<shared_ptr<KuduTransaction>> txns(kNumTxns); |
| int row_start = initial_row_count_; |
| for (int i = 0; i < kNumTxns; i++) { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txns[i], &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, row_start, kNumRowsPerTxn)); |
| row_start += kNumRowsPerTxn; |
| } |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| // To encourage races between concurrent aborts and commits, inject a random |
| // sleep before each call. |
| constexpr const int kMaxSleepMs = 1000; |
| std::atomic<int> num_committed_txns = 0; |
| vector<thread> threads; |
| for (int i = 0; i < kNumTxns; i++) { |
| threads.emplace_back([&, i] { |
| SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs)); |
| Status s = txns[i]->Commit(); |
| if (s.ok()) { |
| num_committed_txns++; |
| } |
| }); |
| threads.emplace_back([&, i] { |
| SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs)); |
| ignore_result(txns[i]->Rollback()); |
| }); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| ASSERT_OK(CountRows(&num_rows)); |
| // NOTE: we can compare an exact count here and not worry about whether we |
| // completed aborting rows because even if we didn't complete the abort, they |
| // shouldn't be visible to clients anyway. |
| const int expected_rows = initial_row_count_ + kNumRowsPerTxn * num_committed_txns; |
| VLOG(1) << Substitute("Expecting $0 rows from $1 committed transactions", |
| expected_rows, num_committed_txns.load()); |
| ASSERT_EQ(expected_rows, num_rows); |
| |
| // Ensure all transactions are either committed or aborted. |
| vector<scoped_refptr<TabletReplica>> replicas; |
| cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| for (const auto& r : replicas) { |
| ASSERT_EVENTUALLY([&] { |
| if (r->tablet_metadata()->table_name() == table_name_) { |
| const auto txns = r->tablet()->txn_participant()->GetTxnsForTests(); |
| for (const auto& txn : txns) { |
| ASSERT_TRUE(txn.state == tablet::kAborted || txn.state == tablet::kCommitted) |
| << Substitute("Txn in unexpected state: $0", txn.state);; |
| } |
| } |
| }); |
| } |
| } |
| |
| // Test that committing the same transaction concurrently doesn't lead to any |
| // issues. |
| TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| int num_rows = 0; |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_, num_rows); |
| |
| constexpr const int kNumThreads = 4; |
| vector<thread> threads; |
| vector<Status> results(kNumThreads); |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back([&, i] { |
| results[i] = txn->StartCommit(); |
| }); |
| } |
| for (auto& t : threads) { |
| t.join(); |
| } |
| for (const auto& s : results) { |
| EXPECT_OK(s); |
| } |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| }); |
| ASSERT_OK(CountRows(&num_rows)); |
| ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows); |
| } |
| |
| TEST_F(TxnCommitITest, TestDontBackgroundAbortIfCommitInProgress) { |
| FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000; |
| string serialized_txn; |
| { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(txn->StartCommit()); |
| ASSERT_OK(txn->Serialize(&serialized_txn)); |
| } |
| // Wait a bit to allow would-be background aborts to happen. |
| SleepFor(MonoDelta::FromSeconds(1)); |
| |
| // Since we've already begun committing, we shouldn't abort. On the contrary, |
| // we should eventually successfully fully commit the transaction. |
| bool is_complete = false; |
| shared_ptr<KuduTransaction> txn; |
| ASSERT_OK(KuduTransaction::Deserialize(client_, serialized_txn, &txn)); |
| Status completion_status; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_FALSE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| // Test that we can abort if a transaction hasn't finalized its commit yet. |
| TEST_F(TxnCommitITest, TestAbortIfCommitInProgress) { |
| FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000; |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| ASSERT_OK(txn->StartCommit()); |
| ASSERT_OK(txn->Rollback()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| // Test that has two nodes so we can place the TxnStatusManager and transaction |
| // participant on separate nodes. This can be useful for testing when some |
| // nodes are down. |
| class TwoNodeTxnCommitITest : public TxnCommitITest { |
| public: |
| void SetUp() override { |
| KuduTest::SetUp(); |
| NO_FATALS(SetUpClusterAndTable(2)); |
| |
| // Figure out which tserver has the participant and which has the |
| // TxnStatusManager. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| auto* ts = cluster_->mini_tablet_server(i); |
| const auto tablet_ids = ts->ListTablets(); |
| for (const auto& tablet_id : tablet_ids) { |
| if (tablet_id == tsm_id_) { |
| tsm_ts_ = ts; |
| break; |
| } |
| } |
| } |
| DCHECK(tsm_ts_); |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| if (cluster_->mini_tablet_server(i) != tsm_ts_) { |
| prt_ts_ = cluster_->mini_tablet_server(i); |
| break; |
| } |
| } |
| DCHECK(prt_ts_); |
| } |
| protected: |
| // The tablet server that has a TxnStatusManager. |
| MiniTabletServer* tsm_ts_; |
| |
| // A tablet server that has at least one participant. |
| MiniTabletServer* prt_ts_; |
| }; |
| |
| // Test that nothing goes wrong when participants are down, and that we'll |
| // retry until they become available again. |
| TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| prt_ts_->Shutdown(); |
| ASSERT_OK(txn->StartCommit()); |
| |
| // Since our participant is down, we can't proceed with the commit. |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_FALSE(is_complete); |
| ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString(); |
| SleepFor(MonoDelta::FromSeconds(5)); |
| |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_FALSE(is_complete); |
| ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString(); |
| |
| // Once we start the tserver with the participant, the commit should complete |
| // automatically. |
| ASSERT_OK(prt_ts_->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| // Test that when we start up, pending commits will start background tasks to |
| // finalize the commit or abort. |
| TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| shared_ptr<KuduTransaction> committed_txn; |
| { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&committed_txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| } |
| shared_ptr<KuduTransaction> aborted_txn; |
| { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn)); |
| } |
| |
| // Shut down our participant's tserver so our commit task keeps retrying. |
| prt_ts_->Shutdown(); |
| ASSERT_OK(committed_txn->StartCommit()); |
| ASSERT_OK(aborted_txn->Rollback()); |
| |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_FALSE(is_complete); |
| ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString(); |
| |
| ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_FALSE(is_complete); |
| |
| // Shut down the TxnStatusManager to stop our tasks. |
| tsm_ts_->Shutdown(); |
| |
| // Restart both tservers. The background tasks should be restarted and |
| // eventually succeed. |
| ASSERT_OK(prt_ts_->Restart()); |
| ASSERT_OK(tsm_ts_->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| Status completion_status; |
| bool is_complete; |
| ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_OK(completion_status); |
| ASSERT_TRUE(is_complete); |
| |
| ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| ASSERT_EVENTUALLY([&] { |
| for (const auto& tablet_id : participant_ids_) { |
| vector<TxnId> aborted_txns; |
| ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns)); |
| ASSERT_EQ(1, aborted_txns.size()); |
| } |
| }); |
| } |
| |
| // Abruptly shut down the tablet server while running commit tasks, ensuring |
| // nothing bad happens. |
| TEST_F(TwoNodeTxnCommitITest, TestCommitWhileShuttingDownTxnStatusManager) { |
| shared_ptr<KuduTransaction> txn; |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&txn, &txn_session)); |
| |
| ASSERT_OK(txn->StartCommit()); |
| tsm_ts_->Shutdown(); |
| ASSERT_OK(tsm_ts_->Restart()); |
| |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(is_complete); |
| }); |
| } |
| |
| // Test that has three nodes so we can test leadership. |
| class ThreeNodeTxnCommitITest : public TxnCommitITest { |
| public: |
| void SetUp() override { |
| KuduTest::SetUp(); |
| NO_FATALS(SetUpClusterAndTable(3, 3)); |
| |
| // Quiesce all but 'leader_idx_', so it becomes the leader. |
| int leader_idx = 0; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = i != leader_idx; |
| } |
| leader_ts_ = cluster_->mini_tablet_server(leader_idx); |
| // We should have two leaders for our table, and one for the |
| // TxnStatusManager. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(3, leader_ts_->server()->num_raft_leaders()->value()); |
| }); |
| } |
| protected: |
| MiniTabletServer* leader_ts_; |
| }; |
| |
| TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) { |
| // Disable the partition lock as there are concurrent transactions. |
| // TODO(awong): update this when implementing finer grained locking. |
| FLAGS_enable_txn_partition_lock = false; |
| FLAGS_txn_schedule_background_tasks = false; |
| shared_ptr<KuduTransaction> committed_txn; |
| shared_ptr<KuduTransaction> aborted_txn; |
| { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&committed_txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); |
| } |
| { |
| shared_ptr<KuduSession> txn_session; |
| ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session)); |
| ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn)); |
| } |
| ASSERT_OK(committed_txn->StartCommit()); |
| Status completion_status; |
| bool is_complete = false; |
| ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString(); |
| ASSERT_FALSE(is_complete); |
| |
| ASSERT_OK(aborted_txn->Rollback()); |
| ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_FALSE(is_complete); |
| for (const auto& tablet_id : participant_ids_) { |
| vector<TxnId> aborted_txns; |
| ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 3, &aborted_txns)); |
| ASSERT_EQ(0, aborted_txns.size()); |
| } |
| |
| FLAGS_txn_schedule_background_tasks = true; |
| // Change our quiescing states so a new leader can be elected. |
| *leader_ts_->server()->mutable_quiescing() = true; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| auto* mts = cluster_->mini_tablet_server(i); |
| if (leader_ts_ != mts) { |
| *mts->server()->mutable_quiescing() = false; |
| } |
| } |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(0, leader_ts_->server()->num_raft_leaders()->value()); |
| }); |
| // Upon becoming leader, we should have started our commit task and completed |
| // the commit. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(is_complete); |
| }); |
| // The aborted transaction should still be aborted, and we should be able to |
| // validate its abort status in its participants' metadata. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status)); |
| ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString(); |
| ASSERT_TRUE(is_complete); |
| }); |
| ASSERT_EVENTUALLY([&] { |
| for (const auto& tablet_id : participant_ids_) { |
| vector<TxnId> aborted_txns; |
| ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 3, &aborted_txns)); |
| ASSERT_EQ(1, aborted_txns.size()); |
| } |
| }); |
| } |
| |
| } // namespace itest |
| } // namespace kudu |