GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TrcConnection and derived (#479)
* Extract switch block in sendRequestForChunkedResponse into separate method
* Refactor check for empty result set into predicate method
* Remove pass-through method sendRequestForChunkedResponse, which was serving no purpose.
* Formatting changes.
* Move new methods below caller
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 2089580..c046096 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -695,36 +695,54 @@
const TcrMessage& request, size_t len, TcrMessageReply& reply,
std::chrono::microseconds sendTimeoutSec,
std::chrono::microseconds receiveTimeoutSec) {
- auto msgType = request.getMessageType();
- switch (msgType) {
- case TcrMessage::QUERY:
- case TcrMessage::QUERY_WITH_PARAMETERS:
- case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
- case TcrMessage::GETDURABLECQS_MSG_TYPE:
- case TcrMessage::EXECUTE_FUNCTION:
- case TcrMessage::EXECUTE_REGION_FUNCTION:
- case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: {
- receiveTimeoutSec = reply.getTimeout();
- sendTimeoutSec = reply.getTimeout();
- break;
- }
- default:
- break;
+ if (useReplyTimeout(request)) {
+ receiveTimeoutSec = reply.getTimeout();
+ sendTimeoutSec = reply.getTimeout();
}
- std::chrono::microseconds timeSpent{0};
- send(timeSpent, request.getMsgData(), len, sendTimeoutSec, true);
-
- if (timeSpent >= receiveTimeoutSec)
- throwException(
- TimeoutException("TcrConnection::send: connection timed out"));
-
- receiveTimeoutSec -= timeSpent;
+ receiveTimeoutSec -= sendWithTimeouts(request.getMsgData(), len,
+ sendTimeoutSec, receiveTimeoutSec);
// to help in decoding the reply based on what was the request type
- reply.setMessageTypeRequest(msgType);
+ reply.setMessageTypeRequest(request.getMessageType());
+
+ if (replyHasResult(request, reply)) {
+ readMessageChunked(reply, receiveTimeoutSec, true);
+ }
+}
+
+bool TcrConnection::useReplyTimeout(const TcrMessage& request) const {
+ auto messageType = request.getMessageType();
+ return ((messageType == TcrMessage::QUERY) ||
+ (messageType == TcrMessage::QUERY_WITH_PARAMETERS) ||
+ (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) ||
+ (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) ||
+ (messageType == TcrMessage::EXECUTE_FUNCTION) ||
+ (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) ||
+ (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP));
+}
+
+std::chrono::microseconds TcrConnection::sendWithTimeouts(
+ const char* data, size_t len, std::chrono::microseconds sendTimeout,
+ std::chrono::microseconds receiveTimeout) {
+ std::chrono::microseconds timeSpent{0};
+ send(timeSpent, data, len, sendTimeout, true);
+
+ if (timeSpent >= receiveTimeout) {
+ throwException(
+ TimeoutException("TcrConnection::send: connection timed out"));
+ }
+
+ return timeSpent;
+}
+
+bool TcrConnection::replyHasResult(const TcrMessage& request,
+ TcrMessageReply& reply) {
+ auto hasResult = true;
+
// no need of it now, this will not come here
- if (msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
+ if (request.getMessageType() ==
+ TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
ChunkedFunctionExecutionResponse* resultCollector =
static_cast<ChunkedFunctionExecutionResponse*>(
reply.getChunkedResultHandler());
@@ -732,10 +750,11 @@
LOGDEBUG(
"TcrConnection::sendRequestForChunkedResponse: function execution, "
"no response desired");
- return;
+ hasResult = false;
}
}
- readMessageChunked(reply, receiveTimeoutSec, true);
+
+ return hasResult;
}
void TcrConnection::send(const char* buffer, size_t len,
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index cd5bb21..b026062 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -417,6 +417,11 @@
volatile bool m_isBeingUsed;
std::atomic<uint32_t> m_isUsed;
ThinClientPoolDM* m_poolDM;
+ bool useReplyTimeout(const TcrMessage& request) const;
+ std::chrono::microseconds sendWithTimeouts(
+ const char* data, size_t len, std::chrono::microseconds sendTimeout,
+ std::chrono::microseconds receiveTimeout);
+ bool replyHasResult(const TcrMessage& request, TcrMessageReply& reply);
};
} // namespace client
} // namespace geode
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 7b57e5d..812de37 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -793,7 +793,9 @@
if (((type == TcrMessage::EXECUTE_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
(request.hasResult() & 2))) {
- sendRequestForChunkedResponse(request, reply, conn);
+ conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+ request.getTimeout(),
+ reply.getTimeout());
} else if (type == TcrMessage::REGISTER_INTEREST_LIST ||
type == TcrMessage::REGISTER_INTEREST ||
type == TcrMessage::QUERY ||
@@ -824,7 +826,9 @@
type == TcrMessage::MONITORCQ_MSG_TYPE ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
- sendRequestForChunkedResponse(request, reply, conn);
+ conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+ request.getTimeout(),
+ reply.getTimeout());
LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
} else {
// Chk request type to request if so request.getCallBackArg flag & setCall
@@ -1313,12 +1317,6 @@
return m_cacheImpl->getQueryService(true);
}
-void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) {
- conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply);
-}
-
void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) {
closeConnection(conn);
}
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index 4413529..f1cea69 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -210,9 +210,6 @@
virtual void processMarker();
virtual void triggerRedundancyThread();
virtual std::shared_ptr<QueryService> getQueryService();
- virtual void sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn);
virtual void closeFailedConnection(TcrConnection*& conn);
void closeConnection(TcrConnection*& conn);
virtual void handleNotificationStats(int64_t byteLength);
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index c3ff4c0..7ac81f4 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -40,12 +40,6 @@
std::shared_ptr<QueryService> TcrPoolEndPoint::getQueryService() {
return m_dm->getQueryServiceWithoutCheck();
}
-void TcrPoolEndPoint::sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) {
- conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
- request.getTimeout(), reply.getTimeout());
-}
ThinClientPoolDM* TcrPoolEndPoint::getPoolHADM() { return m_dm; }
void TcrPoolEndPoint::triggerRedundancyThread() {
m_dm->triggerRedundancyThread();
diff --git a/cppcache/src/TcrPoolEndPoint.hpp b/cppcache/src/TcrPoolEndPoint.hpp
index ff99479..fc5fbc1 100644
--- a/cppcache/src/TcrPoolEndPoint.hpp
+++ b/cppcache/src/TcrPoolEndPoint.hpp
@@ -39,9 +39,6 @@
bool checkDupAndAdd(std::shared_ptr<EventId> eventid) override;
void processMarker() override;
std::shared_ptr<QueryService> getQueryService() override;
- void sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) override;
void closeFailedConnection(TcrConnection*& conn) override;
GfErrType registerDM(bool clientNotification, bool isSecondary = false,
bool isActiveEndpoint = false,