[rpc] Expose KRPC call_id to client application
KRPC assign call_id number for each RPC call at Request header. This
call_id then sent back in the Response and allows to match it to the
original Request. KRPC client, such as Impala, can utilize this call_id
for tracing or logging slow RPC.
This patch expose call_id through RpcController and RpcContext class for
such purpose.
Testing:
- Add TestCallId in rpc-test.cc.
- Add call_id check in CalculatorService::Add().
- Run and pass rpc-test.
Change-Id: If20114ef2b416ed9b39277e90639a6277b226fbb
Reviewed-on: http://gerrit.cloudera.org:8080/17866
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 602fa6e..c32423e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -261,6 +261,7 @@
void Add(const AddRequestPB *req, AddResponsePB *resp, RpcContext *context) override {
CHECK_GT(context->GetTransferSize(), 0);
+ CHECK_GE(context->call_id(), 0);
resp->set_result(req->x() + req->y());
context->RespondSuccess();
}
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 7a3c657..bd7adf8 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -1580,6 +1580,32 @@
}
}
+// Test that call_id is returned in call response and accessible through RpcController.
+TEST_P(TestRpc, TestCallId) {
+ // Set up server.
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
+
+ // Set up client.
+ shared_ptr<Messenger> client_messenger;
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
+ GenericCalculatorService::static_service_name());
+
+ for (int i = 0; i < 10; i++) {
+ AddRequestPB req;
+ req.set_x(rand());
+ req.set_y(rand());
+ RpcController controller;
+ controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+
+ AddResponsePB resp;
+ ASSERT_OK(p.SyncRequest(GenericCalculatorService::kAddMethodName,
+ req, &resp, &controller));
+ ASSERT_EQ(req.x() + req.y(), resp.result());
+ ASSERT_EQ(i, controller.call_id());
+ }
+}
} // namespace rpc
} // namespace kudu
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index 36c6ac3..1c5e67f 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -143,6 +143,10 @@
return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
}
+int32_t RpcContext::call_id() const {
+ return call_->call_id();
+}
+
size_t RpcContext::GetTransferSize() const {
return call_->GetTransferSize();
}
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index dfffdc0..6b68ad7 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -16,8 +16,10 @@
// under the License.
#pragma once
-#include <memory>
#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
#include <string>
#include <glog/logging.h>
@@ -226,6 +228,9 @@
// Returns this call's request id, if it is set.
const rpc::RequestIdPB* request_id() const;
+ // Returns this call's call_id.
+ int32_t call_id() const;
+
// Returns the size of the transfer buffer that backs 'call_'. If the
// transfer buffer no longer exists (e.g. GetTransferSize() is called after
// DiscardTransfer()), returns 0.
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 52de3cb..d0e07dd 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -103,6 +103,10 @@
return nullptr;
}
+int32_t RpcController::call_id() const {
+ return call_->call_response_->call_id();
+}
+
Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
return call_->call_response_->GetSidecar(idx, sidecar);
}
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index bcdabf5..a8c0af5 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -209,6 +209,12 @@
credentials_policy_ = policy;
}
+ // Returns the call_id of this RPC call.
+ // Should only be called after the call's Response has been received (that is, when
+ // Connection::HandleCallResponse() has been executed over 'call_'), but the
+ // controller has not been Reset().
+ int32_t call_id() const;
+
// Fills the 'sidecar' parameter with the slice pointing to the i-th
// sidecar upon success.
//