KUDU-3113: fix auto-rebalancer move execution

When executing moves, the auto-rebalancer would try to resolve the
leader's address by passing its UUID instead of its host. This fixes it
to use an appropriate host.

This includes some light cleanup, and updates auto_rebalancer-test to
verify the moves lead to the copying of bytes on tablet servers.

The following flakiness is also addressed:
- NoRebalancingIfReplicasRecovering would sometimes schedule some moves
  before shutting down the tablet server, and we'd time out waiting to
  iterate without scheduling moves. I adjusted the ordering of the
  shutdown so the rebalancer doesn't get a chance to schedule moves.
- Rarely, TestHandlingFailedTservers would see a different error than
  expected when checking for failed sent RPCs. I updated the test to
  expect a couple of messages.

I looped auto_rebalancer-test in DEBUG mode and it pased 1000/1000
times, compared to failing 4/10 times with the change to actually
execute moves.

I also validated this on a real cluster:
- First, I enabled auto-rebalancing on the master.
- I put a tablet server into maintenance mode.
- I then moved all replicas off the tablet server using the rebalancer
  tool's --move_replicas_from_ignored_tservers option.
- I verified that even with the significant skew, since one of the
  tablet servers was in maintenance mode (i.e. unavailable for
  placement), the master didn't automatically move any replicas.
- Once I took the tablet server out of maintenance mode, moves were
  scheduled to repopulate it.
- Steady state was reached with a cluster skew of 1.

Change-Id: If658997dc9bcb709c27d981db56cf2db13ba235f
Reviewed-on: http://gerrit.cloudera.org:8080/15850
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/master/auto_rebalancer-test.cc b/src/kudu/master/auto_rebalancer-test.cc
index 1000237..cd3ce57 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -21,13 +21,16 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -52,6 +55,7 @@
 using std::set;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -66,6 +70,8 @@
 DECLARE_uint32(auto_rebalancing_wait_for_replica_moves_seconds);
 
 METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
+METRIC_DECLARE_counter(tablet_copy_bytes_fetched);
+METRIC_DECLARE_counter(tablet_copy_bytes_sent);
 
 namespace {
 
@@ -161,8 +167,7 @@
   void CheckNoMovesScheduled() {
     int leader_idx;
     ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
-    auto initial_loop_iterations = NumLoopIterations(leader_idx);
-
+    const auto initial_loop_iterations = NumLoopIterations(leader_idx);
     ASSERT_EVENTUALLY([&] {
       ASSERT_LT(initial_loop_iterations + 3, NumLoopIterations(leader_idx));
       ASSERT_EQ(0, NumMovesScheduled(leader_idx));
@@ -172,14 +177,45 @@
   void CheckSomeMovesScheduled() {
     int leader_idx;
     ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
-    auto initial_loop_iterations = NumLoopIterations(leader_idx);
-
+    const auto initial_loop_iterations = NumLoopIterations(leader_idx);
     ASSERT_EVENTUALLY([&] {
       ASSERT_LT(initial_loop_iterations, NumLoopIterations(leader_idx));
       ASSERT_LT(0, NumMovesScheduled(leader_idx));
     });
   }
 
+  // Maps from tserver UUID to the bytes sent and fetched by each tserver as a
+  // part of tablet copying.
+  typedef unordered_map<string, int> MetricByUuid;
+  MetricByUuid GetCountersByUuid(CounterPrototype* prototype) const {
+    MetricByUuid metric_by_uuid;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      const auto& ts = cluster_->mini_tablet_server(i);
+      scoped_refptr<Counter> metric =
+          prototype->Instantiate(ts->server()->metric_entity());
+      EmplaceOrDie(&metric_by_uuid, ts->uuid(), metric->value());
+    }
+    return metric_by_uuid;
+  }
+  MetricByUuid GetBytesSentByTServer() const {
+    return GetCountersByUuid(&METRIC_tablet_copy_bytes_sent);
+  }
+  MetricByUuid GetBytesFetchedByTServer() const {
+    return GetCountersByUuid(&METRIC_tablet_copy_bytes_fetched);
+  }
+
+  // Returns an aggregate of the counts in 'bytes_by_uuid' for tablet servers
+  // with indices in the range ['start_ts_idx', 'end_ts_idx').
+  int AggregateMetricCounts(const MetricByUuid& bytes_by_uuid,
+                            int start_ts_idx, int end_ts_idx) {
+    int ret = 0;
+    for (int i = start_ts_idx; i < end_ts_idx; i++) {
+      const auto& uuid = cluster_->mini_tablet_server(i)->uuid();
+      ret += FindOrDie(bytes_by_uuid, uuid);
+    }
+    return ret;
+  }
+
   int NumLoopIterations(int master_idx) {
     DCHECK(cluster_ != nullptr);
     return cluster_->mini_master(master_idx)->master()->catalog_manager()->
@@ -278,18 +314,36 @@
 // Bring up another tserver, and verify that moves are scheduled,
 // since the cluster is no longer balanced.
 TEST_F(AutoRebalancerTest, MovesScheduledIfAddTserver) {
-  const int kNumTservers = 3;
+  const int kNumTServers = 3;
   const int kNumTablets = 2;
-  cluster_opts_.num_tablet_servers = kNumTservers;
+  cluster_opts_.num_tablet_servers = kNumTServers;
   ASSERT_OK(CreateAndStartCluster());
   NO_FATALS(CheckAutoRebalancerStarted());
 
   SetupWorkLoad(kNumTablets, /*num_replicas*/3);
 
   NO_FATALS(CheckNoMovesScheduled());
+  // Take a snapshot of the number of copy sources started on the original
+  // tablet servers.
+  const int initial_bytes_sent_in_orig_tservers =
+      AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
 
+  // Add a tablet server and verify that the master schedules some moves, and
+  // the tablet servers copy bytes as appropriate.
   ASSERT_OK(cluster_->AddTabletServer());
   NO_FATALS(CheckSomeMovesScheduled());
+  ASSERT_EVENTUALLY([&] {
+    int bytes_sent_in_orig_tservers =
+        AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
+    ASSERT_GT(bytes_sent_in_orig_tservers, initial_bytes_sent_in_orig_tservers);
+  });
+  // Our new tablet servers should start fetching data as well.
+  ASSERT_EVENTUALLY([&] {
+    int bytes_fetched_in_new_tservers =
+        AggregateMetricCounts(GetBytesFetchedByTServer(), kNumTServers,
+                              kNumTServers + 1);
+    ASSERT_GT(bytes_fetched_in_new_tservers, 0);
+  });
 }
 
 // A cluster with no tservers is balanced.
@@ -392,27 +446,49 @@
 
   SetupWorkLoad(kNumTablets, /*num_replicas*/3);
   workload_->Start();
-  ASSERT_EVENTUALLY([&]() {
-      ASSERT_LE(1000, workload_->rows_inserted());
-  });
+  while (workload_->rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
   workload_->StopAndJoin();
 
+  // Take a snapshot of the number of copy sources started on the original
+  // tablet servers.
+  const int initial_bytes_sent_in_orig_tservers =
+      AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumOrigTservers);
+
   // Bring up additional tservers, to trigger tablet replica copying.
   for (int i = 0; i < kNumAdditionalTservers; ++i) {
     ASSERT_OK(cluster_->AddTabletServer());
   }
+
+  // At some point we should start scheduling moves and see some copy source
+  // sessions start on the original tablet servers.
   NO_FATALS(CheckSomeMovesScheduled());
+  ASSERT_EVENTUALLY([&] {
+    int bytes_sent_in_orig_tservers =
+        AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumOrigTservers);
+    ASSERT_GT(bytes_sent_in_orig_tservers, initial_bytes_sent_in_orig_tservers);
+  });
+  // Our new tablet servers should start fetching data as well.
+  ASSERT_EVENTUALLY([&] {
+    int bytes_fetched_in_new_tservers =
+        AggregateMetricCounts(GetBytesFetchedByTServer(), kNumOrigTservers,
+                              kNumOrigTservers + kNumAdditionalTservers);
+    ASSERT_GT(bytes_fetched_in_new_tservers, 0);
+  });
 
   // Check metric 'tablet_copy_open_client_sessions', which must be
   // less than the auto_rebalancing_max_moves_per_server, for each tserver.
-  for (int i = 0; i < kNumOrigTservers + kNumAdditionalTservers; ++i) {
-    auto metric = cluster_->mini_tablet_server(i)->server()->metric_entity();
+  MetricByUuid open_copy_clients_by_uuid;
+  for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    const auto& ts = cluster_->mini_tablet_server(i);
     int open_client_sessions = METRIC_tablet_copy_open_client_sessions.
-        Instantiate(metric, 0)->value();
-
-    // Check that the total number of sessions does not excceed the flag.
-    ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server, open_client_sessions);
+        Instantiate(ts->server()->metric_entity(), 0)->value();
+    EmplaceOrDie(&open_copy_clients_by_uuid, ts->uuid(), open_client_sessions);
   }
+  // The average number of moves per tablet server should not exceed that specified.
+  ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server * cluster_->num_tablet_servers(),
+      AggregateMetricCounts(open_copy_clients_by_uuid, 0, cluster_->num_tablet_servers()));
 }
 
 // Attempt rebalancing a cluster with unstable Raft configurations.
