blob: 1b8baa7a1838e051e276e372313d101ab5647fad [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "client/partition_resolver.h"
// IWYU pragma: no_include <type_traits>
#include "partition_resolver_manager.h"
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
#include "runtime/task/task_spec.h"
#include "utils/fmt_logging.h"
#include "utils/threadpool_code.h"
namespace dsn {
namespace replication {
/**
 * Static factory: returns the partition resolver for the given
 * (cluster, meta server list, app) triple.
 *
 * The lookup is delegated to the process-wide partition_resolver_manager,
 * whose find_or_create() supplies the returned resolver.
 */
/*static*/
partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name,
                                                        const std::vector<rpc_address> &meta_list,
                                                        const char *app_name)
{
    auto &&manager = partition_resolver_manager::instance();
    partition_resolver_ptr resolver = manager.find_or_create(cluster_name, meta_list, app_name);
    return resolver;
}
// Task code under which partition_resolver::call_task() enqueues the delayed
// re-invocation of itself when a failed request is retried after a back-off.
// Declared with TASK_PRIORITY_COMMON on THREAD_POOL_DEFAULT.
DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
// Tells whether a failed request is worth sending again.
//
// Returns false for the error codes listed below (they are treated as
// non-retryable by the caller) and true for every other error code.
static inline bool error_retry(error_code err)
{
    if (err == ERR_HANDLER_NOT_FOUND || err == ERR_APP_NOT_EXIST) {
        return false;
    }
    if (err == ERR_OPERATION_DISABLED || err == ERR_BUSY) {
        return false;
    }
    if (err == ERR_SPLITTING || err == ERR_DISK_INSUFFICIENT) {
        return false;
    }
    return true;
}
// Resolves the target partition of the request carried by `t` and issues the
// RPC, transparently retrying on retryable failures until the request's
// client timeout has elapsed.
//
// The task's current response handler is taken out and replaced by a wrapper
// that:
//   - on a retryable error (see error_retry) for a request that already has a
//     gpid, reports the failure via on_access_failure() and, if there is still
//     time left before the deadline, restores the original handler and
//     re-enqueues call_task() after an exponential back-off;
//   - if no time is left, rewrites the error to ERR_TIMEOUT;
//   - otherwise (or after the rewrite) invokes the original handler, if any.
//
// Parameters:
//   t - the rpc response task whose request should be sent.
void partition_resolver::call_task(const rpc_response_task_ptr &t)
{
    auto &hdr = *(t->get_request()->header);

    // Absolute deadline for the whole call, covering every retry attempt.
    uint64_t deadline_ms = dsn_now_ms() + hdr.client.timeout_ms;

    // Take the caller's handler out of the task so it can be wrapped.
    rpc_response_handler old_callback;
    t->fetch_current_handler(old_callback);

    auto new_callback = [ this, deadline_ms, oc = std::move(old_callback) ](
        dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
    {
        if (req->header->gpid.value() != 0 && err != ERR_OK && error_retry(err)) {
            on_access_failure(req->header->gpid.get_partition_index(), err);

            // still got time, retry
            uint64_t nms = dsn_now_ms();

            // Exponential back-off: 8ms, 16ms, ... capped at 1000ms.
            // The exponent is bounded BEFORE shifting: 8 << 7 (= 1024) already
            // exceeds the cap, whereas shifting an int by an unbounded retry
            // count is undefined behaviour once the result no longer fits.
            // Converting to unsigned first also maps a (never expected)
            // negative count onto the capped branch.
            constexpr uint64_t kMaxGapMs = 1000;
            constexpr uint64_t kMaxShift = 7;
            const uint64_t retries = static_cast<uint64_t>(req->send_retry_count);
            const uint64_t gap = retries < kMaxShift ? (uint64_t{8} << retries) : kMaxGapMs;

            if (nms + gap < deadline_ms) {
                req->send_retry_count++;
                // Remaining budget once the back-off delay has been spent.
                req->header->client.timeout_ms = static_cast<int>(deadline_ms - nms - gap);

                rpc_response_task_ptr ctask =
                    dynamic_cast<rpc_response_task *>(task::get_current_task());
                CHECK_NOTNULL(ctask, "current task must be rpc_response_task");

                // Hold a reference to this resolver for the delayed call below.
                partition_resolver_ptr r(this);

                // Put the original handler back; the next call_task() wraps it
                // again. (The lambda is not mutable, so `oc` is const here and
                // this std::move copies rather than moves.)
                ctask->replace_callback(std::move(oc));
                CHECK(ctask->set_retry(false),
                      "rpc_response_task set retry failed, state = {}",
                      enum_to_string(ctask->state()));

                // sleep gap milliseconds before retry
                tasking::enqueue(LPC_RPC_DELAY_CALL,
                                 nullptr,
                                 [r, ctask]() { r->call_task(ctask); },
                                 0,
                                 std::chrono::milliseconds(gap));
                return;
            } else {
                LOG_ERROR("service access failed ({}), no more time for further tries, set error "
                          "= ERR_TIMEOUT, trace_id = {:#018x}",
                          err,
                          req->header->trace_id);
                err = ERR_TIMEOUT;
            }
        }

        // Final outcome (success, non-retryable error, or timeout): hand it to
        // the original handler when one was installed.
        if (oc)
            oc(err, req, resp);
    };
    t->replace_callback(std::move(new_callback));

    resolve(hdr.client.partition_hash,
            [t](resolve_result &&result) mutable {
                // Resolution failed: complete the task with that error.
                if (result.err != ERR_OK) {
                    t->enqueue(result.err, nullptr);
                    return;
                }

                // update gpid when necessary
                auto &hdr = *(t->get_request()->header);
                if (hdr.gpid.value() != result.pid.value()) {
                    if (hdr.client.thread_hash == 0 // thread_hash is not assigned by applications
                        ||
                        hdr.gpid.value() != 0 // requests set to child redirect to parent
                        ) {
                        hdr.client.thread_hash = result.pid.thread_hash();
                    }
                    hdr.gpid = result.pid;
                }
                dsn_rpc_call(result.address, t.get());
            },
            hdr.client.timeout_ms);
}
} // namespace replication
} // namespace dsn