[master] KUDU-3389 support turning on/off auto rebalancer at runtime

kudu-master was able to run the auto-rebalancer, but toggling it on/off
at runtime wasn't supported. This patch reuses the existing
--auto_rebalancing_enabled flag and makes it settable at runtime via the
SetFlag() RPC, so the auto-rebalancer can be toggled on/off dynamically.

Change-Id: I0ecee5d8b0b2344c3ad190548526ed6a2551cfdb
Reviewed-on: http://gerrit.cloudera.org:8080/18923
Reviewed-by: Yuqi Du <shenxingwuying@gmail.com>
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <acelyc1112009@gmail.com>
Reviewed-by: Mahesh Reddy <mreddy@cloudera.com>
Reviewed-by: Alexey Serbin <alexey@apache.org>
diff --git a/src/kudu/master/auto_rebalancer-test.cc b/src/kudu/master/auto_rebalancer-test.cc
index cf57511..abc4070 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -22,6 +22,7 @@
 #include <optional>
 #include <set>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <unordered_set>
 #include <vector>
@@ -30,7 +31,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/client/client.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -282,6 +282,33 @@
   }
 }
 
+// Make sure auto-rebalancing can be toggled off and back on at runtime.
+TEST_F(AutoRebalancerTest, AutoRebalancingTurnOffAndOn) {
+  cluster_opts_.num_masters = 1;
+  cluster_opts_.num_tablet_servers = 3;
+  ASSERT_OK(CreateAndStartCluster());
+  NO_FATALS(CheckAutoRebalancerStarted());
+
+  CreateWorkloadTable(8, /*num_replicas*/ 3);
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  ASSERT_EQ(0, leader_idx);
+
+  FLAGS_auto_rebalancing_enabled = false;
+  ASSERT_OK(cluster_->AddTabletServer());
+  for (int i = 0; i < 3; i++) {
+    // Wait one scheduling interval (1s) and verify that no moves get scheduled.
+    SleepFor(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds));
+    ASSERT_EQ(0, NumMovesScheduled(leader_idx));
+  }
+  int num_iterations = NumLoopIterations(leader_idx);
+  SleepFor(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds * 2));
+  ASSERT_EQ(num_iterations, NumLoopIterations(leader_idx));
+
+  FLAGS_auto_rebalancing_enabled = true;
+  CheckSomeMovesScheduled();
+}
+
 // If the leader master goes down, the next elected master should perform
 // auto-rebalancing.
 TEST_F(AutoRebalancerTest, NextLeaderResumesAutoRebalancing) {
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index 18eae7e..faa9a2c 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -24,6 +24,7 @@
 #include <ostream>
 #include <random>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
@@ -135,6 +136,8 @@
               "How long to wait before checking to see if the scheduled replica movement "
               "in this iteration of auto-rebalancing has completed.");
 
+DECLARE_bool(auto_rebalancing_enabled);
+
 namespace kudu {
 
 namespace master {
@@ -190,7 +193,12 @@
   vector<Rebalancer::ReplicaMove> replica_moves;
   while (!shutdown_.WaitFor(
       MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds))) {
-
+    if (!FLAGS_auto_rebalancing_enabled) {
+      // Toggling the auto-rebalancer on/off via FLAGS_auto_rebalancing_enabled takes
+      // effect at the next loop iteration. Replica moves that are already scheduled or
+      // running are unaffected.
+      continue;
+    }
     // If catalog manager isn't initialized or isn't the leader, don't do rebalancing.
     // Putting the auto-rebalancer to sleep shouldn't affect the master's ability
     // to become the leader. When the thread wakes up and discovers it is now
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 0772be7..76397ee 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -346,6 +346,7 @@
             "Whether auto-rebalancing is enabled.");
 TAG_FLAG(auto_rebalancing_enabled, advanced);
 TAG_FLAG(auto_rebalancing_enabled, experimental);
+TAG_FLAG(auto_rebalancing_enabled, runtime);
 
 DEFINE_uint32(table_locations_cache_capacity_mb, 0,
               "Capacity for the table locations cache (in MiB); a value "
@@ -1021,12 +1022,9 @@
   RETURN_NOT_OK_PREPEND(sys_catalog_->WaitUntilRunning(),
                         "Failed waiting for the catalog tablet to run");
 
-  if (FLAGS_auto_rebalancing_enabled) {
-    unique_ptr<AutoRebalancerTask> task(
-        new AutoRebalancerTask(this, master_->ts_manager()));
-    RETURN_NOT_OK_PREPEND(task->Init(), "failed to initialize auto-rebalancing task");
-    auto_rebalancer_ = std::move(task);
-  }
+  unique_ptr<AutoRebalancerTask> task(new AutoRebalancerTask(this, master_->ts_manager()));
+  RETURN_NOT_OK_PREPEND(task->Init(), "failed to initialize auto-rebalancing task");
+  auto_rebalancer_ = std::move(task);
 
   vector<HostPort> master_addresses;
   RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses));