[ISSUE #137] split TcpRemotingClient::m_ioService into m_dispatchService and m_handleService
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 557bf32..c114ecd 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -29,17 +29,29 @@
//<!************************************************************************
TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
- : m_pullThreadNum(pullThreadNum),
+ : m_dispatchThreadNum(1),
+ m_pullThreadNum(pullThreadNum),
m_tcpConnectTimeout(tcpConnectTimeout),
m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
m_namesrvIndex(0),
- m_ioServiceWork(m_ioService) {
+ m_dispatchServiceWork(m_dispatchService),
+ m_handleServiceWork(m_handleService) {
#if !defined(WIN32) && !defined(__APPLE__)
string taskName = UtilAll::getProcessName();
+ prctl(PR_SET_NAME, "DispatchTP", 0, 0, 0);
+#endif
+ for (int i = 0; i != m_dispatchThreadNum; ++i) {
+ m_dispatchThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_dispatchService));
+ }
+#if !defined(WIN32) && !defined(__APPLE__)
+ prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+#endif
+
+#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0);
#endif
for (int i = 0; i != m_pullThreadNum; ++i) {
- m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
+ m_handleThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_handleService));
}
#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
@@ -48,7 +60,7 @@
LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout,
m_tcpTransportTryLockTimeout, m_pullThreadNum);
- m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
+ m_timerServiceThread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
}
void TcpRemotingClient::boost_asio_work() {
@@ -59,9 +71,9 @@
#endif
// avoid async io service stops after first timer timeout callback
- boost::asio::io_service::work work(m_async_ioService);
+ boost::asio::io_service::work work(m_timerService);
- m_async_ioService.run();
+ m_timerService.run();
}
TcpRemotingClient::~TcpRemotingClient() {
@@ -75,20 +87,24 @@
void TcpRemotingClient::stopAllTcpTransportThread() {
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
- m_async_ioService.stop();
- m_async_service_thread->interrupt();
- m_async_service_thread->join();
+ m_timerService.stop();
+ m_timerServiceThread->interrupt();
+ m_timerServiceThread->join();
removeAllTimerCallback();
{
+ std::lock_guard<std::timed_mutex> lock(m_tcpTableLock);
for (const auto& trans : m_tcpTable) {
trans.second->disconnect(trans.first);
}
m_tcpTable.clear();
}
- m_ioService.stop();
- m_threadpool.join_all();
+ m_handleService.stop();
+ m_handleThreadPool.join_all();
+
+ m_dispatchService.stop();
+ m_dispatchThreadPool.join_all();
{
std::lock_guard<std::mutex> lock(m_futureTableLock);
@@ -98,7 +114,7 @@
}
}
- LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
+ LOG_ERROR("TcpRemotingClient::stopAllTcpTransportThread End, m_tcpTable:%lu", m_tcpTable.size());
}
void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
@@ -226,13 +242,13 @@
if (callback) {
boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis));
+ new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis));
addTimerCallback(t, opaque);
t->async_wait(
boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
}
- // Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
+ // even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
if (SendCommand(pTcp, request)) {
LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
responseFuture->setSendRequestOK(true);
@@ -453,7 +469,7 @@
}
void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) {
- m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
+ m_dispatchService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
}
void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) {
@@ -482,7 +498,7 @@
LOG_DEBUG("find_response opaque:%d", opaque);
processResponseCommand(pRespondCmd, pFuture);
} else {
- processRequestCommand(pRespondCmd, addr);
+ m_handleService.post(boost::bind(&TcpRemotingClient::processRequestCommand, this, pRespondCmd, addr));
}
}
@@ -503,7 +519,8 @@
if (pFuture->getAsyncFlag()) {
cancelTimerCallback(opaque);
- pFuture->invokeCompleteCallback();
+
+ m_handleService.post(boost::bind(&ResponseFuture::invokeCompleteCallback, pFuture));
}
}
@@ -520,7 +537,7 @@
LOG_ERROR("no response got for opaque:%d", opaque);
eraseTimerCallback(opaque);
if (pFuture->getAsyncCallbackWrap()) {
- pFuture->invokeExceptionCallback();
+ m_handleService.post(boost::bind(&ResponseFuture::invokeExceptionCallback, pFuture));
}
}
}
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 52e400a..ad73cd2 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -109,6 +109,7 @@
AsyncTimerMap m_asyncTimerTable;
std::mutex m_asyncTimerTableLock;
+ int m_dispatchThreadNum;
int m_pullThreadNum;
uint64_t m_tcpConnectTimeout; // ms
uint64_t m_tcpTransportTryLockTimeout; // s
@@ -119,12 +120,16 @@
string m_namesrvAddrChoosed;
unsigned int m_namesrvIndex;
- boost::asio::io_service m_ioService;
- boost::asio::io_service::work m_ioServiceWork;
- boost::thread_group m_threadpool;
+ boost::asio::io_service m_dispatchService;
+ boost::asio::io_service::work m_dispatchServiceWork;
+ boost::thread_group m_dispatchThreadPool;
- boost::asio::io_service m_async_ioService;
- unique_ptr<boost::thread> m_async_service_thread;
+ boost::asio::io_service m_handleService;
+ boost::asio::io_service::work m_handleServiceWork;
+ boost::thread_group m_handleThreadPool;
+
+ boost::asio::io_service m_timerService;
+ unique_ptr<boost::thread> m_timerServiceThread;
};
//<!************************************************************************