[rpc] KUDU-75: refresh DNS entries if proxies hit a network error

This patch aims to tackle the following issues that revolve around
changes in addresses at runtime.
- KUDU-1885: master long-lived tserver proxies need to be re-resolved in
  case nodes are assigned different addresses; today we just retry at
  the same location forever.
- KUDU-1620: tablet consensus long-lived proxies need to be re-resolved
  on failure.
- C++ clients' usages of RemoteTabletServer also have long-lived proxies
  and are likely to run into similar problems if tservers are restarted
  and assigned new physical addresses.

It addresses this by plumbing a DnsResolver into the rpc::Proxy class,
and chaining the asynchronous callback to an asynchronous refresh of the
address with the newly introduced refreshing capabilities of the
DnsResolver.

The new style of proxy isn't currently used, but a test is added
exercising the new functionality.

Change-Id: I777d169bd3a461294e5721f05071b726ced70f7e
Reviewed-on: http://gerrit.cloudera.org:8080/17839
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 273842d..c8d831e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -129,6 +129,7 @@
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
 ADD_KUDU_TEST(negotiation-test)
 ADD_KUDU_TEST(periodic-test)
+ADD_KUDU_TEST(proxy-test)
 ADD_KUDU_TEST(reactor-test)
 ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
diff --git a/src/kudu/rpc/connection_id.cc b/src/kudu/rpc/connection_id.cc
index 9728f01..8a14d83 100644
--- a/src/kudu/rpc/connection_id.cc
+++ b/src/kudu/rpc/connection_id.cc
@@ -20,7 +20,7 @@
 #include <cstddef>
 #include <utility>
 
-#include <boost/functional/hash/hash.hpp>
+#include <boost/container_hash/extensions.hpp>
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/substitute.h"
@@ -52,7 +52,7 @@
 
 string ConnectionId::ToString() const {
   string remote;
-  if (remote_.is_ip() && hostname_ != remote_.host()) {
+  if (remote_.is_initialized() && remote_.is_ip() && hostname_ != remote_.host()) {
     remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_);
   } else {
     remote = remote_.ToString();
diff --git a/src/kudu/rpc/connection_id.h b/src/kudu/rpc/connection_id.h
index 6ec98f7..0aed953 100644
--- a/src/kudu/rpc/connection_id.h
+++ b/src/kudu/rpc/connection_id.h
@@ -45,6 +45,10 @@
 
   const std::string& hostname() const { return hostname_; }
 
+  void set_remote(const Sockaddr& remote) {
+    remote_ = remote;
+  }
+
   // The credentials of the user associated with this connection, if any.
   void set_user_credentials(UserCredentials user_credentials);
 
diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index 09bda4b..3496e26 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -71,7 +71,7 @@
     CHECK_OK(CreateMessenger("ClientSC", &client_messenger));
     Proxy p(client_messenger, server_addr, server_addr.host(),
             GenericCalculatorService::static_service_name());
-    *result = DoTestSyncCall(p, method_name);
+    *result = DoTestSyncCall(&p, method_name);
     latch->CountDown();
   }
 
@@ -93,7 +93,7 @@
     int i = 0;
     while (true) {
       i++;
-      Status s = DoTestSyncCall(p, method_name);
+      Status s = DoTestSyncCall(&p, method_name);
       if (!s.ok()) {
         // Return on first failure.
         LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 2226160..e8d8e55 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -575,6 +575,10 @@
           "      std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
           "      const ::kudu::Sockaddr& sockaddr,\n"
           "      std::string hostname);\n"
+          "  $service_name$Proxy(\n"
+          "      std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
+          "      const ::kudu::HostPort& hp,\n"
+          "      DnsResolver* dns_resolver);\n"
           "  ~$service_name$Proxy();\n");
 
       for (int method_idx = 0; method_idx < service->method_count();
@@ -639,6 +643,15 @@
           "            std::move(hostname),\n"
           "            \"$full_service_name$\") {\n"
           "}\n"
+          "$service_name$Proxy::$service_name$Proxy(\n"
+          "    std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
+          "    const ::kudu::HostPort& hp,\n"
+          "    DnsResolver* dns_resolver)\n"
+          "    : Proxy(std::move(messenger),\n"
+          "            hp,\n"
+          "            dns_resolver,\n"
+          "            \"$full_service_name$\") {\n"
+          "}\n"
           "\n"
           "$service_name$Proxy::~$service_name$Proxy() {\n"
           "}\n");
@@ -652,7 +665,8 @@
             "    const $request$& req,\n"
             "    $response$* resp,\n"
             "    ::kudu::rpc::RpcController* controller) {\n"
-            "  return SyncRequest(\"$rpc_name$\", req, resp, controller);\n"
+            "  static const std::string kRpcName = \"$rpc_name$\";\n"
+            "  return SyncRequest(kRpcName, req, resp, controller);\n"
             "}\n"
             "\n"
             "void $service_name$Proxy::$rpc_name$Async(\n"
@@ -660,7 +674,8 @@
             "    $response$* resp,\n"
             "    ::kudu::rpc::RpcController* controller,\n"
             "    const ::kudu::rpc::ResponseCallback& callback) {\n"
-            "  AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n"
+            "  static const std::string kRpcName = \"$rpc_name$\";\n"
+            "  AsyncRequest(kRpcName, req, resp, controller, callback);\n"
             "}\n");
         subs->Pop(); // method
       }
diff --git a/src/kudu/rpc/proxy-test.cc b/src/kudu/rpc/proxy-test.cc
new file mode 100644
index 0000000..34b626c
--- /dev/null
+++ b/src/kudu/rpc/proxy-test.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rtest.pb.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_string(dns_addr_resolution_override);
+
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+namespace {
+
+constexpr uint16_t kPort = 1111;
+constexpr const char* kFakeHost = "fakehost";
+const HostPort kFakeHostPort(kFakeHost, kPort);
+
+Status SendRequest(Proxy* p) {
+  SleepRequestPB req;
+  req.set_sleep_micros(100 * 1000); // 100ms
+  SleepResponsePB resp;
+  RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+  return p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &controller);
+}
+
+} // anonymous namespace
+
+class RpcProxyTest : public RpcTestBase {
+};
+
+// Test that proxies initialized with a DnsResolver return errors when
+// receiving a non-transient error.
+TEST_F(RpcProxyTest, TestProxyReturnsOnNonTransientError) {
+  SKIP_IF_SLOW_NOT_ALLOWED();  // This test waits for a timeout.
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  Status s = SendRequest(&p);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // If we do resolve to an address that turns out to be bogus, we should
+  // time out when negotiating.
+  FLAGS_dns_addr_resolution_override = Substitute("$0=1.1.1.1", kFakeHost);
+  s = SendRequest(&p);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
+// Test that ensures a proxy initialized with an address will use that address.
+TEST_F(RpcProxyTest, TestProxyUsesInitialAddr) {
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+
+  // Despite our proxy being configured with a fake host, our request should
+  // still go through since we call Init() with a valid address.
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init(server_addr);
+  ASSERT_OK(SendRequest(&p));
+
+  server_messenger_.reset();
+  service_pool_.reset();
+
+  // With our server down, the request should fail.
+  Status s = SendRequest(&p);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Once we bring up a new server and allow our proxy to resolve it, the
+  // request should succeed.
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+  ASSERT_OK(SendRequest(&p));
+}
+
+TEST_F(RpcProxyTest, TestNonResolvingProxyIgnoresInit) {
+  string ip = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  HostPort hp(ip, kPort);
+  Proxy p(client_messenger, hp, &dns_resolver, CalculatorService::static_service_name());
+
+  // Call Init() with a fake address. Because this proxy isn't configured for
+  // address re-resolution, the new address is ignored.
+  Sockaddr fake_addr;
+  ASSERT_OK(fake_addr.ParseString("1.1.1.1", kPort));
+  p.Init(fake_addr);
+
+  // We should thus have no trouble sending a request.
+  ASSERT_OK(SendRequest(&p));
+}
+
+// Start a proxy with a DNS resolver that maps a hostname to the address bound
+// by the server. Then restart the server but bind to a different address, and
+// update the DNS resolver to map the same hostname to the different address.
+// The proxy should eventually be usable.
+TEST_F(RpcProxyTest, TestProxyReresolvesAddress) {
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString());
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  ASSERT_OK(SendRequest(&p));
+
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+  ASSERT_OK(SendRequest(&p));
+}
+
+TEST_F(RpcProxyTest, TestProxyReresolvesAddressFromThreads) {
+  constexpr const int kNumThreads = 4;
+
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString());
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  ASSERT_OK(SendRequest(&p));
+
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+
+  vector<Status> errors(kNumThreads);
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      errors[i] = SendRequest(&p);
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& e : errors) {
+    EXPECT_OK(e);
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 8bd45fb..8d810b8 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -21,9 +21,11 @@
 #include <iostream>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/outbound_call.h"
@@ -31,6 +33,8 @@
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
+#include "kudu/util/net/dns_resolver.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/notification.h"
 #include "kudu/util/status.h"
@@ -39,6 +43,9 @@
 using google::protobuf::Message;
 using std::string;
 using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace rpc {
@@ -48,6 +55,7 @@
              string hostname,
              string service_name)
     : service_name_(std::move(service_name)),
+      dns_resolver_(nullptr),
       messenger_(std::move(messenger)),
       is_started_(false) {
   CHECK(messenger_ != nullptr);
@@ -67,19 +75,72 @@
   conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
 }
 
+Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+             HostPort hp,
+             DnsResolver* dns_resolver,
+             string service_name)
+    : service_name_(std::move(service_name)),
+      hp_(std::move(hp)),
+      dns_resolver_(dns_resolver),
+      messenger_(std::move(messenger)),
+      is_started_(false) {
+  CHECK(messenger_ != nullptr);
+  DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
+  DCHECK(hp_.Initialized());
+}
+
+Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const {
+  DCHECK(!addrs->empty());
+  if (PREDICT_FALSE(addrs->size() > 1)) {
+    LOG(WARNING) << Substitute(
+        "$0 proxy host/port $1 resolves to $2 different addresses. Using $3",
+        service_name_, hp_.ToString(), addrs->size(), (*addrs)[0].ToString());
+  }
+  return &(*addrs)[0];
+}
+
+void Proxy::Init(Sockaddr addr) {
+  if (!dns_resolver_) {
+    return;
+  }
+  // By default, we set the real user to the currently logged-in user.
+  // Effective user and password remain blank.
+  string real_user;
+  Status s = GetLoggedInUser(&real_user);
+  if (!s.ok()) {
+    LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
+        << s.ToString() << " before connecting to host/port: " << hp_.ToString();
+  }
+  vector<Sockaddr> addrs;
+  if (!addr.is_initialized()) {
+    s = dns_resolver_->ResolveAddresses(hp_, &addrs);
+    if (PREDICT_TRUE(s.ok() && !addrs.empty())) {
+      addr = *GetSingleSockaddr(&addrs);
+      DCHECK(addr.is_initialized());
+      addr.set_port(hp_.port());
+      // NOTE: it's ok to proceed on failure -- the address will remain
+      // uninitialized and be re-resolved when sending the next request.
+    }
+  }
+
+  UserCredentials creds;
+  creds.set_real_user(std::move(real_user));
+  conn_id_ = ConnectionId(addr, hp_.host(), std::move(creds));
+}
+
 Proxy::~Proxy() {
 }
 
-void Proxy::AsyncRequest(const string& method,
-                         const google::protobuf::Message& req,
-                         google::protobuf::Message* response,
-                         RpcController* controller,
-                         const ResponseCallback& callback) const {
-  CHECK(!controller->call_) << "Controller should be reset";
-  base::subtle::NoBarrier_Store(&is_started_, true);
+void Proxy::EnqueueRequest(const string& method,
+                           const google::protobuf::Message& req,
+                           google::protobuf::Message* response,
+                           RpcController* controller,
+                           const ResponseCallback& callback) const {
+  ConnectionId connection = conn_id();
+  DCHECK(connection.remote().is_initialized());
   RemoteMethod remote_method(service_name_, method);
   controller->call_.reset(
-      new OutboundCall(conn_id_, remote_method, response, controller, callback));
+      new OutboundCall(connection, remote_method, response, controller, callback));
   controller->SetRequestParam(req);
   controller->SetMessenger(messenger_.get());
 
@@ -88,11 +149,86 @@
   messenger_->QueueOutboundCall(controller->call_);
 }
 
+void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
+                                        const google::protobuf::Message& req,
+                                        google::protobuf::Message* response,
+                                        RpcController* controller,
+                                        const ResponseCallback& callback) {
+  DCHECK(!controller->call_);
+  vector<Sockaddr>* addrs = new vector<Sockaddr>();
+  DCHECK_NOTNULL(dns_resolver_)->RefreshAddressesAsync(hp_, addrs,
+      [this, &req, &method, callback, response, controller, addrs] (const Status& s) {
+    unique_ptr<vector<Sockaddr>> unique_addrs(addrs);
+    // If we fail to resolve the address, treat the call as failed.
+    if (!s.ok() || addrs->empty()) {
+      DCHECK(!controller->call_);
+      // NOTE: we need to keep a reference here because the callback may end up
+      // destructing the controller and the outbound call, _while_ the callback
+      // is running from within the call!
+      auto shared_call = std::make_shared<OutboundCall>(
+          conn_id(), RemoteMethod{service_name_, method}, response, controller, callback);
+      controller->call_ = shared_call;
+      controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address"));
+      return;
+    }
+    auto* addr = GetSingleSockaddr(addrs);
+    DCHECK(addr->is_initialized());
+    addr->set_port(hp_.port());
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      conn_id_.set_remote(*addr);
+    }
+    EnqueueRequest(method, req, response, controller, callback);
+  });
+}
+
+void Proxy::AsyncRequest(const string& method,
+                         const google::protobuf::Message& req,
+                         google::protobuf::Message* response,
+                         RpcController* controller,
+                         const ResponseCallback& callback) {
+  CHECK(!controller->call_) << "Controller should be reset";
+  base::subtle::NoBarrier_Store(&is_started_, true);
+  if (!dns_resolver_) {
+    EnqueueRequest(method, req, response, controller, callback);
+    return;
+  }
+
+  // If we haven't successfully initialized the remote, e.g. because the DNS
+  // lookup failed, refresh the DNS entry and enqueue the request.
+  bool remote_initialized;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    remote_initialized = conn_id_.remote().is_initialized();
+  }
+  if (!remote_initialized) {
+    RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+    return;
+  }
+
+  // Otherwise, just enqueue the request, but retry if there's a network error,
+  // since it's possible the physical address of the host was changed. We only
+  // retry once more before calling the callback.
+  auto refresh_dns_and_cb = [this, &req, &method,
+                             callback, response, controller] () {
+    // TODO(awong): we should be more specific here -- consider having the RPC
+    // layer set a flag in the controller that warrants a retry.
+    if (PREDICT_FALSE(!controller->status().ok())) {
+      controller->Reset();
+      RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+      return;
+    }
+    // For any other status, OK or otherwise, just run the callback.
+    callback();
+  };
+  EnqueueRequest(method, req, response, controller, refresh_dns_and_cb);
+}
+
 
 Status Proxy::SyncRequest(const string& method,
                           const google::protobuf::Message& req,
                           google::protobuf::Message* resp,
-                          RpcController* controller) const {
+                          RpcController* controller) {
   Notification note;
   AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
                [&note]() { note.Notify(); });
@@ -113,7 +249,7 @@
 }
 
 std::string Proxy::ToString() const {
-  return strings::Substitute("$0@$1", service_name_, conn_id_.ToString());
+  return Substitute("$0@$1", service_name_, conn_id_.ToString());
 }
 
 } // namespace rpc
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index ccf5f18..b0bc512 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -17,12 +17,16 @@
 #pragma once
 
 #include <memory>
+#include <mutex>
 #include <string>
+#include <vector>
 
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/rpc/connection_id.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 
 namespace google {
@@ -33,6 +37,7 @@
 
 namespace kudu {
 
+class DnsResolver;
 class Sockaddr;
 
 namespace rpc {
@@ -65,8 +70,27 @@
         std::string hostname,
         std::string service_name);
 
+  // TODO(awong): consider a separate auto-resolving proxy class?
+  Proxy(std::shared_ptr<Messenger> messenger,
+        HostPort hp,
+        DnsResolver* dns_resolver,
+        std::string service_name);
+
   ~Proxy();
 
+  // If the proxy is configured for address re-resolution (by supplying a
+  // DnsResolver and HostPort in the constructor), performs an initial
+  // resolution of the address using the HostPort. If 'addr' is supplied, it is
+  // used instead of performing resolution (this is useful to initialize
+  // several proxies with a single external DNS resolution).
+  //
+  // Otherwise, this is a no-op.
+  //
+  // NOTE: it is always OK to skip calling this method -- if this proxy is
+  // configured for address re-resolution and this is skipped, the resolution
+  // will happen upon sending the first request.
+  void Init(Sockaddr addr = {});
+
   // Call a remote method asynchronously.
   //
   // Typically, users will not call this directly, but rather through
@@ -97,14 +121,14 @@
                     const google::protobuf::Message& req,
                     google::protobuf::Message* resp,
                     RpcController* controller,
-                    const ResponseCallback& callback) const;
+                    const ResponseCallback& callback);
 
   // The same as AsyncRequest(), except that the call blocks until the call
   // finishes. If the call fails, returns a non-OK result.
   Status SyncRequest(const std::string& method,
                      const google::protobuf::Message& req,
                      google::protobuf::Message* resp,
-                     RpcController* controller) const;
+                     RpcController* controller);
 
   // Set the user credentials which should be used to log in.
   void set_user_credentials(UserCredentials user_credentials);
@@ -121,9 +145,48 @@
   std::string ToString() const;
 
  private:
+  // Asynchronously refreshes the DNS, enqueueing the given request upon
+  // success, or failing the call and calling the callback upon failure.
+  void RefreshDnsAndEnqueueRequest(const std::string& method,
+                                   const google::protobuf::Message& req,
+                                   google::protobuf::Message* response,
+                                   RpcController* controller,
+                                   const ResponseCallback& callback);
+
+  // Queues the given request as an outbound call using the given messenger,
+  // controller, and response.
+  void EnqueueRequest(const std::string& method,
+                      const google::protobuf::Message& req,
+                      google::protobuf::Message* response,
+                      RpcController* controller,
+                      const ResponseCallback& callback) const;
+
+  // Returns a single Sockaddr from the 'addrs', logging a warning if there is
+  // more than one to choose from.
+  Sockaddr* GetSingleSockaddr(std::vector<Sockaddr>* addrs) const;
+
+  ConnectionId conn_id() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return conn_id_;
+  }
+
   const std::string service_name_;
+  HostPort hp_;
+  DnsResolver* dns_resolver_;
   std::shared_ptr<Messenger> messenger_;
+
+  // TODO(awong): consider implementing some lock-free list of ConnectionIds
+  // instead of taking a lock every time we want to get the "current"
+  // ConnectionId.
+  //
+  // Connection ID used by this proxy. Once the proxy has started sending
+  // requests, the connection ID may be updated in response to calls (e.g. if
+  // we re-resolved the physical address in response to an invalid DNS entry).
+  // As such, 'conn_id_' is protected by 'lock_', and should be copied and
+  // passed around, rather than used directly
+  mutable simple_spinlock lock_;
   ConnectionId conn_id_;
+
   mutable Atomic32 is_started_;
 
   DISALLOW_COPY_AND_ASSIGN(Proxy);
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 4a87b0f..602fa6e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -467,8 +467,8 @@
     return bld.Build(messenger);
   }
 
-  Status DoTestSyncCall(const Proxy &p, const char *method,
-                        CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
+  static Status DoTestSyncCall(Proxy* p, const char *method,
+                               CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
     AddRequestPB req;
     req.set_x(rand());
     req.set_y(rand());
@@ -476,13 +476,13 @@
     RpcController controller;
     controller.set_timeout(MonoDelta::FromMilliseconds(10000));
     controller.set_credentials_policy(policy);
-    RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller));
+    RETURN_NOT_OK(p->SyncRequest(method, req, &resp, &controller));
 
     CHECK_EQ(req.x() + req.y(), resp.result());
     return Status::OK();
   }
 
-  void DoTestSidecar(const Proxy &p, int size1, int size2) {
+static void DoTestSidecar(Proxy* p, int size1, int size2) {
     const uint32_t kSeed = 12345;
 
     SendTwoStringsRequestPB req;
@@ -493,8 +493,8 @@
     SendTwoStringsResponsePB resp;
     RpcController controller;
     controller.set_timeout(MonoDelta::FromMilliseconds(10000));
-    CHECK_OK(p.SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
-                           req, &resp, &controller));
+    CHECK_OK(p->SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
+                            req, &resp, &controller));
 
     Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1);
     Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2);
@@ -510,11 +510,11 @@
     CHECK_EQ(Slice(expected), second);
   }
 
-  static Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+  static Status DoTestOutgoingSidecar(Proxy* p, int size1, int size2) {
     return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), std::string(size2, 'b')});
   }
 
-  static Status DoTestOutgoingSidecar(const Proxy& p, const std::vector<std::string>& strings) {
+  static Status DoTestOutgoingSidecar(Proxy* p, const std::vector<std::string>& strings) {
     PushStringsRequestPB request;
     RpcController controller;
 
@@ -525,8 +525,8 @@
     }
 
     PushStringsResponsePB resp;
-    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushStringsMethodName,
-                                     request, &resp, &controller));
+    KUDU_RETURN_NOT_OK(p->SyncRequest(GenericCalculatorService::kPushStringsMethodName,
+                                      request, &resp, &controller));
     for (int i = 0; i < strings.size(); i++) {
       CHECK_EQ(strings[i].size(), resp.sizes(i));
       CHECK_EQ(crc::Crc32c(strings[i].data(), strings[i].size()),
@@ -536,11 +536,11 @@
     return Status::OK();
   }
 
-  void DoTestOutgoingSidecarExpectOK(const Proxy &p, int size1, int size2) {
+  static void DoTestOutgoingSidecarExpectOK(Proxy* p, int size1, int size2) {
     CHECK_OK(DoTestOutgoingSidecar(p, size1, size2));
   }
 
-  static void DoTestExpectTimeout(const Proxy& p,
+  static void DoTestExpectTimeout(Proxy* p,
                                   const MonoDelta& timeout,
                                   bool will_be_cancelled = false,
                                   bool* is_negotiaton_error = nullptr) {
@@ -554,7 +554,7 @@
     c.set_timeout(timeout);
     Stopwatch sw;
     sw.start();
-    Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
+    Status s = p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
     sw.stop();
     ASSERT_FALSE(s.ok());
     if (is_negotiaton_error != nullptr) {
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index a3eaa04..7a3c657 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -207,7 +207,7 @@
 
   Proxy p(messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test making successful RPC calls.
@@ -227,7 +227,7 @@
                                                expected_remote_str(server_addr)));
 
   for (int i = 0; i < 10; i++) {
-    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+    ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   }
 }
 
@@ -259,7 +259,7 @@
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test for KUDU-2041.
@@ -290,7 +290,7 @@
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test making successful RPC calls while using a TLS certificate with a password protected
@@ -326,7 +326,7 @@
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test that using a TLS certificate with a password protected private key and providing
@@ -368,7 +368,7 @@
   // Loop a few calls to make sure that we properly set up and tear down
   // the connections.
   for (int i = 0; i < 5; i++) {
-    Status s = DoTestSyncCall(p, GenericCalculatorService::kAddMethodName);
+    Status s = DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName);
     LOG(INFO) << "Status: " << s.ToString();
     ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString();
   }
@@ -388,7 +388,7 @@
           GenericCalculatorService::static_service_name());
 
   // Call the method which fails.
-  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "bad method");
 }
@@ -406,7 +406,7 @@
   Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName");
 
   // Call the method which fails.
-  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(),
                       "Service unavailable: service WrongServiceName "
@@ -415,7 +415,7 @@
   // If the server has been marked as having registered all services, we should
   // expect a "not found" error instead.
   server_messenger_->SetServicesRegistered();
-  s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(),
                       "Not found: service WrongServiceName "
@@ -449,7 +449,7 @@
   ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test that connections are kept alive between calls.
@@ -470,7 +470,7 @@
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   SleepFor(MonoDelta::FromMilliseconds(5));
 
@@ -514,7 +514,7 @@
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   ReactorMetrics metrics;
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
@@ -651,7 +651,7 @@
 
   // Run several iterations, just in case.
   for (int i = 0; i < 32; ++i) {
-    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+    ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
     ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
     ASSERT_EQ(0, metrics.total_client_connections_);
     ASSERT_EQ(i + 1, metrics.total_server_connections_);
@@ -692,7 +692,7 @@
   ASSERT_EQ(0, metrics.total_server_connections_);
 
   // Make an RPC call with ANY_CREDENTIALS policy.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(1, metrics.total_server_connections_);
@@ -708,7 +708,7 @@
   // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection
   // with ANY_CREDENTIALS policy should be closed and a new one established
   // with PRIMARY_CREDENTIALS policy.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName,
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName,
                            CredentialsPolicy::PRIMARY_CREDENTIALS));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
@@ -723,7 +723,7 @@
   // connection with PRIMARY_CREDENTIALS policy should be re-used because
   // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which
   // the currently open connection has been established with.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -769,7 +769,7 @@
   ASSERT_EQ(0, metrics.num_client_connections_);
 
   // Make an RPC call with the default network plane.
-  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(1, metrics.total_server_connections_);
@@ -780,7 +780,7 @@
   ASSERT_EQ(1, metrics.num_client_connections_);
 
   // Make an RPC call with the non-default network plane.
-  ASSERT_OK(DoTestSyncCall(p2, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p2, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -792,7 +792,7 @@
 
   // Make an RPC call with the default network plane again and verify that
   // there are no new connections.
-  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -870,18 +870,18 @@
           GenericCalculatorService::static_service_name());
 
   // Test a zero-length sidecar
-  DoTestSidecar(p, 0, 0);
+  DoTestSidecar(&p, 0, 0);
 
   // Test some small sidecars
-  DoTestSidecar(p, 123, 456);
+  DoTestSidecar(&p, 123, 456);
 
   // Test some larger sidecars to verify that we properly handle the case where
   // we can't write the whole response to the socket in a single call.
-  DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+  DoTestSidecar(&p, 3000 * 1024, 2000 * 1024);
 
-  DoTestOutgoingSidecarExpectOK(p, 0, 0);
-  DoTestOutgoingSidecarExpectOK(p, 123, 456);
-  DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+  DoTestOutgoingSidecarExpectOK(&p, 0, 0);
+  DoTestOutgoingSidecarExpectOK(&p, 123, 456);
+  DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024);
 }
 
 // Test sending the maximum number of sidecars, each of them being a single
