IMPALA-9128: part 2: dump traces for slow RPCs

This adds trace events for data stream RPCs and
dumps them when they take longer than
--impala_slow_rpc_threshold_ms.

I needed to modify the KRPC code to do this because it
currently only dumps traces for RPCs with deadlines.
I plan to add some version of this upstream in Kudu
so that our KRPC implementation does not diverge from Kudu's.

Example output from test_exchange_small_buffer:

I1111 08:38:53.732910 26509 rpcz_store.cc:265] Call impala.DataStreamService.TransmitData from 127.0.0.1:42434 (request call id 43) took 7799ms. Request Metrics: {}
I1111 08:38:53.732928 26509 rpcz_store.cc:269] Trace:
1111 08:38:45.933412 (+     0us) impala-service-pool.cc:167] Inserting onto call queue
1111 08:38:45.933449 (+    37us) impala-service-pool.cc:254] Handling call
1111 08:38:45.933470 (+    21us) krpc-data-stream-mgr.cc:227] Added early sender
1111 08:38:47.906542 (+1973072us) krpc-data-stream-recvr.cc:327] Enqueuing deferred RPC
1111 08:38:53.732858 (+5826316us) krpc-data-stream-recvr.cc:506] Processing deferred RPC
1111 08:38:53.732860 (+     2us) krpc-data-stream-recvr.cc:399] Deserializing batch
1111 08:38:53.732888 (+    28us) krpc-data-stream-recvr.cc:426] Enqueuing deserialized batch
1111 08:38:53.732895 (+     7us) inbound_call.cc:162] Queueing success response

Disabled clang-diagnostic-gnu-zero-variadic-macro-arguments in .clang-tidy
because it had false positives on the TRACE_TO invocations.

Testing:
* Ran exhaustive and ASAN tests
* Ran stress test

Change-Id: Ic7af4b45c43ec731d742d3696112c5f800849947
Reviewed-on: http://gerrit.cloudera.org:8080/14668
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/.clang-tidy b/.clang-tidy
index a99d528..faf4b7b 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -44,6 +44,7 @@
 -clang-diagnostic-float-equal,\
 -clang-diagnostic-global-constructors,\
 -clang-diagnostic-gnu-anonymous-struct,\
+-clang-diagnostic-gnu-zero-variadic-macro-arguments,\
 -clang-diagnostic-header-hygiene,\
 -clang-diagnostic-implicit-fallthrough,\
 -clang-diagnostic-missing-prototypes,\
diff --git a/be/src/kudu/rpc/rpcz_store.cc b/be/src/kudu/rpc/rpcz_store.cc
index 2f0e9c8..fd4c644 100644
--- a/be/src/kudu/rpc/rpcz_store.cc
+++ b/be/src/kudu/rpc/rpcz_store.cc
@@ -263,7 +263,11 @@
     call->trace()->Dump(&LOG(INFO), true);
   } else if (duration_ms > FLAGS_rpc_duration_too_long_ms) {
     LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
-              << "Request Metrics: " << call->trace()->MetricsAsJSON();
+              << "Request Metrics: " << call->trace()->MetricsAsJSON() << "\n";
+    string s = call->trace()->DumpToString();
+    if (!s.empty()) {
+      LOG(INFO) << "Trace:\n" << s;
+    }
   }
 }
 
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 855d6d0..89cea22 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -24,6 +24,7 @@
 
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/trace.h"
 
 #include "exec/kudu-util.h"
 #include "runtime/exec-env.h"
@@ -223,10 +224,12 @@
     // closed_stream_cache_), the sender is timed out by the maintenance thread.
     if (!already_unregistered && recvr == nullptr) {
       AddEarlySender(finst_id, request, response, rpc_context);
+      TRACE_TO(rpc_context->trace(), "Added early sender");
       return;
     }
   }
   if (already_unregistered) {
+    TRACE_TO(rpc_context->trace(), "Sender already unregistered");
     // The receiver may remove itself from the receiver map via DeregisterRecvr() at any
     // time without considering the remaining number of senders. As a consequence,
     // FindRecvr() may return nullptr even though the receiver was once present. We
@@ -282,13 +285,19 @@
     // rows if no rows are materialized at all in the sender side.
     if (!already_unregistered && recvr == nullptr) {
       AddEarlyClosedSender(finst_id, request, response, rpc_context);
+      TRACE_TO(rpc_context->trace(), "Added early closed sender");
       return;
     }
   }
 
   // If we reach this point, either the receiver is found or it has been unregistered
   // already. In either cases, it's safe to just return an OK status.
