[test] Fix flakiness with detecting a master in FAILED_UNRECOVERABLE state

Observed one test failure on the flakiness dashboard in the
dynamic_multi_master test where system catalog WAL is expected to be
GC'ed and hence new master can't be caught up from WAL which should
result in the new master going to FAILED_UNRECOVERABLE state.
However the new master gets caught up from WAL and is in HEALTHY
state.

Expected: consensus::HealthReportPB::FAILED_UNRECOVERABLE
  Which is: 2
To be equal to: peer.health_report().overall_health()
  Which is: 1

This is because the check for system catalog WAL was made against
master index 0 which may not be the leader master and that's what
happened in case of the test failure.

The fix makes the GC count check against all masters as master
leadership could change.

This change also includes another place where master index 0 was used
and it'd be good to explicitly use the leader index to future-proof
it in case order of validation is changed.

Change-Id: Id6017f1601eaed22be28c8a5babd6e35e93b1d2e
Reviewed-on: http://gerrit.cloudera.org:8080/17089
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/master/dynamic_multi_master-test.cc b/src/kudu/master/dynamic_multi_master-test.cc
index 801ba07..2bb6e46 100644
--- a/src/kudu/master/dynamic_multi_master-test.cc
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -153,9 +153,10 @@
     // for starting the new master.
     NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, master_hps));
 
-    auto get_sys_catalog_wal_gc_count = [&] {
+    // Function to fetch the GC count of the system catalog WAL.
+    auto get_sys_catalog_wal_gc_count = [&] (int master_idx) {
       int64_t sys_catalog_wal_gc_count = 0;
-      CHECK_OK(itest::GetInt64Metric(cluster_->master(0)->bound_http_hostport(),
+      CHECK_OK(itest::GetInt64Metric(cluster_->master(master_idx)->bound_http_hostport(),
                                      &METRIC_ENTITY_tablet,
                                      master::SysCatalogTable::kSysCatalogTabletId,
                                      &METRIC_log_gc_duration,
@@ -163,36 +164,52 @@
                                      &sys_catalog_wal_gc_count));
       return sys_catalog_wal_gc_count;
     };
-    const int64_t orig_gc_count = get_sys_catalog_wal_gc_count();
-    int64_t updated_gc_count;
+
+    vector<int64_t> orig_gc_count(orig_num_masters_);
+    for (int master_idx = 0; master_idx < orig_num_masters_; master_idx++) {
+      orig_gc_count[master_idx] = get_sys_catalog_wal_gc_count(master_idx);
+    }
+
+    // Function to compute whether all masters have updated the system catalog WAL GC count.
+    // Ideally we could just check against the leader master but the leader master could
+    // potentially change hence checking across all masters.
+    auto all_masters_updated_wal_gc_count = [&] {
+      int num_masters_gc_updated = 0;
+      for (int master_idx = 0; master_idx < orig_num_masters_; master_idx++) {
+        if (get_sys_catalog_wal_gc_count(master_idx) > orig_gc_count[master_idx]) {
+          num_masters_gc_updated++;
+        }
+      }
+      return num_masters_gc_updated == orig_num_masters_;
+    };
 
     // Create a bunch of tables to ensure sys catalog WAL gets GC'ed.
     // Need to create around 1k tables even with lowest flush threshold and log segment size.
     int i;
+    bool wal_gc_counts_updated = false;
     for (i = 1; i < 1000; i++) {
-      updated_gc_count = get_sys_catalog_wal_gc_count();
-      if (updated_gc_count > orig_gc_count) {
+      if (all_masters_updated_wal_gc_count()) {
+        wal_gc_counts_updated = true;
         break;
       }
       string table_name = Substitute("Table.$0.$1", CURRENT_TEST_NAME(), std::to_string(i));
       ASSERT_OK(CreateTable(cluster_.get(), table_name));
     }
     LOG(INFO) << "Number of tables created: " << i - 1;
-    if (updated_gc_count > orig_gc_count) {
+    if (wal_gc_counts_updated) {
       // We are done here and no need to wait further.
       return;
     }
 
     MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(2);
     while (MonoTime::Now() < deadline) {
-      updated_gc_count = get_sys_catalog_wal_gc_count();
-      if (updated_gc_count > orig_gc_count) {
+      if (all_masters_updated_wal_gc_count()) {
+        wal_gc_counts_updated = true;
         break;
       }
       SleepFor(MonoDelta::FromMilliseconds(100));
     }
-    ASSERT_GT(updated_gc_count, orig_gc_count)
-      << "Timed out waiting for system catalog WAL to be GC'ed";
+    ASSERT_TRUE(wal_gc_counts_updated) << "Timed out waiting for system catalog WAL to be GC'ed";
   }
 
   // Functor that takes a leader_master_idx and runs the desired master RPC against
@@ -675,8 +692,9 @@
   });
 
   // Double check by directly contacting the new master.
-  VerifyNewMasterDirectly({ consensus::RaftPeerPB::FOLLOWER, consensus::RaftPeerPB::LEADER },
-                          consensus::RaftPeerPB::VOTER);
+  NO_FATALS(VerifyNewMasterDirectly(
+      { consensus::RaftPeerPB::FOLLOWER, consensus::RaftPeerPB::LEADER },
+      consensus::RaftPeerPB::VOTER));
 
   // Adding the same master again should print a message but not throw an error.
   {
@@ -753,7 +771,7 @@
                                 "-clean_unsafe"}));
 
 
-  // Copy from remote system catalog
+  // Copy from remote system catalog from any existing master.
   LOG(INFO) << "Copying from remote master: "
     << cluster_->master(0)->bound_rpc_hostport().ToString();
   ASSERT_OK(tools::RunKuduTool({"local_replica", "copy_from_remote",
@@ -1142,18 +1160,23 @@
   vector<HostPort> master_hps;
   NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
 
+  // Master leadership transfer could result in a different error and hence disabling it.
+  NO_FATALS(DisableMasterLeadershipTransfer());
+
   // Random uuid
   Random rng(SeedRandom());
   auto random_uuid = std::to_string(rng.Next64());
-  auto master_to_remove = cluster_->master(0)->bound_rpc_hostport();
-  ASSERT_NE(random_uuid, cluster_->master(0)->uuid());
+  int non_leader_idx = -1;
+  ASSERT_OK(GetFollowerMasterIndex(&non_leader_idx));
+  auto master_to_remove = cluster_->master(non_leader_idx)->bound_rpc_hostport();
+  ASSERT_NE(random_uuid, cluster_->master(non_leader_idx)->uuid());
   string err;
   Status actual = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err, random_uuid);
   ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
   ASSERT_STR_CONTAINS(err,
                       Substitute("Mismatch in UUID of the master $0 to be removed. "
                                  "Expected: $1, supplied: $2.", master_to_remove.ToString(),
-                                 cluster_->master(0)->uuid(), random_uuid));
+                                 cluster_->master(non_leader_idx)->uuid(), random_uuid));
 
   // Verify no change in number of masters.
   NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));