@@ -420,10 +496,10 @@
   // Set a low Raft heartbeat.
   FLAGS_raft_heartbeat_interval_ms = kLowRaftTimeout;
 
-  const int kNumTservers = 3;
+  const int kNumTServers = 3;
   const int kNumTablets = 3;
 
-  cluster_opts_.num_tablet_servers = kNumTservers;
+  cluster_opts_.num_tablet_servers = kNumTServers;
   ASSERT_OK(CreateAndStartCluster());
   NO_FATALS(CheckAutoRebalancerStarted());
 
@@ -433,11 +509,28 @@
   // frequent leader re-elections.
   FLAGS_consensus_inject_latency_ms_in_notifications = kLowRaftTimeout;
 
-  // Bring up an additional tserver, to trigger rebalancing.
+  // Take a snapshot of the number of copy sources started on the original
+  // tablet servers.
+  const int initial_bytes_sent_in_orig_tservers =
+      AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
+
+  // Bring up an additional tserver to trigger rebalancing.
   // Moves should be scheduled, even if they cannot be completed because of
   // the frequent leadership changes.
   ASSERT_OK(cluster_->AddTabletServer());
   NO_FATALS(CheckSomeMovesScheduled());
+  // At some point, the move should succeed though.
+  ASSERT_EVENTUALLY([&] {
+    int bytes_sent_in_orig_tservers =
+        AggregateMetricCounts(GetBytesSentByTServer(), 0, kNumTServers);
+    ASSERT_GT(bytes_sent_in_orig_tservers, initial_bytes_sent_in_orig_tservers);
+  });
+  ASSERT_EVENTUALLY([&] {
+    int bytes_fetched_in_new_tservers =
+        AggregateMetricCounts(GetBytesFetchedByTServer(), kNumTServers,
+                              cluster_->num_tablet_servers());
+    ASSERT_GT(bytes_fetched_in_new_tservers, 0);
+  });
 }
 
 // A cluster that cannot become healthy and meet the replication factor
@@ -476,12 +569,12 @@
 
   SetupWorkLoad(kNumTablets, /*num_replicas*/3);
 
-  // Add another tserver; immediately take down one of the original tservers.
-  ASSERT_OK(cluster_->AddTabletServer());
+  // Kill a tablet server so when we add a tablet server, auto-rebalancing will
+  // do nothing. Also wait a bit until the tserver is deemed unresponsive so
+  // the auto-rebalancer will not attempt to rebalance anything.
   NO_FATALS(cluster_->mini_tablet_server(0)->Shutdown());
-  // Wait for recovery re-replication to start.
-  SleepFor(MonoDelta::FromSeconds(
-      FLAGS_follower_unavailable_considered_failed_sec));
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_tserver_unresponsive_timeout_ms));
+  ASSERT_OK(cluster_->AddTabletServer());
 
   // No rebalancing should occur while there are under-replicated replicas.
   NO_FATALS(CheckNoMovesScheduled());
