| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "TcpRemotingClient.h" |
| #include <stddef.h> |
| #if !defined(WIN32) && !defined(__APPLE__) |
| #include <sys/prctl.h> |
| #endif |
| |
| #include "Logging.h" |
| #include "MemoryOutputStream.h" |
| #include "TopAddressing.h" |
| #include "UtilAll.h" |
| |
| namespace rocketmq { |
| |
| //<!************************************************************************ |
// zero-initialize the scalar members too, so a default-constructed client has deterministic state
TcpRemotingClient::TcpRemotingClient()
    : m_dispatchThreadNum(1), m_pullThreadNum(0), m_tcpConnectTimeout(0), m_tcpTransportTryLockTimeout(0),
      m_namesrvIndex(0), m_dispatchServiceWork(m_dispatchService), m_handleServiceWork(m_handleService) {}
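
// Sets up the dispatch ("DispatchTP") and network ("NetworkTP") thread pools.
// prctl(PR_SET_NAME) is called before each pool is created so the new threads
// inherit a recognizable name, and the original process name is restored
// afterwards. Finally, the timer io_service thread for async timeouts is started.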
| TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout) |
| : m_dispatchThreadNum(1), |
| m_pullThreadNum(pullThreadNum), |
| m_tcpConnectTimeout(tcpConnectTimeout), |
| m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout), |
| m_namesrvIndex(0), |
| m_dispatchServiceWork(m_dispatchService), |
| m_handleServiceWork(m_handleService) { |
| #if !defined(WIN32) && !defined(__APPLE__) |
| string taskName = UtilAll::getProcessName(); |
| prctl(PR_SET_NAME, "DispatchTP", 0, 0, 0); |
| #endif |
| for (int i = 0; i != m_dispatchThreadNum; ++i) { |
| m_dispatchThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_dispatchService)); |
| } |
| #if !defined(WIN32) && !defined(__APPLE__) |
| prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); |
| #endif |
| |
| #if !defined(WIN32) && !defined(__APPLE__) |
| prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0); |
| #endif |
| for (int i = 0; i != m_pullThreadNum; ++i) { |
| m_handleThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_handleService)); |
| } |
| #if !defined(WIN32) && !defined(__APPLE__) |
| prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); |
| #endif |
| |
| LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout, |
| m_tcpTransportTryLockTimeout, m_pullThreadNum); |
| |
| m_timerServiceThread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this))); |
| } |
| |
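// Entry point of the timer thread: keeps m_timerService running so that
// deadline_timer callbacks (async request timeouts) can be dispatched.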
| void TcpRemotingClient::boost_asio_work() { |
| LOG_INFO("TcpRemotingClient::boost asio async service running"); |
| |
| #if !defined(WIN32) && !defined(__APPLE__) |
| prctl(PR_SET_NAME, "RemotingAsioT", 0, 0, 0); |
| #endif |
| |
  // keep a work object alive so the io_service does not stop after the first timer callback completes
| boost::asio::io_service::work work(m_timerService); |
| |
| m_timerService.run(); |
| } |
| |
| TcpRemotingClient::~TcpRemotingClient() { |
| m_tcpTable.clear(); |
| m_futureTable.clear(); |
| m_namesrvAddrList.clear(); |
| removeAllTimerCallback(); |
| } |
| |
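// Shuts down all worker threads in order: the timer thread with its pending
// timers, every TCP transport, the handle and dispatch thread pools, and
// finally wakes up any caller still blocked in a synchronous waitResponse().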
| void TcpRemotingClient::stopAllTcpTransportThread() { |
| LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin"); |
| |
| m_timerService.stop(); |
| m_timerServiceThread->interrupt(); |
| m_timerServiceThread->join(); |
| removeAllTimerCallback(); |
| |
| { |
| std::lock_guard<std::timed_mutex> lock(m_tcpTableLock); |
| for (const auto& trans : m_tcpTable) { |
| trans.second->disconnect(trans.first); |
| } |
| m_tcpTable.clear(); |
| } |
| |
| m_handleService.stop(); |
| m_handleThreadPool.join_all(); |
| |
| m_dispatchService.stop(); |
| m_dispatchThreadPool.join_all(); |
| |
| { |
| std::lock_guard<std::mutex> lock(m_futureTableLock); |
    for (const auto& future : m_futureTable) {
      if (future.second && !future.second->getAsyncFlag()) {
        future.second->releaseThreadCondition();
      }
    }
| } |
| |
| LOG_ERROR("TcpRemotingClient::stopAllTcpTransportThread End, m_tcpTable:%lu", m_tcpTable.size()); |
| } |
| |
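// Replaces the cached name server list with the semicolon-separated addresses
// in 'addrs'; entries that fail host:port validation are skipped.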
| void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { |
| LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str()); |
| |
| if (addrs.empty()) { |
| return; |
| } |
| |
| std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.try_lock_for(std::chrono::seconds(10))) { |
| LOG_ERROR("updateNameServerAddressList get timed_mutex timeout"); |
| return; |
| } |
| } |
| |
| // clear first; |
| m_namesrvAddrList.clear(); |
| |
| vector<string> out; |
| UtilAll::Split(out, addrs, ";"); |
| for (auto addr : out) { |
| UtilAll::Trim(addr); |
| |
| string hostName; |
| short portNumber; |
| if (UtilAll::SplitURL(addr, hostName, portNumber)) { |
| LOG_INFO("update Namesrv:%s", addr.c_str()); |
| m_namesrvAddrList.push_back(addr); |
| } else { |
| LOG_INFO("This may be invalid namer server: [%s]", addr.c_str()); |
| } |
| } |
| out.clear(); |
| } |
| |
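// Sends a heartbeat synchronously and waits up to timeoutMillis for the reply.
// On send failure or timeout the transport is closed so that it will be
// re-created on the next request.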
| bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis) { |
| std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != nullptr) { |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| std::shared_ptr<AsyncCallbackWrap> cbw; |
| std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw)); |
| addResponseFuture(opaque, responseFuture); |
| |
| if (SendCommand(pTcp, request)) { |
| responseFuture->setSendRequestOK(true); |
| unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse()); |
| if (pRsp == nullptr) { |
| LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str()); |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| return false; |
| } else if (pRsp->getCode() == SUCCESS_VALUE) { |
| return true; |
| } else { |
| LOG_WARN("get error response:%d of heartbeat to addr:%s", pRsp->getCode(), addr.c_str()); |
| return false; |
| } |
| } else { |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| } |
| } |
| return false; |
| } |
| |
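// Synchronous RPC: returns the response command (ownership passes to the
// caller) or nullptr on send failure or timeout.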
| RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis) { |
| LOG_DEBUG("InvokeSync:", addr.c_str()); |
| std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != nullptr) { |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| std::shared_ptr<AsyncCallbackWrap> cbw; |
| std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw)); |
| addResponseFuture(opaque, responseFuture); |
| |
| if (SendCommand(pTcp, request)) { |
| responseFuture->setSendRequestOK(true); |
| RemotingCommand* pRsp = responseFuture->waitResponse(); |
| if (pRsp == nullptr) { |
| if (code != GET_CONSUMER_LIST_BY_GROUP) { |
| LOG_WARN("wait response timeout or get NULL response of code:%d, so closeTransport of addr:%s", code, |
| addr.c_str()); |
| CloseTransport(addr, pTcp); |
| } |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| return nullptr; |
| } else { |
| return pRsp; |
| } |
| } else { |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| } |
| } |
| LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str()); |
| return nullptr; |
| } |
| |
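// Asynchronous RPC: registers a ResponseFuture together with a deadline timer;
// either the response path or handleAsyncRequestTimeout consumes the future.
// Returns false only when no transport could be obtained.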
| bool TcpRemotingClient::invokeAsync(const string& addr, |
| RemotingCommand& request, |
| std::shared_ptr<AsyncCallbackWrap> callback, |
| int64 timeoutMillis, |
| int maxRetrySendTimes, |
| int retrySendTimes) { |
| std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != nullptr) { |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| |
    // the future is released by the response callback or by the timeout handler
| std::shared_ptr<ResponseFuture> responseFuture( |
| new ResponseFuture(code, opaque, this, timeoutMillis, true, callback)); |
| responseFuture->setMaxRetrySendTimes(maxRetrySendTimes); |
| responseFuture->setRetrySendTimes(retrySendTimes); |
| responseFuture->setBrokerAddr(addr); |
| responseFuture->setRequestCommand(request); |
| addResponseFuture(opaque, responseFuture); |
| |
| // timeout monitor |
| boost::asio::deadline_timer* t = |
| new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis)); |
| addTimerCallback(t, opaque); |
| t->async_wait( |
| boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque)); |
| |
    // even if the send fails, the timeout timer will trigger the next pull request or report the send failure
| if (SendCommand(pTcp, request)) { |
| LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque); |
| responseFuture->setSendRequestOK(true); |
| } |
| return true; |
| } |
| |
| LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str()); |
| return false; |
| } |
| |
| void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) { |
  //<!no response callback is needed for oneway RPC;
| std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != nullptr) { |
| request.markOnewayRPC(); |
| if (SendCommand(pTcp, request)) { |
| LOG_DEBUG("invokeOneway success. addr:%s, code:%d", addr.c_str(), request.getCode()); |
| } else { |
| LOG_WARN("invokeOneway failed. addr:%s, code:%d", addr.c_str(), request.getCode()); |
| } |
| } else { |
| LOG_WARN("invokeOneway failed: NULL transport. addr:%s, code:%d", addr.c_str(), request.getCode()); |
| } |
| } |
| |
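// Returns a connected transport for 'addr'; an empty address selects a name
// server transport instead.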
| std::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needResponse) { |
| if (addr.empty()) { |
| LOG_DEBUG("GetTransport of NameServer"); |
| return CreateNameServerTransport(needResponse); |
| } |
| return CreateTransport(addr, needResponse); |
| } |
| |
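// Reuses the cached transport for 'addr' when it is already connected;
// otherwise creates a new one and waits up to m_tcpConnectTimeout for the
// connection to be established.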
| std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) { |
| std::shared_ptr<TcpTransport> tts; |
| |
| { |
    // try to acquire m_tcpTableLock within m_tcpTransportTryLockTimeout to
    // avoid blocking for a long time; if the lock cannot be acquired, return NULL
| std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str()); |
        return nullptr;
| } |
| } |
| |
| // check for reuse |
| if (m_tcpTable.find(addr) != m_tcpTable.end()) { |
| std::shared_ptr<TcpTransport> tcp = m_tcpTable[addr]; |
| |
| if (tcp) { |
| TcpConnectStatus connectStatus = tcp->getTcpConnectStatus(); |
| if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) { |
| return tcp; |
| } else if (connectStatus == TCP_CONNECT_STATUS_WAIT) { |
          return nullptr;
        } else if (connectStatus == TCP_CONNECT_STATUS_FAILED) {
          LOG_ERROR("tcpTransport to server %s is disconnected, erasing it", addr.c_str());
          tcp->disconnect(addr);  // avoid coredump when the connection to the broker is broken
          m_tcpTable.erase(addr);
        } else {
          LOG_ERROR("tcpTransport to server %s is in a fault state, erasing it for reconnect", addr.c_str());
          m_tcpTable.erase(addr);
        }
| } |
| } |
| |
    //<!install the receive callback only when responses are expected;
| TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr; |
| |
| tts = TcpTransport::CreateTransport(this, callback); |
    TcpConnectStatus connectStatus = tts->connect(addr, 0);  // non-blocking connect
    if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
      LOG_WARN("cannot connect to:%s", addr.c_str());
      tts->disconnect(addr);
      return nullptr;
    } else {
      // even if the connection eventually fails, this transport will be erased by the next CreateTransport
      m_tcpTable[addr] = tts;
    }
| } |
| |
| TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout)); |
| if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) { |
| LOG_WARN("can not connect to server:%s", addr.c_str()); |
| tts->disconnect(addr); |
| std::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| LOG_INFO("connect server with addr:%s success", addr.c_str()); |
| return tts; |
| } |
| } |
| |
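// Chooses a name server transport: first the previously chosen address, then
// the remaining addresses in round-robin order driven by m_namesrvIndex.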
| std::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameServerTransport(bool needResponse) { |
  // m_namesrvLock is used so that name server operations are not blocked by
  // m_tcpTableLock; it is mostly touched by a single thread, so there is no
  // performance impact. Try to acquire it within m_tcpTransportTryLockTimeout
  // to avoid blocking for a long time; if the lock cannot be acquired, return NULL.
| LOG_DEBUG("--CreateNameserverTransport--"); |
| std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); |
      return nullptr;
| } |
| } |
| |
| if (!m_namesrvAddrChoosed.empty()) { |
| std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrChoosed, true); |
| if (pTcp) |
| return pTcp; |
| else |
| m_namesrvAddrChoosed.clear(); |
| } |
| |
| for (unsigned i = 0; i < m_namesrvAddrList.size(); i++) { |
| unsigned int index = m_namesrvIndex++ % m_namesrvAddrList.size(); |
| LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index, |
| m_namesrvAddrList.size()); |
| std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrList[index], true); |
| if (pTcp) { |
| m_namesrvAddrChoosed = m_namesrvAddrList[index]; |
| return pTcp; |
| } |
| } |
| |
  return nullptr;
| } |
| |
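// Disconnects and removes the transport of 'addr' from m_tcpTable, unless it
// has already been replaced by a newer transport (detected via the start
// time). Returns true when an entry was actually removed.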
| bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp) { |
| if (addr.empty()) { |
| return CloseNameServerTransport(pTcp); |
| } |
| |
| std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str()); |
| return false; |
| } |
| } |
| |
| LOG_ERROR("CloseTransport of:%s", addr.c_str()); |
| |
| bool removeItemFromTable = true; |
| if (m_tcpTable.find(addr) != m_tcpTable.end()) { |
| if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { |
| LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do", |
| addr.c_str()); |
| removeItemFromTable = false; |
| } |
| } else { |
| LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); |
| removeItemFromTable = false; |
| } |
| |
| if (removeItemFromTable) { |
| LOG_WARN("closeTransport: disconnect:%s with state:%d", addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus()); |
| if (m_tcpTable[addr]->getTcpConnectStatus() == TCP_CONNECT_STATUS_SUCCESS) |
| m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken |
| LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); |
| m_tcpTable.erase(addr); |
| } |
| |
| LOG_ERROR("CloseTransport of:%s end", addr.c_str()); |
| |
| return removeItemFromTable; |
| } |
| |
| bool TcpRemotingClient::CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp) { |
| std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CreateNameServerTransport get timed_mutex timeout"); |
| return false; |
| } |
| } |
| |
| string addr = m_namesrvAddrChoosed; |
| |
| bool removeItemFromTable = CloseTransport(addr, pTcp); |
| if (removeItemFromTable) { |
| m_namesrvAddrChoosed.clear(); |
| } |
| |
| return removeItemFromTable; |
| } |
| |
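// Serializes the command's head and body into one contiguous buffer and
// writes it to the transport with a single send.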
| bool TcpRemotingClient::SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) { |
| const MemoryBlock* pHead = msg.GetHead(); |
| const MemoryBlock* pBody = msg.GetBody(); |
| |
| unique_ptr<MemoryOutputStream> buffer(new MemoryOutputStream(1024)); |
| if (pHead->getSize() > 0) { |
| buffer->write(pHead->getData(), static_cast<size_t>(pHead->getSize())); |
| } |
| if (pBody->getSize() > 0) { |
| buffer->write(pBody->getData(), static_cast<size_t>(pBody->getSize())); |
| } |
| |
| const char* pData = static_cast<const char*>(buffer->getData()); |
| size_t len = buffer->getDataSize(); |
| return pTts->sendMessage(pData, len); |
| } |
| |
| void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) { |
  auto* pTcpRemotingClient = static_cast<TcpRemotingClient*>(context);
| if (pTcpRemotingClient) |
| pTcpRemotingClient->messageReceived(mem, addr); |
| } |
| |
| void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) { |
| m_dispatchService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); |
| } |
| |
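// Decodes one wire-level frame on the dispatch thread pool: responses complete
// their pending ResponseFuture, requests are posted to the handle thread pool.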
| void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) { |
| RemotingCommand* pRespondCmd = nullptr; |
| try { |
| pRespondCmd = RemotingCommand::Decode(mem); |
| } catch (...) { |
| LOG_ERROR("processData error"); |
| return; |
| } |
| |
| int opaque = pRespondCmd->getOpaque(); |
| |
| //<!process self; |
| if (pRespondCmd->isResponseType()) { |
| std::shared_ptr<ResponseFuture> pFuture = findAndDeleteResponseFuture(opaque); |
| if (!pFuture) { |
| LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque); |
| deleteAndZero(pRespondCmd); |
| return; |
| } |
| |
| LOG_DEBUG("find_response opaque:%d", opaque); |
| processResponseCommand(pRespondCmd, pFuture); |
| } else { |
| m_handleService.post(boost::bind(&TcpRemotingClient::processRequestCommand, this, pRespondCmd, addr)); |
| } |
| } |
| |
| void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture) { |
| int code = pFuture->getRequestCode(); |
  pCmd->SetExtHeader(code);  // decode the ext header according to the originating request code
| |
| int opaque = pCmd->getOpaque(); |
| LOG_DEBUG("processResponseCommand, code:%d, opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, |
| pFuture->getMaxRetrySendTimes(), pFuture->getRetrySendTimes()); |
| |
| if (!pFuture->setResponse(pCmd)) { |
    // normally unreachable: the future has already been completed by a timeout
| LOG_WARN("response already timeout of opaque:%d", opaque); |
| deleteAndZero(pCmd); |
| return; |
| } |
| |
| if (pFuture->getAsyncFlag()) { |
| cancelTimerCallback(opaque); |
| |
| m_handleService.post(boost::bind(&ResponseFuture::invokeCompleteCallback, pFuture)); |
| } |
| } |
| |
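// Deadline-timer callback for async requests: if the timer was not cancelled
// by a response in time, remove the pending future and report the exception
// on the handle thread pool.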
| void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque) { |
| if (e == boost::asio::error::operation_aborted) { |
| LOG_DEBUG("handleAsyncRequestTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); |
| return; |
| } |
| |
| LOG_DEBUG("handleAsyncRequestTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); |
| |
| std::shared_ptr<ResponseFuture> pFuture(findAndDeleteResponseFuture(opaque)); |
| if (pFuture) { |
| LOG_ERROR("no response got for opaque:%d", opaque); |
| eraseTimerCallback(opaque); |
| if (pFuture->getAsyncCallbackWrap()) { |
| m_handleService.post(boost::bind(&ResponseFuture::invokeExceptionCallback, pFuture)); |
| } |
| } |
| } |
| |
| void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const string& addr) { |
| unique_ptr<RemotingCommand> pRequestCommand(pCmd); |
| int requestCode = pRequestCommand->getCode(); |
| if (m_requestTable.find(requestCode) == m_requestTable.end()) { |
| LOG_ERROR("can_not_find request:%d processor", requestCode); |
| } else { |
| unique_ptr<RemotingCommand> pResponse(m_requestTable[requestCode]->processRequest(addr, pRequestCommand.get())); |
| if (!pRequestCommand->isOnewayRPC()) { |
| if (pResponse) { |
| pResponse->setOpaque(pRequestCommand->getOpaque()); |
| pResponse->markResponseType(); |
| pResponse->Encode(); |
| |
| invokeOneway(addr, *pResponse); |
| } |
| } |
| } |
| } |
| |
| void TcpRemotingClient::addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) { |
| std::lock_guard<std::mutex> lock(m_futureTableLock); |
| m_futureTable[opaque] = pFuture; |
| } |
| |
// Note: after this call the shared_ptr stored in m_futureTable[opaque] has
// been erased, so the caller must keep the returned shared_ptr alive for as
// long as it is needed;
| std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) { |
| std::lock_guard<std::mutex> lock(m_futureTableLock); |
| std::shared_ptr<ResponseFuture> pResponseFuture; |
| if (m_futureTable.find(opaque) != m_futureTable.end()) { |
| pResponseFuture = m_futureTable[opaque]; |
| m_futureTable.erase(opaque); |
| } |
| return pResponseFuture; |
| } |
| |
| void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor) { |
| if (m_requestTable.find(requestCode) != m_requestTable.end()) |
| m_requestTable.erase(requestCode); |
| m_requestTable[requestCode] = clientRemotingProcessor; |
| } |
| |
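// The helpers below manage the deadline timers keyed by opaque id in
// m_asyncTimerTable; every timer must be cancelled and deleted exactly once.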
| void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) { |
| std::lock_guard<std::mutex> lock(m_asyncTimerTableLock); |
| if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { |
| LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque); |
| boost::asio::deadline_timer* old_t = m_asyncTimerTable[opaque]; |
| m_asyncTimerTable.erase(opaque); |
| try { |
| old_t->cancel(); |
| } catch (const std::exception& ec) { |
| LOG_WARN("encounter exception when cancel old timer: %s", ec.what()); |
| } |
| delete old_t; |
| } |
| m_asyncTimerTable[opaque] = t; |
| } |
| |
| void TcpRemotingClient::eraseTimerCallback(int opaque) { |
| std::lock_guard<std::mutex> lock(m_asyncTimerTableLock); |
| if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { |
| LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque); |
| boost::asio::deadline_timer* t = m_asyncTimerTable[opaque]; |
| m_asyncTimerTable.erase(opaque); |
| delete t; |
| } |
| } |
| |
| void TcpRemotingClient::cancelTimerCallback(int opaque) { |
| std::lock_guard<std::mutex> lock(m_asyncTimerTableLock); |
| if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) { |
| LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque); |
| boost::asio::deadline_timer* t = m_asyncTimerTable[opaque]; |
| m_asyncTimerTable.erase(opaque); |
| try { |
| t->cancel(); |
| } catch (const std::exception& ec) { |
| LOG_WARN("encounter exception when cancel timer: %s", ec.what()); |
| } |
| delete t; |
| } |
| } |
| |
| void TcpRemotingClient::removeAllTimerCallback() { |
| std::lock_guard<std::mutex> lock(m_asyncTimerTableLock); |
| for (const auto& timer : m_asyncTimerTable) { |
| boost::asio::deadline_timer* t = timer.second; |
| try { |
| t->cancel(); |
| } catch (const std::exception& ec) { |
| LOG_WARN("encounter exception when cancel timer: %s", ec.what()); |
| } |
| delete t; |
| } |
| m_asyncTimerTable.clear(); |
| } |
| |
| //<!************************************************************************ |
| } // namespace rocketmq |