blob: b28ceba175aec1f554715705e9f3fc684c66dbf7 [file]
/*-------------------------------------------------------------------------
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* EventSender.cpp
*
* IDENTIFICATION
* gpcontrib/gp_stats_collector/src/EventSender.cpp
*
*-------------------------------------------------------------------------
*/
#include "UDSConnector.h"
#include "memory/gpdbwrappers.h"
#include "log/LogOps.h"
#define typeid __typeid
extern "C" {
#include "postgres.h"
#include "executor/executor.h"
#include "utils/elog.h"
#include "utils/guc.h"
#include "cdb/cdbexplain.h"
#include "cdb/cdbvars.h"
#include "cdb/ml_ipc.h"
}
#undef typeid
#include "EventSender.h"
#include "PgUtils.h"
#include "ProtoUtils.h"
// Expands to a boolean expression that is true when all of the following
// hold: the process runs in the dispatcher role (GP_ROLE_DISPATCH), the
// configured min_analyze_time is non-negative, and enable_analyze is set.
// The expansion reads `config`, so the macro is only usable where a `config`
// object is in scope (e.g. inside EventSender member functions).
#define need_collect_analyze() \
(Gp_role == GP_ROLE_DISPATCH && config.min_analyze_time() >= 0 && \
config.enable_analyze())
/**
 * Decides whether an event for `query_desc` in lifecycle `state` should be
 * processed.
 *
 * Besides the yes/no answer this function has state-specific side effects:
 *  - SUBMIT: re-syncs the cached config, registers a query key for nested
 *    queries that will not be reported, and resets the nested counters for a
 *    top-level query.
 *  - START: submits the query first if it has not been submitted yet.
 *
 * @param query_desc query descriptor the event refers to
 * @param state      lifecycle state the caller is about to handle
 * @param utility    true when the event belongs to a utility statement
 * @return true when the caller should go on and handle the event
 */
bool EventSender::verify_query(QueryDesc *query_desc, QueryState state,
                               bool utility) {
  const bool supported_role =
      Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE;
  if (!proto_verified || !supported_role) {
    return false;
  }

  if (state == QueryState::SUBMIT) {
    // Cache GUCs once at SUBMIT so that every later state of this query sees
    // the same filter criteria. Without caching, a query that sets/unsets
    // filtering GUCs would be judged differently at DONE (the query has not
    // run yet at SUBMIT), causing DONE to be wrongly skipped or added.
    config.sync();
    if (utility && !config.enable_utility()) {
      return false;
    }
    // A nested query we are not going to report still gets its qkey
    // registered, so that nesting_level > 0 can be detected and reporting
    // skipped at end/done.
    if (!need_report_nested_query() && nesting_level > 0) {
      QueryKey::register_qkey(query_desc, nesting_level);
      return false;
    }
    if (is_top_level_query(query_desc, nesting_level)) {
      nested_timing = 0;
      nested_calls = 0;
    }
  } else if (state == QueryState::START) {
    if (!qdesc_submitted(query_desc)) {
      collect_query_submit(query_desc, false /* utility */);
    }
  } else if (state == QueryState::DONE) {
    if (utility && !config.enable_utility()) {
      return false;
    }
  }

  // Common checks for every state: query filter first, then nesting.
  return !filter_query(query_desc) &&
         nesting_is_valid(query_desc, nesting_level);
}
/**
 * Emits `req` through the configured logging backend.
 *
 * @param req     request message to report
 * @param event   event label passed on to the UDS connector
 * @param utility passed on to the table logger
 * @return the value returned by UDSConnector::report_query() in UDS mode;
 *         false in table mode and for an unknown mode. Callers in this file
 *         use a true result to clear the big fields of the message.
 */
bool EventSender::log_query_req(const gpsc::SetQueryReq &req,
                                const std::string &event, bool utility) {
  const auto mode = config.logging_mode();
  if (mode == LOG_MODE_UDS) {
    return UDSConnector::report_query(req, event, config);
  }
  if (mode == LOG_MODE_TBL) {
    gpdb::insert_log(req, utility);
    return false;
  }
  // Unknown logging mode: trips in assert-enabled builds, otherwise nothing
  // is reported.
  Assert(false);
  return false;
}
/**
 * Entry point for query metrics events; dispatches on `status`.
 *
 * @param status  event kind
 * @param arg     opaque pointer, interpreted as a QueryDesc*
 * @param utility forwarded to the submit/done handlers
 * @param edata   error data forwarded to the done handler
 */
