[tests] fix flakiness in TestFailDuringScanWorkload

This patch fixes flakiness in the
TabletServerDiskErrorITest.TestFailDuringScanWorkload scenario.
There was a prior attempt to make the scenario more stable [1],
but it didn't rule out sporadic test failures caused by
  * various scheduler anomalies
  * the random selection of replicas the client chooses to read data from

As a result, the scenario was failing in about 1 out of 10 runs for
RELEASE and ASAN builds [2].

To eliminate the flakiness, it's necessary to make sure that
  * the dedicated tablet server ends up with at least one replica
    from which the client tries to fetch data
  * scan requests arrive at tablet replicas hosted by the dedicated
    tablet server only after IO failures have been injected
This patch does so by
  * taking more control over the selection of tablet replicas that
    the client sends scan requests to (see the sketch below)
  * starting the scan operation only after injecting IO errors
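
For reference, restricting scans to leader replicas with the Kudu C++
client looks roughly like the sketch below.  It is illustrative only and
not part of this patch: the CountRowsFromLeaders() helper is hypothetical,
and the test itself gets the same effect via
TestWorkload::set_scanner_selection().

  #include <cstdint>
  #include <string>

  #include "kudu/client/client.h"
  #include "kudu/client/scan_batch.h"

  using kudu::Status;
  using kudu::client::KuduClient;
  using kudu::client::KuduScanBatch;
  using kudu::client::KuduScanner;
  using kudu::client::KuduTable;
  using kudu::client::sp::shared_ptr;

  // Count the rows of 'table_name', sending scan requests to leader replicas
  // only, so the reads are guaranteed to hit the servers hosting the leaders.
  Status CountRowsFromLeaders(const shared_ptr<KuduClient>& client,
                              const std::string& table_name,
                              int64_t* row_count) {
    shared_ptr<KuduTable> table;
    Status s = client->OpenTable(table_name, &table);
    if (!s.ok()) return s;

    KuduScanner scanner(table.get());
    // Send scan requests to leader replicas only.
    s = scanner.SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY);
    if (!s.ok()) return s;
    s = scanner.Open();
    if (!s.ok()) return s;

    int64_t count = 0;
    KuduScanBatch batch;
    while (scanner.HasMoreRows()) {
      s = scanner.NextBatch(&batch);
      if (!s.ok()) return s;
      count += batch.NumRows();
    }
    *row_count = count;
    return Status::OK();
  }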

To verify the fix, I ran the test scenario built in the ASAN
configuration with and without this patch.  Without this patch,
96 out of 1024 runs failed [3].  With the patch applied, 0 out of
1024 runs failed [4].

[1] https://github.com/apache/kudu/commit/ccbbfb3006314f2c37f3a40bfec355db9fc90e02
[2] http://dist-test.cloudera.org:8080/test_drilldown?test_name=disk_failure-itest
[3] http://dist-test.cloudera.org/job?job_id=aserbin.1662847551.105230
[4] http://dist-test.cloudera.org/job?job_id=aserbin.1662873124.94488

Change-Id: Ia29bfdc9761139426344532bab3e5d0b3c1b12ad
Reviewed-on: http://gerrit.cloudera.org:8080/18967
Reviewed-by: Yifan Zhang <chinazhangyifan@163.com>
Tested-by: Yifan Zhang <chinazhangyifan@163.com>
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index 7b0544f..3cbcddc 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -21,6 +21,7 @@
 #include <ostream>
 #include <string>
 #include <tuple>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -50,7 +51,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_gauge_int32(num_raft_leaders);
 METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
 METRIC_DECLARE_gauge_uint64(data_dirs_failed);
 METRIC_DECLARE_gauge_uint32(tablets_num_failed);
@@ -218,7 +221,7 @@
 // A generalized test for different kinds of disk errors in tablet servers.
 class TabletServerDiskErrorITest : public DiskErrorITestBase {
  public:
-  const int kNumTablets = 10;
+  static constexpr int kNumTablets = 10;
 
   // Set up a cluster with 4 tservers, with `kNumTablets` spread across the
   // first three tservers. This ensures that injecting failures into any of the
@@ -227,7 +230,7 @@
   // Also configure the cluster to not delete or copy tablets, even on error.
   // This allows us to check all tablets are failed appropriately.
   void SetUp() override {
-    const int kNumRows = 10000;
+    constexpr int kNumRows = 10000;
     ExternalMiniClusterOptions opts;
     // Use 3 tservers at first; we'll add an empty one later.
     opts.num_tablet_servers = 3;
@@ -367,21 +370,60 @@
 };
 
 TEST_P(TabletServerDiskErrorITest, TestFailDuringScanWorkload) {
-  // Set up a workload that only reads from the tablets.
+  // Make one server more likely to host leader replicas: its Raft heartbeat
+  // interval is much shorter than the others', so it starts leader elections
+  // sooner when it stops hearing from a former leader.
+  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+  error_ts->mutable_flags()->emplace_back("--raft_heartbeat_interval_ms=50");
+  error_ts->Shutdown();
+  ASSERT_OK(error_ts->Restart());
+
+  // Make the other servers less likely to host leader replicas: their Raft
+  // heartbeat interval is much longer than that of 'error_ts', so they detect
+  // leader failures more slowly than replicas hosted at 'error_ts'.
+  for (auto i = 1; i < cluster_->num_tablet_servers(); ++i) {
+    auto* ts = cluster_->tablet_server(i);
+    ts->mutable_flags()->emplace_back("--raft_heartbeat_interval_ms=250");
+    ts->Shutdown();
+    // Let the rest of the tablet servers elect new leaders for the tablets
+    // whose former leaders were at the tablet server that has just been shut down.
+    SleepFor(MonoDelta::FromSeconds(1));
+    ASSERT_OK(ts->Restart());
+  }
+
+  // Inject errors into every IO operation at one of the servers.
+  ASSERT_OK(SetFlags(error_ts, InjectionFlags(GetParam(), error_ts)));
+
+  // Make sure 'error_ts' hosts at least one leader replica of some tablet
+  // that the read workload is about to fetch data from. The only existing table
+  // in the cluster is the one created by TabletServerDiskErrorITest::SetUp().
+  ASSERT_EVENTUALLY([&] {
+    int64_t num_leaders = 0;
+    ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server_by_uuid(
+        error_ts->uuid())->bound_http_hostport(),
+        &METRIC_ENTITY_server,
+        nullptr,
+        &METRIC_num_raft_leaders,
+        "value",
+        &num_leaders));
+    ASSERT_GT(num_leaders, 0);
+  });
+
+  // Now start a workload that reads only from leader replicas. Some leader
+  // replicas should be hosted by 'error_ts', and every scan attempt against
+  // them is going to fail due to the injected errors. As a result, at least
+  // one tablet replica at 'error_ts' should be marked as failed.
   TestWorkload read(cluster_.get());
   read.set_num_write_threads(0);
   read.set_num_read_threads(1);
+  read.set_scanner_selection(KuduClient::ReplicaSelection::LEADER_ONLY);
   read.Setup();
   read.Start();
 
-  // Inject the errors into one of the non-empty servers.
-  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
-  ASSERT_OK(SetFlags(error_ts, InjectionFlags(GetParam(), error_ts)));
-
-  // Wait for some of the tablets to reach a failed state.
-  // We can't wait for all of the tablets in this case because
-  // some may not be scanned and will therefore not be marked as failed.
+  // Wait for at least one tablet replica to reach failed state.
   NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets, /*require_all_fail*/false));
+
+  // Remove all the injected IO failures.
   ASSERT_OK(AllowRecovery());
   NO_FATALS(read.StopAndJoin());