blob: c524cb7eb6fb3311ad4dd8b6de2a267c77f63990 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TcpRemotingClient.h"
#include <cstddef>
#include <algorithm> // std::move
#include "Logging.h"
#include "UtilAll.h"
namespace rocketmq {
class ResponseFutureInfo : public TcpTransportInfo {
public:
using FutureMap = std::map<int, ResponseFuturePtr>;
public:
~ResponseFutureInfo() {
// FIXME: Can optimize?
activeAllResponses();
}
void putResponseFuture(int opaque, ResponseFuturePtr future) {
std::lock_guard<std::mutex> lock(future_table_mutex_);
const auto it = future_table_.find(opaque);
if (it != future_table_.end()) {
LOG_WARN_NEW("[BUG] response futurn with opaque:{} is exist.", opaque);
}
future_table_[opaque] = future;
}
ResponseFuturePtr popResponseFuture(int opaque) {
std::lock_guard<std::mutex> lock(future_table_mutex_);
const auto& it = future_table_.find(opaque);
if (it != future_table_.end()) {
auto future = it->second;
future_table_.erase(it);
return future;
}
return nullptr;
}
void scanResponseTable(uint64_t now, std::vector<ResponseFuturePtr>& futureList) {
std::lock_guard<std::mutex> lock(future_table_mutex_);
for (auto it = future_table_.begin(); it != future_table_.end();) {
auto& future = it->second; // NOTE: future is a reference
if (future->begin_timestamp() + future->timeout_millis() + 1000 <= now) {
LOG_WARN_NEW("remove timeout request, code:{}, opaque:{}", future->request_code(), future->opaque());
futureList.push_back(future);
it = future_table_.erase(it);
} else {
++it;
}
}
}
void activeAllResponses() {
std::lock_guard<std::mutex> lock(future_table_mutex_);
for (const auto& it : future_table_) {
auto& future = it.second;
if (future->hasInvokeCallback()) {
future->executeInvokeCallback(); // callback async request
} else {
future->putResponse(nullptr); // wake up sync request
}
}
future_table_.clear();
}
private:
FutureMap future_table_; // opaque -> future
std::mutex future_table_mutex_;
};
TcpRemotingClient::TcpRemotingClient(int workerThreadNum,
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout)
: tcp_connect_timeout_(tcpConnectTimeout),
tcp_transport_try_lock_timeout_(tcpTransportTryLockTimeout),
namesrv_index_(0),
dispatch_executor_("MessageDispatchExecutor", 1, false),
handle_executor_("MessageHandleExecutor", workerThreadNum, false),
timeout_executor_("TimeoutScanExecutor", false) {}
TcpRemotingClient::~TcpRemotingClient() {
LOG_DEBUG_NEW("TcpRemotingClient is destructed.");
}
void TcpRemotingClient::start() {
dispatch_executor_.startup();
handle_executor_.startup();
LOG_INFO_NEW("tcpConnectTimeout:{}, tcpTransportTryLockTimeout:{}, pullThreadNums:{}", tcp_connect_timeout_,
tcp_transport_try_lock_timeout_, handle_executor_.thread_nums());
timeout_executor_.startup();
// scanResponseTable
timeout_executor_.schedule(std::bind(&TcpRemotingClient::scanResponseTablePeriodically, this), 1000 * 3,
time_unit::milliseconds);
}
void TcpRemotingClient::shutdown() {
LOG_INFO_NEW("TcpRemotingClient::shutdown Begin");
timeout_executor_.shutdown();
{
std::lock_guard<std::timed_mutex> lock(transport_table_mutex_);
for (const auto& trans : transport_table_) {
trans.second->disconnect(trans.first);
}
transport_table_.clear();
}
handle_executor_.shutdown();
dispatch_executor_.shutdown();
LOG_INFO_NEW("TcpRemotingClient::shutdown End, transport_table_:{}", transport_table_.size());
}
void TcpRemotingClient::registerRPCHook(RPCHookPtr rpcHook) {
if (rpcHook != nullptr) {
for (auto& hook : rpc_hooks_) {
if (hook == rpcHook) {
return;
}
}
rpc_hooks_.push_back(rpcHook);
}
}
void TcpRemotingClient::updateNameServerAddressList(const std::string& addrs) {
LOG_INFO_NEW("updateNameServerAddressList: [{}]", addrs);
if (addrs.empty()) {
return;
}
if (!UtilAll::try_lock_for(namesrv_lock_, 1000 * 10)) {
LOG_ERROR_NEW("updateNameServerAddressList get timed_mutex timeout");
return;
}
std::lock_guard<std::timed_mutex> lock(namesrv_lock_, std::adopt_lock);
// clear first
namesrv_addr_list_.clear();
std::vector<std::string> out;
UtilAll::Split(out, addrs, ";");
for (auto& addr : out) {
UtilAll::Trim(addr);
std::string hostName;
short portNumber;
if (UtilAll::SplitURL(addr, hostName, portNumber)) {
LOG_INFO_NEW("update Namesrv:{}", addr);
namesrv_addr_list_.push_back(addr);
} else {
LOG_INFO_NEW("This may be invalid namer server: [{}]", addr);
}
}
}
void TcpRemotingClient::scanResponseTablePeriodically() {
try {
scanResponseTable();
} catch (std::exception& e) {
LOG_ERROR_NEW("scanResponseTable exception: {}", e.what());
}
// next round
timeout_executor_.schedule(std::bind(&TcpRemotingClient::scanResponseTablePeriodically, this), 1000,
time_unit::milliseconds);
}
void TcpRemotingClient::scanResponseTable() {
std::vector<TcpTransportPtr> channelList;
{
std::lock_guard<std::timed_mutex> lock(transport_table_mutex_);
for (const auto& trans : transport_table_) {
channelList.push_back(trans.second);
}
}
auto now = UtilAll::currentTimeMillis();
std::vector<ResponseFuturePtr> rfList;
for (auto channel : channelList) {
static_cast<ResponseFutureInfo*>(channel->getInfo())->scanResponseTable(now, rfList);
}
for (auto rf : rfList) {
if (rf->hasInvokeCallback()) {
handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, rf));
}
}
}
std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSync(const std::string& addr,
RemotingCommand& request,
int timeoutMillis) {
auto beginStartTime = UtilAll::currentTimeMillis();
auto channel = GetTransport(addr);
if (channel != nullptr) {
try {
doBeforeRpcHooks(addr, request, true);
auto costTime = UtilAll::currentTimeMillis() - beginStartTime;
if (timeoutMillis <= 0 || timeoutMillis < costTime) {
THROW_MQEXCEPTION(RemotingTimeoutException, "invokeSync call timeout", -1);
}
std::unique_ptr<RemotingCommand> response(invokeSyncImpl(channel, request, timeoutMillis));
doAfterRpcHooks(addr, request, response.get(), false);
return response;
} catch (const RemotingSendRequestException& e) {
LOG_WARN_NEW("invokeSync: send request exception, so close the channel[{}]", channel->getPeerAddrAndPort());
CloseTransport(addr, channel);
throw;
} catch (const RemotingTimeoutException& e) {
int code = request.code();
if (code != GET_CONSUMER_LIST_BY_GROUP) {
CloseTransport(addr, channel);
LOG_WARN_NEW("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis,
channel->getPeerAddrAndPort());
}
LOG_WARN_NEW("invokeSync: wait response timeout exception, the channel[{}]", channel->getPeerAddrAndPort());
throw;
}
} else {
THROW_MQEXCEPTION(RemotingConnectException, "connect to <" + addr + "> failed", -1);
}
}
std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSyncImpl(TcpTransportPtr channel,
RemotingCommand& request,
int64_t timeoutMillis) {
int code = request.code();
int opaque = request.opaque();
auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis);
putResponseFuture(channel, opaque, responseFuture);
if (SendCommand(channel, request)) {
responseFuture->set_send_request_ok(true);
} else {
responseFuture->set_send_request_ok(false);
popResponseFuture(channel, opaque); // clean future
// wake up potential waitResponse() is unnecessary, so not call putResponse()
LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
THROW_MQEXCEPTION(RemotingSendRequestException, "send request to <" + channel->getPeerAddrAndPort() + "> failed",
-1);
}
std::unique_ptr<RemotingCommand> response(responseFuture->waitResponse(timeoutMillis));
if (nullptr == response) { // timeout
popResponseFuture(channel, opaque); // clean future
THROW_MQEXCEPTION(RemotingTimeoutException,
"wait response on the addr <" + channel->getPeerAddrAndPort() + "> timeout", -1);
}
return response;
}
void TcpRemotingClient::invokeAsync(const std::string& addr,
RemotingCommand& request,
std::unique_ptr<InvokeCallback>& invokeCallback,
int64_t timeoutMillis) {
auto beginStartTime = UtilAll::currentTimeMillis();
auto channel = GetTransport(addr);
if (channel != nullptr) {
try {
doBeforeRpcHooks(addr, request, true);
auto costTime = UtilAll::currentTimeMillis() - beginStartTime;
if (timeoutMillis <= 0 || timeoutMillis < costTime) {
THROW_MQEXCEPTION(RemotingTooMuchRequestException, "invokeAsync call timeout", -1);
}
invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
} catch (const RemotingSendRequestException& e) {
LOG_WARN_NEW("invokeAsync: send request exception, so close the channel[{}]", channel->getPeerAddrAndPort());
CloseTransport(addr, channel);
throw;
}
} else {
THROW_MQEXCEPTION(RemotingConnectException, "connect to <" + addr + "> failed", -1);
}
}
void TcpRemotingClient::invokeAsyncImpl(TcpTransportPtr channel,
RemotingCommand& request,
int64_t timeoutMillis,
std::unique_ptr<InvokeCallback>& invokeCallback) {
int code = request.code();
int opaque = request.opaque();
// delete in callback
auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis, std::move(invokeCallback));
putResponseFuture(channel, opaque, responseFuture);
if (SendCommand(channel, request)) {
responseFuture->set_send_request_ok(true);
} else {
// request fail
responseFuture = popResponseFuture(channel, opaque);
if (responseFuture != nullptr) {
responseFuture->set_send_request_ok(false);
if (responseFuture->hasInvokeCallback()) {
handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
}
}
LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
}
}
void TcpRemotingClient::invokeOneway(const std::string& addr, RemotingCommand& request) {
auto channel = GetTransport(addr);
if (channel != nullptr) {
try {
doBeforeRpcHooks(addr, request, true);
invokeOnewayImpl(channel, request);
} catch (const RemotingSendRequestException& e) {
LOG_WARN_NEW("invokeOneway: send request exception, so close the channel[{}]", channel->getPeerAddrAndPort());
CloseTransport(addr, channel);
throw;
}
} else {
THROW_MQEXCEPTION(RemotingConnectException, "connect to <" + addr + "> failed", -1);
}
}
void TcpRemotingClient::invokeOnewayImpl(TcpTransportPtr channel, RemotingCommand& request) {
request.markOnewayRPC();
try {
if (!SendCommand(channel, request)) {
LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
}
} catch (const std::exception& e) {
LOG_WARN_NEW("send a request command to channel <{}> Exception.\n{}", channel->getPeerAddrAndPort(), e.what());
THROW_MQEXCEPTION(RemotingSendRequestException, "send request to <" + channel->getPeerAddrAndPort() + "> failed",
-1);
}
}
void TcpRemotingClient::doBeforeRpcHooks(const std::string& addr, RemotingCommand& request, bool toSent) {
if (rpc_hooks_.size() > 0) {
for (auto& rpcHook : rpc_hooks_) {
rpcHook->doBeforeRequest(addr, request, toSent);
}
}
}
void TcpRemotingClient::doAfterRpcHooks(const std::string& addr,
RemotingCommand& request,
RemotingCommand* response,
bool toSent) {
if (rpc_hooks_.size() > 0) {
for (auto& rpcHook : rpc_hooks_) {
rpcHook->doAfterResponse(addr, request, response, toSent);
}
}
}
TcpTransportPtr TcpRemotingClient::GetTransport(const std::string& addr) {
if (addr.empty()) {
LOG_DEBUG_NEW("Get namesrv transport");
return CreateNameServerTransport();
}
return CreateTransport(addr);
}
TcpTransportPtr TcpRemotingClient::CreateTransport(const std::string& addr) {
TcpTransportPtr channel;
{
// try get transport_table_mutex_ until tcp_transport_try_lock_timeout_ to avoid blocking long time,
// if could not get transport_table_mutex_, return NULL
if (!UtilAll::try_lock_for(transport_table_mutex_, 1000 * tcp_transport_try_lock_timeout_)) {
LOG_ERROR_NEW("GetTransport of:{} get timed_mutex timeout", addr);
return nullptr;
}
std::lock_guard<std::timed_mutex> lock(transport_table_mutex_, std::adopt_lock);
// check for reuse
const auto& it = transport_table_.find(addr);
if (it != transport_table_.end()) {
channel = it->second;
}
if (channel != nullptr) {
TcpConnectStatus connectStatus = channel->getTcpConnectStatus();
switch (connectStatus) {
// case TCP_CONNECT_STATUS_CREATED:
case TCP_CONNECT_STATUS_CONNECTED:
return channel;
case TCP_CONNECT_STATUS_CONNECTING:
// wait server answer
break;
case TCP_CONNECT_STATUS_FAILED:
LOG_ERROR_NEW("tcpTransport with server disconnected, erase server:{}", addr);
channel->disconnect(addr);
transport_table_.erase(it);
break;
default: // TCP_CONNECT_STATUS_CLOSED
LOG_ERROR_NEW("go to CLOSED state, erase:{} from transportTable, and reconnect it", addr);
transport_table_.erase(it);
break;
}
} else {
// callback
TcpTransport::ReadCallback readCallback =
std::bind(&TcpRemotingClient::messageReceived, this, std::placeholders::_1, std::placeholders::_2);
TcpTransport::CloseCallback closeCallback =
std::bind(&TcpRemotingClient::channelClosed, this, std::placeholders::_1);
// create new transport, then connect server
std::unique_ptr<ResponseFutureInfo> responseFutureInfo(new ResponseFutureInfo());
channel = TcpTransport::CreateTransport(readCallback, closeCallback, std::move(responseFutureInfo));
TcpConnectStatus connectStatus = channel->connect(addr, 0); // use non-block
if (connectStatus != TCP_CONNECT_STATUS_CONNECTING) {
LOG_WARN_NEW("can not connect to:{}", addr);
channel->disconnect(addr);
return nullptr;
} else {
// even if connecting failed finally, this server transport will be erased by next CreateTransport
transport_table_[addr] = channel;
}
}
}
// waiting...
TcpConnectStatus connectStatus = channel->waitTcpConnectEvent(static_cast<int>(tcp_connect_timeout_));
if (connectStatus != TCP_CONNECT_STATUS_CONNECTED) {
LOG_WARN_NEW("can not connect to server:{}", addr);
// channel->disconnect(addr);
return nullptr;
} else {
LOG_INFO_NEW("connect server with addr:{} success", addr);
return channel;
}
}
TcpTransportPtr TcpRemotingClient::CreateNameServerTransport() {
// namesrv_lock_ was added to avoid operation of NameServer was blocked by
// m_tcpLock, it was used by single Thread mostly, so no performance impact
// try get m_tcpLock until tcp_transport_try_lock_timeout_ to avoid blocking long
// time, if could not get m_namesrvlock, return NULL
LOG_DEBUG_NEW("create namesrv transport");
if (!UtilAll::try_lock_for(namesrv_lock_, 1000 * tcp_transport_try_lock_timeout_)) {
LOG_ERROR_NEW("CreateNameserverTransport get timed_mutex timeout");
return nullptr;
}
std::lock_guard<std::timed_mutex> lock(namesrv_lock_, std::adopt_lock);
if (!namesrv_addr_choosed_.empty()) {
auto channel = CreateTransport(namesrv_addr_choosed_);
if (channel != nullptr) {
return channel;
} else {
namesrv_addr_choosed_.clear();
}
}
for (unsigned i = 0; i < namesrv_addr_list_.size(); i++) {
auto index = namesrv_index_++ % namesrv_addr_list_.size();
LOG_INFO_NEW("namesrvIndex is:{}, index:{}, namesrvaddrlist size:{}", namesrv_index_, index,
namesrv_addr_list_.size());
auto channel = CreateTransport(namesrv_addr_list_[index]);
if (channel != nullptr) {
namesrv_addr_choosed_ = namesrv_addr_list_[index];
return channel;
}
}
return nullptr;
}
bool TcpRemotingClient::CloseTransport(const std::string& addr, TcpTransportPtr channel) {
if (addr.empty()) {
return CloseNameServerTransport(channel);
}
if (!UtilAll::try_lock_for(transport_table_mutex_, 1000 * tcp_transport_try_lock_timeout_)) {
LOG_ERROR_NEW("CloseTransport of:{} get timed_mutex timeout", addr);
return false;
}
std::lock_guard<std::timed_mutex> lock(transport_table_mutex_, std::adopt_lock);
LOG_INFO_NEW("CloseTransport of:{}", addr);
bool removeItemFromTable = true;
const auto& it = transport_table_.find(addr);
if (it != transport_table_.end()) {
if (it->second->getStartTime() != channel->getStartTime()) {
LOG_INFO_NEW("tcpTransport with addr:{} has been closed before, and has been created again, nothing to do", addr);
removeItemFromTable = false;
}
} else {
LOG_INFO_NEW("tcpTransport with addr:{} had been removed from tcpTable before", addr);
removeItemFromTable = false;
}
if (removeItemFromTable) {
LOG_WARN_NEW("closeTransport: erase broker: {}", addr);
transport_table_.erase(addr);
}
LOG_WARN_NEW("closeTransport: disconnect:{} with state:{}", addr, channel->getTcpConnectStatus());
if (channel->getTcpConnectStatus() != TCP_CONNECT_STATUS_CLOSED) {
channel->disconnect(addr); // avoid coredump when connection with server was broken
}
LOG_ERROR_NEW("CloseTransport of:{} end", addr);
return removeItemFromTable;
}
bool TcpRemotingClient::CloseNameServerTransport(TcpTransportPtr channel) {
if (!UtilAll::try_lock_for(namesrv_lock_, 1000 * tcp_transport_try_lock_timeout_)) {
LOG_ERROR_NEW("CloseNameServerTransport get timed_mutex timeout");
return false;
}
std::lock_guard<std::timed_mutex> lock(namesrv_lock_, std::adopt_lock);
std::string addr = namesrv_addr_choosed_;
bool removeItemFromTable = CloseTransport(addr, channel);
if (removeItemFromTable) {
namesrv_addr_choosed_.clear();
}
return removeItemFromTable;
}
bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept {
auto package = msg.encode();
return channel->sendMessage(package->array(), package->size());
}
void TcpRemotingClient::channelClosed(TcpTransportPtr channel) {
LOG_DEBUG_NEW("channel of {} is closed.", channel->getPeerAddrAndPort());
handle_executor_.submit([channel] { static_cast<ResponseFutureInfo*>(channel->getInfo())->activeAllResponses(); });
}
void TcpRemotingClient::messageReceived(ByteArrayRef msg, TcpTransportPtr channel) {
dispatch_executor_.submit(std::bind(&TcpRemotingClient::processMessageReceived, this, std::move(msg), channel));
}
void TcpRemotingClient::processMessageReceived(ByteArrayRef msg, TcpTransportPtr channel) {
std::unique_ptr<RemotingCommand> cmd;
try {
cmd = RemotingCommand::Decode(std::move(msg));
} catch (...) {
LOG_ERROR_NEW("processMessageReceived error");
return;
}
if (cmd->isResponseType()) {
processResponseCommand(std::move(cmd), channel);
} else {
class task_adaptor {
public:
task_adaptor(TcpRemotingClient* client, std::unique_ptr<RemotingCommand> cmd, TcpTransportPtr channel)
: client_(client), cmd_(cmd.release()), channel_(channel) {}
task_adaptor(const task_adaptor& other) : client_(other.client_), cmd_(other.cmd_), channel_(other.channel_) {
// force move
const_cast<task_adaptor*>(&other)->cmd_ = nullptr;
}
task_adaptor(task_adaptor&& other) : client_(other.client_), cmd_(other.cmd_), channel_(other.channel_) {
other.cmd_ = nullptr;
}
~task_adaptor() { delete cmd_; }
void operator()() {
std::unique_ptr<RemotingCommand> requestCommand(cmd_);
cmd_ = nullptr;
client_->processRequestCommand(std::move(requestCommand), channel_);
}
private:
TcpRemotingClient* client_;
RemotingCommand* cmd_;
TcpTransportPtr channel_;
};
handle_executor_.submit(task_adaptor(this, std::move(cmd), channel));
}
}
void TcpRemotingClient::processResponseCommand(std::unique_ptr<RemotingCommand> responseCommand,
TcpTransportPtr channel) {
int opaque = responseCommand->opaque();
auto responseFuture = popResponseFuture(channel, opaque);
if (responseFuture != nullptr) {
int code = responseFuture->request_code();
LOG_DEBUG_NEW("processResponseCommand, opaque:{}, request code:{}, server:{}", opaque, code,
channel->getPeerAddrAndPort());
if (responseFuture->hasInvokeCallback()) {
responseFuture->setResponseCommand(std::move(responseCommand));
// bind shared_ptr can save object's life
handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
} else {
responseFuture->putResponse(std::move(responseCommand));
}
} else {
LOG_DEBUG_NEW("responseFuture was deleted by timeout of opaque:{}, server:{}", opaque,
channel->getPeerAddrAndPort());
}
}
void TcpRemotingClient::processRequestCommand(std::unique_ptr<RemotingCommand> requestCommand,
TcpTransportPtr channel) {
std::unique_ptr<RemotingCommand> response;
int requestCode = requestCommand->code();
const auto& it = processor_table_.find(requestCode);
if (it != processor_table_.end()) {
try {
auto* processor = it->second;
doBeforeRpcHooks(channel->getPeerAddrAndPort(), *requestCommand, false);
response = processor->processRequest(channel, requestCommand.get());
doAfterRpcHooks(channel->getPeerAddrAndPort(), *response, response.get(), true);
} catch (std::exception& e) {
LOG_ERROR_NEW("process request exception. {}", e.what());
// send SYSTEM_ERROR response
response.reset(new RemotingCommand(SYSTEM_ERROR, e.what()));
}
} else {
// send REQUEST_CODE_NOT_SUPPORTED response
std::string error = "request type " + UtilAll::to_string(requestCommand->code()) + " not supported";
response.reset(new RemotingCommand(REQUEST_CODE_NOT_SUPPORTED, error));
LOG_ERROR_NEW("{}: {}", channel->getPeerAddrAndPort(), error);
}
if (!requestCommand->isOnewayRPC() && response != nullptr) {
response->set_opaque(requestCommand->opaque());
response->markResponseType();
try {
if (!SendCommand(channel, *response)) {
LOG_WARN_NEW("send a response command to channel <{}> failed.", channel->getPeerAddrAndPort());
}
} catch (const std::exception& e) {
LOG_ERROR_NEW("process request over, but response failed. {}", e.what());
}
}
}
void TcpRemotingClient::putResponseFuture(TcpTransportPtr channel, int opaque, ResponseFuturePtr future) {
return static_cast<ResponseFutureInfo*>(channel->getInfo())->putResponseFuture(opaque, future);
}
// NOTE: after call this function, shared_ptr of future_table_[opaque] will
// be erased, so caller must ensure the life cycle of returned shared_ptr
ResponseFuturePtr TcpRemotingClient::popResponseFuture(TcpTransportPtr channel, int opaque) {
return static_cast<ResponseFutureInfo*>(channel->getInfo())->popResponseFuture(opaque);
}
void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, RequestProcessor* requestProcessor) {
// replace
processor_table_[requestCode] = requestProcessor;
}
} // namespace rocketmq