void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg,
                                        bool utility, ErrorData *edata) {
  QueryDesc *const query_desc = static_cast<QueryDesc *>(arg);

  switch (status) {
    case METRICS_QUERY_SUBMIT:
      collect_query_submit(query_desc, utility);
      break;

    case METRICS_QUERY_DONE:
    case METRICS_QUERY_ERROR:
    case METRICS_QUERY_CANCELED:
    case METRICS_INNER_QUERY_DONE:
      collect_query_done(query_desc, utility, status, edata);
      break;

    // Events that are deliberately ignored:
    case METRICS_PLAN_NODE_INITIALIZE:
    case METRICS_PLAN_NODE_EXECUTING:
    case METRICS_PLAN_NODE_FINISHED:
      // TODO: plan node events are not handled yet.
    case METRICS_QUERY_START:
      // no-op: executor_after_start is enough.
    case METRICS_QUERY_CANCELING:
      // Only the final CANCELED event is of interest; CANCELING is ignored
      // unless end users request otherwise.
      break;

    default:
      ereport(FATAL, (errmsg("Unknown query status: %d", status)));
  }
}
/**
 * Runs before executor start. On the dispatcher, with analyze collection
 * enabled and outside of EXPLAIN-only execution, turns on the buffers, rows
 * and timer instrumentation options on `query_desc`. When cdbstats are
 * enabled it additionally sets the CDB option and creates `showstatctx` if
 * the descriptor does not have one yet.
 */
void EventSender::executor_before_start(QueryDesc *query_desc, int eflags) {
  if (!verify_query(query_desc, QueryState::START, false /* utility*/)) {
    return;
  }

  const bool explain_only = (eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0;
  if (Gp_role != GP_ROLE_DISPATCH || !config.enable_analyze() ||
      explain_only) {
    return;
  }

  query_desc->instrument_options |=
      INSTRUMENT_BUFFERS | INSTRUMENT_ROWS | INSTRUMENT_TIMER;

  if (!config.enable_cdbstats()) {
    return;
  }
  query_desc->instrument_options |= INSTRUMENT_CDB;

  if (query_desc->showstatctx) {
    return;  // already set up, keep the existing context
  }
  instr_time begin_ts;
  INSTR_TIME_SET_CURRENT(begin_ts);
  query_desc->showstatctx =
      gpdb::cdbexplain_showExecStatsBegin(query_desc, begin_ts);
}
/**
 * Runs after executor start: stamps the start time, moves the tracked query
 * to the START state, attaches the plan and reports the "started" event.
 */
void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) {
  if (!verify_query(query_desc, QueryState::START, false /* utility */)) {
    return;
  }

  auto &item = get_query(query_desc);
  auto *req = item.message.get();
  *req->mutable_start_time() = current_ts();
  update_query_state(item, QueryState::START, false /* utility */);
  set_query_plan(req, query_desc, config);

  // Set up tracking of the total elapsed time of the query run. The
  // instrumentation is allocated in the per-query memory context so that it
  // goes away at executor_end.
  if (need_collect_analyze() && query_desc->totaltime == NULL) {
    MemoryContext caller_cxt =
        gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
    query_desc->totaltime = gpdb::instr_alloc(1, INSTRUMENT_ALL, false);
    gpdb::mem_ctx_switch_to(caller_cxt);
  }

  // Park the stored metrics outside of the message while it is reported, so
  // the "started" report carries an empty metrics field; the stored metrics
  // are swapped back in afterwards.
  gpsc::GPMetrics parked_metrics;
  std::swap(parked_metrics, *req->mutable_query_metrics());
  const bool drop_big_fields =
      log_query_req(*req, "started", false /* utility */);
  if (drop_big_fields) {
    clear_big_fields(req);
  }
  std::swap(parked_metrics, *req->mutable_query_metrics());
}
/**
 * Runs at executor end: stamps the end time, moves the tracked query to the
 * END state, refreshes its metrics and reports the "ended" event.
 */
void EventSender::executor_end(QueryDesc *query_desc) {
  if (!verify_query(query_desc, QueryState::END, false /* utility */)) {
    return;
  }

  auto &item = get_query(query_desc);
  auto *req = item.message.get();
  *req->mutable_end_time() = current_ts();
  update_query_state(item, QueryState::END, false /* utility */);

  // The accumulated nested counters are only passed for a top-level query;
  // any other query gets zeroes.
  if (!is_top_level_query(query_desc, nesting_level)) {
    set_gp_metrics(req->mutable_query_metrics(), query_desc, 0, 0);
  } else {
    set_gp_metrics(req->mutable_query_metrics(), query_desc, nested_calls,
                   nested_timing);
  }

  const bool drop_big_fields =
      log_query_req(*req, "ended", false /* utility */);
  if (drop_big_fields) {
    clear_big_fields(req);
  }
}
/**
 * Handles the SUBMIT event: starts tracking the query, builds its request
 * message, reports the "submit" event and records the initial metrics.
 *
 * @param query_desc descriptor of the submitted query
 * @param utility    true when the query is a utility statement
 */
void EventSender::collect_query_submit(QueryDesc *query_desc, bool utility) {
  if (!verify_query(query_desc, QueryState::SUBMIT, utility)) {
    return;
  }

  submit_query(query_desc);
  auto &item = get_query(query_desc);
  auto *req = item.message.get();

  // Start from a fresh SUBMIT request and fill in its fields.
  *req = create_query_req(gpsc::QueryStatus::QUERY_STATUS_SUBMIT);
  *req->mutable_submit_time() = current_ts();
  set_query_info(req);
  set_qi_nesting_level(req, nesting_level);
  set_qi_slice_id(req);
  set_query_text(req, query_desc, config);

  const bool drop_big_fields = log_query_req(*req, "submit", utility);
  if (drop_big_fields) {
    clear_big_fields(req);
  }

  // Take the initial metrics snapshot so that a diff can safely be taken
  // later, in the END or DONE events.
  set_gp_metrics(req->mutable_query_metrics(), query_desc, 0, 0);
#ifdef IC_TEARDOWN_HOOK
  // Same for the interconnect statistics.
  ic_metrics_collect();
  set_ic_stats(req->mutable_query_metrics()->mutable_instrumentation(),
               &ic_statistics);
#endif
}
/**
 * Finalizes the tracked query `query` and reports its terminal event.
 *
 * Maps `status` to the reported query status and event label, moves the
 * query to the DONE state, attaches the error message for failed queries and,
 * when the executor-end step was skipped (previous state was START), fills
 * in the end time and execution metrics here.
 *
 * @param query_desc descriptor of the finished query
 * @param query      tracked item that belongs to `query_desc`
 * @param status     terminal metrics status (done / error / canceled)
 * @param utility    true when the query is a utility statement
 * @param edata      error data used as a fallback source of the error
 *                   message; may be NULL
 */
void EventSender::report_query_done(QueryDesc *query_desc, QueryItem &query,
                                    QueryMetricsStatus status, bool utility,
                                    ErrorData *edata) {
  gpsc::QueryStatus query_status{};
  std::string msg;
  switch (status) {
    case METRICS_QUERY_DONE:
    case METRICS_INNER_QUERY_DONE:
      query_status = gpsc::QueryStatus::QUERY_STATUS_DONE;
      msg = "done";
      break;
    case METRICS_QUERY_ERROR:
      query_status = gpsc::QueryStatus::QUERY_STATUS_ERROR;
      msg = "error";
      break;
    case METRICS_QUERY_CANCELING:
      // This event is not tracked at the moment; the mapping is kept in case
      // it is needed later.
      Assert(false);
      query_status = gpsc::QueryStatus::QUERY_STATUS_CANCELLING;
      msg = "cancelling";
      break;
    case METRICS_QUERY_CANCELED:
      query_status = gpsc::QueryStatus::QUERY_STATUS_CANCELED;
      msg = "cancelled";
      break;
    default:
      ereport(FATAL,
              (errmsg("Unexpected query status in query_done hook: %d",
                      status)));
  }

  const auto prev_state = query.state;
  update_query_state(query, QueryState::DONE, utility,
                     query_status == gpsc::QueryStatus::QUERY_STATUS_DONE);
  auto *query_msg = query.message.get();
  // update_query_state() sets the DONE status; override it with the status
  // that matches the actual outcome.
  query_msg->set_query_status(query_status);

  if (status == METRICS_QUERY_ERROR) {
    // Prefer the message returned by elog_message(); when it returns NULL
    // fall back to the message carried by `edata`. elog_message() is read
    // once so the check and the use cannot disagree.
    auto *pending_msg = elog_message();
    auto *error_msg = pending_msg != nullptr
                          ? pending_msg
                          : (edata != nullptr ? edata->message : nullptr);
    if (error_msg == nullptr) {
      ereport(WARNING, (errmsg("GPSC missing error message")));
      ereport(DEBUG3,
              (errmsg("GPSC query sourceText: %s", query_desc->sourceText)));
    } else {
      set_qi_error_message(query_msg, error_msg, config);
    }
  }

  if (prev_state == QueryState::START) {
    // We've missed the ExecutorEnd call due to query cancel or error. That is
    // fine, but the execution stats have to be collected and reported here.
    *query_msg->mutable_end_time() = current_ts();
    // Same rule as in executor_end(): the accumulated nested counters belong
    // to the top-level query only, nested queries report zeroes.
    if (is_top_level_query(query_desc, nesting_level)) {
      set_gp_metrics(query_msg->mutable_query_metrics(), query_desc,
                     nested_calls, nested_timing);
    } else {
      set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
    }
  }
#ifdef IC_TEARDOWN_HOOK
  ic_metrics_collect();
  set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(),
               &ic_statistics);
#endif
  // The "clear big fields" hint is irrelevant here, hence the ignored result.
  (void)log_query_req(*query_msg, msg, utility);
}
/**
 * Handles the terminal events of a query (done / error / canceled): reports
 * the final event, updates the nested counters when nested queries are
 * reported, and stops tracking the query.
 *
 * @param query_desc descriptor of the finished query
 * @param utility    true when the query is a utility statement
 * @param status     terminal metrics status
 * @param edata      error data forwarded to report_query_done(); may be NULL
 */
void EventSender::collect_query_done(QueryDesc *query_desc, bool utility,
                                     QueryMetricsStatus status,
                                     ErrorData *edata) {
  if (!verify_query(query_desc, QueryState::DONE, utility)) {
    return;
  }

  // Skip sending the done message if the query errored before submit. Any
  // other status for an unsubmitted query is unexpected and gets a warning.
  if (!qdesc_submitted(query_desc)) {
    if (status != METRICS_QUERY_ERROR) {
      ereport(WARNING, (errmsg("GPSC trying to process DONE hook for "
                               "unsubmitted and unerrored query")));
      ereport(DEBUG3,
              (errmsg("GPSC query sourceText: %s", query_desc->sourceText)));
    }
    return;
  }

  // qdesc_submitted() has just confirmed that the key is present in
  // `queries`, so the container cannot be empty here and no separate
  // emptiness check is needed.
  auto &query = get_query(query_desc);
  report_query_done(query_desc, query, status, utility, edata);
  if (need_report_nested_query()) {
    update_nested_counters(query_desc);
  }

  // Stop tracking the query and release the key stored on the descriptor.
  queries.erase(QueryKey::from_qdesc(query_desc));
  pfree(query_desc->gpsc_query_key);
  query_desc->gpsc_query_key = NULL;
}
/**
 * Adds the values returned by UDPIFCGetICStats() to the `ic_statistics`
 * accumulator. Compiles to an empty function unless IC_TEARDOWN_HOOK is
 * defined.
 */
void EventSender::ic_metrics_collect() {
#ifdef IC_TEARDOWN_HOOK
  const bool udpifc = Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC;
  if (!udpifc || !proto_verified || gp_command_count == 0 ||
      !config.enable_collector() || config.filter_user(get_user_name())) {
    return;
  }

  // Ideally queries would also be filtered by nesting level here, but that
  // information is not available from this callback. The stats are collected
  // anyway and thrown away later if necessary.
  const auto delta = UDPIFCGetICStats();
  auto &total = ic_statistics;

  total.totalRecvQueueSize += delta.totalRecvQueueSize;
  total.recvQueueSizeCountingTime += delta.recvQueueSizeCountingTime;
  total.totalCapacity += delta.totalCapacity;
  total.capacityCountingTime += delta.capacityCountingTime;
  total.totalBuffers += delta.totalBuffers;
  total.bufferCountingTime += delta.bufferCountingTime;
  total.activeConnectionsNum += delta.activeConnectionsNum;
  total.retransmits += delta.retransmits;
  total.startupCachedPktNum += delta.startupCachedPktNum;
  total.mismatchNum += delta.mismatchNum;
  total.crcErrors += delta.crcErrors;
  total.sndPktNum += delta.sndPktNum;
  total.recvPktNum += delta.recvPktNum;
  total.disorderedPktNum += delta.disorderedPktNum;
  total.duplicatedPktNum += delta.duplicatedPktNum;
  total.recvAckNum += delta.recvAckNum;
  total.statusQueryMsgNum += delta.statusQueryMsgNum;
#endif
}
/**
 * Attaches the analyze plan text to the tracked query when its total run
 * time, taken from `query_desc->totaltime` and converted with a factor of
 * 1000, reaches the configured min_analyze_time. Does nothing outside the
 * dispatcher role, without `totaltime`, or when analyze collection is off.
 */