-  if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
+  TRACE_TO(
+      rpc_context->trace(), "Found receiver? $0", recvr != nullptr ? "true" : "false");
+  if (LIKELY(recvr != nullptr)) {
+    recvr->RemoveSender(request->sender_id());
+    TRACE_TO(rpc_context->trace(), "Removed sender from receiver");
+  }
   DataStreamService::RespondAndReleaseRpc(Status::OK(), response, rpc_context,
       service_mem_tracker_);
 }
@@ -348,6 +357,7 @@
 
 template<typename ContextType, typename RequestPBType>
 void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx) {
+  TRACE_TO(ctx->rpc_context->trace(), "Timed out sender");
   const RequestPBType* request = ctx->request;
   TUniqueId finst_id;
   finst_id.__set_lo(request->dest_fragment_instance_id().lo());
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 2187c85..b098a5e 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -26,6 +26,7 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/trace.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-mgr.h"
@@ -151,7 +152,7 @@
   // failed. Returns OK otherwise.
   Status AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
       const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
-      unique_lock<SpinLock>* lock) WARN_UNUSED_RESULT;
+      unique_lock<SpinLock>* lock, RpcContext* rpc_context) WARN_UNUSED_RESULT;
 
   // Receiver of which this queue is a member.
   KrpcDataStreamRecvr* recvr_;
@@ -323,6 +324,7 @@
 void KrpcDataStreamRecvr::SenderQueue::EnqueueDeferredRpc(
     unique_ptr<TransmitDataCtx> payload, const unique_lock<SpinLock>& lock) {
   DCHECK(lock.owns_lock());
+  TRACE_TO(payload->rpc_context->trace(), "Enqueuing deferred RPC");
   if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
   deferred_rpcs_.push(move(payload));
   recvr_->num_deferred_rpcs_.Add(1);
@@ -379,7 +381,8 @@
 
 Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets,
-    const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
+    const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock,
+    RpcContext* rpc_context) {
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
   DCHECK(!is_cancelled_);
@@ -393,6 +396,7 @@
   // Deserialization may take some time due to compression and memory allocation.
   // Drop the lock so we can deserialize multiple batches in parallel.
   lock->unlock();
+  TRACE_TO(rpc_context->trace(), "Deserializing batch");
   unique_ptr<RowBatch> batch;
   Status status;
   {
@@ -414,10 +418,12 @@
     recvr_->num_buffered_bytes_.Add(-batch_size);
     VLOG_QUERY << "Failed to deserialize batch for "
                << PrintId(recvr_->fragment_instance_id());
+    TRACE_TO(rpc_context->trace(), "Failed to deserialize batch: $0", status.GetDetail());
     MarkErrorStatus(status, *lock);
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+  TRACE_TO(rpc_context->trace(), "Enqueuing deserialized batch");
   COUNTER_ADD(recvr_->total_enqueued_batches_counter_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
@@ -438,6 +444,7 @@
       unique_lock<SpinLock> l(lock_);
       MarkErrorStatus(status, l);
     }
+    TRACE_TO(rpc_context->trace(), "Error unpacking request: $0", status.GetDetail());
     DataStreamService::RespondRpc(status, response, rpc_context);
     return;
   }
@@ -453,6 +460,7 @@
     // responded to if we reach here.
     DCHECK_GT(num_remaining_senders_, 0);
     if (UNLIKELY(is_cancelled_)) {
+      TRACE_TO(rpc_context->trace(), "Receiver was cancelled");
       Status cancel_status = Status::Expected(TErrorCode::DATASTREAM_RECVR_CLOSED,
           PrintId(recvr_->fragment_instance_id()), recvr_->dest_node_id());
       DataStreamService::RespondRpc(cancel_status, response, rpc_context);
@@ -473,7 +481,7 @@
     }
 
     // At this point, we are committed to inserting the row batch into 'batch_queue_'.
-    status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l, rpc_context);
   }
 
   // Respond to the sender to ack the insertion of the row batches.
@@ -495,6 +503,7 @@
 
     // Try enqueuing the first entry into 'batch_queue_'.
     ctx.swap(deferred_rpcs_.front());
