IMPALA-12699: Set timeout for catalog RPCs

We have seen trivial GetPartialCatalogObject RPCs hang on the
coordinator side, e.g. IMPALA-11409. Due to the piggyback mechanism for
fetching metadata in local-catalog mode (see the comments in
CatalogdMetaProvider#loadWithCaching()), a hanging RPC on shared
metadata (e.g. the db/table list) can block other queries on the same
coordinator.
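
To illustrate the piggyback pattern (this is only a sketch, not the
actual Java code in CatalogdMetaProvider), concurrent callers share a
single in-flight fetch per cache key, so every waiter is stuck if that
one fetch hangs:

  // Sketch only: PiggybackCache and FetchFromCatalogd are made-up names
  // for illustration, not Impala classes.
  #include <future>
  #include <map>
  #include <mutex>
  #include <string>

  class PiggybackCache {
   public:
    // All callers asking for 'key' block on the same shared_future; only
    // the first caller actually issues the (possibly hanging) fetch.
    std::string Get(const std::string& key) {
      std::shared_future<std::string> fut;
      {
        std::lock_guard<std::mutex> l(lock_);
        auto it = inflight_.find(key);
        if (it == inflight_.end()) {
          auto task = std::async(std::launch::async,
              [key] { return FetchFromCatalogd(key); });
          it = inflight_.emplace(key, task.share()).first;
        }
        fut = it->second;
      }
      return fut.get();  // Every waiter hangs here if the RPC never returns.
    }

   private:
    static std::string FetchFromCatalogd(const std::string& key) {
      // Stand-in for the real GetPartialCatalogObject RPC.
      return "metadata for " + key;
    }

    std::mutex lock_;  // Protects inflight_. Completed entries are never
                       // evicted here; the real cache does much more.
    std::map<std::string, std::shared_future<std::string>> inflight_;
  };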

Such lightweight requests don't need to acquire table locks or trigger
table loading in catalogd. The hangs are usually caused by network
issues, e.g. a TCP connection becoming half-open after TCP
retransmissions time out. Retrying the RPC helps recover from such
failures. However, the catalog RPC timeout currently defaults to 0,
which disables the retry and leaves the client waiting indefinitely.
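
As a rough sketch of why the timeout matters (the helper below is
hypothetical; Impala configures this through its Thrift client cache,
not a free function like this), Thrift TSocket send/recv timeouts of 0
mean a read on a half-open connection blocks forever, while a positive
value lets the read fail so the retry logic can kick in:

  // Sketch only: ApplyCatalogRpcTimeouts is a made-up helper name.
  #include <thrift/transport/TSocket.h>

  using apache::thrift::transport::TSocket;

  void ApplyCatalogRpcTimeouts(TSocket* socket, int timeout_ms) {
    // A value of 0 disables the timeouts entirely, so a recv() on a
    // half-open connection hangs indefinitely and is never retried.
    socket->setSendTimeout(timeout_ms);
    socket->setRecvTimeout(timeout_ms);
  }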

This patch distinguishes lightweight catalog RPCs and uses a dedicated
catalogd client cache for them. These RPCs use a timeout of 30 minutes,
which is long enough to tolerate TCP retransmission timeouts. It also
sets a timeout of 10 hours for other catalog RPCs; operations that take
longer than that are usually abnormal and hanging.
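
For reference, the new defaults correspond to the following impalad
startup flags (shown only to illustrate how they could be overridden):

  --catalog_client_rpc_timeout_ms=36000000        (10 hours)
  --catalog_lightweight_rpc_timeout_ms=1800000    (30 minutes)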

Tests
 - Add an e2e test to verify the lightweight RPC client cache is used.
 - Adjust TestRestart.test_catalog_connection_retries to use local
   catalog mode since in legacy catalog mode the coordinator only sends
   PrioritizeLoad requests, which are lightweight RPCs.

This is a continuation of a patch by Wenzhe Zhou <wzhou@cloudera.com>

Change-Id: Iad39a79d0c89f2b04380f610a7e60558429e9c6e
Reviewed-on: http://gerrit.cloudera.org:8080/21146
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 985928a..0691d30 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -375,9 +375,13 @@
   if (FLAGS_inject_latency_before_catalog_fetch_ms > 0) {
     SleepForMs(FLAGS_inject_latency_before_catalog_fetch_ms);
   }
+  // Non-table requests are lightweight requests that won't be blocked by table loading
+  // or table locks. Note that when loading the table list of a db, the type is DB.
+  auto client_cache_ptr = (req.object_desc.type == TCatalogObjectType::TABLE) ?
+      env_->catalogd_client_cache() : env_->catalogd_lightweight_req_client_cache();
   int attempt = 0; // Used for debug action only.
   CatalogServiceConnection::RpcStatus rpc_status =
-      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+      CatalogServiceConnection::DoRpcWithRetry(client_cache_ptr,
           *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
           &CatalogServiceClientWrapper::GetPartialCatalogObject, req,
           FLAGS_catalog_client_connection_num_retries,
@@ -399,7 +403,8 @@
     TPrioritizeLoadResponse* result) {
   int attempt = 0; // Used for debug action only.
   CatalogServiceConnection::RpcStatus rpc_status =
-      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+      CatalogServiceConnection::DoRpcWithRetry(
+          env_->catalogd_lightweight_req_client_cache(),
           *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
           &CatalogServiceClientWrapper::PrioritizeLoad, req,
           FLAGS_catalog_client_connection_num_retries,
@@ -427,7 +432,11 @@
   TUpdateTableUsageResponse* resp) {
   int attempt = 0; // Used for debug action only.
   CatalogServiceConnection::RpcStatus rpc_status =
-      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+      CatalogServiceConnection::DoRpcWithRetry(
+          // The operation doesn't require the table lock in catalogd, nor does it
+          // require the table to be loaded, so we can use the client cache for
+          // lightweight requests.
+          env_->catalogd_lightweight_req_client_cache(),
           *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
           &CatalogServiceClientWrapper::UpdateTableUsage, req,
           FLAGS_catalog_client_connection_num_retries,
@@ -441,7 +450,8 @@
     const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) {
   int attempt = 0; // Used for debug action only.
   CatalogServiceConnection::RpcStatus rpc_status =
-      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+      CatalogServiceConnection::DoRpcWithRetry(
+          env_->catalogd_lightweight_req_client_cache(),
           *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
           &CatalogServiceClientWrapper::GetNullPartitionName, req,
           FLAGS_catalog_client_connection_num_retries,
@@ -455,7 +465,8 @@
     const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) {
   int attempt = 0; // Used for debug action only.
   CatalogServiceConnection::RpcStatus rpc_status =
-      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+      CatalogServiceConnection::DoRpcWithRetry(
+          env_->catalogd_lightweight_req_client_cache(),
           *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
           &CatalogServiceClientWrapper::GetLatestCompactions, req,
           FLAGS_catalog_client_connection_num_retries,
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index fdf44c1..c1936a2 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -233,17 +233,24 @@
   }
 }
 
-void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix) {
+void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix,
+    const string& key_appendix) {
   DCHECK(metrics != NULL);
   // Not strictly needed if InitMetrics is called before any cache usage, but ensures that
   // metrics_enabled_ is published.
   lock_guard<mutex> lock(cache_lock_);
   stringstream count_ss;
   count_ss << key_prefix << ".client-cache.clients-in-use";
+  if (!key_appendix.empty()) {
+    count_ss << "-" << key_appendix;
+  }
   clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0);
 
   stringstream max_ss;
   max_ss << key_prefix << ".client-cache.total-clients";
+  if (!key_appendix.empty()) {
+    max_ss << "-" << key_appendix;
+  }
   total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0);
   metrics_enabled_ = true;
 }
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 018c366..cb80e0b 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -123,7 +123,8 @@
 
   /// Creates two metrics for this cache measuring the number of clients currently used,
   /// and the total number in the cache.
-  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix);
+  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix,
+      const std::string& key_appendix);
 
  private:
   template <class T> friend class ClientCache;
@@ -447,8 +448,9 @@
   /// metrics have keys that are prefixed by the key_prefix argument
   /// (which should not end in a period).
   /// Must be called before the cache is used, otherwise the metrics might be wrong
-  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix) {
-    client_cache_helper_.InitMetrics(metrics, key_prefix);
+  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix,
+      const std::string& key_appendix = "") {
+    client_cache_helper_.InitMetrics(metrics, key_prefix, key_appendix);
   }
 
  private:
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index d0430dd..fb1ed73 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -161,8 +161,13 @@
 
 DEFINE_int32(catalog_client_connection_num_retries, 10, "The number of times connections "
     "or RPCs to the catalog should be retried.");
-DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket "
-    "send/recv timeout in milliseconds for a catalog client RPC.");
+DEFINE_int32(catalog_client_rpc_timeout_ms, 36000000, "(Advanced) The underlying TSocket "
+    "send/recv timeout in milliseconds for a catalog client RPC. The default is 10 hours."
+    " Operations take longer than this are usually abnormal and hanging.");
+DEFINE_int32(catalog_lightweight_rpc_timeout_ms, 1800000, "(Advanced) The underlying "
+    "TSocket send/recv timeout in milliseconds for a lightweight catalog RPC which "
+    "shouldn't take long in catalogd, e.g. fetching db/table list. The default is 30 "
+    "minutes which is long enough to tolerate TCP timeout due to retransmission.");
 DEFINE_int32(catalog_client_rpc_retry_interval_ms, 3000, "(Advanced) The time to wait "
     "before retrying when the catalog RPC client fails to connect to catalogd or when "
     "RPCs to the catalogd fail.");
@@ -225,6 +230,10 @@
     catalogd_client_cache_(new CatalogServiceClientCache(1, 0,
         FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
         !FLAGS_ssl_client_ca_certificate.empty())),
+    catalogd_lightweight_req_client_cache_(new CatalogServiceClientCache(1, 0,
+        FLAGS_catalog_lightweight_rpc_timeout_ms,
+        FLAGS_catalog_lightweight_rpc_timeout_ms, "",
+        !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new io::DiskIoMgr()),
     webserver_(new Webserver(FLAGS_webserver_interface, webserver_port, metrics_.get())),
@@ -429,6 +438,8 @@
     RETURN_IF_ERROR(metrics_webserver_->Start());
   }
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
+  catalogd_lightweight_req_client_cache_->InitMetrics(
+      metrics_.get(), "catalog.server", "for-lightweight-rpc");
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
   // Initialize impalad metrics
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 53f8f68..3a87df6 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -127,6 +127,9 @@
   CatalogServiceClientCache* catalogd_client_cache() {
     return catalogd_client_cache_.get();
   }
+  CatalogServiceClientCache* catalogd_lightweight_req_client_cache() {
+    return catalogd_lightweight_req_client_cache_.get();
+  }
   HBaseTableFactory* htable_factory() { return htable_factory_.get(); }
   io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
   Webserver* webserver() { return webserver_.get(); }
@@ -232,6 +235,7 @@
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
   boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
+  boost::scoped_ptr<CatalogServiceClientCache> catalogd_lightweight_req_client_cache_;
   boost::scoped_ptr<HBaseTableFactory> htable_factory_;
   boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
   boost::scoped_ptr<Webserver> webserver_;
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 7cfc5d1..5cbd478 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -522,6 +522,26 @@
     "key": "catalog.server.client-cache.total-clients"
   },
   {
+    "description": "The number of clients currently in use by the Catalog Server client cache for lightweight RPC.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Catalog Server Client Cache Clients In Use for lightweight RPC",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "catalog.server.client-cache.clients-in-use-for-lightweight-rpc"
+  },
+  {
+    "description": "The total number of clients in the Catalog Server client cache for lightweight RPC.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Catalog Server Client Cache Total Clients for lightweight RPC",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "catalog.server.client-cache.total-clients-for-lightweight-rpc"
+  },
+  {
     "description": "The full version string of the Catalog Server.",
     "contexts": [
       "CATALOGSERVER"
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index d958cd7..93a587a 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -560,6 +560,20 @@
     finally:
       client.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_lightweight_rpc_metrics(self):
+    """Verify catalogd client cache for lightweight RPCs is used correctly"""
+    # Fetching the db and table list should be lightweight requests
+    self.execute_query("describe database functional")
+    self.execute_query("show tables in functional")
+    impalad = self.cluster.impalads[0].service
+    assert 0 == impalad.get_metric_value("catalog.server.client-cache.total-clients")
+    assert 1 == impalad.get_metric_value(
+        "catalog.server.client-cache.total-clients-for-lightweight-rpc")
+
 
 class TestFullAcid(CustomClusterTestSuite):
   @classmethod
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index cd0f765..bc9e803 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -118,6 +118,9 @@
       self.wait_for_state(handle, QueryState.EXCEPTION, 20, client=client)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--catalog_topic_mode=minimal",
+      impalad_args="--use_local_catalog=true")
   def test_catalog_connection_retries(self):
     """Test that connections to the catalogd are retried, both new connections and cached
     connections."""