@@ -521,8 +614,11 @@
     // tservers to the new one.
     NO_FATALS(CheckSomeMovesScheduled());
   }
-  ASSERT_STRINGS_ANY_MATCH(pre_capture_logs.logged_msgs(),
-                           "scheduled replica move failed to complete: Network error");
+  {
+    SCOPED_TRACE(JoinStrings(pre_capture_logs.logged_msgs(), "\n"));
+    ASSERT_STRINGS_ANY_MATCH(pre_capture_logs.logged_msgs(),
+        "scheduled replica move failed to complete|failed to send replica move request");
+  }
 
   // Wait for the TSManager to realize that the original tservers are unavailable.
   FLAGS_tserver_unresponsive_timeout_ms = 5 * 1000;
@@ -546,5 +642,6 @@
     ASSERT_STR_NOT_CONTAINS(str, "scheduled replica move failed to complete: Network error");
   }
 }
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index 3c48797..0509652 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -34,6 +34,7 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/metadata.pb.h"
@@ -55,6 +56,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
@@ -77,6 +79,7 @@
 using kudu::consensus::MODIFY_PEER;
 using kudu::consensus::RaftPeerPB;
 using kudu::master::TSManager;
+using kudu::pb_util::SecureShortDebugString;
 using kudu::rebalance::BuildTabletExtraInfoMap;
 using kudu::rebalance::ClusterInfo;
 using kudu::rebalance::ClusterLocalityInfo;
@@ -167,8 +170,8 @@
 Status AutoRebalancerTask::Init() {
   DCHECK(!thread_) << "AutoRebalancerTask is already initialized";
   RETURN_NOT_OK(MessengerBuilder("auto-rebalancer").Build(&messenger_));
-  return kudu::Thread::Create("catalog manager", "auto-rebalancer",
-                              [this]() { this->RunLoop(); }, &thread_);
+  return Thread::Create("catalog manager", "auto-rebalancer",
+                        [this]() { this->RunLoop(); }, &thread_);
 }
 
 void AutoRebalancerTask::Shutdown() {
@@ -181,9 +184,7 @@
 }
 
 void AutoRebalancerTask::RunLoop() {
-
   vector<Rebalancer::ReplicaMove> replica_moves;
-
   while (!shutdown_.WaitFor(
       MonoDelta::FromSeconds(FLAGS_auto_rebalancing_interval_seconds))) {
 
@@ -205,21 +206,19 @@
     ClusterRawInfo raw_info;
     ClusterInfo cluster_info;
     TabletsPlacementInfo placement_info;
-
     Status s = BuildClusterRawInfo(/*location*/boost::none, &raw_info);
     if (!s.ok()) {
       LOG(WARNING) << Substitute("Could not retrieve cluster info: $0", s.ToString());
       continue;
     }
 
-    // There should be no in-flight moves in progress, because this loop waits
-    // for scheduled moves to complete before continuing to the next iteration.
+    // NOTE: There should be no moves in progress, because this loop waits for
+    // scheduled moves to complete before continuing to the next iteration.
     s = rebalancer_.BuildClusterInfo(raw_info, Rebalancer::MovesInProgress(), &cluster_info);
     if (!s.ok()) {
       LOG(WARNING) << Substitute("Could not build cluster info: $0", s.ToString());
       continue;
     }
-
     if (config_.run_policy_fixer) {
       s = BuildTabletsPlacementInfo(raw_info, Rebalancer::MovesInProgress(), &placement_info);
       if (!s.ok()) {
@@ -228,25 +227,18 @@
       }
     }
 
-    // With the current synchronous implementation, verify that any moves
-    // scheduled in the previous iteration completed.
-    // The container 'replica_moves' should be empty.
-    DCHECK_EQ(0, replica_moves.size());
-
-    // For simplicity, if there are policy violations, only move replicas
-    // to satisfy placement policy in this loop iteration.
+    DCHECK(replica_moves.empty());
     s = GetMoves(raw_info, cluster_info.locality, placement_info, &replica_moves);
     if (!s.ok()) {
       LOG(WARNING) << Substitute("could not retrieve auto-rebalancing replica moves: $0",
                                  s.ToString());
       continue;
     }
-
     WARN_NOT_OK(ExecuteMoves(replica_moves),
                 "failed to send replica move request");
-
     moves_scheduled_this_round_for_test_ = replica_moves.size();
 
+    // Wait for all of the moves from this iteration to complete.
     do {
       if (shutdown_.WaitFor(MonoDelta::FromSeconds(
             FLAGS_auto_rebalancing_wait_for_replica_moves_seconds))) {
@@ -255,7 +247,7 @@
       WARN_NOT_OK(CheckReplicaMovesCompleted(&replica_moves),
                   "scheduled replica move failed to complete");
     } while (!replica_moves.empty());
-  } // end while
+  }
 }
 
 Status AutoRebalancerTask::GetMoves(
@@ -327,6 +319,8 @@
   auto num_tservers = raw_info.tserver_summaries.size();
   auto max_moves = FLAGS_auto_rebalancing_max_moves_per_server * num_tservers;
   max_moves -= replica_moves->size();
+  // TODO(awong): it'd be nice to track the number of on-going moves for each
+  // tablet server and enforce the max moves at a more granular level.
   if (max_moves <= 0) {
     return Status::OK();
   }
@@ -390,79 +384,68 @@
       int index = r.ts_info_idx();
       const TSInfoPB ts_info = *(ts_infos_dict.ts_info_pbs[index]);
       *leader_uuid = ts_info.permanent_uuid();
-      const auto& addr = ts_info.rpc_addresses(0);
-      leader_hp->set_host(addr.host());
-      leader_hp->set_port(addr.port());
+      *leader_hp = HostPortFromPB(ts_info.rpc_addresses(0));
       return Status::OK();
     }
   }
   return Status::NotFound(Substitute("Couldn't find leader for tablet $0", tablet_id));
 }
 
