| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <limits> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/catalog_manager.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/master/table_metrics.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/user_credentials.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tablet/txn_coordinator.h" |
| #include "kudu/transactions/txn_status_manager.h" |
| #include "kudu/transactions/txn_status_tablet.h" |
| #include "kudu/tserver/heartbeater.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/tablet_server_options.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/jsonwriter.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
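| // The DECLARE'd flags below are defined in the modules under test; the |
| // scenarios tweak them directly to control timing and replica management. |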
| DECLARE_bool(allow_unsafe_replication_factor); |
| DECLARE_bool(catalog_manager_evict_excess_replicas); |
| DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader); |
| DECLARE_bool(enable_leader_failure_detection); |
| DECLARE_bool(enable_txn_system_client_init); |
| DECLARE_bool(raft_prepare_replacement_before_eviction); |
| DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
| DECLARE_int32(heartbeat_interval_ms); |
| DECLARE_int32(metrics_retirement_age_ms); |
| DECLARE_int32(raft_heartbeat_interval_ms); |
| DECLARE_int32(tablet_open_inject_latency_ms); |
| DECLARE_string(superuser_acl); |
| DECLARE_string(trusted_user_acl); |
| DEFINE_int32(num_election_test_loops, 3, |
| "Number of random EmulateElectionForTests() loops to execute in " |
| "the leadership change reporting tests"); |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableCreator; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::consensus::GetConsensusRole; |
| using kudu::consensus::HealthReportPB; |
| using kudu::consensus::INCLUDE_HEALTH_REPORT; |
| using kudu::consensus::RaftConfigPB; |
| using kudu::consensus::RaftConsensus; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::itest::SimpleIntKeyKuduSchema; |
| using kudu::itest::StartTabletCopy; |
| using kudu::itest::TabletServerMap; |
| using kudu::itest::TServerDetails; |
| using kudu::KuduPartialRow; |
| using kudu::master::CatalogManager; |
| using kudu::master::GetTableLocationsRequestPB; |
| using kudu::master::GetTableLocationsResponsePB; |
| using kudu::master::Master; |
| using kudu::master::MasterServiceProxy; |
| using kudu::master::ReportedTabletPB; |
| using kudu::master::TableInfo; |
| using kudu::master::TabletReportPB; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::MessengerBuilder; |
| using kudu::rpc::RpcController; |
| using kudu::tablet::ParticipantIdsByTxnId; |
| using kudu::tablet::TabletReplica; |
| using kudu::tablet::TxnCoordinator; |
| using kudu::transactions::TxnStatusManager; |
| using kudu::transactions::TxnStatusTablet; |
| using kudu::tserver::MiniTabletServer; |
| using kudu::ClusterVerifier; |
| using std::map; |
| using std::shared_ptr; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tserver { |
| |
| static const char* const kTableName = "test-table"; |
| |
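| // Base test fixture: starts an InternalMiniCluster and provides helpers for |
| // collecting tablet replicas and incremental tablet reports. |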
| class TsTabletManagerITest : public KuduTest { |
| public: |
| TsTabletManagerITest() |
| : schema_(SimpleIntKeyKuduSchema()) { |
| } |
| void SetUp() override { |
| KuduTest::SetUp(); |
| |
| MessengerBuilder bld("client"); |
| ASSERT_OK(bld.Build(&client_messenger_)); |
| } |
| |
| void StartCluster(InternalMiniClusterOptions opts) { |
| cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); |
| } |
| |
| protected: |
| void DisableHeartbeatingToMaster(); |
| |
| // Populate the 'replicas' container with corresponding objects representing |
| // tablet replicas running at tablet servers in the test cluster. It's assumed |
| // there is at least one tablet replica per tablet server. Also, this utility |
| // method awaits up to the specified timeout for the consensus to be running |
| // before adding an element into the output container. |
| Status PrepareTabletReplicas(MonoDelta timeout, |
| vector<scoped_refptr<TabletReplica>>* replicas); |
| |
| // Generate incremental tablet reports using the test-specific method |
| // GenerateIncrementalTabletReportsForTests() of the specified heartbeater. |
| void GetIncrementalTabletReports(Heartbeater* heartbeater, |
| vector<TabletReportPB>* reports); |
| |
| const KuduSchema schema_; |
| |
| unique_ptr<InternalMiniCluster> cluster_; |
| client::sp::shared_ptr<KuduClient> client_; |
| std::shared_ptr<Messenger> client_messenger_; |
| }; |
| |
| void TsTabletManagerITest::DisableHeartbeatingToMaster() { |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
| ts->FailHeartbeats(); |
| } |
| } |
| |
| Status TsTabletManagerITest::PrepareTabletReplicas( |
| MonoDelta timeout, vector<scoped_refptr<TabletReplica>>* replicas) { |
| const MonoTime deadline = MonoTime::Now() + timeout; |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
| vector<scoped_refptr<TabletReplica>> ts_replicas; |
| // The replicas may not have been created yet, so loop until we see them. |
| while (MonoTime::Now() < deadline) { |
| ts->server()->tablet_manager()->GetTabletReplicas(&ts_replicas); |
| if (!ts_replicas.empty()) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| if (ts_replicas.empty()) { |
| return Status::TimedOut("waiting for tablet replicas to register with the TS tablet manager"); |
| } |
| RETURN_NOT_OK(ts_replicas.front()->WaitUntilConsensusRunning( |
| deadline - MonoTime::Now())); |
| replicas->insert(replicas->end(), ts_replicas.begin(), ts_replicas.end()); |
| } |
| return Status::OK(); |
| } |
| |
| void TsTabletManagerITest::GetIncrementalTabletReports( |
| Heartbeater* heartbeater, vector<TabletReportPB>* reports) { |
| vector<TabletReportPB> r; |
| // The MarkDirty() callback is on an async thread so it might take the |
| // follower a few milliseconds to execute it. Wait for that to happen. |
| ASSERT_EVENTUALLY([&] { |
| r = heartbeater->GenerateIncrementalTabletReportsForTests(); |
| ASSERT_EQ(1, r.size()); |
| ASSERT_FALSE(r.front().updated_tablets().empty()); |
| }); |
| reports->swap(r); |
| } |
| |
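| // Parameterized on whether the 3-4-3 replica management scheme |
| // (replace-before-eviction) is enabled. |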
| class FailedTabletsAreReplacedITest : |
| public TsTabletManagerITest, |
| public ::testing::WithParamInterface<bool> { |
| }; |
| // Test that when a tablet replica is marked as failed, it will eventually be |
| // evicted and replaced. |
| TEST_P(FailedTabletsAreReplacedITest, OneReplica) { |
| const bool is_3_4_3_mode = GetParam(); |
| FLAGS_raft_prepare_replacement_before_eviction = is_3_4_3_mode; |
| const auto kNumReplicas = 3; |
| const auto kNumTabletServers = kNumReplicas + (is_3_4_3_mode ? 1 : 0); |
| |
| { |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumTabletServers; |
| NO_FATALS(StartCluster(std::move(opts))); |
| } |
| TestWorkload work(cluster_.get()); |
| work.set_num_replicas(kNumReplicas); |
| work.Setup(); |
| work.Start(); |
| |
| // Insert data until the tablet becomes visible to the server. |
| string tablet_id; |
| ASSERT_EVENTUALLY([&] { |
| auto idx = rand() % kNumTabletServers; |
| vector<string> tablet_ids = cluster_->mini_tablet_server(idx)->ListTablets(); |
| ASSERT_EQ(1, tablet_ids.size()); |
| tablet_id = tablet_ids[0]; |
| }); |
| work.StopAndJoin(); |
| |
| // Wait until all the replicas are running before failing one arbitrarily. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| |
| { |
| // Inject an error into one of the replicas. Shutting it down will leave it |
| // in the FAILED state. |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_EVENTUALLY([&] { |
| auto idx = rand() % kNumTabletServers; |
| MiniTabletServer* ts = cluster_->mini_tablet_server(idx); |
| ASSERT_OK(ts->server()->tablet_manager()->GetTabletReplica(tablet_id, &replica)); |
| }); |
| replica->SetError(Status::IOError("INJECTED ERROR: tablet failed")); |
| replica->Shutdown(); |
| ASSERT_EQ(tablet::FAILED, replica->state()); |
| } |
| |
| // Ensure the tablet is eventually re-replicated. |
| NO_FATALS(v.CheckCluster()); |
| } |
| INSTANTIATE_TEST_SUITE_P(, |
| FailedTabletsAreReplacedITest, |
| ::testing::Bool()); |
| |
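| // Test fixture for scenarios that exercise leadership changes and the tablet |
| // reports they trigger. |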
| class LeadershipChangeReportingTest : public TsTabletManagerITest { |
| public: |
| const int kNumReplicas = 2; |
| void SetUp() override { |
| NO_FATALS(TsTabletManagerITest::SetUp()); |
| |
| // For tests that heartbeat, set a lower interval to speed things up. |
| FLAGS_heartbeat_interval_ms = 100; |
| |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumReplicas; |
| NO_FATALS(StartCluster(std::move(opts))); |
| |
| // We need to control elections precisely for this test since we're using |
| // EmulateElectionForTests() with a distributed consensus configuration. |
| FLAGS_enable_leader_failure_detection = false; |
| FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false; |
| |
| // Allow creating a table with an even replication factor. |
| FLAGS_allow_unsafe_replication_factor = true; |
| |
| // Run a few more iters in slow-test mode. |
| OverrideFlagForSlowTests("num_election_test_loops", "10"); |
| |
| // Build a TServerDetails map so we can check for convergence. |
| const auto& addr = cluster_->mini_master()->bound_rpc_addr(); |
| master_proxy_.reset(new MasterServiceProxy(client_messenger_, addr, addr.host())); |
| } |
| |
| // Creates 'num_hashes' tablets with two replicas each. |
| Status CreateTable(int num_hashes) { |
| // Create the table. |
| client::sp::shared_ptr<KuduTable> table; |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| if (num_hashes > 1) { |
| RETURN_NOT_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(kNumReplicas) |
| .add_hash_partitions({ "key" }, num_hashes) |
| .Create()); |
| } else { |
| RETURN_NOT_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(kNumReplicas) |
| .Create()); |
| } |
| RETURN_NOT_OK(client_->OpenTable(kTableName, &table)); |
| |
| RETURN_NOT_OK(CreateTabletServerMap(master_proxy_, client_messenger_, &ts_map_)); |
| |
| // Collect the TabletReplicas so we get direct access to RaftConsensus. |
| RETURN_NOT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas_)); |
| CHECK_EQ(kNumReplicas * num_hashes, tablet_replicas_.size()); |
| return Status::OK(); |
| } |
| void TearDown() override { |
| ValueDeleter deleter(&ts_map_); |
| NO_FATALS(TsTabletManagerITest::TearDown()); |
| } |
| |
| Status TriggerElection(int min_term = 0, int* new_leader_idx = nullptr) { |
| int leader_idx = rand() % tablet_replicas_.size(); |
| LOG(INFO) << "Electing peer " << leader_idx << "..."; |
| RaftConsensus* con = CHECK_NOTNULL(tablet_replicas_[leader_idx]->consensus()); |
| RETURN_NOT_OK(con->EmulateElectionForTests()); |
| LOG(INFO) << "Waiting for servers to agree..."; |
| RETURN_NOT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5), ts_map_, |
| tablet_replicas_[leader_idx]->tablet_id(), min_term)); |
| if (new_leader_idx) { |
| *new_leader_idx = leader_idx; |
| } |
| return Status::OK(); |
| } |
| |
| protected: |
| shared_ptr<MasterServiceProxy> master_proxy_; |
| vector<scoped_refptr<TabletReplica>> tablet_replicas_; |
| itest::TabletServerMap ts_map_; |
| }; |
| |
| // Regression test for KUDU-2842: concurrent calls to GetTableLocations() |
| // shouldn't lead to data races with the changes in reported state. |
| TEST_F(LeadershipChangeReportingTest, TestConcurrentGetTableLocations) { |
| // KUDU-2842 requires there to be multiple tablets in a given report, so |
| // create multiple tablets. |
| int kNumTablets = 2; |
| ASSERT_OK(CreateTable(kNumTablets)); |
| CountDownLatch latch(1); |
| thread t([&] { |
| master::GetTableLocationsRequestPB req; |
| req.mutable_table()->set_table_name(kTableName); |
| req.set_intern_ts_infos_in_response(true); |
| while (!latch.WaitFor(MonoDelta::FromMilliseconds(10))) { |
| master::GetTableLocationsResponsePB resp; |
| rpc::RpcController rpc; |
| // Note: we only really care about data races, rather than the responses. |
| ignore_result(master_proxy_->GetTableLocations(req, &resp, &rpc)); |
| } |
| }); |
| SCOPED_CLEANUP({ |
| latch.CountDown(); |
| t.join(); |
| }); |
| for (int i = 0; i < FLAGS_num_election_test_loops; i++) { |
| for (int t = 0; t < kNumTablets; t++) { |
| // Note: we only really care about data races, rather than the success of |
| // all the elections. |
| ignore_result(TriggerElection()); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_heartbeat_interval_ms)); |
| } |
| } |
| |
| // Regression test for KUDU-2952, where a leadership change racing with the |
| // creation of a tablet report could lead to a non-leader report with stats. |
| TEST_F(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| ASSERT_OK(CreateTable(1)); |
| // Stop heartbeating so we don't race against the Master. |
| DisableHeartbeatingToMaster(); |
| |
| // Create a thread that just keeps triggering elections. |
| CountDownLatch latch(1); |
| thread t([&] { |
| while (!latch.WaitFor(MonoDelta::FromMilliseconds(1))) { |
| ignore_result(TriggerElection()); |
| } |
| }); |
| SCOPED_CLEANUP({ |
| latch.CountDown(1); |
| t.join(); |
| }); |
| |
| // Generate a bunch of reports and keep checking that only the leaders have |
| // stats. |
| const auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(10); |
| while (MonoTime::Now() < deadline) { |
| for (int t = 0; t < cluster_->num_tablet_servers(); t++) { |
| ReportedTabletPB reported_tablet; |
| MiniTabletServer* mts = cluster_->mini_tablet_server(t); |
| mts->server()->tablet_manager()->CreateReportedTabletPB(tablet_replicas_[t], |
| &reported_tablet); |
| // Only reports from leaders should have stats. |
| const string& reported_leader_uuid = reported_tablet.consensus_state().leader_uuid(); |
| const string& mts_uuid = mts->uuid(); |
| if (reported_tablet.has_stats()) { |
| ASSERT_EQ(reported_leader_uuid, mts_uuid); |
| } |
| } |
| } |
| } |
| |
| // Test that when the leader changes, the tablet manager gets notified and |
| // includes that information in the next tablet report. |
| TEST_F(LeadershipChangeReportingTest, TestUpdatedConsensusState) { |
| ASSERT_OK(CreateTable(1)); |
| // Stop heartbeating so we don't race against the Master. |
| DisableHeartbeatingToMaster(); |
| |
| // Loop and cause elections and term changes from different servers. |
| // TSTabletManager should acknowledge the role changes via tablet reports. |
| for (int i = 0; i < FLAGS_num_election_test_loops; i++) { |
| SCOPED_TRACE(Substitute("Iter: $0", i)); |
| int new_leader_idx; |
| ASSERT_OK(TriggerElection(i + 1, &new_leader_idx)); |
| |
| // Now check that the tablet report reports the correct role for both servers. |
| for (int replica = 0; replica < kNumReplicas; replica++) { |
| vector<TabletReportPB> reports; |
| NO_FATALS(GetIncrementalTabletReports( |
| cluster_->mini_tablet_server(replica)->server()->heartbeater(), |
| &reports)); |
| |
| // Ensure that our tablet reports are consistent. |
| TabletReportPB& report = reports[0]; |
| ASSERT_EQ(1, report.updated_tablets_size()) |
| << "Wrong report size:\n" << pb_util::SecureDebugString(report); |
| const ReportedTabletPB& reported_tablet = report.updated_tablets(0); |
| ASSERT_TRUE(reported_tablet.has_consensus_state()); |
| |
| string uuid = tablet_replicas_[replica]->permanent_uuid(); |
| RaftPeerPB::Role role = GetConsensusRole(uuid, reported_tablet.consensus_state()); |
| if (replica == new_leader_idx) { |
| ASSERT_EQ(RaftPeerPB::LEADER, role) |
| << "Tablet report: " << pb_util::SecureShortDebugString(report); |
| } else { |
| ASSERT_EQ(RaftPeerPB::FOLLOWER, role) |
| << "Tablet report: " << pb_util::SecureShortDebugString(report); |
| } |
| } |
| } |
| } |
| |
| // Test that the tablet manager generates reports on replica health status |
| // in accordance with observed changes on replica status, and that the tablet |
| // manager includes that information into the next tablet report. Specifically, |
| // verify that: |
| // 1. The leader replica provides the health status report in its consensus |
| // state, if requested. |
| // 2. The health report information matches the state of tablet replicas. |
| // 3. The tablet manager generates appropriate tablet reports with updated |
| // health information when replicas change their state. |
| TEST_F(TsTabletManagerITest, ReportOnReplicaHealthStatus) { |
| constexpr int kNumReplicas = 3; |
| const auto kTimeout = MonoDelta::FromSeconds(60); |
| |
| // This test is specific to the 3-4-3 replica management scheme. |
| FLAGS_raft_prepare_replacement_before_eviction = true; |
| FLAGS_catalog_manager_evict_excess_replicas = false; |
| { |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumReplicas; |
| NO_FATALS(StartCluster(std::move(opts))); |
| } |
| |
| // Create the table. |
| client::sp::shared_ptr<KuduTable> table; |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .set_range_partition_columns({}) // need just one tablet |
| .num_replicas(kNumReplicas) |
| .Create()); |
| ASSERT_OK(client_->OpenTable(kTableName, &table)); |
| |
| // Build a TServerDetails map so we can check for convergence. |
| const auto& addr = cluster_->mini_master()->bound_rpc_addr(); |
| shared_ptr<MasterServiceProxy> master_proxy( |
| new MasterServiceProxy(client_messenger_, addr, addr.host())); |
| |
| itest::TabletServerMap ts_map; |
| ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| |
| // Collect the TabletReplicas so we get direct access to RaftConsensus. |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ASSERT_OK(PrepareTabletReplicas(kTimeout, &tablet_replicas)); |
| ASSERT_EQ(kNumReplicas, tablet_replicas.size()); |
| |
| // Don't send heartbeats to the master, otherwise there would be a race |
| // between acknowledging the heartbeats and generating new tablet reports: |
| // clearing the 'dirty' flag on a tablet before the generated tablet report |
| // is introspected makes the test scenario very flaky. Also, this scenario does |
| // not assume that the catalog manager initiates the replacement of failed |
| // voter replicas. |
| DisableHeartbeatingToMaster(); |
| |
| // Generate health reports for every element of the 'tablet_replicas' |
| // container. Also, output the leader replica UUID from the consensus |
| // state into 'leader_replica_uuid'. |
| auto get_health_reports = [&](map<string, HealthReportPB>* reports, |
| string* leader_replica_uuid = nullptr) { |
| ConsensusStatePB cstate; |
| string leader_uuid; |
| for (const auto& replica : tablet_replicas) { |
| RaftConsensus* consensus = CHECK_NOTNULL(replica->consensus()); |
| ConsensusStatePB cs; |
| Status s = consensus->ConsensusState(&cs, INCLUDE_HEALTH_REPORT); |
| if (!s.ok()) { |
| ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); // Replica is shut down. |
| continue; |
| } |
| if (consensus->peer_uuid() == cs.leader_uuid()) { |
| // Only the leader replica has the up-to-date health report. |
| leader_uuid = cs.leader_uuid(); |
| cstate = std::move(cs); |
| break; |
| } |
| } |
| ASSERT_FALSE(leader_uuid.empty()); |
| ASSERT_TRUE(cstate.has_committed_config()); |
| const RaftConfigPB& config = cstate.committed_config(); |
| ASSERT_EQ(kNumReplicas, config.peers_size()); |
| if (reports) { |
| reports->clear(); |
| for (const auto& peer : config.peers()) { |
| ASSERT_TRUE(peer.has_health_report()); |
| reports->emplace(peer.permanent_uuid(), peer.health_report()); |
| } |
| } |
| if (leader_replica_uuid) { |
| *leader_replica_uuid = leader_uuid; |
| } |
| }; |
| |
| // Get the information on committed Raft configuration from tablet reports |
| // generated by the heartbeater of the server running the specified leader |
| // tablet replica. |
| auto get_committed_config_from_reports = [&](const string& leader_replica_uuid, |
| RaftConfigPB* config) { |
| TabletServer* leader_server = nullptr; |
| for (auto i = 0; i < kNumReplicas; ++i) { |
| MiniTabletServer* mts = cluster_->mini_tablet_server(i); |
| if (mts->uuid() == leader_replica_uuid) { |
| leader_server = mts->server(); |
| break; |
| } |
| } |
| ASSERT_NE(nullptr, leader_server); |
| |
| // TSTabletManager should acknowledge the status change via tablet reports. |
| Heartbeater* heartbeater = leader_server->heartbeater(); |
| ASSERT_NE(nullptr, heartbeater); |
| |
| vector<TabletReportPB> reports; |
| NO_FATALS(GetIncrementalTabletReports(heartbeater, &reports)); |
| ASSERT_EQ(1, reports.size()); |
| const TabletReportPB& report = reports[0]; |
| SCOPED_TRACE("Tablet report: " + pb_util::SecureDebugString(report)); |
| ASSERT_EQ(1, report.updated_tablets_size()); |
| const ReportedTabletPB& reported_tablet = report.updated_tablets(0); |
| ASSERT_TRUE(reported_tablet.has_consensus_state()); |
| const ConsensusStatePB& cstate = reported_tablet.consensus_state(); |
| ASSERT_EQ(RaftPeerPB::LEADER, GetConsensusRole(leader_replica_uuid, cstate)); |
| ASSERT_TRUE(cstate.has_committed_config()); |
| RaftConfigPB cfg = cstate.committed_config(); |
| config->Swap(&cfg); |
| }; |
| |
| // All replicas are up and running, so the leader replica should eventually |
| // report their health status as HEALTHY. |
| { |
| string leader_replica_uuid; |
| ASSERT_EVENTUALLY(([&] { |
| map<string, HealthReportPB> reports; |
| NO_FATALS(get_health_reports(&reports, &leader_replica_uuid)); |
| for (const auto& e : reports) { |
| SCOPED_TRACE("replica UUID: " + e.first); |
| ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health()); |
| } |
| })); |
| |
| // Other replicas are seen by the leader in UNKNOWN health state first. |
| // At this point of the test scenario, since the replicas went from the |
| // UNKNOWN to the HEALTHY state, the incremental tablet reports should |
| // reflect those health status changes. |
| RaftConfigPB config; |
| NO_FATALS(get_committed_config_from_reports(leader_replica_uuid, |
| &config)); |
| ASSERT_EQ(kNumReplicas, config.peers_size()); |
| for (const auto& p : config.peers()) { |
| ASSERT_TRUE(p.has_health_report()); |
| const HealthReportPB& report(p.health_report()); |
| ASSERT_EQ(HealthReportPB::HEALTHY, report.overall_health()); |
| } |
| } |
| |
| // Inject an error into the replica and make sure its status is eventually |
| // reported as FAILED. |
| string failed_replica_uuid; |
| { |
| auto replica = tablet_replicas.front(); |
| failed_replica_uuid = replica->consensus()->peer_uuid(); |
| replica->SetError(Status::IOError("INJECTED ERROR: tablet failed")); |
| replica->Shutdown(); |
| ASSERT_EQ(tablet::FAILED, replica->state()); |
| } |
| |
| ASSERT_EVENTUALLY(([&] { |
| map<string, HealthReportPB> reports; |
| NO_FATALS(get_health_reports(&reports)); |
| for (const auto& e : reports) { |
| const auto& replica_uuid = e.first; |
| SCOPED_TRACE("replica UUID: " + replica_uuid); |
| if (replica_uuid == failed_replica_uuid) { |
| ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, e.second.overall_health()); |
| } else { |
| ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health()); |
| } |
| } |
| })); |
| |
| // The scenario below assumes the leader replica does not change anymore. |
| FLAGS_enable_leader_failure_detection = false; |
| |
| { |
| string leader_replica_uuid; |
| NO_FATALS(get_health_reports(nullptr, &leader_replica_uuid)); |
| RaftConfigPB config; |
| NO_FATALS(get_committed_config_from_reports(leader_replica_uuid, |
| &config)); |
| for (const auto& peer : config.peers()) { |
| ASSERT_TRUE(peer.has_permanent_uuid()); |
| const auto& uuid = peer.permanent_uuid(); |
| ASSERT_TRUE(peer.has_health_report()); |
| const HealthReportPB& report(peer.health_report()); |
| if (uuid == failed_replica_uuid) { |
| EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, report.overall_health()); |
| } else { |
| EXPECT_EQ(HealthReportPB::HEALTHY, report.overall_health()); |
| } |
| } |
| } |
| } |
| |
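| // Test that duplicate master addresses in the tablet server's options are |
| // deduplicated so that only a single heartbeater thread is created. |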
| TEST_F(TsTabletManagerITest, TestDeduplicateMasterAddrsForHeartbeaters) { |
| InternalMiniClusterOptions cluster_opts; |
| NO_FATALS(StartCluster(std::move(cluster_opts))); |
| // Set up the tablet server so it points to a single master, but that master |
| // is specified multiple times. |
| int kNumDupes = 20; |
| const Sockaddr& master_addr = cluster_->mini_master()->bound_rpc_addr(); |
| tserver::MiniTabletServer* mini_tserver = cluster_->mini_tablet_server(0); |
| tserver::TabletServerOptions* opts = cluster_->mini_tablet_server(0)->options(); |
| opts->master_addresses.clear(); |
| for (int i = 0; i < kNumDupes; i++) { |
| opts->master_addresses.emplace_back(master_addr.host(), master_addr.port()); |
| } |
| mini_tserver->Shutdown(); |
| ASSERT_OK(mini_tserver->Restart()); |
| ASSERT_OK(mini_tserver->WaitStarted()); |
| |
| // The master addresses should be deduplicated and only one heartbeater |
| // thread should exist. |
| tserver::Heartbeater* hb = mini_tserver->server()->heartbeater(); |
| ASSERT_EQ(1, hb->threads_.size()); |
| } |
| |
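| // Verify that table statistics (live row count) reported by tablet servers |
| // remain correct across writes, tablet leadership changes, a table rename, |
| // master and tserver restarts, and are retired after the table is deleted. |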
| TEST_F(TsTabletManagerITest, TestTableStats) { |
| const int kNumMasters = 3; |
| const int kNumTservers = 3; |
| const int kNumReplicas = 3; |
| const int kNumHashBuckets = 3; |
| const int kRowsCount = rand() % 1000; |
| const string kNewTableName = "NewTableName"; |
| client::sp::shared_ptr<KuduTable> table; |
| |
| // Shorten the heartbeat and election intervals to speed up the test. |
| FLAGS_heartbeat_interval_ms = 100; |
| FLAGS_raft_heartbeat_interval_ms = 100; |
| FLAGS_leader_failure_max_missed_heartbeat_periods = 2; |
| int64_t kMaxElectionTime = |
| FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods; |
| |
| // Get the LEADER master. |
| const auto GetLeaderMaster = [&] () { |
| int idx = 0; |
| Master* master = nullptr; |
| Status s = cluster_->GetLeaderMasterIndex(&idx); |
| if (s.ok()) { |
| master = cluster_->mini_master(idx)->master(); |
| } |
| return CHECK_NOTNULL(master); |
| }; |
| // Get the LEADER master's service proxy. |
| const auto GetLeaderMasterServiceProxy = [&]() { |
| const auto& addr = GetLeaderMaster()->first_rpc_address(); |
| shared_ptr<MasterServiceProxy> proxy( |
| new MasterServiceProxy(client_messenger_, addr, addr.host())); |
| return proxy; |
| }; |
| // Get the LEADER master and run the check function. |
| const auto GetLeaderMasterAndRun = [&] (int64_t live_row_count, |
| const std::function<void(TableInfo*, int64_t)>& check_function) { |
| CatalogManager* catalog = GetLeaderMaster()->catalog_manager(); |
| master::CatalogManager::ScopedLeaderSharedLock l(catalog); |
| ASSERT_OK(l.first_failed_status()); |
| // Get the TableInfo. |
| vector<scoped_refptr<TableInfo>> table_infos; |
| catalog->GetAllTables(&table_infos); |
| ASSERT_EQ(1, table_infos.size()); |
| // Run the check function. |
| NO_FATALS(check_function(table_infos[0].get(), live_row_count)); |
| }; |
| // Check the stats. |
| const auto CheckStats = [&] (int64_t live_row_count) { |
| // Trigger heartbeat. |
| for (int i = 0; i < kNumTservers; ++i) { |
| TabletServer* tserver = CHECK_NOTNULL(cluster_->mini_tablet_server(i)->server()); |
| tserver->tablet_manager()->SetNextUpdateTimeForTests(); |
| tserver->heartbeater()->TriggerASAP(); |
| } |
| |
| // Check the stats. |
| NO_FATALS(GetLeaderMasterAndRun(live_row_count, [&] ( |
| TableInfo* table_info, int64_t live_row_count) { |
| ASSERT_EVENTUALLY([&] () { |
| ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount()); |
| ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value()); |
| }); |
| })); |
| }; |
| |
| // 1. Start a cluster. |
| { |
| InternalMiniClusterOptions opts; |
| opts.num_masters = kNumMasters; |
| opts.num_tablet_servers = kNumTservers; |
| NO_FATALS(StartCluster(std::move(opts))); |
| ASSERT_EQ(kNumMasters, cluster_->num_masters()); |
| ASSERT_EQ(kNumTservers, cluster_->num_tablet_servers()); |
| } |
| |
| // 2. Create a table. |
| { |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .add_hash_partitions({ "key" }, kNumHashBuckets) |
| .num_replicas(kNumReplicas) |
| .timeout(MonoDelta::FromSeconds(60)) |
| .Create()); |
| ASSERT_OK(client_->OpenTable(kTableName, &table)); |
| NO_FATALS(CheckStats(0)); |
| } |
| |
| // 3. Write some rows and verify the stats. |
| { |
| client::sp::shared_ptr<KuduSession> session(client_->NewSession()); |
| ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| for (int i = 0; i < kRowsCount; i++) { |
| KuduInsert* insert = table->NewInsert(); |
| KuduPartialRow* row = insert->mutable_row(); |
| ASSERT_OK(row->SetInt32(0, i)); |
| ASSERT_OK(session->Apply(insert)); |
| } |
| ASSERT_OK(session->Flush()); |
| NO_FATALS(CheckStats(kRowsCount)); |
| } |
| |
| // 4. Change the tablet leadership. |
| { |
| // Build a TServerDetails map so we can check for convergence. |
| itest::TabletServerMap ts_map; |
| ASSERT_OK(CreateTabletServerMap(GetLeaderMasterServiceProxy(), client_messenger_, &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| |
| // Collect the TabletReplicas so we get direct access to RaftConsensus. |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas)); |
| ASSERT_EQ(kNumReplicas * kNumHashBuckets, tablet_replicas.size()); |
| |
| for (int i = 0; i < kNumReplicas * kNumHashBuckets; ++i) { |
| scoped_refptr<TabletReplica> replica = tablet_replicas[i]; |
| if (consensus::RaftPeerPB_Role_LEADER == replica->consensus()->role()) { |
| // Step down. |
| itest::TServerDetails* tserver = |
| CHECK_NOTNULL(FindOrDie(ts_map, replica->permanent_uuid())); |
| TabletServerErrorPB error; |
| auto s = LeaderStepDown(tserver, replica->tablet_id(), |
| MonoDelta::FromSeconds(10), &error); |
| // In rare cases, the leader replica can change its role right before |
| // the step-down request is received. |
| if (s.IsIllegalState() && |
| error.code() == TabletServerErrorPB::NOT_THE_LEADER) { |
| LOG(INFO) << Substitute("T:$0 P:$1: not a leader replica anymore", |
| replica->tablet_id(), replica->permanent_uuid()); |
| continue; |
| } |
| ASSERT_OK(s); |
| SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime)); |
| // Check stats after every election. |
| NO_FATALS(CheckStats(kRowsCount)); |
| } |
| } |
| } |
| |
| // 5. Rename the table. |
| { |
| ASSERT_OK(itest::AlterTableName(GetLeaderMasterServiceProxy(), |
| table->id(), kTableName, kNewTableName, MonoDelta::FromSeconds(5))); |
| |
| // Check table id, table name and stats. |
| NO_FATALS(GetLeaderMasterAndRun(kRowsCount, [&] ( |
| TableInfo* table_info, int64_t live_row_count) { |
| std::ostringstream out; |
| JsonWriter writer(&out, JsonWriter::PRETTY); |
| ASSERT_OK(table_info->metric_entity_->WriteAsJson(&writer, MetricJsonOptions())); |
| string metric_attrs_str = out.str(); |
| ASSERT_STR_NOT_CONTAINS(metric_attrs_str, kTableName); |
| ASSERT_STR_CONTAINS(metric_attrs_str, kNewTableName); |
| ASSERT_EQ(table->id(), table_info->metric_entity_->id()); |
| ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount()); |
| ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value()); |
| })); |
| } |
| |
| // 6. Restart the masters. |
| { |
| for (int i = 0; i < kNumMasters; ++i) { |
| int idx = 0; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&idx)); |
| master::MiniMaster* mini_master = CHECK_NOTNULL(cluster_->mini_master(idx)); |
| mini_master->Shutdown(); |
| SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime)); |
| ASSERT_OK(mini_master->Restart()); |
| // Sometimes the election does not succeed until the node restarts and the |
| // restarted node is elected leader again, so it is necessary to wait for |
| // all tservers to report before checking the stats. |
| SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms)); |
| NO_FATALS(CheckStats(kRowsCount)); |
| } |
| } |
| |
| // 7. Restart the tservers. |
| { |
| for (int i = 0; i < kNumTservers; ++i) { |
| tserver::MiniTabletServer* mini_tserver = CHECK_NOTNULL(cluster_->mini_tablet_server(i)); |
| mini_tserver->Shutdown(); |
| ASSERT_OK(mini_tserver->Start()); |
| SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime)); |
| ASSERT_OK(mini_tserver->WaitStarted()); |
| NO_FATALS(CheckStats(kRowsCount)); |
| } |
| } |
| |
| // 8. Delete the table. |
| { |
| ASSERT_OK(itest::DeleteTable(GetLeaderMasterServiceProxy(), |
| table->id(), kNewTableName, MonoDelta::FromSeconds(5))); |
| |
| FLAGS_metrics_retirement_age_ms = -1; |
| MetricRegistry* metric_registry = CHECK_NOTNULL(GetLeaderMaster()->metric_registry()); |
| const auto GetMetricsString = [&] (string* ret) { |
| std::ostringstream out; |
| JsonWriter writer(&out, JsonWriter::PRETTY); |
| ASSERT_OK(metric_registry->WriteAsJson(&writer, MetricJsonOptions())); |
| *ret = out.str(); |
| }; |
| |
| string metrics_str; |
| // On the first call, the metric is returned and internally retired, but it is not deleted. |
| NO_FATALS(GetMetricsString(&metrics_str)); |
| ASSERT_STR_CONTAINS(metrics_str, kNewTableName); |
| // On the second call, the metric is returned and then deleted. |
| NO_FATALS(GetMetricsString(&metrics_str)); |
| ASSERT_STR_CONTAINS(metrics_str, kNewTableName); |
| // On the third call, the metric is no longer available. |
| NO_FATALS(GetMetricsString(&metrics_str)); |
| ASSERT_STR_NOT_CONTAINS(metrics_str, kNewTableName); |
| } |
| } |
| |
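| // Test that a table can be deleted while one of its tablet replicas is |
| // still being copied from another tablet server. |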
| TEST_F(TsTabletManagerITest, TestDeleteTableDuringTabletCopy) { |
| MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| const int kNumTabletServers = 3; |
| const int kNumReplicas = 3; |
| { |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kNumTabletServers; |
| NO_FATALS(StartCluster(std::move(opts))); |
| } |
| string tablet_id; |
| client::sp::shared_ptr<KuduTable> table; |
| itest::TabletServerMap ts_map; |
| ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), client_messenger_, &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| |
| auto tserver_tablets_count = [&](int index) { |
| return cluster_->mini_tablet_server(index)->ListTablets().size(); |
| }; |
| |
| // Inject latency into tablet opening so we can verify that a tablet can be |
| // deleted while it is still being copied. |
| FLAGS_tablet_open_inject_latency_ms = 50; |
| |
| // Create a table with one tablet. |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema_) |
| .set_range_partition_columns({}) |
| .num_replicas(kNumReplicas) |
| .Create()); |
| ASSERT_EVENTUALLY([&] { |
| for (int i = 0; i < kNumTabletServers; ++i) { |
| ASSERT_EQ(1, tserver_tablets_count(i)); |
| } |
| }); |
| tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0]; |
| ASSERT_OK(client_->OpenTable(kTableName, &table)); |
| |
| // Get the leader and follower tablet servers. |
| itest::TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(ts_map, tablet_id, kTimeout, &leader_ts)); |
| int leader_index = cluster_->tablet_server_index_by_uuid(leader_ts->uuid()); |
| int follower_index = (leader_index + 1) % kNumTabletServers; |
| MiniTabletServer* follower = cluster_->mini_tablet_server(follower_index); |
| itest::TServerDetails* follower_ts = ts_map[follower->uuid()]; |
| |
| // Tombstone the tablet on the follower. |
| ASSERT_OK(itest::DeleteTabletWithRetries(follower_ts, tablet_id, |
| tablet::TabletDataState::TABLET_DATA_TOMBSTONED, |
| kTimeout)); |
| // Copy tablet from leader_ts to follower_ts. |
| HostPort leader_addr = HostPortFromPB(leader_ts->registration.rpc_addresses(0)); |
| ASSERT_OK(itest::StartTabletCopy(follower_ts, tablet_id, leader_ts->uuid(), |
| leader_addr, std::numeric_limits<int64_t>::max(), kTimeout)); |
| |
| // Delete the table while the tablet copy is in progress. |
| ASSERT_OK(itest::DeleteTable(cluster_->master_proxy(), table->id(), kTableName, kTimeout)); |
| |
| // Eventually, all tablet replicas should be deleted. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(0, tserver_tablets_count(follower_index)); |
| }); |
| } |
| |
| namespace { |
| |
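| // Build a partition schema and a single range partition covering transaction |
| // IDs in [start_txn_id, end_txn_id) for a transaction status tablet. |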
| Status GetPartitionForTxnStatusTablet(int64_t start_txn_id, int64_t end_txn_id, |
| PartitionSchema* partition_schema, |
| Partition* partition) { |
| const auto& schema = TxnStatusTablet::GetSchema(); |
| // Add range partitioning on the transaction ID column. |
| PartitionSchemaPB partition_pb; |
| auto* range_schema = partition_pb.mutable_range_schema(); |
| range_schema->add_columns()->set_name(TxnStatusTablet::kTxnIdColName); |
| PartitionSchema pschema; |
| RETURN_NOT_OK(PartitionSchema::FromPB(partition_pb, schema, &pschema)); |
| |
| // Create some bounds for the partition. |
| KuduPartialRow lower_bound(&schema); |
| KuduPartialRow upper_bound(&schema); |
| RETURN_NOT_OK(lower_bound.SetInt64(TxnStatusTablet::kTxnIdColName, start_txn_id)); |
| RETURN_NOT_OK(upper_bound.SetInt64(TxnStatusTablet::kTxnIdColName, end_txn_id)); |
| vector<Partition> ps; |
| RETURN_NOT_OK(pschema.CreatePartitions(/*split_rows=*/{}, |
| { std::make_pair(lower_bound, upper_bound) }, schema, &ps)); |
| *partition = ps[0]; |
| *partition_schema = pschema; |
| return Status::OK(); |
| } |
| |
| } // anonymous namespace |
| |
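| // Test fixture for creating transaction status tablets directly through the |
| // tablet server admin API and exercising the transaction coordinator. |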
| class TxnStatusTabletManagementTest : public TsTabletManagerITest { |
| public: |
| static constexpr const char* kOwner = "jojo"; |
| const char* kParticipant = "participant"; |
| const char* kTxnStatusTabletId = "11111111111111111111111111111111"; |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| |
| void SetUp() override { |
| NO_FATALS(TsTabletManagerITest::SetUp()); |
| FLAGS_enable_txn_system_client_init = true; |
| } |
| |
| // Creates a request to create a tablet with the given IDs and Raft config, |
| // optionally marking it as a transaction status tablet. |
| static CreateTabletRequestPB CreateTxnTabletReq(const string& tablet_id, const string& replica_id, |
| RaftConfigPB raft_config, |
| bool is_txn_status_tablet) { |
| CreateTabletRequestPB req; |
| if (is_txn_status_tablet) { |
| req.set_table_type(TableTypePB::TXN_STATUS_TABLE); |
| } |
| req.set_dest_uuid(replica_id); |
| req.set_table_id(Substitute("$0_table_id", tablet_id)); |
| req.set_table_name(Substitute("$0_table_name", tablet_id)); |
| req.set_tablet_id(tablet_id); |
| *req.mutable_config() = std::move(raft_config); |
| CHECK_OK(SchemaToPB(TxnStatusTablet::GetSchema(), req.mutable_schema())); |
| return req; |
| } |
| |
| // Creates a tablet with a fixed schema, with the given tablet ID. |
| Status CreateTablet(MiniTabletServer* ts, |
| const string& tablet_id, |
| bool is_txn_status_tablet, |
| scoped_refptr<TabletReplica>* replica = nullptr) { |
| CreateTabletRequestPB req = CreateTxnTabletReq( |
| tablet_id, ts->server()->fs_manager()->uuid(), ts->CreateLocalConfig(), |
| is_txn_status_tablet); |
| CreateTabletResponsePB resp; |
| RpcController rpc; |
| |
| // Put together a partition spec for this tablet. |
| PartitionSchema partition_schema; |
| Partition partition; |
| RETURN_NOT_OK(GetPartitionForTxnStatusTablet(0, 100, &partition_schema, &partition)); |
| partition.ToPB(req.mutable_partition()); |
| |
| unique_ptr<TabletServerAdminServiceProxy> admin_proxy( |
| new TabletServerAdminServiceProxy(client_messenger_, ts->bound_rpc_addr(), |
| ts->bound_rpc_addr().host())); |
| RETURN_NOT_OK(admin_proxy->CreateTablet(req, &resp, &rpc)); |
| scoped_refptr<TabletReplica> r; |
| CHECK(ts->server()->tablet_manager()->LookupTablet(tablet_id, &r)); |
| |
| // Wait for the replica's consensus to be running, then wait for the replica |
| // to become leader. |
| RETURN_NOT_OK(r->WaitUntilConsensusRunning(kTimeout)); |
| auto s = r->consensus()->WaitUntilLeader(kTimeout); |
| if (replica) { |
| *replica = std::move(r); |
| } |
| return s; |
| } |
| |
| // Creates a transaction status tablet at the given tablet server, |
| // waiting for the transaction status data to be loaded from the tablet. |
| Status CreateTxnStatusTablet(MiniTabletServer* ts) { |
| scoped_refptr<TabletReplica> r; |
| RETURN_NOT_OK(CreateTablet( |
| ts, kTxnStatusTabletId, /*is_txn_status_tablet*/true, &r)); |
| TxnCoordinator* c = CHECK_NOTNULL(CHECK_NOTNULL(r)->txn_coordinator()); |
| |
| // Wait for the transaction status data to be loaded. The verification below |
| // relies on the internals of the TxnStatusManager's implementation, but it |
| // seems OK for test purposes. |
| const auto deadline = MonoTime::Now() + kTimeout; |
| bool is_data_loaded = false; |
| do { |
| auto txn_id = c->highest_txn_id(); |
| if (txn_id >= -1) { |
| is_data_loaded = true; |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } while (MonoTime::Now() < deadline); |
| |
| return is_data_loaded |
| ? Status::OK() |
| : Status::TimedOut("timed out waiting for txn status data to be loaded"); |
| } |
| |
| static Status StartTransactions(const ParticipantIdsByTxnId& txns, TxnCoordinator* coordinator) { |
| TxnStatusManager* txn_status_manager = |
| dynamic_cast<TxnStatusManager*>(coordinator); |
| TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager); |
| TabletServerErrorPB ts_error; |
| for (const auto& txn_id_and_prt_ids : txns) { |
| const auto& txn_id = txn_id_and_prt_ids.first; |
| RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner, nullptr, &ts_error)); |
| for (const auto& prt_id : txn_id_and_prt_ids.second) { |
| RETURN_NOT_OK(coordinator->RegisterParticipant(txn_id, prt_id, kOwner, &ts_error)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| static CoordinateTransactionRequestPB CreateCoordinateTxnReq( |
| const string& tablet_id, |
| const string& user, |
| CoordinatorOpPB::CoordinatorOpType type, |
| int64_t txn_id) { |
| CoordinateTransactionRequestPB req; |
| req.set_txn_status_tablet_id(tablet_id); |
| auto* op = req.mutable_op(); |
| op->set_user(user); |
| op->set_type(type); |
| op->set_txn_id(txn_id); |
| return req; |
| } |
| }; |
| |
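| // Verify that a transaction status tablet reloads its in-memory transaction |
| // state when the tablet server is restarted. |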
| TEST_F(TxnStatusTabletManagementTest, TestBootstrapTransactionStatusTablet) { |
| NO_FATALS(StartCluster({})); |
| auto* ts0 = cluster_->mini_tablet_server(0); |
| ASSERT_OK(CreateTxnStatusTablet(ts0)); |
| const ParticipantIdsByTxnId kExpectedTxns = { |
| { 1, { kParticipant } }, |
| { 2, {} }, |
| }; |
| { |
| // Ensure the replica has a coordinator. |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( |
| kTxnStatusTabletId, &replica)); |
| ASSERT_OK(replica->WaitUntilConsensusRunning(kTimeout)); |
| TxnCoordinator* coordinator = replica->txn_coordinator(); |
| ASSERT_NE(nullptr, coordinator); |
| |
| // Create a transaction and participant. |
| ASSERT_OK(StartTransactions(kExpectedTxns, coordinator)); |
| } |
| // Restart the tablet server, reload the transaction status tablet, and |
| // ensure we have the expected state. |
| ts0->Shutdown(); |
| ASSERT_OK(ts0->Restart()); |
| { |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( |
| kTxnStatusTabletId, &replica)); |
| TxnCoordinator* coordinator = replica->txn_coordinator(); |
| ASSERT_NE(nullptr, coordinator); |
| // Wait for the contents of the tablet to be loaded into memory. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(kExpectedTxns, coordinator->GetParticipantsByTxnIdForTests()); |
| }); |
| } |
| } |
| |
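| // Verify that tablet copy works for a transaction status tablet and that the |
| // transaction state is preserved across the copy and a subsequent restart. |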
| TEST_F(TxnStatusTabletManagementTest, TestCopyTransactionStatusTablet) { |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 2; |
| NO_FATALS(StartCluster(std::move(opts))); |
| const ParticipantIdsByTxnId kExpectedTxns = { |
| { 1, { kParticipant } }, |
| { 2, {} }, |
| }; |
| |
| auto* ts0 = cluster_->mini_tablet_server(0); |
| ASSERT_OK(CreateTxnStatusTablet(ts0)); |
| { |
| // Ensure the replica has a coordinator. |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( |
| kTxnStatusTabletId, &replica)); |
| ASSERT_OK(replica->WaitUntilConsensusRunning(kTimeout)); |
| TxnCoordinator* coordinator = replica->txn_coordinator(); |
| ASSERT_NE(nullptr, coordinator); |
| |
| // Create a transaction and participant. |
| ASSERT_OK(StartTransactions(kExpectedTxns, coordinator)); |
| } |
| TabletServerMap ts_map; |
| ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), client_messenger_, &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| auto* ts1 = cluster_->mini_tablet_server(1); |
| TServerDetails* src_details = ts_map[ts0->uuid()]; |
| TServerDetails* dst_details = ts_map[ts1->uuid()]; |
| HostPort src_addr = HostPortFromPB(src_details->registration.rpc_addresses(0)); |
| ASSERT_OK(StartTabletCopy(dst_details, kTxnStatusTabletId, src_details->uuid(), |
| src_addr, std::numeric_limits<int64_t>::max(), kTimeout)); |
| { |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(ts1->server()->tablet_manager()->LookupTablet( |
| kTxnStatusTabletId, &replica)); |
| ASSERT_OK(replica->WaitUntilConsensusRunning(MonoDelta::FromSeconds(3))); |
| TxnCoordinator* coordinator = replica->txn_coordinator(); |
| ASSERT_NE(nullptr, coordinator); |
| } |
| // Restart the tserver and ensure the transaction information is |
| // correctly reloaded. |
| { |
| ts0->Shutdown(); |
| ASSERT_OK(ts0->Restart()); |
| // Wait for the contents of the tablet to be loaded into memory. |
| ASSERT_EVENTUALLY([&] { |
| scoped_refptr<TabletReplica> replica; |
| ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet( |
| kTxnStatusTabletId, &replica)); |
| TxnCoordinator* coordinator = replica->txn_coordinator(); |
| ASSERT_EQ(kExpectedTxns, coordinator->GetParticipantsByTxnIdForTests()); |
| }); |
| } |
| } |
| |
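| // Verify the authorization of CoordinateTransaction() calls: only the service |
| // user and superusers are allowed to coordinate transactions. |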
| TEST_F(TxnStatusTabletManagementTest, TestTabletServerProxyCalls) { |
| const auto& kSuperUser = "starlight"; |
| const auto& kTrustedUser = "hughie"; |
| const auto& kRegularUser = "butcher"; |
| FLAGS_superuser_acl = kSuperUser; |
| FLAGS_trusted_user_acl = kTrustedUser; |
| NO_FATALS(StartCluster({})); |
| constexpr const int kTServerIdx = 0; |
| auto* ts = cluster_->mini_tablet_server(kTServerIdx); |
| ASSERT_OK(CreateTxnStatusTablet(ts)); |
| // Put together a sequence of ops that should succeed. |
| const vector<CoordinatorOpPB::CoordinatorOpType> kOpSequence = { |
| CoordinatorOpPB::BEGIN_TXN, |
| CoordinatorOpPB::REGISTER_PARTICIPANT, |
| CoordinatorOpPB::KEEP_TXN_ALIVE, |
| CoordinatorOpPB::BEGIN_COMMIT_TXN, |
| CoordinatorOpPB::ABORT_TXN, |
| CoordinatorOpPB::GET_TXN_STATUS, |
| }; |
| // Perform the series of ops for the given transaction ID as the given user, |
| // erroring out if an unexpected result is received. If 'user' is empty, use |
| // the logged-in user (in the InternalMiniCluster this is effectively the |
| // service user, since all daemons run in-process as the same local user). |
| const auto perform_ops = [&] (int64_t txn_id, const string& user, bool expect_success) { |
| auto admin_proxy = cluster_->tserver_admin_proxy(kTServerIdx); |
| if (!user.empty()) { |
| rpc::UserCredentials user_creds; |
| user_creds.set_real_user(user); |
| admin_proxy->set_user_credentials(std::move(user_creds)); |
| } |
| for (const auto& op_type : kOpSequence) { |
| CoordinateTransactionRequestPB req = CreateCoordinateTxnReq( |
| kTxnStatusTabletId, "user", op_type, txn_id); |
| if (op_type == CoordinatorOpPB::REGISTER_PARTICIPANT) { |
| req.mutable_op()->set_txn_participant_id(kParticipant); |
| } |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| Status s = admin_proxy->CoordinateTransaction(req, &resp, &rpc); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| if (expect_success) { |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(op_type == CoordinatorOpPB::BEGIN_TXN || |
| op_type == CoordinatorOpPB::GET_TXN_STATUS, |
| resp.has_op_result()); |
| } else { |
| ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Not authorized"); |
| } |
| } |
| }; |
| // The service user and super user should be able to perform these ops |
| // without issue. |
| NO_FATALS(perform_ops(1, /*user*/"", /*expect_success*/true)); |
| NO_FATALS(perform_ops(2, kSuperUser, /*expect_success*/true)); |
| |
| // Anyone else should be denied. |
| NO_FATALS(perform_ops(3, kTrustedUser, /*expect_success*/false)); |
| NO_FATALS(perform_ops(4, kRegularUser, /*expect_success*/false)); |
| } |
| |
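| // Exercise the error paths of CoordinateTransaction(): malformed requests, |
| // requests against non-transaction-status or missing tablets, and logical |
| // errors such as starting a duplicate transaction. |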
| TEST_F(TxnStatusTabletManagementTest, TestTabletServerProxyCallErrors) { |
| NO_FATALS(StartCluster({})); |
| auto* ts = cluster_->mini_tablet_server(0); |
| auto admin_proxy = cluster_->tserver_admin_proxy(0); |
| ASSERT_OK(CreateTxnStatusTablet(ts)); |
| // If the request is missing fields, it should be rejected. |
| { |
| CoordinateTransactionRequestPB req; |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Missing fields"); |
| } |
| { |
| // Don't supply a participant ID -- that should net the same error. |
| CoordinateTransactionRequestPB req = CreateCoordinateTxnReq( |
| kTxnStatusTabletId, "user", CoordinatorOpPB::REGISTER_PARTICIPANT, 1); |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Missing participant id"); |
| } |
| // If the request is for a tablet that isn't a transaction tablet, it should |
| // be rejected. |
| { |
| const string kOtherTablet = "22222222222222222222222222222222"; |
| ASSERT_OK(CreateTablet(ts, kOtherTablet, /*is_txn_status_tablet*/false)); |
| |
| CoordinateTransactionRequestPB req = CreateCoordinateTxnReq( |
| kOtherTablet, "user", CoordinatorOpPB::BEGIN_TXN, 1); |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Requested tablet is not a txn coordinator"); |
| } |
| // If the request is for a tablet that doesn't exist, it should be rejected. |
| { |
| CoordinateTransactionRequestPB req = CreateCoordinateTxnReq( |
| "not_a_real_tablet", "user", CoordinatorOpPB::BEGIN_TXN, 1); |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Tablet not found"); |
| ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_FOUND, resp.error().code()); |
| } |
| // Logical errors (e.g. beginning a duplicate transaction) should result in |
| // application-level errors. |
| { |
| CoordinateTransactionRequestPB req = CreateCoordinateTxnReq( |
| kTxnStatusTabletId, "user", CoordinatorOpPB::BEGIN_TXN, 1); |
| { |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_op_result()); |
| ASSERT_EQ(1, resp.op_result().highest_seen_txn_id()); |
| } |
| { |
| CoordinateTransactionResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_TRUE(resp.has_op_result()); |
| ASSERT_EQ(1, resp.op_result().highest_seen_txn_id()); |
| Status s = StatusFromPB(resp.op_result().op_error()); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID so far"); |
| } |
| } |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |