[util] allow DnsResolver to refresh DNS addresses

This patch introduces functionality to the DnsResolver to refresh an
address, rather than looking it up in the cache. It does this by
removing any cached entry and performing the lookup.

This will be used in a follow-up change to refresh the address on
certain transient failures.

A new --dns_addr_resolution_override flag is also introduced for testing
purposes.

Change-Id: I0616f3e6fb50aba271f106b05d1926fc46a53ed0
Reviewed-on: http://gerrit.cloudera.org:8080/17849
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/util/net/dns_resolver-test.cc b/src/kudu/util/net/dns_resolver-test.cc
index f53df7d..b861c68 100644
--- a/src/kudu/util/net/dns_resolver-test.cc
+++ b/src/kudu/util/net/dns_resolver-test.cc
@@ -21,9 +21,10 @@
 #include <cstdlib>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -34,10 +35,13 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 
+DECLARE_string(dns_addr_resolution_override);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
 
+using std::thread;
 using std::vector;
 using strings::Substitute;
 
@@ -59,6 +63,92 @@
   }
 }
 
+TEST(DnsResolverTest, RefreshCachedEntry) {
+  gflags::FlagSaver saver;
+  vector<Sockaddr> addrs;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_TRUE(!addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+  // If we override the DNS lookup address, when we refresh the address, the
+  // cached entry gets reset.
+  constexpr const char* kFakeAddr = "1.1.1.1";
+  FLAGS_dns_addr_resolution_override = Substitute("localhost=$0", kFakeAddr);
+  Synchronizer s;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 1111), &addrs,
+                                 s.AsStatusCallback());
+  ASSERT_OK(s.Wait());
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:1111", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(1111, addrs[0].port());
+
+  // Once we stop overriding DNS lookups, simply getting the address from the
+  // resolver will read from the cache.
+  FLAGS_dns_addr_resolution_override = "";
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:12345", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(12345, addrs[0].port());
+
+  // But a refresh should return the original address.
+  Synchronizer s2;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                 s2.AsStatusCallback());
+  ASSERT_OK(s2.Wait());
+  EXPECT_FALSE(addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+}
+
+TEST(DnsResolverTest, ConcurrentRefreshesAndResolutions) {
+  constexpr int kNumThreads = 3;
+  constexpr int kNumResolutionsPerThread = 10;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  vector<thread> threads;
+  auto cancel_threads = MakeScopedCleanup([&] {
+    for (auto& t : threads) {
+      t.join();
+    }
+  });
+  const auto validate_addrs = [] (const vector<Sockaddr>& addrs) {
+    ASSERT_FALSE(addrs.empty());
+    for (const Sockaddr& addr : addrs) {
+      EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+      EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+    }
+  };
+  for (int i = 0; i < kNumThreads - 1; i++) {
+    threads.emplace_back([&] {
+      for (int r = 0; r < kNumResolutionsPerThread; r++) {
+        vector<Sockaddr> addrs;
+        Synchronizer s;
+        resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                       s.AsStatusCallback());
+        ASSERT_OK(s.Wait());
+        NO_FATALS(validate_addrs(addrs));
+      }
+    });
+  }
+  threads.emplace_back([&] {
+    for (int r = 0; r < kNumResolutionsPerThread; r++) {
+      vector<Sockaddr> addrs;
+      ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+      NO_FATALS(validate_addrs(addrs));
+    }
+  });
+  for (auto& t : threads) {
+    t.join();
+  }
+  cancel_threads.cancel();
+}
+
 TEST(DnsResolverTest, CachingVsNonCachingResolver) {
   constexpr const auto kNumIterations = 1000;
   constexpr const auto kIdxNonCached = 0;
@@ -80,7 +170,7 @@
       }
       ASSERT_TRUE(!addrs.empty());
       for (const Sockaddr& addr : addrs) {
-        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port)));
+        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port))) << addr.ToString();
       }
     }
     timings[idx] = MonoTime::Now() - start_time;
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 6a29811..5eb9f0d 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -92,6 +92,26 @@
   }
 }
 
+void DnsResolver::RefreshAddressesAsync(const HostPort& hostport,
+                                        vector<Sockaddr>* addresses,
+                                        const StatusCallback& cb) {
+  if (PREDICT_TRUE(cache_)) {
+    cache_->Erase(hostport.host());
+  }
+  const auto s = pool_->Submit([=]() {
+    // Before performing the resolution, check if another task has already
+    // resolved it and cached a new entry.
+    if (this->GetCachedAddresses(hostport, addresses)) {
+      cb(Status::OK());
+      return;
+    }
+    this->DoResolutionCb(hostport, addresses, cb);
+  });
+  if (!s.ok()) {
+    cb(s);
+  }
+}
+
 Status DnsResolver::DoResolution(const HostPort& hostport,
                                  vector<Sockaddr>* addresses) {
   vector<Sockaddr> resolved_addresses;
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 6cde473..3031a6b 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -77,6 +77,12 @@
                              std::vector<Sockaddr>* addresses,
                              const StatusCallback& cb);
 
+  // Like ResolveAddressesAsync(), but initially removes any existing cached
+  // entry, in favor of resolving the address explicitly.
+  void RefreshAddressesAsync(const HostPort& hostport,
+                             std::vector<Sockaddr>* addresses,
+                             const StatusCallback& cb);
+
  private:
   // The cache is keyed by the host part of the HostPort structure, and the
   // entry stores a vector of all Sockaddr structures produced by DNS resolution
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 2f1c671..aba828e 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -55,6 +55,7 @@
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/util/trace.h"
@@ -71,6 +72,12 @@
               "dns resolution attempts. Only takes effect if --fail_dns_resolution is 'true'.");
 TAG_FLAG(fail_dns_resolution_hostports, hidden);
 
+DEFINE_string(dns_addr_resolution_override, "",
+              "Comma-separated list of '='-separated pairs of hosts to addresses. The left-hand "
+              "side of the '=' is taken as a host, and will resolve to the right-hand side which "
+              "is expected to be a socket address with no port.");
+TAG_FLAG(dns_addr_resolution_override, hidden);
+
 using std::function;
 using std::string;
 using std::unordered_set;
@@ -193,6 +200,22 @@
   TRACE_EVENT1("net", "HostPort::ResolveAddresses",
                "host", host_);
   TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
+  if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) {
+    vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, ",");
+    for (const auto& ha : hosts_and_addrs) {
+      vector<string> host_and_addr = Split(ha, "=");
+      if (host_and_addr.size() != 2) {
+        return Status::InvalidArgument("failed to parse injected address override");
+      }
+      if (iequals(host_and_addr[0], host_)) {
+        Sockaddr addr;
+        RETURN_NOT_OK_PREPEND(addr.ParseString(host_and_addr[1], port_),
+            "failed to parse injected address override");
+        *addresses = { addr };
+        return Status::OK();
+      }
+    }
+  }
   struct addrinfo hints;
   memset(&hints, 0, sizeof(hints));
   hints.ai_family = AF_INET;
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index 5ee6233..5c9c459 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -180,6 +180,10 @@
     return EntryHandle(DCHECK_NOTNULL(entry_ptr->val_ptr), std::move(h));
   }
 
+  void Erase(const K& key) {
+    cache_->Erase(key);
+  }
+
   // For the specified key, add an entry into the cache or replace already
   // existing one. The 'charge' parameter specifies the charge to associate
   // with the entry with regard to the cache's capacity. This method returns