IMPALA-9026: Use resolved IP address for statestore subscriber

This change adds a flag (--statestore_subscriber_use_resolved_address)
which, if set to true, allows statestore subscribers to use its
resolved IP address instead of its hostname as the heartbeat
address which statestore sends heartbeats / updates to.

This flag is useful in certain situation in which the subscriber's
DNS entry may not be present for a valid reason (e.g. a Kubernetes
pod whose readiness probe returns false). An example is that there
are multiple Impala coordinators but only one of them will be active
at a time (for admission control reason) and the rest will serve
as backup. In which case, we still want the backup coordinators to
receive updates from statestore but not serve any queries.

Change-Id: Ieb8302dec0e52beb9f0b88306a51c38ff42a63a2
Reviewed-on: http://gerrit.cloudera.org:8080/14388
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 1fea888..42ac3b5 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -26,10 +26,10 @@
 #include "gen-cpp/CatalogObjects_types.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/collection-metrics.h"
 #include "util/debug-util.h"
 #include "util/event-metrics.h"
 #include "util/logging-support.h"
-#include "util/collection-metrics.h"
 #include "util/metrics.h"
 #include "util/webserver.h"
 
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 172966e..e80a2a7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -227,7 +227,11 @@
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
 
-  // KRPC relies on resolved IP address. It's set in Init().
+  // Resolve hostname to IP address.
+  ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_));
+
+  // KRPC relies on resolved IP address.
+  krpc_address_.__set_hostname(ip_address_);
   krpc_address_.__set_port(krpc_port);
   rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
   stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
@@ -346,13 +350,9 @@
   ImpaladMetrics::CreateMetrics(
       exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));
 
-  // Resolve hostname to IP address.
-  RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_));
-
   InitMemTracker(bytes_limit);
 
   // Initializes the RPCMgr, ControlServices and DataStreamServices.
-  krpc_address_.__set_hostname(ip_address_);
   // Initialization needs to happen in the following order due to dependencies:
   // - RPC manager, DataStreamService and DataStreamManager.
   RETURN_IF_ERROR(rpc_mgr_->Init());
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 6952fe5..6f4973d 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -59,6 +59,9 @@
     "RPC connection to the statestore. A setting of 0 means retry indefinitely");
 DEFINE_int32(statestore_subscriber_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
     "to wait between attempts to make an RPC connection to the statestore.");
+DEFINE_bool(statestore_subscriber_use_resolved_address, false, "If set to true, the "
+    "subscriber will register with statestore using its resolved IP address. Note that "
+    "using resolved IP address may cause mismatch with the TLS certificate.");
 DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Period "
     "after the last successful subscription attempt until the subscriber will be "
     "considered fully recovered. After a successful reconnect attempt, updates to the "
@@ -250,6 +253,16 @@
     RETURN_IF_ERROR(builder.Build(&server));
     heartbeat_server_.reset(server);
     RETURN_IF_ERROR(heartbeat_server_->Start());
+
+    // Resolve the heartbeat address if necessary. Also specify the port which
+    // the heartbeat server is listening on.
+    if (FLAGS_statestore_subscriber_use_resolved_address) {
+      IpAddr ip_address;
+      RETURN_IF_ERROR(HostnameToIpAddr(heartbeat_address_.hostname, &ip_address));
+      heartbeat_address_.hostname = ip_address;
+      LOG(INFO) << Substitute("Registering with statestore with resolved address $0",
+          ip_address);
+    }
     heartbeat_address_.port = heartbeat_server_->port();
 
     LOG(INFO) << "Registering with statestore";
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index cc566a9..4e3aefe 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -198,6 +198,8 @@
 
   /// Address that the heartbeat service should be started on. Initialised in constructor,
   /// updated in Start() with the actual port if the wildcard port 0 was specified.
+  /// If FLAGS_statestore_subscriber_use_resolved_address is true, this is set to the
+  /// resolved IP address in Start().
   TNetworkAddress heartbeat_address_;
 
   /// Set to true after Register(...) is successful, after which no
diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py
index ee810ed..742ba19 100644
--- a/tests/custom_cluster/test_custom_statestore.py
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -86,3 +86,17 @@
       else:
         assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs)
         return
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--statestore_subscriber_use_resolved_address=true",
+      catalogd_args="--statestore_subscriber_use_resolved_address=true")
+  def test_subscriber_with_resolved_address(self, vector):
+    # Ensure cluster has started up by running a query.
+    result = self.execute_query("select count(*) from functional_parquet.alltypes")
+    assert result.success, str(result)
+
+    self.assert_impalad_log_contains("INFO",
+        "Registering with statestore with resolved address")
+    self.assert_catalogd_log_contains("INFO",
+        "Registering with statestore with resolved address")