-// TODO(hannah.nguyen): remove moves that fail to be scheduled from 'replica_moves'.
+// TODO(hannah.nguyen): remove moves that fail to be scheduled from
+// 'replica_moves'.
 Status AutoRebalancerTask::ExecuteMoves(
     const vector<Rebalancer::ReplicaMove>& replica_moves) {
-
   for (const auto& move_info : replica_moves) {
     const auto& tablet_id = move_info.tablet_uuid;
     const auto& src_ts_uuid = move_info.ts_uuid_from;
     const auto& dst_ts_uuid = move_info.ts_uuid_to;
-
     string leader_uuid;
     HostPort leader_hp;
     RETURN_NOT_OK(GetTabletLeader(tablet_id, &leader_uuid, &leader_hp));
-
-    // Mark the replica to be replaced.
-    BulkChangeConfigRequestPB req;
-    auto* replace = req.add_config_changes();
-    replace->set_type(MODIFY_PEER);
-    *replace->mutable_peer()->mutable_permanent_uuid() = src_ts_uuid;
-    replace->mutable_peer()->mutable_attrs()->set_replace(true);
-
-    shared_ptr<TSDescriptor> desc;
-    if (!ts_manager_->LookupTSByUUID(leader_uuid, &desc)) {
+    shared_ptr<TSDescriptor> leader_desc;
+    if (!ts_manager_->LookupTSByUUID(leader_uuid, &leader_desc)) {
       return Status::NotFound(
           Substitute("Couldn't find leader replica's tserver $0", leader_uuid));
     }
+    // Mark the replica to be replaced.
+    BulkChangeConfigRequestPB req;
+    auto* modify_peer = req.add_config_changes();
+    modify_peer->set_type(MODIFY_PEER);
+    *modify_peer->mutable_peer()->mutable_permanent_uuid() = src_ts_uuid;
+    modify_peer->mutable_peer()->mutable_attrs()->set_replace(true);
 
-    // Set up the proxy to communicate with the leader replica's tserver.
-    shared_ptr<ConsensusServiceProxy> proxy;
-    HostPort hp;
-    RETURN_NOT_OK(hp.ParseString(leader_uuid, leader_hp.port()));
-    vector<Sockaddr> resolved;
-    RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
-    proxy.reset(new ConsensusServiceProxy(messenger_, resolved[0], hp.host()));
-
-    // Find the specified destination tserver, if load rebalancing.
-    // Otherwise, replica moves to fix placement policy violations do not have
-    // destination tservers specified, i.e. dst_ts_uuid will be empty if
-    // there are placement policy violations.
+    // NOTE: 'dst_ts_uuid' is empty if the move was scheduled to fix location
+    // policy violations.
     if (!dst_ts_uuid.empty()) {
       // Verify that the destination tserver exists.
       shared_ptr<TSDescriptor> dest_desc;
       if (!ts_manager_->LookupTSByUUID(dst_ts_uuid, &dest_desc)) {
         return Status::NotFound("Could not find destination tserver");
       }
+      ServerRegistrationPB dest_reg;
+      dest_desc->GetRegistration(&dest_reg);
 
-      auto* change = req.add_config_changes();
-      change->set_type(ADD_PEER);
-      *change->mutable_peer()->mutable_permanent_uuid() = dst_ts_uuid;
-      change->mutable_peer()->set_member_type(RaftPeerPB::NON_VOTER);
-      change->mutable_peer()->mutable_attrs()->set_promote(true);
-      *change->mutable_peer()->mutable_last_known_addr() = HostPortToPB(hp);
+      auto* add_peer_change = req.add_config_changes();
+      add_peer_change->set_type(ADD_PEER);
+      auto* new_peer = add_peer_change->mutable_peer();
+      new_peer->set_permanent_uuid(dst_ts_uuid);
+      new_peer->set_member_type(RaftPeerPB::NON_VOTER);
+      new_peer->mutable_attrs()->set_promote(true);
+      *new_peer->mutable_last_known_addr() = dest_reg.rpc_addresses(0);
     }
 
-    // Request movement or replacement of the replica.
+    // Send the change config request to the tablet leader.
     ChangeConfigResponsePB resp;
     RpcController rpc;
     rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_rebalancing_rpc_timeout_seconds));
     req.set_dest_uuid(leader_uuid);
     req.set_tablet_id(tablet_id);
-
-    RETURN_NOT_OK(proxy->BulkChangeConfig(req, &resp, &rpc));
+    vector<Sockaddr> resolved;
+    RETURN_NOT_OK(leader_hp.ResolveAddresses(&resolved));
+    ConsensusServiceProxy proxy(messenger_, resolved[0], leader_hp.host());
+    RETURN_NOT_OK(proxy.BulkChangeConfig(req, &resp, &rpc));
     if (resp.has_error()) return StatusFromPB(resp.error().status());
   }
