blob: e603d5fce52c49e626f25609140b0a60e1518ab8 [file]
/*-------------------------------------------------------------------------
*
* src/job_metadata.c
*
* Functions for reading and manipulating pg_cron metadata.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postgres.h"
#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_task.h"
#include "catalog/pg_task_run_history.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "postmaster/postmaster.h"
#include "postmaster/bgworker.h"
#include "storage/lock.h"
#include "task/job_metadata.h"
#include "task/pg_cron.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
static HTAB *CreateCronJobHash(void);
static void EnsureDeletePermission(Relation cronJobsTable, HeapTuple heapTuple);
static void InvalidateJobCacheCallback(Datum argument, Oid relationId);
static void InvalidateJobCache(void);
static CronJob *TupleToCronJob(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static Oid GetRoleOidIfCanLogin(char *username);
static entry *ParseSchedule(char *scheduleText);
static bool TryParseInterval(char *scheduleText, uint32 *secondsInterval);
/* GUC settings */
bool task_enable_superuser_jobs = true;
/* global variables */
static MemoryContext CronJobContext = NULL;
static HTAB *CronJobHash = NULL;
static Oid CachedCronJobRelationId = InvalidOid;
bool CronJobCacheValid = false;
/*
* InitializeJobMetadataCache initializes the data structures for caching
* job metadata.
*/
void
InitializeJobMetadataCache(void)
{
/* watch for invalidation events */
CacheRegisterRelcacheCallback(InvalidateJobCacheCallback, (Datum) 0);
CronJobContext = AllocSetContextCreate(CurrentMemoryContext,
"pg_cron job context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
CronJobHash = CreateCronJobHash();
}
/*
* ResetJobMetadataCache resets the job metadata cache to its initial
* state.
*/
void
ResetJobMetadataCache(void)
{
MemoryContextResetAndDeleteChildren(CronJobContext);
CronJobHash = CreateCronJobHash();
}
/*
* CreateCronJobHash creates the hash for caching job metadata.
*/
static HTAB *
CreateCronJobHash(void)
{
HTAB *taskHash = NULL;
HASHCTL info;
int hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(CronJob);
info.hash = tag_hash;
info.hcxt = CronJobContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
taskHash = hash_create("pg_cron jobs", 32, &info, hashFlags);
return taskHash;
}
/*
* GetCronJob gets the cron job with the given id.
*/
CronJob *
GetCronJob(int64 jobId)
{
CronJob *job = NULL;
int64 hashKey = jobId;
bool isPresent = false;
job = hash_search(CronJobHash, &hashKey, HASH_FIND, &isPresent);
return job;
}
/*
* ScheduleCronJob schedules a cron job with the given name.
*/
int64
ScheduleCronJob(text *scheduleText, text *commandText, text *databaseText,
text *usernameText, bool active, text *jobnameText)
{
entry *parsedSchedule = NULL;
char *schedule;
char *command;
char *database_name = NULL;
char *jobName = NULL;
char *username = NULL;
AclResult aclresult;
Oid userIdcheckacl;
int64 jobId = 0;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
userIdcheckacl = GetUserId();
/* check schedule is valid */
schedule = text_to_cstring(scheduleText);
parsedSchedule = ParseSchedule(schedule);
if (parsedSchedule == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid schedule: %s", schedule),
errhint("Use cron format (e.g. 5 4 * * *), or interval "
"format '[1-59] seconds'")));
}
free_entry(parsedSchedule);
command = text_to_cstring(commandText);
if (jobnameText != NULL)
{
jobName = text_to_cstring(jobnameText);
}
/* username has been provided */
if (usernameText != NULL)
{
username = text_to_cstring(usernameText);
userIdcheckacl = GetRoleOidIfCanLogin(username);
}
/* database has been provided */
if (databaseText != NULL)
database_name = text_to_cstring(databaseText);
/* first do a crude check to see whether superuser jobs are allowed */
if (!task_enable_superuser_jobs && superuser_arg(userIdcheckacl))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("cannot schedule jobs as superuser"),
errdetail("Scheduling jobs as superuser is disallowed when "
"cron.enable_superuser_jobs is set to off.")));
}
/* ensure the user that is used in the job can connect to the database */
aclresult = pg_database_aclcheck(get_database_oid(database_name, false),
userIdcheckacl, ACL_CONNECT);
if (aclresult != ACLCHECK_OK)
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("User %s does not have CONNECT privilege on %s",
GetUserNameFromId(userIdcheckacl, false), database_name)));
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
/* insert task into pg_catalog.pg_task table */
jobId = TaskCreate(schedule, command, task_host_addr, PostPortNumber,
database_name, username, active, jobName);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
InvalidateJobCache();
return jobId;
}
/*
* GetRoleOidIfCanLogin
* Checks user exist and can log in
*/
static Oid
GetRoleOidIfCanLogin(char *username)
{
HeapTuple roletup;
Form_pg_authid rform;
Oid roleOid = InvalidOid;
roletup = SearchSysCache1(AUTHNAME, PointerGetDatum(username));
if (!HeapTupleIsValid(roletup))
ereport(ERROR,
(errmsg("role \"%s\" does not exist",
username)));
rform = (Form_pg_authid) GETSTRUCT(roletup);
if (!rform->rolcanlogin)
ereport(ERROR,
(errmsg("role \"%s\" can not log in",
username),
errdetail("Jobs may only be run by roles that have the LOGIN attribute.")));
roleOid = rform->oid;
ReleaseSysCache(roletup);
return roleOid;
}
/*
* NextRunId draws a new run ID from GetNewOidWithIndex.
*/
int64
NextRunId(void)
{
Relation pg_task_run_history;
int64 runId = 0;
MemoryContext originalContext = CurrentMemoryContext;
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
pg_task_run_history = table_open(TaskRunHistoryRelationId, RowExclusiveLock);
runId = GetNewOidWithIndex(pg_task_run_history, TaskRunHistoryRunIdIndexId,
Anum_pg_task_run_history_runid);
table_close(pg_task_run_history, RowExclusiveLock);
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return runId;
}
Oid
UnscheduleCronJob(const char *jobname, const char *username, Oid taskid, bool missing_ok)
{
Relation pg_task = NULL;
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
pid_t cron_pid;
Form_pg_task task = NULL;
Oid jobid = InvalidOid;
pg_task = table_open(TaskRelationId, RowExclusiveLock);
if (OidIsValid(taskid)) {
ScanKeyData scanKey[1];
ScanKeyInit(&scanKey[0], Anum_pg_task_jobid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(taskid));
scanDescriptor = systable_beginscan(pg_task, TaskJobIdIndexId, false,
NULL, 1, scanKey);
}
else
{
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], Anum_pg_task_jobname,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(jobname));
ScanKeyInit(&scanKey[1], Anum_pg_task_username,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(username));
scanDescriptor = systable_beginscan(pg_task, TaskJobNameUserNameIndexId, false,
NULL, 2, scanKey);
}
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
if (missing_ok)
{
systable_endscan(scanDescriptor);
table_close(pg_task, RowExclusiveLock);
ereport(NOTICE,
(errmsg("task \"%s\" does not exist, skipping",
jobname)));
return InvalidOid;
} else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("task \"%s\" does not exist", jobname)));
}
task = (Form_pg_task) GETSTRUCT(heapTuple);
jobid = task->jobid;
EnsureDeletePermission(pg_task, heapTuple);
simple_heap_delete(pg_task, &heapTuple->t_self);
systable_endscan(scanDescriptor);
table_close(pg_task, RowExclusiveLock);
CommandCounterIncrement();
InvalidateJobCache();
/* Send SIGHUP to pg_cron launcher to reload the task */
cron_pid = PgCronLauncherPID();
if (cron_pid == InvalidPid)
elog(ERROR, "could not find pid of pg_cron launcher process");
if (kill(cron_pid, SIGHUP) < 0)
elog(DEBUG3, "kill(%ld,%d) failed: %m", (long) cron_pid, SIGHUP);
return jobid;
}
/*
* EnsureDeletePermission throws an error if the current user does
* not have permission to delete the given cron.job tuple.
*/
static void
EnsureDeletePermission(Relation cronJobsTable, HeapTuple heapTuple)
{
TupleDesc tupleDescriptor = RelationGetDescr(cronJobsTable);
/* check if the current user owns the row */
Oid userId = GetUserId();
char *userName = GetUserNameFromId(userId, false);
bool isNull = false;
Datum ownerNameDatum = heap_getattr(heapTuple, Anum_pg_task_username,
tupleDescriptor, &isNull);
char *ownerName = TextDatumGetCString(ownerNameDatum);
if (pg_strcasecmp(userName, ownerName) != 0)
{
/* otherwise, allow if the user has DELETE permission */
AclResult aclResult = pg_class_aclcheck(TaskRelationId, GetUserId(),
ACL_DELETE);
if (aclResult != ACLCHECK_OK)
{
aclcheck_error(aclResult,
OBJECT_TABLE,
get_rel_name(TaskRelationId));
}
}
}
/*
* Invalidate job cache ensures the job cache is reloaded on the next
* iteration of pg_cron.
*/
static void
InvalidateJobCache(void)
{
HeapTuple classTuple = NULL;
classTuple = SearchSysCache1(RELOID, ObjectIdGetDatum(TaskRelationId));
if (HeapTupleIsValid(classTuple))
{
CacheInvalidateRelcacheByTuple(classTuple);
ReleaseSysCache(classTuple);
}
}
/*
* InvalidateJobCacheCallback invalidates the job cache in response to
* an invalidation event.
*/
static void
InvalidateJobCacheCallback(Datum argument, Oid relationId)
{
if (relationId == CachedCronJobRelationId ||
CachedCronJobRelationId == InvalidOid)
{
CronJobCacheValid = false;
CachedCronJobRelationId = InvalidOid;
}
}
/*
* LoadCronJobList loads the current list of jobs from the
* cron.job table and adds each job to the CronJobHash.
*/
List *
LoadCronJobList(void)
{
List *jobList = NIL;
Relation cronJobTable = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
MemoryContext originalContext = CurrentMemoryContext;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
/*
* If we are on a hot standby, the job table is treated as being empty.
*/
if (RecoveryInProgress())
{
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return NIL;
}
cronJobTable = table_open(TaskRelationId, AccessShareLock);
scanDescriptor = systable_beginscan(cronJobTable,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
tupleDescriptor = RelationGetDescr(cronJobTable);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
MemoryContext oldContext = NULL;
CronJob *job = NULL;
Oid jobOwnerId = InvalidOid;
oldContext = MemoryContextSwitchTo(CronJobContext);
job = TupleToCronJob(tupleDescriptor, heapTuple);
jobOwnerId = get_role_oid(job->userName, false);
if (!task_enable_superuser_jobs && superuser_arg(jobOwnerId))
{
/*
* Someone inserted a superuser into the metadata. Skip over the
* job when cron.enable_superuser_jobs is disabled. The memory
* will be cleaned up when CronJobContext is reset.
*/
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("skipping job " INT64_FORMAT " since superuser jobs "
"are currently disallowed",
job->jobId)));
}
else
{
jobList = lappend(jobList, job);
}
MemoryContextSwitchTo(oldContext);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
table_close(cronJobTable, AccessShareLock);
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return jobList;
}
/*
* TupleToCronJob takes a heap tuple and converts it into a CronJob
* struct.
*/
static CronJob *
TupleToCronJob(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
CronJob *job = NULL;
int64 jobKey = 0;
bool isNull = false;
bool isPresent = false;
entry *parsedSchedule = NULL;
Datum jobId = heap_getattr(heapTuple, Anum_pg_task_jobid,
tupleDescriptor, &isNull);
Datum schedule = heap_getattr(heapTuple, Anum_pg_task_schedule,
tupleDescriptor, &isNull);
Datum command = heap_getattr(heapTuple, Anum_pg_task_command,
tupleDescriptor, &isNull);
Datum nodeName = heap_getattr(heapTuple, Anum_pg_task_nodename,
tupleDescriptor, &isNull);
Datum nodePort = heap_getattr(heapTuple, Anum_pg_task_nodeport,
tupleDescriptor, &isNull);
Datum database = heap_getattr(heapTuple, Anum_pg_task_database,
tupleDescriptor, &isNull);
Datum userName = heap_getattr(heapTuple, Anum_pg_task_username,
tupleDescriptor, &isNull);
jobKey = DatumGetInt64(jobId);
job = hash_search(CronJobHash, &jobKey, HASH_ENTER, &isPresent);
job->jobId = DatumGetInt64(jobId);
job->scheduleText = TextDatumGetCString(schedule);
job->command = TextDatumGetCString(command);
job->nodeName = TextDatumGetCString(nodeName);
job->nodePort = DatumGetInt32(nodePort);
job->userName = TextDatumGetCString(userName);
job->database = TextDatumGetCString(database);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) >= Anum_pg_task_active)
{
Datum active = heap_getattr(heapTuple, Anum_pg_task_active,
tupleDescriptor, &isNull);
Assert(!isNull);
job->active = DatumGetBool(active);
}
else
{
job->active = true;
}
if (tupleDescriptor->natts >= Anum_pg_task_jobname)
{
bool isJobNameNull = false;
Datum jobName = heap_getattr(heapTuple, Anum_pg_task_jobname,
tupleDescriptor, &isJobNameNull);
if (!isJobNameNull)
{
job->jobName = TextDatumGetCString(jobName);
}
else
{
job->jobName = NULL;
}
}
parsedSchedule = ParseSchedule(job->scheduleText);
if (parsedSchedule != NULL)
{
/* copy the schedule and free the allocated memory immediately */
job->schedule = *parsedSchedule;
free_entry(parsedSchedule);
}
else
{
ereport(LOG, (errmsg("invalid pg_cron schedule for job " INT64_FORMAT ": %s",
job->jobId, job->scheduleText)));
/* a zeroed out schedule never runs */
memset(&job->schedule, 0, sizeof(entry));
}
return job;
}
void
InsertJobRunDetail(int64 runId, int64 *jobId, char *database, char *username, char *command, char *status)
{
MemoryContext originalContext = CurrentMemoryContext;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
if (RecoveryInProgress())
{
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return;
}
TaskRunHistoryCreate(runId, jobId, database, username, command, status);
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
}
void
UpdateJobRunDetail(int64 runId, int32 *job_pid, char *status, char *return_message,
TimestampTz *start_time,TimestampTz *end_time)
{
MemoryContext originalContext = CurrentMemoryContext;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
if (RecoveryInProgress())
{
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return;
}
TaskRunHistoryUpdate(runId, job_pid, status, return_message, start_time, end_time);
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
}
void
AlterCronJob(int64 jobId, char *schedule, char *command,
char *database_name, char *username, bool *active)
{
AclResult aclresult;
Oid userIdcheckacl;
Oid savedUserId;
int savedSecurityContext;
entry *parsedSchedule = NULL;
userIdcheckacl = GetUserId();
savedUserId = InvalidOid;
savedSecurityContext = 0;
if (RecoveryInProgress())
{
return;
}
/* username has been provided */
if (username != NULL)
{
if (!superuser())
elog(ERROR, "must be superuser to alter username");
userIdcheckacl = GetRoleOidIfCanLogin(username);
}
if (!task_enable_superuser_jobs && superuser_arg(userIdcheckacl))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("cannot schedule jobs as superuser"),
errdetail("Scheduling jobs as superuser is disallowed when "
"task_enable_superuser_jobs is set to off.")));
}
/* database has been provided */
if (database_name != NULL)
{
/* ensure the user that is used in the job can connect to the database */
aclresult = pg_database_aclcheck(get_database_oid(database_name, false),
userIdcheckacl, ACL_CONNECT);
if (aclresult != ACLCHECK_OK)
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("User %s does not have CONNECT privilege on %s",
GetUserNameFromId(userIdcheckacl, false), database_name)));
}
/* ensure schedule is valid */
if (schedule != NULL)
{
parsedSchedule = ParseSchedule(schedule);
if (parsedSchedule == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid schedule: %s", schedule),
errhint("Use cron format (e.g. 5 4 * * *), or interval "
"format '[1-59] seconds'")));
}
free_entry(parsedSchedule);
}
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
TaskUpdate(jobId, schedule, command, database_name, username, active);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
InvalidateJobCache();
}
void
MarkPendingRunsAsFailed(void)
{
MemoryContext originalContext = CurrentMemoryContext;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
if (RecoveryInProgress())
{
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
return;
}
MarkRunningTaskAsFailed();
PopActiveSnapshot();
CommitTransactionCommand();
MemoryContextSwitchTo(originalContext);
}
char *
GetCronStatus(CronStatus cronstatus)
{
char *statusDesc = "unknown status";
switch (cronstatus)
{
case CRON_STATUS_STARTING:
statusDesc = "starting";
break;
case CRON_STATUS_RUNNING:
statusDesc = "running";
break;
case CRON_STATUS_SENDING:
statusDesc = "sending";
break;
case CRON_STATUS_CONNECTING:
statusDesc = "connecting";
break;
case CRON_STATUS_SUCCEEDED:
statusDesc = "succeeded";
break;
case CRON_STATUS_FAILED:
statusDesc = "failed";
break;
default:
break;
}
return statusDesc;
}
/*
* ParseSchedule attempts to parse a cron schedule or an interval in seconds.
* The returned pointer is allocated using malloc and should be freed by the
* caller.
*/
static entry *
ParseSchedule(char *scheduleText)
{
uint32 secondsInterval = 0;
/*
* First try to parse as a cron schedule.
*/
entry *schedule = parse_cron_entry(scheduleText);
if (schedule != NULL)
{
/* valid cron schedule */
return schedule;
}
/*
* Parse as interval on seconds.
*/
if (TryParseInterval(scheduleText, &secondsInterval))
{
entry *schedule = calloc(sizeof(entry), sizeof(char));
schedule->secondsInterval = secondsInterval;
return schedule;
}
elog(LOG, "failed to parse schedule: %s", scheduleText);
return NULL;
}
/*
* TryParseInterval returns whether scheduleText is of the form
* <positive number> second[s].
*/
static bool
TryParseInterval(char *scheduleText, uint32 *secondsInterval)
{
char lastChar = '\0';
char plural = '\0';
char extra = '\0';
char *lowercaseSchedule = asc_tolower(scheduleText, strlen(scheduleText));
int numParts = sscanf(lowercaseSchedule, " %u secon%c%c %c", secondsInterval,
&lastChar, &plural, &extra);
if (lastChar != 'd')
{
/* value did not have a "second" suffix */
return false;
}
if (numParts == 2)
{
/* <number> second (allow "2 second") */
return 0 < *secondsInterval && *secondsInterval < 60;
}
else if (numParts == 3 && plural == 's')
{
/* <number> seconds (allow "1 seconds") */
return 0 < *secondsInterval && *secondsInterval < 60;
}
return false;
}