[rpc] add metric for AcceptorPool's dispatch timing

This patch adds the 'acceptor_dispatch_times' histogram metric to track
how long AcceptorPool takes to dispatch newly accepted connections, along
with a basic unit test for the new metric.

Change-Id: I018ddd14414c8d13aaf488fa9eb4db1bf1248cc4
Reviewed-on: http://gerrit.cloudera.org:8080/20790
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index a753f34..ec7c86b 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -30,12 +30,15 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
@@ -52,6 +55,17 @@
                       "Number of incoming UNIX Domain Socket connections made to the RPC server",
                       kudu::MetricLevel::kInfo);
 
+METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
+                        "Acceptor Dispatch Times",
+                        kudu::MetricUnit::kMicroseconds,
+                        "A histogram of dispatching timings for accepted "
+                        "connections. Outliers in this histogram contribute "
+                        "to the latency of handling incoming connection "
+                        "requests and growing the backlog of pending TCP "
+                        "connections to the server.",
+                        kudu::MetricLevel::kInfo,
+                        1000000, 2);
+
 DEFINE_int32(rpc_acceptor_listen_backlog, 128,
              "Socket backlog parameter used when listening for RPC connections. "
              "This defines the maximum length to which the queue of pending "
@@ -74,10 +88,12 @@
       socket_(socket->Release()),
       bind_address_(bind_address),
       closing_(false) {
-  auto& accept_metric = bind_address.is_ip() ?
-      METRIC_rpc_connections_accepted :
-      METRIC_rpc_connections_accepted_unix_domain_socket;
-  rpc_connections_accepted_ = accept_metric.Instantiate(messenger->metric_entity());
+  const auto& metric_entity = messenger->metric_entity();
+  auto& connections_accepted = bind_address.is_ip()
+      ? METRIC_rpc_connections_accepted
+      : METRIC_rpc_connections_accepted_unix_domain_socket;
+  rpc_connections_accepted_ = connections_accepted.Instantiate(metric_entity);
+  dispatch_times_ = METRIC_acceptor_dispatch_times.Instantiate(metric_entity);
 }
 
 AcceptorPool::~AcceptorPool() {
@@ -152,12 +168,26 @@
 }
 
 void AcceptorPool::RunThread() {
+  const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond());
+
   while (true) {
     Socket new_sock;
     Sockaddr remote;
     VLOG(2) << Substitute("calling accept() on socket $0 listening on $1",
                           socket_.GetFd(), bind_address_.ToString());
     const auto s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+    const auto accepted_at = CycleClock::Now();
+    const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
+      // The timing is captured on both the success and failure paths, so
+      // the 'dispatch_times_' histogram accounts for every connection
+      // attempt that resulted in extracting an item from the queue of
+      // pending connections on the listening RPC socket. Meanwhile, the
+      // 'rpc_connections_accepted_' counter accounts only for connections
+      // successfully dispatched to the messenger for further processing.
+      dispatch_times_->Increment(
+          (CycleClock::Now() - accepted_at) * 1000000 / kCyclesPerSecond);
+    });
+
     if (PREDICT_FALSE(!s.ok())) {
       if (closing_) {
         break;
@@ -177,8 +207,8 @@
         continue;
       }
     }
-    rpc_connections_accepted_->Increment();
     messenger_->RegisterInboundSocket(&new_sock, remote);
+    rpc_connections_accepted_->Increment();
   }
   VLOG(1) << "AcceptorPool shutting down";
 }
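The measurement above combines a TSC reading with a scoped cleanup so that the
histogram is updated on every exit path of the loop body ('continue', 'break',
or falling through). A minimal self-contained sketch of the same pattern,
reusing CycleClock::Now() and base::CyclesPerSecond() from the gutil headers
included by this patch ('TimedOperation' is a hypothetical name):

    #include <cstdint>

    #include "kudu/gutil/sysinfo.h"
    #include "kudu/gutil/walltime.h"
    #include "kudu/util/metrics.h"
    #include "kudu/util/scoped_cleanup.h"

    // Times the body of the function in microseconds and records the result
    // into 'histogram', whether the function exits normally or early.
    void TimedOperation(const scoped_refptr<kudu::Histogram>& histogram) {
      static const int64_t kCyclesPerSecond =
          static_cast<int64_t>(base::CyclesPerSecond());
      const int64_t started_at = CycleClock::Now();
      auto recorder = kudu::MakeScopedCleanup([&]() {
        // Convert elapsed TSC cycles into microseconds.
        histogram->Increment(
            (CycleClock::Now() - started_at) * 1000000 / kCyclesPerSecond);
      });
      // ... the operation being timed ...
    }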
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index aa00df6..14b2740 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -29,6 +29,7 @@
 namespace kudu {
 
 class Counter;
+class Histogram;
 class Thread;
 
 namespace rpc {
@@ -70,10 +71,12 @@
   const Sockaddr bind_address_;
   std::vector<scoped_refptr<Thread>> threads_;
 
-  scoped_refptr<Counter> rpc_connections_accepted_;
-
   std::atomic<bool> closing_;
 
+  // Metrics.
+  scoped_refptr<Counter> rpc_connections_accepted_;
+  scoped_refptr<Histogram> dispatch_times_;
+
   DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
 };
 
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index b18bc29..52d9b89 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -82,6 +82,7 @@
 
 METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_histogram(acceptor_dispatch_times);
 METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
@@ -1422,6 +1423,30 @@
   ASSERT_EQ(1, timed_out_in_queue->value());
 }
 
+// Basic verification of the numbers reported by the 'acceptor_dispatch_times'
+// metric.
+TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  // Open a client connection and close it right away: completing the TCP
+  // handshake is enough for the acceptor to dispatch exactly one connection.
+  {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), 0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  scoped_refptr<Histogram> dispatch_times =
+      METRIC_acceptor_dispatch_times.Instantiate(server_messenger_->metric_entity());
+  // Using ASSERT_EVENTUALLY below because metric values are fetched with
+  // relaxed memory ordering. Eventually, the metrics report readings that
+  // are consistent with the expected numbers.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(1, dispatch_times->TotalCount());
+    ASSERT_GT(dispatch_times->MaxValueForTests(), 0);
+  });
+}
+
 static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
                                      CountDownLatch* latch) {
   messenger->reset();
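As a usage note, the same pattern scales to multiple connections. A
hypothetical extension of the new test (not part of this patch; the loop and
'kNumConnections' are illustrative only) could assert that the histogram's
total count tracks the number of accepted sockets:

    // Hypothetical extension: every accepted connection should contribute
    // exactly one sample to the dispatch-times histogram.
    constexpr int kNumConnections = 5;
    for (int i = 0; i < kNumConnections; ++i) {
      Socket socket;
      ASSERT_OK(socket.Init(server_addr.family(), 0));
      ASSERT_OK(socket.Connect(server_addr));
    }
    ASSERT_EVENTUALLY([&] {
      ASSERT_EQ(kNumConnections, dispatch_times->TotalCount());
    });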