blob: 4de5564533be0d4853447d0b1733260e6716f901 [file]
#include "Config.h"
#include "ProcStats.h"
#include "UDSConnector.h"
#include <chrono>
#include <ctime>
#include <memory>
#include <string>
#define typeid __typeid
#define operator __operator
extern "C" {
#include "postgres.h"
#include "access/hash.h"
#include "access/xact.h"
#include "commands/dbcommands.h"
#include "commands/explain.h"
#include "commands/resgroupcmds.h"
#include "executor/executor.h"
#include "utils/elog.h"
#include "utils/workfile_mgr.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbexplain.h"
#include "cdb/cdbinterconnect.h"
#include "cdb/cdbvars.h"
#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
#include "tcop/utility.h"
}
#undef typeid
#undef operator
#include "EventSender.h"
namespace {
// Returns a heap-allocated copy of the session_authorization GUC value, or
// nullptr when it is not set. The caller owns (and must delete) the result.
std::string *get_user_name() {
  // The GUC value is owned by the GUC machinery; copy it, never free it.
  const char *name = GetConfigOption("session_authorization", false, false);
  if (name == nullptr) {
    return nullptr;
  }
  return new std::string(name);
}
// Returns a heap-allocated copy of the current database name, or nullptr if
// get_database_name() yields none. The caller owns the result.
std::string *get_db_name() {
  char *raw = get_database_name(MyDatabaseId);
  if (raw == nullptr) {
    return nullptr;
  }
  auto *copy = new std::string(raw);
  // get_database_name() hands back palloc'd memory; release it after copying.
  pfree(raw);
  return copy;
}
std::string *get_rg_name() {
auto groupId = ResGroupGetGroupIdBySessionId(MySessionState->sessionId);
if (!OidIsValid(groupId))
return nullptr;
char *rgname = GetResGroupNameForId(groupId);
if (rgname == nullptr)
return nullptr;
return new std::string(rgname);
}
// Current wall-clock time (from gettimeofday) as a protobuf Timestamp.
google::protobuf::Timestamp current_ts() {
  struct timeval now;
  gettimeofday(&now, nullptr);
  google::protobuf::Timestamp ts;
  ts.set_seconds(now.tv_sec);
  // tv_usec is in microseconds; the Timestamp field is in nanoseconds.
  ts.set_nanos(static_cast<int32_t>(now.tv_usec * 1000));
  return ts;
}
// Fills the query key from session-global identifiers: command count, session
// id and the gpmon tmid. query_desc is currently not consulted.
void set_query_key(yagpcc::QueryKey *key, QueryDesc *query_desc) {
  int32 tmid = 0;
  gpmon_gettmid(&tmid);
  key->set_ccnt(gp_command_count);
  key->set_ssid(gp_session_id);
  key->set_tmid(tmid);
}
// Identifies the reporting segment (dbid + segindex) from GpIdentity.
// query_desc is currently not consulted.
void set_segment_key(yagpcc::SegmentKey *key, QueryDesc *query_desc) {
  const auto &identity = GpIdentity;
  key->set_dbid(identity.dbid);
  key->set_segindex(identity.segindex);
}
// Runs a verbose, text-format EXPLAIN of query_desc's plan and returns the
// resulting ExplainState; the rendered text is in the returned state's `str`.
// `costs` toggles cost estimates in the output.
ExplainState get_explain_state(QueryDesc *query_desc, bool costs) {
  ExplainState state;
  ExplainInitState(&state);
  state.format = EXPLAIN_FORMAT_TEXT;
  state.verbose = true;
  state.costs = costs;
  ExplainBeginOutput(&state);
  ExplainPrintPlan(&state, query_desc);
  ExplainEndOutput(&state);
  return state;
}
inline std::string char_to_trimmed_str(const char *str, size_t len) {
return std::string(str, std::min(len, Config::max_text_size()));
}
void set_plan_text(std::string *plan_text, QueryDesc *query_desc) {
MemoryContext oldcxt =
MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
auto es = get_explain_state(query_desc, true);
*plan_text = char_to_trimmed_str(es.str->data, es.str->len);
pfree(es.str->data);
MemoryContextSwitchTo(oldcxt);
}
// On the dispatcher (QD), and only when a planned statement exists, records the
// plan generator, the plan text, its normalized template, the plan id (hash of
// the normalized plan) and the query id.
void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) {
    auto qi = req->mutable_query_info();
    qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
                          ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
                          : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
    set_plan_text(qi->mutable_plan_text(), query_desc);
    // Normalize the (already size-limited) plan text.
    StringInfo norm_plan = gen_normplan(qi->plan_text().c_str());
    // Apply the same max_text_size limit as every other text field
    // (query_text, template_query_text, plan_text).
    *qi->mutable_template_plan_text() = char_to_trimmed_str(
        norm_plan->data, static_cast<size_t>(norm_plan->len));
    // plan_id hashes the full normalized text, not the trimmed template.
    qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len));
    qi->set_query_id(query_desc->plannedstmt->queryId);
    // The normalized plan has been copied into the message; release its buffer
    // instead of leaving it allocated for the rest of the context's lifetime.
    pfree(norm_plan->data);
  }
}
// On the dispatcher (QD), records the (size-limited) source text of the query
// and its normalized template produced by gen_normquery().
void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) {
    auto qi = req->mutable_query_info();
    *qi->mutable_query_text() = char_to_trimmed_str(
        query_desc->sourceText, strlen(query_desc->sourceText));
    char *norm_query = gen_normquery(query_desc->sourceText);
    // Guard against a NULL result instead of crashing in strlen().
    if (norm_query != nullptr) {
      *qi->mutable_template_query_text() =
          char_to_trimmed_str(norm_query, strlen(norm_query));
      // The normalized text is palloc'd by the parser; it has been copied into
      // the message, so free it rather than leaking it.
      pfree(norm_query);
    }
  }
}
// Drops the large plan/query text fields from a cached message once they have
// been reported. Only the dispatcher populates them, so other roles skip this.
void clear_big_fields(yagpcc::SetQueryReq *req) {
  if (Gp_session_role != GP_ROLE_DISPATCH) {
    return;
  }
  auto *info = req->mutable_query_info();
  info->clear_plan_text();
  info->clear_query_text();
}
// On the dispatcher (QD), records user, database and resource group names.
void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
  if (Gp_session_role != GP_ROLE_DISPATCH) {
    return;
  }
  auto *info = req->mutable_query_info();
  // set_allocated_* transfers ownership of each heap string (or nullptr) to
  // the protobuf message.
  info->set_allocated_username(get_user_name());
  info->set_allocated_databasename(get_db_name());
  info->set_allocated_rsgname(get_rg_name());
}
// Stores the query's nesting level in the message's additional info section.
void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level) {
  req->mutable_add_info()->set_nested_level(nesting_level);
}
// Copies executor instrumentation of the top plan node and motion-layer
// (interconnect) traffic counters into `metrics`. Callers must ensure
// query_desc->planstate is non-null; it is dereferenced unconditionally.
void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
                                QueryDesc *query_desc) {
  const auto *instr = query_desc->planstate->instrument;
  if (instr != nullptr) {
    metrics->set_ntuples(instr->ntuples);
    metrics->set_nloops(instr->nloops);
    metrics->set_tuplecount(instr->tuplecount);
    metrics->set_firsttuple(instr->firsttuple);
    metrics->set_startup(instr->startup);
    metrics->set_total(instr->total);
    // Buffer usage: shared / local / temp block counters and I/O timings.
    const auto &bu = instr->bufusage;
    metrics->set_shared_blks_hit(bu.shared_blks_hit);
    metrics->set_shared_blks_read(bu.shared_blks_read);
    metrics->set_shared_blks_dirtied(bu.shared_blks_dirtied);
    metrics->set_shared_blks_written(bu.shared_blks_written);
    metrics->set_local_blks_hit(bu.local_blks_hit);
    metrics->set_local_blks_read(bu.local_blks_read);
    metrics->set_local_blks_dirtied(bu.local_blks_dirtied);
    metrics->set_local_blks_written(bu.local_blks_written);
    metrics->set_temp_blks_read(bu.temp_blks_read);
    metrics->set_temp_blks_written(bu.temp_blks_written);
    metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(bu.blk_read_time));
    metrics->set_blk_write_time(INSTR_TIME_GET_DOUBLE(bu.blk_write_time));
  }
  // Interconnect counters are only available with an executor state that has
  // a motion layer context.
  const EState *estate = query_desc->estate;
  if (estate == nullptr || estate->motionlayer_context == nullptr) {
    return;
  }
  const auto *ml =
      reinterpret_cast<const MotionLayerState *>(estate->motionlayer_context);
  auto *sent = metrics->mutable_sent();
  sent->set_total_bytes(ml->stat_total_bytes_sent);
  sent->set_tuple_bytes(ml->stat_tuple_bytes_sent);
  sent->set_chunks(ml->stat_total_chunks_sent);
  auto *received = metrics->mutable_received();
  received->set_total_bytes(ml->stat_total_bytes_recvd);
  received->set_tuple_bytes(ml->stat_tuple_bytes_recvd);
  received->set_chunks(ml->stat_total_chunks_recvd);
}
// Start time of the most recently collected query; assigned in
// EventSender::executor_before_start. NOTE(review): a single global, so a
// collected nested query overwrites the value of its enclosing query.
decltype(std::chrono::high_resolution_clock::now()) query_start_time;
// Fills GP metrics for a finishing query: plan instrumentation (when present),
// process stats, wall time since query_start_time, and workfile spill totals.
void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc) {
  const PlanState *root = query_desc->planstate;
  if (root != nullptr && root->instrument != nullptr) {
    set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc);
  }
  auto *sys = metrics->mutable_systemstat();
  fill_self_stats(sys);
  const std::chrono::duration<double> running =
      std::chrono::high_resolution_clock::now() - query_start_time;
  sys->set_runningtimeseconds(running.count());
  auto *spill = metrics->mutable_spill();
  spill->set_filecount(WorkfileTotalFilesCreated());
  spill->set_totalbytes(WorkfileTotalBytesWritten());
}
// Builds a fresh SetQueryReq with the given status, the current timestamp and
// the query and segment keys filled in.
yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc,
                                     yagpcc::QueryStatus status) {
  yagpcc::SetQueryReq request;
  request.set_query_status(status);
  request.mutable_datetime()->CopyFrom(current_ts());
  set_query_key(request.mutable_query_key(), query_desc);
  set_segment_key(request.mutable_segment_key(), query_desc);
  return request;
}
// A query counts as top level if the executor nesting level is 0, or if its
// gpmon packet (when present) carries tmid 0.
inline bool is_top_level_query(QueryDesc *query_desc, int nesting_level) {
  if (nesting_level == 0) {
    return true;
  }
  const gpmon_packet_t *pkt = query_desc->gpmon_pkt;
  return pkt != nullptr && pkt->u.qexec.key.tmid == 0;
}
// Decides whether this query should be tracked. It must be top level (unless
// nested queries are reported), have a non-zero command count and source
// text, the collector must be enabled, and the session user must not be
// filtered out.
inline bool need_collect(QueryDesc *query_desc, int nesting_level) {
  if (!(Config::report_nested_queries() ||
        is_top_level_query(query_desc, nesting_level)) ||
      gp_command_count == 0 || query_desc->sourceText == nullptr ||
      !Config::enable_collector()) {
    return false;
  }
  // get_user_name() returns a heap-allocated string (or nullptr). Own it so it
  // is freed after the check; previously it leaked on every call, and this
  // function runs several times per query.
  std::unique_ptr<std::string> username(get_user_name());
  return !Config::filter_user(username.get());
}
} // namespace
// Entry point for GP's query_info_collect_hook. Only QD and QE backends are
// handled; terminal statuses are forwarded to collect_query_done, and an
// unknown status is a FATAL error.
void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
  const bool tracked_role =
      Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE;
  if (!tracked_role) {
    return;
  }
  switch (status) {
  case METRICS_QUERY_DONE:
  case METRICS_QUERY_ERROR:
  case METRICS_QUERY_CANCELING:
  case METRICS_QUERY_CANCELED:
  case METRICS_INNER_QUERY_DONE:
    collect_query_done(static_cast<QueryDesc *>(arg), status);
    return;
  case METRICS_PLAN_NODE_INITIALIZE:
  case METRICS_PLAN_NODE_EXECUTING:
  case METRICS_PLAN_NODE_FINISHED:
    // TODO: per-plan-node events are not collected yet.
    return;
  case METRICS_QUERY_SUBMIT:
    // Submission is emitted from executor_before_start instead; handling it
    // here as well would only complicate things.
    return;
  case METRICS_QUERY_START:
    // executor_after_start already reports the start.
    return;
  default:
    ereport(FATAL, (errmsg("Unknown query status: %d", status)));
  }
}
// ExecutorStart (before) hook. For collected queries, reports submission,
// records the start time, resets workfile stats and, on the QD with analyze
// enabled, turns on executor instrumentation (optionally CDB stats as well).
void EventSender::executor_before_start(QueryDesc *query_desc,
                                        int /* eflags*/) {
  if (!connector || !need_collect(query_desc, nesting_level)) {
    return;
  }
  collect_query_submit(query_desc);
  query_start_time = std::chrono::high_resolution_clock::now();
  WorkfileResetBackendStats();
  if (Gp_role != GP_ROLE_DISPATCH || !Config::enable_analyze()) {
    return;
  }
  query_desc->instrument_options |=
      INSTRUMENT_BUFFERS | INSTRUMENT_ROWS | INSTRUMENT_TIMER;
  if (!Config::enable_cdbstats()) {
    return;
  }
  query_desc->instrument_options |= INSTRUMENT_CDB;
  // Only create a stats context if the caller (e.g. EXPLAIN ANALYZE) has not.
  if (query_desc->showstatctx == nullptr) {
    instr_time starttime;
    INSTR_TIME_SET_CURRENT(starttime);
    query_desc->showstatctx =
        cdbexplain_showExecStatsBegin(query_desc, starttime);
  }
}
// ExecutorStart (after) hook. On QD/QE for collected queries, moves the query
// to START, stamps the start time, attaches the plan and reports "started".
void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) {
  if (!connector) {
    return;
  }
  const bool qd_or_qe =
      Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE;
  if (!qd_or_qe || !need_collect(query_desc, nesting_level)) {
    return;
  }
  QueryItem *item = get_query_message(query_desc);
  update_query_state(query_desc, item, QueryState::START);
  yagpcc::SetQueryReq *msg = item->message;
  *msg->mutable_start_time() = current_ts();
  set_query_plan(msg, query_desc);
  // Big text fields are dropped only after a successful report.
  if (connector->report_query(*msg, "started")) {
    clear_big_fields(msg);
  }
}
// ExecutorEnd hook. On QD/QE for collected queries, moves the query to END,
// stamps the end time, gathers metrics and reports "ended".
void EventSender::executor_end(QueryDesc *query_desc) {
  if (!connector) {
    return;
  }
  if (!need_collect(query_desc, nesting_level)) {
    return;
  }
  if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
    return;
  }
  /* TODO: when querying via CURSOR this call freezes. Need to investigate.
  To reproduce - uncomment it and run installchecks. It will freeze around
  join test. Needs investigation
  if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() &&
      Config::enable_cdbstats() && query_desc->estate->dispatcherState &&
      query_desc->estate->dispatcherState->primaryResults) {
    cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState,
                                DISPATCH_WAIT_NONE);
  }*/
  QueryItem *item = get_query_message(query_desc);
  if (item->state == UNKNOWN && !Config::report_nested_queries()) {
    // COMMIT/ROLLBACK of a nested query, seen at top level: ignore it.
    return;
  }
  update_query_state(query_desc, item, QueryState::END);
  yagpcc::SetQueryReq *msg = item->message;
  *msg->mutable_end_time() = current_ts();
  set_gp_metrics(msg->mutable_query_metrics(), query_desc);
  if (connector->report_query(*msg, "ended")) {
    clear_big_fields(msg);
  }
}
// Initializes the cached message for a collected query as SUBMIT. Fills keys,
// submit time, identity info, nesting level and query text, then reports
// "submit".
void EventSender::collect_query_submit(QueryDesc *query_desc) {
  if (!connector || !need_collect(query_desc, nesting_level)) {
    return;
  }
  QueryItem *item = get_query_message(query_desc);
  item->state = QueryState::SUBMIT;
  yagpcc::SetQueryReq *msg = item->message;
  // Replace whatever the cached message held with a fresh SUBMIT request.
  *msg = create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
  *msg->mutable_submit_time() = current_ts();
  set_query_info(msg, query_desc);
  // get_query_message() guarantees gpmon_pkt is set; its tmid holds the
  // nesting level.
  set_qi_nesting_level(msg, query_desc->gpmon_pkt->u.qexec.key.tmid);
  set_query_text(msg, query_desc);
  if (connector->report_query(*msg, "submit")) {
    clear_big_fields(msg);
  }
}
// Final event for a collected query (done / error / cancelling / cancelled).
// Reports the terminal status, then releases all per-query bookkeeping: the
// cached SetQueryReq, its map entry and the gpmon packet.
void EventSender::collect_query_done(QueryDesc *query_desc,
                                     QueryMetricsStatus status) {
  if (connector && need_collect(query_desc, nesting_level)) {
    // Initialized so it is never read uninitialized, even if a compiler cannot
    // prove that ereport(FATAL) in the default branch does not return.
    yagpcc::QueryStatus query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE;
    std::string msg;
    switch (status) {
    case METRICS_QUERY_DONE:
    case METRICS_INNER_QUERY_DONE:
      query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE;
      msg = "done";
      break;
    case METRICS_QUERY_ERROR:
      query_status = yagpcc::QueryStatus::QUERY_STATUS_ERROR;
      msg = "error";
      break;
    case METRICS_QUERY_CANCELING:
      query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELLING;
      msg = "cancelling";
      break;
    case METRICS_QUERY_CANCELED:
      query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELED;
      msg = "cancelled";
      break;
    default:
      ereport(FATAL, (errmsg("Unexpected query status in query_done hook: %d",
                             status)));
    }
    auto *query = get_query_message(query_desc);
    if (query->state != UNKNOWN || Config::report_nested_queries()) {
      update_query_state(query_desc, query, QueryState::DONE,
                         query_status ==
                             yagpcc::QueryStatus::QUERY_STATUS_DONE);
      auto query_msg = query->message;
      query_msg->set_query_status(query_status);
      connector->report_query(*query_msg, msg);
    } else {
      // Otherwise it's a nested query being committed/aborted at top level
      // and we should ignore it.
    }
    // `query` is the map entry keyed by this gpmon packet. The map only holds
    // a raw pointer (~EventSender deletes the remaining ones), so the message
    // must be deleted here before its entry is erased, or it leaks.
    delete query->message;
    query->message = nullptr;
    query_msgs.erase({query_desc->gpmon_pkt->u.qexec.key.ccnt,
                      query_desc->gpmon_pkt->u.qexec.key.tmid});
    pfree(query_desc->gpmon_pkt);
    // Clear the pointer so later hooks on this QueryDesc see "no packet"
    // instead of reading or freeing released memory.
    query_desc->gpmon_pkt = nullptr;
  }
}
// Opens the UDS connector when collection is enabled and the session user is
// not filtered out. Failure to connect is logged at INFO and tracing stays
// off (connector is not assigned).
EventSender::EventSender() {
  if (!Config::enable_collector()) {
    return;
  }
  // get_user_name() returns a heap-allocated string (or nullptr); own it so it
  // is released after the filter check instead of leaking.
  std::unique_ptr<std::string> username(get_user_name());
  if (Config::filter_user(username.get())) {
    return;
  }
  try {
    connector = new UDSConnector();
  } catch (const std::exception &e) {
    ereport(INFO, (errmsg("Unable to start query tracing %s", e.what())));
  }
}
// Releases the connector and every SetQueryReq still cached in query_msgs.
EventSender::~EventSender() {
  delete connector;
  for (auto &entry : query_msgs) {
    delete entry.second.message;
  }
}
// Minimal state machine over SUBMIT -> START -> END -> DONE. A query first
// seen in state UNKNOWN gets a synthesized submit. Out-of-order transitions
// coming from GP are flagged through Assert (assert-enabled builds only).
// The message's status is updated for START/END/DONE, and the new state is
// always recorded.
void EventSender::update_query_state(QueryDesc *query_desc, QueryItem *query,
                                     QueryState new_state, bool success) {
  if (query->state == UNKNOWN) {
    collect_query_submit(query_desc);
  }
  yagpcc::SetQueryReq *msg = query->message;
  switch (new_state) {
  case QueryState::START:
    if (query->state != QueryState::SUBMIT) {
      Assert(false);
    } else {
      msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
    }
    break;
  case QueryState::END:
    // Reaching END without START is tolerated while a transaction aborts.
    Assert(query->state == QueryState::START || IsAbortInProgress());
    msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
    break;
  case QueryState::DONE:
    // Unsuccessful queries may skip END.
    Assert(query->state == QueryState::END || !success);
    msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
    break;
  case QueryState::SUBMIT:
  default:
    Assert(false);
    break;
  }
  query->state = new_state;
}
// Returns the cached QueryItem for query_desc, keyed by the (ccnt, tmid) stored
// in its gpmon packet. If there is no packet or no matching entry, a fresh
// zeroed packet is allocated with key (gp_command_count, nesting_level), and a
// new UNKNOWN item is inserted unless that key already exists.
// NOTE(review): a pre-existing gpmon_pkt without a matching entry is replaced
// without being freed.
EventSender::QueryItem *EventSender::get_query_message(QueryDesc *query_desc) {
  if (query_desc->gpmon_pkt != nullptr) {
    auto found = query_msgs.find({query_desc->gpmon_pkt->u.qexec.key.ccnt,
                                  query_desc->gpmon_pkt->u.qexec.key.tmid});
    if (found != query_msgs.end()) {
      return &found->second;
    }
  }
  auto *pkt = static_cast<gpmon_packet_t *>(palloc0(sizeof(gpmon_packet_t)));
  pkt->u.qexec.key.ccnt = gp_command_count;
  pkt->u.qexec.key.tmid = nesting_level;
  query_desc->gpmon_pkt = pkt;
  auto inserted = query_msgs.insert(
      {{gp_command_count, nesting_level},
       QueryItem(UNKNOWN, new yagpcc::SetQueryReq())});
  return &inserted.first->second;
}
EventSender::QueryItem::QueryItem(EventSender::QueryState st,
yagpcc::SetQueryReq *msg)
: state(st), message(msg) {}