-
   return Status::OK();
 }
 
@@ -496,7 +479,7 @@
     }
     summary.health = ServerHealth::HEALTHY;
     tserver_uuids.insert(summary.uuid);
-    tserver_summaries.push_back(std::move(summary));
+    tserver_summaries.emplace_back(std::move(summary));
   }
 
   vector<scoped_refptr<TableInfo>> table_infos;
@@ -554,9 +537,9 @@
       vector<string> non_voters;
       for (const auto& peer : cstatepb.committed_config().peers()) {
         if (peer.member_type() == RaftPeerPB::VOTER) {
-          voters.push_back(peer.permanent_uuid());
+          voters.emplace_back(peer.permanent_uuid());
         } else if (peer.member_type() == RaftPeerPB::NON_VOTER) {
-          non_voters.push_back(peer.permanent_uuid());
+          non_voters.emplace_back(peer.permanent_uuid());
         }
       }
 
@@ -583,10 +566,9 @@
         }
         rep.is_voter = true;
         rep.ts_healthy = true;
-        replicas.push_back(rep);
+        replicas.emplace_back(std::move(rep));
       }
-
-      tablet_summary.replicas.swap(replicas);
+      tablet_summary.replicas = std::move(replicas);
 
       // Determine if tablet is healthy enough for rebalancing.
       if (voters.size() < table_summary.replication_factor) {
@@ -596,11 +578,9 @@
       } else {
         tablet_summary.result = HealthCheckResult::HEALTHY;
       }
-
-      tablet_summaries.push_back(std::move(tablet_summary));
+      tablet_summaries.emplace_back(std::move(tablet_summary));
     }
