IMPALA-9128: part 2: dump traces for slow RPCs
This adds trace events for data stream RPCs and dumps
the trace of any RPC that takes longer than
--impala_slow_rpc_threshold_ms to the log.
I needed to modify the KRPC code to do this because it
currently only dumps traces for RPCs with deadlines.
I plan to add some version of this upstream in Kudu
so that we don't diverge our KRPC implementation.
Example output from test_exchange_small_buffer:
I1111 08:38:53.732910 26509 rpcz_store.cc:265] Call impala.DataStreamService.TransmitData from 127.0.0.1:42434 (request call id 43) took 7799ms. Request Metrics: {}
I1111 08:38:53.732928 26509 rpcz_store.cc:269] Trace:
1111 08:38:45.933412 (+ 0us) impala-service-pool.cc:167] Inserting onto call queue
1111 08:38:45.933449 (+ 37us) impala-service-pool.cc:254] Handling call
1111 08:38:45.933470 (+ 21us) krpc-data-stream-mgr.cc:227] Added early sender
1111 08:38:47.906542 (+1973072us) krpc-data-stream-recvr.cc:327] Enqueuing deferred RPC
1111 08:38:53.732858 (+5826316us) krpc-data-stream-recvr.cc:506] Processing deferred RPC
1111 08:38:53.732860 (+ 2us) krpc-data-stream-recvr.cc:399] Deserializing batch
1111 08:38:53.732888 (+ 28us) krpc-data-stream-recvr.cc:426] Enqueuing deserialized batch
1111 08:38:53.732895 (+ 7us) inbound_call.cc:162] Queueing success response
Disabled the clang-diagnostic-gnu-zero-variadic-macro-arguments check in
.clang-tidy because it had false positives on the TRACE_TO invocations.
Testing:
* Ran exhaustive and ASAN tests
* Ran stress test
Change-Id: Ic7af4b45c43ec731d742d3696112c5f800849947
Reviewed-on: http://gerrit.cloudera.org:8080/14668
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/.clang-tidy b/.clang-tidy
index a99d528..faf4b7b 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -44,6 +44,7 @@
-clang-diagnostic-float-equal,\
-clang-diagnostic-global-constructors,\
-clang-diagnostic-gnu-anonymous-struct,\
+-clang-diagnostic-gnu-zero-variadic-macro-arguments,\
-clang-diagnostic-header-hygiene,\
-clang-diagnostic-implicit-fallthrough,\
-clang-diagnostic-missing-prototypes,\
diff --git a/be/src/kudu/rpc/rpcz_store.cc b/be/src/kudu/rpc/rpcz_store.cc
index 2f0e9c8..fd4c644 100644
--- a/be/src/kudu/rpc/rpcz_store.cc
+++ b/be/src/kudu/rpc/rpcz_store.cc
@@ -263,7 +263,11 @@
call->trace()->Dump(&LOG(INFO), true);
} else if (duration_ms > FLAGS_rpc_duration_too_long_ms) {
LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
- << "Request Metrics: " << call->trace()->MetricsAsJSON();
+ << "Request Metrics: " << call->trace()->MetricsAsJSON() << "\n";
+ string s = call->trace()->DumpToString();
+ if (!s.empty()) {
+ LOG(INFO) << "Trace:\n" << s;
+ }
}
}
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 855d6d0..89cea22 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -24,6 +24,7 @@
#include "kudu/rpc/rpc_context.h"
#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/trace.h"
#include "exec/kudu-util.h"
#include "runtime/exec-env.h"
@@ -223,10 +224,12 @@
// closed_stream_cache_), the sender is timed out by the maintenance thread.
if (!already_unregistered && recvr == nullptr) {
AddEarlySender(finst_id, request, response, rpc_context);
+ TRACE_TO(rpc_context->trace(), "Added early sender");
return;
}
}
if (already_unregistered) {
+ TRACE_TO(rpc_context->trace(), "Sender already unregistered");
// The receiver may remove itself from the receiver map via DeregisterRecvr() at any
// time without considering the remaining number of senders. As a consequence,
// FindRecvr() may return nullptr even though the receiver was once present. We
@@ -282,13 +285,19 @@
// rows if no rows are materialized at all in the sender side.
if (!already_unregistered && recvr == nullptr) {
AddEarlyClosedSender(finst_id, request, response, rpc_context);
+ TRACE_TO(rpc_context->trace(), "Added early closed sender");
return;
}
}
// If we reach this point, either the receiver is found or it has been unregistered
// already. In either cases, it's safe to just return an OK status.
- if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
+ TRACE_TO(
+ rpc_context->trace(), "Found receiver? $0", recvr != nullptr ? "true" : "false");
+ if (LIKELY(recvr != nullptr)) {
+ recvr->RemoveSender(request->sender_id());
+ TRACE_TO(rpc_context->trace(), "Removed sender from receiver");
+ }
DataStreamService::RespondAndReleaseRpc(Status::OK(), response, rpc_context,
service_mem_tracker_);
}
@@ -348,6 +357,7 @@
template<typename ContextType, typename RequestPBType>
void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx) {
+ TRACE_TO(ctx->rpc_context->trace(), "Timed out sender");
const RequestPBType* request = ctx->request;
TUniqueId finst_id;
finst_id.__set_lo(request->dest_fragment_instance_id().lo());
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 2187c85..b098a5e 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -26,6 +26,7 @@
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/trace.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/krpc-data-stream-mgr.h"
@@ -151,7 +152,7 @@
// failed. Returns OK otherwise.
Status AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
- unique_lock<SpinLock>* lock) WARN_UNUSED_RESULT;
+ unique_lock<SpinLock>* lock, RpcContext* rpc_context) WARN_UNUSED_RESULT;
// Receiver of which this queue is a member.
KrpcDataStreamRecvr* recvr_;
@@ -323,6 +324,7 @@
void KrpcDataStreamRecvr::SenderQueue::EnqueueDeferredRpc(
unique_ptr<TransmitDataCtx> payload, const unique_lock<SpinLock>& lock) {
DCHECK(lock.owns_lock());
+ TRACE_TO(payload->rpc_context->trace(), "Enqueuing deferred RPC");
if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
deferred_rpcs_.push(move(payload));
recvr_->num_deferred_rpcs_.Add(1);
@@ -379,7 +381,8 @@
Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets,
- const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
+ const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock,
+ RpcContext* rpc_context) {
DCHECK(lock != nullptr);
DCHECK(lock->owns_lock());
DCHECK(!is_cancelled_);
@@ -393,6 +396,7 @@
// Deserialization may take some time due to compression and memory allocation.
// Drop the lock so we can deserialize multiple batches in parallel.
lock->unlock();
+ TRACE_TO(rpc_context->trace(), "Deserializing batch");
unique_ptr<RowBatch> batch;
Status status;
{
@@ -414,10 +418,12 @@
recvr_->num_buffered_bytes_.Add(-batch_size);
VLOG_QUERY << "Failed to deserialize batch for "
<< PrintId(recvr_->fragment_instance_id());
+ TRACE_TO(rpc_context->trace(), "Failed to deserialize batch: $0", status.GetDetail());
MarkErrorStatus(status, *lock);
return status;
}
VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+ TRACE_TO(rpc_context->trace(), "Enqueuing deserialized batch");
COUNTER_ADD(recvr_->total_enqueued_batches_counter_, 1);
batch_queue_.emplace_back(batch_size, move(batch));
data_arrival_cv_.notify_one();
@@ -438,6 +444,7 @@
unique_lock<SpinLock> l(lock_);
MarkErrorStatus(status, l);
}
+ TRACE_TO(rpc_context->trace(), "Error unpacking request: $0", status.GetDetail());
DataStreamService::RespondRpc(status, response, rpc_context);
return;
}
@@ -453,6 +460,7 @@
// responded to if we reach here.
DCHECK_GT(num_remaining_senders_, 0);
if (UNLIKELY(is_cancelled_)) {
+ TRACE_TO(rpc_context->trace(), "Receiver was cancelled");
Status cancel_status = Status::Expected(TErrorCode::DATASTREAM_RECVR_CLOSED,
PrintId(recvr_->fragment_instance_id()), recvr_->dest_node_id());
DataStreamService::RespondRpc(cancel_status, response, rpc_context);
@@ -473,7 +481,7 @@
}
// At this point, we are committed to inserting the row batch into 'batch_queue_'.
- status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+ status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l, rpc_context);
}
// Respond to the sender to ack the insertion of the row batches.
@@ -495,6 +503,7 @@
// Try enqueuing the first entry into 'batch_queue_'.
ctx.swap(deferred_rpcs_.front());
+ TRACE_TO(ctx->rpc_context->trace(), "Processing deferred RPC");
kudu::Slice tuple_offsets;
kudu::Slice tuple_data;
int64_t batch_size;
@@ -502,6 +511,8 @@
&tuple_data, &batch_size);
// Reply with error status if the entry cannot be unpacked.
if (UNLIKELY(!status.ok())) {
+ TRACE_TO(ctx->rpc_context->trace(),
+ "Error unpacking deferred RPC: $0", status.GetDetail());
MarkErrorStatus(status, l);
DataStreamService::RespondAndReleaseRpc(status, ctx->response, ctx->rpc_context,
recvr_->deferred_rpc_tracker());
@@ -512,6 +523,7 @@
// Stops if inserting the batch causes us to go over the limit.
// Put 'ctx' back on the queue.
if (!CanEnqueue(batch_size, l)) {
+ TRACE_TO(ctx->rpc_context->trace(), "Batch queue is full");
ctx.swap(deferred_rpcs_.front());
DCHECK(deferred_rpcs_.front().get() != nullptr);
return;
@@ -520,7 +532,8 @@
// Dequeues the deferred batch and adds it to 'batch_queue_'.
DequeueDeferredRpc(l);
const RowBatchHeaderPB& header = ctx->request->row_batch_header();
- status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+ status = AddBatchWork(
+ batch_size, header, tuple_offsets, tuple_data, &l, ctx->rpc_context);
DCHECK(!status.ok() || !batch_queue_.empty());
// Release to MemTracker while still holding the lock to prevent race with Close().
@@ -559,6 +572,7 @@
{
unique_lock<SpinLock> l(lock_);
if (UNLIKELY(is_cancelled_)) {
+ TRACE_TO(ctx->rpc_context->trace(), "Recvr closed");
Status cancel_status = Status::Expected(TErrorCode::DATASTREAM_RECVR_CLOSED,
PrintId(recvr_->fragment_instance_id()), recvr_->dest_node_id());
DataStreamService::RespondRpc(cancel_status, ctx->response, ctx->rpc_context);
diff --git a/tests/custom_cluster/test_exchange_deferred_batches.py b/tests/custom_cluster/test_exchange_deferred_batches.py
index 31e722e..31cd78e 100644
--- a/tests/custom_cluster/test_exchange_deferred_batches.py
+++ b/tests/custom_cluster/test_exchange_deferred_batches.py
@@ -37,7 +37,8 @@
@CustomClusterTestSuite.with_args(
"--stress_datastream_recvr_delay_ms=3000"
+ " --exchg_node_buffer_size_bytes=1024"
- + " --datastream_service_num_deserialization_threads=1")
+ + " --datastream_service_num_deserialization_threads=1"
+ + " --impala_slow_rpc_threshold_ms=500")
def test_exchange_small_buffer(self, vector):
"""Exercise the code which handles deferred row batches. In particular,
the exchange buffer is set to a small value to cause incoming row batches
diff --git a/tests/custom_cluster/test_exchange_delays.py b/tests/custom_cluster/test_exchange_delays.py
index 38dfde1..1df77b3 100644
--- a/tests/custom_cluster/test_exchange_delays.py
+++ b/tests/custom_cluster/test_exchange_delays.py
@@ -39,7 +39,8 @@
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
"--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
- + " --datastream_sender_timeout_ms=5000")
+ + " --datastream_sender_timeout_ms=5000"
+ + " --impala_slow_rpc_threshold_ms=500")
def test_exchange_small_delay(self, vector):
"""Test delays in registering data stream receivers where the first one or two
batches will time out before the receiver registers, but subsequent batches will
@@ -51,7 +52,8 @@
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
"--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
- + " --datastream_sender_timeout_ms=1")
+ + " --datastream_sender_timeout_ms=1"
+ + " --impala_slow_rpc_threshold_ms=500")
def test_exchange_large_delay(self, vector):
"""Test delays in registering data stream receivers where all of the batches sent
will time out before the receiver registers. Before IMPALA-2987, this scenario
@@ -72,7 +74,8 @@
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
"--stress_datastream_recvr_delay_ms={0}".format(DELAY_MS)
- + " --datastream_sender_timeout_ms=1")
+ + " --datastream_sender_timeout_ms=1"
+ + " --impala_slow_rpc_threshold_ms=500")
def test_exchange_large_delay_zero_rows(self, vector):
"""Test the special case when no batches are sent and the EOS message times out."""
self.run_test_case('QueryTest/exchange-delays-zero-rows', vector)