| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
#include "TcpRemotingClient.h"
#include <stddef.h>
#if !defined(WIN32) && !defined(__APPLE__)
#include <sys/prctl.h>
#endif
#include <boost/asio/placeholders.hpp>
#include "Logging.h"
#include "MemoryOutputStream.h"
#include "TopAddressing.h"
#include "UtilAll.h"
| |
| namespace rocketmq { |
| |
| //<!************************************************************************ |
| TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout) |
| : m_pullThreadNum(pullThreadNum), |
| m_tcpConnectTimeout(tcpConnectTimeout), |
| m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout), |
| m_namesrvIndex(0), |
| m_ioServiceWork(m_ioService) { |
| #if !defined(WIN32) && !defined(__APPLE__) |
| string taskName = UtilAll::getProcessName(); |
| prctl(PR_SET_NAME, "networkTP", 0, 0, 0); |
| #endif |
| for (int i = 0; i != pullThreadNum; ++i) { |
| m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService)); |
| } |
| #if !defined(WIN32) && !defined(__APPLE__) |
| prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); |
| #endif |
| LOG_INFO( |
| "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, " |
| "m_pullThreadNum:%d", |
| m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum); |
| m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this))); |
| } |
| |
| void TcpRemotingClient::boost_asio_work() { |
| LOG_INFO("TcpRemotingClient::boost asio async service runing"); |
| boost::asio::io_service::work work(m_async_ioService); // avoid async io |
| // service stops after |
| // first timer timeout |
| // callback |
| m_async_ioService.run(); |
| } |
| |
| TcpRemotingClient::~TcpRemotingClient() { |
| m_tcpTable.clear(); |
| m_futureTable.clear(); |
| m_asyncFutureTable.clear(); |
| m_namesrvAddrList.clear(); |
| removeAllTimerCallback(); |
| } |
| |
| void TcpRemotingClient::stopAllTcpTransportThread() { |
| LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin"); |
| m_async_ioService.stop(); |
| m_async_service_thread->interrupt(); |
| m_async_service_thread->join(); |
| removeAllTimerCallback(); |
| |
| { |
| TcpMap::iterator it = m_tcpTable.begin(); |
| for (; it != m_tcpTable.end(); ++it) { |
| it->second->disconnect(it->first); |
| } |
| m_tcpTable.clear(); |
| } |
| |
| m_ioService.stop(); |
| m_threadpool.join_all(); |
| |
| { |
| boost::lock_guard<boost::mutex> lock(m_futureTableMutex); |
| for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end(); ++it) { |
| if (it->second) |
| it->second->releaseThreadCondition(); |
| } |
| } |
| LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End"); |
| } |
| |
| void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { |
| LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str()); |
| if (!addrs.empty()) { |
| boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) { |
| LOG_ERROR("updateNameServerAddressList get timed_mutex timeout"); |
| return; |
| } |
| } |
| // clear first; |
| m_namesrvAddrList.clear(); |
| |
| vector<string> out; |
| UtilAll::Split(out, addrs, ";"); |
| for (size_t i = 0; i < out.size(); i++) { |
| string addr = out[i]; |
| UtilAll::Trim(addr); |
| |
| string hostName; |
| short portNumber; |
| if (UtilAll::SplitURL(addr, hostName, portNumber)) { |
| LOG_INFO("update Namesrv:%s", addr.c_str()); |
| m_namesrvAddrList.push_back(addr); |
| } else { |
| LOG_INFO("This may be invalid namer server: [%s]", addr.c_str()); |
| } |
| } |
| out.clear(); |
| } |
| } |
| |
| bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request) { |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != NULL) { |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, 3000, false, NULL)); |
| addResponseFuture(opaque, responseFuture); |
| // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d, |
| // timeoutms:%d", addr.c_str(), code, opaque, 3000); |
| |
| if (SendCommand(pTcp, request)) { |
| responseFuture->setSendRequestOK(true); |
| unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000)); |
| if (pRsp == NULL) { |
| LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str()); |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| return false; |
| } else if (pRsp->getCode() == SUCCESS_VALUE) { |
| return true; |
| } else { |
| LOG_WARN("get error response:%d of heartbeat to addr:%s", pRsp->getCode(), addr.c_str()); |
| return false; |
| } |
| } else { |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| } |
| } |
| return false; |
| } |
| |
| RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, |
| RemotingCommand& request, |
| int timeoutMillis /* = 3000 */) { |
| LOG_DEBUG("InvokeSync:", addr.c_str()); |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != NULL) { |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| boost::shared_ptr<ResponseFuture> responseFuture( |
| new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL)); |
| addResponseFuture(opaque, responseFuture); |
| |
| if (SendCommand(pTcp, request)) { |
| // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d, |
| // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis); |
| responseFuture->setSendRequestOK(true); |
| RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis); |
| if (pRsp == NULL) { |
| if (code != GET_CONSUMER_LIST_BY_GROUP) { |
| LOG_WARN( |
| "wait response timeout or get NULL response of code:%d, so " |
| "closeTransport of addr:%s", |
| code, addr.c_str()); |
| CloseTransport(addr, pTcp); |
| } |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| return NULL; |
| } else { |
| return pRsp; |
| } |
| } else { |
| // avoid responseFuture leak; |
| findAndDeleteResponseFuture(opaque); |
| CloseTransport(addr, pTcp); |
| } |
| } |
| LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str()); |
| return NULL; |
| } |
| |
| bool TcpRemotingClient::invokeAsync(const string& addr, |
| RemotingCommand& request, |
| AsyncCallbackWrap* cbw, |
| int64 timeoutMilliseconds, |
| int maxRetrySendTimes, |
| int retrySendTimes) { |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != NULL) { |
| //<!not delete, for callback to delete; |
| int code = request.getCode(); |
| int opaque = request.getOpaque(); |
| boost::shared_ptr<ResponseFuture> responseFuture( |
| new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw)); |
| responseFuture->setMaxRetrySendTimes(maxRetrySendTimes); |
| responseFuture->setRetrySendTimes(retrySendTimes); |
| responseFuture->setBrokerAddr(addr); |
| responseFuture->setRequestCommand(request); |
| addAsyncResponseFuture(opaque, responseFuture); |
| if (cbw) { |
| boost::asio::deadline_timer* t = |
| new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds)); |
| addTimerCallback(t, opaque); |
| boost::system::error_code e; |
| t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this, e, opaque)); |
| } |
| |
| if (SendCommand(pTcp, request)) // Even if send failed, asyncTimerThread |
| // will trigger next pull request or report |
| // send msg failed |
| { |
| LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque); |
| responseFuture->setSendRequestOK(true); |
| } |
| return true; |
| } |
| LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str()); |
| return false; |
| } |
| |
| void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) { |
| //<!not need callback; |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); |
| if (pTcp != NULL) { |
| request.markOnewayRPC(); |
| LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode()); |
| SendCommand(pTcp, request); |
| } |
| } |
| |
| boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needRespons) { |
| if (addr.empty()) { |
| LOG_DEBUG("GetTransport of NameServer"); |
| return CreateNameserverTransport(needRespons); |
| } |
| return CreateTransport(addr, needRespons); |
| } |
| |
| boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needRespons) { |
| boost::shared_ptr<TcpTransport> tts; |
| { |
| // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking |
| // long |
| // time, if could not get m_tcpLock, return NULL |
| bool bGetMutex = false; |
| boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str()); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| bGetMutex = true; |
| } |
| } else { |
| bGetMutex = true; |
| } |
| if (bGetMutex) { |
| if (m_tcpTable.find(addr) != m_tcpTable.end()) { |
| boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]); |
| boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock(); |
| if (tcp) { |
| tcpConnectStatus connectStatus = tcp->getTcpConnectStatus(); |
| if (connectStatus == e_connectWaitResponse) { |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else if (connectStatus == e_connectFail) { |
| LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str()); |
| tcp->disconnect(addr); // avoid coredump when connection with broker was broken |
| m_tcpTable.erase(addr); |
| } else if (connectStatus == e_connectSuccess) { |
| return tcp; |
| } else { |
| LOG_ERROR( |
| "go to fault state, erase:%s from tcpMap, and reconnect " |
| "it", |
| addr.c_str()); |
| m_tcpTable.erase(addr); |
| } |
| } |
| } |
| |
| //<!callback; |
| READ_CALLBACK callback = needRespons ? &TcpRemotingClient::static_messageReceived : NULL; |
| |
| tts.reset(new TcpTransport(this, callback)); |
| tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout); |
| if (connectStatus != e_connectWaitResponse) { |
| LOG_WARN("can not connect to :%s", addr.c_str()); |
| tts->disconnect(addr); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| m_tcpTable[addr] = tts; // even if connecting failed finally, this |
| // server transport will be erased by next |
| // CreateTransport |
| } |
| } else { |
| LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str()); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } |
| } |
| |
| tcpConnectStatus connectStatus = tts->waitTcpConnectEvent(m_tcpConnectTimeout); |
| if (connectStatus != e_connectSuccess) { |
| LOG_WARN("can not connect to server:%s", addr.c_str()); |
| tts->disconnect(addr); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| LOG_INFO("connect server with addr:%s success", addr.c_str()); |
| return tts; |
| } |
| } |
| |
| boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(bool needRespons) { |
| // m_namesrvLock was added to avoid operation of nameServer was blocked by |
| // m_tcpLock, it was used by single Thread mostly, so no performance impact |
| // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long |
| // time, if could not get m_namesrvlock, return NULL |
| LOG_DEBUG("--CreateNameserverTransport--"); |
| bool bGetMutex = false; |
| boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| bGetMutex = true; |
| } |
| } else { |
| bGetMutex = true; |
| } |
| |
| if (bGetMutex) { |
| if (!m_namesrvAddrChoosed.empty()) { |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrChoosed, true); |
| if (pTcp) |
| return pTcp; |
| else |
| m_namesrvAddrChoosed.clear(); |
| } |
| |
| vector<string>::iterator itp = m_namesrvAddrList.begin(); |
| for (; itp != m_namesrvAddrList.end(); ++itp) { |
| unsigned int index = m_namesrvIndex % m_namesrvAddrList.size(); |
| if (m_namesrvIndex == numeric_limits<unsigned int>::max()) |
| m_namesrvIndex = 0; |
| m_namesrvIndex++; |
| LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index, |
| m_namesrvAddrList.size()); |
| boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrList[index], true); |
| if (pTcp) { |
| m_namesrvAddrChoosed = m_namesrvAddrList[index]; |
| return pTcp; |
| } |
| } |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } else { |
| LOG_WARN("get nameServer tcpTransport mutex failed"); |
| boost::shared_ptr<TcpTransport> pTcp; |
| return pTcp; |
| } |
| } |
| |
| void TcpRemotingClient::CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp) { |
| if (addr.empty()) { |
| return CloseNameServerTransport(pTcp); |
| } |
| |
| bool bGetMutex = false; |
| boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str()); |
| return; |
| } else { |
| bGetMutex = true; |
| } |
| } else { |
| bGetMutex = true; |
| } |
| LOG_ERROR("CloseTransport of:%s", addr.c_str()); |
| if (bGetMutex) { |
| bool removeItemFromTable = true; |
| if (m_tcpTable.find(addr) != m_tcpTable.end()) { |
| if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { |
| LOG_INFO( |
| "tcpTransport with addr:%s has been closed before, and has been " |
| "created again, nothing to do", |
| addr.c_str()); |
| removeItemFromTable = false; |
| } |
| } else { |
| LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); |
| removeItemFromTable = false; |
| } |
| |
| if (removeItemFromTable == true) { |
| LOG_WARN("closeTransport: disconnect broker:%s with state:%d", addr.c_str(), |
| m_tcpTable[addr]->getTcpConnectStatus()); |
| if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess) |
| m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken |
| LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); |
| m_tcpTable.erase(addr); |
| } |
| } else { |
| LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str()); |
| return; |
| } |
| LOG_ERROR("CloseTransport of:%s end", addr.c_str()); |
| } |
| |
| void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp) { |
| bool bGetMutex = false; |
| boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock); |
| if (!lock.owns_lock()) { |
| if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { |
| LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); |
| return; |
| } else { |
| bGetMutex = true; |
| } |
| } else { |
| bGetMutex = true; |
| } |
| if (bGetMutex) { |
| string addr = m_namesrvAddrChoosed; |
| bool removeItemFromTable = true; |
| if (m_tcpTable.find(addr) != m_tcpTable.end()) { |
| if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { |
| LOG_INFO( |
| "tcpTransport with addr:%s has been closed before, and has been " |
| "created again, nothing to do", |
| addr.c_str()); |
| removeItemFromTable = false; |
| } |
| } else { |
| LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str()); |
| removeItemFromTable = false; |
| } |
| |
| if (removeItemFromTable == true) { |
| m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken |
| LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); |
| m_tcpTable.erase(addr); |
| m_namesrvAddrChoosed.clear(); |
| } |
| } else { |
| LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", m_namesrvAddrChoosed.c_str()); |
| return; |
| } |
| } |
| |
| bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) { |
| const MemoryBlock* phead = msg.GetHead(); |
| const MemoryBlock* pbody = msg.GetBody(); |
| |
| unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024)); |
| if (phead->getData()) { |
| result->write(phead->getData(), phead->getSize()); |
| } |
| if (pbody->getData()) { |
| result->write(pbody->getData(), pbody->getSize()); |
| } |
| const char* pData = static_cast<const char*>(result->getData()); |
| int len = result->getDataSize(); |
| return pTts->sendMessage(pData, len); |
| } |
| |
| void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) { |
| TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context; |
| if (pTcpRemotingClient) |
| pTcpRemotingClient->messageReceived(mem, addr); |
| } |
| |
| void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) { |
| m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); |
| } |
| |
| void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) { |
| RemotingCommand* pRespondCmd = NULL; |
| try { |
| pRespondCmd = RemotingCommand::Decode(mem); |
| } catch (...) { |
| LOG_ERROR("processData_error"); |
| return; |
| } |
| |
| int opaque = pRespondCmd->getOpaque(); |
| |
| //<!process self; |
| if (pRespondCmd->isResponseType()) { |
| boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque)); |
| if (!pFuture) { |
| pFuture = findAndDeleteResponseFuture(opaque); |
| if (pFuture) { |
| if (pFuture->getSyncResponseFlag()) { |
| LOG_WARN("waitResponse already timeout of opaque:%d", opaque); |
| deleteAndZero(pRespondCmd); |
| return; |
| } |
| LOG_DEBUG("find_response opaque:%d", opaque); |
| } else { |
| LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque); |
| deleteAndZero(pRespondCmd); |
| return; |
| } |
| } |
| processResponseCommand(pRespondCmd, pFuture); |
| } else { |
| processRequestCommand(pRespondCmd, addr); |
| } |
| } |
| |
| void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) { |
| int code = pfuture->getRequestCode(); |
| int opaque = pCmd->getOpaque(); |
| LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, |
| pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes()); |
| pCmd->SetExtHeader(code); // set head , for response use |
| |
| pfuture->setResponse(pCmd); |
| |
| if (pfuture->getASyncFlag()) { |
| if (!pfuture->getAsyncResponseFlag()) { |
| pfuture->setAsyncResponseFlag(); |
| pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response); |
| cancelTimerCallback(opaque); |
| pfuture->executeInvokeCallback(); |
| } |
| } |
| } |
| |
| void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const string& addr) { |
| unique_ptr<RemotingCommand> pRequestCommand(pCmd); |
| int requestCode = pRequestCommand->getCode(); |
| if (m_requestTable.find(requestCode) == m_requestTable.end()) { |
| LOG_ERROR("can_not_find request:%d processor", requestCode); |
| } else { |
| unique_ptr<RemotingCommand> pResponse(m_requestTable[requestCode]->processRequest(addr, pRequestCommand.get())); |
| if (!pRequestCommand->isOnewayRPC()) { |
| if (pResponse) { |
| pResponse->setOpaque(pRequestCommand->getOpaque()); |
| pResponse->markResponseType(); |
| pResponse->Encode(); |
| |
| invokeOneway(addr, *pResponse); |
| } |
| } |
| } |
| } |
| |
| void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) { |
| boost::lock_guard<boost::mutex> lock(m_futureTableMutex); |
| m_futureTable[opaque] = pfuture; |
| } |
| |
| // Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will |
| // be erased, so caller must ensure the life cycle of returned shared_ptr; |
| boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) { |
| boost::lock_guard<boost::mutex> lock(m_futureTableMutex); |
| boost::shared_ptr<ResponseFuture> pResponseFuture; |
| if (m_futureTable.find(opaque) != m_futureTable.end()) { |
| pResponseFuture = m_futureTable[opaque]; |
| m_futureTable.erase(opaque); |
| } |
| return pResponseFuture; |
| } |
| |
| void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) { |
| if (e == boost::asio::error::operation_aborted) { |
| LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), |
| e.message().data()); |
| return; |
| } |
| |
| LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data()); |
| boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque)); |
| if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) { |
| if ((pFuture->getAsyncResponseFlag() != true)) // if no response received, then check timeout or not |
| { |
| LOG_ERROR("no response got for opaque:%d", opaque); |
| pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout); |
| pFuture->executeInvokeCallbackException(); |
| } |
| } |
| |
| eraseTimerCallback(opaque); |
| } |
| |
| void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) { |
| boost::lock_guard<boost::mutex> lock(m_asyncFutureLock); |
| m_asyncFutureTable[opaque] = pfuture; |
| } |
| |
| // Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will |
| // be erased, so caller must ensure the life cycle of returned shared_ptr; |
| boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) { |
| boost::lock_guard<boost::mutex> lock(m_asyncFutureLock); |
| boost::shared_ptr<ResponseFuture> pResponseFuture; |
| if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) { |
| pResponseFuture = m_asyncFutureTable[opaque]; |
| m_asyncFutureTable.erase(opaque); |
| } |
| |
| return pResponseFuture; |
| } |
| |
| void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor) { |
| if (m_requestTable.find(requestCode) != m_requestTable.end()) |
| m_requestTable.erase(requestCode); |
| m_requestTable[requestCode] = clientRemotingProcessor; |
| } |
| |
| void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) { |
| boost::lock_guard<boost::mutex> lock(m_timerMapMutex); |
| if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { |
| LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque); |
| boost::asio::deadline_timer* old_t = m_async_timer_map[opaque]; |
| old_t->cancel(); |
| delete old_t; |
| old_t = NULL; |
| m_async_timer_map.erase(opaque); |
| } |
| m_async_timer_map[opaque] = t; |
| } |
| |
| void TcpRemotingClient::eraseTimerCallback(int opaque) { |
| boost::lock_guard<boost::mutex> lock(m_timerMapMutex); |
| if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { |
| LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque); |
| boost::asio::deadline_timer* t = m_async_timer_map[opaque]; |
| delete t; |
| t = NULL; |
| m_async_timer_map.erase(opaque); |
| } |
| } |
| |
| void TcpRemotingClient::cancelTimerCallback(int opaque) { |
| boost::lock_guard<boost::mutex> lock(m_timerMapMutex); |
| if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { |
| LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque); |
| boost::asio::deadline_timer* t = m_async_timer_map[opaque]; |
| t->cancel(); |
| delete t; |
| t = NULL; |
| m_async_timer_map.erase(opaque); |
| } |
| } |
| |
| void TcpRemotingClient::removeAllTimerCallback() { |
| boost::lock_guard<boost::mutex> lock(m_timerMapMutex); |
| for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it != m_async_timer_map.end(); ++it) { |
| boost::asio::deadline_timer* t = it->second; |
| t->cancel(); |
| delete t; |
| t = NULL; |
| } |
| m_async_timer_map.clear(); |
| } |
| |
| void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) { |
| // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it |
| // later |
| boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque)); |
| if (!pFuture) { |
| pFuture = findAndDeleteResponseFuture(opaque); |
| if (pFuture) { |
| LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); |
| } |
| } else { |
| LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); |
| } |
| // delete the timeout timer for opaque for pullrequest |
| cancelTimerCallback(opaque); |
| } |
| |
| //<!************************************************************************ |
| } // namespace rocketmq |