blob: 83d2ba11e7db03d8cdbb7f9961d71827400833d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* ----------
* pg_stat_activity_history_process.c
* ----------
*/
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "postgres.h"
#include <unistd.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <time.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#include "pgstat.h"
#include "access/heapam.h"
#include "access/transam.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
#include "catalog/catquery.h"
#include "libpq/ip.h"
#include "libpq/libpq.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "executor/instrument.h"
#include "pg_trace.h"
#include "postmaster/autovacuum.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "storage/backendid.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/rel.h"
#include "utils/tqual.h"
#include "utils/portal.h"
#include "cdb/cdbvars.h"
#include "pg_stat_activity_history_process.h"
#include "utils/timestamp.h"
#include "postgres_ext.h"
#include "libpq/pqcomm.h"
#include "pgtime.h"
#include "postmaster/syslogger.h"
/* ----------
* Timer definitions.
* ----------
*/
#define PGSTATACTIVITYHISTORY_RESTART_INTERVAL 60 /* How often to attempt to restart a
* failed pg_stat_activity_history process; in
* seconds. */
#define PGSTATACTIVITYHISTORY_SELECT_TIMEOUT 2 /* How often to check for postmaster
* death; in seconds. */
#define MAXQUERYLENGTH 5120 /* The maximum length of query we can write into
* pg_stat_activity_history table */
int pgStatActivityHistorySock = -1;
/* ----------
* Local data
* ----------
*/
static struct sockaddr_storage pgStatActivityHistoryAddr;
static time_t last_pgStatActivityHistory_start_time;
static volatile bool need_exit = false;
static volatile bool got_SIGHUP = false;
/* ----------
* Local function forward declarations
* ----------
*/
#ifdef EXEC_BACKEND
static pid_t pgstatactivityhistory_forkexec(void);
#endif
NON_EXEC_STATIC void PgStatActivityHistoryMain(int argc, char *argv[]);
static void pgstatactivityhistory_exit(SIGNAL_ARGS);
static void pgstatactivityhistory_sighup_handler(SIGNAL_ARGS);
static void WriteDataIntoPSAH(FILE *fp, void *data);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
* ------------------------------------------------------------
*/
/* ----------
* pgstatactivityhistory_init() -
*
* Called from postmaster at startup. Create the resources required
* by the pg stat activity history process. If unable to do so, do not
* fail --- better to let the postmaster start with stats collection
* disabled.
* ----------
*/
void
pgstatactivityhistory_init(void)
{
socklen_t alen;
struct addrinfo *addrs = NULL,
*addr,
hints;
int ret;
fd_set rset;
struct timeval tv;
char test_byte;
int sel_res;
int tries = 0;
#define TESTBYTEVAL ((char) 199)
/*
* Create the UDP socket for sending and receiving statistic messages
*/
hints.ai_flags = AI_PASSIVE;
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_addrlen = 0;
hints.ai_addr = NULL;
hints.ai_canonname = NULL;
hints.ai_next = NULL;
ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
if (ret || !addrs)
{
ereport(LOG,
(errmsg("could not resolve \"localhost\": %s",
gai_strerror(ret))));
goto startup_failed;
}
/*
* On some platforms, pg_getaddrinfo_all() may return multiple addresses
* only one of which will actually work (eg, both IPv6 and IPv4 addresses
* when kernel will reject IPv6). Worse, the failure may occur at the
* bind() or perhaps even connect() stage. So we must loop through the
* results till we find a working combination. We will generate LOG
* messages, but no error, for bogus combinations.
*/
for (addr = addrs; addr; addr = addr->ai_next)
{
#ifdef HAVE_UNIX_SOCKETS
/* Ignore AF_UNIX sockets, if any are returned. */
if (addr->ai_family == AF_UNIX)
continue;
#endif
if (++tries > 1)
ereport(LOG,
(errmsg("trying another address for history logger process")));
/*
* Create the socket.
*/
if ((pgStatActivityHistorySock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not create socket for history logger process: %m")));
continue;
}
/*
* Bind it to a kernel assigned port on localhost and get the assigned
* port via getsockname().
*/
if (bind(pgStatActivityHistorySock, addr->ai_addr, addr->ai_addrlen) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not bind socket for history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
alen = sizeof(pgStatActivityHistoryAddr);
if (getsockname(pgStatActivityHistorySock, (struct sockaddr *) & pgStatActivityHistoryAddr, &alen) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not get address of socket for history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
/*
* Connect the socket to its own address. This saves a few cycles by
* not having to respecify the target address on every send. This also
* provides a kernel-level check that only packets from this same
* address will be received.
*/
if (connect(pgStatActivityHistorySock, (struct sockaddr *) & pgStatActivityHistoryAddr, alen) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not connect socket for history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
/*
* Try to send and receive a one-byte test message on the socket. This
* is to catch situations where the socket can be created but will not
* actually pass data (for instance, because kernel packet filtering
* rules prevent it).
*/
test_byte = TESTBYTEVAL;
retry1:
if (send(pgStatActivityHistorySock, &test_byte, 1, 0) != 1)
{
if (errno == EINTR)
goto retry1; /* if interrupted, just retry */
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not send test message on socket for history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
/*
* There could possibly be a little delay before the message can be
* received. We arbitrarily allow up to half a second before deciding
* it's broken.
*/
for (;;) /* need a loop to handle EINTR */
{
FD_ZERO(&rset);
FD_SET (pgStatActivityHistorySock, &rset);
tv.tv_sec = 0;
tv.tv_usec = 500000;
sel_res = select(pgStatActivityHistorySock + 1, &rset, NULL, NULL, &tv);
if (sel_res >= 0 || errno != EINTR)
break;
}
if (sel_res < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("select() failed in history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
if (sel_res == 0 || !FD_ISSET(pgStatActivityHistorySock, &rset))
{
/*
* This is the case we actually think is likely, so take pains to
* give a specific message for it.
*
* errno will not be set meaningfully here, so don't use it.
*/
ereport(LOG,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("test message did not get through on socket for history logger process")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
test_byte++; /* just make sure variable is changed */
retry2:
if (recv(pgStatActivityHistorySock, &test_byte, 1, 0) != 1)
{
if (errno == EINTR)
goto retry2; /* if interrupted, just retry */
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not receive test message on socket for history logger process: %m")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */
{
ereport(LOG,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("incorrect test message transmission on socket for history logger process")));
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
continue;
}
/* If we get here, we have a working socket */
break;
}
/* Did we find a working address? */
if (!addr || pgStatActivityHistorySock < 0)
goto startup_failed;
/*
* Set the socket to non-blocking IO. This ensures that if the collector
* falls behind, statistics messages will be discarded; backends won't
* block waiting to send messages to the collector.
*/
if (!pg_set_noblock(pgStatActivityHistorySock))
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not set history logger process socket to nonblocking mode: %m")));
goto startup_failed;
}
pg_freeaddrinfo_all(hints.ai_family, addrs);
return;
startup_failed:
ereport(LOG,
(errmsg("disabling history logger process for lack of working socket")));
if (addrs)
pg_freeaddrinfo_all(hints.ai_family, addrs);
if (pgStatActivityHistorySock >= 0)
closesocket(pgStatActivityHistorySock);
pgStatActivityHistorySock = -1;
}
#ifdef EXEC_BACKEND
/*
* pgstatactivityhistory_forkexec() -
*
* Format up the arglist for, then fork and exec, pg_stat_activity_history process
*/
static pid_t
pgstatactivityhistory_forkexec(void)
{
char *av[10];
int ac = 0;
av[ac++] = "postgres";
av[ac++] = "--forkcol";
av[ac++] = NULL; /* filled in by postmaster_forkexec */
av[ac] = NULL;
Assert(ac < lengthof(av));
return postmaster_forkexec(ac, av);
}
#endif /* EXEC_BACKEND */
/*
* pgstatactivityhistory_start() -
*
* Called from postmaster at startup or after an existing collector
* died. Attempt to write query information in pg_stat_activity_history
*
* Returns PID of child process, or 0 if fail.
*
* Note: if fail, we will be called again from the postmaster main loop.
*/
int
pgstatactivityhistory_start(void)
{
time_t curtime;
pid_t pgStatActivityHistroyPID;
/*
* Check that the socket is there, else pgstatactivityhistory_init failed and we can do
* nothing useful.
*/
if (pgStatActivityHistorySock < 0)
return 0;
/*
* Do nothing if too soon since last collector start. This is a safety
* valve to protect against continuous respawn attempts if the collector
* is dying immediately at launch. Note that since we will be re-called
* from the postmaster main loop, we will get another chance later.
*/
curtime = time(NULL);
if ((unsigned int) (curtime - last_pgStatActivityHistory_start_time) <
(unsigned int) PGSTATACTIVITYHISTORY_RESTART_INTERVAL)
return 0;
last_pgStatActivityHistory_start_time = curtime;
/*
* Okay, fork off pg_stat_activity_history process.
*/
#ifdef EXEC_BACKEND
switch ((pgStatActivityHistroyPID = pgstatactivityhistory_forkexec()))
#else
switch ((pgStatActivityHistroyPID = fork_process()))
#endif
{
case -1:
ereport(LOG,
(errmsg("could not fork history logger process: %m")));
return 0;
#ifndef EXEC_BACKEND
case 0:
/* in postmaster child ... */
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
/* Lose the postmaster's on-exit routines */
on_exit_reset();
/* Drop our connection to postmaster's shared memory, as well */
PGSharedMemoryDetach();
PgStatActivityHistoryMain(0, NULL);
break;
#endif
default:
return (int) pgStatActivityHistroyPID;
}
/* shouldn't get here */
return 0;
}
void
allow_immediate_pgStatActivityHistory_restart(void)
{
last_pgStatActivityHistory_start_time = 0;
}
/* ----------
* PgstatCollectorMain() -
*
* Start up the statistics collector process. This is the body of the
* postmaster child process.
*
* The argc/argv parameters are valid only in EXEC_BACKEND case.
* ----------
*/
NON_EXEC_STATIC void
PgStatActivityHistoryMain(int argc, char *argv[])
{
int len;
void *msg = palloc(sizeof(queryHistoryInfo) + MAXQUERYLENGTH);
#ifndef WIN32
#ifdef HAVE_POLL
struct pollfd input_fd;
#else
struct timeval sel_timeout;
fd_set rfds;
#endif
#endif
IsUnderPostmaster = true; /* we are a postmaster subprocess now */
MyProcPid = getpid(); /* reset MyProcPid */
MyStartTime = time(NULL); /* record Start Time for logging */
/*
* If possible, make this process a group leader, so that the postmaster
* can signal any child processes too. (pgstat probably never has any
* child processes, but for consistency we make all postmaster child
* processes do this.)
*/
#ifdef HAVE_SETSID
if (setsid() < 0)
elog(FATAL, "setsid() failed: %m");
#endif
/*
* Ignore all signals usually bound to some action in the postmaster,
* except SIGQUIT.
*/
pqsignal(SIGHUP, pgstatactivityhistory_sighup_handler);
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, SIG_IGN);
pqsignal(SIGQUIT, pgstatactivityhistory_exit);
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
pqsignal(SIGTTIN, SIG_DFL);
pqsignal(SIGTTOU, SIG_DFL);
pqsignal(SIGCONT, SIG_DFL);
pqsignal(SIGWINCH, SIG_DFL);
PG_SETMASK(&UnBlockSig);
/*
* Identify myself via ps
*/
init_ps_display("history logger process", "", "", "");
/*
* Setup the descriptor set for select(2). Since only one bit in the set
* ever changes, we need not repeat FD_ZERO each time.
*/
#if !defined(HAVE_POLL) && !defined(WIN32)
FD_ZERO(&rfds);
#endif
/*
* open pg_stat_activity_history txt file
*/
struct pg_tm *tm;
pg_time_t timestamp = time(NULL);
tm = pg_localtime(&timestamp, log_timezone);
FILE *fp = NULL;
char filestr[1024];
memset(filestr, '\0', 1024);
int res = snprintf(filestr, 1024, "%s/%s/hawq-%d-%.2d-%.2d-%.2d%.2d%.2d.history",
DataDir, Log_directory, (tm->tm_year+1900), tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec);
if (res < 0)
elog(WARNING, "hawq history file's path length is bigger than 1024! The array need expanded.\n");
if((fp = fopen(filestr, "a")) == NULL)
elog(ERROR, "%s cannot be opened!\n", filestr);
/*
* Loop to process messages until we get SIGQUIT or detect ungraceful
* death of our parent postmaster.
*
* For performance reasons, we don't want to do a PostmasterIsAlive() test
* after every message; instead, do it only when select()/poll() is
* interrupted by timeout. In essence, we'll stay alive as long as
* backends keep sending us stuff often, even if the postmaster is gone.
*/
for (;;)
{
int got_data;
/*
* Quit if we get SIGQUIT from the postmaster.
*/
if (need_exit)
break;
/*
* Reload configuration if we got SIGHUP from the postmaster.
*/
if (got_SIGHUP)
{
ProcessConfigFile(PGC_SIGHUP);
got_SIGHUP = false;
}
/*
* Wait for a message to arrive; but not for more than
* PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
* shut down after an ungraceful postmaster termination; so it needn't
* be very fast. However, on some systems SIGQUIT won't interrupt the
* poll/select call, so this also limits speed of response to SIGQUIT,
* which is more important.)
*
* We use poll(2) if available, otherwise select(2). Win32 has its own
* implementation.
*/
#ifndef WIN32
#ifdef HAVE_POLL
input_fd.fd = pgStatActivityHistorySock;
input_fd.events = POLLIN | POLLERR;
input_fd.revents = 0;
if (poll(&input_fd, 1, PGSTATACTIVITYHISTORY_SELECT_TIMEOUT * 1000) < 0)
{
if (errno == EINTR)
continue;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("poll() failed in history logger process: %m")));
}
got_data = (input_fd.revents != 0);
#else /* !HAVE_POLL */
FD_SET(pgStatActivityHistorySock, &rfds);
/*
* timeout struct is modified by select() on some operating systems,
* so re-fill it each time.
*/
sel_timeout.tv_sec = PGSTATACTIVITYHISTORY_SELECT_TIMEOUT;
sel_timeout.tv_usec = 0;
if (select(pgStatActivityHistorySock + 1, &rfds, NULL, NULL, &sel_timeout) < 0)
{
if (errno == EINTR)
continue;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("select() failed in history logger process: %m")));
}
got_data = FD_ISSET(pgStatActivityHistorySock, &rfds);
#endif /* HAVE_POLL */
#else /* WIN32 */
got_data = pgwin32_waitforsinglesocket(pgStatActivityHistorySock, FD_READ,
PGSTATACTIVITYHISTORY_SELECT_TIMEOUT*1000);
#endif
/*
* If there is a message on the socket, read it and check for
* validity.
*/
if (got_data)
{
len = recv(pgStatActivityHistorySock, (char *) msg,
sizeof(queryHistoryInfo) + MAXQUERYLENGTH, 0);
if (len < 0)
{
if (errno == EINTR)
continue;
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("could not read statistics message: %m")));
}
if (Gp_role == GP_ROLE_DISPATCH)
{
WriteDataIntoPSAH(fp, (void *)msg);
fflush(fp);
}
}
else
{
/*
* We can only get here if the select/poll timeout elapsed. Check
* for postmaster death.
*/
if (!PostmasterIsAlive(true))
break;
}
} /* end of message-processing loop */
fclose(fp);
pfree(msg);
exit(0);
}
/* SIGQUIT signal handler for pg_stat_activity_history process */
static void
pgstatactivityhistory_exit(SIGNAL_ARGS)
{
need_exit = true;
}
/* SIGHUP handler for pg_stat_activity_history process */
static void
pgstatactivityhistory_sighup_handler(SIGNAL_ARGS)
{
got_SIGHUP = true;
}
/*
* checkQuery()
*
* We don't write BEGIN/ABORT/COMMIT query into history table
*/
static bool isQueryValid(const char *query)
{
const char *str1 = "BEGIN";
const char *str2 = "ABORT";
const char *str3 = "COMMIT";
if (pg_strncasecmp(query, str1, 5) == 0 || pg_strncasecmp(query, str2, 5) == 0 || pg_strncasecmp(query, str3, 6) == 0)
return false;
return true;
}
/*
* pgStatActivityHistory_send()
*
* Send query information to pg_stat_activity_history process
*/
void pgStatActivityHistory_send(Oid databaseId, Oid userId, int processId,
Oid sessionId, const char *creation_time, const char *end_time,
struct Port *tmpProcPort, char *application_name, double cpuUsage,
uint32_t memoryUsage, int status, char *errorInfo,const char *query)
{
// check if current query is valid
if (!isQueryValid(query))
return ;
void *data;
int copy_len = 0, i, j, send_size = 0;
const char *longQuery = "Query is too long!";
queryHistoryInfo *qhi;
if (pgStatActivityHistorySock == PGINVALID_SOCKET)
return;
if (MAXQUERYLENGTH - 1 < strlen(query))
{
send_size = sizeof(queryHistoryInfo) + strlen(longQuery);
data = palloc(send_size);
qhi = (queryHistoryInfo*)data;
qhi->queryLen = strlen(longQuery);
memcpy((char *)data + sizeof(queryHistoryInfo), longQuery, strlen(longQuery));
}
else
{
send_size = sizeof(queryHistoryInfo) + strlen(query);
data = palloc(send_size);
qhi = (queryHistoryInfo*)data;
qhi->queryLen = strlen(query);
memcpy((char *)data + sizeof(queryHistoryInfo), query, strlen(query));
}
qhi->databaseId = databaseId;
char* database_name = caql_getcstring(
NULL,
cql("SELECT datname FROM pg_database "
" WHERE oid = :1 ",
ObjectIdGetDatum(databaseId)));
memset(qhi->database_name, '\0', MAXDATABASENAME);
if (MAXDATABASENAME - 1 < strlen(database_name))
{
elog(WARNING, "The database name array size is too small!"
"(database name size : %lu)\n", strlen(database_name));
copy_len = MAXDATABASENAME - 1;
}
else
copy_len = strlen(database_name);
strncpy(qhi->database_name, database_name, copy_len);
qhi->userId = userId;
int fetchCount;
char* user_name = caql_getcstring_plus(
NULL,
&fetchCount,
NULL,
cql("SELECT rolname FROM pg_authid WHERE oid = :1 ",
ObjectIdGetDatum(userId)));
memset(qhi->user_name, '\0', MAXUSERNAME);
if (MAXUSERNAME - 1 < strlen(user_name))
{
elog(WARNING, "The user name array size is too small!"
"(user name size : %lu)\n", strlen(user_name));
copy_len = MAXUSERNAME - 1;
}
else
copy_len = strlen(user_name);
strncpy(qhi->user_name, user_name, copy_len);
qhi->processId = processId;
qhi->sessionId = sessionId;
memset(qhi->creation_time, '\0', MAXTIMELENGTH);
if (MAXTIMELENGTH - 1 < strlen(creation_time))
{
elog(WARNING, "The creation_time array size is too small!"
"(creation_time size : %lu)\n", strlen(creation_time));
copy_len = MAXTIMELENGTH - 1;
}
else
copy_len = strlen(creation_time);
strncpy(qhi->creation_time, creation_time, copy_len);
memset(qhi->end_time, '\0', MAXTIMELENGTH);
if (MAXTIMELENGTH - 1 < strlen(end_time))
{
elog(WARNING, "The end_time array size is too small!"
"(end_time size : %lu)\n", strlen(end_time));
copy_len = MAXTIMELENGTH - 1;
}
else
copy_len = strlen(end_time);
strncpy(qhi->end_time, end_time, copy_len);
memset(qhi->client_addr, '\0', MAXCLIENTADDRLENGTH);
SockAddr clientaddr;
if (tmpProcPort)
{
memcpy(&clientaddr, &tmpProcPort->raddr, sizeof(clientaddr));
}
else
{
MemSet(&clientaddr, 0, sizeof(clientaddr));
}
SockAddr zero_clientaddr;
memset(&zero_clientaddr, 0, sizeof(zero_clientaddr));
if (memcmp(&(clientaddr), &zero_clientaddr,
(size_t) (sizeof(zero_clientaddr) == 0)))
{
qhi->client_port = -2;
}
else
{
if (clientaddr.addr.ss_family == AF_INET
#ifdef HAVE_IPV6
|| clientaddr.addr.ss_family == AF_INET6
#endif
)
{
char remote_host[NI_MAXHOST];
char remote_port[NI_MAXSERV];
int ret;
remote_host[0] = '\0';
remote_port[0] = '\0';
ret = pg_getnameinfo_all(&clientaddr.addr,
clientaddr.salen,
remote_host, sizeof(remote_host),
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);
if (ret)
{
qhi->client_port = -2;
}
else
{
clean_ipv6_addr(clientaddr.addr.ss_family, remote_host);
if (MAXCLIENTADDRLENGTH - 1 < strlen(remote_host))
{
elog(WARNING, "The application name array size is too small!"
"(application name size : %lu)\n", strlen(remote_host));
copy_len = MAXCLIENTADDRLENGTH - 1;
}
else
copy_len = strlen(remote_host);
strncpy(qhi->client_addr, remote_host, copy_len);
qhi->client_port = atoi(remote_port);
}
}
else if (clientaddr.addr.ss_family == AF_UNIX)
{
qhi->client_port = -1;
}
else
{
qhi->client_port = -2;
}
}
memset(qhi->application_name, '\0', MAXAPPNAMELENGTH);
if (MAXAPPNAMELENGTH - 1 < strlen(application_name))
{
elog(WARNING, "The application name array size is too small!"
"(application name size : %lu)\n", strlen(application_name));
copy_len = MAXAPPNAMELENGTH - 1;
}
else
copy_len = strlen(application_name);
strncpy(qhi->application_name, application_name, copy_len);
qhi->cpuUsage = cpuUsage;
qhi->memoryUsage = memoryUsage;
memset(qhi->status, '\0', MAXSTATUSLENGTH);
if (status == 1)
strncpy(qhi->status, "success", 7);
else if (status == -1)
strncpy(qhi->status, "failed", 6);
else
strncpy(qhi->status, "waiting", 7);
memset(qhi->errorInfo, '\0', MAXERRORINFOLENGTH);
for (i = 0, j = 0; i < MAXERRORINFOLENGTH - 1 && j < strlen(errorInfo); ++i, ++j)
{
if (errorInfo[j] == '"')
qhi->errorInfo[i++] = '"';
qhi->errorInfo[i] = errorInfo[j];
}
if (j < strlen(errorInfo))
elog(WARNING, "The error information array size is too small!"
"(error information size : %lu)\n", strlen(errorInfo));
int rc;
/* We'll retry after EINTR, but ignore all other failures */
do
{
rc = send(pgStatActivityHistorySock, (void *)data, send_size, 0);
} while (rc < 0 && errno == EINTR);
pfree(data);
is_qtype_sql = false;
#ifdef USE_ASSERT_CHECKING
/* In debug builds, log send failures ... */
if (rc < 0)
elog(LOG, "could not send to history logger process: %m");
#endif
}
static void
WriteDataIntoPSAH(FILE *fp, void *data)
{
int i, j;
queryHistoryInfo *qhi = (queryHistoryInfo *)data;
char *recv_query = (char *)data + sizeof(queryHistoryInfo);
char *final_query = palloc(2*(qhi->queryLen));
memset(final_query, '\0', 2*(qhi->queryLen));
for (i = 0, j = 0; j < qhi->queryLen; ++i, ++j)
{
if (recv_query[j] == '"')
final_query[i++] = '"';
final_query[i] = recv_query[j];
}
if (qhi->client_port != -2)
fprintf(fp,"%d,\"%s\",%d,\"%s\",%d,%d,\"%s\",\"%s\",\"%s\",%d,\"%s\",\"%.3f vcore\",\"%d MB\",\"%s\",\"%s\",\"%s\"\n",
qhi->databaseId, qhi->database_name, qhi->userId, qhi->user_name, qhi->processId, qhi->sessionId,
qhi->creation_time, qhi->end_time, qhi->client_addr, qhi->client_port, qhi->application_name,
qhi->cpuUsage, qhi->memoryUsage, qhi->status, qhi->errorInfo, final_query);
else
fprintf(fp,"%d,\"%s\",%d,\"%s\",%d,%d,\"%s\",\"%s\",\"%s\",,\"%s\",\"%.3f vcore\",\"%d MB\",\"%s\",\"%s\",\"%s\"\n",
qhi->databaseId, qhi->database_name, qhi->userId, qhi->user_name, qhi->processId, qhi->sessionId,
qhi->creation_time, qhi->end_time, qhi->client_addr, qhi->application_name,
qhi->cpuUsage, qhi->memoryUsage, qhi->status, qhi->errorInfo, final_query);
pfree(final_query);
}