blob: e7e1633402c8a9bb7a2fde05b5b6584aabaaa48b [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* src/pg_cron.c
*
* Implementation of the pg_cron task scheduler.
* Wording:
* - A job is a scheduling definition of a task
* - A task is what is actually executed within the database engine
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/resource.h>
#include "fmgr.h"
#include "pgstat.h"
#include "postgres.h"
/* these are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#define MAIN_PROGRAM
#ifdef HAVE_POLL_H
#include <poll.h>
#elif defined(HAVE_SYS_POLL_H)
#include <sys/poll.h>
#endif
#include "sys/time.h"
#include "time.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/printtup.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/pg_extension.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "cdb/cdbvars.h"
#include "commands/async.h"
#include "commands/dbcommands.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#if (PG_VERSION_NUM >= 160000)
#include "utils/guc_hooks.h"
#else
#include "commands/variable.h"
#endif
#include "lib/stringinfo.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "nodes/nodes.h"
#include "parser/analyze.h"
#include "postmaster/postmaster.h"
#include "task/pg_cron.h"
#include "task/task_states.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
#include "utils/varlena.h"
#ifndef MAXINT8LEN
#define MAXINT8LEN 20
#endif
/* Table-of-contents constants for our dynamic shared memory segment. */
#define PG_CRON_MAGIC 0x51028080
#define PG_CRON_KEY_DATABASE 0
#define PG_CRON_KEY_USERNAME 1
#define PG_CRON_KEY_COMMAND 2
#define PG_CRON_KEY_QUEUE 3
#define PG_CRON_NKEYS 4
/* ways in which the clock can change between main loop iterations */
typedef enum
{
CLOCK_JUMP_BACKWARD = 0,
CLOCK_PROGRESSED = 1,
CLOCK_JUMP_FORWARD = 2,
CLOCK_CHANGE = 3
} ClockProgress;
static void pg_cron_sigterm(SIGNAL_ARGS);
static void pg_cron_sighup(SIGNAL_ARGS);
static void StartAllPendingRuns(List *taskList, TimestampTz currentTime);
static void StartPendingRuns(CronTask *task, ClockProgress clockProgress,
TimestampTz lastMinute, TimestampTz currentTime);
static int MinutesPassed(TimestampTz startTime, TimestampTz stopTime);
static TimestampTz TimestampMinuteStart(TimestampTz time);
static TimestampTz TimestampMinuteEnd(TimestampTz time);
static bool ShouldRunTask(entry *schedule, TimestampTz currentMinute,
bool doWild, bool doNonWild);
static void WaitForCronTasks(List *taskList);
static void WaitForLatch(int timeoutMs);
static void PollForTasks(List *taskList);
static bool CanStartTask(CronTask *task);
static void ManageCronTasks(List *taskList, TimestampTz currentTime);
static void ManageCronTask(CronTask *task, TimestampTz currentTime);
static void ExecuteSqlString(const char *sql);
static void GetTaskFeedback(PGresult *result, CronTask *task);
static void ProcessBgwTaskFeedback(CronTask *task, bool running);
static bool jobCanceled(CronTask *task);
static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
static char* pg_cron_cmdTuples(char *msg);
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);
/* GUC settings */
bool task_log_statement = true;
bool task_log_run = true;
bool task_use_background_worker = false;
char *task_timezone = "GMT";
int max_running_tasks = 5;
char *task_host_addr = "127.0.0.1";
static pg_tz *task_timezone_tz = NULL;
/* flags set by signal handlers */
static volatile sig_atomic_t got_sigterm = false;
/* global variables */
static int CronTaskStartTimeout = 10000; /* maximum connection time */
static const int MaxWait = 1000; /* maximum time in ms that poll() can block */
static bool RebootJobsScheduled = false;
static int RunningTaskCount = 0;
static char *CronTableDatabaseName = "postgres";
static PgCronData *PgCron = NULL;
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake it up.
*/
static void
pg_cron_sigterm(SIGNAL_ARGS)
{
got_sigterm = true;
if (MyProc != NULL)
{
SetLatch(&MyProc->procLatch);
}
}
/*
* Signal handler for SIGHUP
* Set a flag to tell the main loop to reload the cron jobs.
*/
static void
pg_cron_sighup(SIGNAL_ARGS)
{
CronJobCacheValid = false;
if (MyProc != NULL)
{
SetLatch(&MyProc->procLatch);
}
}
/*
* pg_cron_cmdTuples -
* mainly copy/pasted from PQcmdTuples
* If the last command was INSERT/UPDATE/DELETE/MOVE/FETCH/COPY, return
* a string containing the number of inserted/affected tuples. If not, return "".
* XXX: this should probably return an int
*/
static char *
pg_cron_cmdTuples(char *msg)
{
char *p,
*c;
if (!msg)
return "";
if (strncmp(msg, "INSERT ", 7) == 0)
{
p = msg + 7;
/* INSERT: skip oid and space */
while (*p && *p != ' ')
p++;
if (*p == 0)
goto interpret_error; /* no space? */
p++;
}
else if (strncmp(msg, "SELECT ", 7) == 0 ||
strncmp(msg, "DELETE ", 7) == 0 ||
strncmp(msg, "UPDATE ", 7) == 0)
p = msg + 7;
else if (strncmp(msg, "FETCH ", 6) == 0)
p = msg + 6;
else if (strncmp(msg, "MOVE ", 5) == 0 ||
strncmp(msg, "COPY ", 5) == 0)
p = msg + 5;
else
return "";
/* check that we have an integer (at least one digit, nothing else) */
for (c = p; *c; c++)
{
if (!isdigit((unsigned char) *c))
goto interpret_error;
}
if (c == p)
goto interpret_error;
return p;
interpret_error:
ereport(LOG, (errmsg("could not interpret result from server: %s", msg)));
return "";
}
/*
* bgw_generate_returned_message -
* generates the message to be inserted into the job_run_details table
* first part is comming from error_severity (elog.c)
*/
static void
bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata)
{
const char *prefix;
switch (edata.elevel)
{
case DEBUG1:
case DEBUG2:
case DEBUG3:
case DEBUG4:
case DEBUG5:
prefix = gettext_noop("DEBUG");
break;
case LOG:
case LOG_SERVER_ONLY:
prefix = gettext_noop("LOG");
break;
case INFO:
prefix = gettext_noop("INFO");
break;
case NOTICE:
prefix = gettext_noop("NOTICE");
break;
case WARNING:
prefix = gettext_noop("WARNING");
break;
case ERROR:
prefix = gettext_noop("ERROR");
break;
case FATAL:
prefix = gettext_noop("FATAL");
break;
case PANIC:
prefix = gettext_noop("PANIC");
break;
default:
prefix = "???";
break;
}
appendStringInfo(display_msg, "%s: %s", prefix, edata.message);
if (edata.detail != NULL)
appendStringInfo(display_msg, "\nDETAIL: %s", edata.detail);
if (edata.hint != NULL)
appendStringInfo(display_msg, "\nHINT: %s", edata.hint);
if (edata.context != NULL)
appendStringInfo(display_msg, "\nCONTEXT: %s", edata.context);
}
void
assign_task_timezone(const char *newval, void *extra)
{
task_timezone_tz = *((pg_tz **) extra);
}
const char *show_task_timezone(void)
{
const char *tzn;
/* Always show the zone's canonical name */
tzn = pg_get_timezone_name(task_timezone_tz);
if (tzn != NULL)
return tzn;
return "unknown";
}
bool PgCronStartRule(Datum main_arg)
{
return (Gp_role == GP_ROLE_DISPATCH);
}
pid_t
PgCronLauncherPID(void)
{
return PgCron->cron_pid;
}
Size PgCronLauncherShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(PgCronData));
return size;
}
/* Allocate and initialize pg cron related shared memory */
void
PgCronLauncherShmemInit(void)
{
bool found;
PgCron = (PgCronData *)
ShmemInitStruct("Cron Data", PgCronLauncherShmemSize(), &found);
if (!found)
{
/* First time through, so initialize */
MemSet(PgCron, 0, PgCronLauncherShmemSize());
PgCron->cron_pid = 0;
}
}
/*
* PgCronLauncherMain is the main entry-point for the background worker
* that performs tasks.
*/
void
PgCronLauncherMain(Datum arg)
{
PgCron->cron_pid = MyProcPid;
MemoryContext CronLoopContext = NULL;
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, pg_cron_sighup);
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, pg_cron_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection(CronTableDatabaseName, NULL, 0);
/* Make pg_cron recognisable in pg_stat_activity */
pgstat_report_appname("pg_cron scheduler");
/*
* Mark anything that was in progress before the database restarted as
* failed.
*/
MarkPendingRunsAsFailed();
CronLoopContext = AllocSetContextCreate(CurrentMemoryContext,
"pg_cron loop context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
InitializeJobMetadataCache();
InitializeTaskStateHash();
ereport(LOG, (errmsg("pg_cron scheduler started")));
MemoryContextSwitchTo(CronLoopContext);
while (!got_sigterm)
{
List *taskList = NIL;
TimestampTz currentTime = 0;
AcceptInvalidationMessages();
if (!CronJobCacheValid)
{
RefreshTaskHash();
}
taskList = CurrentTaskList();
currentTime = GetCurrentTimestamp();
StartAllPendingRuns(taskList, currentTime);
WaitForCronTasks(taskList);
ManageCronTasks(taskList, currentTime);
MemoryContextReset(CronLoopContext);
}
ereport(LOG, (errmsg("pg_cron scheduler shutting down")));
proc_exit(0);
}
/*
* StartPendingRuns goes through the list of tasks and kicks of
* runs for tasks that should start, taking clock changes into
* into consideration.
*/
static void
StartAllPendingRuns(List *taskList, TimestampTz currentTime)
{
static TimestampTz lastMinute = 0;
int minutesPassed = 0;
ListCell *taskCell = NULL;
ClockProgress clockProgress;
if (!RebootJobsScheduled)
{
/* find jobs with @reboot as a schedule */
foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);
CronJob *cronJob = GetCronJob(task->jobId);
entry *schedule = &cronJob->schedule;
if (schedule->flags & WHEN_REBOOT &&
task->isActive)
{
task->pendingRunCount += 1;
}
}
RebootJobsScheduled = true;
}
foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);
if (task->secondsInterval > 0 && task->isActive)
{
/*
* For interval jobs, if a task takes longer than the interval,
* we only queue up once. So if a task that is supposed to run
* every 30 seconds takes 5 minutes, we start another run
* immediately after 5 minutes, but then return to regular cadence.
*/
if (task->pendingRunCount == 0 &&
TimestampDifferenceExceeds(task->lastStartTime, currentTime,
task->secondsInterval * 1000))
{
task->pendingRunCount += 1;
}
}
}
if (lastMinute == 0)
{
lastMinute = TimestampMinuteStart(currentTime);
}
minutesPassed = MinutesPassed(lastMinute, currentTime);
if (minutesPassed == 0)
{
/* wait for new minute */
return;
}
/* use Vixie cron logic for clock jumps */
if (minutesPassed > (3*MINUTE_COUNT))
{
/* clock jumped forward by more than 3 hours */
clockProgress = CLOCK_CHANGE;
}
else if (minutesPassed > 5)
{
/* clock went forward by more than 5 minutes (DST?) */
clockProgress = CLOCK_JUMP_FORWARD;
}
else if (minutesPassed > 0)
{
/* clock went forward by 1-5 minutes */
clockProgress = CLOCK_PROGRESSED;
}
else if (minutesPassed > -(3*MINUTE_COUNT))
{
/* clock jumped backwards by less than 3 hours (DST?) */
clockProgress = CLOCK_JUMP_BACKWARD;
}
else
{
/* clock jumped backwards 3 hours or more */
clockProgress = CLOCK_CHANGE;
}
foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);
if (!task->isActive)
{
/*
* The job has been unscheduled, so we should not schedule
* new runs. The task will be safely removed on the next call
* to ManageCronTask.
*/
continue;
}
StartPendingRuns(task, clockProgress, lastMinute, currentTime);
}
/*
* If the clock jump backwards then we avoid repeating the fixed-time
* tasks by preserving the last minute from before the clock jump,
* until the clock has caught up (clockProgress will be
* CLOCK_JUMP_BACKWARD until then).
*/
if (clockProgress != CLOCK_JUMP_BACKWARD)
{
lastMinute = TimestampMinuteStart(currentTime);
}
}
/*
* StartPendingRuns kicks off pending runs for a task if it
* should start, taking clock changes into consideration.
*/
static void
StartPendingRuns(CronTask *task, ClockProgress clockProgress,
TimestampTz lastMinute, TimestampTz currentTime)
{
CronJob *cronJob = GetCronJob(task->jobId);
entry *schedule = &cronJob->schedule;
TimestampTz virtualTime = lastMinute;
TimestampTz currentMinute = TimestampMinuteStart(currentTime);
switch (clockProgress)
{
case CLOCK_PROGRESSED:
{
/*
* case 1: minutesPassed is a small positive number
* run jobs for each virtual minute until caught up.
*/
do
{
virtualTime = TimestampTzPlusMilliseconds(virtualTime,
60*1000);
if (ShouldRunTask(schedule, virtualTime, true, true))
{
task->pendingRunCount += 1;
}
}
while (virtualTime < currentMinute);
break;
}
case CLOCK_JUMP_FORWARD:
{
/*
* case 2: minutesPassed is a medium-sized positive number,
* for example because we went to DST run wildcard
* jobs once, then run any fixed-time jobs that would
* otherwise be skipped if we use up our minute
* (possible, if there are a lot of jobs to run) go
* around the loop again so that wildcard jobs have
* a chance to run, and we do our housekeeping
*/
/* run fixed-time jobs for each minute missed */
do
{
virtualTime = TimestampTzPlusMilliseconds(virtualTime,
60*1000);
if (ShouldRunTask(schedule, virtualTime, false, true))
{
task->pendingRunCount += 1;
}
} while (virtualTime < currentMinute);
/* run wildcard jobs for current minute */
if (ShouldRunTask(schedule, currentMinute, true, false))
{
task->pendingRunCount += 1;
}
break;
}
case CLOCK_JUMP_BACKWARD:
{
/*
* case 3: timeDiff is a small or medium-sized
* negative num, eg. because of DST ending just run
* the wildcard jobs. The fixed-time jobs probably
* have already run, and should not be repeated
* virtual time does not change until we are caught up
*/
if (ShouldRunTask(schedule, currentMinute, true, false))
{
task->pendingRunCount += 1;
}
break;
}
default:
{
/*
* other: time has changed a *lot*, skip over any
* intermediate fixed-time jobs and go back to
* normal operation.
*/
if (ShouldRunTask(schedule, currentMinute, true, true))
{
task->pendingRunCount += 1;
}
}
}
}
/*
* MinutesPassed returns the number of minutes between startTime and
* stopTime rounded down to the closest integer.
*/
static int
MinutesPassed(TimestampTz startTime, TimestampTz stopTime)
{
int microsPassed = 0;
long secondsPassed = 0;
int minutesPassed = 0;
TimestampDifference(startTime, stopTime,
&secondsPassed, &microsPassed);
minutesPassed = secondsPassed / 60;
return minutesPassed;
}
/*
* TimestampMinuteEnd returns the timestamp at the start of the
* current minute for the given time.
*/
static TimestampTz
TimestampMinuteStart(TimestampTz time)
{
TimestampTz result = 0;
#ifdef HAVE_INT64_TIMESTAMP
result = time - time % 60000000;
#else
result = (long) time - (long) time % 60;
#endif
return result;
}
/*
* TimestampMinuteEnd returns the timestamp at the start of the
* next minute from the given time.
*/
static TimestampTz
TimestampMinuteEnd(TimestampTz time)
{
TimestampTz result = TimestampMinuteStart(time);
#ifdef HAVE_INT64_TIMESTAMP
result += 60000000;
#else
result += 60;
#endif
return result;
}
/*
* ShouldRunTask returns whether a job should run in the current
* minute according to its schedule.
*/
static bool
ShouldRunTask(entry *schedule, TimestampTz currentTime, bool doWild,
bool doNonWild)
{
pg_time_t currentTime_t = timestamptz_to_time_t(currentTime);
struct pg_tm* tm = pg_localtime(&currentTime_t, pg_tzset(task_timezone));
int minute = tm->tm_min -FIRST_MINUTE;
int hour = tm->tm_hour -FIRST_HOUR;
int dayOfMonth = tm->tm_mday -FIRST_DOM;
int month = tm->tm_mon +1 -FIRST_MONTH;
int dayOfWeek = tm->tm_wday -FIRST_DOW;
if (bit_test(schedule->minute, minute) &&
bit_test(schedule->hour, hour) &&
bit_test(schedule->month, month) &&
( ((schedule->flags & DOM_STAR) || (schedule->flags & DOW_STAR))
? (bit_test(schedule->dow,dayOfWeek) && bit_test(schedule->dom,dayOfMonth))
: (bit_test(schedule->dow,dayOfWeek) || bit_test(schedule->dom,dayOfMonth)))) {
if ((doNonWild && !(schedule->flags & (MIN_STAR|HR_STAR)))
|| (doWild && (schedule->flags & (MIN_STAR|HR_STAR))))
{
return true;
}
}
return false;
}
/*
* WaitForCronTasks blocks waiting for any active task for at most
* 1 second.
*/
static void
WaitForCronTasks(List *taskList)
{
int taskCount = list_length(taskList);
if (taskCount > 0)
{
PollForTasks(taskList);
}
else
{
WaitForLatch(MaxWait);
}
}
/*
* WaitForLatch waits for the given number of milliseconds unless a signal
* is received or postmaster shuts down.
*/
static void
WaitForLatch(int timeoutMs)
{
int rc = 0;
int waitFlags = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
/* nothing to do, wait for new jobs */
rc = WaitLatch(MyLatch, waitFlags, timeoutMs, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (rc & WL_POSTMASTER_DEATH)
{
/* postmaster died and we should bail out immediately */
proc_exit(1);
}
}
/*
* PollForTasks calls poll() for the sockets of all tasks. It checks for
* read or write events based on the pollingStatus of the task.
*/
static void
PollForTasks(List *taskList)
{
TimestampTz currentTime = 0;
TimestampTz nextEventTime = 0;
int pollTimeout = 0;
long waitSeconds = 0;
int waitMicros = 0;
CronTask **polledTasks = NULL;
struct pollfd *pollFDs = NULL;
int pollResult = 0;
int taskIndex = 0;
int taskCount = list_length(taskList);
int activeTaskCount = 0;
ListCell *taskCell = NULL;
polledTasks = (CronTask **) palloc0(taskCount * sizeof(CronTask *));
pollFDs = (struct pollfd *) palloc0(taskCount * sizeof(struct pollfd));
currentTime = GetCurrentTimestamp();
/*
* At the latest, wake up when the next minute starts.
*/
nextEventTime = TimestampMinuteEnd(currentTime);
foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);
PostgresPollingStatusType pollingStatus = task->pollingStatus;
struct pollfd *pollFileDescriptor = &pollFDs[activeTaskCount];
if (activeTaskCount >= max_running_tasks)
{
/* already polling the maximum number of tasks */
break;
}
if (task->state == CRON_TASK_ERROR || task->state == CRON_TASK_DONE ||
CanStartTask(task))
{
/* there is work to be done, don't wait */
pfree(polledTasks);
pfree(pollFDs);
return;
}
if (task->state == CRON_TASK_WAITING && task->pendingRunCount == 0)
{
/*
* Make sure we do not wait past the next run time of an interval
* job.
*/
if (task->secondsInterval > 0)
{
TimestampTz nextRunTime =
TimestampTzPlusMilliseconds(task->lastStartTime,
task->secondsInterval * 1000);
if (TimestampDifferenceExceeds(nextRunTime, nextEventTime, 0))
{
nextEventTime = nextRunTime;
}
}
/* don't poll idle tasks */
continue;
}
if (task->state == CRON_TASK_CONNECTING ||
task->state == CRON_TASK_SENDING)
{
/*
* We need to wake up when a timeout expires.
* Take the minimum of nextEventTime and task->startDeadline.
*/
if (TimestampDifferenceExceeds(task->startDeadline, nextEventTime, 0))
{
nextEventTime = task->startDeadline;
}
}
/* we plan to poll this task */
pollFileDescriptor = &pollFDs[activeTaskCount];
polledTasks[activeTaskCount] = task;
if (task->state == CRON_TASK_CONNECTING ||
task->state == CRON_TASK_SENDING ||
task->state == CRON_TASK_BGW_RUNNING ||
task->state == CRON_TASK_RUNNING)
{
PGconn *connection = task->connection;
int pollEventMask = 0;
/*
* Set the appropriate mask for poll, based on the current polling
* status of the task, controlled by ManageCronTask.
*/
if (pollingStatus == PGRES_POLLING_READING)
{
pollEventMask = POLLERR | POLLIN;
}
else if (pollingStatus == PGRES_POLLING_WRITING)
{
pollEventMask = POLLERR | POLLOUT;
}
pollFileDescriptor->fd = PQsocket(connection);
pollFileDescriptor->events = pollEventMask;
}
else
{
/*
* Task is not running.
*/
pollFileDescriptor->fd = -1;
pollFileDescriptor->events = 0;
}
pollFileDescriptor->revents = 0;
if (pollFileDescriptor->fd >= 0)
activeTaskCount++;
}
/*
* Find the first time-based event, which is either the start of a new
* minute or a timeout.
*/
TimestampDifference(currentTime, nextEventTime, &waitSeconds, &waitMicros);
pollTimeout = waitSeconds * 1000 + waitMicros / 1000;
if (pollTimeout <= 0)
{
/*
* Interval jobs might frequently be overdue, inject a small
* 1ms wait to avoid getting into a tight loop.
*/
pollTimeout = 1;
}
else if (pollTimeout > MaxWait)
{
/*
* We never wait more than 1 second, this gives us a chance to react
* to external events like a TERM signal and job changes.
*/
pollTimeout = MaxWait;
}
if (activeTaskCount == 0)
{
/* turns out there's nothing to do, just wait for something to happen */
WaitForLatch(pollTimeout);
pfree(polledTasks);
pfree(pollFDs);
return;
}
pollResult = poll(pollFDs, activeTaskCount, pollTimeout);
if (pollResult < 0)
{
/*
* This typically happens in case of a signal, though we should
* probably check errno in case something bad happened.
*/
pfree(polledTasks);
pfree(pollFDs);
return;
}
for (taskIndex = 0; taskIndex < activeTaskCount; taskIndex++)
{
CronTask *task = polledTasks[taskIndex];
struct pollfd *pollFileDescriptor = &pollFDs[taskIndex];
task->isSocketReady = pollFileDescriptor->revents &
pollFileDescriptor->events;
}
pfree(polledTasks);
pfree(pollFDs);
}
/*
* CanStartTask determines whether a task is ready to be started because
* it has pending runs and we are running less than MaxRunningTasks.
*/
static bool
CanStartTask(CronTask *task)
{
return task->state == CRON_TASK_WAITING && task->pendingRunCount > 0 &&
RunningTaskCount < max_running_tasks;
}
/*
* ManageCronTasks proceeds the state machines of the given list of tasks.
*/
static void
ManageCronTasks(List *taskList, TimestampTz currentTime)
{
ListCell *taskCell = NULL;
foreach(taskCell, taskList)
{
CronTask *task = (CronTask *) lfirst(taskCell);
ManageCronTask(task, currentTime);
}
}
/*
* ManageCronTask implements the cron task state machine.
*/
static void
ManageCronTask(CronTask *task, TimestampTz currentTime)
{
CronTaskState checkState = task->state;
int64 jobId = task->jobId;
CronJob *cronJob = GetCronJob(jobId);
PGconn *connection = task->connection;
ConnStatusType connectionStatus = CONNECTION_BAD;
TimestampTz start_time;
switch (checkState)
{
case CRON_TASK_WAITING:
{
/* check if job has been removed */
if (!task->isActive)
{
/* remove task as well */
RemoveTask(jobId);
break;
}
if (!CanStartTask(task))
{
break;
}
task->pendingRunCount -= 1;
if (task_use_background_worker)
task->state = CRON_TASK_BGW_START;
else
task->state = CRON_TASK_START;
task->lastStartTime = currentTime;
RunningTaskCount++;
/* Add new entry to audit table. */
task->runId = NextRunId();
if (task_log_run)
InsertJobRunDetail(task->runId, &cronJob->jobId,
cronJob->database, cronJob->userName,
cronJob->command, GetCronStatus(CRON_STATUS_STARTING));
}
case CRON_TASK_START:
{
/*
* as there is no break at the end of the previous case
* to not add an extra second, then do another check here
*/
if (!task_use_background_worker)
{
const char *clientEncoding = GetDatabaseEncodingName();
char nodePortString[12];
TimestampTz startDeadline = 0;
const char *keywordArray[] = {
"host",
"port",
"fallback_application_name",
"client_encoding",
"dbname",
"user",
NULL
};
const char *valueArray[] = {
cronJob->nodeName,
nodePortString,
"pg_cron",
clientEncoding,
cronJob->database,
cronJob->userName,
NULL
};
sprintf(nodePortString, "%d", cronJob->nodePort);
Assert(sizeof(keywordArray) == sizeof(valueArray));
if (task_log_statement)
{
char *command = cronJob->command;
ereport(LOG, (errmsg("cron job " INT64_FORMAT " %s: %s",
jobId, GetCronStatus(CRON_STATUS_STARTING), command)));
}
connection = PQconnectStartParams(keywordArray, valueArray, false);
PQsetnonblocking(connection, 1);
connectionStatus = PQstatus(connection);
if (connectionStatus == CONNECTION_BAD)
{
/* make sure we call PQfinish on the connection */
task->connection = connection;
task->errorMessage = "connection failed";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
break;
}
startDeadline = TimestampTzPlusMilliseconds(currentTime, CronTaskStartTimeout);
task->startDeadline = startDeadline;
task->connection = connection;
task->pollingStatus = PGRES_POLLING_WRITING;
task->state = CRON_TASK_CONNECTING;
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_CONNECTING), NULL, NULL, NULL);
break;
}
}
case CRON_TASK_BGW_START:
{
BackgroundWorker worker;
pid_t pid;
shm_toc_estimator e;
shm_toc *toc;
char *database;
char *username;
char *command;
MemoryContext oldcontext;
shm_mq *mq;
Size segsize;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
bool registered;
TimestampTz startDeadline = 0;
/*
* break in the previous case has not been reached
* checking just for extra precaution
*/
Assert(task_use_background_worker);
#define QUEUE_SIZE ((Size) 65536)
/*
* Create the shared memory that we will pass to the background
* worker process. We use DSM_CREATE_NULL_IF_MAXSEGMENTS so that we
* do not ERROR here. This way, we can mark the job as failed and
* keep the launcher process running normally.
*/
shm_toc_initialize_estimator(&e);
shm_toc_estimate_chunk(&e, strlen(cronJob->database) + 1);
shm_toc_estimate_chunk(&e, strlen(cronJob->userName) + 1);
shm_toc_estimate_chunk(&e, strlen(cronJob->command) + 1);
shm_toc_estimate_chunk(&e, QUEUE_SIZE);
shm_toc_estimate_keys(&e, PG_CRON_NKEYS);
segsize = shm_toc_estimate(&e);
task->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (task->seg == NULL)
{
task->state = CRON_TASK_ERROR;
task->errorMessage = "unable to create a DSM segment; more "
"details may be available in the server log";
ereport(WARNING,
(errmsg("max number of DSM segments may has been reached")));
break;
}
toc = shm_toc_create(PG_CRON_MAGIC, dsm_segment_address(task->seg), segsize);
database = shm_toc_allocate(toc, strlen(cronJob->database) + 1);
strcpy(database, cronJob->database);
shm_toc_insert(toc, PG_CRON_KEY_DATABASE, database);
username = shm_toc_allocate(toc, strlen(cronJob->userName) + 1);
strcpy(username, cronJob->userName);
shm_toc_insert(toc, PG_CRON_KEY_USERNAME, username);
command = shm_toc_allocate(toc, strlen(cronJob->command) + 1);
strcpy(command, cronJob->command);
shm_toc_insert(toc, PG_CRON_KEY_COMMAND, command);
mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE);
shm_toc_insert(toc, PG_CRON_KEY_QUEUE, mq);
shm_mq_set_receiver(mq, MyProc);
/*
* Attach the queue before launching a worker, so that we'll automatically
* detach the queue if we error out. (Otherwise, the worker might sit
* there trying to write the queue long after we've gone away.)
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
task->sharedMemoryQueue = shm_mq_attach(mq, task->seg, NULL);
MemoryContextSwitchTo(oldcontext);
/* Prepare the background worker. */
memset(&worker, 0, sizeof(BackgroundWorker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "postgres");
sprintf(worker.bgw_function_name, "CronBackgroundWorker");
snprintf(worker.bgw_type, BGW_MAXLEN, "pg_cron");
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_cron worker");
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(task->seg));
worker.bgw_notify_pid = MyProcPid;
/*
* Start the worker process.
*/
if (task_log_statement)
{
ereport(LOG, (errmsg("cron job " INT64_FORMAT " %s: %s",
jobId, GetCronStatus(CRON_STATUS_STARTING), command)));
}
/*
* If no no background worker slots are currently available
* let's try until we reach jobStartupTimeout
*/
startDeadline = TimestampTzPlusMilliseconds(currentTime,
CronTaskStartTimeout);
task->startDeadline = startDeadline;
do
{
registered = RegisterDynamicBackgroundWorker(&worker, &handle);
}
while (!registered && !jobStartupTimeout(task, GetCurrentTimestamp()));
if (!registered)
{
dsm_detach(task->seg);
task->seg = NULL;
task->state = CRON_TASK_ERROR;
task->errorMessage = "could not start background process; more "
"details may be available in the server log";
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
break;
}
task->startDeadline = 0;
task->handle = *handle;
status = WaitForBackgroundWorkerStartup(&task->handle, &pid);
if (status != BGWH_STARTED && status != BGWH_STOPPED)
{
dsm_detach(task->seg);
task->seg = NULL;
task->state = CRON_TASK_ERROR;
task->errorMessage = "could not start background process; more "
"details may be available in the server log";
break;
}
start_time = GetCurrentTimestamp();
if (task_log_run)
UpdateJobRunDetail(task->runId, &pid, GetCronStatus(CRON_STATUS_RUNNING), NULL, &start_time, NULL);
task->state = CRON_TASK_BGW_RUNNING;
break;
}
case CRON_TASK_CONNECTING:
{
PostgresPollingStatusType pollingStatus = 0;
Assert(!task_use_background_worker);
/* check if job has been removed */
if (jobCanceled(task))
break;
/* check if timeout has been reached */
if (jobStartupTimeout(task, currentTime))
break;
/* check if connection is still alive */
connectionStatus = PQstatus(connection);
if (connectionStatus == CONNECTION_BAD)
{
task->errorMessage = "connection failed";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
break;
}
/* check if socket is ready to send */
if (!task->isSocketReady)
{
break;
}
/* check whether a connection has been established */
pollingStatus = PQconnectPoll(connection);
if (pollingStatus == PGRES_POLLING_OK)
{
pid_t pid;
/* wait for socket to be ready to send a query */
task->pollingStatus = PGRES_POLLING_WRITING;
task->state = CRON_TASK_SENDING;
pid = (pid_t) PQbackendPID(connection);
if (task_log_run)
UpdateJobRunDetail(task->runId, &pid, GetCronStatus(CRON_STATUS_SENDING), NULL, NULL, NULL);
}
else if (pollingStatus == PGRES_POLLING_FAILED)
{
task->errorMessage = "connection failed";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
}
else
{
/*
* Connection is still being established.
*
* On the next WaitForTasks round, we wait for reading or writing
* based on the status returned by PQconnectPoll, see:
* https://www.postgresql.org/docs/9.5/static/libpq-connect.html
*/
task->pollingStatus = pollingStatus;
}
break;
}
case CRON_TASK_SENDING:
{
char *command = cronJob->command;
int sendResult = 0;
Assert(!task_use_background_worker);
/* check if job has been removed */
if (jobCanceled(task))
break;
/* check if timeout has been reached */
if (jobStartupTimeout(task, currentTime))
break;
/* check if socket is ready to send */
if (!task->isSocketReady)
{
break;
}
/* check if connection is still alive */
connectionStatus = PQstatus(connection);
if (connectionStatus == CONNECTION_BAD)
{
task->errorMessage = "connection lost";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
break;
}
sendResult = PQsendQuery(connection, command);
if (sendResult == 1)
{
/* wait for socket to be ready to receive results */
task->pollingStatus = PGRES_POLLING_READING;
/* command is underway, stop using timeout */
task->startDeadline = 0;
task->state = CRON_TASK_RUNNING;
start_time = GetCurrentTimestamp();
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_RUNNING), NULL, &start_time, NULL);
}
else
{
/* not yet ready to send */
}
break;
}
case CRON_TASK_RUNNING:
{
int connectionBusy = 0;
PGresult *result = NULL;
Assert(!task_use_background_worker);
/* check if job has been removed */
if (jobCanceled(task))
break;
/* check if connection is still alive */
connectionStatus = PQstatus(connection);
if (connectionStatus == CONNECTION_BAD)
{
task->errorMessage = "connection lost";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
break;
}
/* check if socket is ready to send */
if (!task->isSocketReady)
{
break;
}
PQconsumeInput(connection);
connectionBusy = PQisBusy(connection);
if (connectionBusy)
{
/* still waiting for results */
break;
}
while ((result = PQgetResult(connection)) != NULL)
{
GetTaskFeedback(result, task);
}
PQfinish(connection);
task->connection = NULL;
task->pollingStatus = 0;
task->isSocketReady = false;
task->state = CRON_TASK_DONE;
RunningTaskCount--;
break;
}
case CRON_TASK_BGW_RUNNING:
{
pid_t pid;
Assert(task_use_background_worker);
/* check if job has been removed */
if (jobCanceled(task))
{
TerminateBackgroundWorker(&task->handle);
WaitForBackgroundWorkerShutdown(&task->handle);
dsm_detach(task->seg);
task->seg = NULL;
break;
}
/* still waiting for job to complete */
if (GetBackgroundWorkerPid(&task->handle, &pid) != BGWH_STOPPED)
{
bool isRunning = true;
/* process notices and warnings */
ProcessBgwTaskFeedback(task, isRunning);
}
else
{
bool isRunning = false;
/* process remaining notices and final task result */
ProcessBgwTaskFeedback(task, isRunning);
task->state = CRON_TASK_DONE;
dsm_detach(task->seg);
task->seg = NULL;
RunningTaskCount--;
}
break;
}
case CRON_TASK_ERROR:
{
if (connection != NULL)
{
PQfinish(connection);
task->connection = NULL;
}
if (!task->isActive)
{
RemoveTask(jobId);
}
if (task->errorMessage != NULL)
{
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_FAILED), task->errorMessage, NULL, NULL);
ereport(LOG, (errmsg("cron job " INT64_FORMAT " %s",
jobId, task->errorMessage)));
if (task->freeErrorMessage)
{
free(task->errorMessage);
}
}
else
{
ereport(LOG, (errmsg("cron job " INT64_FORMAT " %s", jobId, GetCronStatus(CRON_STATUS_FAILED))));
}
task->startDeadline = 0;
task->isSocketReady = false;
task->state = CRON_TASK_DONE;
RunningTaskCount--;
/* fall through to CRON_TASK_DONE */
}
case CRON_TASK_DONE:
default:
{
int currentPendingRunCount = task->pendingRunCount;
CronJob *job = GetCronJob(jobId);
/*
* It may happen that job was unscheduled during task execution.
* In this case we keep task as-is. Otherwise, we should
* re-initialize task, i.e. reset fields to initial values including
* status.
*/
if (job != NULL && job->active)
InitializeCronTask(task, jobId);
else
task->state = CRON_TASK_WAITING;
/*
* We keep the number of runs that should have started while
* the task was still running. If >0, this will trigger another
* run immediately.
*/
task->pendingRunCount = currentPendingRunCount;
}
}
}
static void
GetTaskFeedback(PGresult *result, CronTask *task)
{
TimestampTz end_time;
ExecStatusType executionStatus;
end_time = GetCurrentTimestamp();
executionStatus = PQresultStatus(result);
switch (executionStatus)
{
case PGRES_COMMAND_OK:
{
char *cmdStatus = PQcmdStatus(result);
char *cmdTuples = PQcmdTuples(result);
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_SUCCEEDED), cmdStatus, NULL, &end_time);
if (task_log_statement)
{
ereport(LOG, (errmsg("cron job " INT64_FORMAT " COMMAND completed: %s %s",
task->jobId, cmdStatus, cmdTuples)));
}
break;
}
case PGRES_BAD_RESPONSE:
case PGRES_FATAL_ERROR:
{
task->errorMessage = strdup(PQresultErrorMessage(result));
task->freeErrorMessage = true;
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_FAILED), task->errorMessage, NULL, &end_time);
PQclear(result);
return;
}
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
case PGRES_COPY_BOTH:
{
/* cannot handle COPY input/output */
task->errorMessage = "COPY not supported";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_FAILED), task->errorMessage, NULL, &end_time);
PQclear(result);
return;
}
case PGRES_TUPLES_OK:
case PGRES_EMPTY_QUERY:
case PGRES_SINGLE_TUPLE:
case PGRES_NONFATAL_ERROR:
default:
{
int tupleCount = PQntuples(result);
char *rowString = ngettext("row", "rows",
tupleCount);
char rows[MAXINT8LEN + 1];
char outputrows[MAXINT8LEN + 4 + 1];
pg_lltoa(tupleCount, rows);
snprintf(outputrows, sizeof(outputrows), "%s %s", rows, rowString);
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_SUCCEEDED), outputrows, NULL, &end_time);
if (task_log_statement)
{
ereport(LOG, (errmsg("cron job " INT64_FORMAT " completed: "
"%d %s",
task->jobId, tupleCount,
rowString)));
}
break;
}
}
PQclear(result);
}
/*
* ProcessBgwTaskFeedback reads messages from a shared memory queue associated
* with the background worker that is executing a given task. If the task is
* still running, the function does not block if the queue is empty. Otherwise,
* it reads until the end of the queue.
*/
static void
ProcessBgwTaskFeedback(CronTask *task, bool running)
{
shm_mq_handle *responseq = task->sharedMemoryQueue;
TimestampTz end_time;
Size nbytes;
void *data;
char msgtype;
StringInfoData msg;
shm_mq_result res;
end_time = GetCurrentTimestamp();
/*
* Message-parsing routines operate on a null-terminated StringInfo,
* so we must construct one.
*/
for (;;)
{
/* do not wait if the task is running */
bool nowait = running;
/* Get next message. */
res = shm_mq_receive(responseq, &nbytes, &data, nowait);
if (res != SHM_MQ_SUCCESS)
break;
initStringInfo(&msg);
resetStringInfo(&msg);
enlargeStringInfo(&msg, nbytes);
msg.len = nbytes;
memcpy(msg.data, data, nbytes);
msg.data[nbytes] = '\0';
msgtype = pq_getmsgbyte(&msg);
switch (msgtype)
{
case 'N':
case 'E':
{
ErrorData edata;
StringInfoData display_msg;
pq_parse_errornotice(&msg, &edata);
initStringInfo(&display_msg);
bgw_generate_returned_message(&display_msg, edata);
if (task_log_run)
{
if (edata.elevel >= ERROR)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_FAILED), display_msg.data, NULL, &end_time);
else if (running)
UpdateJobRunDetail(task->runId, NULL, NULL, display_msg.data, NULL, NULL);
else
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_SUCCEEDED), display_msg.data, NULL, &end_time);
}
ereport(LOG, (errmsg("cron job " INT64_FORMAT ": %s",
task->jobId, display_msg.data)));
pfree(display_msg.data);
break;
}
case 'T':
break;
case 'C':
{
const char *tag = pq_getmsgstring(&msg);
char *nonconst_tag;
char *cmdTuples;
nonconst_tag = strdup(tag);
if (task_log_run)
UpdateJobRunDetail(task->runId, NULL, GetCronStatus(CRON_STATUS_SUCCEEDED), nonconst_tag, NULL, &end_time);
if (task_log_statement) {
cmdTuples = pg_cron_cmdTuples(nonconst_tag);
ereport(LOG, (errmsg("cron job " INT64_FORMAT " COMMAND completed: %s %s",
task->jobId, nonconst_tag, cmdTuples)));
}
free(nonconst_tag);
break;
}
case 'A':
case 'D':
case 'G':
case 'H':
case 'W':
case 'Z':
break;
default:
elog(WARNING, "unknown message type: %c (%zu bytes)",
msg.data[0], nbytes);
break;
}
pfree(msg.data);
}
}
/*
* Background worker logic.
*/
void
CronBackgroundWorker(Datum main_arg)
{
dsm_segment *seg;
shm_toc *toc;
char *database;
char *username;
char *command;
shm_mq *mq;
shm_mq_handle *responseq;
/* handle SIGTERM like regular backend */
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/* Set up a memory context and resource owner. */
Assert(CurrentResourceOwner == NULL);
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_cron");
CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
"pg_cron worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* Set up a dynamic shared memory segment. */
seg = dsm_attach(DatumGetInt32(main_arg));
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("unable to map dynamic shared memory segment")));
toc = shm_toc_attach(PG_CRON_MAGIC, dsm_segment_address(seg));
if (toc == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
database = shm_toc_lookup(toc, PG_CRON_KEY_DATABASE, false);
username = shm_toc_lookup(toc, PG_CRON_KEY_USERNAME, false);
command = shm_toc_lookup(toc, PG_CRON_KEY_COMMAND, false);
mq = shm_toc_lookup(toc, PG_CRON_KEY_QUEUE, false);
shm_mq_set_sender(mq, MyProc);
responseq = shm_mq_attach(mq, seg, NULL);
pq_redirect_to_shm_mq(seg, responseq);
BackgroundWorkerInitializeConnection(database, username, 0);
/* Prepare to execute the query. */
SetCurrentStatementStartTimestamp();
debug_query_string = command;
pgstat_report_activity(STATE_RUNNING, command);
StartTransactionCommand();
if (StatementTimeout > 0)
enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
else
disable_timeout(STATEMENT_TIMEOUT, false);
/* Execute the query. */
ExecuteSqlString(command);
/* Post-execution cleanup. */
disable_timeout(STATEMENT_TIMEOUT, false);
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, command);
pgstat_report_stat(true);
/* Signal that we are done. */
ReadyForQuery(DestRemote);
dsm_detach(seg);
proc_exit(0);
}
/*
* Execute given SQL string without SPI or a libpq session.
*/
static void
ExecuteSqlString(const char *sql)
{
List *raw_parsetree_list;
ListCell *lc1;
bool isTopLevel;
int commands_remaining;
MemoryContext parsecontext;
MemoryContext oldcontext;
/*
* Parse the SQL string into a list of raw parse trees.
*
* Because we allow statements that perform internal transaction control,
* we can't do this in TopTransactionContext; the parse trees might get
* blown away before we're done executing them.
*/
parsecontext = AllocSetContextCreate(TopMemoryContext,
"pg_cron parse/plan",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(parsecontext);
raw_parsetree_list = pg_parse_query(sql);
commands_remaining = list_length(raw_parsetree_list);
isTopLevel = commands_remaining == 1;
MemoryContextSwitchTo(oldcontext);
/*
* Do parse analysis, rule rewrite, planning, and execution for each raw
* parsetree. We must fully execute each query before beginning parse
* analysis on the next one, since there may be interdependencies.
*/
foreach(lc1, raw_parsetree_list)
{
RawStmt *parsetree = (RawStmt *) lfirst(lc1);
CommandTag commandTag;
QueryCompletion qc;
List *querytree_list;
List *plantree_list;
bool snapshot_set = false;
Portal portal;
DestReceiver *receiver;
int16 format = 1;
/*
* We don't allow transaction-control commands like COMMIT and ABORT
* here. The entire SQL statement is executed as a single transaction
* which commits if no errors are encountered.
*/
if (IsA(parsetree, TransactionStmt))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("transaction control statements are not allowed in pg_cron")));
/*
* Get the command name for use in status display (it also becomes the
* default completion tag, down inside PortalRun). Set ps_status and
* do any special start-of-SQL-command processing needed by the
* destination.
*/
commandTag = CreateCommandTag(parsetree->stmt);
set_ps_display(GetCommandTagName(commandTag));
BeginCommand(commandTag, DestNone);
/* Set up a snapshot if parse analysis/planning will need one. */
if (analyze_requires_snapshot(parsetree))
{
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
/*
* OK to analyze, rewrite, and plan this query.
*
* As with parsing, we need to make sure this data outlives the
* transaction, because of the possibility that the statement might
* perform internal transaction control.
*/
oldcontext = MemoryContextSwitchTo(parsecontext);
#if PG_VERSION_NUM >= 150000
querytree_list = pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL);
#elif PG_VERSION_NUM >= 100000
querytree_list = pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL);
#else
querytree_list = pg_analyze_and_rewrite(parsetree, sql, NULL, 0);
#endif
plantree_list = pg_plan_queries(querytree_list, sql, 0, NULL);
/* Done with the snapshot used for parsing/planning */
if (snapshot_set)
PopActiveSnapshot();
/* If we got a cancel signal in analysis or planning, quit */
CHECK_FOR_INTERRUPTS();
/*
* Execute the query using the unnamed portal.
*/
portal = CreatePortal("", true, true);
/* Don't display the portal in pg_cursors */
portal->visible = false;
PortalDefineQuery(portal, NULL, sql, nodeTag(parsetree->stmt), commandTag, plantree_list, NULL);
PortalStart(portal, NULL, 0, InvalidSnapshot, NULL);
PortalSetResultFormat(portal, 1, &format); /* binary format */
--commands_remaining;
receiver = CreateDestReceiver(DestNone);
/*
* Only once the portal and destreceiver have been established can
* we return to the transaction context. All that stuff needs to
* survive an internal commit inside PortalRun!
*/
MemoryContextSwitchTo(oldcontext);
/* Here's where we actually execute the command. */
(void) PortalRun(portal, FETCH_ALL, isTopLevel, true, receiver, receiver, &qc);
/* Clean up the receiver. */
(*receiver->rDestroy) (receiver);
/*
* Send a CommandComplete message even if we suppressed the query
* results. The user backend will report these in the absence of
* any true query results.
*/
EndCommand(&qc, DestRemote, false);
/* Clean up the portal. */
PortalDrop(portal, false);
}
/* Be sure to advance the command counter after the last script command */
CommandCounterIncrement();
}
/*
* If a task is not marked as active, set an appropriate error state on the task
* and return true. Note that this should only be called after a task has
* already been launched.
*/
static bool
jobCanceled(CronTask *task)
{
Assert(task->state == CRON_TASK_CONNECTING || \
task->state == CRON_TASK_SENDING || \
task->state == CRON_TASK_BGW_RUNNING || \
task->state == CRON_TASK_RUNNING);
if (task->isActive)
return false;
else
{
/* Use the American spelling for consistency with PG code. */
task->errorMessage = "job canceled";
task->state = CRON_TASK_ERROR;
/*
* Technically, pollingStatus is only used by when UseBackgroundWorkers
* is false, but no damage in setting it in both cases.
*/
task->pollingStatus = 0;
return true;
}
}
/*
* If a task has hit it's startup deadline, set an appropriate error state on
* the task and return true. Note that this should only be called after a task
* has already been launched.
*/
static bool
jobStartupTimeout(CronTask *task, TimestampTz currentTime)
{
Assert(task->state == CRON_TASK_CONNECTING || \
task->state == CRON_TASK_SENDING || \
task->state == CRON_TASK_BGW_START);
if (TimestampDifferenceExceeds(task->startDeadline, currentTime, 0))
{
task->errorMessage = "job startup timeout";
task->pollingStatus = 0;
task->state = CRON_TASK_ERROR;
return true;
}
else
return false;
}