blob: 3c5e0e060b06472b11cd227e323b2015f2a7a26c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "dynrm.h"
#include "resqueuecommand.h"
#include "miscadmin.h"
#include "communication/rmcomm_QD2RM.h"
#include "utils/linkedlist.h"
#include "catalog/pg_resqueue.h"
#include "utils/resscheduler.h"
#include "commands/defrem.h"
/*******************************************************************************
* This file contains all functions for creating, altering and dropping resource
* queue through SQL DDL statement. All statement information is saved in the
* argument stmt.
******************************************************************************/
void validateDDLAttributeOptions(List *options);
/*
* CREATE RESOURCE QUEUE statement handler.
*/
void createResourceQueue(CreateQueueStmt *stmt)
{
int res = FUNC_RETURN_OK;
static char errorbuf[1024];
Relation pg_resqueue_rel;
cqContext cqc;
/* Permission check - only superuser can create queues. */
if (!superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create resource queues")));
}
/*
* MPP-7960: We cannot run CREATE RESOURCE QUEUE inside a user transaction
* block because the shared memory structures are not cleaned up on abort,
* resulting in "leaked", unreachable queues.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
PreventTransactionChain((void *) stmt, "CREATE RESOURCE QUEUE");
}
/* Validate options. */
validateDDLAttributeOptions(stmt->options);
/*
* Check for an illegal name ('none' is used to signify no queue in ALTER
* ROLE).
*/
if (strcmp(stmt->queue, "none") == 0)
{
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("resource queue name %s is reserved",
stmt->queue),
errOmitLocation(true)));
}
/*
* Check the pg_resqueue relation to be certain the queue doesn't already
* exist.
*/
pg_resqueue_rel = heap_open(ResQueueRelationId, RowExclusiveLock);
if (caql_getcount(
caql_addrel(cqclr(&cqc), pg_resqueue_rel),
cql("SELECT COUNT(*) FROM pg_resqueue WHERE rsqname = :1",
CStringGetDatum(stmt->queue))))
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("resource queue \'%s\' already exists",
stmt->queue)));
}
heap_close(pg_resqueue_rel, NoLock);
/*
* Build the create resource queue request and send it to HAWQ RM process.
* Basically, HAWQ RM runs all necessary logic to verify the statement and
* apply the change. Therefore, QD only sends out the original information
* and waits for the response.
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
if ( res != FUNC_RETURN_OK )
{
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("too many resource contexts were created")));
}
/* Here, using user oid is more convenient. */
res = registerConnectionInRMByOID(resourceid,
GetUserId(),
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
res = manipulateResourceQueue(resourceid,
stmt->queue,
MANIPULATE_RESQUEUE_CREATE,
stmt->options,
errorbuf,
sizeof(errorbuf));
/* We always unregister connection. */
unregisterConnectionInRMWithErrorReport(resourceid);
/* We always release resource context. */
releaseResourceContextWithErrorReport(resourceid);
if ( res != FUNC_RETURN_OK )
{
ereport(ERROR,
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("%s", errorbuf)));
}
elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
}
/*******************************************************************************
* DROP RESOURCE QUEUE statement handler.
* stmt[in] The parsed statement tree.
******************************************************************************/
void dropResourceQueue(DropQueueStmt *stmt)
{
int res = FUNC_RETURN_OK;
char errorbuf[1024];
Relation pg_resqueue_rel;
HeapTuple tuple;
cqContext cqc;
cqContext *pcqCtx;
Oid queueid;
/* Permission check - only superuser can create queues. */
if (!superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to drop resource queues")));
}
/* Cannot DROP default and root queue */
if ( strcmp(stmt->queue, RESOURCE_QUEUE_DEFAULT_QUEUE_NAME) == 0 ||
strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop system resource queue \'%s\'",
stmt->queue)));
}
/*
* Check the pg_resqueue relation to be certain the queue already
* exists.
*/
pg_resqueue_rel = heap_open(ResQueueRelationId, RowExclusiveLock);
pcqCtx = caql_addrel(cqclr(&cqc), pg_resqueue_rel);
tuple = caql_getfirst(pcqCtx,
cql("SELECT * FROM pg_resqueue WHERE rsqname = :1 FOR UPDATE",
CStringGetDatum(stmt->queue)));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("resource queue \'%s\' does not exist",
stmt->queue)));
/* Remember the Oid */
queueid = HeapTupleGetOid(tuple);
/*
* Check to see if any roles are in this queue.
*/
if (caql_getcount(
NULL,
cql("SELECT COUNT(*) FROM pg_authid WHERE rolresqueue = :1",
ObjectIdGetDatum(queueid))))
{
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("resource queue \'%s\' is used by at least one role",
stmt->queue)));
}
heap_close(pg_resqueue_rel, NoLock);
/*
* MPP-7960: We cannot run DROP RESOURCE QUEUE inside a user transaction
* block because the shared memory structures are not cleaned up on abort,
* resulting in "leaked", unreachable queues.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
PreventTransactionChain((void *) stmt, "DROP RESOURCE QUEUE");
}
/*
* Build the drop resource queue request and send it to HAWQ RM process.
* Basically, HAWQ RM runs all necessary logic to verify the statement and
* apply the change. Therefore, QD only sends out the original information
* and waits for the response.
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
if ( res != FUNC_RETURN_OK ) {
Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("too many resource contexts were created")));
}
/* Here, using user oid is more convenient. */
res = registerConnectionInRMByOID(resourceid,
GetUserId(),
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK ) {
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
res = manipulateResourceQueue(resourceid,
stmt->queue,
MANIPULATE_RESQUEUE_DROP,
NULL,
errorbuf,
sizeof(errorbuf));
/* We always unregister connection. */
unregisterConnectionInRMWithErrorReport(resourceid);
/* We always release resource context. */
releaseResourceContextWithErrorReport(resourceid);
if ( res != FUNC_RETURN_OK )
{
ereport(ERROR,
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("%s", errorbuf)));
}
elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
}
/*******************************************************************************
* ALTER RESOURCE QUEUE statement handler.
* stmt[in] The parsed statement tree.
******************************************************************************/
void alterResourceQueue(AlterQueueStmt *stmt)
{
int res = FUNC_RETURN_OK;
static char errorbuf[1024];
Relation pg_resqueue_rel;
cqContext cqc;
/* Permission check - only superuser can create queues. */
if (!superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to alter resource queues")));
}
/* Cannot DROP default and root queue */
if ( strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system resource queue \'%s\'",
stmt->queue)));
}
/*
* MPP-7960: We cannot run ALTER RESOURCE QUEUE inside a user transaction
* block because the shared memory structures are not cleaned up on abort,
* resulting in "leaked", unreachable queues.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
PreventTransactionChain((void *) stmt, "ALTER RESOURCE QUEUE");
}
/* Validate options. */
validateDDLAttributeOptions(stmt->options);
/*
* Check if resource queue exists
*/
pg_resqueue_rel = heap_open(ResQueueRelationId, RowExclusiveLock);
if (caql_getcount(
caql_addrel(cqclr(&cqc), pg_resqueue_rel),
cql("SELECT COUNT(*) FROM pg_resqueue WHERE rsqname = :1",
CStringGetDatum(stmt->queue))) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("resource queue \'%s\' does not exist",
stmt->queue)));
}
heap_close(pg_resqueue_rel, NoLock);
/*
* Build the alter resource queue request and send it to HAWQ RM process.
* Basically, HAWQ RM runs all necessary logic to verify the statement and
* apply the change. Therefore, QD only sends out the original information
* and waits for the response.
*/
int resourceid = 0;
res = createNewResourceContext(&resourceid);
if (res != FUNC_RETURN_OK) {
Assert(res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("too many existing resource context")));
}
/* Here, using user oid is more convenient. */
res = registerConnectionInRMByOID(resourceid,
GetUserId(),
errorbuf,
sizeof(errorbuf));
if ( res != FUNC_RETURN_OK ) {
releaseResourceContextWithErrorReport(resourceid);
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
}
res = manipulateResourceQueue(resourceid,
stmt->queue,
MANIPULATE_RESQUEUE_ALTER,
stmt->options,
errorbuf,
sizeof(errorbuf));
/* We always unregister connection. */
unregisterConnectionInRMWithErrorReport(resourceid);
/* We always release resource context. */
releaseResourceContextWithErrorReport(resourceid);
if ( res != FUNC_RETURN_OK )
{
ereport(ERROR,
(errcode(IS_TO_RM_RPC_ERROR(res) ?
ERRCODE_INTERNAL_ERROR :
ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("%s", errorbuf)));
}
elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");
}
#define VALID_DDL_DUP(index, defel, targref) \
if (strcmp((defel)->defname, RSQDDLAttrNames[(index)]) == 0) \
{ \
if ((targref) != NULL) \
{ \
ereport(ERROR, \
(errcode(ERRCODE_SYNTAX_ERROR), \
errmsg("redundant attribute %s", \
RSQDDLAttrNames[(index)]))); \
} \
(targref) = (defel); \
continue; \
}
void validateDDLAttributeOptions(List *options)
{
DefElem *dparent = NULL;
DefElem *dactivelimit = NULL;
DefElem *dmemorylimit = NULL;
DefElem *dcorelimit = NULL;
DefElem *dvsegresquota = NULL;
DefElem *dallocpolicy = NULL;
DefElem *dresovercommit = NULL;
DefElem *dnvsegupperlimit = NULL;
DefElem *dnvseglowerlimit = NULL;
DefElem *dnvsegupperlimitpseg = NULL;
DefElem *dnvseglowerlimitpseg = NULL;
Cost activelimit = INVALID_RES_LIMIT_THRESHOLD;
ListCell *option = NULL;
/* Extract options from the statement node tree, check duplicate options. */
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
VALID_DDL_DUP(RSQ_DDL_ATTR_PARENT, defel, dparent)
VALID_DDL_DUP(RSQ_DDL_ATTR_ACTIVE_STATMENTS, defel, dactivelimit)
VALID_DDL_DUP(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, defel, dmemorylimit)
VALID_DDL_DUP(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, defel, dcorelimit)
VALID_DDL_DUP(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, defel, dvsegresquota)
VALID_DDL_DUP(RSQ_DDL_ATTR_ALLOCATION_POLICY, defel, dallocpolicy)
VALID_DDL_DUP(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR, defel, dresovercommit)
VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, defel, dnvsegupperlimit)
VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, defel, dnvseglowerlimit)
VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, defel, dnvsegupperlimitpseg)
VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, defel, dnvseglowerlimitpseg)
}
/* Perform range checks on the various thresholds.*/
if (dactivelimit)
{
activelimit = (Cost) defGetInt64(dactivelimit);
if ( activelimit < MINIMUM_RESQUEUE_ACTIVESTATS_N )
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %d",
RSQDDLAttrNames[RSQ_DDL_ATTR_ACTIVE_STATMENTS],
MINIMUM_RESQUEUE_ACTIVESTATS_N)));
}
}
/* Memory and core expression must be the same. */
if (dmemorylimit && dcorelimit)
{
bool need_free_mem = false;
bool need_free_core = false;
char *memory_limit = defGetString(dmemorylimit, &need_free_mem);
char *core_limit = defGetString(dcorelimit, &need_free_core);
if (memory_limit != NULL && core_limit != NULL)
{
if(strcmp(memory_limit, core_limit) != 0)
{
if(need_free_mem) { free(memory_limit); }
if(need_free_core) { free(core_limit); }
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the values of %s and %s are not identical",
RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER])));
}
}
else
{
if(need_free_mem) { free(memory_limit); }
if(need_free_core) { free(core_limit); }
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value of %s or %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER])));
}
}
/*
* NVSEG_UPPER_LIMIT/NVSEG_LOWER_LIMIT has 0 as default value that means the
* setting is not effective, otherwise, it must be greater than 0.
*/
int64_t nvsegupperlimit = -1;
int64_t nvseglowerlimit = -1;
if (dnvsegupperlimit != NULL)
{
nvsegupperlimit = defGetInt64(dnvsegupperlimit);
if (nvsegupperlimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT)));
}
}
if (dnvseglowerlimit != NULL)
{
nvseglowerlimit = defGetInt64(dnvseglowerlimit);
if (nvseglowerlimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT)));
}
}
/*
* NVSEG_UPPER_LIMIT_PERSEG/NVSEG_LOWER_LIMIT_PERSEG has 0 as default value
* that means the setting is not effective, otherwise, it must be greater
* than 0.
*/
double nvsegupperlimitpseg = -1.0;
double nvseglowerlimitpseg = -1.0;
if (dnvsegupperlimitpseg != NULL)
{
nvsegupperlimitpseg = defGetNumeric(dnvsegupperlimitpseg);
if (nvsegupperlimitpseg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT)));
}
}
if (dnvseglowerlimitpseg != NULL)
{
nvseglowerlimitpseg = defGetNumeric(dnvseglowerlimitpseg);
if (nvseglowerlimitpseg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT)));
}
}
/* The resource upper factor must be no less than 1. */
if( dresovercommit != NULL)
{
double resovercommit = defGetNumeric(dresovercommit);
if (resovercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s is less than %s",
RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
MINIMUM_RESQUEUE_OVERCOMMIT)));
}
}
}