-
-    table_summaries.push_back(std::move(table_summary));
+    table_summaries.emplace_back(std::move(table_summary));
   }
 
   if (!location) {
@@ -608,21 +588,18 @@
     raw_info->tserver_summaries = std::move(tserver_summaries);
     raw_info->tablet_summaries = std::move(tablet_summaries);
     raw_info->table_summaries = std::move(table_summaries);
-
     return Status::OK();
   }
 
   // Information on the specified location only: filter out non-relevant info.
   const auto& location_str = *location;
-
   unordered_set<string> ts_ids_at_location;
   for (const auto& summary : tserver_summaries) {
     if (summary.ts_location == location_str) {
-      raw_info->tserver_summaries.push_back(summary);
+      raw_info->tserver_summaries.emplace_back(summary);
       InsertOrDie(&ts_ids_at_location, summary.uuid);
     }
   }
-
   unordered_set<string> table_ids_at_location;
   for (const auto& summary : tablet_summaries) {
     const auto& replicas = summary.replicas;
@@ -630,22 +607,20 @@
     replicas_at_location.reserve(replicas.size());
     for (const auto& replica : replicas) {
       if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
-        replicas_at_location.push_back(replica);
+        replicas_at_location.emplace_back(replica);
       }
     }
     if (!replicas_at_location.empty()) {
       table_ids_at_location.insert(summary.table_id);
-      raw_info->tablet_summaries.push_back(summary);
+      raw_info->tablet_summaries.emplace_back(summary);
       raw_info->tablet_summaries.back().replicas = std::move(replicas_at_location);
     }
   }
-
   for (const auto& summary : table_summaries) {
     if (ContainsKey(table_ids_at_location, summary.id)) {
-      raw_info->table_summaries.push_back(summary);
+      raw_info->table_summaries.emplace_back(summary);
     }
   }
-
   return Status::OK();
 }
 
@@ -658,18 +633,17 @@
   for (int i = 0; i < replica_moves->size(); ++i) {
     const rebalance::Rebalancer::ReplicaMove& move = (*replica_moves)[i];
 
-    // Check if there was an error in checking move completion.
-    // If so, moves are erased from 'replica_moves' other than this problematic one.
+    // Check if there was an error in checking move completion. If so, remove
+    // the problematic one from 'replica_moves'.
     Status s = CheckMoveCompleted(move, &move_is_complete);
     if (!s.ok()) {
       replica_moves->erase(replica_moves->begin() + i);
       LOG(WARNING) << Substitute("Could not move replica: $0", s.ToString());
       return s;
     }
-
-    // If move was completed, remove it from 'replica_moves'.
+    // If the move was completed, remove it from 'replica_moves'.
     if (move_is_complete) {
-      indexes_to_remove.push_back(i);
+      indexes_to_remove.emplace_back(i);
     }
   }
 
@@ -681,8 +655,9 @@
   return Status::OK();
 }
 
-// TODO(hannah.nguyen): Retrieve consensus state information from the CatalogManager instead.
-// Currently, this implementation mirrors CheckCompleteMove() in tools/tool_replica_util.
+// TODO(hannah.nguyen): Retrieve consensus state information from the
+// CatalogManager instead. The current implementation mirrors
+// CheckCompleteMove() in tools/tool_replica_util.cc.
 Status AutoRebalancerTask::CheckMoveCompleted(
     const rebalance::Rebalancer::ReplicaMove& replica_move,
     bool* is_complete) {
@@ -739,8 +714,10 @@
   // Failure case: newly added replica is no longer in the config.
   if (!to_ts_uuid.empty() && !to_ts_uuid_in_config) {
     return Status::Incomplete(Substitute(
-        "tablet $0, TS $1 -> TS $2 move failed, target replica disappeared",
-        tablet_uuid, from_ts_uuid, to_ts_uuid));
+        "tablet $0, TS $1 -> TS $2 move failed, destination replica "
+        "disappeared from tablet's Raft config: $3",
+        tablet_uuid, from_ts_uuid, to_ts_uuid,
+        SecureShortDebugString(cstate.committed_config())));
   }
 
   // Check if replica slated for removal is still in the config.