blob: 50b7e5e2892f5b91b896d9b2616fa35afa8e0df9 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <functional>
#include <list>
#include <utility>
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/service_engine.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_engine.h"
#include "runtime/task/task_spec.h"
#include "runtime/task/task_worker.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
#include "utils/threadpool_code.h"
namespace dsn {
rpc_request_task::rpc_request_task(message_ex *request, rpc_request_handler &&h, service_node *node)
: task(request->rpc_code(), request->header->client.thread_hash, node),
_request(request),
_handler(std::move(h)),
_enqueue_ts_ns(0)
{
DCHECK_EQ_MSG(
TASK_TYPE_RPC_REQUEST,
spec().type,
"{} is not a RPC_REQUEST task, please use DEFINE_TASK_CODE_RPC to define the task code",
spec().name);
_request->add_ref(); // released in dctor
}
rpc_request_task::~rpc_request_task()
{
_request->release_ref(); // added in ctor
}
void rpc_request_task::enqueue()
{
if (spec().rpc_request_dropped_before_execution_when_timeout) {
_enqueue_ts_ns = dsn_now_ns();
}
task::enqueue(node()->computation()->get_pool(spec().pool_code));
}
rpc_response_task::rpc_response_task(message_ex *request,
const rpc_response_handler &cb,
int hash,
service_node *node)
: rpc_response_task(request, rpc_response_handler(cb), hash, node)
{
}
rpc_response_task::rpc_response_task(message_ex *request,
rpc_response_handler &&cb,
int hash,
service_node *node)
: task(task_spec::get(request->local_rpc_code)->rpc_paired_code,
hash == 0 ? request->header->client.thread_hash : hash,
node),
_cb(std::move(cb))
{
_is_null = (_cb == nullptr);
set_error_code(ERR_IO_PENDING);
DCHECK_EQ_MSG(TASK_TYPE_RPC_RESPONSE,
spec().type,
"{} is not of RPC_RESPONSE type, please use DEFINE_TASK_CODE_RPC to define the "
"request task code",
spec().name);
_request = request;
_response = nullptr;
_caller_pool = get_current_worker() ? get_current_worker()->pool() : nullptr;
_request->add_ref(); // released in dctor
}
rpc_response_task::~rpc_response_task()
{
_request->release_ref(); // added in ctor
if (_response != nullptr)
_response->release_ref(); // added in enqueue
}
bool rpc_response_task::enqueue(error_code err, message_ex *reply)
{
set_error_code(err);
if (_response != nullptr)
_response->release_ref(); // added in previous enqueue
_response = reply;
if (nullptr != reply) {
reply->add_ref(); // released in dctor
}
bool ret = true;
if (!spec().on_rpc_response_enqueue.execute(this, true)) {
set_error_code(ERR_NETWORK_FAILURE);
ret = false;
}
rpc_response_task::enqueue();
return ret;
}
void rpc_response_task::enqueue()
{
if (_caller_pool)
task::enqueue(_caller_pool);
// possible when it is called in non-rDSN threads
else {
auto pool = node()->computation()->get_pool(spec().pool_code);
task::enqueue(pool);
}
}
} // namespace dsn