| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdlib> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/heartbeater.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
// Runtime flags defined elsewhere in Kudu; the scenarios below toggle them
// to control replica management, replication-factor checks, and leader
// election behavior of the in-process mini cluster.
DECLARE_bool(allow_unsafe_replication_factor);
DECLARE_bool(catalog_manager_evict_excess_replicas);
DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
DECLARE_bool(enable_leader_failure_detection);
DECLARE_bool(raft_prepare_replacement_before_eviction);
// Test-local flag: number of emulated election iterations; overridden to a
// higher value in slow-test mode (see TestReportNewLeaderOnLeaderChange).
DEFINE_int32(num_election_test_loops, 3,
             "Number of random EmulateElection() loops to execute in "
             "TestReportNewLeaderOnLeaderChange");
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableCreator; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::consensus::GetConsensusRole; |
| using kudu::consensus::HealthReportPB; |
| using kudu::consensus::INCLUDE_HEALTH_REPORT; |
| using kudu::consensus::RaftConfigPB; |
| using kudu::consensus::RaftConsensus; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::itest::SimpleIntKeyKuduSchema; |
| using kudu::master::MasterServiceProxy; |
| using kudu::master::ReportedTabletPB; |
| using kudu::master::TabletReportPB; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::MessengerBuilder; |
| using kudu::tablet::TabletReplica; |
| using kudu::tserver::MiniTabletServer; |
| using kudu::ClusterVerifier; |
| using std::map; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tserver { |
| |
| static const char* const kTableName = "test-table"; |
| |
// Base fixture for tablet-manager integration tests: starts an in-process
// InternalMiniCluster on demand and provides helpers for inspecting tablet
// replicas and heartbeat-generated tablet reports.
class TsTabletManagerITest : public KuduTest {
 public:
  TsTabletManagerITest()
      : schema_(SimpleIntKeyKuduSchema()) {
  }
  void SetUp() override {
    KuduTest::SetUp();

    // Messenger used by the test body to talk RPC to the master
    // (e.g. when constructing a MasterServiceProxy).
    MessengerBuilder bld("client");
    ASSERT_OK(bld.Build(&client_messenger_));
  }

  // Start a mini cluster with the given options and create a Kudu client
  // connected to it. Must be wrapped in NO_FATALS() by callers since it
  // uses ASSERT_OK internally.
  void StartCluster(InternalMiniClusterOptions opts) {
    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
    ASSERT_OK(cluster_->Start());
    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
  }

 protected:
  // Make every tablet server in the cluster stop sending heartbeats
  // to the master.
  void DisableHeartbeatingToMaster();

  // Populate the 'replicas' container with corresponding objects representing
  // tablet replicas running at tablet servers in the test cluster. It's assumed
  // there is at least one tablet replica per tablet server. Also, this utility
  // method awaits up to the specified timeout for the consensus to be running
  // before adding an element into the output container.
  Status PrepareTabletReplicas(MonoDelta timeout,
                               vector<scoped_refptr<TabletReplica>>* replicas);

  // Generate incremental tablet reports using test-specific method
  // GenerateIncrementalTabletReportsForTests() of the specified heartbeater.
  // Uses ASSERT_EVENTUALLY internally; wrap calls in NO_FATALS().
  void GetIncrementalTabletReports(Heartbeater* heartbeater,
                                   vector<TabletReportPB>* reports);

  // Single-int-key schema shared by all scenarios.
  const KuduSchema schema_;

  unique_ptr<InternalMiniCluster> cluster_;
  client::sp::shared_ptr<KuduClient> client_;
  std::shared_ptr<Messenger> client_messenger_;
};
| |
| void TsTabletManagerITest::DisableHeartbeatingToMaster() { |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
| ts->FailHeartbeats(); |
| } |
| } |
| |
| Status TsTabletManagerITest::PrepareTabletReplicas( |
| MonoDelta timeout, vector<scoped_refptr<TabletReplica>>* replicas) { |
| const MonoTime deadline = MonoTime::Now() + timeout; |
| for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
| vector<scoped_refptr<TabletReplica>> ts_replicas; |
| // The replicas may not have been created yet, so loop until we see them. |
| while (MonoTime::Now() < deadline) { |
| ts->server()->tablet_manager()->GetTabletReplicas(&ts_replicas); |
| if (!ts_replicas.empty()) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| if (ts_replicas.empty()) { |
| return Status::TimedOut("waiting for tablet replicas register with ts manager"); |
| } |
| RETURN_NOT_OK(ts_replicas.front()->WaitUntilConsensusRunning( |
| deadline - MonoTime::Now())); |
| replicas->insert(replicas->end(), ts_replicas.begin(), ts_replicas.end()); |
| } |
| return Status::OK(); |
| } |
| |
| void TsTabletManagerITest::GetIncrementalTabletReports( |
| Heartbeater* heartbeater, vector<TabletReportPB>* reports) { |
| vector<TabletReportPB> r; |
| // The MarkDirty() callback is on an async thread so it might take the |
| // follower a few milliseconds to execute it. Wait for that to happen. |
| ASSERT_EVENTUALLY([&] { |
| r = heartbeater->GenerateIncrementalTabletReportsForTests(); |
| ASSERT_EQ(1, r.size()); |
| ASSERT_FALSE(r.front().updated_tablets().empty()); |
| }); |
| reports->swap(r); |
| } |
| |
// Parameterized on the replica management scheme: 'true' selects the 3-4-3
// (prepare-replacement-before-eviction) scheme, 'false' the legacy 3-2-3 one.
class FailedTabletsAreReplacedITest :
    public TsTabletManagerITest,
    public ::testing::WithParamInterface<bool> {
};
// Test that when a tablet replica is marked as failed, it will eventually be
// evicted and replaced.
TEST_P(FailedTabletsAreReplacedITest, OneReplica) {
  // The parameter selects the replica management scheme (see the class
  // comment above).
  const bool is_3_4_3_mode = GetParam();
  FLAGS_raft_prepare_replacement_before_eviction = is_3_4_3_mode;
  const auto kNumReplicas = 3;
  // The 3-4-3 scheme needs one extra tablet server to host the replacement
  // replica while the failed replica is still part of the Raft config.
  const auto kNumTabletServers = kNumReplicas + (is_3_4_3_mode ? 1 : 0);

  {
    InternalMiniClusterOptions opts;
    opts.num_tablet_servers = kNumTabletServers;
    NO_FATALS(StartCluster(std::move(opts)));
  }
  TestWorkload work(cluster_.get());
  work.set_num_replicas(kNumReplicas);
  work.Setup();
  work.Start();

  // Keep the workload running until the tablet shows up on a randomly
  // chosen tablet server.
  string tablet_id;
  ASSERT_EVENTUALLY([&] {
    auto idx = rand() % kNumTabletServers;
    vector<string> tablet_ids = cluster_->mini_tablet_server(idx)->ListTablets();
    ASSERT_EQ(1, tablet_ids.size());
    tablet_id = tablet_ids[0];
  });
  work.StopAndJoin();

  // Wait until all the replicas are running before failing one arbitrarily.
  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());

  {
    // Inject an error into one of replicas. Shutting it down will leave it in
    // the FAILED state.
    scoped_refptr<TabletReplica> replica;
    // In 3-4-3 mode one server is a spare and does not host the replica,
    // so retry until a random pick lands on a server that does.
    ASSERT_EVENTUALLY([&] {
      auto idx = rand() % kNumTabletServers;
      MiniTabletServer* ts = cluster_->mini_tablet_server(idx);
      ASSERT_OK(ts->server()->tablet_manager()->GetTabletReplica(tablet_id, &replica));
    });
    replica->SetError(Status::IOError("INJECTED ERROR: tablet failed"));
    replica->Shutdown();
    ASSERT_EQ(tablet::FAILED, replica->state());
  }

  // Ensure the cluster becomes healthy again: the failed replica should
  // eventually be evicted and replaced with a healthy one.
  NO_FATALS(v.CheckCluster());
}
// Run the scenario under both replica management schemes (3-2-3 and 3-4-3).
INSTANTIATE_TEST_CASE_P(,
                        FailedTabletsAreReplacedITest,
                        ::testing::Bool());