@@ -903,7 +903,7 @@
   for (auto& s : strings) {
     s = RandomString(2, &rng);
   }
-  ASSERT_OK(DoTestOutgoingSidecar(p, strings));
+  ASSERT_OK(DoTestOutgoingSidecar(&p, strings));
 }
 
 TEST_P(TestRpc, TestRpcSidecarLimits) {
@@ -1008,15 +1008,15 @@
   // Test a very short timeout - we expect this will time out while the
   // call is still trying to connect, or in the send queue. This was triggering ASAN failures
   // before.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromNanoseconds(1)));
 
   // Test a longer timeout - expect this will time out after we send the request,
   // but shorter than our threshold for two-stage timeout handling.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(200)));
 
   // Test a longer timeout - expect this will trigger the "two-stage timeout"
   // code path.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1500)));
 }
 
 // Inject 500ms delay in negotiation, and send a call with a short timeout, followed by
@@ -1035,8 +1035,8 @@
           GenericCalculatorService::static_service_name());
 
   FLAGS_rpc_negotiation_inject_delay_ms = 500;
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50)));
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(50)));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   // Only the second call should have been received by the server, because we
   // don't bother sending an already-timed-out call.
@@ -1079,7 +1079,7 @@
 
   bool is_negotiation_error = false;
   NO_FATALS(DoTestExpectTimeout(
-      p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error));
+      &p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error));
   EXPECT_TRUE(is_negotiation_error);
 }
 
@@ -1374,14 +1374,14 @@
       case OutboundCall::ON_OUTBOUND_QUEUE:
       case OutboundCall::SENDING:
       case OutboundCall::SENT:
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 0, 0).IsAborted());
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 123, 456).IsAborted());
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024).IsAborted());
-        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(timeout_ms), true);
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 0, 0).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 123, 456).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 3000 * 1024, 2000 * 1024).IsAborted());
+        DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(timeout_ms), true);
         break;
       case OutboundCall::NEGOTIATION_TIMED_OUT:
       case OutboundCall::TIMED_OUT:
-        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1000));
+        DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1000));
         break;
       case OutboundCall::CANCELLED:
         break;
@@ -1399,9 +1399,9 @@
         break;
       }
       case OutboundCall::FINISHED_SUCCESS:
-        DoTestOutgoingSidecarExpectOK(p, 0, 0);
-        DoTestOutgoingSidecarExpectOK(p, 123, 456);
-        DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+        DoTestOutgoingSidecarExpectOK(&p, 0, 0);
+        DoTestOutgoingSidecarExpectOK(&p, 123, 456);
+        DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024);
         break;
     }
   }
@@ -1568,7 +1568,7 @@
     Stopwatch sw(Stopwatch::ALL_THREADS);
     sw.start();
     for (int i = 0; i < kNumMb / kMbPerRpc; i++) {
-      DoTestOutgoingSidecar(p, sidecars);
+      DoTestOutgoingSidecar(&p, sidecars);
     }
     sw.stop();
     LOG(INFO) << strings::Substitute(
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 59c7667..d446227 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -357,7 +357,7 @@
   Proxy p(client_messenger_, server_addr_, server_addr_.host(),
           CalculatorService::static_service_name());
 
-  Status s = DoTestSyncCall(p, "DoesNotExist");
+  Status s = DoTestSyncCall(&p, "DoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist");
 }
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index 7653617..d34b8ec 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -227,6 +227,9 @@
 }
 
 std::string Sockaddr::ToString() const {
+  if (!is_initialized()) {
+    return "<uninitialized>";
+  }
   switch (family()) {
     case AF_INET:
       return Substitute("$0:$1", host(), port());
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index e473c8b..6fb9e59 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -23,7 +23,7 @@
 
 #include <cstdint>
 #include <string>
-#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <glog/logging.h>