| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include <boost/none_t.hpp> |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/client.pb.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/master/ts_manager.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/mini-cluster/mini_cluster.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/transactions/txn_status_tablet.h" |
| #include "kudu/transactions/txn_system_client.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
| DECLARE_string(superuser_acl); |
| DECLARE_string(user_acl); |
| DECLARE_uint32(transaction_keepalive_interval_ms); |
| |
| using kudu::client::AuthenticationCredentialsPB; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduTable; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::itest::TServerDetails; |
| using kudu::itest::TabletServerMap; |
| using kudu::tablet::TabletReplica; |
| using kudu::transactions::TxnStatePB; |
| using kudu::transactions::TxnStatusEntryPB; |
| using kudu::transactions::TxnStatusTablet; |
| using kudu::transactions::TxnSystemClient; |
| using std::map; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace itest { |
| |
| namespace { |
| string ParticipantId(int i) { |
| return Substitute("strawhat-$0", i); |
| } |
| } // anonymous namespace |
| |
| class TxnStatusTableITest : public KuduTest { |
| public: |
| TxnStatusTableITest() {} |
| |
| void SetUp() override { |
| KuduTest::SetUp(); |
| cluster_.reset(new InternalMiniCluster(env_, {})); |
| ASSERT_OK(cluster_->Start()); |
| |
| // Create the txn system client with which to communicate with the cluster. |
| vector<string> master_addrs; |
| for (const auto& hp : cluster_->master_rpc_addrs()) { |
| master_addrs.emplace_back(hp.ToString()); |
| } |
| ASSERT_OK(TxnSystemClient::Create(master_addrs, &txn_sys_client_)); |
| } |
| |
| // Ensures that all replicas have the right table type set. |
| void CheckTableTypes(const map<TableTypePB, int>& expected_counts) { |
| // Check that the tablets of the table all have transaction coordinators. |
| vector<scoped_refptr<TabletReplica>> replicas; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| auto* ts = cluster_->mini_tablet_server(i); |
| const auto& tablets = ts->ListTablets(); |
| for (const auto& t : tablets) { |
| scoped_refptr<TabletReplica> r; |
| ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(t, &r)); |
| replicas.emplace_back(std::move(r)); |
| } |
| } |
| map<TableTypePB, int> type_counts; |
| for (const auto& r : replicas) { |
| const auto optional_table_type = r->tablet_metadata()->table_type(); |
| if (boost::none == optional_table_type) { |
| LookupOrEmplace(&type_counts, TableTypePB::DEFAULT_TABLE, 0)++; |
| } else { |
| ASSERT_EQ(TableTypePB::TXN_STATUS_TABLE, *optional_table_type); |
| ASSERT_NE(nullptr, r->txn_coordinator()); |
| LookupOrEmplace(&type_counts, *optional_table_type, 0)++; |
| } |
| } |
| ASSERT_EQ(expected_counts, type_counts); |
| } |
| |
| // Creates and returns a client as the given user. |
| Status CreateClientAs(const string& user, shared_ptr<KuduClient>* client) { |
| KuduClientBuilder client_builder; |
| string authn_creds; |
| AuthenticationCredentialsPB pb; |
| pb.set_real_user(user); |
| CHECK(pb.SerializeToString(&authn_creds)); |
| client_builder.import_authentication_credentials(authn_creds); |
| shared_ptr<KuduClient> c; |
| RETURN_NOT_OK(cluster_->CreateClient(&client_builder, &c)); |
| *client = std::move(c); |
| return Status::OK(); |
| } |
| |
| // Sets the superuser and user ACLs, restarts the master, and waits for some |
| // tablet servers to register, yielding a fatal if anything fails. |
| void SetSuperuserAndUser(const string& superuser, const string& user) { |
| FLAGS_superuser_acl = superuser; |
| FLAGS_user_acl = user; |
| cluster_->mini_master()->Shutdown(); |
| ASSERT_OK(cluster_->mini_master()->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_LT(0, cluster_->mini_master()->master()->ts_manager()->GetLiveCount()); |
| }); |
| } |
| |
| shared_ptr<KuduClient> client_sp() { |
| return txn_sys_client_->client_; |
| } |
| |
| protected: |
| static constexpr const char* kUser = "user"; |
| unique_ptr<InternalMiniCluster> cluster_; |
| unique_ptr<TxnSystemClient> txn_sys_client_; |
| }; |
| |
| // Test that the transaction status table is not listed. We expect it to be |
| // fairly common for a super-user to list tables to backup a cluster, so we |
| // hide the transaction status table, which we don't expect to back up. |
| TEST_F(TxnStatusTableITest, TestTxnStatusTableNotListed) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| auto client = client_sp(); |
| vector<string> tables_list; |
| ASSERT_OK(client->ListTables(&tables_list)); |
| ASSERT_TRUE(tables_list.empty()); |
| |
| // While we prevent the table from being listed, clients should still be able |
| // to view it if it's explicitly asked for. |
| const string& kTableName = TxnStatusTablet::kTxnStatusTableName; |
| bool exists = false; |
| ASSERT_OK(client->TableExists(kTableName, &exists)); |
| ASSERT_TRUE(exists); |
| |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(client->OpenTable(kTableName, &table)); |
| ASSERT_NE(nullptr, table); |
| } |
| |
| // Test that only the service- or super-user can create or alter the |
| // transaction status table. |
| TEST_F(TxnStatusTableITest, TestProtectCreateAndAlter) { |
| auto service_client = client_sp(); |
| // We're both a super user and a service user since the ACLs default to "*". |
| // We should thus have access to the table. |
| ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get())); |
| ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, service_client.get())); |
| const string& kTableName = TxnStatusTablet::kTxnStatusTableName; |
| ASSERT_OK(service_client->DeleteTable(kTableName)); |
| |
| // The service user should be able to create and alter the transaction status |
| // table. |
| NO_FATALS(SetSuperuserAndUser("nobody", "nobody")); |
| ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get())); |
| ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, service_client.get())); |
| |
| // The service user doesn't have access to the delete table endpoint. |
| Status s = service_client->DeleteTable(kTableName); |
| ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); |
| |
| // The super user should be able to create, alter, and delete the transaction |
| // status table. |
| NO_FATALS(SetSuperuserAndUser("bob", "nobody")); |
| shared_ptr<KuduClient> bob_client; |
| ASSERT_OK(CreateClientAs("bob", &bob_client)); |
| ASSERT_OK(bob_client->DeleteTable(kTableName)); |
| ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, bob_client.get())); |
| ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, bob_client.get())); |
| ASSERT_OK(bob_client->DeleteTable(kTableName)); |
| |
| // Regular users shouldn't be able to do anything. |
| NO_FATALS(SetSuperuserAndUser("nobody", "bob")); |
| s = TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, bob_client.get()); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get())); |
| s = TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, bob_client.get()); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| s = service_client->DeleteTable(kTableName); |
| ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); |
| } |
| |
| // Test that accessing the transaction system table only succeeds for service |
| // or super users. |
| TEST_F(TxnStatusTableITest, TestProtectAccess) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| auto client_sp = this->client_sp(); |
| const string& kTableName = TxnStatusTablet::kTxnStatusTableName; |
| #define STORE_AND_PREPEND(s, msg) do { \ |
| Status _s = (s); \ |
| results.emplace_back(_s.CloneAndPrepend(msg)); \ |
| } while (0) |
| const auto attempt_accesses_and_delete = [&] (KuduClient* client) { |
| // Go through various table access methods on the transaction status table. |
| // Collect the results so we can verify whether we were authorized |
| // properly. |
| vector<Status> results; |
| bool unused; |
| STORE_AND_PREPEND(client->TableExists(kTableName, &unused), "failed to check existence"); |
| shared_ptr<KuduTable> unused_table; |
| STORE_AND_PREPEND(client->OpenTable(kTableName, &unused_table), "failed to open table"); |
| client::KuduTableStatistics* unused_stats = nullptr; |
| STORE_AND_PREPEND(client->GetTableStatistics(kTableName, &unused_stats), "failed to get stats"); |
| unique_ptr<client::KuduTableStatistics> unused_unique_stats(unused_stats); |
| STORE_AND_PREPEND(client->DeleteTable(kTableName), "failed to delete table"); |
| return results; |
| }; |
| // We're both a super user and a service user since the ACLs default to "*". |
| // We should thus have access to the table. |
| auto results = attempt_accesses_and_delete(client_sp.get()); |
| for (const auto& r : results) { |
| ASSERT_OK(r); |
| } |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| |
| // Even though we're the service user, we shouldn't be able to access |
| // anything because most RPCs require us to be a super-user or user. |
| NO_FATALS(SetSuperuserAndUser("nobody", "nobody")); |
| results = attempt_accesses_and_delete(client_sp.get()); |
| for (const auto& r : results) { |
| // NOTE: we don't check the type because authorization errors may surface |
| // as RemoteErrors or NotAuthorized errors. |
| ASSERT_FALSE(r.ok()); |
| ASSERT_STR_CONTAINS(r.ToString(), "Not authorized"); |
| } |
| // As a sanity check, our last delete have failed, and we should be unable to |
| // create another transaction status table. |
| Status s = txn_sys_client_->CreateTxnStatusTable(100); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| |
| // If we're a super user but not a service user, we should have access to the |
| // table. |
| NO_FATALS(SetSuperuserAndUser("bob", "nobody")); |
| // Create a client as 'bob', who is not the service user. |
| shared_ptr<KuduClient> bob_client; |
| ASSERT_OK(CreateClientAs("bob", &bob_client)); |
| results = attempt_accesses_and_delete(bob_client.get()); |
| for (const auto& r : results) { |
| ASSERT_OK(r); |
| } |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| |
| // If we're a regular user and nothing more, we shouldn't be able to access |
| // the table. |
| NO_FATALS(SetSuperuserAndUser("nobody", "bob")); |
| results = attempt_accesses_and_delete(bob_client.get()); |
| for (const auto& r : results) { |
| ASSERT_FALSE(r.ok()); |
| ASSERT_STR_CONTAINS(r.ToString(), "Not authorized"); |
| } |
| s = txn_sys_client_->CreateTxnStatusTable(100); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| } |
| |
| // Test that DDL for the transaction status table results in the creation of |
| // transaction status tablets. |
| TEST_F(TxnStatusTableITest, TestCreateTxnStatusTablePartitions) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| Status s = txn_sys_client_->CreateTxnStatusTable(100); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| NO_FATALS(CheckTableTypes({ { TableTypePB::TXN_STATUS_TABLE, 1 } })); |
| |
| // Now add more partitions and try again. |
| ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200)); |
| s = txn_sys_client_->AddTxnStatusTableRange(100, 200); |
| ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); |
| NO_FATALS(CheckTableTypes({ { TableTypePB::TXN_STATUS_TABLE, 2 } })); |
| |
| // Ensure we still create transaction status tablets even after the master is |
| // restarted. |
| cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); |
| ASSERT_OK(cluster_->Start()); |
| ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(200, 300)); |
| NO_FATALS(CheckTableTypes({ { TableTypePB::TXN_STATUS_TABLE, 3 } })); |
| } |
| |
| // Test that tablet servers can host both transaction status tablets and |
| // regular tablets. |
| TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) { |
| TestWorkload w(cluster_.get()); |
| w.set_num_replicas(1); |
| w.Setup(); |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| NO_FATALS(CheckTableTypes({ |
| { TableTypePB::TXN_STATUS_TABLE, 1 }, |
| { TableTypePB::DEFAULT_TABLE, 1 } |
| })); |
| } |
| |
| TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| uint32_t txn_keepalive_ms; |
| ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms)); |
| ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms); |
| ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser)); |
| |
| // If we write out of range, we should see an error. |
| { |
| int64_t highest_seen_txn_id = -1; |
| uint32_t txn_keepalive_ms = 0; |
| auto s = txn_sys_client_->BeginTransaction( |
| 100, kUser, &txn_keepalive_ms, &highest_seen_txn_id); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| // The 'highest_seen_txn_id' should be left untouched. |
| ASSERT_EQ(-1, highest_seen_txn_id); |
| // txn_keepalive_ms isn't assigned in case of non-OK status. |
| ASSERT_EQ(0, txn_keepalive_ms); |
| ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms); |
| } |
| { |
| auto s = txn_sys_client_->BeginCommitTransaction(100, kUser); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| } |
| { |
| auto s = txn_sys_client_->AbortTransaction(100, kUser); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| } |
| |
| // Once we add a new range, we should be able to leverage it. |
| ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200)); |
| int64_t highest_seen_txn_id = -1; |
| ASSERT_OK(txn_sys_client_->BeginTransaction( |
| 100, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id)); |
| ASSERT_EQ(100, highest_seen_txn_id); |
| ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser)); |
| ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser)); |
| } |
| |
| TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| |
| cluster_->mini_tablet_server(0)->Shutdown(); |
| |
| // When the only server is down, the system client should keep trying until |
| // it times out. |
| { |
| int64_t highest_seen_txn_id = -1; |
| auto s = txn_sys_client_->BeginTransaction( |
| 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, |
| MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| // The 'highest_seen_txn_id' should be left untouched. |
| ASSERT_EQ(-1, highest_seen_txn_id); |
| } |
| |
| // Now try with a longer timeout and ensure that if the server comes back up, |
| // the system client will succeed. |
| thread t([&] { |
| // Wait a bit to give some time for the system client to make its request |
| // and retry some. |
| SleepFor(MonoDelta::FromMilliseconds(500)); |
| ASSERT_OK(cluster_->mini_tablet_server(0)->Restart()); |
| }); |
| SCOPED_CLEANUP({ |
| t.join(); |
| }); |
| |
| int64_t highest_seen_txn_id = -1; |
| ASSERT_OK(txn_sys_client_->BeginTransaction( |
| 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, |
| MonoDelta::FromSeconds(3))); |
| ASSERT_EQ(highest_seen_txn_id, 1); |
| } |
| |
| TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| int64_t highest_seen_txn_id = -1; |
| uint32_t txn_keepalive_ms; |
| ASSERT_OK(txn_sys_client_->BeginTransaction( |
| 1, kUser, &txn_keepalive_ms, &highest_seen_txn_id)); |
| ASSERT_EQ(1, highest_seen_txn_id); |
| ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms); |
| |
| // Trying to start another transaction with a used ID should yield an error. |
| { |
| int64_t highest_seen_txn_id = -1; |
| auto s = txn_sys_client_->BeginTransaction( |
| 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_EQ(1, highest_seen_txn_id); |
| ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); |
| } |
| |
| // The same should be true with a different user. |
| { |
| int64_t highest_seen_txn_id = -1; |
| auto s = txn_sys_client_->BeginTransaction( |
| 1, "stranger", nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_EQ(1, highest_seen_txn_id); |
| ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); |
| } |
| } |
| |
| TEST_F(TxnStatusTableITest, TestSystemClientRegisterParticipantErrors) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| Status s = txn_sys_client_->RegisterParticipant(1, "participant", kUser); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_MATCHES(s.ToString(), "transaction ID.*not found, current highest txn ID:.*"); |
| |
| ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser)); |
| ASSERT_OK(txn_sys_client_->RegisterParticipant(1, ParticipantId(1), kUser)); |
| |
| // If a user other than the transaction owner is passed as an argument, the |
| // request should be rejected. |
| s = txn_sys_client_->RegisterParticipant(1, ParticipantId(2), "stranger"); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| } |
| |
| TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) { |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| |
| ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser)); |
| ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser)); |
| |
| // Calling BeginCommitTransaction() on already committing transaction is OK. |
| ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser)); |
| // It's completely legal to abort a transaction that has entered the commit |
| // phase but hasn't finalized yet. |
| ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser)); |
| // Aborting already aborted transaction is fine (aborting is idempotent). |
| ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser)); |
| |
| // Even if the transaction is aborted, an attempt to start another transaction |
| // with already used ID should yield an error. |
| { |
| int64_t highest_seen_txn_id = -1; |
| auto s = txn_sys_client_->BeginTransaction( |
| 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_EQ(1, highest_seen_txn_id); |
| ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID"); |
| } |
| |
| // An attempt to start committing a non-existent transaction should report |
| // an error. |
| { |
| auto s = txn_sys_client_->BeginCommitTransaction(2, kUser); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found"); |
| } |
| |
| // An attempt to abort a non-existent transaction should report an error. |
| { |
| auto s = txn_sys_client_->AbortTransaction(2, kUser); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found"); |
| } |
| |
| // An attempt to commit already aborted transaction should fail with an error. |
| { |
| ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser)); |
| ASSERT_OK(txn_sys_client_->AbortTransaction(2, kUser)); |
| auto s = txn_sys_client_->BeginCommitTransaction(2, kUser); |
| ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 is not open"); |
| } |
| |
| // Commit/Abort transaction should be called as of the transaction's owner. |
| { |
| ASSERT_OK(txn_sys_client_->BeginTransaction(3, kUser)); |
| auto s = txn_sys_client_->BeginCommitTransaction(3, "stranger"); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by stranger"); |
| s = txn_sys_client_->AbortTransaction(3, "stranger"); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by stranger"); |
| } |
| } |
| |
| TEST_F(TxnStatusTableITest, GetTransactionStatus) { |
| const auto verify_state = [&](TxnStatePB state) { |
| TxnStatusEntryPB txn_status; |
| ASSERT_OK(txn_sys_client_->GetTransactionStatus(1, kUser, &txn_status)); |
| ASSERT_TRUE(txn_status.has_user()); |
| ASSERT_STREQ(kUser, txn_status.user().c_str()); |
| ASSERT_TRUE(txn_status.has_state()); |
| ASSERT_EQ(state, txn_status.state()); |
| }; |
| |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| |
| ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser)); |
| NO_FATALS(verify_state(TxnStatePB::OPEN)); |
| |
| ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser)); |
| NO_FATALS(verify_state(TxnStatePB::COMMIT_IN_PROGRESS)); |
| |
| ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser)); |
| NO_FATALS(verify_state(TxnStatePB::ABORTED)); |
| |
| { |
| auto s = txn_sys_client_->BeginCommitTransaction(1, kUser); |
| ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); |
| NO_FATALS(verify_state(TxnStatePB::ABORTED)); |
| } |
| |
| // In the negative scenarios below, check for the expected status code |
| // and make sure nothing is set in the output argument. |
| { |
| TxnStatusEntryPB empty; |
| auto s = txn_sys_client_->GetTransactionStatus(1, "interloper", &empty); |
| ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); |
| ASSERT_FALSE(empty.has_user()); |
| ASSERT_FALSE(empty.has_state()); |
| } |
| |
| { |
| TxnStatusEntryPB empty; |
| auto s = txn_sys_client_->GetTransactionStatus(2, kUser, &empty); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_FALSE(empty.has_user()); |
| ASSERT_FALSE(empty.has_state()); |
| } |
| |
| { |
| TxnStatusEntryPB empty; |
| auto s = txn_sys_client_->GetTransactionStatus(2, "", &empty); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_FALSE(empty.has_user()); |
| ASSERT_FALSE(empty.has_state()); |
| } |
| } |
| |
| // Test that a transaction system client can make concurrent calls to multiple |
| // transaction status tablets. |
| TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) { |
| int kPartitionWidth = 10; |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(kPartitionWidth)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth)); |
| vector<thread> threads; |
| Status statuses[2 * kPartitionWidth]; |
| for (int t = 0; t < 2; t++) { |
| threads.emplace_back([&, t] { |
| // NOTE: we can "race" transaction IDs so they're not monotonically |
| // increasing, as long as within each range partition they are |
| // monotonically increasing. |
| for (int i = 0; i < kPartitionWidth; i++) { |
| int64_t txn_id = t * kPartitionWidth + i; |
| Status s = txn_sys_client_->BeginTransaction(txn_id, kUser).AndThen([&] { |
| return txn_sys_client_->RegisterParticipant(txn_id, ParticipantId(1), kUser); |
| }); |
| if (PREDICT_FALSE(!s.ok())) { |
| statuses[txn_id] = s; |
| } |
| } |
| }); |
| } |
| std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); }); |
| for (const auto& s : statuses) { |
| EXPECT_OK(s); |
| } |
| } |
| |
| class MultiServerTxnStatusTableITest : public TxnStatusTableITest { |
| public: |
| void SetUp() override { |
| KuduTest::SetUp(); |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 4; |
| cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| vector<string> master_addrs; |
| for (const auto& hp : cluster_->master_rpc_addrs()) { |
| master_addrs.emplace_back(hp.ToString()); |
| } |
| ASSERT_OK(TxnSystemClient::Create(master_addrs, &txn_sys_client_)); |
| |
| // Create the initial transaction status table partitions and start an |
| // initial transaction. |
| ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3)); |
| ASSERT_OK(txn_sys_client_->OpenTxnStatusTable()); |
| ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser)); |
| } |
| |
| // Returns the tablet ID found in the cluster, expecting a single tablet. |
| string GetTabletId() const { |
| string tablet_id; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| const auto& tablets = cluster_->mini_tablet_server(i)->ListTablets(); |
| if (!tablets.empty()) { |
| if (!tablet_id.empty()) { |
| DCHECK_EQ(tablet_id, tablets[0]); |
| continue; |
| } |
| tablet_id = tablets[0]; |
| } |
| } |
| DCHECK(!tablet_id.empty()); |
| return tablet_id; |
| } |
| |
| // Returns the UUID of the given tablet's leader replica. |
| Status FindLeaderId(const string& tablet_id, string* uuid) { |
| TabletServerMap ts_map; |
| RETURN_NOT_OK(CreateTabletServerMap(cluster_->master_proxy(), cluster_->messenger(), &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| TServerDetails* leader_ts; |
| RETURN_NOT_OK(FindTabletLeader(ts_map, tablet_id, MonoDelta::FromSeconds(10), &leader_ts)); |
| *uuid = leader_ts->uuid(); |
| return Status::OK(); |
| } |
| }; |
| |
| TEST_F(MultiServerTxnStatusTableITest, TestSystemClientDeletedTablet) { |
| // Find the leader and force it to step down. The system client should be |
| // able to find the new leader. |
| const string& tablet_id = GetTabletId(); |
| ASSERT_FALSE(tablet_id.empty()); |
| string orig_leader_uuid; |
| ASSERT_OK(FindLeaderId(tablet_id, &orig_leader_uuid)); |
| ASSERT_FALSE(orig_leader_uuid.empty()); |
| ASSERT_OK(cluster_->mini_tablet_server_by_uuid( |
| orig_leader_uuid)->server()->tablet_manager()->DeleteTablet( |
| tablet_id, tablet::TABLET_DATA_TOMBSTONED, /*cas_config_index*/boost::none)); |
| |
| // The client should automatically try to get a new location for the tablet. |
| ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser)); |
| } |
| |
| TEST_F(MultiServerTxnStatusTableITest, TestSystemClientLeadershipChange) { |
| // Find the leader and force it to step down. The system client should be |
| // able to find the new leader. |
| const string& tablet_id = GetTabletId(); |
| ASSERT_FALSE(tablet_id.empty()); |
| string orig_leader_uuid; |
| ASSERT_OK(FindLeaderId(tablet_id, &orig_leader_uuid)); |
| ASSERT_FALSE(orig_leader_uuid.empty()); |
| cluster_->mini_tablet_server_by_uuid( |
| orig_leader_uuid)->server()->mutable_quiescing()->store(true); |
| ASSERT_EVENTUALLY([&] { |
| string new_leader_uuid; |
| ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid)); |
| ASSERT_NE(new_leader_uuid, orig_leader_uuid); |
| }); |
| ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser)); |
| } |
| |
| TEST_F(MultiServerTxnStatusTableITest, TestSystemClientCrashedNodes) { |
| // Find the leader and shut it down. The system client should be able to |
| // find the new leader. |
| const auto& tablet_id = GetTabletId(); |
| ASSERT_FALSE(tablet_id.empty()); |
| string leader_uuid; |
| ASSERT_OK(FindLeaderId(tablet_id, &leader_uuid)); |
| ASSERT_FALSE(leader_uuid.empty()); |
| FLAGS_leader_failure_max_missed_heartbeat_periods = 1; |
| cluster_->mini_tablet_server_by_uuid(leader_uuid)->Shutdown(); |
| // We have to wait for a leader to be elected. Until that happens, the system |
| // client may try to start transactions on followers, and in doing so use up |
| // transaction IDs. Have the system client try again with a higher |
| // transaction ID until a leader is elected. |
| int txn_id = 2; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(txn_sys_client_->BeginTransaction(++txn_id, kUser)); |
| }); |
| } |
| |
| } // namespace itest |
| } // namespace kudu |