void EventSender::analyze_stats_collect(QueryDesc *query_desc) {
  if (!verify_query(query_desc, QueryState::END, false /* utility */)) {
    return;
  }
  if (Gp_role != GP_ROLE_DISPATCH || !query_desc->totaltime ||
      !need_collect_analyze()) {
    return;
  }

  // Make sure stats accumulation is done.
  // (Note: it's okay if several levels of hook all do this.)
  gpdb::instr_end_loop(query_desc->totaltime);

  const double elapsed_ms = query_desc->totaltime->total * 1000.0;
  if (elapsed_ms >= config.min_analyze_time()) {
    auto &item = get_query(query_desc);
    set_analyze_plan_text(query_desc, item.message.get(), config);
  }
}
/**
 * Syncs the configuration and, when the collector is enabled, runs the
 * protobuf version check. `proto_verified` is set only if that check
 * completes without throwing. With IC_TEARDOWN_HOOK defined the
 * interconnect statistics accumulator is zeroed as well.
 */
EventSender::EventSender() {
#ifdef IC_TEARDOWN_HOOK
  memset(&ic_statistics, 0, sizeof(ICStatistics));
#endif
  // Perform the initial sync to get the default GUC values.
  config.sync();
  if (!config.enable_collector()) {
    return;
  }
  try {
    GOOGLE_PROTOBUF_VERIFY_VERSION;
    proto_verified = true;
  } catch (const std::exception &e) {
    ereport(INFO, (errmsg("Unable to start query tracing %s", e.what())));
  }
}
/**
 * Writes one LOG line for every query that is still tracked at destruction
 * time, i.e. for which no done event removed it from `queries`.
 */
EventSender::~EventSender() {
  for (const auto &entry : queries) {
    const auto &qkey = entry.first;
    ereport(LOG, (errmsg("GPSC query with missing done event: "
                         "tmid=%d ssid=%d ccnt=%d nlvl=%d",
                         qkey.tmid, qkey.ssid, qkey.ccnt, qkey.nesting_level)));
  }
}
/**
 * A very simplistic state machine whose purpose is to fix or highlight bugs
 * coming from GP.
 *
 * Updates the status stored in the query message according to `new_state`,
 * asserts on unexpected transitions and finally records `new_state` in the
 * item (also after an unexpected transition).
 *
 * @param query     tracked query item to update
 * @param new_state state the query is moving to
 * @param utility   relaxes the DONE transition check for utility statements
 * @param success   relaxes the DONE transition check when false
 */
void EventSender::update_query_state(QueryItem &query, QueryState new_state,
                                     bool utility, bool success) {
  switch (new_state) {
    case QueryState::START:
      // START is only expected right after SUBMIT; the message status is
      // left untouched otherwise.
      Assert(query.state == QueryState::SUBMIT);
      if (query.state == QueryState::SUBMIT) {
        query.message->set_query_status(gpsc::QueryStatus::QUERY_STATUS_START);
      }
      break;

    case QueryState::END:
      // The check
      //   Assert(query.state == QueryState::START || IsAbortInProgress());
      // is intentionally left out: it triggers, for example, when a CURSOR
      // closes before ever being executed.
      query.message->set_query_status(gpsc::QueryStatus::QUERY_STATUS_END);
      break;

    case QueryState::DONE:
      Assert(query.state == QueryState::END || !success || utility);
      query.message->set_query_status(gpsc::QueryStatus::QUERY_STATUS_DONE);
      break;

    case QueryState::SUBMIT:
      // A query is never moved to SUBMIT through this function.
    default:
      Assert(false);
      break;
  }
  query.state = new_state;
}
/**
 * Returns the tracked item that belongs to `query_desc`.
 *
 * @throws std::runtime_error when the descriptor carries no query key or the
 *         key is not present in `queries`; a WARNING and a DEBUG3 message are
 *         emitted before throwing.
 */
