blob: 5327f8025a0f2ab8fb1d129210e8531156387ffc [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* pg_task.c
* save all tasks of pg cron scheduler.
*
* IDENTIFICATION
* src/backend/catalog/pg_task.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "access/table.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_task.h"
#include "postmaster/bgworker.h"
#include "task/pg_cron.h"
#include "utils/builtins.h"
#include "utils/rel.h"
/*
* TaskCreate
* Create an new task in pg_task.
*/
Oid
TaskCreate(const char *schedule, const char *command,
const char *nodename, int32 nodeport,
const char *database, const char *username,
bool active, const char *jobname)
{
Relation pg_task;
HeapTuple tup;
Oid jobid;
Datum values[Natts_pg_task];
bool nulls[Natts_pg_task];
pid_t cron_pid;
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
pg_task = table_open(TaskRelationId, RowExclusiveLock);
jobid = GetNewOidWithIndex(pg_task, TaskJobIdIndexId, Anum_pg_task_jobid);
values[Anum_pg_task_jobid - 1] = ObjectIdGetDatum(jobid);
values[Anum_pg_task_command - 1] = CStringGetTextDatum(command);
values[Anum_pg_task_schedule - 1] = CStringGetTextDatum(schedule);
values[Anum_pg_task_nodename - 1] = CStringGetTextDatum(nodename);
values[Anum_pg_task_nodeport - 1] = Int32GetDatum(nodeport);
values[Anum_pg_task_database - 1] = CStringGetTextDatum(database);
values[Anum_pg_task_username - 1] = CStringGetTextDatum(username);
values[Anum_pg_task_active - 1] = BoolGetDatum(active);
if (jobname)
values[Anum_pg_task_jobname - 1] = CStringGetTextDatum(jobname);
else
nulls[Anum_pg_task_jobname - 1] = true;
tup = heap_form_tuple(RelationGetDescr(pg_task), values, nulls);
CatalogTupleInsert(pg_task, tup);
heap_freetuple(tup);
table_close(pg_task, RowExclusiveLock);
/* Send SIGHUP to pg_cron launcher to reload the task */
cron_pid = PgCronLauncherPID();
if (cron_pid == InvalidPid)
elog(ERROR, "could not find pid of pg_cron launcher process");
if (kill(cron_pid, SIGHUP) < 0)
elog(DEBUG3, "kill(%ld,%d) failed: %m", (long) cron_pid, SIGHUP);
return jobid;
}
/*
* TaskUpdate
* Update an existing task in pg_task.
*/
void
TaskUpdate(Oid jobid, const char *schedule,
const char *command, const char *database,
const char *username, bool *active)
{
Relation pg_task;
HeapTuple tup;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
Datum values[Natts_pg_task];
bool nulls[Natts_pg_task];
bool doreplace[Natts_pg_task];
pid_t cron_pid;
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(doreplace, false, sizeof(doreplace));
pg_task = table_open(TaskRelationId, RowExclusiveLock);
/* try to find the task */
ScanKeyInit(&scanKey[0], Anum_pg_task_jobid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(jobid));
scanDescriptor = systable_beginscan(pg_task, TaskJobIdIndexId,
true, NULL, 1, scanKey);
tup = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(tup))
elog(ERROR, "could not find valid entry for job");
/* specify the fields that need to be updated */
if (schedule)
{
values[Anum_pg_task_schedule - 1] = CStringGetTextDatum(schedule);
doreplace[Anum_pg_task_schedule - 1] = true;
}
if (command)
{
values[Anum_pg_task_command - 1] = CStringGetTextDatum(command);
doreplace[Anum_pg_task_command - 1] = true;
}
if (database)
{
values[Anum_pg_task_database - 1] = CStringGetTextDatum(database);
doreplace[Anum_pg_task_database - 1] = true;
}
if (username)
{
values[Anum_pg_task_username - 1] = CStringGetTextDatum(username);
doreplace[Anum_pg_task_username - 1] = true;
}
if (active)
{
values[Anum_pg_task_active - 1] = BoolGetDatum(*active);
doreplace[Anum_pg_task_active - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(pg_task), values, nulls, doreplace);
CatalogTupleUpdate(pg_task, &tup->t_self, tup);
heap_freetuple(tup);
systable_endscan(scanDescriptor);
table_close(pg_task, RowExclusiveLock);
/* Send SIGHUP to pg_cron launcher to reload the task */
cron_pid = PgCronLauncherPID();
if (cron_pid == InvalidPid)
elog(ERROR, "could not find pid of pg_cron launcher process");
if (kill(cron_pid, SIGHUP) < 0)
elog(DEBUG3, "kill(%ld,%d) failed: %m", (long) cron_pid, SIGHUP);
}
/*
* GetTaskJobId
* Get the jobid of a task.
*/
Oid
GetTaskJobId(const char *jobname, const char *username)
{
Relation pg_task;
HeapTuple tup;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[2];
Oid jobid = InvalidOid;
Form_pg_task task;
pg_task = table_open(TaskRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_task_jobname, BTEqualStrategyNumber,
F_TEXTEQ, CStringGetTextDatum(jobname));
ScanKeyInit(&scanKey[1], Anum_pg_task_username, BTEqualStrategyNumber,
F_TEXTEQ, CStringGetTextDatum(username));
scanDescriptor = systable_beginscan(pg_task, TaskJobNameUserNameIndexId,
true, NULL, 2, scanKey);
tup = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(tup))
{
task = (Form_pg_task) GETSTRUCT(tup);
jobid = task->jobid;
}
systable_endscan(scanDescriptor);
table_close(pg_task, AccessShareLock);
return jobid;
}
/*
* GetTaskNameById
* Get task name by job id.
*/
char *
GetTaskNameById(Oid jobid)
{
Relation pg_task;
HeapTuple tup;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
char *result = NULL;
TupleDesc tupleDesc = NULL;
bool isNull = false;
pg_task = table_open(TaskRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_task_jobid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(jobid));
scanDescriptor = systable_beginscan(pg_task, TaskJobIdIndexId,
true, NULL, 1, scanKey);
tup = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(tup))
{
tupleDesc = RelationGetDescr(pg_task);
Datum jobname = heap_getattr(tup, Anum_pg_task_jobname, tupleDesc, &isNull);
if (!isNull)
result = TextDatumGetCString(jobname);
}
systable_endscan(scanDescriptor);
table_close(pg_task, AccessShareLock);
return result;
}