/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "envswitch.h"
#include "dynrm.h"
#include "resqueuecommand.h"
#include "miscadmin.h"
#include "communication/rmcomm_QD2RM.h"
#include "utils/linkedlist.h"

#include "catalog/pg_resqueue.h"
#include "utils/resscheduler.h"
#include "commands/defrem.h"

/*******************************************************************************
 * This file contains all functions for creating, altering and dropping resource
 * queues through SQL DDL statements. All statement information is saved in the
 * argument stmt.
 ******************************************************************************/

/*
 * Forward declaration of the option validator used by the CREATE and ALTER
 * handlers below; the definition is at the end of this file.
 */
void validateDDLAttributeOptions(List *options);

/*
 * CREATE RESOURCE QUEUE statement handler.
 *
 * stmt[in]		The parsed statement tree. stmt->queue is the name of the queue
 *				to create and stmt->options is its attribute list.
 *
 * The QD only runs local checks here (privilege, transaction block, option
 * sanity, reserved name, duplicate name in pg_resqueue). The request itself is
 * handed to the HAWQ RM process, which verifies and applies it. Every failure
 * is raised through ereport(ERROR).
 */
void createResourceQueue(CreateQueueStmt *stmt)
{
	static char	 rmerrmsg[1024];
	cqContext	 caqlctx;
	Relation	 resqueuerel;
	int			 rc			= FUNC_RETURN_OK;
	int			 resourceid	= 0;

	/* Only a superuser may create resource queues. */
	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to create resource queues")));

	/*
	 * MPP-7960: CREATE RESOURCE QUEUE must not run inside a user transaction
	 * block, because the shared memory structures are not cleaned up on abort
	 * and the queue would be left "leaked" and unreachable.
	 */
	if (Gp_role == GP_ROLE_DISPATCH)
		PreventTransactionChain((void *) stmt, "CREATE RESOURCE QUEUE");

	/* Reject duplicated or out-of-range attributes before reading the catalog. */
	validateDDLAttributeOptions(stmt->options);

	/* 'none' is reserved: ALTER ROLE uses it to signify "no queue". */
	if (strcmp(stmt->queue, "none") == 0)
		ereport(ERROR,
				(errcode(ERRCODE_RESERVED_NAME),
				 errmsg("resource queue name %s is reserved",
						stmt->queue),
				 errOmitLocation(true)));

	/* The queue name must not be present in pg_resqueue yet. */
	resqueuerel = heap_open(ResQueueRelationId, RowExclusiveLock);

	if (caql_getcount(
			caql_addrel(cqclr(&caqlctx), resqueuerel),
			cql("SELECT COUNT(*) FROM pg_resqueue WHERE rsqname = :1",
				CStringGetDatum(stmt->queue))) != 0)
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("resource queue %s already exists",
						stmt->queue)));

	/* NoLock is passed, so this call does not release the lock taken above. */
	heap_close(resqueuerel, NoLock);

	/*
	 * Hand the request over to the HAWQ RM process. RM runs all the logic
	 * needed to verify the statement and apply the change; the QD just sends
	 * the original information and waits for the answer.
	 */
	rc = createNewResourceContext(&resourceid);
	if (rc != FUNC_RETURN_OK)
	{
		Assert(rc == COMM2RM_CLIENT_FULL_RESOURCECONTEXT);
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("too many resource contexts were created")));
	}

	/* Registering by user oid is the most convenient way here. */
	rc = registerConnectionInRMByOID(resourceid,
									 GetUserId(),
									 rmerrmsg,
									 sizeof(rmerrmsg));
	if (rc != FUNC_RETURN_OK)
	{
		/* The context has to be given back before the error is thrown. */
		releaseResourceContextWithErrorReport(resourceid);
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", rmerrmsg)));
	}

	rc = manipulateResourceQueue(resourceid,
								 stmt->queue,
								 MANIPULATE_RESQUEUE_CREATE,
								 stmt->options,
								 rmerrmsg,
								 sizeof(rmerrmsg));

	/*
	 * Whatever the outcome, the connection is unregistered and the resource
	 * context released before the result is inspected.
	 */
	unregisterConnectionInRMWithErrorReport(resourceid);
	releaseResourceContextWithErrorReport(resourceid);

	if (rc == FUNC_RETURN_OK)
	{
		elog(LOG, "Complete applying CREATE RESOURCE QUEUE statement.");
		return;
	}

	/* RPC failures are internal errors; anything else is a bad definition. */
	ereport(ERROR,
			(errcode(IS_TO_RM_RPC_ERROR(rc) ?
					 ERRCODE_INTERNAL_ERROR :
					 ERRCODE_INVALID_OBJECT_DEFINITION),
			 errmsg("%s", rmerrmsg)));
}