+    TRACE_TO(ctx->rpc_context->trace(), "Processing deferred RPC");
     kudu::Slice tuple_offsets;
     kudu::Slice tuple_data;
     int64_t batch_size;
@@ -502,6 +511,8 @@
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
+      TRACE_TO(ctx->rpc_context->trace(),
+          "Error unpacking deferred RPC: $0", status.GetDetail());
       MarkErrorStatus(status, l);
       DataStreamService::RespondAndReleaseRpc(status, ctx->response, ctx->rpc_context,
           recvr_->deferred_rpc_tracker());
@@ -512,6 +523,7 @@
     // Stops if inserting the batch causes us to go over the limit.
     // Put 'ctx' back on the queue.
     if (!CanEnqueue(batch_size, l)) {
+      TRACE_TO(ctx->rpc_context->trace(), "Batch queue is full");
       ctx.swap(deferred_rpcs_.front());
       DCHECK(deferred_rpcs_.front().get() != nullptr);
       return;
@@ -520,7 +532,8 @@
     // Dequeues the deferred batch and adds it to 'batch_queue_'.
     DequeueDeferredRpc(l);
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
-    status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    status = AddBatchWork(
+        batch_size, header, tuple_offsets, tuple_data, &l, ctx->rpc_context);
     DCHECK(!status.ok() || !batch_queue_.empty());
 
     // Release to MemTracker while still holding the lock to prevent race with Close().
@@ -559,6 +572,7 @@
   {
     unique_lock<SpinLock> l(lock_);
     if (UNLIKELY(is_cancelled_)) {
+      TRACE_TO(ctx->rpc_context->trace(), "Recvr closed");
       Status cancel_status = Status::Expected(TErrorCode::DATASTREAM_RECVR_CLOSED,
           PrintId(recvr_->fragment_instance_id()), recvr_->dest_node_id());
       DataStreamService::RespondRpc(cancel_status, ctx->response, ctx->rpc_context);
diff --git a/tests/custom_cluster/test_exchange_deferred_batches.py b/tests/custom_cluster/test_exchange_deferred_batches.py
index 31e722e..31cd78e 100644
--- a/tests/custom_cluster/test_exchange_deferred_batches.py
+++ b/tests/custom_cluster/test_exchange_deferred_batches.py
@@ -37,7 +37,8 @@
     @CustomClusterTestSuite.with_args(
         "--stress_datastream_recvr_delay_ms=3000"
         + " --exchg_node_buffer_size_bytes=1024"
-        + " --datastream_service_num_deserialization_threads=1")
+        + " --datastream_service_num_deserialization_threads=1"
+        + " --impala_slow_rpc_threshold_ms=500")
     def test_exchange_small_buffer(self, vector):
         """Exercise the code which handles deferred row batches. In particular,
         the exchange buffer is set to a small value to cause incoming row batches
diff --git a/tests/custom_cluster/test_exchange_delays.py b/tests/custom_cluster/test_exchange_delays.py
index 38dfde1..1df77b3 100644
--- a/tests/custom_cluster/test_exchange_delays.py
+++ b/tests/custom_cluster/test_exchange_delays.py
@@ -39,7 +39,8 @@
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       "--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
-      + " --datastream_sender_timeout_ms=5000")
+      + " --datastream_sender_timeout_ms=5000"
+      + " --impala_slow_rpc_threshold_ms=500")
   def test_exchange_small_delay(self, vector):
     """Test delays in registering data stream receivers where the first one or two
     batches will time out before the receiver registers, but subsequent batches will
@@ -51,7 +52,8 @@
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       "--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
-      + " --datastream_sender_timeout_ms=1")
+      + " --datastream_sender_timeout_ms=1"
+      + " --impala_slow_rpc_threshold_ms=500")
   def test_exchange_large_delay(self, vector):
     """Test delays in registering data stream receivers where all of the batches sent
     will time out before the receiver registers. Before IMPALA-2987, this scenario
@@ -72,7 +74,8 @@
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       "--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
-      + " --datastream_sender_timeout_ms=1")
+      + " --datastream_sender_timeout_ms=1"
+      + " --impala_slow_rpc_threshold_ms=500")
   def test_exchange_large_delay_zero_rows(self, vector):
     """Test the special case when no batches are sent and the EOS message times out."""
     self.run_test_case('QueryTest/exchange-delays-zero-rows', vector)