[rpc] add metric for AcceptorPool's dispatch timing
This patch adds 'acceptor_dispatch_times' histogram metric to track
dispatching times of newly accepted connections by AcceptorPool along
with a very basic unit test for the newly added metric.
Change-Id: I018ddd14414c8d13aaf488fa9eb4db1bf1248cc4
Reviewed-on: http://gerrit.cloudera.org:8080/20790
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index a753f34..ec7c86b 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -30,12 +30,15 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
@@ -52,6 +55,17 @@
"Number of incoming UNIX Domain Socket connections made to the RPC server",
kudu::MetricLevel::kInfo);
+METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
+ "Acceptor Dispatch Times",
+ kudu::MetricUnit::kMicroseconds,
+ "A histogram of dispatching timings for accepted "
+ "connections. Outliers in this histogram contribute "
+ "to the latency of handling incoming connection "
+ "requests and growing the backlog of pending TCP "
+ "connections to the server.",
+ kudu::MetricLevel::kInfo,
+ 1000000, 2);
+
DEFINE_int32(rpc_acceptor_listen_backlog, 128,
"Socket backlog parameter used when listening for RPC connections. "
"This defines the maximum length to which the queue of pending "
@@ -74,10 +88,12 @@
socket_(socket->Release()),
bind_address_(bind_address),
closing_(false) {
- auto& accept_metric = bind_address.is_ip() ?
- METRIC_rpc_connections_accepted :
- METRIC_rpc_connections_accepted_unix_domain_socket;
- rpc_connections_accepted_ = accept_metric.Instantiate(messenger->metric_entity());
+ const auto& metric_entity = messenger->metric_entity();
+ auto& connections_accepted = bind_address.is_ip()
+ ? METRIC_rpc_connections_accepted
+ : METRIC_rpc_connections_accepted_unix_domain_socket;
+ rpc_connections_accepted_ = connections_accepted.Instantiate(metric_entity);
+ dispatch_times_ = METRIC_acceptor_dispatch_times.Instantiate(metric_entity);
}
AcceptorPool::~AcceptorPool() {
@@ -152,12 +168,26 @@
}
void AcceptorPool::RunThread() {
+ const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond());
+
while (true) {
Socket new_sock;
Sockaddr remote;
VLOG(2) << Substitute("calling accept() on socket $0 listening on $1",
socket_.GetFd(), bind_address_.ToString());
const auto s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+ const auto accepted_at = CycleClock::Now();
+ const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
+ // The timings are captured for both success and failure paths, so the
+ // 'dispatch_times_' histogram accounts for all the connection attempts
+ // that lead to successfully extracting an item from the queue of pending
+ // connections for the listening RPC socket. Meanwhile, the
+ // 'rpc_connections_accepted_' counter accounts only for connections that
+ // were successfully dispatched to the messenger for further processing.
+ dispatch_times_->Increment(
+ (CycleClock::Now() - accepted_at) * 1000000 / kCyclesPerSecond);
+ });
+
if (PREDICT_FALSE(!s.ok())) {
if (closing_) {
break;
@@ -177,8 +207,8 @@
continue;
}
}
- rpc_connections_accepted_->Increment();
messenger_->RegisterInboundSocket(&new_sock, remote);
+ rpc_connections_accepted_->Increment();
}
VLOG(1) << "AcceptorPool shutting down";
}
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index aa00df6..14b2740 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -29,6 +29,7 @@
namespace kudu {
class Counter;
+class Histogram;
class Thread;
namespace rpc {
@@ -70,10 +71,12 @@
const Sockaddr bind_address_;
std::vector<scoped_refptr<Thread>> threads_;
- scoped_refptr<Counter> rpc_connections_accepted_;
-
std::atomic<bool> closing_;
+ // Metrics.
+ scoped_refptr<Counter> rpc_connections_accepted_;
+ scoped_refptr<Histogram> dispatch_times_;
+
DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
};
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index b18bc29..52d9b89 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -82,6 +82,7 @@
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_histogram(acceptor_dispatch_times);
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -1422,6 +1423,28 @@
ASSERT_EQ(1, timed_out_in_queue->value());
}
+// Basic verification for the numbers reported by 'acceptor_dispatch_times'.
+TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
+ Sockaddr server_addr;
+ ASSERT_OK(StartTestServer(&server_addr));
+
+ {
+ Socket socket;
+ ASSERT_OK(socket.Init(server_addr.family(), 0));
+ ASSERT_OK(socket.Connect(server_addr));
+ }
+
+ scoped_refptr<Histogram> dispatch_times =
+ METRIC_acceptor_dispatch_times.Instantiate(server_messenger_->metric_entity());
+ // Using ASSERT_EVENTUALLY below because of relaxed memory ordering when
+ // fetching metrics' values. Eventually, the metrics report readings that
+ // are consistent with the expected numbers.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(1, dispatch_times->TotalCount());
+ ASSERT_GT(dispatch_times->MaxValueForTests(), 0);
+ });
+}
+
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
CountDownLatch* latch) {
messenger->reset();