/*******************************************************************************
 * DROP RESOURCE QUEUE statement handler.
 * stmt[in]		The parsed statement tree. stmt->queue is the queue to drop.
 *
 * Local checks (privilege, system queue names, existence in pg_resqueue, no
 * role still assigned to the queue, transaction block) are done first; the
 * drop itself is then requested from the HAWQ RM process. Every failure is
 * raised through ereport(ERROR).
 ******************************************************************************/
void dropResourceQueue(DropQueueStmt *stmt)
{
	int res 		= FUNC_RETURN_OK;
	/*
	 * Start with an empty message: if an RM call fails without writing to the
	 * buffer, the error report must not print indeterminate stack contents.
	 */
	char errorbuf[1024] = "";
	Relation	 pg_resqueue_rel;
	HeapTuple	 tuple;
	cqContext	 cqc;
	cqContext	*pcqCtx;
	Oid			 queueid;

	/* Permission check - only superuser can drop queues. */
	if (!superuser())
	{
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to drop resource queues")));
	}

	/* Cannot DROP default and root queue  */
	if ( strcmp(stmt->queue, RESOURCE_QUEUE_DEFAULT_QUEUE_NAME) == 0 ||
		 strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot drop system resource queue %s",
						stmt->queue)));
	}

	/*
	 * Check the pg_resqueue relation to be certain the queue already
	 * exists.
	 */
	pg_resqueue_rel = heap_open(ResQueueRelationId, RowExclusiveLock);

	pcqCtx = caql_addrel(cqclr(&cqc), pg_resqueue_rel);

	tuple = caql_getfirst(pcqCtx,
						  cql("SELECT * FROM pg_resqueue WHERE rsqname = :1 FOR UPDATE",
						  CStringGetDatum(stmt->queue)));

	if (!HeapTupleIsValid(tuple))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("resource queue %s does not exist",
						stmt->queue)));

	/* Remember the Oid; it is needed for the pg_authid lookup below. */
	queueid = HeapTupleGetOid(tuple);

	/*
	 * Check to see if any roles are in this queue.
	 */
	if (caql_getcount(
			NULL,
			cql("SELECT COUNT(*) FROM pg_authid WHERE rolresqueue = :1",
				ObjectIdGetDatum(queueid))))
	{
		ereport(ERROR,
				(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
				 errmsg("resource queue %s is used by at least one role",
						stmt->queue)));
	}

	/* NoLock is passed, so this call does not release the lock taken above. */
	heap_close(pg_resqueue_rel, NoLock);

	/*
	 * MPP-7960: We cannot run DROP RESOURCE QUEUE inside a user transaction
	 * block because the shared memory structures are not cleaned up on abort,
	 * resulting in "leaked", unreachable queues.
	 */

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		PreventTransactionChain((void *) stmt, "DROP RESOURCE QUEUE");
	}

	/*
	 * Build the drop resource queue request and send it to HAWQ RM process.
	 * Basically, HAWQ RM runs all necessary logic to verify the statement and
	 * apply the change. Therefore, QD only sends out the original information
	 * and waits for the response.
	 */
	int resourceid = 0;
	res = createNewResourceContext(&resourceid);
	if ( res != FUNC_RETURN_OK ) {
		Assert( res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT );
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("too many resource contexts were created")));
	}

	/* Here, using user oid is more convenient. */
	res = registerConnectionInRMByOID(resourceid,
									  GetUserId(),
									  errorbuf,
									  sizeof(errorbuf));
	if ( res != FUNC_RETURN_OK ) {
		/* Give the context back before the error is thrown. */
		releaseResourceContextWithErrorReport(resourceid);
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
	}

	/* DROP carries no attribute list, hence the NULL options argument. */
	res = manipulateResourceQueue(resourceid,
								  stmt->queue,
								  MANIPULATE_RESQUEUE_DROP,
								  NULL,
								  errorbuf,
								  sizeof(errorbuf));

	/* We always unregister connection. */
	unregisterConnectionInRMWithErrorReport(resourceid);

	/* We always release resource context. */
	releaseResourceContextWithErrorReport(resourceid);

	if ( res != FUNC_RETURN_OK )
	{
		/* RPC failures are internal errors; anything else is a bad request. */
		ereport(ERROR,
				(errcode(IS_TO_RM_RPC_ERROR(res) ?
						 ERRCODE_INTERNAL_ERROR :
						 ERRCODE_INVALID_OBJECT_DEFINITION),
				 errmsg("%s", errorbuf)));
	}

	elog(LOG, "Completed applying DROP RESOURCE QUEUE statement.");
}