EventSender::QueryItem &EventSender::get_query(QueryDesc *query_desc) {
  if (query_desc->gpsc_query_key != NULL) {
    auto found = queries.find(QueryKey::from_qdesc(query_desc));
    if (found != queries.end()) {
      return found->second;
    }
  }

  ereport(WARNING,
          (errmsg("GPSC attempting to get query that was not submitted")));
  ereport(DEBUG3,
          (errmsg("GPSC query sourceText: %s", query_desc->sourceText)));
  throw std::runtime_error("Attempting to get query that was not submitted");
}
/**
 * Starts tracking `query_desc`: registers its query key and inserts a new
 * item in the SUBMIT state into `queries`. A descriptor that already carries
 * a key, or a key that is already tracked, is reported with a WARNING and
 * processing continues.
 */
void EventSender::submit_query(QueryDesc *query_desc) {
  if (query_desc->gpsc_query_key != NULL) {
    ereport(WARNING,
            (errmsg("GPSC trying to submit already submitted query")));
    ereport(DEBUG3,
            (errmsg("GPSC query sourceText: %s", query_desc->sourceText)));
  }

  QueryKey::register_qkey(query_desc, nesting_level);

  const bool is_new = queries
                          .emplace(QueryKey::from_qdesc(query_desc),
                                   QueryItem(QueryState::SUBMIT))
                          .second;
  if (is_new) {
    return;
  }
  ereport(WARNING, (errmsg("GPSC duplicate query submit detected")));
  ereport(DEBUG3,
          (errmsg("GPSC query sourceText: %s", query_desc->sourceText)));
}
/**
 * For a query that is not top-level: counts it in `nested_calls` and adds
 * its duration (end_time - start_time of its message) to `nested_timing`.
 * A negative duration is not added; it is reported with a WARNING instead.
 * Top-level queries are left alone.
 */
void EventSender::update_nested_counters(QueryDesc *query_desc) {
  if (is_top_level_query(query_desc, nesting_level)) {
    return;
  }

  auto &item = get_query(query_desc);
  nested_calls++;

  const double finished_at = protots_to_double(item.message->end_time());
  const double started_at = protots_to_double(item.message->start_time());
  if (finished_at >= started_at) {
    nested_timing += finished_at - started_at;
    return;
  }

  ereport(WARNING, (errmsg("GPSC query start_time > end_time (%f > %f)",
                           started_at, finished_at)));
  ereport(DEBUG3,
          (errmsg("GPSC nested query text %s", query_desc->sourceText)));
}
/**
 * Tells whether `query_desc` is currently tracked: it must carry a query key
 * and that key must be present in `queries`.
 */
bool EventSender::qdesc_submitted(QueryDesc *query_desc) {
  return query_desc->gpsc_query_key != NULL &&
         queries.find(QueryKey::from_qdesc(query_desc)) != queries.end();
}
/**
 * A query passes the nesting check when nested queries are reported at all,
 * or when the query is a top-level one.
 */
bool EventSender::nesting_is_valid(QueryDesc *query_desc, int nesting_level) {
  if (need_report_nested_query()) {
    return true;
  }
  return is_top_level_query(query_desc, nesting_level);
}
/**
 * Nested queries are reported only when the report_nested_queries setting is
 * on and the process runs in the dispatcher role.
 */
bool EventSender::need_report_nested_query() {
  if (!config.report_nested_queries()) {
    return false;
  }
  return Gp_role == GP_ROLE_DISPATCH;
}
/**
 * Returns true when the query must be skipped: the command counter is zero,
 * the query has no source text, the collector is disabled, or the current
 * user is filtered out by the configuration.
 */
bool EventSender::filter_query(QueryDesc *query_desc) {
  if (gp_command_count == 0) {
    return true;
  }
  if (query_desc->sourceText == nullptr) {
    return true;
  }
  if (!config.enable_collector()) {
    return true;
  }
  return config.filter_user(get_user_name());
}
EventSender::QueryItem::QueryItem(QueryState st)
: message(std::make_unique<gpsc::SetQueryReq>()), state(st) {}