blob: 044fbe5c1b1772ca3d7da7afaa21f7528890c1ea [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "task_spec.h"
#include <string.h>
#include <array>
#include <memory>
#include <thread>
#include "runtime/rpc/rpc_message.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/threadpool_spec.h"
DSN_DECLARE_bool(enable_udp);
namespace dsn {
constexpr int TASK_SPEC_STORE_CAPACITY = 512;
std::set<dsn::task_code> &get_storage_rpc_req_codes()
{
static std::set<dsn::task_code> s_storage_rpc_req_codes;
return s_storage_rpc_req_codes;
}
// A sequential storage maps task_code to task_spec.
static std::array<std::unique_ptr<task_spec>, TASK_SPEC_STORE_CAPACITY> s_task_spec_store;
void task_spec::register_task_code(task_code code,
dsn_task_type_t type,
dsn_task_priority_t pri,
dsn::threadpool_code pool)
{
CHECK_GE(code, 0);
CHECK_LT(code, TASK_SPEC_STORE_CAPACITY);
if (!s_task_spec_store[code]) {
s_task_spec_store[code] =
std::make_unique<task_spec>(code, code.to_string(), type, pri, pool);
auto &spec = s_task_spec_store[code];
if (type == TASK_TYPE_RPC_REQUEST) {
std::string ack_name = std::string(code.to_string()) + std::string("_ACK");
// for a rpc request, we firstly register it's ack code to invalid threadpool,
// then the response code's definition will reassign a proper valid threadpool code.
// please refer to the DEFINE_TASK_CODE_RPC/DEFINE_STORAGE_RPC_CODE in task_code.h
// for more details.
dsn::task_code ack_code(
ack_name.c_str(), TASK_TYPE_RPC_RESPONSE, pri, THREAD_POOL_INVALID);
spec->rpc_paired_code = ack_code;
task_spec::get(ack_code.code())->rpc_paired_code = code;
}
} else {
auto spec = task_spec::get(code);
CHECK_EQ_MSG(
spec->type,
type,
"task code {} registerd for {}, which does not match with previously registered {}",
code,
enum_to_string(type),
enum_to_string(spec->type));
if (spec->priority != pri) {
LOG_WARNING("overwrite priority for task {} from {} to {}",
code,
enum_to_string(spec->priority),
enum_to_string(pri));
spec->priority = pri;
}
if (spec->pool_code != pool) {
if (spec->pool_code != THREAD_POOL_INVALID) {
LOG_WARNING("overwrite default thread pool for task {} from {} to {}",
code,
spec->pool_code,
pool);
}
spec->pool_code = pool;
}
}
}
void task_spec::register_storage_task_code(task_code code,
dsn_task_type_t type,
dsn_task_priority_t pri,
threadpool_code pool,
bool is_write_operation,
bool allow_batch,
bool is_idempotent)
{
register_task_code(code, type, pri, pool);
task_spec *spec = task_spec::get(code);
spec->rpc_request_for_storage = true;
spec->rpc_request_is_write_operation = is_write_operation;
spec->rpc_request_is_write_allow_batch = allow_batch;
spec->rpc_request_is_write_idempotent = is_idempotent;
if (TASK_TYPE_RPC_REQUEST == type) {
get_storage_rpc_req_codes().insert(code);
}
}
task_spec *task_spec::get(int code)
{
CHECK_GE(code, 0);
CHECK_LT(code, TASK_SPEC_STORE_CAPACITY);
return s_task_spec_store[code].get();
}
task_spec::task_spec(int code,
const char *name,
dsn_task_type_t type,
dsn_task_priority_t pri,
dsn::threadpool_code pool)
: code(code),
type(type),
name(name),
rpc_paired_code(TASK_CODE_INVALID),
rpc_request_for_storage(false),
rpc_request_is_write_operation(false),
rpc_request_is_write_allow_batch(false),
rpc_request_is_write_idempotent(false),
priority(pri),
pool_code(pool),
rpc_call_header_format(NET_HDR_DSN),
rpc_call_channel(RPC_CHANNEL_TCP),
rpc_message_crc_required(false),
on_task_create((std::string(name) + std::string(".create")).c_str()),
on_task_enqueue((std::string(name) + std::string(".enqueue")).c_str()),
on_task_begin((std::string(name) + std::string(".begin")).c_str()),
on_task_end((std::string(name) + std::string(".end")).c_str()),
on_task_cancelled((std::string(name) + std::string(".cancelled")).c_str()),
on_task_wait_pre((std::string(name) + std::string(".wait.pre")).c_str()),
on_task_wait_notified((std::string(name) + std::string(".wait.notified")).c_str()),
on_task_wait_post((std::string(name) + std::string(".wait.post")).c_str()),
on_task_cancel_post((std::string(name) + std::string(".cancel.post")).c_str()),
on_aio_call((std::string(name) + std::string(".aio.call")).c_str()),
on_aio_enqueue((std::string(name) + std::string(".aio.enqueue")).c_str()),
on_rpc_call((std::string(name) + std::string(".rpc.call")).c_str()),
on_rpc_request_enqueue((std::string(name) + std::string(".rpc.request.enqueue")).c_str()),
on_rpc_task_dropped((std::string(name) + std::string(".dropped")).c_str()),
on_rpc_reply((std::string(name) + std::string(".rpc.reply")).c_str()),
on_rpc_response_enqueue((std::string(name) + std::string(".rpc.response.enqueue")).c_str()),
on_rpc_create_response((std::string(name) + std::string(".rpc.response.create")).c_str())
{
CHECK_LT_MSG(strlen(name),
DSN_MAX_TASK_CODE_NAME_LENGTH,
"task code name '{}' is too long: length must be smaller than "
"DSN_MAX_TASK_CODE_NAME_LENGTH ({})",
name,
DSN_MAX_TASK_CODE_NAME_LENGTH);
rpc_call_channel = RPC_CHANNEL_TCP;
rpc_timeout_milliseconds = 5 * 1000; // 5 seconds
}
bool task_spec::init()
{
/*
[task..default]
is_trace = false
is_profile = false
[task.RPC_PREPARE]
pool_code = THREAD_POOL_REPLICATION
priority = TASK_PRIORITY_HIGH
is_trace = true
is_profile = true
*/
task_spec default_spec(
0, "placeholder", TASK_TYPE_COMPUTE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT);
if (!read_config("task..default", default_spec))
return false;
for (int code = 0; code <= dsn::task_code::max(); code++) {
if (code == TASK_CODE_INVALID)
continue;
std::string section_name =
std::string("task.") + std::string(dsn::task_code(code).to_string());
task_spec *spec = task_spec::get(code);
CHECK_NOTNULL(spec, "");
if (!read_config(section_name.c_str(), *spec, &default_spec))
return false;
if (code == TASK_CODE_EXEC_INLINED) {
spec->allow_inline = true;
}
CHECK(spec->rpc_request_delays_milliseconds.size() == 0 ||
spec->rpc_request_delays_milliseconds.size() == 6,
"invalid length of rpc_request_delays_milliseconds, must be of length 6");
if (spec->rpc_request_delays_milliseconds.size() > 0) {
spec->rpc_request_delayer.initialize(spec->rpc_request_delays_milliseconds);
}
if (spec->rpc_request_throttling_mode != TM_NONE) {
if (spec->type != TASK_TYPE_RPC_REQUEST) {
LOG_ERROR("{}: only rpc request type can have non TM_NONE throttling_mode",
spec->name);
return false;
}
}
if (spec->rpc_call_channel == RPC_CHANNEL_UDP && !FLAGS_enable_udp) {
LOG_ERROR("task rpc_call_channel RPC_CHANNEL_UCP need udp service, make sure "
"[network].enable_udp");
return false;
}
}
return true;
}
bool threadpool_spec::init(/*out*/ std::vector<threadpool_spec> &specs)
{
/*
[threadpool..default]
worker_count = 4
worker_priority = THREAD_xPRIORITY_NORMAL
partitioned = false
queue_aspects = xxx
worker_aspects = xxx
[threadpool.THREAD_POOL_REPLICATION]
name = Thr.replication
run = true
worker_count = 4
worker_priority = THREAD_xPRIORITY_NORMAL
partitioned = false
queue_aspects = xxx
worker_aspects = xxx
*/
threadpool_spec default_spec(THREAD_POOL_INVALID);
if (false == read_config("threadpool..default", default_spec, nullptr))
return false;
default_spec.name = "";
specs.clear();
for (int code = 0; code <= threadpool_code::max(); code++) {
std::string code_name = std::string(threadpool_code(code).to_string());
std::string section_name = std::string("threadpool.") + code_name;
threadpool_spec spec(default_spec);
if (false == read_config(section_name.c_str(), spec, &default_spec))
return false;
spec.pool_code = threadpool_code(code);
if ("" == spec.name)
spec.name = code_name;
if (false == spec.worker_share_core && 0 == spec.worker_affinity_mask) {
spec.worker_affinity_mask = (1 << std::thread::hardware_concurrency()) - 1;
}
specs.push_back(spec);
}
return true;
}
} // end namespace