blob: 1c2b775da9766f652434f86b38a90b4aeedf238d [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* taskcmds.c
* Cloudberry TASK SCHEDULE support code.
*
* IDENTIFICATION
* src/backend/commands/taskcmds.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/pg_task.h"
#include "catalog/pg_task_run_history.h"
#include "cdb/cdbvars.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/taskcmds.h"
#include "task/job_metadata.h"
#include "utils/acl.h"
#include "utils/builtins.h"
/*
* DefineTask
* Create a new cron task.
*/
ObjectAddress
DefineTask(ParseState *pstate, CreateTaskStmt * stmt)
{
ObjectAddress address;
char *dbname = NULL;
char *username = NULL;
ListCell *option;
DefElem *d_dbname = NULL;
DefElem *d_username = NULL;
Oid jobid = InvalidOid;
AclResult aclresult;
/* must have CREATE privilege on database */
aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId));
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "dbname") == 0)
{
if (d_dbname)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_dbname = defel;
}
else if (strcmp(defel->defname, "username") == 0)
{
if (d_username)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_username = defel;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
/* use the specified or current user */
if (d_username != NULL && d_username->arg)
{
username = defGetString(d_username);
if (!OidIsValid(get_role_oid(username, false)))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("role \"%s\" does not exist", username)));
}
else
username = GetUserNameFromId(GetUserId(), false);
/* use the specified or current database */
if (d_dbname != NULL && d_dbname->arg)
dbname = defGetString(d_dbname);
else
dbname = get_database_name(MyDatabaseId);
if (!OidIsValid(get_database_oid(dbname, true)))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
/* check if the task already exists */
if (stmt->if_not_exists)
{
if (OidIsValid(GetTaskJobId(stmt->taskname, username)))
{
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("task \"%s\" already exists, skipping",
stmt->taskname)));
return InvalidObjectAddress;
}
}
if (!allowSystemTableMods && strncmp(stmt->taskname, DYNAMIC_TASK_PREFIX, 25) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("unacceptable task name \"%s\"", stmt->taskname),
errdetail("The prefix \"%s\" is reserved for system tasks.",
DYNAMIC_TASK_PREFIX)));
}
jobid = ScheduleCronJob(cstring_to_text(stmt->schedule), cstring_to_text(stmt->sql),
cstring_to_text(dbname), cstring_to_text(username),
true, cstring_to_text(stmt->taskname));
/* Depend on owner. */
recordDependencyOnOwner(TaskRelationId, jobid, get_role_oid(username, false));
ObjectAddressSet(address, TaskRelationId, jobid);
return address;
}
/*
* AlterTask
* Alter an existing cron task.
*/
ObjectAddress
AlterTask(ParseState *pstate, AlterTaskStmt * stmt)
{
ObjectAddress address = InvalidObjectAddress;
char *current_user;
Oid jobid = InvalidOid;
DefElem *d_schedule = NULL;
DefElem *d_dbname = NULL;
DefElem *d_username = NULL;
DefElem *d_active = NULL;
DefElem *d_sql = NULL;
ListCell *option;
char *schedule = NULL;
char *dbname = NULL;
char *username = NULL;
bool active;
char *sql = NULL;
current_user = GetUserNameFromId(GetUserId(), false);
jobid = GetTaskJobId(stmt->taskname, current_user);
if (!OidIsValid(jobid))
{
if (stmt->missing_ok)
{
ereport(NOTICE,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("task \"%s\" does not exist, skipping",
stmt->taskname)));
return address;
}
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("task \"%s\" does not exist", stmt->taskname)));
}
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "schedule") == 0)
{
if (d_schedule)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_schedule = defel;
}
else if (strcmp(defel->defname, "dbname") == 0)
{
if (d_dbname)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_dbname = defel;
}
else if (strcmp(defel->defname, "username") == 0)
{
if (d_username)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_username = defel;
}
else if (strcmp(defel->defname, "active") == 0)
{
if (d_active)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_active = defel;
}
else if (strcmp(defel->defname, "sql") == 0)
{
if (d_sql)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
d_sql = defel;
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
if (d_schedule != NULL && d_schedule->arg)
schedule = defGetString(d_schedule);
if (d_dbname != NULL && d_dbname->arg)
{
dbname = defGetString(d_dbname);
if (!OidIsValid(get_database_oid(dbname, true)))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
}
if (d_username != NULL && d_username->arg)
{
username = defGetString(d_username);
if (!OidIsValid(get_role_oid(username, true)))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("role \"%s\" does not exist", username)));
}
if (d_active != NULL)
{
active = intVal(d_active->arg);
Assert(BoolIsValid(active));
}
if (d_sql != NULL && d_sql->arg)
{
if (strncmp(stmt->taskname, DYNAMIC_TASK_PREFIX, 25) == 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can not alter REFRESH SQL of dynamic tables")));
else
sql = defGetString(d_sql);
}
AlterCronJob(jobid, schedule, sql, dbname, username, d_active != NULL ? &active : NULL);
if (username)
{
/* Update owner dependency reference */
changeDependencyOnOwner(TaskRelationId,
jobid,
get_role_oid(username, false));
}
ObjectAddressSet(address, TaskRelationId, jobid);
return address;
}
/*
* DropTask
* Drop an existing cron task.
*/
ObjectAddress
DropTask(ParseState *pstate, DropTaskStmt * stmt)
{
ObjectAddress address = InvalidObjectAddress;
char *username;
Oid jobid = InvalidOid;
/* current username */
username = GetUserNameFromId(GetUserId(), false);
/* delete from pg_task */
jobid = UnscheduleCronJob(stmt->taskname, username, InvalidOid, stmt->missing_ok);
/* delete from pg_task_run_history according to the jobid */
if (OidIsValid(jobid))
{
if (!allowSystemTableMods && strncmp(stmt->taskname, DYNAMIC_TASK_PREFIX, 25) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("can not drop a internal task \"%s\" paried with dynamic table", stmt->taskname),
errdetail("please drop the dynamic table instead")));
}
RemoveTaskRunHistoryByJobId(jobid);
ObjectAddressSet(address, TaskRelationId, jobid);
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(TaskRelationId, jobid, 0);
}
return address;
}
/*
* RemoveTaskById
* Remove an existing cron task by jobid.
*/
void
RemoveTaskById(Oid jobid)
{
/* remove the cron task in pg_task */
UnscheduleCronJob(NULL, NULL, jobid, false);
/* delete from pg_task_run_history according to the jobid */
if (OidIsValid(jobid))
{
RemoveTaskRunHistoryByJobId(jobid);
}
}