blob: 97394d7f3214d834bd1dc57216823466e11b6049 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "rpc_engine.h"
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <string.h>
#include <limits>
#include <list>
#include <set>
#include "common/gpid.h"
#include "runtime/api_layer1.h"
#include "runtime/global_config.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/network.h"
#include "runtime/rpc/serialization.h"
#include "runtime/service_engine.h"
#include "utils/customizable_id.h"
#include "utils/factory_store.h"
#include "utils/flags.h"
#include "utils/join_point.h"
#include "utils/link.h"
#include "utils/rand.h"
#include "utils/threadpool_code.h"
namespace dsn {
DSN_DECLARE_uint32(local_hash);
DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
class rpc_timeout_task : public task
{
public:
rpc_timeout_task(rpc_client_matcher *matcher, uint64_t id, service_node *node)
: task(LPC_RPC_TIMEOUT, 0, node)
{
_matcher = matcher;
_id = id;
}
virtual void exec() { _matcher->on_rpc_timeout(_id); }
private:
// use the following if the matcher is per rpc session
// rpc_client_matcher_ptr _matcher;
rpc_client_matcher *_matcher;
uint64_t _id;
};
// Every bucket must already be drained (each pending rpc either got its
// reply or fired its timeout) before the matcher may be destroyed.
rpc_client_matcher::~rpc_client_matcher()
{
    for (int bucket = 0; bucket < MATCHER_BUCKET_NR; ++bucket) {
        CHECK_EQ_MSG(_requests[bucket].size(),
                     0,
                     "all rpc entries must be removed before the matcher ends");
    }
}
// Matches an incoming reply against the pending request registered under
// `key` (the request id). `reply` may be nullptr, meaning the rpc was
// terminated early by a network failure. `delay_ms` is an artificial delay
// (fault injection / simulator) propagated to the response task.
// Returns false when no pending entry exists for `key` (e.g. it already
// timed out); in that case a non-null `reply` is deleted here.
bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *reply, int delay_ms)
{
    rpc_response_task_ptr call;
    task_ptr timeout_task;
    int bucket_index = key % MATCHER_BUCKET_NR;
    {
        // Remove the matching entry under the bucket spin lock; the response
        // and timeout tasks are moved out so all further work happens
        // outside the lock.
        utils::auto_lock<::dsn::utils::ex_lock_nr_spin> l(_requests_lock[bucket_index]);
        auto it = _requests[bucket_index].find(key);
        if (it != _requests[bucket_index].end()) {
            call = std::move(it->second.resp_task);
            timeout_task = std::move(it->second.timeout_task);
            _requests[bucket_index].erase(it);
        } else {
            // No pending request: the reply is orphaned, free it here.
            if (reply) {
                CHECK_EQ_MSG(
                    reply->get_count(), 0, "reply should not be referenced by anybody so far");
                delete reply;
            }
            return false;
        }
    }
    DCHECK_NOTNULL(call, "rpc response task cannot be nullptr");
    DCHECK_NOTNULL(timeout_task, "rpc timeout task cannot be nullptr");
    // Cancel the pending timeout task unless we ARE the timeout task
    // (on_rpc_timeout can reach here indirectly via resend paths).
    if (timeout_task != task::get_current_task()) {
        timeout_task->cancel(false); // no need to wait
    }
    auto req = call->get_request();
    auto spec = task_spec::get(req->local_rpc_code);
    // if rpc is early terminated with empty reply
    if (nullptr == reply) {
        // For leader-targeted group rpc, hint the group address to try the
        // next possible leader before reporting the network failure.
        if (req->server_address.type() == HOST_TYPE_GROUP && spec->grpc_mode == GRPC_TO_LEADER &&
            req->server_address.group_address()->is_update_leader_automatically()) {
            req->server_address.group_address()->leader_forward();
        }
        call->set_delay(delay_ms);
        call->enqueue(ERR_NETWORK_FAILURE, reply);
        return true;
    }
    // normal reply
    auto err = reply->error();
    // if this is pure client (no server port assigned), we can only do fake forwarding,
    // in this case, the server will return ERR_FORWARD_TO_OTHERS
    if (err == ERR_FORWARD_TO_OTHERS) {
        rpc_address addr;
        // The reply payload carries the address we should forward to.
        ::dsn::unmarshall((dsn::message_ex *)reply, addr);
        // handle the case of forwarding to itself where addr == req->to_address.
        DCHECK_NE_MSG(
            addr,
            req->to_address,
            "impossible forwarding to myself as this only happens when i'm pure client so "
            "i don't get a named to_address {}",
            addr);
        // server address side effect
        switch (req->server_address.type()) {
        case HOST_TYPE_GROUP:
            switch (spec->grpc_mode) {
            case GRPC_TO_LEADER:
                // Remember the forwarded-to node as the new leader.
                if (req->server_address.group_address()->is_update_leader_automatically()) {
                    req->server_address.group_address()->set_leader(addr);
                }
                break;
            default:
                break;
            }
            break;
        default:
            CHECK(false, "not implemented");
            break;
        }
        // do fake forwarding, reset request_id
        // TODO(qinzuoyan): reset timeout to new value
        _engine->call_ip(addr, req, call, true);
        // The forward reply itself is consumed here.
        CHECK_EQ_MSG(reply->get_count(), 0, "reply should not be referenced by anybody so far");
        delete reply;
    } else {
        // server address side effect
        if (reply->header->context.u.is_forwarded) {
            switch (req->server_address.type()) {
            case HOST_TYPE_GROUP:
                switch (spec->grpc_mode) {
                case GRPC_TO_LEADER:
                    // A successfully forwarded reply reveals the real leader:
                    // cache it for subsequent calls.
                    if (err == ERR_OK &&
                        req->server_address.group_address()->is_update_leader_automatically()) {
                        req->server_address.group_address()->set_leader(
                            reply->header->from_address);
                    }
                    break;
                default:
                    break;
                }
                break;
            default:
                CHECK(false, "not implemented");
                break;
            }
        }
        call->set_delay(delay_ms);
        // failure injection applied
        if (!call->enqueue(err, reply)) {
            LOG_INFO("rpc reply {} is dropped (fault inject), trace_id = {:#018x}",
                     reply->header->rpc_name,
                     reply->header->trace_id);
            // call network failure model
            net->inject_drop_message(reply, false);
        }
    }
    return true;
}
// Fired by rpc_timeout_task when the (possibly shortened, see on_call)
// timeout for request `key` expires. Either fails the call with ERR_TIMEOUT,
// or — when resend is enabled (timeout_ts_ms != 0) and the deadline has not
// passed — resends the request once and re-arms a fresh timeout task.
void rpc_client_matcher::on_rpc_timeout(uint64_t key)
{
    rpc_response_task_ptr call;
    int bucket_index = key % MATCHER_BUCKET_NR;
    uint64_t timeout_ts_ms;
    bool resend = false;
    {
        utils::auto_lock<::dsn::utils::ex_lock_nr_spin> l(_requests_lock[bucket_index]);
        auto it = _requests[bucket_index].find(key);
        if (it != _requests[bucket_index].end()) {
            timeout_ts_ms = it->second.timeout_ts_ms;
            call = it->second.resp_task;
            // timeout_ts_ms == 0 means resend is disabled: this really is
            // the final timeout, drop the entry.
            if (timeout_ts_ms == 0) {
                _requests[bucket_index].erase(it);
            }
            // resend is enabled
            else {
                // do it in next check so we can do expensive things
                // outside of the lock
                resend = true;
            }
        } else {
            // reply already arrived and removed the entry; nothing to do
            return;
        }
    }
    DCHECK_NOTNULL(call, "rpc response task is missing for rpc request {}", key);
    // if timeout
    if (!resend) {
        call->enqueue(ERR_TIMEOUT, nullptr);
        return;
    }
    // prepare resend context and check again
    uint64_t now_ts_ms = dsn_now_ms();
    // resend when timeout is not yet, and the call is not cancelled
    // TODO: time overflow
    resend = (now_ts_ms < timeout_ts_ms && call->state() == TASK_STATE_READY);
    // TODO: memory pool for this task
    task_ptr new_timeout_task;
    if (resend) {
        new_timeout_task = new rpc_timeout_task(this, key, call->node());
    }
    {
        // Re-acquire the lock: the reply may have raced in between the two
        // critical sections.
        utils::auto_lock<::dsn::utils::ex_lock_nr_spin> l(_requests_lock[bucket_index]);
        auto it = _requests[bucket_index].find(key);
        if (it != _requests[bucket_index].end()) {
            // timeout
            if (!resend) {
                _requests[bucket_index].erase(it);
            }
            // resend
            else {
                // reset timeout task
                it->second.timeout_task = new_timeout_task;
            }
        }
        // response is received
        else {
            resend = false;
        }
    }
    if (resend) {
        auto req = call->get_request();
        LOG_DEBUG("resend request message for rpc trace_id = {:#018x}, key = {}",
                  req->header->trace_id,
                  key);
        // resend without handling rpc_matcher, use the same request_id
        _engine->call_ip(req->to_address, req, nullptr);
        // use rest of the timeout to resend once only
        new_timeout_task->set_delay(static_cast<int>(timeout_ts_ms - now_ts_ms));
        new_timeout_task->enqueue();
    }
}
// Registers an outgoing request with the matcher so the future reply (keyed
// by hdr.id) can be routed back to `call`, and arms the timeout task.
// When the task spec enables request resend, the first timeout is shortened
// to rpc_request_resend_timeout_milliseconds and the absolute deadline is
// recorded in timeout_ts_ms (non-zero ⇒ resend allowed, see on_rpc_timeout).
void rpc_client_matcher::on_call(message_ex *request, const rpc_response_task_ptr &call)
{
    message_header &hdr = *request->header;
    int bucket_index = hdr.id % MATCHER_BUCKET_NR;
    auto sp = task_spec::get(request->local_rpc_code);
    int timeout_ms = hdr.client.timeout_ms;
    uint64_t timeout_ts_ms = 0;
    // reset timeout when resend is enabled
    if (sp->rpc_request_resend_timeout_milliseconds > 0 &&
        timeout_ms > sp->rpc_request_resend_timeout_milliseconds) {
        timeout_ts_ms = dsn_now_ms() + timeout_ms; // non-zero for resend
        timeout_ms = sp->rpc_request_resend_timeout_milliseconds;
    }
    DCHECK_NOTNULL(call, "rpc response task cannot be nullptr");
    task *timeout_task(new rpc_timeout_task(this, hdr.id, call->node()));
    {
        utils::auto_lock<::dsn::utils::ex_lock_nr_spin> l(_requests_lock[bucket_index]);
        auto pr =
            _requests[bucket_index].emplace(hdr.id, match_entry{call, timeout_task, timeout_ts_ms});
        CHECK(pr.second, "the message is already on the fly!!!");
    }
    // Arm the timeout only after the entry is registered, so the timeout
    // handler always finds it.
    timeout_task->set_delay(timeout_ms);
    timeout_task->enqueue();
}
//----------------------------------------------------------------------------------------------
// Pre-allocate one (handler, rw-lock) slot per possible task code so that
// registration/dispatch by code is a constant-time indexed lookup.
rpc_server_dispatcher::rpc_server_dispatcher()
{
    const int slot_count = dsn::task_code::max() + 1;
    _vhandlers.resize(slot_count);
    for (int i = 0; i < slot_count; ++i) {
        _vhandlers[i] = new std::pair<std::unique_ptr<handler_entry>, utils::rw_lock_nr>();
    }
    _handlers.clear();
}
// Release every per-code slot; deleting a slot also drops any still-owned
// handler_entry through its unique_ptr.
rpc_server_dispatcher::~rpc_server_dispatcher()
{
    for (auto *slot : _vhandlers) {
        delete slot;
    }
    _vhandlers.clear();
    _handlers.clear();
    CHECK_EQ_MSG(
        _handlers.size(), 0, "please make sure all rpc handlers are unregistered at this point");
}
// Registers handler `h` under both the task-code name and `extra_name`.
// Ownership note: the handler_entry is owned by the per-code slot in
// _vhandlers; _handlers only stores raw (non-owning) pointers to it for
// name-based lookup. Aborts on duplicate registration.
bool rpc_server_dispatcher::register_rpc_handler(dsn::task_code code,
                                                 const char *extra_name,
                                                 const rpc_request_handler &h)
{
    std::unique_ptr<handler_entry> ctx(new handler_entry{code, extra_name, h});
    utils::auto_write_lock l(_handlers_lock);
    auto it = _handlers.find(code.to_string());
    auto it2 = _handlers.find(extra_name);
    CHECK(it == _handlers.end() && it2 == _handlers.end(),
          "rpc registration confliction for '{}' '{}'",
          code,
          extra_name);
    _handlers[code.to_string()] = ctx.get();
    _handlers[ctx->extra_name] = ctx.get();
    {
        // Transfer ownership into the per-code slot under its own rw-lock.
        utils::auto_write_lock l(_vhandlers[code.code()]->second);
        _vhandlers[code.code()]->first = std::move(ctx);
    }
    // TODO(yingchun): should return void
    return true;
}
// Removes both name-based entries and releases the owning slot for
// `rpc_code`. Returns false when no handler was registered.
bool rpc_server_dispatcher::unregister_rpc_handler(dsn::task_code rpc_code)
{
    {
        utils::auto_write_lock l(_handlers_lock);
        auto it = _handlers.find(rpc_code.to_string());
        if (it == _handlers.end())
            return false;
        handler_entry *ctx = it->second;
        // ctx stays valid after these erases: ownership lives in the
        // _vhandlers slot until the reset() below.
        _handlers.erase(it);
        _handlers.erase(ctx->extra_name);
        {
            utils::auto_write_lock l(_vhandlers[rpc_code]->second);
            _vhandlers[rpc_code]->first.reset();
        }
    }
    return true;
}
// Builds the request task for an inbound message, or returns nullptr when
// no handler is registered. When the message already carries a resolved
// rpc code, the handler comes from the indexed per-code slot; otherwise it
// is looked up by rpc name and the resolved code is cached on the message.
rpc_request_task *rpc_server_dispatcher::on_request(message_ex *msg, service_node *node)
{
    rpc_request_handler handler;
    if (msg->local_rpc_code != TASK_CODE_INVALID) {
        // Fast path: indexed slot lookup under the slot's read lock.
        utils::auto_read_lock l(_vhandlers[msg->local_rpc_code]->second);
        handler_entry *entry = _vhandlers[msg->local_rpc_code]->first.get();
        if (entry != nullptr) {
            handler = entry->h;
        }
    } else {
        // Slow path: resolve by rpc name, then remember the code.
        utils::auto_read_lock l(_handlers_lock);
        auto found = _handlers.find(msg->header->rpc_name);
        if (found != _handlers.end()) {
            msg->local_rpc_code = found->second->code;
            handler = found->second->h;
        }
    }
    if (!handler) {
        return nullptr;
    }
    auto task = new rpc_request_task(msg, std::move(handler), node);
    task->spec().on_task_create.execute(task::get_current_task(), task);
    return task;
}
//----------------------------------------------------------------------------------------------
// Binds the engine to its owning service node; networks are created later
// in start(). Serving is enabled separately once handlers are in place.
rpc_engine::rpc_engine(service_node *node)
    : _node(node), _is_running(false), _is_serving(false), _rpc_matcher(this)
{
    CHECK_NOTNULL(_node, "");
}
//
// management routines
//
// Instantiates the configured network provider, configures its parser, and
// starts it on the configured channel/port. Aborts the process on start
// failure, so the returned pointer is always valid.
network *rpc_engine::create_network(const network_server_config &netcs,
                                    bool client_only,
                                    network_header_format client_hdr_format)
{
    auto *net = utils::factory_store<network>::create(
        netcs.factory_name.c_str(), ::dsn::PROVIDER_TYPE_MAIN, this, nullptr);
    net->reset_parser_attr(client_hdr_format, netcs.message_buffer_block_size);
    // start the net
    // If check failed, means mem leaked, don't care as it halts the program
    const auto start_result = net->start(netcs.channel, netcs.port, client_only);
    CHECK_EQ_MSG(start_result, ERR_OK, "create network failed");
    return net;
}
// Brings up the engine: one client network per (header format, channel)
// pair that is configured, then one server network per (port, channel).
// Finally derives the node's primary address from the default client
// network plus the first configured server port (or the app id for a pure
// client with no server ports).
error_code rpc_engine::start(const service_app_spec &aspec)
{
    if (_is_running) {
        return ERR_SERVICE_ALREADY_RUNNING;
    }
    // start client networks
    _client_nets.resize(network_header_format::max_value() + 1);
    // for each format
    for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) {
        std::vector<std::unique_ptr<network>> &pnet = _client_nets[i];
        pnet.resize(rpc_channel::max_value() + 1);
        auto client_hdr_format = network_header_format(network_header_format::to_string(i));
        // for each channel
        for (int j = 0; j <= rpc_channel::max_value(); j++) {
            rpc_channel c = rpc_channel(rpc_channel::to_string(j));
            std::string factory;
            int blk_size;
            // Channels without a client config are simply skipped.
            auto it1 = aspec.network_client_confs.find(c);
            if (it1 != aspec.network_client_confs.end()) {
                factory = it1->second.factory_name;
                blk_size = it1->second.message_buffer_block_size;
            } else {
                LOG_WARNING(
                    "network client for channel {} not registered, assuming not used further", c);
                continue;
            }
            network_server_config cs(aspec.id, c);
            cs.factory_name = factory;
            cs.message_buffer_block_size = blk_size;
            auto net = create_network(cs, true, client_hdr_format);
            if (!net)
                return ERR_NETWORK_INIT_FAILED;
            pnet[j].reset(net);
            LOG_INFO("[{}] network client started at port {}, channel = {}, fmt = {} ...",
                     node()->full_name(),
                     cs.port,
                     cs.channel,
                     client_hdr_format);
        }
    }
    // start server networks
    for (auto &sp : aspec.network_server_confs) {
        int port = sp.second.port;
        // Networks listening on the same port share one per-channel vector.
        std::vector<std::unique_ptr<network>> *pnets;
        auto it = _server_nets.find(port);
        if (it == _server_nets.end()) {
            auto pr = _server_nets.emplace(port, std::vector<std::unique_ptr<network>>{});
            pnets = &pr.first->second;
            pnets->resize(rpc_channel::max_value() + 1);
        } else {
            pnets = &it->second;
        }
        auto net = create_network(sp.second, false, NET_HDR_DSN);
        if (net == nullptr) {
            return ERR_NETWORK_INIT_FAILED;
        }
        (*pnets)[sp.second.channel].reset(net);
        LOG_WARNING("[{}] network server started at port {}, channel = {}, ...",
                    node()->full_name(),
                    port,
                    sp.second.channel);
    }
    // Primary address: ip from the default-format client net, port from the
    // first configured server port (fallback: app id for pure clients).
    _local_primary_address = _client_nets[NET_HDR_DSN][0]->address();
    _local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id);
    LOG_INFO("=== service_node=[{}], primary_address=[{}] ===",
             _node->full_name(),
             _local_primary_address);
    _is_running = true;
    return ERR_OK;
}
// Thin forwarding wrapper: delegates registration to the server dispatcher.
bool rpc_engine::register_rpc_handler(dsn::task_code code,
                                      const char *extra_name,
                                      const rpc_request_handler &h)
{
    return _rpc_dispatcher.register_rpc_handler(code, extra_name, h);
}
// Thin forwarding wrapper: delegates unregistration to the server dispatcher.
bool rpc_engine::unregister_rpc_handler(dsn::task_code rpc_code)
{
    return _rpc_dispatcher.unregister_rpc_handler(rpc_code);
}
// Entry point for every inbound rpc request delivered by a network provider.
// `delay_ms` is an artificial delay (simulator / fault injection) applied to
// the handler task unless the fault injector already set one.
// Refactored: the "unknown rpc name" and "unhandled rpc name" branches used
// to duplicate the whole ERR_HANDLER_NOT_FOUND fallback; it now lives in one
// local lambda.
void rpc_engine::on_recv_request(network *net, message_ex *msg, int delay_ms)
{
    if (!_is_serving) {
        LOG_WARNING(
            "recv message with rpc name {} from {} when rpc engine is not serving, trace_id = {}",
            msg->header->rpc_name,
            msg->header->from_address,
            msg->header->trace_id);
        CHECK_EQ_MSG(msg->get_count(), 0, "request should not be referenced by anybody so far");
        delete msg;
        return;
    }

    // Shared fallback: reply ERR_HANDLER_NOT_FOUND and release the request.
    // add_ref/release_ref bracket the reply because the message's initial
    // ref count is zero.
    auto reply_handler_not_found = [msg]() {
        CHECK_EQ_MSG(msg->get_count(), 0, "request should not be referenced by anybody so far");
        msg->add_ref();
        dsn_rpc_reply(msg->create_response(), ::dsn::ERR_HANDLER_NOT_FOUND);
        msg->release_ref();
    };

    auto code = msg->rpc_code();
    if (code == ::dsn::TASK_CODE_INVALID) {
        LOG_WARNING("recv message with unknown rpc name {} from {}, trace_id = {:#018x}",
                    msg->header->rpc_name,
                    msg->header->from_address,
                    msg->header->trace_id);
        reply_handler_not_found();
        return;
    }

    rpc_request_task *tsk = nullptr;
    // handle replication: partition-bound requests (app_id > 0) may be
    // intercepted by the replication layer before normal dispatch
    if (msg->header->gpid.get_app_id() > 0) {
        tsk = _node->generate_intercepted_request_task(msg);
    }
    if (tsk == nullptr) {
        tsk = _rpc_dispatcher.on_request(msg, _node);
    }
    if (tsk == nullptr) {
        LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x}",
                    msg->header->rpc_name,
                    msg->header->from_address,
                    msg->header->trace_id);
        reply_handler_not_found();
        return;
    }

    // injector: the join point may decide to drop the request
    if (tsk->spec().on_rpc_request_enqueue.execute(tsk, true)) {
        // we set a default delay if it isn't generated by fault-injector
        if (tsk->delay_milliseconds() == 0)
            tsk->set_delay(delay_ms);
        tsk->enqueue();
    }
    // release the task when necessary
    else {
        LOG_INFO("rpc request {} is dropped (fault inject), trace_id = {:#018x}",
                 msg->header->rpc_name,
                 msg->header->trace_id);
        // call network failure model when network is present
        net->inject_drop_message(msg, false);
        // because (1) initially, the ref count is zero
        // (2) upper apps may call add_ref already
        tsk->add_ref();
        tsk->release_ref();
    }
}
// Stamps the sender address and a fresh random trace id onto the request,
// then routes it according to the request's server address.
void rpc_engine::call(message_ex *request, const rpc_response_task_ptr &call)
{
    message_header &hdr = *request->header;
    hdr.from_address = primary_address();
    using trace_id_t = decltype(hdr.trace_id);
    hdr.trace_id = rand::next_u64(std::numeric_limits<trace_id_t>::min(),
                                  std::numeric_limits<trace_id_t>::max());
    call_address(request->server_address, request, call);
}
// Resolves a group address to one concrete member according to the rpc
// code's group-call mode, then issues a plain ip call to it.
void rpc_engine::call_group(rpc_address addr,
                            message_ex *request,
                            const rpc_response_task_ptr &call)
{
    DCHECK_EQ_MSG(addr.type(), HOST_TYPE_GROUP, "only group is now supported");
    auto *group = request->server_address.group_address();
    auto sp = task_spec::get(request->local_rpc_code);
    switch (sp->grpc_mode) {
    case GRPC_TO_LEADER:
        // Send to the believed leader (may be corrected via forwarding).
        call_ip(group->possible_leader(), request, call);
        break;
    case GRPC_TO_ANY:
        // TODO: performance optimization
        call_ip(group->random_member(), request, call);
        break;
    case GRPC_TO_ALL:
        CHECK(false, "to be implemented");
        break;
    default:
        CHECK(false, "invalid group rpc mode {}", sp->grpc_mode);
    }
}
// Sends `request` to a concrete ipv4 address over the client network that
// matches the message's header format and the rpc code's channel.
// `call` may be nullptr (fire-and-forget or matcher-managed resend).
// `reset_request_id` allocates a fresh id (used by fake forwarding);
// `set_forwarded` marks the header so the receiver knows it was forwarded.
void rpc_engine::call_ip(rpc_address addr,
                         message_ex *request,
                         const rpc_response_task_ptr &call,
                         bool reset_request_id,
                         bool set_forwarded)
{
    DCHECK_EQ_MSG(addr.type(), HOST_TYPE_IPV4, "only IPV4 is now supported");
    DCHECK_GT_MSG(addr.port(), MAX_CLIENT_PORT, "only server address can be called");
    CHECK(!request->header->from_address.is_invalid(),
          "from address must be set before call call_ip");
    // A resent message may still sit in a previous session's sending queue;
    // keep cancelling until it is unlinked so it can be sent again.
    while (!request->dl.is_alone()) {
        LOG_WARNING("msg request {} (trace_id = {:#018x}) is in sending queue, try to pick out ...",
                    request->header->rpc_name,
                    request->header->trace_id);
        auto s = request->io_session;
        if (s.get() != nullptr) {
            s->cancel(request);
        }
    }
    request->to_address = addr;
    auto sp = task_spec::get(request->local_rpc_code);
    auto &hdr = *request->header;
    network *net = _client_nets[request->hdr_format][sp->rpc_call_channel].get();
    CHECK_NOTNULL(net,
                  "network not present for rpc channel '{}' with format '{}' used by rpc {}",
                  sp->rpc_call_channel,
                  sp->rpc_call_header_format,
                  hdr.rpc_name);
    LOG_DEBUG("rpc_name = {}, remote_addr = {}, header_format = {}, channel = {}, seq_id = {}, "
              "trace_id = {:#018x}",
              hdr.rpc_name,
              addr,
              request->hdr_format,
              sp->rpc_call_channel,
              hdr.id,
              hdr.trace_id);
    if (reset_request_id) {
        hdr.id = message_ex::new_id();
    }
    if (set_forwarded && request->header->context.u.is_forwarded == false) {
        request->header->context.u.is_forwarded = true;
    }
    // join point and possible fault injection
    if (!sp->on_rpc_call.execute(task::get_current_task(), request, call, true)) {
        LOG_INFO("rpc request {} is dropped (fault inject), trace_id = {:#018x}",
                 request->header->rpc_name,
                 request->header->trace_id);
        // call network failure model
        net->inject_drop_message(request, true);
        if (call != nullptr) {
            // fail the call as a timeout after the full client timeout
            call->set_delay(hdr.client.timeout_ms);
            call->enqueue(ERR_TIMEOUT, nullptr);
        } else {
            // as ref_count for request may be zero
            request->add_ref();
            request->release_ref();
        }
        return;
    }
    // Register with the matcher BEFORE sending so a fast reply cannot race
    // past an unregistered entry.
    if (call != nullptr) {
        _rpc_matcher.on_call(request, call);
    }
    net->send_message(request);
}
// Sends `response` back to the requester, choosing among three paths:
// (1) the bound server session when the request was not forwarded,
// (2) a client network when the request was forwarded (the original session
//     belongs to the forwarder, not the requester),
// (3) a server network keyed by the requester's port when the transport is
//     not connection oriented.
// On fault injection (`no_fail == false`) the message is dropped via the
// network failure model and released at the end.
void rpc_engine::reply(message_ex *response, error_code err)
{
    // when a message doesn't need to reply, we don't do the on_rpc_reply hooks to avoid mistakes
    // for example, the profiler may be mistakenly calculated
    auto s = response->io_session.get();
    if (s == nullptr && response->to_address.is_invalid()) {
        LOG_DEBUG("rpc reply {} is dropped (invalid to-address), trace_id = {:#018x}",
                  response->header->rpc_name,
                  response->header->trace_id);
        response->add_ref();
        response->release_ref();
        return;
    }
    // Copy the error name into the fixed-size header field; the explicit
    // terminator covers the case where the name fills the buffer.
    strncpy(response->header->server.error_name,
            err.to_string(),
            sizeof(response->header->server.error_name) - 1);
    response->header->server.error_name[sizeof(response->header->server.error_name) - 1] = '\0';
    response->header->server.error_code.local_code = err;
    response->header->server.error_code.local_hash = FLAGS_local_hash;
    // response rpc code may be TASK_CODE_INVALID when request rpc code is not exist
    auto sp = response->local_rpc_code == TASK_CODE_INVALID
                  ? nullptr
                  : task_spec::get(response->local_rpc_code);
    bool no_fail = true;
    if (sp) {
        // current task may be nullptr when this method is directly invoked from rpc_engine.
        task *cur_task = task::get_current_task();
        if (cur_task) {
            // join point may veto the send (fault injection)
            no_fail = sp->on_rpc_reply.execute(cur_task, response, true);
        }
    }
    // connection oriented network, we have bound session
    if (s != nullptr) {
        // not forwarded, we can use the original rpc session
        if (!response->header->context.u.is_forwarded) {
            if (no_fail) {
                s->send_message(response);
            } else {
                s->net().inject_drop_message(response, true);
            }
        }
        // request is forwarded, we cannot use the original rpc session,
        // so use client session to send response.
        else {
            DCHECK_GT_MSG(response->to_address.port(),
                          MAX_CLIENT_PORT,
                          "target address must have named port in this case");
            // use the header format recorded in the message
            auto rpc_channel = sp ? sp->rpc_call_channel : RPC_CHANNEL_TCP;
            network *net = _client_nets[response->hdr_format][rpc_channel].get();
            CHECK_NOTNULL(
                net,
                "client network not present for rpc channel '{}' with format '{}' used by rpc {}",
                RPC_CHANNEL_TCP,
                response->hdr_format,
                response->header->rpc_name);
            if (no_fail) {
                net->send_message(response);
            } else {
                net->inject_drop_message(response, true);
            }
        }
    }
    // not connection oriented network, we always use the named network to send msgs
    else {
        DCHECK_GT_MSG(response->to_address.port(),
                      MAX_CLIENT_PORT,
                      "target address must have named port in this case");
        auto rpc_channel = sp ? sp->rpc_call_channel : RPC_CHANNEL_TCP;
        network *net = _server_nets[response->header->from_address.port()][rpc_channel].get();
        CHECK_NOTNULL(net,
                      "server network not present for rpc channel '{}' on port {} used by rpc {}",
                      RPC_CHANNEL_UDP,
                      response->header->from_address.port(),
                      response->header->rpc_name);
        if (no_fail) {
            net->send_message(response);
        } else {
            net->inject_drop_message(response, true);
        }
    }
    if (!no_fail) {
        // because (1) initially, the ref count is zero
        // (2) upper apps may call add_ref already
        response->add_ref();
        response->release_ref();
    }
}
// Forwards an in-flight request to another node. For requests that came
// from a pure client (unnamed port), a real forward is impossible, so a
// fake one is done: reply ERR_FORWARD_TO_OTHERS with the target address
// marshalled in the payload and let the client re-issue the call (see
// on_recv_reply). Otherwise the request is copied and re-sent with the
// forwarded flag set and the original request id kept.
void rpc_engine::forward(message_ex *request, rpc_address address)
{
    CHECK(request->header->context.u.is_request, "only rpc request can be forwarded");
    CHECK(request->header->context.u.is_forward_supported,
          "rpc msg {} (trace_id = {:#018x}) does not support being forwared",
          task_spec::get(request->local_rpc_code)->name,
          request->header->trace_id);
    CHECK_NE_MSG(address,
                 primary_address(),
                 "cannot forward msg {} (trace_id = {:#018x}) to the local node",
                 task_spec::get(request->local_rpc_code)->name,
                 request->header->trace_id);
    // msg is from pure client (no server port assigned)
    // in this case, we have no way to directly post a message
    // to it but reusing the current server connection
    // we therefore cannot really do the forwarding but fake it
    if (request->header->from_address.port() <= MAX_CLIENT_PORT) {
        auto resp = request->create_response();
        ::dsn::marshall(resp, address);
        ::dsn::task::get_current_rpc()->reply(resp, ::dsn::ERR_FORWARD_TO_OTHERS);
    }
    // do real forwarding, not reset request_id, but set forwarded flag
    // if forwarding failed for non-timeout reason (such as connection denied),
    // we will consider this as msg lost from the client side's perspective as
    else {
        auto copied_request = request->copy_and_prepare_send(false);
        call_ip(address, copied_request, nullptr, false, true);
    }
}
} // namespace dsn