Optimize transport layer (#165)
* always addTimerCallback for avoid memory leak when pass NULL cbw and not response.
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index cd7fbfa..0207bbe 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -242,13 +242,12 @@
responseFuture->setRequestCommand(request);
addResponseFuture(opaque, responseFuture);
- if (callback) {
- boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis));
- addTimerCallback(t, opaque);
- t->async_wait(
- boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
- }
+ // timeout monitor
+ boost::asio::deadline_timer* t =
+ new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis));
+ addTimerCallback(t, opaque);
+ t->async_wait(
+ boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
// even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
if (SendCommand(pTcp, request)) {
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 884304c..641c516 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -67,9 +67,10 @@
}
TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
- std::unique_lock<std::mutex> eventLock(m_connectEventLock);
if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
- if (m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+ std::unique_lock<std::mutex> eventLock(m_connectEventLock);
+ if (!m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis),
+ [&] { return m_tcpConnectStatus != TCP_CONNECT_STATUS_WAIT; })) {
LOG_INFO("connect timeout");
}
}
@@ -80,7 +81,7 @@
void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
if (baseStatus == TCP_CONNECT_STATUS_WAIT) {
- std::unique_lock<std::mutex> eventLock(m_connectEventLock);
+ // awake waiting thread
m_connectEvent.notify_all();
}
}