/*******************************************************************************
 * ALTER RESOURCE QUEUE statement handler.
 * stmt[in]		The parsed statement tree. stmt->queue is the queue to alter and
 *				stmt->options is the list of attributes to change.
 *
 * Local checks (privilege, root queue, transaction block, option sanity,
 * existence in pg_resqueue) are done first; the change itself is then
 * requested from the HAWQ RM process. Every failure is raised through
 * ereport(ERROR).
 ******************************************************************************/
void alterResourceQueue(AlterQueueStmt *stmt)
{
	int 		res 		= FUNC_RETURN_OK;
	static char errorbuf[1024];

	Relation    pg_resqueue_rel;
	cqContext	cqc;

	/* Permission check - only superuser can alter queues. */
	if (!superuser())
	{
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to alter resource queues")));
	}

	/* Cannot ALTER the root queue. */
	if ( strcmp(stmt->queue, RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot alter system resource queue %s",
						stmt->queue)));
	}

	/*
	 * MPP-7960: We cannot run ALTER RESOURCE QUEUE inside a user transaction
	 * block because the shared memory structures are not cleaned up on abort,
	 * resulting in "leaked", unreachable queues.
	 */

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		PreventTransactionChain((void *) stmt, "ALTER RESOURCE QUEUE");
	}

    /* Validate options. */
    validateDDLAttributeOptions(stmt->options);

	/*
	 * Check if resource queue exists
	 */
	pg_resqueue_rel = heap_open(ResQueueRelationId, RowExclusiveLock);

	if (caql_getcount(
			caql_addrel(cqclr(&cqc), pg_resqueue_rel),
			cql("SELECT COUNT(*) FROM pg_resqueue WHERE rsqname = :1",
				CStringGetDatum(stmt->queue))) == 0)
	{
		/*
		 * A missing queue is an undefined object, not a duplicate one; this
		 * is the same code DROP RESOURCE QUEUE reports for this condition.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("resource queue %s does not exist",
						stmt->queue)));
	}

	/* NoLock is passed, so this call does not release the lock taken above. */
	heap_close(pg_resqueue_rel, NoLock);

	/*
	 * Build the alter resource queue request and send it to HAWQ RM process.
	 * Basically, HAWQ RM runs all necessary logic to verify the statement and
	 * apply the change. Therefore, QD only sends out the original information
	 * and waits for the response.
	 */
	int resourceid = 0;
	res = createNewResourceContext(&resourceid);
	if (res != FUNC_RETURN_OK) {
		Assert(res == COMM2RM_CLIENT_FULL_RESOURCECONTEXT);
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
						errmsg("too many existing resource context")));
	}

	/* Here, using user oid is more convenient. */
	res = registerConnectionInRMByOID(resourceid,
									  GetUserId(),
									  errorbuf,
									  sizeof(errorbuf));
	if ( res != FUNC_RETURN_OK ) {
		/* Give the context back before the error is thrown. */
		releaseResourceContextWithErrorReport(resourceid);
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf)));
	}

	res = manipulateResourceQueue(resourceid,
								  stmt->queue,
								  MANIPULATE_RESQUEUE_ALTER,
								  stmt->options,
								  errorbuf,
								  sizeof(errorbuf));
	/* We always unregister connection. */
	unregisterConnectionInRMWithErrorReport(resourceid);

	/* We always release resource context. */
	releaseResourceContextWithErrorReport(resourceid);

	if ( res != FUNC_RETURN_OK )
	{
		/* RPC failures are internal errors; anything else is a bad definition. */
		ereport(ERROR,
				(errcode(IS_TO_RM_RPC_ERROR(res) ?
						 ERRCODE_INTERNAL_ERROR :
						 ERRCODE_INVALID_OBJECT_DEFINITION),
				 errmsg("%s", errorbuf)));
	}

	elog(LOG, "Completed applying ALTER RESOURCE QUEUE statement.");
}

