blob: 0a40b4cb3592662b7b3cf7108c69f922b9660ce0 [file]
/*-------------------------------------------------------------------------
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* hook_wrappers.cpp
*
* IDENTIFICATION
* gpcontrib/gp_stats_collector/src/hook_wrappers.cpp
*
*-------------------------------------------------------------------------
*/
#define typeid __typeid
extern "C" {
#include "postgres.h"
#include "funcapi.h"
#include "executor/executor.h"
#include "executor/execUtils.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/metrics_utils.h"
#include "cdb/cdbvars.h"
#include "cdb/ml_ipc.h"
#include "tcop/utility.h"
#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <errno.h>
#include <poll.h>
}
#undef typeid
#include "Config.h"
#include "GpscStat.h"
#include "EventSender.h"
#include "hook_wrappers.h"
#include "memory/gpdbwrappers.h"
static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr;
static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr;
static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr;
static query_info_collect_hook_type previous_query_info_collect_hook = nullptr;
#ifdef ANALYZE_STATS_COLLECT_HOOK
static analyze_stats_collect_hook_type previous_analyze_stats_collect_hook =
nullptr;
#endif
#ifdef IC_TEARDOWN_HOOK
static ic_teardown_hook_type previous_ic_teardown_hook = nullptr;
#endif
static ProcessUtility_hook_type previous_ProcessUtility_hook = nullptr;
static void gpsc_ExecutorStart_hook(QueryDesc *query_desc, int eflags);
static void gpsc_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction,
uint64 count, bool execute_once);
static void gpsc_ExecutorFinish_hook(QueryDesc *query_desc);
static void gpsc_ExecutorEnd_hook(QueryDesc *query_desc);
static void gpsc_query_info_collect_hook(QueryMetricsStatus status, void *arg);
#ifdef IC_TEARDOWN_HOOK
static void gpsc_ic_teardown_hook(ChunkTransportState *transportStates,
bool hasErrors);
#endif
#ifdef ANALYZE_STATS_COLLECT_HOOK
static void gpsc_analyze_stats_collect_hook(QueryDesc *query_desc);
#endif
static void gpsc_process_utility_hook(PlannedStmt *pstmt, const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest, QueryCompletion *qc);
#define TEST_MAX_CONNECTIONS 4
#define TEST_RCV_BUF_SIZE 8192
#define TEST_POLL_TIMEOUT_MS 200
static int test_server_fd = -1;
static char *test_sock_path = NULL;
static EventSender *sender = nullptr;
static inline EventSender *get_sender() {
if (!sender) {
sender = new EventSender();
}
return sender;
}
template <typename T, typename R, typename... Args>
R cpp_call(T *obj, R (T::*func)(Args...), Args... args) {
try {
return (obj->*func)(args...);
} catch (const std::exception &e) {
ereport(FATAL, (errmsg("Unexpected exception in gpsc %s", e.what())));
}
}
void hooks_init() {
Config::init_gucs();
GpscStat::init();
previous_ExecutorStart_hook = ExecutorStart_hook;
ExecutorStart_hook = gpsc_ExecutorStart_hook;
previous_ExecutorRun_hook = ExecutorRun_hook;
ExecutorRun_hook = gpsc_ExecutorRun_hook;
previous_ExecutorFinish_hook = ExecutorFinish_hook;
ExecutorFinish_hook = gpsc_ExecutorFinish_hook;
previous_ExecutorEnd_hook = ExecutorEnd_hook;
ExecutorEnd_hook = gpsc_ExecutorEnd_hook;
previous_query_info_collect_hook = query_info_collect_hook;
query_info_collect_hook = gpsc_query_info_collect_hook;
#ifdef IC_TEARDOWN_HOOK
previous_ic_teardown_hook = ic_teardown_hook;
ic_teardown_hook = gpsc_ic_teardown_hook;
#endif
#ifdef ANALYZE_STATS_COLLECT_HOOK
previous_analyze_stats_collect_hook = analyze_stats_collect_hook;
analyze_stats_collect_hook = gpsc_analyze_stats_collect_hook;
#endif
stat_statements_parser_init();
previous_ProcessUtility_hook = ProcessUtility_hook;
ProcessUtility_hook = gpsc_process_utility_hook;
}
void hooks_deinit() {
ExecutorStart_hook = previous_ExecutorStart_hook;
ExecutorEnd_hook = previous_ExecutorEnd_hook;
ExecutorRun_hook = previous_ExecutorRun_hook;
ExecutorFinish_hook = previous_ExecutorFinish_hook;
query_info_collect_hook = previous_query_info_collect_hook;
#ifdef IC_TEARDOWN_HOOK
ic_teardown_hook = previous_ic_teardown_hook;
#endif
#ifdef ANALYZE_STATS_COLLECT_HOOK
analyze_stats_collect_hook = previous_analyze_stats_collect_hook;
#endif
stat_statements_parser_deinit();
if (sender) {
delete sender;
}
GpscStat::deinit();
ProcessUtility_hook = previous_ProcessUtility_hook;
}
void gpsc_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
cpp_call(get_sender(), &EventSender::executor_before_start, query_desc,
eflags);
if (previous_ExecutorStart_hook) {
(*previous_ExecutorStart_hook)(query_desc, eflags);
} else {
standard_ExecutorStart(query_desc, eflags);
}
cpp_call(get_sender(), &EventSender::executor_after_start, query_desc,
eflags);
}
void gpsc_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction,
uint64 count, bool execute_once) {
get_sender()->incr_depth();
PG_TRY();
{
if (previous_ExecutorRun_hook)
previous_ExecutorRun_hook(query_desc, direction, count, execute_once);
else
standard_ExecutorRun(query_desc, direction, count, execute_once);
get_sender()->decr_depth();
}
PG_CATCH();
{
get_sender()->decr_depth();
PG_RE_THROW();
}
PG_END_TRY();
}
void gpsc_ExecutorFinish_hook(QueryDesc *query_desc) {
get_sender()->incr_depth();
PG_TRY();
{
if (previous_ExecutorFinish_hook)
previous_ExecutorFinish_hook(query_desc);
else
standard_ExecutorFinish(query_desc);
get_sender()->decr_depth();
}
PG_CATCH();
{
get_sender()->decr_depth();
PG_RE_THROW();
}
PG_END_TRY();
}
void gpsc_ExecutorEnd_hook(QueryDesc *query_desc) {
cpp_call(get_sender(), &EventSender::executor_end, query_desc);
if (previous_ExecutorEnd_hook) {
(*previous_ExecutorEnd_hook)(query_desc);
} else {
standard_ExecutorEnd(query_desc);
}
}
void gpsc_query_info_collect_hook(QueryMetricsStatus status, void *arg) {
cpp_call(get_sender(), &EventSender::query_metrics_collect, status,
arg /* queryDesc */, false /* utility */, (ErrorData *)NULL);
if (previous_query_info_collect_hook) {
(*previous_query_info_collect_hook)(status, arg);
}
}
#ifdef IC_TEARDOWN_HOOK
void gpsc_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors) {
cpp_call(get_sender(), &EventSender::ic_metrics_collect);
if (previous_ic_teardown_hook) {
(*previous_ic_teardown_hook)(transportStates, hasErrors);
}
}
#endif
#ifdef ANALYZE_STATS_COLLECT_HOOK
void gpsc_analyze_stats_collect_hook(QueryDesc *query_desc) {
cpp_call(get_sender(), &EventSender::analyze_stats_collect, query_desc);
if (previous_analyze_stats_collect_hook) {
(*previous_analyze_stats_collect_hook)(query_desc);
}
}
#endif
static void gpsc_process_utility_hook(PlannedStmt *pstmt, const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest, QueryCompletion *qc) {
/* Project utility data on QueryDesc to use existing logic */
QueryDesc *query_desc = (QueryDesc *)palloc0(sizeof(QueryDesc));
query_desc->sourceText = queryString;
cpp_call(get_sender(), &EventSender::query_metrics_collect,
METRICS_QUERY_SUBMIT, (void *)query_desc, true /* utility */,
(ErrorData *)NULL);
get_sender()->incr_depth();
PG_TRY();
{
if (previous_ProcessUtility_hook) {
(*previous_ProcessUtility_hook)(pstmt, queryString, readOnlyTree, context,
params, queryEnv, dest, qc);
} else {
standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params,
queryEnv, dest, qc);
}
get_sender()->decr_depth();
cpp_call(get_sender(), &EventSender::query_metrics_collect,
METRICS_QUERY_DONE, (void *)query_desc, true /* utility */,
(ErrorData *)NULL);
pfree(query_desc);
}
PG_CATCH();
{
ErrorData *edata;
MemoryContext oldctx;
oldctx = MemoryContextSwitchTo(TopMemoryContext);
edata = CopyErrorData();
FlushErrorState();
MemoryContextSwitchTo(oldctx);
get_sender()->decr_depth();
cpp_call(get_sender(), &EventSender::query_metrics_collect,
METRICS_QUERY_ERROR, (void *)query_desc, true /* utility */,
edata);
pfree(query_desc);
ReThrowError(edata);
}
PG_END_TRY();
}
static void check_stats_loaded() {
if (!GpscStat::loaded()) {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("gp_stats_collector must be loaded via "
"shared_preload_libraries")));
}
}
void gpsc_functions_reset() {
check_stats_loaded();
GpscStat::reset();
}
Datum gpsc_functions_get(FunctionCallInfo fcinfo) {
const int ATTNUM = 6;
check_stats_loaded();
auto stats = GpscStat::get_stats();
TupleDesc tupdesc = CreateTemplateTupleDesc(ATTNUM);
TupleDescInitEntry(tupdesc, (AttrNumber)1, "segid", INT4OID, -1 /* typmod */,
0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber)2, "total_messages", INT8OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber)3, "send_failures", INT8OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber)4, "connection_failures", INT8OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber)5, "other_errors", INT8OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber)6, "max_message_size", INT4OID,
-1 /* typmod */, 0 /* attdim */);
tupdesc = BlessTupleDesc(tupdesc);
Datum values[ATTNUM];
bool nulls[ATTNUM];
MemSet(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(GpIdentity.segindex);
values[1] = Int64GetDatum(stats.total);
values[2] = Int64GetDatum(stats.failed_sends);
values[3] = Int64GetDatum(stats.failed_connects);
values[4] = Int64GetDatum(stats.failed_other);
values[5] = Int32GetDatum(stats.max_message_size);
HeapTuple tuple = gpdb::heap_form_tuple(tupdesc, values, nulls);
Datum result = HeapTupleGetDatum(tuple);
PG_RETURN_DATUM(result);
}
void test_uds_stop_server() {
if (test_server_fd >= 0) {
close(test_server_fd);
test_server_fd = -1;
}
if (test_sock_path) {
unlink(test_sock_path);
pfree(test_sock_path);
test_sock_path = NULL;
}
}
void test_uds_start_server(const char *path) {
struct sockaddr_un addr = {.sun_family = AF_UNIX};
if (strlen(path) >= sizeof(addr.sun_path))
ereport(ERROR, (errmsg("path too long")));
test_uds_stop_server();
strlcpy(addr.sun_path, path, sizeof(addr.sun_path));
test_sock_path = MemoryContextStrdup(TopMemoryContext, path);
unlink(path);
if ((test_server_fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0 ||
bind(test_server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
listen(test_server_fd, TEST_MAX_CONNECTIONS) < 0) {
test_uds_stop_server();
ereport(ERROR, (errmsg("socket setup failed: %m")));
}
}
int64 test_uds_receive(int timeout_ms) {
char buf[TEST_RCV_BUF_SIZE];
int rc;
struct pollfd pfd = {.fd = test_server_fd, .events = POLLIN};
int64 total = 0;
if (test_server_fd < 0)
ereport(ERROR, (errmsg("server not started")));
for (;;) {
CHECK_FOR_INTERRUPTS();
rc = poll(&pfd, 1, Min(timeout_ms, TEST_POLL_TIMEOUT_MS));
if (rc > 0)
break;
if (rc < 0 && errno != EINTR)
ereport(ERROR, (errmsg("poll: %m")));
timeout_ms -= TEST_POLL_TIMEOUT_MS;
if (timeout_ms <= 0)
return total;
}
if (pfd.revents & POLLIN) {
int client = accept(test_server_fd, NULL, NULL);
ssize_t n;
if (client < 0)
ereport(ERROR, (errmsg("accept: %m")));
while ((n = recv(client, buf, sizeof(buf), 0)) != 0) {
if (n > 0)
total += n;
else if (errno != EINTR)
break;
}
close(client);
}
return total;
}