| |
// Test that when the leader changes, the tablet manager gets notified and
// includes that information in the next tablet report.
TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
  const int kNumReplicas = 2;
  {
    InternalMiniClusterOptions opts;
    opts.num_tablet_servers = kNumReplicas;
    NO_FATALS(StartCluster(std::move(opts)));
  }

  // We need to control elections precisely for this test since we're using
  // EmulateElection() with a distributed consensus configuration.
  // NOTE(review): these flags are set after the cluster starts; since the
  // mini cluster runs in-process the servers see the updated process-wide
  // flag values — confirm new tablets pick them up as intended.
  FLAGS_enable_leader_failure_detection = false;
  FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;

  // Allow creating table with even replication factor (RF=2 is normally
  // rejected as unsafe).
  FLAGS_allow_unsafe_replication_factor = true;

  // Run a few more iters in slow-test mode.
  OverrideFlagForSlowTests("num_election_test_loops", "10");

  // Create the table.
  client::sp::shared_ptr<KuduTable> table;
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTableName)
            .schema(&schema_)
            .set_range_partition_columns({ "key" })
            .num_replicas(kNumReplicas)
            .Create());
  ASSERT_OK(client_->OpenTable(kTableName, &table));

  // Build a TServerDetails map so we can check for convergence.
  const auto& addr = cluster_->mini_master()->bound_rpc_addr();
  shared_ptr<MasterServiceProxy> master_proxy(
      new MasterServiceProxy(client_messenger_, addr, addr.host()));

  itest::TabletServerMap ts_map;
  ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
  // ts_map owns raw pointers; free them when the map goes out of scope.
  ValueDeleter deleter(&ts_map);

  // Collect the TabletReplicas so we get direct access to RaftConsensus.
  vector<scoped_refptr<TabletReplica>> tablet_replicas;
  ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas));
  ASSERT_EQ(kNumReplicas, tablet_replicas.size());

  // Stop heartbeating so we don't race against the Master.
  DisableHeartbeatingToMaster();

  // Loop and cause elections and term changes from different servers.
  // TSTabletManager should acknowledge the role changes via tablet reports.
  for (int i = 0; i < FLAGS_num_election_test_loops; i++) {
    SCOPED_TRACE(Substitute("Iter: $0", i));
    // kNumReplicas == 2, so this picks one of the two replicas at random.
    int new_leader_idx = rand() % 2;
    LOG(INFO) << "Electing peer " << new_leader_idx << "...";
    RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
    ASSERT_OK(con->EmulateElection());
    LOG(INFO) << "Waiting for servers to agree...";
    // Each emulated election bumps the term, hence the expected index i + 1.
    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),
                                    ts_map, tablet_replicas[0]->tablet_id(), i + 1));

    // Now check that the tablet report reports the correct role for both servers.
    for (int replica = 0; replica < kNumReplicas; replica++) {
      vector<TabletReportPB> reports;
      NO_FATALS(GetIncrementalTabletReports(
          cluster_->mini_tablet_server(replica)->server()->heartbeater(),
          &reports));

      // Ensure that our tablet reports are consistent.
      TabletReportPB& report = reports[0];
      ASSERT_EQ(1, report.updated_tablets_size())
          << "Wrong report size:\n" << pb_util::SecureDebugString(report);
      const ReportedTabletPB& reported_tablet = report.updated_tablets(0);
      ASSERT_TRUE(reported_tablet.has_consensus_state());

      // The reported consensus state must show the elected replica as LEADER
      // and the other one as FOLLOWER.
      string uuid = tablet_replicas[replica]->permanent_uuid();
      RaftPeerPB::Role role = GetConsensusRole(uuid, reported_tablet.consensus_state());
      if (replica == new_leader_idx) {
        ASSERT_EQ(RaftPeerPB::LEADER, role)
            << "Tablet report: " << pb_util::SecureShortDebugString(report);
      } else {
        ASSERT_EQ(RaftPeerPB::FOLLOWER, role)
            << "Tablet report: " << pb_util::SecureShortDebugString(report);
      }
    }
  }
}
| |
// Test that the tablet manager generates reports on replica health status
// in accordance with observed changes on replica status, and that the tablet
// manager includes that information into the next tablet report. Specifically,
// verify that:
//   1. The leader replica provides the health status report in its consensus
//      state, if requested.
//   2. The health report information matches the state of tablet replicas.
//   3. The tablet manager generates appropriate tablet reports with updated
//      health information when replicas change their state.
TEST_F(TsTabletManagerITest, ReportOnReplicaHealthStatus) {
  constexpr int kNumReplicas = 3;
  const auto kTimeout = MonoDelta::FromSeconds(60);

  // This test is specific to the 3-4-3 replica management scheme.
  FLAGS_raft_prepare_replacement_before_eviction = true;
  // Keep the failed replica in the config so its FAILED health status
  // remains observable through the leader's reports.
  FLAGS_catalog_manager_evict_excess_replicas = false;
  {
    InternalMiniClusterOptions opts;
    opts.num_tablet_servers = kNumReplicas;
    NO_FATALS(StartCluster(std::move(opts)));
  }

  // Create the table.
  client::sp::shared_ptr<KuduTable> table;
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTableName)
            .schema(&schema_)
            .set_range_partition_columns({})  // need just one tablet
            .num_replicas(kNumReplicas)
            .Create());
  ASSERT_OK(client_->OpenTable(kTableName, &table));

  // Build a TServerDetails map so we can check for convergence.
  const auto& addr = cluster_->mini_master()->bound_rpc_addr();
  shared_ptr<MasterServiceProxy> master_proxy(
      new MasterServiceProxy(client_messenger_, addr, addr.host()));

  itest::TabletServerMap ts_map;
  ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
  // ts_map owns raw pointers; free them when the map goes out of scope.
  ValueDeleter deleter(&ts_map);

  // Collect the TabletReplicas so we get direct access to RaftConsensus.
  vector<scoped_refptr<TabletReplica>> tablet_replicas;
  ASSERT_OK(PrepareTabletReplicas(kTimeout, &tablet_replicas));
  ASSERT_EQ(kNumReplicas, tablet_replicas.size());

  // Don't send heartbeats to master, otherwise it would be a race in
  // acknowledging the heartbeats and generating new tablet reports. Clearing
  // the 'dirty' flag on a tablet before the generated tablet report is
  // introspected makes the test scenario very flaky. Also, this scenario does
  // not assume that the catalog manager initiates the replacement of failed
  // voter replicas.
  DisableHeartbeatingToMaster();

  // Generate health reports for every element of the 'tablet_replicas'
  // container. Also, output the leader replica UUID from the consensus
  // state into 'leader_replica_uuid'. Uses ASSERT_* internally, so calls
  // must be wrapped in NO_FATALS().
  auto get_health_reports = [&](map<string, HealthReportPB>* reports,
                                string* leader_replica_uuid = nullptr) {
    ConsensusStatePB cstate;
    string leader_uuid;
    for (const auto& replica : tablet_replicas) {
      RaftConsensus* consensus = CHECK_NOTNULL(replica->consensus());
      ConsensusStatePB cs;
      Status s = consensus->ConsensusState(&cs, INCLUDE_HEALTH_REPORT);
      if (!s.ok()) {
        ASSERT_TRUE(s.IsIllegalState()) << s.ToString();  // Replica is shut down.
        continue;
      }
      if (consensus->peer_uuid() == cs.leader_uuid()) {
        // Only the leader replica has the up-to-date health report.
        leader_uuid = cs.leader_uuid();
        cstate = std::move(cs);
        break;
      }
    }
    ASSERT_FALSE(leader_uuid.empty());
    ASSERT_TRUE(cstate.has_committed_config());
    const RaftConfigPB& config = cstate.committed_config();
    ASSERT_EQ(kNumReplicas, config.peers_size());
    if (reports) {
      reports->clear();
      for (const auto& peer : config.peers()) {
        ASSERT_TRUE(peer.has_health_report());
        reports->emplace(peer.permanent_uuid(), peer.health_report());
      }
    }
    if (leader_replica_uuid) {
      *leader_replica_uuid = leader_uuid;
    }
  };

  // Get the information on committed Raft configuration from tablet reports
  // generated by the heartbeater of the server running the specified leader
  // tablet replica. Uses ASSERT_* internally; wrap calls in NO_FATALS().
  auto get_committed_config_from_reports = [&](const string& leader_replica_uuid,
                                               RaftConfigPB* config) {
    // Locate the tablet server hosting the leader replica.
    TabletServer* leader_server = nullptr;
    for (auto i = 0; i < kNumReplicas; ++i) {
      MiniTabletServer* mts = cluster_->mini_tablet_server(i);
      if (mts->uuid() == leader_replica_uuid) {
        leader_server = mts->server();
        break;
      }
    }
    ASSERT_NE(nullptr, leader_server);

    // TSTabletManager should acknowledge the status change via tablet reports.
    Heartbeater* heartbeater = leader_server->heartbeater();
    ASSERT_NE(nullptr, heartbeater);

    vector<TabletReportPB> reports;
    NO_FATALS(GetIncrementalTabletReports(heartbeater, &reports));
    ASSERT_EQ(1, reports.size());
    const TabletReportPB& report = reports[0];
    SCOPED_TRACE("Tablet report: " + pb_util::SecureDebugString(report));
    ASSERT_EQ(1, report.updated_tablets_size());
    const ReportedTabletPB& reported_tablet = report.updated_tablets(0);
    ASSERT_TRUE(reported_tablet.has_consensus_state());
    const ConsensusStatePB& cstate = reported_tablet.consensus_state();
    ASSERT_EQ(RaftPeerPB::LEADER, GetConsensusRole(leader_replica_uuid, cstate));
    ASSERT_TRUE(cstate.has_committed_config());
    RaftConfigPB cfg = cstate.committed_config();
    config->Swap(&cfg);
  };

  // All replicas are up and running, so the leader replica should eventually
  // report their health status as HEALTHY.
  {
    string leader_replica_uuid;
    ASSERT_EVENTUALLY(([&] {
      map<string, HealthReportPB> reports;
      NO_FATALS(get_health_reports(&reports, &leader_replica_uuid));
      for (const auto& e : reports) {
        SCOPED_TRACE("replica UUID: " + e.first);
        ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health());
      }
    }));

    // Other replicas are seen by the leader in UNKNOWN health state first.
    // At this point of the test scenario, since the replicas went from the
    // UNKNOWN to the HEALTHY state, an incremental tablet reports should
    // reflect those health status changes.
    RaftConfigPB config;
    NO_FATALS(get_committed_config_from_reports(leader_replica_uuid,
                                                &config));
    ASSERT_EQ(kNumReplicas, config.peers_size());
    for (const auto& p : config.peers()) {
      ASSERT_TRUE(p.has_health_report());
      const HealthReportPB& report(p.health_report());
      ASSERT_EQ(HealthReportPB::HEALTHY, report.overall_health());
    }
  }

  // Inject an error to the replica and make sure its status is eventually
  // reported as FAILED.
  string failed_replica_uuid;
  {
    auto replica = tablet_replicas.front();
    failed_replica_uuid = replica->consensus()->peer_uuid();
    replica->SetError(Status::IOError("INJECTED ERROR: tablet failed"));
    replica->Shutdown();
    ASSERT_EQ(tablet::FAILED, replica->state());
  }

  // A shut-down replica cannot replicate WAL entries, hence
  // FAILED_UNRECOVERABLE is the expected status as seen by the leader.
  ASSERT_EVENTUALLY(([&] {
    map<string, HealthReportPB> reports;
    NO_FATALS(get_health_reports(&reports));
    for (const auto& e : reports) {
      const auto& replica_uuid = e.first;
      SCOPED_TRACE("replica UUID: " + replica_uuid);
      if (replica_uuid == failed_replica_uuid) {
        ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, e.second.overall_health());
      } else {
        ASSERT_EQ(HealthReportPB::HEALTHY, e.second.overall_health());
      }
    }
  }));

  // The scenario below assumes the leader replica does not change anymore.
  FLAGS_enable_leader_failure_detection = false;

  // Verify the updated health status also made it into an incremental
  // tablet report generated by the leader's server.
  {
    string leader_replica_uuid;
    NO_FATALS(get_health_reports(nullptr, &leader_replica_uuid));
    RaftConfigPB config;
    NO_FATALS(get_committed_config_from_reports(leader_replica_uuid,
                                                &config));
    for (const auto& peer : config.peers()) {
      ASSERT_TRUE(peer.has_permanent_uuid());
      const auto& uuid = peer.permanent_uuid();
      ASSERT_TRUE(peer.has_health_report());
      const HealthReportPB& report(peer.health_report());
      if (uuid == failed_replica_uuid) {
        EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, report.overall_health());
      } else {
        EXPECT_EQ(HealthReportPB::HEALTHY, report.overall_health());
      }
    }
  }
}
| |
| } // namespace tserver |
| } // namespace kudu |