/*
 * VALID_DDL_DUP(index, defel, targref)
 *
 * Helper for an option-scanning loop. If defel->defname equals
 * RSQDDLAttrNames[index], the macro raises a "redundant attribute" ERROR when
 * targref has already been set; otherwise it stores defel in targref and
 * `continue`s the enclosing loop.
 *
 * The expansion is a bare `if` statement rather than a do { } while (0) block
 * because the `continue` has to act on the caller's loop. It must therefore
 * only be used as a complete statement directly inside a loop body.
 */
#define VALID_DDL_DUP(index, defel, targref)				   \
		if (strcmp((defel)->defname, RSQDDLAttrNames[(index)]) == 0)		   \
		{																	   \
			if ((targref) != NULL)											   \
			{																   \
				ereport(ERROR,												   \
						(errcode(ERRCODE_SYNTAX_ERROR),						   \
						 errmsg("redundant attribute %s",					   \
								RSQDDLAttrNames[(index)])));				   \
			}																   \
			(targref) = (defel);											   \
			continue;														   \
		}

/*
 * Validate the attribute list of a resource queue DDL statement.
 *
 * options[in]	List of DefElem nodes taken from the statement.
 *
 * Raises ERROR when
 *   - one of the attributes recognized below appears more than once,
 *   - the active statement limit, one of the NVSEG limits or the resource
 *     overcommit factor is below its MINIMUM_RESQUEUE_* bound,
 *   - the cluster memory limit and the cluster core limit are both given but
 *     their string values are not identical (or one of them is NULL).
 *
 * Options whose name matches none of the recognized attributes are not
 * rejected here. Returns normally when nothing is wrong.
 */
