// Source blob: e85d88ed5fedc4d106ca38cf68514ae6edcd3be4
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "runtime/tracer.h"
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <inttypes.h>
#include <stdio.h>
#include <mutex>
#include <string>
#include <vector>
#include "aio/aio_task.h"
#include "fmt/core.h"
#include "fmt/format.h"
#include "runtime/global_config.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/command_manager.h"
#include "utils/config_api.h"
#include "utils/enum_helper.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
// Defines the boolean flag `is_trace` with default value false. tracer::install() reads it
// as FLAGS_is_trace and uses it as the default of the per-task-code "is_trace" option.
// NOTE(review): the first macro argument (`task..default`) is presumably the config section
// the flag lives in - confirm against utils/flags.h.
DSN_DEFINE_bool(task..default, is_trace, false, "Whether to trace task");
namespace dsn {
namespace tools {
// Task-create hook: writes one CREATE log record for `callee`.
// For RPC request and RPC response tasks the record additionally carries the rpc name and
// the trace id read from the header of the task's request message.
// `caller` is not used.
static void tracer_on_task_create(task *caller, task *callee)
{
    dsn_task_type_t kind = callee->spec().type;

    switch (kind) {
    case TASK_TYPE_RPC_REQUEST: {
        auto *request_task = static_cast<rpc_request_task *>(callee);
        LOG_INFO("{} CREATE, task_id = {:#018x}, type = {}, rpc_name = {}, trace_id = {:#018x}",
                 callee->spec().name,
                 callee->id(),
                 enum_to_string(kind),
                 request_task->get_request()->header->rpc_name,
                 request_task->get_request()->header->trace_id);
        break;
    }
    case TASK_TYPE_RPC_RESPONSE: {
        auto *response_task = static_cast<rpc_response_task *>(callee);
        LOG_INFO("{} CREATE, task_id = {:#018x}, type = {}, rpc_name = {}, trace_id = {:#018x}",
                 callee->spec().name,
                 callee->id(),
                 enum_to_string(kind),
                 response_task->get_request()->header->rpc_name,
                 response_task->get_request()->header->trace_id);
        break;
    }
    default:
        // Every other task type has no request message to report.
        LOG_INFO("{} CREATE, task_id = {:#018x}, type = {}",
                 callee->spec().name,
                 callee->id(),
                 enum_to_string(kind));
        break;
    }
}
// Task-enqueue hook: logs the delay of `callee` (in milliseconds) together with the queue
// size recorded in the thread-local `tls_dsn.last_worker_queue_size`.
// `caller` is not used.
static void tracer_on_task_enqueue(task *caller, task *callee)
{
    LOG_INFO("{} ENQUEUE, task_id = {:#018x}, "
             "delay = {} ms, queue size = {}",
             callee->spec().name,
             callee->id(),
             callee->delay_milliseconds(),
             tls_dsn.last_worker_queue_size);
}
// Task-begin hook: logs an EXEC BEGIN record when `this_` starts executing.
//   - compute and aio tasks: task name and id only;
//   - rpc request tasks:  "<request from_address> => <request to_address>" plus trace id;
//   - rpc response tasks: "<request to_address> => <request from_address>" plus trace id
//     (the two addresses of the request are printed in the opposite order);
//   - any other task type: nothing is logged.
static void tracer_on_task_begin(task *this_)
{
    const auto kind = this_->spec().type;

    if (kind == dsn_task_type_t::TASK_TYPE_COMPUTE || kind == dsn_task_type_t::TASK_TYPE_AIO) {
        LOG_INFO("{} EXEC BEGIN, task_id = {:#018x}", this_->spec().name, this_->id());
        return;
    }

    if (kind == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
        auto *request_task = static_cast<rpc_request_task *>(this_);
        LOG_INFO("{} EXEC BEGIN, task_id = {:#018x}, {} => {}, trace_id = {:#018x}",
                 this_->spec().name,
                 this_->id(),
                 request_task->get_request()->header->from_address,
                 request_task->get_request()->to_address,
                 request_task->get_request()->header->trace_id);
        return;
    }

    if (kind == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
        auto *response_task = static_cast<rpc_response_task *>(this_);
        LOG_INFO("{} EXEC BEGIN, task_id = {:#018x}, {} => {}, trace_id = {:#018x}",
                 this_->spec().name,
                 this_->id(),
                 response_task->get_request()->to_address,
                 response_task->get_request()->header->from_address,
                 response_task->get_request()->header->trace_id);
    }
}
// Task-end hook: logs an EXEC END record carrying the value of `this_->error()`.
static void tracer_on_task_end(task *this_)
{
    LOG_INFO("{} EXEC END, task_id = {:#018x}, "
             "err = {}",
             this_->spec().name,
             this_->id(),
             this_->error());
}
// Task-cancelled hook: logs a CANCELLED record with the task name and id.
static void tracer_on_task_cancelled(task *this_)
{
    LOG_INFO("{} CANCELLED, "
             "task_id = {:#018x}",
             this_->spec().name,
             this_->id());
}
// Wait-pre hook: deliberately a no-op, nothing is logged.
static void tracer_on_task_wait_pre(task *caller, task *callee, uint32_t timeout_ms)
{
    // intentionally empty
}
// Wait-post hook: deliberately a no-op, nothing is logged.
static void tracer_on_task_wait_post(task *caller, task *callee, bool succ)
{
    // intentionally empty
}
// Cancel-post hook: deliberately a no-op, nothing is logged.
static void tracer_on_task_cancel_post(task *caller, task *callee, bool succ)
{
    // intentionally empty
}
// Aio-call hook: logs the file offset and buffer size found in the aio context of `callee`.
// The hook returns nothing; `caller` is not used.
static void tracer_on_aio_call(task *caller, aio_task *callee)
{
    LOG_INFO("{} AIO.CALL, task_id = {:#018x}, "
             "offset = {}, size = {}",
             callee->spec().name,
             callee->id(),
             callee->get_aio_context()->file_offset,
             callee->get_aio_context()->buffer_size);
}
// Aio-enqueue hook: logs the task name and id together with the queue size recorded in the
// thread-local `tls_dsn.last_worker_queue_size`.
static void tracer_on_aio_enqueue(aio_task *this_)
{
    LOG_INFO("{} AIO.ENQUEUE, task_id = {:#018x}, "
             "queue size = {}",
             this_->spec().name,
             this_->id(),
             tls_dsn.last_worker_queue_size);
}
// Rpc-call hook: logs the rpc name, both addresses, the trace id and the client timeout of
// the request `req`. `callee` may be null; in that case 0 is logged as the callback task id.
// The hook returns nothing; `caller` is not used.
static void tracer_on_rpc_call(task *caller, message_ex *req, rpc_response_task *callee)
{
    message_header &header = *req->header;
    LOG_INFO("{} RPC.CALL: {} => {}, trace_id = {:#018x}, "
             "callback_task = {:#018x}, timeout = {} ms",
             header.rpc_name,
             header.from_address,
             req->to_address,
             header.trace_id,
             callee != nullptr ? callee->id() : 0,
             header.client.timeout_ms);
}
// Rpc-request-enqueue hook: logs the task pointer and id, the request's from/to addresses,
// its trace id and the queue size recorded in `tls_dsn.last_worker_queue_size`.
static void tracer_on_rpc_request_enqueue(rpc_request_task *callee)
{
    LOG_INFO("{} RPC.REQUEST.ENQUEUE ({}), task_id = {:#018x}, "
             "{} => {}, trace_id = {:#018x}, queue size = {}",
             callee->spec().name,
             fmt::ptr(callee),
             callee->id(),
             callee->get_request()->header->from_address,
             callee->get_request()->to_address,
             callee->get_request()->header->trace_id,
             tls_dsn.last_worker_queue_size);
}
// Rpc-reply hook: logs the rpc name, the from/to addresses and the trace id of the reply
// message `msg`. The hook returns nothing; `caller` is not used.
static void tracer_on_rpc_reply(task *caller, message_ex *msg)
{
    message_header &header = *msg->header;
    LOG_INFO("{} RPC.REPLY: {} => {}, "
             "trace_id = {:#018x}",
             header.rpc_name,
             header.from_address,
             msg->to_address,
             header.trace_id);
}
// Rpc-response-enqueue hook: logs the task id, the addresses of the originating request
// (printed as "<to_address> => <from_address>"), the request's trace id and the queue size
// recorded in `tls_dsn.last_worker_queue_size`.
static void tracer_on_rpc_response_enqueue(rpc_response_task *resp)
{
    LOG_INFO("{} RPC.RESPONSE.ENQUEUE, task_id = {:#018x}, "
             "{} => {}, trace_id = {:#018x}, queue size = {}",
             resp->spec().name,
             resp->id(),
             resp->get_request()->to_address,
             resp->get_request()->header->from_address,
             resp->get_request()->header->trace_id,
             tls_dsn.last_worker_queue_size);
}
// Rpc-create-response hook: logs the rpc name and trace id stored in the header of `resp`.
// `req` is not used.
static void tracer_on_rpc_create_response(message_ex *req, message_ex *resp)
{
    LOG_INFO("{} RPC.CREATE.RESPONSE, "
             "trace_id = {:#018x}",
             resp->header->rpc_name,
             resp->header->trace_id);
}
// Kinds of events stored in logged_event::event_type.
// The numeric values are spelled out explicitly; they equal the implicit numbering.
// LET_INVALID is the value handed to ENUM_BEGIN as the "invalid" marker.
enum logged_event_t
{
    LET_TASK_BEGIN = 0,
    LET_TASK_END = 1,
    LET_LOG = 2,
    LET_RPC_CALL = 3,
    LET_RPC_REPLY = 4,
    LET_AIO_CALL = 5,
    LET_LPC_CALL = 6,
    LET_INVALID = 7
};
// Registers the enumerators of logged_event_t with the enum helper, using LET_INVALID as
// the invalid value. Every enumerator other than LET_INVALID is listed, in declaration
// order, so that none of them is left without a registration.
// NOTE(review): presumably this is what backs enum_to_string()/enum_from_string() for this
// type - confirm against utils/enum_helper.h.
ENUM_BEGIN(logged_event_t, LET_INVALID)
ENUM_REG(LET_TASK_BEGIN)
ENUM_REG(LET_TASK_END)
ENUM_REG(LET_LOG)
ENUM_REG(LET_RPC_CALL)
ENUM_REG(LET_RPC_REPLY)
ENUM_REG(LET_AIO_CALL)
ENUM_REG(LET_LPC_CALL)
ENUM_END(logged_event_t)
// One event record. Every scalar member has a default so that a default-constructed
// logged_event never carries indeterminate values.
struct logged_event
{
    uint64_t ts = 0;
    logged_event_t event_type = logged_event_t::LET_INVALID;
    uint64_t correlation_id = 0; // task or rpc
    std::string context;         // log or rpc address
};
// The events recorded for one task. Both ids default to 0 so that a default-constructed
// logged_task never carries indeterminate values.
struct logged_task
{
    uint64_t task_id = 0;
    uint64_t trace_id = 0; // if present
    std::vector<logged_event> events;
};
// Builds the error text returned by the "tracer.find" command: a fixed prefix followed by
// `msg`. `msg` must be a valid NUL-terminated string.
static std::string tracer_log_flow_error(const char *msg)
{
    std::string text("invalid arguments for tracer.find: ");
    text.append(msg);
    return text;
}
// Handler of the "tracer.find" command.
//
// Expected arguments:
//   args[0]  direction: forward|f|backward|b
//   args[1]  id type:   rpc|r|task|t
//   args[2]  id value:  a non-zero hexadecimal number of at most 16 characters
//   args[3]  log file name, looked up under "<data_dir>/logs"
//
// Only argument validation is implemented. On any invalid argument the text produced by
// tracer_log_flow_error() is returned; when every argument is valid and the log file
// exists, "Not implemented" is returned.
static std::string tracer_log_flow(const std::vector<std::string> &args)
{
    if (args.size() < 4) {
        return tracer_log_flow_error("not enough arguments");
    }

    // TODO: implement this (the direction is only validated so far)
    const std::string &direction = args[0];
    const bool direction_ok = direction == "forward" || direction == "f" ||
                              direction == "backward" || direction == "b";
    if (!direction_ok) {
        return tracer_log_flow_error("invalid direction argument - must be forward|f|backward|b");
    }

    // TODO: implement this (the id type is only validated so far)
    const std::string &id_type = args[1];
    const bool id_type_ok =
        id_type == "rpc" || id_type == "r" || id_type == "task" || id_type == "t";
    if (!id_type_ok) {
        return tracer_log_flow_error("invalid id type argument - must be rpc|r|task|t");
    }

    // Parse the id with the scanf-family macro (SCNx64) and insist that exactly one value was
    // converted and that the conversion consumed the whole argument; otherwise trailing
    // garbage or an over-long id would be silently truncated to a different id.
    const std::string &id_text = args[2];
    uint64_t xid = 0;
    int consumed = 0;
    const int matched = sscanf(id_text.c_str(), "%16" SCNx64 "%n", &xid, &consumed);
    if (matched != 1 || static_cast<size_t>(consumed) != id_text.size() || xid == 0) {
        return tracer_log_flow_error("invalid id value - must be with 016" PRIx64 " format");
    }

    const std::string log_dir = utils::filesystem::path_combine(tools::spec().data_dir, "logs");
    const std::string fpath = utils::filesystem::path_combine(log_dir, args[3]);
    if (!utils::filesystem::file_exists(fpath)) {
        return tracer_log_flow_error((fpath + " not exist").c_str());
    }

    return "Not implemented";
}
// Installs the tracer hooks on the task specs and registers the "tracer.find" command.
//
// For every task code in [0, task_code::max()] except TASK_CODE_INVALID, the config section
// "task.<code>" is consulted:
//   - "is_trace" (default: FLAGS_is_trace) decides whether the task code is traced at all;
//   - "tracer::<join point>" (default: true) decides, for each join point separately,
//     whether the matching tracer_on_* hook is appended under the name "tracer".
//
// The command is registered at most once per process (guarded by a function-local
// std::once_flag). The `spec` parameter is not used by this function.
void tracer::install(service_spec &spec)
{
    for (int code = 0; code <= dsn::task_code::max(); ++code) {
        if (code == TASK_CODE_INVALID) {
            continue;
        }

        const std::string section_name = fmt::format("task.{}", dsn::task_code(code));
        // Deliberately not named `spec`, so the `spec` parameter is not shadowed.
        task_spec *tspec = task_spec::get(code);
        CHECK_NOTNULL(tspec, "");

        if (!dsn_config_get_value_bool(section_name.c_str(),
                                       "is_trace",
                                       FLAGS_is_trace,
                                       "whether to trace this kind of task")) {
            continue;
        }

        // Reads one per-join-point switch of this section; every switch defaults to true.
        const auto hook_enabled = [&section_name](const char *key, const char *description) {
            return dsn_config_get_value_bool(section_name.c_str(), key, true, description);
        };

        if (hook_enabled("tracer::on_task_create", "whether to trace when a task is created")) {
            tspec->on_task_create.put_back(tracer_on_task_create, "tracer");
        }
        if (hook_enabled("tracer::on_task_enqueue",
                         "whether to trace when a timer or async task is enqueued")) {
            tspec->on_task_enqueue.put_back(tracer_on_task_enqueue, "tracer");
        }
        if (hook_enabled("tracer::on_task_begin", "whether to trace when a task begins")) {
            tspec->on_task_begin.put_back(tracer_on_task_begin, "tracer");
        }
        if (hook_enabled("tracer::on_task_end", "whether to trace when a task ends")) {
            tspec->on_task_end.put_back(tracer_on_task_end, "tracer");
        }
        if (hook_enabled("tracer::on_task_cancelled",
                         "whether to trace when a task is cancelled")) {
            tspec->on_task_cancelled.put_back(tracer_on_task_cancelled, "tracer");
        }
        if (hook_enabled("tracer::on_task_wait_pre",
                         "whether to trace when a task is to be wait")) {
            tspec->on_task_wait_pre.put_back(tracer_on_task_wait_pre, "tracer");
        }
        if (hook_enabled("tracer::on_task_wait_post",
                         "whether to trace when a task is wait post")) {
            tspec->on_task_wait_post.put_back(tracer_on_task_wait_post, "tracer");
        }
        if (hook_enabled("tracer::on_task_cancel_post",
                         "whether to trace when a task is cancel post")) {
            tspec->on_task_cancel_post.put_back(tracer_on_task_cancel_post, "tracer");
        }
        if (hook_enabled("tracer::on_aio_call", "whether to trace when an aio task is called")) {
            tspec->on_aio_call.put_back(tracer_on_aio_call, "tracer");
        }
        if (hook_enabled("tracer::on_aio_enqueue",
                         "whether to trace when an aio task is enqueued")) {
            tspec->on_aio_enqueue.put_back(tracer_on_aio_enqueue, "tracer");
        }
        if (hook_enabled("tracer::on_rpc_call", "whether to trace when a rpc is made")) {
            tspec->on_rpc_call.put_back(tracer_on_rpc_call, "tracer");
        }
        if (hook_enabled("tracer::on_rpc_request_enqueue",
                         "whether to trace when a rpc request task is enqueued")) {
            tspec->on_rpc_request_enqueue.put_back(tracer_on_rpc_request_enqueue, "tracer");
        }
        if (hook_enabled("tracer::on_rpc_reply", "whether to trace when reply a rpc request")) {
            tspec->on_rpc_reply.put_back(tracer_on_rpc_reply, "tracer");
        }
        if (hook_enabled("tracer::on_rpc_response_enqueue",
                         "whether to trace when a rpc response task is enqueued")) {
            tspec->on_rpc_response_enqueue.put_back(tracer_on_rpc_response_enqueue, "tracer");
        }
        if (hook_enabled("tracer::on_rpc_create_response",
                         "whether to trace when a rpc response task is created")) {
            tspec->on_rpc_create_response.put_back(tracer_on_rpc_create_response, "tracer");
        }
    }

    // Register the command only once, no matter how many times install() runs.
    static std::once_flag flag;
    std::call_once(flag, [&]() {
        _tracer_find_cmd = command_manager::instance().register_single_command(
            "tracer.find",
            "Find related logs",
            "[forward|f|backward|b] [rpc|r|task|t] [trace_id|task_id(e.g., a023003920302390)] "
            "<log_file_name(e.g., replica.log.yyyyMMdd_hhmmss_SSS)>",
            tracer_log_flow);
    });
}
// Constructs the tracer; `name` is forwarded unchanged to the toollet base class.
tracer::tracer(const char *name)
    : toollet(name)
{
}
} // namespace tools
} // namespace dsn