void validateDDLAttributeOptions(List *options)
{
	DefElem		*dparent		  	  = NULL;
	DefElem     *dactivelimit 	  	  = NULL;
	DefElem     *dmemorylimit 	  	  = NULL;
	DefElem     *dcorelimit 	  	  = NULL;
	DefElem		*dvsegresquota	  	  = NULL;
	DefElem		*dallocpolicy	  	  = NULL;
	DefElem		*dresovercommit  	  = NULL;
	DefElem		*dnvsegupperlimit 	  = NULL;
	DefElem		*dnvseglowerlimit 	  = NULL;
	DefElem		*dnvsegupperlimitpseg = NULL;
	DefElem		*dnvseglowerlimitpseg = NULL;

	ListCell    *option			 = NULL;

	/*
	 * Extract options from the statement node tree, check duplicate options.
	 * Each VALID_DDL_DUP either continues with the next list cell (name
	 * matched) or falls through to the next attribute name.
	 */
	foreach(option, options)
	{
		DefElem    *defel = (DefElem *) lfirst(option);
		VALID_DDL_DUP(RSQ_DDL_ATTR_PARENT, 						defel, dparent)
		VALID_DDL_DUP(RSQ_DDL_ATTR_ACTIVE_STATMENTS, 			defel, dactivelimit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER, 		defel, dmemorylimit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER, 			defel, dcorelimit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA, 		defel, dvsegresquota)
		VALID_DDL_DUP(RSQ_DDL_ATTR_ALLOCATION_POLICY, 			defel, dallocpolicy)
		VALID_DDL_DUP(RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR,	defel, dresovercommit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT, 			defel, dnvsegupperlimit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT, 			defel, dnvseglowerlimit)
		VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG, 	defel, dnvsegupperlimitpseg)
		VALID_DDL_DUP(RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG, 	defel, dnvseglowerlimitpseg)
	}

	/* Perform range checks on the various thresholds.*/
	if (dactivelimit != NULL)
	{
		Cost activelimit = (Cost) defGetInt64(dactivelimit);
		if ( activelimit < MINIMUM_RESQUEUE_ACTIVESTATS_N )
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %d",
						   RSQDDLAttrNames[RSQ_DDL_ATTR_ACTIVE_STATMENTS],
						   MINIMUM_RESQUEUE_ACTIVESTATS_N)));
		}
	}

	/* Memory and core expression must be the same. */
	if (dmemorylimit != NULL && dcorelimit != NULL)
	{
		/*
		 * The need_free flags are only received because defGetString() takes
		 * them. The strings are deliberately not released by hand here:
		 * ereport(ERROR) does not return, and calling libc free() on backend
		 * allocated storage would mix allocators.
		 * NOTE(review): this presumes defGetString() hands out palloc'd
		 * memory that is reclaimed with its memory context - confirm against
		 * its definition.
		 */
		bool  need_free_mem  = false;
		bool  need_free_core = false;
		char *memory_limit   = defGetString(dmemorylimit, &need_free_mem);
		char *core_limit     = defGetString(dcorelimit,   &need_free_core);

		if (memory_limit == NULL || core_limit == NULL)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("invalid value of %s or %s",
						   RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
						   RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER])));
		}

		if (strcmp(memory_limit, core_limit) != 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("the values of %s and %s are not identical",
							RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
							RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER])));
		}
	}

	/*
	 * NVSEG_UPPER_LIMIT/NVSEG_LOWER_LIMIT has 0 as default value that means the
	 * setting is not effective, otherwise, it must be greater than 0.
	 */
	if (dnvsegupperlimit != NULL)
	{
		int64_t nvsegupperlimit = defGetInt64(dnvsegupperlimit);
		if (nvsegupperlimit < MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %s",
							RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
							MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT)));
		}
	}

	if (dnvseglowerlimit != NULL)
	{
		int64_t nvseglowerlimit = defGetInt64(dnvseglowerlimit);
		if (nvseglowerlimit < MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %s",
							RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
							MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT)));
		}
	}

	/*
	 * NVSEG_UPPER_LIMIT_PERSEG/NVSEG_LOWER_LIMIT_PERSEG has 0 as default value
	 * that means the setting is not effective, otherwise, it must be greater
	 * than 0.
	 */
	if (dnvsegupperlimitpseg != NULL)
	{
		double nvsegupperlimitpseg = defGetNumeric(dnvsegupperlimitpseg);
		if (nvsegupperlimitpseg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %s",
							RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
							MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT)));
		}
	}

	if (dnvseglowerlimitpseg != NULL)
	{
		double nvseglowerlimitpseg = defGetNumeric(dnvseglowerlimitpseg);
		if (nvseglowerlimitpseg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %s",
							RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
							MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT)));
		}
	}

	/* The overcommit factor must not be below MINIMUM_RESQUEUE_OVERCOMMIT_N. */
	if (dresovercommit != NULL)
	{
		double resovercommit = defGetNumeric(dresovercommit);
		if (resovercommit < MINIMUM_RESQUEUE_OVERCOMMIT_N)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("%s is less than %s",
							RSQDDLAttrNames[RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
							MINIMUM_RESQUEUE_OVERCOMMIT)));
		}
	}
}
