/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "dynrm.h"
#include "utils/simplestring.h"
#include "utils/network_utils.h"
#include "utils/kvproperties.h"
#include "utils/memutilities.h"
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_QD_RM_Protocol.h"
#include "catalog/pg_resqueue.h"
#include "resourcemanager/resqueuemanager.h"

/*
 * The DDL statement attribute name strings.
 */
char RSQDDLAttrNames[RSQ_DDL_ATTR_COUNT]
					[RESOURCE_QUEUE_DDL_ATTR_LENGTH_MAX+1] = {
	"parent",
	"active_statements",
	"memory_limit_cluster",
	"core_limit_cluster",
	"vseg_resource_quota",
	"allocation_policy",
	"resource_overcommit_factor",
	"nvseg_upper_limit",
	"nvseg_lower_limit",
	"nvseg_upper_limit_perseg",
	"nvseg_lower_limit_perseg"
};

/*
 * The attribute names for expressing one complete resource queue definition.
 */
char RSQTBLAttrNames[RSQ_TBL_ATTR_COUNT]
					 [RESOURCE_QUEUE_TBL_COLNAME_LENGTH_MAX+1] = {
	"parentoid",
	"activestats",
	"memorylimit",
	"corelimit",
	"vsegresourcequota",
	"allocpolicy",
	"resovercommit",
	"nvsegupperlimit",
	"nvseglowerlimit",
	"nvsegupperlimitperseg",
	"nvseglowerlimitperseg",

	"oid",
	"rsqname",
	"creationtime",
	"updatetime",
	"status"
};

/*
 * The possible resource allocation policy names.
 */
static char RSQDDLValueAllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT]
									   [RESOURCE_QUEUE_DDL_POLICY_LENGTH_MAX] = {
	"even"
};

/*
 * The attributes for expressing one complete role/user definition.
 */
static char USRTBLAttrNames[USR_TBL_ATTR_COUNT]
						   [RESOURCE_ROLE_DDL_ATTR_LENGTH_MAX] = {
	"oid",
	"name",
	"target",
	"priority",
	"is_superuser"
};

/* Internal functions. */

/*------------------------------------------
 * The resource quota calculation functions.
 *------------------------------------------*/
typedef int  (* computeQueryQuotaByPolicy )(DynResourceQueueTrack,
											int32_t *,
											int32_t *,
											int32_t,
											char *,
											int);

int computeQueryQuota_EVEN(DynResourceQueueTrack	track,
						   int32_t			   	   *segnum,
						   int32_t			   	   *segnummin,
						   int32_t					segnumlimit,
						   char				   	   *errorbuf,
						   int						errorbufsize);

int32_t min(int32_t a, int32_t b);
int32_t max(int32_t a, int32_t b);
computeQueryQuotaByPolicy AllocationPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
	computeQueryQuota_EVEN
};

/*------------------------------------------
 * The resource distribution functions.
 *------------------------------------------*/
typedef int (* dispatchResourceToQueriesByPolicy )(DynResourceQueueTrack);

int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track);

dispatchResourceToQueriesByPolicy DispatchPolicy[RSQ_ALLOCATION_POLICY_COUNT] = {
	dispatchResourceToQueries_EVEN
};

void dispatchResourceToQueriesInOneQueue(DynResourceQueueTrack track);

/* Functions for operating resource queue tracker instance. */
DynResourceQueueTrack createDynResourceQueueTrack(DynResourceQueue queue);

void returnAllocatedResourceToLeafQueue(DynResourceQueueTrack 	track,
								    	int32_t 				memorymb,
								    	double					core);

void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
													uint32_t clustercore,
													bool	 queuechanged);

/* Internal APIs for maintaining memory/core ratio trackers. */
int32_t addResourceQueueRatio(DynResourceQueueTrack track);
void removeResourceQueueRatio(DynResourceQueueTrack track);

DynMemoryCoreRatioTrack createDynMemoryCoreRatioTrack(uint32_t ratio,
		  	  	  	  	  	  	  	  	  	  	      int32_t  index );
void freeMemoryCoreTrack(DynMemoryCoreRatioTrack mctrack);
int removeQueueTrackFromMemoryCoreTrack(DynMemoryCoreRatioTrack mctrack,
										DynResourceQueueTrack   track);

int getRSQTBLAttributeNameIndex(SimpStringPtr attrname);
int getRSQDDLAttributeNameIndex(SimpStringPtr attrname);
int getUSRTBLAttributeNameIndex(SimpStringPtr attrname);

void detectAndDealWithDeadLock(DynResourceQueueTrack track);

void markMemoryCoreRatioWaterMark(DQueue 		marks,
								  uint64_t 		curmicrosec,
								  int32_t 		memmb,
								  double 		core);

void buildAcquireResourceErrorResponseAndSend(ConnectionTrack  conntrack,
										  	  int 		  	   errorcode,
											  char			  *errorbuf);

void buildAcquireResourceErrorResponse(ConnectionTrack	conntrack,
									   int				errorcode,
									   char			   *errorbuf);

RESOURCEPROBLEM isResourceAcceptable(ConnectionTrack conn, int segnumact);
/*----------------------------------------------------------------------------*/
/*                    RESOURCE QUEUE MANAGER EXTERNAL APIs                    */
/*----------------------------------------------------------------------------*/

int32_t min(int32_t a, int32_t b)
{
	return a>b ? b : a;
}

int32_t max(int32_t a, int32_t b)
{
	return a<b ? b : a;
}

/* Initialize the resource queue manager instance. */
void initializeResourceQueueManager(void)
{
    ASSERT_DRM_GLOBAL_INSTANCE_CREATED

	PQUEMGR->RootTrack    	= NULL;
    PQUEMGR->DefaultTrack 	= NULL;
    PQUEMGR->Queues 		= NULL;

	/* 
	 * The two hash tables hold only the mapping from queue id or queue name to
	 * the queue object saved in Queues.
  	 */
    initializeHASHTABLE(&(PQUEMGR->QueuesIDIndex),
    					PCONTEXT,
						HASHTABLE_SLOT_VOLUME_DEFAULT,
						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
						HASHTABLE_KEYTYPE_CHARARRAY,
						NULL);

    initializeHASHTABLE(&(PQUEMGR->QueuesNameIndex),
    					PCONTEXT,
						HASHTABLE_SLOT_VOLUME_DEFAULT,
						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
						HASHTABLE_KEYTYPE_SIMPSTR,
						NULL);

    /* Initialize user information part. */
    PQUEMGR->Users = NULL;

    initializeHASHTABLE(&(PQUEMGR->UsersIDIndex),
    					PCONTEXT,
						HASHTABLE_SLOT_VOLUME_DEFAULT,
						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
						HASHTABLE_KEYTYPE_CHARARRAY,
						NULL);

    initializeHASHTABLE(&(PQUEMGR->UsersNameIndex),
    					PCONTEXT,
						HASHTABLE_SLOT_VOLUME_DEFAULT,
						HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
						HASHTABLE_KEYTYPE_SIMPSTR,
						NULL);

    /* Initialize memory/core ratio counter and index. */
    PQUEMGR->RatioCount = 0;
    initializeHASHTABLE(&(PQUEMGR->RatioIndex),
    					PCONTEXT,
    			  	  	HASHTABLE_SLOT_VOLUME_DEFAULT,
    			  	  	HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
    			  	  	HASHTABLE_KEYTYPE_UINT32,
    			  	  	NULL);
    for ( int i = 0 ; i < RESOURCE_QUEUE_RATIO_SIZE ; ++i)
    {
    	PQUEMGR->RatioReverseIndex[i]     = -1;
    	PQUEMGR->RatioReferenceCounter[i] = 0;
    	PQUEMGR->RatioTrackers[i]         = NULL;
    	initializeDQueue(&(PQUEMGR->RatioWaterMarks[i]), PCONTEXT);
    }

    PQUEMGR->LastCheckingDeadAllocationTime = 0;
    PQUEMGR->LastCheckingQueuedTimeoutTime  = 0;
    PQUEMGR->GRMQueueMaxCapacity 			= 1.0;
    PQUEMGR->GRMQueueCapacity				= 1.0;
    PQUEMGR->GRMQueueCurCapacity			= 0.0;
    PQUEMGR->GRMQueueResourceTight			= false;
    PQUEMGR->toRunQueryDispatch 			= false;

    for ( int i = 0 ; i < RESPROBLEM_COUNT ; ++i )
    {
    	PQUEMGR->hasResourceProblem[i] = false;
    }

    PQUEMGR->ActualMinGRMContainerPerSeg = rm_min_resource_perseg;
}

/*
 * Recognize DDL attributes and shallow parse to fine grained attributes. This
 * function should be expanded when we hope to map one DDL attribute to more
 * than one TABLE attributes or we hope to reformat the value.
 */
int shallowparseResourceQueueWithAttributes(List 	*rawattr,
											List   **fineattr,
											char  	*errorbuf,
											int		 errorbufsize)
{
	ListCell *cell = NULL;
	foreach(cell, rawattr)
	{
		KVProperty property = lfirst(cell);

		if ( SimpleStringComp(&(property->Key),
							  (char *)getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME)) == 0 )
		{
			KVProperty newprop = createPropertyString(
									 PCONTEXT,
									 NULL,
									 getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME),
									 NULL,
									 property->Val.Str);

			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			*fineattr = lappend(*fineattr, newprop);
			MEMORY_CONTEXT_SWITCH_BACK
			continue;
		}

		int attrindex = getRSQDDLAttributeNameIndex(&(property->Key));
		if ( attrindex == -1 )
		{
			snprintf(errorbuf, errorbufsize,
					 "not defined DDL attribute name %s",
					 property->Key.Str);
			elog(WARNING, "Resource manager failed parsing attribute, %s",
						  errorbuf);
			return RMDDL_WRONG_ATTRNAME;
		}

		switch(attrindex)
		{
		case RSQ_DDL_ATTR_PARENT:
		{
			/* Find oid of the parent resource queue. */
			DynResourceQueueTrack parentque =
				getQueueTrackByQueueName(property->Val.Str, property->Val.Len);
			if ( parentque == NULL )
			{
				snprintf(errorbuf, errorbufsize,
						 "cannot recognize parent resource queue name %s.",
						 property->Val.Str);
				elog(WARNING, "Resource manager failed parsing attribute, %s",
							  errorbuf);
				return RMDDL_WRONG_ATTRVALUE;
			}
			Assert( parentque != NULL );

			/* Build property. */
			Oid parentoid = (Oid) (parentque->QueueInfo->OID);
			KVProperty newprop = createPropertyOID(
									 PCONTEXT,
									 NULL,
									 getRSQTBLAttributeName(RSQ_TBL_ATTR_PARENT),
									 NULL,
									 parentoid);
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			*fineattr = lappend(*fineattr, newprop);
			MEMORY_CONTEXT_SWITCH_BACK
			break;
		}
		case RSQ_DDL_ATTR_ACTIVE_STATMENTS:
		case RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER:
		case RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER:
		case RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA:
		case RSQ_DDL_ATTR_ALLOCATION_POLICY:
		case RSQ_DDL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
		case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT:
		case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT:
		case RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
		case RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
		{

			/* The empty value is not allowed. */
			if ( SimpleStringEmpty(&(property->Val)) )
			{
				snprintf(errorbuf, errorbufsize,
						 "the value of %s cannot be empty",
						 RSQDDLAttrNames[attrindex]);
				elog(WARNING, "Resource manager failed parsing attribute, %s",
							  errorbuf);
				return RMDDL_WRONG_ATTRVALUE;
			}
			/*
			 * Build property.
			 *
			 * NOTE, this logic works, because there is a premise.
			 * RSQ_TBL_ATTR_XXX == RSQ_DDL_ATTR_XXX, for all enum values of
			 * RESOURCE_QUEUE_DDL_ATTR_INDEX.
			 *
			 */
			KVProperty newprop = createPropertyString(
									 PCONTEXT,
									 NULL,
									 getRSQTBLAttributeName(attrindex),
									 NULL,
									 property->Val.Str);

			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			*fineattr = lappend(*fineattr, newprop);
			MEMORY_CONTEXT_SWITCH_BACK
			break;
		}
		default:
			Assert(false); /* Should never occur. */
		}

	DQUEUE_LOOP_END

	return FUNC_RETURN_OK;
}

/*
 * This function parses the attributes and translate into DynResourceQueue
 * struct's attributes. This functions does not generate logs higher than
 * WARNING, the concrete error is also saved in error buffer to make the caller
 * able to pass back the message to client.
 */
int parseResourceQueueAttributes( List 			 	*attributes,
								  DynResourceQueue 	 queue,
								  bool				 checkformatonly,
								  bool				 loadcatalog,
								  char 				*errorbuf,
								  int   			 errorbufsize)
{
	int 			res 		 = FUNC_RETURN_OK;
	int 			attrindex 	 = -1;
	Oid 			parentid;
	Oid				oid;

	bool			memlimit_percentage  = false;
	bool			memlimit_value		 = false;
	bool			corelimit_percentage = false;
	bool			corelimit_value		 = false;

	SimpStringPtr 	attrname  			 = NULL;
	SimpStringPtr 	attrvalue 			 = NULL;

	Assert( queue != NULL );

	/* Initialize attributes. */
	queue->OID						= InvalidOid;
	queue->ParentOID 				= InvalidOid;
	queue->ParallelCount			= -1;
	queue->ClusterMemoryMB			= -1;
	queue->Status					= RESOURCE_QUEUE_STATUS_VALID_LEAF;

	queue->ClusterVCore				= -1.0;
	queue->SegResourceQuotaVCore 	= -1.0;
	queue->SegResourceQuotaMemoryMB = -1;

	queue->ResourceOvercommit 		= DEFAULT_RESQUEUE_OVERCOMMIT_N;
	queue->NVSegUpperLimit			= DEFAULT_RESQUEUE_NVSEG_UPPER_LIMIT_N;
	queue->NVSegLowerLimit			= DEFAULT_RESQUEUE_NVSEG_LOWER_LIMIT_N;
	queue->NVSegUpperLimitPerSeg	= DEFAULT_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N;
	queue->NVSegLowerLimitPerSeg	= DEFAULT_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N;

	queue->AllocatePolicy 			= -1;
	queue->QueuingPolicy 			= -1;
	queue->InterQueuePolicy 		= -1;

	queue->ClusterMemoryPer			= -1;
	queue->ClusterVCorePer			= -1;

	memset(queue->Name, '\0', sizeof(queue->Name));

	/* Go through each attribute content. */
	errorbuf[0] = '\0';
	ListCell *cell = NULL;
	foreach(cell, attributes)
	{
		KVProperty value = lfirst(cell);
		attrname  = &(value->Key);
		attrvalue = &(value->Val);

		attrindex = getRSQTBLAttributeNameIndex(attrname);

		if ( SimpleStringEmpty(attrvalue) )
		{
			elog(DEBUG3, "No value for attribute %s.", attrname->Str);
			continue;
		}

		Assert( attrindex != -1 );
		/*
		 * Actually parse each attribute.
		 */
		switch(attrindex)
		{
		case RSQ_TBL_ATTR_OID:
			res = SimpleStringToOid(attrvalue, &oid);
			queue->OID = oid;
			break;

		case RSQ_TBL_ATTR_PARENT:
			res = SimpleStringToOid(attrvalue, &parentid);
			queue->ParentOID = parentid;
			break;

		case RSQ_TBL_ATTR_ACTIVE_STATMENTS:
			res = SimpleStringToInt32(attrvalue, &(queue->ParallelCount));
			break;

		case RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER:
			if ( SimpleStringIsPercentage(attrvalue) )
			{
				memlimit_percentage = true;
				int8_t inputval = 0;
				res = SimpleStringToPercentage(attrvalue, &inputval);
				queue->ClusterMemoryPer = inputval;
				queue->Status |= RESOURCE_QUEUE_STATUS_EXPRESS_PERCENT;
			}
			else
			{
				memlimit_value = true;
				res = SimpleStringToStorageSizeMB(attrvalue,
												  &(queue->ClusterMemoryMB));
			}
			break;

		case RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER:
			if ( SimpleStringIsPercentage(attrvalue) )
			{
				corelimit_percentage = true;
				int8_t inputval = 0;
				res = SimpleStringToPercentage(attrvalue, &inputval);
				queue->ClusterVCorePer = inputval;
				queue->Status |= RESOURCE_QUEUE_STATUS_EXPRESS_PERCENT;
			}
			else
			{
				corelimit_value = true;
				res = SimpleStringToDouble(attrvalue, &(queue->ClusterVCore));
			}
			break;

		case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
			/* Decide it is a memory quota or core quota. */
			if ( SimpleStringStartWith(attrvalue,
									   RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
			{
				SimpString valuestr;
				setSimpleStringRef(&valuestr,
								   attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
								   attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);

				res = SimpleStringToStorageSizeMB(&valuestr,
												  &(queue->SegResourceQuotaMemoryMB));

				/*
				 *--------------------------------------------------------------
				 * Check the value. We accept only :
				 * 128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb
				 *--------------------------------------------------------------
				 */
				if ( res == FUNC_RETURN_OK )
				{
					elog(DEBUG3, "Resource manager parseResourceQueueAttributes() "
								 "parsed segment resource quota %d MB",
								 queue->SegResourceQuotaMemoryMB);

					if ( !(queue->SegResourceQuotaMemoryMB == (1<<7))  &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<8))  &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<9))  &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<10)) &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<11)) &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<12)) &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<13)) &&
						 !(queue->SegResourceQuotaMemoryMB == (1<<14)) )
					{
						res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
						snprintf(errorbuf, errorbufsize,
								 "%s value %s is not valid, only 128mb, 256mb, "
								 "512mb, 1gb, 2gb, 4gb, 8gb, 16gb are "
								 "valid.",
								 loadcatalog ?
									 RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA] :
									 RSQDDLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA],
								 attrvalue->Str);
					}
				}
			}
			else
			{
				res = RESQUEMGR_WRONG_RES_QUOTA_EXP;
				snprintf(errorbuf, errorbufsize,
						 "%s format %s is not valid.",
						 loadcatalog ?
							 RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA] :
							 RSQDDLAttrNames[RSQ_DDL_ATTR_VSEG_RESOURCE_QUOTA],
						 attrvalue->Str);
			}
			break;

		case RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
			res = SimpleStringToDouble(attrvalue, &(queue->ResourceOvercommit));
			break;

		case RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT:
			res = SimpleStringToInt32(attrvalue, &(queue->NVSegUpperLimit));
			break;

		case RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT:
			res = SimpleStringToInt32(attrvalue, &(queue->NVSegLowerLimit));
			break;

		case RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
			res = SimpleStringToDouble(attrvalue, &(queue->NVSegUpperLimitPerSeg));
			break;

		case RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
			res = SimpleStringToDouble(attrvalue, &(queue->NVSegLowerLimitPerSeg));
			break;

		case RSQ_TBL_ATTR_ALLOCATION_POLICY:
			res = SimpleStringToMapIndexInt8(attrvalue,
											 (char *)RSQDDLValueAllocationPolicy,
											 RSQ_ALLOCATION_POLICY_COUNT,
											 sizeof(RSQDDLValueAllocationPolicy[0]),
											 &(queue->AllocatePolicy));
			break;

		case RSQ_TBL_ATTR_NAME:
			queue->NameLen = attrvalue->Len;
			strncpy(queue->Name, attrvalue->Str, sizeof(queue->Name)-1);

			if ( SimpleStringComp(attrvalue,
								  RESOURCE_QUEUE_DEFAULT_QUEUE_NAME) == 0 )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_IS_DEFAULT;
			}
			else if ( SimpleStringComp(attrvalue,
									   RESOURCE_QUEUE_ROOT_QUEUE_NAME) == 0 )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_IS_ROOT;
			}
			break;

		case RSQ_TBL_ATTR_STATUS:
			/*
			 * 'branch' indicates one branch queue in-use.
			 * 'invalidate' indicates the queue is invalid, and not in-use.
			 */
			if ( SimpleStringFind(attrvalue, "branch") == FUNC_RETURN_OK )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_VALID_BRANCH;
			}
			if ( SimpleStringFind(attrvalue, "invalid") == FUNC_RETURN_OK )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_VALID_INVALID;
			}

			if ( !RESQUEUE_IS_BRANCH(queue) )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_VALID_LEAF;
			}

			if ( (queue->Status & RESOURCE_QUEUE_STATUS_VALID_INVALID) == 0 )
			{
				queue->Status |= RESOURCE_QUEUE_STATUS_VALID_INUSE;
			}
			break;

		case RSQ_TBL_ATTR_CREATION_TIME:
		case RSQ_TBL_ATTR_UPDATE_TIME:
			break;
		default:
			/* Should not occur. Invalid attribute name has been checked. */
			Assert(false);
		}

		if ( res != FUNC_RETURN_OK )
		{
			res = RESQUEMGR_WRONG_ATTR;
			if ( errorbuf[0] == '\0' )
			{
				snprintf(errorbuf, errorbufsize,
						 "wrong resource queue attribute setting %s=%s",
						 loadcatalog ?
							 RSQTBLAttrNames[attrindex] :
							 RSQDDLAttrNames[attrindex],
						 attrvalue->Str);
			}
			elog(WARNING, "Resource manager failed to parse resource queue "
						  "attribute, %s",
						  errorbuf);
			return res;
		}
	}

	if ( checkformatonly )
	{
		return res;
	}

	/*
	 * Memory and Core resource must be specified and they must use the same way
	 * to express the resource.
	 */
	if ( !memlimit_value && !memlimit_percentage )
	{
		res = RESQUEMGR_LACK_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s must be specified",
				 loadcatalog?
					 RSQTBLAttrNames[RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER] :
					 RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER]);
		elog(WARNING, "%s", errorbuf);
		return res;
	}

	if ( !corelimit_value && !corelimit_percentage )
	{
		res = RESQUEMGR_LACK_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s must be specified",
				 loadcatalog ?
					 RSQTBLAttrNames[RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER] :
					 RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
		elog(WARNING, "%s", errorbuf);
		return res;
	}

	if ( (memlimit_value 		&& corelimit_percentage	) ||
		 (memlimit_percentage 	&& corelimit_value		)	)
	{
		res = RESQUEMGR_INCONSISTENT_RESOURCE_EXP;
		snprintf(errorbuf, errorbufsize,
				 "%s and %s must use the same way to express resource limit",
				 loadcatalog ?
					 RSQTBLAttrNames[RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER] :
					 RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
				 loadcatalog ?
					 RSQTBLAttrNames[RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER] :
					 RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER]);
		elog(WARNING, "%s", errorbuf);
		return res;
	}

	if ( memlimit_percentage && corelimit_percentage )
	{
		queue->Status |= RESOURCE_QUEUE_STATUS_EXPRESS_PERCENT;
	}
	else
	{
		Assert(memlimit_value && corelimit_value);
	}
	return res;
}

/*
 * This function parses the attributes and translate into an exist DynResourceQueue
 * struct's attributes. This functions does not generate logs higher than
 * WARNING, the concrete error is also saved in error buffer to make the caller
 * able to pass back the message to remote process.
 *
 */
int updateResourceQueueAttributesInShadow(List 			 		*attributes,
								  	  	  DynResourceQueueTrack	 queue,
										  char					*errorbuf,
										  int					 errorbufsize)
{
	int			  			res				  = FUNC_RETURN_OK;
	ListCell 			   *cell 			  = NULL;
	int 		  			attrindex		  = -1;
	int			  			percentage_change = 0;
	DynResourceQueue		shadowqueinfo	  = NULL;

	SimpStringPtr			attrname		  = NULL;
	SimpStringPtr			attrvalue		  = NULL;

	Assert(queue != NULL);
	shadowqueinfo = queue->ShadowQueueTrack->QueueInfo;

	/* Go through each property content. */
	foreach(cell, attributes)
	{
		KVProperty value = lfirst(cell);

		attrname  = &(value->Key);
		attrvalue = &(value->Val);

		attrindex = getRSQTBLAttributeNameIndex(attrname);

		/*
		 * Actually parse each attribute.
		 */
		switch(attrindex)
		{
		case RSQ_TBL_ATTR_PARENT:
			res = RESQUEMGR_WRONG_ATTRNAME;
			snprintf(errorbuf, errorbufsize,
					 "cannot alter resource queue parent name");
			elog(WARNING, "Cannot update resource queue %s attribute, %s",
						  queue->QueueInfo->Name,
						  errorbuf);
			return res;
		case RSQ_TBL_ATTR_NAME:
			break;

		case RSQ_TBL_ATTR_ACTIVE_STATMENTS:
			res = SimpleStringToInt32(attrvalue, &(shadowqueinfo->ParallelCount));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %d in shadow "
							"of resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_ACTIVE_STATMENTS],
							shadowqueinfo->ParallelCount,
							queue->QueueInfo->Name);
			}
			break;

		case RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER:
			if ( SimpleStringIsPercentage(attrvalue) )
			{
				percentage_change += 1;
				int8_t inputval = 0;
				res = SimpleStringToPercentage(attrvalue, &inputval);
				shadowqueinfo->ClusterMemoryPer = inputval;

				if ( res == FUNC_RETURN_OK )
				{
					elog(RMLOG, "Resource manager updated %s %lf.0%% in shadow "
								"of resource queue %s.",
								RSQTBLAttrNames[RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER],
								shadowqueinfo->ClusterMemoryPer,
								queue->QueueInfo->Name);
				}

				shadowqueinfo->Status |= RESOURCE_QUEUE_STATUS_EXPRESS_PERCENT;
			}
			else
			{
				Assert(false);
			}
			break;

		case RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER:
			if ( SimpleStringIsPercentage(attrvalue) )
			{
				percentage_change += 1;
				int8_t inputval = 0;
				res = SimpleStringToPercentage(attrvalue, &inputval);
				shadowqueinfo->ClusterVCorePer = inputval;

				if ( res == FUNC_RETURN_OK )
				{
					elog(RMLOG, "Resource manager updated %s %lf.0%% in shadow "
								"of resource queue %s.",
								RSQTBLAttrNames[RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER],
								shadowqueinfo->ClusterVCorePer,
								queue->QueueInfo->Name);
				}

				shadowqueinfo->Status |= RESOURCE_QUEUE_STATUS_EXPRESS_PERCENT;
			}
			else
			{
				Assert(false);
			}
			break;

		case RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA:
			/* Decide it is a memory quota or core quota. */
			if ( SimpleStringStartWith(
					 attrvalue,
					 RESOURCE_QUEUE_SEG_RES_QUOTA_MEM) == FUNC_RETURN_OK )
			{
				SimpString valuestr;
				setSimpleStringRef(
					&valuestr,
					attrvalue->Str+sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)-1,
					attrvalue->Len-sizeof(RESOURCE_QUEUE_SEG_RES_QUOTA_MEM)+1);

				res = SimpleStringToStorageSizeMB(
						&valuestr,
						&(shadowqueinfo->SegResourceQuotaMemoryMB));
				if ( res == FUNC_RETURN_OK )
				{
					shadowqueinfo->SegResourceQuotaVCore = -1;
					elog(RMLOG, "Resource manager updated %s memory quota %d MB "
								"in shadow of resource queue %s.",
								RSQTBLAttrNames[RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA],
								shadowqueinfo->SegResourceQuotaMemoryMB,
								queue->QueueInfo->Name);
				}
			}
			break;

		case RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR:
			res = SimpleStringToDouble(attrvalue,
									   &(shadowqueinfo->ResourceOvercommit));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %lf in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR],
							shadowqueinfo->ResourceOvercommit,
							queue->QueueInfo->Name);
			}
			break;

		case RSQ_TBL_ATTR_ALLOCATION_POLICY:
			res = SimpleStringToMapIndexInt8(attrvalue,
											 (char *)RSQDDLValueAllocationPolicy,
											 RSQ_ALLOCATION_POLICY_COUNT,
											 sizeof(RSQDDLValueAllocationPolicy[0]),
											 &(shadowqueinfo->AllocatePolicy));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %s index %d in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_ALLOCATION_POLICY],
							RSQDDLValueAllocationPolicy[shadowqueinfo->AllocatePolicy],
							shadowqueinfo->AllocatePolicy,
							queue->QueueInfo->Name);
			}
			break;

		case RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT:
			res = SimpleStringToInt32(attrvalue,
									  &(shadowqueinfo->NVSegUpperLimit));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %d in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT],
							shadowqueinfo->NVSegUpperLimit,
							queue->QueueInfo->Name);
			}
			break;
		case RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT:
			res = SimpleStringToInt32(attrvalue,
									  &(shadowqueinfo->NVSegLowerLimit));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %d in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT],
							shadowqueinfo->NVSegLowerLimit,
							queue->QueueInfo->Name);
			}
			break;
		case RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG:
			res = SimpleStringToDouble(attrvalue,
									   &(shadowqueinfo->NVSegUpperLimitPerSeg));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %lf in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
							shadowqueinfo->NVSegUpperLimitPerSeg,
							queue->QueueInfo->Name);
			}
			break;
		case RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG:
			res = SimpleStringToDouble(attrvalue,
									   &(shadowqueinfo->NVSegLowerLimitPerSeg));
			if ( res == FUNC_RETURN_OK )
			{
				elog(RMLOG, "Resource manager updated %s %lf in shadow of "
							"resource queue %s.",
							RSQTBLAttrNames[RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
							shadowqueinfo->NVSegLowerLimitPerSeg,
							queue->QueueInfo->Name);
			}
			break;
		default:
			/* Should not occur. Invalid attribute name has been checked. */
			Assert(false);
		}

		if ( res != FUNC_RETURN_OK )
		{
			res = RESQUEMGR_WRONG_ATTR;
			snprintf(errorbuf, errorbufsize,
					 "failed to recognize attribute setting %s=%s",
					 attrname->Str,
					 attrvalue->Str);
			elog(WARNING, "Resource manager failed to update resource queue "
						  "attribute in shadow of resource queue %s, %s",
						  queue->QueueInfo->Name,
						  errorbuf);
			return res;
		}
	}

	return res;
}

#define ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue,errorbuf)				   \
		elog(WARNING, "Resource manager cannot complete resource queue %s, %s",\
					  (queue)->Name,										   \
					  (errorbuf));
/*
 * This is one API for checking if new resource queue definition is valid to be
 * created.This functions does not generate logs higher than WARNING, the error
 * is also saved in error buffer to make the caller able to pass the message to
 * remote process.
 */
int checkAndCompleteNewResourceQueueAttributes(DynResourceQueue  queue,
											   char				*errorbuf,
											   int				 errorbufsize)
{
	DynResourceQueueTrack	parenttrack	 = NULL;
	int 					res 		 = FUNC_RETURN_OK;

	Assert( queue != NULL );

	if ( queue->Status & RESOURCE_QUEUE_STATUS_IS_VER1X )
	{
		/* TODO: Validate Version 1.x resource queue definition here. */
		return res;
	}

	/*** Validate HAWQ 2.0 resource queue definition ***/

	/*
	 * STEP 1. Validate parent queue attribute.
	 */
	if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID == InvalidOid )
	{
		res = RESQUEMGR_LACK_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "attribute %s must be specified",
				 RSQDDLAttrNames[RSQ_DDL_ATTR_PARENT]);
		ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
		return res;
	}

	if ( !RESQUEUE_IS_ROOT(queue) && queue->ParentOID != InvalidOid )
	{
		parenttrack = getQueueTrackByQueueOID(queue->ParentOID);
		Assert(parenttrack != NULL);

		/* pg_default cannot be a parent queue. */
		if ( RESQUEUE_IS_DEFAULT(parenttrack->QueueInfo) )
		{
			res = RESQUEMGR_WRONG_ATTR;
			snprintf(errorbuf, errorbufsize,
					 "%s cannot have children resource queues",
					 RESOURCE_QUEUE_DEFAULT_QUEUE_NAME);
			ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
			return res;
		}

		/* The parent queue cannot have roles assigned. */
		if ( hasUserAssignedToQueue(parenttrack->QueueInfo) )
		{
			res = RESQUEMGR_WRONG_ATTR;
			snprintf(errorbuf, errorbufsize,
					 "%s has at least one role assigned",
					 parenttrack->QueueInfo->Name);
			ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
			return res;
		}
	}

	/*
	 * Parent queue must exist. Basically, default queue is already created as
	 * the root of whole resource queue tree. This checking is for self-test.
	 */
	if ( RESQUEUE_IS_ROOT(queue) )
	{
		Assert(queue->ParentOID == InvalidOid);
		parenttrack = NULL;
	}

	/*
	 * STEP 2. Validate active_statements attributes. For leaf queue only.
	 */

	if ( queue->ParallelCount <= 0 )
	{
		queue->ParallelCount = DEFAULT_RESQUEUE_ACTIVESTATS_N;
	}

	/*
     * STEP 3. Validate resource limit attributes.
	 */

	/*======================================*/
	/* STEP 3 CASE1: percentage expression. */
	/*======================================*/
	if ( RESQUEUE_IS_PERCENT(queue) )
	{
		/*
		 * The values of MEMORY_LIMIT_CLUSER, CORE_LIMIT_CLUSTER must be
		 * identical.
		 */
		if ( queue->ClusterVCorePer != queue->ClusterMemoryPer )
		{
			res = RESQUEMGR_WRONG_ATTR;
			snprintf(errorbuf, errorbufsize,
					 "the value of %s must be identical with the value of %s, "
					 "wrong value of %s = %.0lf%%, "
					 "wrong value of %s = %.0lf%% ",
					 RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
					 RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
					 RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
					 queue->ClusterMemoryPer,
					 RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
					 queue->ClusterVCorePer);
			ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
			return res;
		}

		/*
		 * check siblings' resource limit
		 */
		if (queue->ParentOID != InvalidOid)
		{
			double current = 0.0;
			parenttrack = getQueueTrackByQueueOID(queue->ParentOID);
			if (parenttrack != NULL)
			{
				ListCell *cell = NULL;
				foreach(cell, parenttrack->ChildrenTracks)
				{
					DynResourceQueueTrack track = (DynResourceQueueTrack)lfirst(cell);
					if (strncmp(track->QueueInfo->Name, queue->Name, queue->NameLen) != 0)
					{
						current += track->QueueInfo->ClusterMemoryPer;
					}
				}

				if ((current + queue->ClusterMemoryPer) > 100)
				{
					res = RESQUEMGR_WRONG_ATTR;
					snprintf(errorbuf, errorbufsize,
							 "the value of %s and %s exceeds parent queue's limit, "
							 "wrong value = %.0lf%%",
							 RSQDDLAttrNames[RSQ_DDL_ATTR_CORE_LIMIT_CLUSTER],
							 RSQDDLAttrNames[RSQ_DDL_ATTR_MEMORY_LIMIT_CLUSTER],
							 queue->ClusterMemoryPer);
					ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
					return res;
				}
			}
		}
	}
	/*============================================================*/
	/* STEP 3 CASE2: value expression. Temporarily not supported. */
	/*============================================================*/
	else {
		Assert(false);
	}

	/*
	 * STEP 4: Check resource quota.
	 */
	if ( queue->SegResourceQuotaMemoryMB == -1 &&
		 queue->SegResourceQuotaVCore    == -1.0 )
	{
		queue->SegResourceQuotaMemoryMB = DEFAULT_RESQUEUE_VSEGRESOURCEQUOTA_N;
	}

	Assert( queue->SegResourceQuotaMemoryMB != -1 );

	/*
	 * STEP 5: Check policy and set default value.
	 */
	if ( queue->AllocatePolicy == -1 )
	{
		queue->AllocatePolicy = RSQ_ALLOCATION_POLICY_EVEN;
	}

	/*
	 * STEP 6. Check number of vseg limit.
	 */

	if ( queue->NVSegUpperLimit > 0 &&
		 queue->NVSegLowerLimit > 0 &&
		 queue->NVSegUpperLimit < queue->NVSegLowerLimit )
	{
		res = RESQUEMGR_WRONG_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s is less than %s",
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT],
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT]);
		ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
		return res;
	}

	/*
	 * STEP 7. Check number of vseg limit per segment.
	 */
	if ( queue->NVSegUpperLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
	{
		res = RESQUEMGR_WRONG_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s is less than %lf, wrong value %lf",
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
				 MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N,
				 queue->NVSegUpperLimitPerSeg);
		ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
		return res;
	}

	if ( queue->NVSegLowerLimitPerSeg < MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
	{
		res = RESQUEMGR_WRONG_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s is less than %lf, wrong value %lf",
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
				 MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N,
				 queue->NVSegLowerLimitPerSeg);
		ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
		return res;
	}

	if ( queue->NVSegUpperLimitPerSeg > 0 &&
		 queue->NVSegLowerLimitPerSeg > 0 &&
		 queue->NVSegUpperLimitPerSeg < queue->NVSegLowerLimitPerSeg )
	{
		res = RESQUEMGR_WRONG_ATTR;
		snprintf(errorbuf, errorbufsize,
				 "%s is less than %s",
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_UPPER_LIMIT_PERSEG],
				 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG]);
		ELOG_WARNING_ERRORMESSAGE_COMPLETEQUEUE(queue, errorbuf)
		return res;
	}
	return res;
}

#define ELOG_WARNING_ERRORMESSAGE_CREATEQUEUETRACK(queue,errorbuf)			   \
		elog(WARNING, "Resource manager cannot create resource queue track "   \
					  "instance for queue %s, %s",  						   \
					  (queue)->Name,										   \
					  (errorbuf));
/**
 * Create queue definition and tracker in the resource queue manager.
 *
 * queue[in]			The resource queue definition instance.
 * track[out]			The corresponding resource queue tracker instance. This
 * 						is a new created instance.
 * errorbuf[out]		The error message if something is wrong.
 * errorbufsize[out]	The limit of error message buffer.
 */
int createQueueAndTrack( DynResourceQueue		queue,
						 DynResourceQueueTrack *track,
						 char				   *errorbuf,
						 int					errorbufsize)
{
	int 				  	res 			 = FUNC_RETURN_OK;
	DynResourceQueueTrack	parenttrack		 = NULL;
    DynResourceQueueTrack 	newqueuetrack 	 = NULL;
    DynResourceQueueTrack 	tmpquetrack		 = NULL;
    bool					isDefaultQueue   = false;
    bool					isRootQueue		 = false;

    /* Input validation. */
    Assert(track != NULL);
    Assert(queue != NULL);

	/*
	 * Create new queue tracking instance. If there is something wrong, this
	 * instance will be freed.
	 */
	newqueuetrack = createDynResourceQueueTrack(queue);

    /*
     * Check queue oid ( for loading existing queues only ). In case loading
     * existing queues from file or catalog, the oid should be explicitly
     * specified. In case the queue is to be created, no need to check this.
     */
    if ( queue->OID > InvalidOid )
    {
    	tmpquetrack = getQueueTrackByQueueOID(queue->OID);
		Assert(tmpquetrack == NULL);
    }

    /* New queue name must be set and unique. */
    Assert(queue->NameLen > 0);

    tmpquetrack = getQueueTrackByQueueName((char *)(queue->Name), queue->NameLen);
	if (tmpquetrack != NULL) {
		res = RESQUEMGR_DUPLICATE_QUENAME;
		snprintf(errorbuf, errorbufsize,
				 "duplicate queue name %s for creating resource queue.",
			     queue->Name);
		ELOG_WARNING_ERRORMESSAGE_CREATEQUEUETRACK(queue, errorbuf)
		goto exit;
	}

	/* Decide if the queue is special one: pg_root or pg_default */
	isDefaultQueue = RESQUEUE_IS_DEFAULT(queue);
	isRootQueue	   = RESQUEUE_IS_ROOT(queue);

	elog(DEBUG3, "HAWQ RM :: To create resource queue instance %s", queue->Name);

	/*
	 * Check the queue parent-child relationship. No matter the queue is to be
	 * created or to be loaded, the parent queue must have been loaded. The only
	 * queue has no parent is 'pg_root' say isRootQueue. The queue 'pg_default'
	 * must has 'pg_root' as the parent queue.
	 */
	if ( !isRootQueue )
	{
		/* Check if the parent queue id exists. */
		parenttrack = getQueueTrackByQueueOID(queue->ParentOID);
		Assert(parenttrack != NULL);

		/* The parent queue can not have connections. */
		if ( parenttrack->CurConnCounter > 0 )
		{
			res = RESQUEMGR_IN_USE;
			snprintf( errorbuf, errorbufsize,
					  "the parent queue %s has active connections",
					  parenttrack->QueueInfo->Name);
			ELOG_WARNING_ERRORMESSAGE_CREATEQUEUETRACK(queue, errorbuf)
			goto exit;
		}

		/*
		 * If the parent track changes the role from LEAF to BRANCH, its memory
		 * core ratio related information should also be updated.
		 */
		if ( RESQUEUE_IS_LEAF(parenttrack->QueueInfo) &&
			 parenttrack->trackedMemCoreRatio )
		{
			/* Remove parent track from memory core ratio track */
			removeResourceQueueRatio(parenttrack);

			/* Change parent track to branch queue. */
			parenttrack->QueueInfo->Status &= NOT_RESOURCE_QUEUE_STATUS_VALID_LEAF;
			parenttrack->QueueInfo->Status |= RESOURCE_QUEUE_STATUS_VALID_BRANCH;
		}
	}

	/* Set parent resource queue track reference. */
	newqueuetrack->ParentTrack = parenttrack;

	/* Build resource queue tree structure. Save 'pg_root' and 'pg_default' */
	if ( isRootQueue )
	{
		PQUEMGR->RootTrack = newqueuetrack;
	}
	else
	{
		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		parenttrack->ChildrenTracks = lappend(parenttrack->ChildrenTracks,
											  newqueuetrack);
		MEMORY_CONTEXT_SWITCH_BACK
	}

	if ( isDefaultQueue )
	{
		PQUEMGR->DefaultTrack = newqueuetrack;
	}

	/* Save the queue track into the list and build index.*/
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PQUEMGR->Queues = lappend(PQUEMGR->Queues, newqueuetrack);
	MEMORY_CONTEXT_SWITCH_BACK
	if( newqueuetrack->QueueInfo->OID != InvalidOid )
	{
		setQueueTrackIndexedByQueueOID(newqueuetrack);
	}
	setQueueTrackIndexedByQueueName(newqueuetrack);

	/* Update overall ratio index. */
	Assert(RESQUEUE_IS_PERCENT(newqueuetrack->QueueInfo));

	/* Set return value. */
	*track = newqueuetrack;

exit:
	if ( res != FUNC_RETURN_OK )
	{
		/* Free resource queue track instance. */
		shallowFreeResourceQueueTrack(newqueuetrack);
		*track = NULL;
	}
	return res;
}

int dropQueueAndTrack( DynResourceQueueTrack track,
					   char				     *errorbuf,
					   int					  errorbufsize)
{
	int 				  	res 			 = FUNC_RETURN_OK;
	DynResourceQueueTrack	parenttrack		 = NULL;
	ListCell			   *cell			 = NULL;
	ListCell			   *prevcell		 = NULL;

	/* remove track from parent queue's children list */
	parenttrack = track->ParentTrack;
	foreach(cell, parenttrack->ChildrenTracks)
	{
		DynResourceQueueTrack todeltrack = (DynResourceQueueTrack)lfirst(cell);
		if ( todeltrack == track )
		{
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			parenttrack->ChildrenTracks = list_delete_cell(parenttrack->ChildrenTracks,
														   cell,
														   prevcell);
			MEMORY_CONTEXT_SWITCH_BACK
			break;
		}
		prevcell = cell;
	}

	/*
	 * If the resource queue has been assigned by a memory/core ratio, the
	 * corresponding reference in memory/core ratio tracker should also be
	 * updated.
	 */
	if ( track->trackedMemCoreRatio ) {
		removeResourceQueueRatio(track);
	}

	/* Directly remove the instances. */
	removeQueueTrackIndexedByQueueName(track);
	removeQueueTrackIndexedByQueueOID(track);

	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	cell 	 = NULL;
	prevcell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack todeltrack = lfirst(cell);
		if ( todeltrack == track )
		{
			PQUEMGR->Queues = list_delete_cell(PQUEMGR->Queues, cell, prevcell);
			break;
		}
		prevcell = cell;
	}
	MEMORY_CONTEXT_SWITCH_BACK

	rm_pfree(PCONTEXT, track->QueueInfo);
	shallowFreeResourceQueueTrack(track);
	return res;
}


DynResourceQueueTrack getQueueTrackByQueueOID (int64_t queoid)
{
	PAIR pair = NULL;
	SimpArray key;
	setSimpleArrayRef(&key, (char *)&queoid, sizeof(int64_t));
	pair = getHASHTABLENode(&(PQUEMGR->QueuesIDIndex), (void *)&key);
	if ( pair == NULL )
	{
		return NULL;
	}
	Assert(pair->Value != NULL);
	return (DynResourceQueueTrack)(pair->Value);
}

DynResourceQueueTrack getQueueTrackByQueueName(char 	*quename,
											   int 		 quenamelen)
{
	SimpString quenamestr;
	setSimpleStringRef(&quenamestr, quename, quenamelen);
	PAIR pair = getHASHTABLENode(&(PQUEMGR->QueuesNameIndex),
								 (void *)(&quenamestr));
	if ( pair == NULL ) {
		return NULL;
	}
	Assert(pair->Value != NULL);
	return (DynResourceQueueTrack)(pair->Value);
}

void setQueueTrackIndexedByQueueOID(DynResourceQueueTrack queuetrack)
{
	SimpArray key;
	setSimpleArrayRef(&key,
					  (void *)&(queuetrack->QueueInfo->OID),
					  sizeof(int64_t));
	setHASHTABLENode(&(PQUEMGR->QueuesIDIndex), &key, queuetrack, false);
}

void removeQueueTrackIndexedByQueueOID(DynResourceQueueTrack queuetrack)
{
	SimpArray key;
	setSimpleArrayRef(&key,
					(void *)&(queuetrack->QueueInfo->OID),
					sizeof(int64_t));
	removeHASHTABLENode(&(PQUEMGR->QueuesIDIndex),&key);
}

void setQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack)
{
	SimpString quenamestr;
	setSimpleStringRef(&quenamestr,
					   queuetrack->QueueInfo->Name,
					   queuetrack->QueueInfo->NameLen);
	setHASHTABLENode(&(PQUEMGR->QueuesNameIndex),
					 (void *)(&quenamestr),
					 queuetrack,
					 false);
}

void removeQueueTrackIndexedByQueueName(DynResourceQueueTrack queuetrack)
{
	SimpString quenamestr;
	setSimpleStringRef(&quenamestr,
					   queuetrack->QueueInfo->Name,
					   queuetrack->QueueInfo->NameLen);
	removeHASHTABLENode(&(PQUEMGR->QueuesNameIndex), (void *) (&quenamestr));
}

const char *getRSQTBLAttributeName(int attrindex)
{
	Assert( attrindex >= 0 && attrindex < RSQ_TBL_ATTR_COUNT );
	return RSQTBLAttrNames[attrindex];
}

const char *getRSQDDLAttributeName(int colindex)
{
	Assert( colindex >= 0 && colindex < RSQ_DDL_ATTR_COUNT );
	return RSQDDLAttrNames[colindex];
}

/**
 * Get memory/core ratio index. Return -1 if no such ratio tracked.
 */
int32_t getResourceQueueRatioIndex(uint32_t ratio)
{
	if ( ratio <= 0 )
		return -1;
	PAIR ratiopair = getHASHTABLENode(&(PQUEMGR->RatioIndex),
									  TYPCONVERT(void *, ratio));
	return ratiopair == NULL ? -1 : TYPCONVERT(int32_t, ratiopair->Value);
}

DynMemoryCoreRatioTrack createDynMemoryCoreRatioTrack(uint32_t ratio,
													  int32_t index )
{
	DynMemoryCoreRatioTrack res = rm_palloc(PCONTEXT,
				  	  	  	  	  	  	    sizeof(DynMemoryCoreRatioTrackData));
	res->MemCoreRatio 				= ratio;
	res->RatioIndex   				= -1;
	res->ClusterMemory				= 0;
	res->ClusterVCore				= 0.0;
	res->ClusterMemoryMaxMB 		= 0;
	res->ClusterVCoreMax   			= 0.0;
	res->TotalPendingStartTime 		= 0;
	res->QueueTrackers				= NULL;
	res->ClusterWeightMarker 		= 0;
	res->QueueIndexForLeftResource 	= 0;

	resetResourceBundleData(&(res->TotalPending)  , 0, 0.0, ratio);
	resetResourceBundleData(&(res->TotalAllocated), 0, 0.0, ratio);
	resetResourceBundleData(&(res->TotalRequest)  , 0, 0.0, ratio);
	resetResourceBundleData(&(res->TotalUsed)     , 0, 0.0, ratio);

	return res;
}

void freeMemoryCoreTrack(DynMemoryCoreRatioTrack mctrack)
{
	Assert(list_length(mctrack->QueueTrackers) == 0);
	rm_pfree(PCONTEXT, mctrack);
}

int removeQueueTrackFromMemoryCoreTrack(DynMemoryCoreRatioTrack mctrack,
											DynResourceQueueTrack   track)
{
	ListCell *cell 	   = NULL;
	ListCell *prevcell = NULL;
	foreach(cell, mctrack->QueueTrackers)
	{
		DynResourceQueueTrack trackiter = (DynResourceQueueTrack)lfirst(cell);
		if ( trackiter == track )
		{
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			mctrack->QueueTrackers = list_delete_cell(mctrack->QueueTrackers,
													  cell,
													  prevcell);
			MEMORY_CONTEXT_SWITCH_BACK
			return FUNC_RETURN_OK;
		}
		prevcell = cell;
	}
	return RESQUEMGR_NO_QUE_IN_RATIO;
}
/**
 * Add one queue track of memory/core ratio into resource queue manager.
 */
int32_t addResourceQueueRatio(DynResourceQueueTrack track)
{
	if ( track->MemCoreRatio <= 0 )
		return -1;

	uint32_t ratio = track->MemCoreRatio;
	int32_t res = getResourceQueueRatioIndex(ratio);
	if ( res >= 0 ) {
		PQUEMGR->RatioReferenceCounter[res]++;
	} else {
		res = PQUEMGR->RatioCount;
		PQUEMGR->RatioReverseIndex[PQUEMGR->RatioCount] = ratio;
		setHASHTABLENode(&(PQUEMGR->RatioIndex),
						 TYPCONVERT(void *, ratio),
						 TYPCONVERT(void *, PQUEMGR->RatioCount),
						 false);
		PQUEMGR->RatioCount++;
		PQUEMGR->RatioReferenceCounter[res] = 1;
		PQUEMGR->RatioTrackers[res] = createDynMemoryCoreRatioTrack(ratio, res);
		elog(RMLOG, "Added new memory/core ratio %u, assigned index %d.",
					ratio, res);

		/* Add all resource info into the alloc/avail resource ordered indices */
		BBST newindex = NULL; /* variable for calling the following function. */
		addOrderedResourceAllocTreeIndexByRatio(ratio, &newindex);
		addOrderedResourceAvailTreeIndexByRatio(ratio, &newindex);
	}

	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PQUEMGR->RatioTrackers[res]->QueueTrackers =
		lappend(PQUEMGR->RatioTrackers[res]->QueueTrackers, track);
	MEMORY_CONTEXT_SWITCH_BACK

	track->trackedMemCoreRatio = true;
	return res;
}

void removeResourceQueueRatio(DynResourceQueueTrack track)
{
	int res = FUNC_RETURN_OK;

	/* Ignore invalid ratio number. */
	if ( track->MemCoreRatio <= 0 )
		return;

	uint32_t ratio      = track->MemCoreRatio;
	int32_t  ratioindex = getResourceQueueRatioIndex(ratio);

	/* Ignore unkonwn ratio number. */
	if ( ratioindex < 0 ) {
		elog( WARNING, "HAWQ RM :: Cannot track resource queue %s with memory "
					   "core ratio %d MB Per CORE.",
					   track->QueueInfo->Name,
					   track->MemCoreRatio);
		return;
	}

	/* Minus ratio counter, if the counter is set to 0, delete this ratio. */
	Assert(PQUEMGR->RatioReferenceCounter[ratioindex] > 0);
	PQUEMGR->RatioReferenceCounter[ratioindex]--;

	/* Remove the reference from memory/core ratio tracker */
	res = removeQueueTrackFromMemoryCoreTrack(PQUEMGR->RatioTrackers[ratioindex],
											  track);
	if ( res != FUNC_RETURN_OK ) {
		elog( WARNING, "HAWQ RM :: Cannot find resource queue %s with memory "
					   "core ratio %d MB Per CORE in memory core ratio tracker.",
					   track->QueueInfo->Name,
					   track->MemCoreRatio);
		return;
	}

	if (PQUEMGR->RatioReferenceCounter[ratioindex] == 0) {

		/* Free the memory/core tracker instance. */
		freeMemoryCoreTrack(PQUEMGR->RatioTrackers[ratioindex]);
		PQUEMGR->RatioTrackers[ratioindex] = NULL;

		/*
		 * The top one is to be deleted, this is easy, we just adjust the counter
		 * and leave the value dirty. Because, when new ratio is added to reuse
		 * the same slots, the value is reset to zero. Check addResourceQueueRatio()
		 */
		if ( ratioindex == PQUEMGR->RatioCount - 1 ) {
			removeHASHTABLENode(&(PQUEMGR->RatioIndex), TYPCONVERT(void *, ratio));
			PQUEMGR->RatioCount--;
		}
		/*
		 * If another one is to be deleted, the idea is to copy the counters of
		 * the top one to the slots to be deleted.
		 */
		else{
			int top = PQUEMGR->RatioCount - 1;

			/* Change the tracker array */
			PQUEMGR->RatioTrackers[ratioindex] = PQUEMGR->RatioTrackers[top];

			/* Change the hash table. */
			setHASHTABLENode(&(PQUEMGR->RatioIndex),
							 TYPCONVERT(void *, PQUEMGR->RatioReverseIndex[top]),
							 TYPCONVERT(void *, ratioindex),
							 false);

			removeHASHTABLENode(&(PQUEMGR->RatioIndex),
							    TYPCONVERT(void *, ratio));

			/* Change the reverse index. */
			PQUEMGR->RatioReverseIndex[ratioindex] = PQUEMGR->RatioReverseIndex[top];
			PQUEMGR->RatioCount--;
		}

		elog(RMLOG, "HAWQ RM :: Removed ratio %d MBPCORE", ratio);
	}

	track->trackedMemCoreRatio = false;
}

void generateQueueReport( int queid, char *buff, int buffsize )
{
	DynResourceQueue 	  que 		= NULL;
	DynResourceQueueTrack quetrack 	= getQueueTrackByQueueOID(queid);

	Assert( quetrack != NULL );
	que = quetrack->QueueInfo;

	if ( RESQUEUE_IS_PERCENT(que) )
	{
		sprintf(buff,
				"\n"
				"RESQUEUE:ID="INT64_FORMAT",Name=%s,"
				"PARENT="INT64_FORMAT","
				"LIMIT(MEM=%lf%%,CORE=%lf%%),"
				"RATIO=%d MBPCORE,"
				"INUSE(%d MB, %lf CORE),"
				"CONN=%d,"
				"INQUEUE=%d.\n",
				que->OID, que->Name,
				que->ParentOID,
				que->ClusterMemoryPer,
				que->ClusterVCorePer,
				quetrack->MemCoreRatio,
				quetrack->TotalUsed.MemoryMB,
				quetrack->TotalUsed.Core,
				quetrack->CurConnCounter,
				quetrack->QueryResRequests.NodeCount);
	}
	else {
		sprintf(buff,
				"\n"
				"RESQUEUE:ID="INT64_FORMAT",Name=%s,"
				"PARENT="INT64_FORMAT","
				"LIMIT(MEM=%d MB,CORE=%f CORE),"
				"RATIO=%d MBPCORE,"
				"INUSE(%d MB, %lf CORE),"
				"CONN=%d,"
				"INQUEUE=%d.\n",
				que->OID, que->Name,
				que->ParentOID,
				que->ClusterMemoryMB,
				que->ClusterVCore,
				quetrack->MemCoreRatio,
				quetrack->TotalUsed.MemoryMB,
				quetrack->TotalUsed.Core,
				quetrack->CurConnCounter,
				quetrack->QueryResRequests.NodeCount);
	}
}

void generateUserReport( const char   *userid,
						 int 		   useridlen,
						 char 		  *buff,
						 int 		   buffsize)
{
	UserInfo 	userinfo	= NULL;
	bool		exist		= false;

	userinfo = getUserByUserName(userid, useridlen, &exist);

	if ( !exist ) {
		sprintf(buff, "NULL USER.\n");
	}
	else {
		Assert(userinfo != NULL);
		sprintf(buff, "USER:ID=%s,QUEUEID=" INT64_FORMAT ",ISSUPERUSER=%s\n",
					  userinfo->Name,
					  userinfo->QueueOID,
					  userinfo->isSuperUser?"YES":"NO");
	}
}

/**
 * Register and check the parallel limitation.
 *
 * conntrack[in] 		: The result will be saved in this connection track.
 *
 * Return:
 * 	FUNC_RETURN_OK				: Everything is ok.
 * 	RESQUEMGR_NO_USERID			: Can not find the user.
 * 	RESQUEMGR_NO_ASSIGNEDQUEUE	: No assigned queue for current user.
 * 	RESQUEMGR_PARALLEL_FULL		: Can not accept more connection.
 * 	RESQUEMGR_INTERNAL_ERROR	: HAWQ RM internal error.
 *
 * The progress attribute is updated.
 *
 * NOTE: In order to facilitate test automation, currently all undefined users
 * 	 	 are assigned to 'default' queue.
 */
int registerConnectionByUserID(ConnectionTrack  conntrack,
							   char			   *errorbuf,
							   int				errorbufsize)
{
	int 					res 		  = FUNC_RETURN_OK;
	UserInfo 				userinfo 	  = NULL;
	DynResourceQueueTrack 	queuetrack 	  = NULL;
	bool					exist		  = false;

	Assert( conntrack != NULL );
	Assert( conntrack->Progress == CONN_PP_ESTABLISHED );

	/* Check if the user exists and save reference to corresponding user. */
	userinfo = getUserByUserName(conntrack->UserID,
							     strlen(conntrack->UserID),
							     &exist);
	if ( exist )
	{
		/* Mark the user is in use in some connections.*/
		userinfo->isInUse++;
		/* Get the queue, and check if the parallel limit is achieved. */
		queuetrack = getQueueTrackByQueueOID(userinfo->QueueOID);
	}
	else
	{
		snprintf(errorbuf, errorbufsize,
				 "role %s is not defined for registering connection",
				 conntrack->UserID);
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		res = RESQUEMGR_NO_USERID;
		goto exit;
	}

	Assert(queuetrack != NULL);

	queuetrack->CurConnCounter++;

	conntrack->User 		= (void *)userinfo;
	conntrack->QueueTrack 	= (void *)queuetrack;
	conntrack->RegisterTime = gettime_microsec();
	conntrack->LastActTime  = conntrack->RegisterTime;

	transformConnectionTrackProgress(conntrack, CONN_PP_REGISTER_DONE);

	elog(DEBUG3, "Resource queue %s has %d connections after registering "
				 "connection.",
				 queuetrack->QueueInfo->Name,
				 queuetrack->CurConnCounter);
exit:
	if ( res != FUNC_RETURN_OK )
	{
		conntrack->User 		= NULL;
		conntrack->QueueTrack	= NULL;
		transformConnectionTrackProgress(conntrack, CONN_PP_REGISTER_FAIL);
	}
	return res;
}


/**
 * Return one connection to resource queue.
 */
void returnConnectionToQueue(ConnectionTrack conntrack, bool istimeout)
{
	DynResourceQueueTrack track = (DynResourceQueueTrack)(conntrack->QueueTrack);
	if ( !istimeout )
	{
		transformConnectionTrackProgress(conntrack, CONN_PP_ESTABLISHED);
	}
	else
	{
		transformConnectionTrackProgress(conntrack, CONN_PP_TIMEOUT_FAIL);
	}

	elog(DEBUG3, "Resource queue %s has %d connections before returning "
				 "connection ConnID %d.",
				 track->QueueInfo->Name,
				 track->CurConnCounter,
				 conntrack->ConnID);
	track->CurConnCounter--;
	if ( track->CurConnCounter == 0 )
	{
		elog(RMLOG, "Resource queue %s becomes idle.", track->QueueInfo->Name);
		track->isBusy = false;
		refreshMemoryCoreRatioLimits();
		refreshMemoryCoreRatioWaterMark();
	}
}

/*
 * Cancel one queued resource allocation request.
 */
void cancelResourceAllocRequest(ConnectionTrack  conntrack,
								char 			*errorbuf,
								bool			 generror)
{
	Assert(conntrack->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);

	DynResourceQueueTrack queuetrack = (DynResourceQueueTrack)(conntrack->QueueTrack);

	/* Remove from queueing list.  */
	DQUEUE_LOOP_BEGIN(&(queuetrack->QueryResRequests), iter, ConnectionTrack, track)
		if ( track == conntrack )
		{
			removeDQueueNode(&(queuetrack->QueryResRequests), iter);
			break;
		}
	DQUEUE_LOOP_END

	/* Unlock session in deadlock */
	unlockSessionResource(&(queuetrack->DLDetector), conntrack->SessionID);

	if (generror)
	{
		buildAcquireResourceErrorResponseAndSend(conntrack,
												 RESQUEMGR_NORESOURCE_TIMEOUT,
												 errorbuf);
	}
}

/* Acquire resource from queue. */
int acquireResourceFromResQueMgr(ConnectionTrack  conntrack,
								 char 			 *errorbuf,
								 int 			  errorbufsize)
{
	int						res			= FUNC_RETURN_OK;
	DynResourceQueueTrack	queuetrack	= conntrack->QueueTrack;

	elog(RMLOG, "ConnID %d. Expect query resource for session "INT64_FORMAT,
			    conntrack->ConnID,
			    conntrack->SessionID);

	/* Call quota logic to make decision of resource for current query. */
	res = computeQueryQuota(conntrack, errorbuf, errorbufsize);

	if ( res == FUNC_RETURN_OK )
	{
		if ( conntrack->StatNVSeg == 0 )
		{
			/*------------------------------------------------------------------
			 * Adjust the number of virtual segments again based on
			 * NVSEG_*_LIMITs and NVSEG_*_LIMIT_PERSEGs. This adjustment must
			 * succeed.
			 *------------------------------------------------------------------
			 */
			res = adjustResourceExpectsByQueueNVSegLimits(conntrack,
														  errorbuf,
														  errorbufsize);
			if ( res == FUNC_RETURN_OK )
			{
				elog(LOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) "
						  "x %d ( MIN %d ) resource after adjusting based on "
						  "queue NVSEG limits.",
						  conntrack->ConnID,
						  conntrack->SegMemoryMB,
						  conntrack->SegCore,
						  conntrack->SegNum,
						  conntrack->SegNumMin);
			}
			else
			{
				elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
				transformConnectionTrackProgress(conntrack,
												 CONN_PP_RESOURCE_ACQUIRE_FAIL);
				return res;
			}
		}

		/* Add request to the resource queue and return. */
		addQueryResourceRequestToQueue(queuetrack, conntrack);
		transformConnectionTrackProgress(conntrack,
										 CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
		/* Exit on succeeding in adding request to the queue. */
	}
	else
	{
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		transformConnectionTrackProgress(conntrack, CONN_PP_RESOURCE_ACQUIRE_FAIL);
	}
	return res;
}

int acquireResourceQuotaFromResQueMgr(ConnectionTrack	conntrack,
									  char			   *errorbuf,
									  int				errorbufsize)
{
	int 					res 		= FUNC_RETURN_OK;
	DynResourceQueueTrack   queuetrack	= conntrack->QueueTrack;
	UserInfo 				userinfo 	= NULL;
	bool					exist		= false;
	/* Check if the user exists and save reference to corresponding user. */
	userinfo = getUserByUserName(conntrack->UserID,
							     strlen(conntrack->UserID),
							     &exist);
	Assert(exist && userinfo != NULL);

	/* Get the queue, and check if the parallel limit is achieved. */
	queuetrack = getQueueTrackByQueueOID(userinfo->QueueOID);
	Assert( queuetrack != NULL );

	conntrack->QueueTrack = queuetrack;
	conntrack->QueueID	  = queuetrack->QueueInfo->OID;

	/* Call quota logic to make decision of resource for current query. */
	res = computeQueryQuota(conntrack, errorbuf, errorbufsize);

	if ( res == FUNC_RETURN_OK )
	{
		if ( conntrack->StatNVSeg == 0 )
		{
			/*------------------------------------------------------------------
			 * Adjust the number of virtual segments again based on
			 * NVSEG_*_LIMITs and NVSEG_*_LIMIT_PERSEGs. This adjustment must
			 * succeed.
			 *------------------------------------------------------------------
			 */
			res = adjustResourceExpectsByQueueNVSegLimits(conntrack,
														  errorbuf,
														  errorbufsize);

			if ( res == FUNC_RETURN_OK )
			{
				elog(LOG, "ConnID %d. Query resource quota expects (%d MB, %lf CORE) "
						  "x %d ( MIN %d ) resource after adjusting based on queue "
						  "NVSEG limits.",
						  conntrack->ConnID,
						  conntrack->SegMemoryMB,
						  conntrack->SegCore,
						  conntrack->SegNum,
						  conntrack->SegNumMin);
			}
			else
			{
				elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
				return res;
			}
		}
	}
	else
	{
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
	}
	return res;
}

int adjustResourceExpectsByQueueNVSegLimits(ConnectionTrack	 conntrack,
											char			*errorbuf,
											int			 	 errorbufsize)
{
	DynResourceQueueTrack queuetrack = conntrack->QueueTrack;
	DynResourceQueue	  queue		 = queuetrack->QueueInfo;
	bool				  adjusted	 = false;

	if ( queue->NVSegLowerLimit > MINIMUM_RESQUEUE_NVSEG_LOWER_LIMIT_N )
	{
		/*----------------------------------------------------------------------
		 * Check if there are some conflicts between NVSEG_LOWER_LIMIT and
		 * resource upper limits.
		 *----------------------------------------------------------------------
		 */
		if ( queue->NVSegLowerLimit > conntrack->VSegLimit )
		{
			snprintf(errorbuf, errorbufsize,
					 "queue %s's limit %s=%d is greater than maximum number of "
					 "virtual segments in cluster %d, check queue definition and "
					 "guc variable hawq_rm_nvseg_perquery_limit",
					 queue->Name,
					 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
					 queue->NVSegLowerLimit,
					 conntrack->VSegLimit);
			return RESQUEMGR_WRONG_NVSEG_LIMIT_LOWER;
		}

		if ( conntrack->MinSegCountFixed != conntrack->MaxSegCountFixed )
		{
			int limit = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
			if ( queue->NVSegLowerLimit > limit )
			{
				snprintf(errorbuf, errorbufsize,
						 "queue %s's limit %s=%d is greater than maximum number "
						 "of virtual segments in cluster %d, there are %d available "
						 "segments, each segment has %d maximum virtual segments, "
						 "check queue definition and guc variable "
						 "rm_nvseg_perquery_perseg_limit",
						 queue->Name,
						 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
						 queue->NVSegLowerLimit,
						 limit,
						 PRESPOOL->AvailNodeCount,
						 conntrack->VSegLimitPerSeg);
				return RESQUEMGR_WRONG_NVSEG_LIMIT_LOWER;
			}
		}

		if ( queue->NVSegLowerLimit > queuetrack->ClusterSegNumberMax )
		{
			snprintf(errorbuf, errorbufsize,
					 "queue %s's limit %s=%d is greater than queue maximum "
					 "number of virtual segments %d, check queue definition",
					 queue->Name,
					 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT],
					 queue->NVSegLowerLimit,
					 queuetrack->ClusterSegNumberMax);
			return RESQUEMGR_WRONG_NVSEG_LIMIT_LOWER;
		}


		if ( conntrack->SegNum >= queue->NVSegLowerLimit &&
			 conntrack->SegNumMin < queue->NVSegLowerLimit )
		{
			conntrack->SegNumMin = queue->NVSegLowerLimit;
			adjusted =true;
			elog(RMLOG, "ConnID %d. Minimum vseg number adjusted to %d",
						conntrack->ConnID,
						conntrack->SegNumMin);
		}
	}
	else if ( queue->NVSegLowerLimitPerSeg >
			  MINIMUM_RESQUEUE_NVSEG_LOWER_PERSEG_LIMIT_N )
	{
		/*----------------------------------------------------------------------
		 * Check if there are some conflicts between NVSEG_LOWER_PERSEG_LIMIT
		 * and resource upper limits.
		 *----------------------------------------------------------------------
		 */
		int minnvseg = ceil(queue->NVSegLowerLimitPerSeg * PRESPOOL->AvailNodeCount);
		if ( minnvseg > conntrack->VSegLimit )
		{
			snprintf(errorbuf, errorbufsize,
					 "queue %s's limit %s=%lf requires minimum %d virtual "
					 "segments in cluster having %d available segments, it is "
					 "greater than maximum number of virtual segments in cluster "
					 "%d, check queue definition and guc variable "
					 "hawq_rm_nvseg_perquery_perseg_limit",
					 queue->Name,
					 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
					 queue->NVSegLowerLimitPerSeg,
					 minnvseg,
					 PRESPOOL->AvailNodeCount,
					 conntrack->VSegLimit);
			return RESQUEMGR_WRONG_NVSEG_PERSEG_LIMIT_LOWER;
		}

		if ( conntrack->MinSegCountFixed != conntrack->MaxSegCountFixed )
		{
			int limit = conntrack->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
			if ( minnvseg > limit )
			{
				snprintf(errorbuf, errorbufsize,
						 "queue %s's limit %s=%lf requires minimum %d virtual "
						 "segments in cluster having %d available segments, it "
						 "is greater than maximum number of virtual segments in "
						 "cluster %d, each segment has %d maximum virtual "
						 "segments, check queue definition and guc variable "
						 "hawq_rm_nvseg_perquery_perseg_limit",
						 queue->Name,
						 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
						 queue->NVSegLowerLimitPerSeg,
						 minnvseg,
						 PRESPOOL->AvailNodeCount,
						 limit,
						 conntrack->VSegLimitPerSeg);
				return RESQUEMGR_WRONG_NVSEG_PERSEG_LIMIT_LOWER;
			}
		}

		if ( minnvseg > queuetrack->ClusterSegNumberMax )
		{
			snprintf(errorbuf, errorbufsize,
					 "queue %s's limit %s=%lf requires minimum %d virtual "
					 "segments in cluster having %d available segments, it is "
					 "greater than queue maximum number of virtual segments %d, "
					 "check queue definition",
					 queue->Name,
					 RSQDDLAttrNames[RSQ_DDL_ATTR_NVSEG_LOWER_LIMIT_PERSEG],
					 queue->NVSegLowerLimitPerSeg,
					 minnvseg,
					 PRESPOOL->AvailNodeCount,
					 queuetrack->ClusterSegNumberMax);
			return RESQUEMGR_WRONG_NVSEG_PERSEG_LIMIT_LOWER;
		}

		if ( conntrack->SegNumMin < minnvseg )
		{
			conntrack->SegNumMin = minnvseg;
			adjusted =true;
			elog(RMLOG, "ConnID %d. Minimum vseg number adjusted to %d",
						conntrack->ConnID,
						conntrack->SegNumMin);
		}
	}

	if ( queue->NVSegUpperLimit > MINIMUM_RESQUEUE_NVSEG_UPPER_LIMIT_N )
	{
		if ( conntrack->SegNum > queue->NVSegUpperLimit )
		{
			conntrack->SegNum = queue->NVSegUpperLimit;
			adjusted =true;
			elog(RMLOG, "ConnID %d. Maximum vseg number adjusted to %d",
						conntrack->ConnID,
						conntrack->SegNum);
		}
	}
	else if ( queue->NVSegUpperLimitPerSeg >
			  MINIMUM_RESQUEUE_NVSEG_UPPER_PERSEG_LIMIT_N )
	{
		int maxnvseg = ceil(queuetrack->QueueInfo->NVSegUpperLimitPerSeg *
							PRESPOOL->AvailNodeCount);
		if ( conntrack->SegNum > maxnvseg )
		{
			conntrack->SegNum = maxnvseg;
			adjusted =true;
			elog(RMLOG, "ConnID %d. Maximum vseg number adjusted to %d",
						conntrack->ConnID,
						conntrack->SegNum);
		}
	}

	/*--------------------------------------------------------------------------
	 * Finally, we must ensure that upper limits limit the minimum resource
	 * quota. This means, the resource limits from NVSEG upper limits are always
	 * respected.
	 *--------------------------------------------------------------------------
	 */
	if ( conntrack->SegNumMin > conntrack->SegNum )
	{
		conntrack->SegNumMin = conntrack->SegNum;
		elog(RMLOG, "ConnID %d. Minimum vseg number is forced to be equal to "
					"maximum vseg number %d",
					conntrack->ConnID,
					conntrack->SegNumMin);
	}

	return FUNC_RETURN_OK;
}

/* Resource is returned from query to resource queue. */
int returnResourceToResQueMgr(ConnectionTrack conntrack)
{
	int res = FUNC_RETURN_OK;

	if ( !conntrack->isOld ) {

		/* return resource quota to resource queue. */
		returnAllocatedResourceToLeafQueue(
				conntrack->QueueTrack,
				conntrack->SegMemoryMB * conntrack->SegNumActual,
				conntrack->SegCore     * conntrack->SegNumActual);

		/* Minus resource usage by session. */
		if ( conntrack->SessionID >= 0 ) {
			DynResourceQueueTrack quetrack = (DynResourceQueueTrack)
											 (conntrack->QueueTrack);
			minusSessionInUseResource(&(quetrack->DLDetector),
									   conntrack->SessionID,
									   conntrack->SegMemoryMB * conntrack->SegNumActual,
									   conntrack->SegCore     * conntrack->SegNumActual);
		}
	}

	((DynResourceQueueTrack)(conntrack->QueueTrack))->NumOfRunningQueries--;

	/* return allocated resource to resource pool. */
	returnResourceToResourcePool(conntrack->SegMemoryMB,
								 conntrack->SegCore,
								 conntrack->SegIOBytes,
								 conntrack->SliceSize,
								 &(conntrack->Resource),
								 conntrack->isOld);

	transformConnectionTrackProgress(conntrack, CONN_PP_REGISTER_DONE);

	/* Some resource is returned. Try to dispatch resource to queries. */
	PQUEMGR->toRunQueryDispatch = true;

	validateResourcePoolStatus(true);

	return res;
}

void refreshActualMinGRMContainerPerSeg(void)
{
	/*--------------------------------------------------------------------------
	 * There are 3 limits should be considered, the actual water level is the
	 * least value of the 3 limits : resource queue normal capacity caused mean
	 * GRM container number, minimum value of all segments' maximum GRM container
	 * numbers, user setting saved in guc.
	 *
	 *--------------------------------------------------------------------------
	 */

	/* STEP 1. go through each segment to get segment maximum capacity. */
	int minctncount = INT32_MAX;
	int normalctncount = INT32_MAX;
	if ( DRMGlobalInstance->ImpType != NONE_HAWQ2 )
	{
		List 	 *allsegres = NULL;
		ListCell *cell		= NULL;
		getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegres);

		foreach(cell, allsegres)
		{
			SegResource segres = (SegResource)(((PAIR)lfirst(cell))->Value);
			if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat))
			{
				continue;
			}

			if ( segres->Stat->GRMTotalCore < minctncount )
			{
				minctncount = segres->Stat->GRMTotalCore;
			}
		}
		freePAIRRefList(&(PRESPOOL->Segments), &allsegres);

		elog(RMLOG, "Resource manager finds minimum global resource manager "
					"container count can contained by all segments is %d",
					minctncount);

		/* STEP 2. check the queue normal capacity introduced water level. */
		if ( PRESPOOL->AvailNodeCount > 0 &&
			 PQUEMGR->GRMQueueCapacity > 0 &&
			 PRESPOOL->GRMTotalHavingNoHAWQNode.Core > 0 )
		{
			normalctncount = trunc(PRESPOOL->GRMTotalHavingNoHAWQNode.Core *
								   PQUEMGR->GRMQueueCapacity /
								   PRESPOOL->AvailNodeCount);

			elog(RMLOG, "Resource manager calculates normal global resource "
						"manager container count based on target queue capacity "
						"is %d",
						normalctncount);
		}
	}

	/* STEP 3. Get final water level result. */
	int oldval = PQUEMGR->ActualMinGRMContainerPerSeg;
	int newval = minctncount < normalctncount ? minctncount : normalctncount;
	newval = newval < rm_min_resource_perseg ? newval : rm_min_resource_perseg;

	if ( newval != oldval )
	{
		elog(WARNING, "Resource manager adjusts minimum global resource manager "
					  "container count in each segment from %d to %d.",
					  oldval,
					  newval);
	}
	PQUEMGR->ActualMinGRMContainerPerSeg = newval;
}

void refreshResourceQueueCapacity(bool queuechanged)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	List *qhavingshadow = NULL;

	/*
	 * If the cluster level memory to core ratio is not fixed based on FTS heart-
	 * beat or GRM cluster report, try to fix it firstly, if unfortunately, this
	 * ratio cannot be fixed, skip refreshing queue capacity temporarily.
	 *
	 * NOTE, once this ratio is fixed, it is not changed again until restart
	 * master.
	 *
	 */
	if ( PRESPOOL->ClusterMemoryCoreRatio <= 0 )
	{
		fixClusterMemoryCoreRatio();
		if ( PRESPOOL->ClusterMemoryCoreRatio <= 0 )
		{
			return;
		}
	}

	/* STEP 1. Build all necessary shadow resource queue track instances. */
	buildQueueTrackShadows(PQUEMGR->RootTrack, &qhavingshadow);

	/* STEP 2. Refresh resource queue capacities. */
	refreshResourceQueuePercentageCapacity(queuechanged);

	/* STEP 3. Rebuild queued resource requests. */
	rebuildAllResourceQueueTrackDynamicStatusInShadow(qhavingshadow,
													  queuechanged,
													  errorbuf,
													  sizeof(errorbuf));

	/* STEP 4. Apply changes from resource queue shadows. */
	applyResourceQueueTrackChangesFromShadows(qhavingshadow);

	/* STEP 5. Clean up. */
	cleanupQueueTrackShadows(&qhavingshadow);
}

/* Refresh actual resource queue capacity. */
void refreshResourceQueuePercentageCapacity(bool queuechanged)
{
	/*
	 * Decide The actual capacity. This is necessary because there maybe some
	 * physical machines having different ratio capacity.
	 */
	uint32_t mem  = 0;
	uint32_t core = 0;

	if ( PQUEMGR->RootTrack != NULL )
	{
		if ( DRMGlobalInstance->ImpType == YARN_LIBYARN )
		{
			mem  = PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB *
				   PQUEMGR->GRMQueueMaxCapacity;
			core = PRESPOOL->GRMTotalHavingNoHAWQNode.Core     *
				   PQUEMGR->GRMQueueMaxCapacity;

		}
		else if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 )
		{
			mem  = PRESPOOL->FTSTotal.MemoryMB;
			core = PRESPOOL->FTSTotal.Core;
		}
		else
		{
			Assert(false);
		}
	}
	else
	{
		return;
	}

	/*
	 * If we use global resource manager to manage resource, the total capacity
	 * might not follow the cluster memory to core ratio.
	 */
	adjustMemoryCoreValue(&mem, &core);

	elog(DEBUG3, "HAWQ RM :: Use cluster (%d MB, %d CORE) resources as whole.",
				mem, core);

	refreshResourceQueuePercentageCapacityInternal(mem, core, queuechanged);

	/*
	 * After freshing resource queue capacity, it is necessary to try to dispatch
	 * resource to queries.
	 */
	PQUEMGR->toRunQueryDispatch = true;
}

void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec)
{
	ListCell *cell = NULL;

	for( int i = 0 ; i < PQUEMGR->RatioCount ; ++i ) {

		DynMemoryCoreRatioTrack mctrack = PQUEMGR->RatioTrackers[i];

		resetResourceBundleData(&(mctrack->TotalUsed),    0, 0.0, mctrack->TotalUsed.Ratio);
		resetResourceBundleData(&(mctrack->TotalRequest), 0, 0.0, mctrack->TotalUsed.Ratio);

		foreach(cell, mctrack->QueueTrackers)
		{
			DynResourceQueueTrack track = lfirst(cell);
			addResourceBundleData(&(mctrack->TotalUsed),
								  track->TotalUsed.MemoryMB,
								  track->TotalUsed.Core);
			if (track->TotalRequest.MemoryMB > track->ClusterMemoryMaxMB)
			{
				addResourceBundleData(&(mctrack->TotalRequest),
								  	  track->ClusterMemoryMaxMB,
									  track->ClusterVCoreMax);
			}
			else
			{
				addResourceBundleData(&(mctrack->TotalRequest),
									  track->TotalRequest.MemoryMB,
									  track->TotalRequest.Core);
			}
		}

		if ( mctrack->TotalRequest.MemoryMB > mctrack->ClusterMemoryMaxMB )
		{
			mctrack->TotalRequest.MemoryMB = mctrack->ClusterMemoryMaxMB;
		}

		markMemoryCoreRatioWaterMark(&(PQUEMGR->RatioWaterMarks[i]),
									 curmicrosec,
									 mctrack->TotalUsed.MemoryMB,
									 mctrack->TotalUsed.Core);
	}
}

void markMemoryCoreRatioWaterMark(DQueue 		marks,
								  uint64_t 		curmicrosec,
								  int32_t 		memmb,
								  double 		core)
{
	/* Each mark instance records the maximum usage in one sencond. */
	uint64_t 					cursec   = curmicrosec / 1000000;
	DynMemoryCoreRatioWaterMark lastmark = NULL;
	int32_t						oldmarkmem = 0;
	double 						oldmarkcore = 0;

	elog(DEBUG5, "Resource water mark candidate (%d MB, %lf CORE) "UINT64_FORMAT,
			     memmb,
			     core,
			     cursec);

	if ( marks->NodeCount > 0 ) {
		DynMemoryCoreRatioWaterMark firstmark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueContainerData(getDQueueContainerHead(marks));
		oldmarkmem  = firstmark->ClusterMemoryMB;
		oldmarkcore = firstmark->ClusterVCore;
		elog(DEBUG5, "Resource water mark old (%d MB, %lf CORE)",
				     oldmarkmem,
				     oldmarkcore);
	}

	if ( marks->NodeCount > 0 ) {
		lastmark = (DynMemoryCoreRatioWaterMark)
				   getDQueueContainerData(getDQueueContainerTail(marks));

		if ( lastmark->LastRecordTime == cursec ) {
			lastmark->ClusterMemoryMB = lastmark->ClusterMemoryMB > memmb ?
									    lastmark->ClusterMemoryMB :
									    memmb;
			lastmark->ClusterVCore    = lastmark->ClusterVCore > core ?
					                    lastmark->ClusterVCore :
									    core;
			/* Get the last mark cut. We will process the left marks now. */
			removeDQueueTailNode(marks);
		}
		else {
			lastmark = NULL;
		}
	}

	if ( lastmark == NULL) {
		lastmark = rm_palloc0(PCONTEXT, sizeof(DynMemoryCoreRatioWaterMarkData));
		lastmark->LastRecordTime  = cursec;
		lastmark->ClusterMemoryMB = memmb;
		lastmark->ClusterVCore    = core;
	}

	elog(DEBUG5, "Resource water mark list size %d before timeout old marks.",
			     marks->NodeCount);

	/* Check if we should remove some marks from the head. */
	while( marks->NodeCount > 0 ) {
		DynMemoryCoreRatioWaterMark firstmark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueContainerData(getDQueueContainerHead(marks));
		if ( lastmark->LastRecordTime - firstmark->LastRecordTime > rm_resource_timeout ) {
			removeDQueueHeadNode(marks);
			rm_pfree(PCONTEXT, firstmark);
		}
		else {
			break;
		}
	}

	elog(DEBUG5, "Resource water mark list size %d after timeout old marks.",
			     marks->NodeCount);

	/* Check if we can skip some marks before the last one (the one we have cut). */
	while( marks->NodeCount > 0 ) {
		DynMemoryCoreRatioWaterMark last2mark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueContainerData(getDQueueContainerTail(marks));
		if ( last2mark->ClusterMemoryMB <= lastmark->ClusterMemoryMB ) {
			removeDQueueTailNode(marks);
			rm_pfree(PCONTEXT, last2mark);
		}
		else {
			break;
		}
	}

	elog(DEBUG5, "Resource water mark list size %d after remove low marks.",
			     marks->NodeCount);

	/* Add the last one back to the tail. */
	insertDQueueTailNode(marks, lastmark);

	Assert(marks->NodeCount > 0);
	{
		DynMemoryCoreRatioWaterMark firstmark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueContainerData(getDQueueContainerHead(marks));
		if ( firstmark->ClusterMemoryMB != oldmarkmem ) {
			elog(LOG, "Resource water mark changes from (%d MB, %lf CORE) to "
					  "(%d MB, %lf CORE)",
					  oldmarkmem,
					  oldmarkcore,
					  firstmark->ClusterMemoryMB,
					  firstmark->ClusterVCore);
		}
	}
}

#define ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)					   \
		elog(WARNING, "Resource queue cannot parse role attribute, %s",  	   \
					  (errorbuf));

/**
 * This function parses the attributes and translate into UserInfo structure's
 * attributes. This functions does not generate logs higher than WARNING, the
 * concrete error is also saved in error buffer to make the caller able to pass
 * back the message to remote process.
 */
int parseUserAttributes( List 	 	*attributes,
						 UserInfo 	 user,
						 char		*errorbuf,
						 int		 errorbufsize)
{
	int 					res 		 = FUNC_RETURN_OK;
	int						res2		 = FUNC_RETURN_OK;
	int 					attrindex 	 = -1;
	DynResourceQueueTrack 	track		 = NULL;
	Oid 					useroid 	 = InvalidOid;
	Oid						queueoid	 = InvalidOid;
	bool					exist		 = false;
	int64_t					queueoid64	 = -1;

	/* Initialize attributes. */
	user->isSuperUser 	= false;
	user->QueueOID 		= -1;

	ListCell *cell = NULL;
	foreach(cell, attributes)
	{
		KVProperty value = lfirst(cell);

		SimpStringPtr attrname  = &(value->Key);
		SimpStringPtr attrvalue = &(value->Val);

		attrindex = getUSRTBLAttributeNameIndex(attrname);

		if ( SimpleStringEmpty(attrvalue) )
		{
			if ( SimpleStringEmpty(attrvalue) )
			{
				elog(LOG, "No value for attribute [%s].", attrname->Str);
				continue;
			}
		}

		if ( attrindex == -1 )
		{
			res = RESQUEMGR_WRONG_ATTRNAME;
			snprintf(errorbuf, errorbufsize,
					 "cannot recognize resource queue attribute %s",
					 attrname->Str);
			ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
			return res;
		}

		switch(attrindex)
		{
		case USR_TBL_ATTR_OID:
			/* The oid is expected to be unique. */
			res2 = SimpleStringToOid(attrvalue, &useroid);
			getUserByUserOID(useroid, &exist);
			if ( exist )
			{
				res = RESQUEMGR_DUPLICATE_USERID;
				snprintf(errorbuf, errorbufsize, "duplicate user oid %s", attrvalue->Str);
				ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
				return res;
			}
			user->OID = useroid;
			break;

		case USR_TBL_ATTR_NAME:

			/* The user name must be unique. */
			getUserByUserName(attrvalue->Str, attrvalue->Len, &exist);
			if ( exist )
			{
				res = RESQUEMGR_DUPLICATE_USERID;
				snprintf(errorbuf, errorbufsize, "duplicate user name %s", attrvalue->Str);
				ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
				return res;
			}

			/* Set user name value. */
			strncpy(user->Name, attrvalue->Str, attrvalue->Len);
			break;

		case USR_TBL_ATTR_TARGET_QUEUE:

			res2 = SimpleStringToOid(attrvalue, &queueoid);
			if ( res2 != FUNC_RETURN_OK )
			{
				res = RESQUEMGR_WRONG_ATTR;
				snprintf(errorbuf, errorbufsize,
						 "wrong target resource queue oid %s",
						 attrvalue->Str);
				ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
				return res;
			}
			/* The target queue must exist. */
			queueoid64 = queueoid;
			track = getQueueTrackByQueueOID(queueoid64);
			if ( track == NULL )
			{
				res = RESQUEMGR_WRONG_TARGET_QUEUE;
				snprintf(errorbuf, errorbufsize,
						 "cannot find target resource queue %s",
						 attrvalue->Str);
				ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
				return res;
			}

			/* Set value. */
			user->QueueOID = track->QueueInfo->OID;

			break;
		case USR_TBL_ATTR_PRIORITY:
			break;
		case USR_TBL_ATTR_IS_SUPERUSER:
		{
			bool issuper = false;
			res2 = SimpleStringToBool(attrvalue,&issuper);
			if ( res2 != FUNC_RETURN_OK )
			{
				res = RESQUEMGR_WRONG_ATTR;
				snprintf(errorbuf, errorbufsize,
						 "Wrong user issuper setting %s",
						 attrvalue->Str);
				ELOG_WARNING_ERRORMESSAGE_PARSEUSERATTR(errorbuf)
				return res;
			}
			user->isSuperUser = issuper ? 1 : 0;
			break;
		}
		default:
			Assert(0);
		}
	}

	return FUNC_RETURN_OK;
}

int checkUserAttributes( UserInfo user, char *errorbuf, int errorbufsize)
{
	if ( user->QueueOID == -1 ) {
		user->QueueOID = DEFAULTRESQUEUE_OID;
	}
	return FUNC_RETURN_OK;
}

/* Create one user. Expect always no error. */
void createUser(UserInfo userinfo)
{
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PQUEMGR->Users = lappend(PQUEMGR->Users, userinfo);
	MEMORY_CONTEXT_SWITCH_BACK

	if ( userinfo->OID > InvalidOid )
	{
		setUserIndexedByUserOID(userinfo);
	}
	setUserIndexedByUserName(userinfo);
}

void setUserIndexedByUserOID(UserInfo userinfo)
{
	SimpArray key;
	setSimpleArrayRef(&key, (void *)&(userinfo->OID), sizeof(int64_t));
	setHASHTABLENode(&(PQUEMGR->UsersIDIndex), &key, userinfo, false);
}

void setUserIndexedByUserName(UserInfo userinfo)
{
	SimpString key;
	setSimpleStringRefNoLen(&key, userinfo->Name);
	setHASHTABLENode(&(PQUEMGR->UsersNameIndex),
					 (void *)(&(key)),
					 userinfo,
					 false);
}

UserInfo getUserByUserName( const char *userid, int useridlen, bool *exist)
{
	PAIR						pair				= NULL;
	SimpString					key;

	/* Check if the user exists. */
	setSimpleStringRef(&key, (char *)userid, useridlen);
	pair = getHASHTABLENode(&(PQUEMGR->UsersNameIndex), &key);
	if( pair == NULL ) {
		*exist = false;
		return NULL;
	}

	*exist = true;
	return (UserInfo)(pair->Value);
}

UserInfo getUserByUserOID ( int64_t useroid, bool *exist)
{
	PAIR		pair	= NULL;
	SimpArray 	key;
	setSimpleArrayRef(&key, (void *)&useroid, sizeof(int64_t));
	pair = getHASHTABLENode(&(PQUEMGR->UsersIDIndex), &key);
	if ( pair == NULL )
	{
		*exist = false;
		return NULL;
	}
	*exist = true;
	return (UserInfo)(pair->Value);
}

bool hasUserAssignedToQueue(DynResourceQueue queue)
{
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Users)
	{
		UserInfo user = lfirst(cell);
		if ( user->QueueOID == queue->OID )
		{
			return true;
		}
	}
	return false;
}

int dropUser(int64_t useroid, char* name)
{
	Assert(useroid != InvalidOid && name != NULL);

	ListCell *cell 	   = NULL;
	ListCell *prevcell = NULL;
	foreach(cell, PQUEMGR->Users)
	{
		UserInfo user = lfirst(cell);
		if(user->OID == useroid)
		{
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			PQUEMGR->Users = list_delete_cell(PQUEMGR->Users, cell, prevcell);
			MEMORY_CONTEXT_SWITCH_BACK
			elog(LOG, "Resource manager finds user oid "INT64_FORMAT" and delete.",
					  useroid);

			SimpArray key1;
			setSimpleArrayRef(&key1, (void *)&useroid, sizeof(int64_t));
			int res = removeHASHTABLENode(&(PQUEMGR->UsersIDIndex), &key1);
			elog(DEBUG3, "Resource manager removed node from UsersIDIndex returns %d",
						 res);
			Assert(res == FUNC_RETURN_OK);

			SimpString key2;
			setSimpleStringRef(&key2, name, strlen(name));
			res = removeHASHTABLENode(&(PQUEMGR->UsersNameIndex), &key2);
			elog(DEBUG3, "Resource manager removed node from UsersNameIndex returns %d",
						 res);
			Assert(res == FUNC_RETURN_OK);
			return FUNC_RETURN_OK;
		}
		prevcell = cell;
	}

	return RESQUEMGR_NO_USERID;
}

void dispatchResourceToQueries(void)
{
	bool 		hasresourceallocated = false;
	bool 		hasrequest 		  	 = false;

	elog(DEBUG3, "Resource manager tries to dispatch resource to queries.");

	/*
	 *--------------------------------------------------------------------------
	 * STEP 1. Re-balance resource among different mem/core ratio trackers. After
	 * 		   this step, each mem/core ratio trackers process their own queues
	 * 		   only.
	 *
	 * 		   IN CURRENT VERSION. This is not implemented.
	 *--------------------------------------------------------------------------
	 */

	/*
	 * STEP 2. Decide how much resource are dispatched to each segment.
	 */
	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
	{
		DQueueData  toallocqueues;
		initializeDQueue(&toallocqueues, PCONTEXT);
		DynMemoryCoreRatioTrack mctrack = PQUEMGR->RatioTrackers[i];

		/* Ignore the memory/core ratio 1) not in use. 2) no resource allocated. */
		if ( (mctrack->ClusterMemoryMaxMB == 0 || IS_DOUBLE_ZERO(mctrack->ClusterVCoreMax)) ||
			 (mctrack->TotalAllocated.MemoryMB == 0 && IS_DOUBLE_ZERO(mctrack->TotalAllocated.Core)) )
		{
			elog(DEBUG3, "Resource manager skipped memory core ratio index %d, "
						 "memory max limit %d MB, %lf CORE, "
						 "total allocated %d MB, %lf CORE",
						 i,
						 mctrack->ClusterMemoryMaxMB,
						 mctrack->ClusterVCoreMax,
						 mctrack->TotalAllocated.MemoryMB,
						 mctrack->TotalAllocated.Core);
			continue;
		}

		uint32_t	allmemory 		  = mctrack->TotalAllocated.MemoryMB;
		uint32_t	availmemory 	  = mctrack->TotalAllocated.MemoryMB;
		double		availcore		  = mctrack->TotalAllocated.Core;
		uint32_t	totalmemoryweight = 0;

		/*
		 * Count out over used resource queues. They will pause allocating resource
		 * until the water mark is lower than expected weight.
		 */
		ListCell *cell = NULL;
		foreach(cell, mctrack->QueueTrackers)
		{
			DynResourceQueueTrack track = (DynResourceQueueTrack)lfirst(cell);

			/* Reset queue resource expect status. */
			track->expectMoreResource = false;

			/* Ignore the queues not in use. */
			if ( !track->isBusy )
			{
				elog(DEBUG3, "Resource manager skips idle resource queue %s",
							 track->QueueInfo->Name);
				continue;
			}

			double expweight  = 1.0 * track->QueueInfo->ClusterMemoryMB /
							    	mctrack->ClusterMemory;
			double actweight  = allmemory == 0 ?
									0 :
									(1.0 * track->TotalUsed.MemoryMB / allmemory);

			/* If the queue is overusing resource, keep it. */
			if ( actweight > expweight ||
				 track->TotalUsed.MemoryMB > track->ClusterMemoryMaxMB )
			{
				resetResourceBundleData(&(track->TotalAllocated),
										track->TotalUsed.MemoryMB,
										track->TotalUsed.Core,
										track->TotalAllocated.Ratio);
				track->pauseAllocation = true;
				availmemory -= track->TotalUsed.MemoryMB;
				availcore   -= track->TotalUsed.Core;

				elog(DEBUG3, "Resource queue %s over uses resource with weight "
							 "%lf, expect weight %lf. Currently total used "
							 "(%d MB, %lf CORE). Allocation to queries is paused.",
							 track->QueueInfo->Name,
							 actweight,
							 expweight,
							 track->TotalUsed.MemoryMB,
							 track->TotalUsed.Core);

				/* We still need to handle the resource queue dead lock here. */
				detectAndDealWithDeadLock(track);

			}
			else
			{
				insertDQueueTailNode(&toallocqueues, track);
				track->pauseAllocation = false;

				totalmemoryweight += track->QueueInfo->ClusterMemoryMB;

				elog(DEBUG3, "Resource queue %s uses resource with weight "
							 "%lf, expect weight %lf. Currently total used "
							 "(%d MB, %lf CORE). To assign more resource.",
							 track->QueueInfo->Name,
							 actweight,
							 expweight,
							 track->TotalUsed.MemoryMB,
							 track->TotalUsed.Core);
			}
		}

		/* Assign resource to not over used resource queues. */
		elog(DEBUG3, "Reassignable resource is (%d MB, %lf CORE)",
					 availmemory,
					 availcore);

		/*
		 * Handle all the other queues to assign resource to queues. Remaining
		 * resource is dispatched to the chosen queues based on their resource
		 * weight.
		 */
		uint32_t leftmemory2 = availmemory;
		DQUEUE_LOOP_BEGIN(&toallocqueues, iter, DynResourceQueueTrack, track)
			double expweight  = 1.0 * track->QueueInfo->ClusterMemoryMB /
							    totalmemoryweight;

			uint32_t potentialmemuse =
				track->TotalUsed.MemoryMB + track->TotalRequest.MemoryMB >
					track->ClusterMemoryMaxMB ?
				track->ClusterMemoryMaxMB :
				track->TotalUsed.MemoryMB + track->TotalRequest.MemoryMB;

			double actweight2 = 1.0 * potentialmemuse  / availmemory;

			/*
			 * CASE 1. The queue acquire only a little resource that does not
			 * 		   exceed the target weight in this memory core ratio. We
			 * 		   exactly allocate the resource it wants.
			 */
			if ( actweight2 < expweight )
			{
				resetResourceBundleData(&(track->TotalAllocated),
										potentialmemuse,
										1.0 * potentialmemuse / track->MemCoreRatio,
										track->TotalAllocated.Ratio);
				leftmemory2 -= potentialmemuse;
				Assert(leftmemory2 >= 0);
				elog(DEBUG3, "Resource manager fully satisfies to resource queue "
							 "%s with (%d MB, %lf CORE) allocated.",
							 track->QueueInfo->Name,
							 track->TotalAllocated.MemoryMB,
							 track->TotalAllocated.Core);
			}

			/*
			 * CASE 2. The queue can only get partial requested resource.
			 */
			else
			{
				uint32_t allocmemory = trunc(expweight * availmemory);
				double   alloccore   = 1.0 * availmemory / track->MemCoreRatio;

				resetResourceBundleData(&(track->TotalAllocated),
										allocmemory,
										alloccore,
										track->TotalAllocated.Ratio);
				/* Mark that the queue needs more resource if possible. */
				track->expectMoreResource = true;

				leftmemory2 -= allocmemory;
				Assert(leftmemory2 >= 0);
				elog(DEBUG3, "Resource manager partially satisfies to resource "
							 "queue %s with (%d MB, %lf CORE) allocated.",
							 track->QueueInfo->Name,
							 track->TotalAllocated.MemoryMB,
							 track->TotalAllocated.Core);
			}

			elog(DEBUG3, "Resource manager allocates resource (%d MB, %lf CORE) "
						 "in queue %s.",
						 track->TotalAllocated.MemoryMB,
						 track->TotalAllocated.Core,
						 track->QueueInfo->Name);

			double evalcore = IS_DOUBLE_ZERO(track->TotalAllocated.Core) ?
							  VALIDATE_RESOURCE_BIAS :
							  track->TotalAllocated.Core * (1+VALIDATE_RESOURCE_BIAS);
			Assert(evalcore >= track->TotalUsed.Core);
		DQUEUE_LOOP_END

		/*
		 * Decide left resource. The resource is assigned to one in-use queue
		 * expecting more resource in a round-robin way. Add resource to as few
		 * queues as possible.
		 */
		if ( list_length(mctrack->QueueTrackers) > 0 && leftmemory2 > 0 )
		{
			/* In case the count of queues is less than before. */
			if ( mctrack->QueueIndexForLeftResource >= list_length(mctrack->QueueTrackers) )
			{
				mctrack->QueueIndexForLeftResource = 0;
			}

			ListCell *p = list_nth_cell(mctrack->QueueTrackers,
										mctrack->QueueIndexForLeftResource);
			DynResourceQueueTrack q = NULL;
			for ( int cq = 0 ; cq < list_length(mctrack->QueueTrackers) ; ++cq )
			{
				DynResourceQueueTrack tmpq = lfirst(p);
				if ( tmpq->expectMoreResource )
				{
					q = tmpq;
					if ( leftmemory2 + q->TotalAllocated.MemoryMB <= q->ClusterMemoryMaxMB)
					{
						elog(DEBUG3, "Resource manager allocates resource (%d MB, %lf CORE) "
									 "in queue %s.",
									 leftmemory2,
									 1.0 * leftmemory2 / q->MemCoreRatio,
									 q->QueueInfo->Name);

						addResourceBundleData(&(q->TotalAllocated),
										  	  leftmemory2,
											  1.0 * leftmemory2 / q->MemCoreRatio);
						leftmemory2 = 0;
						break;
					}
					else {
						uint32_t memorydelta = q->ClusterMemoryMaxMB - q->TotalAllocated.MemoryMB;

						elog(DEBUG3, "Resource manager allocates resource (%d MB, %lf CORE) "
									 "in queue %s.",
									 memorydelta,
									 1.0 * memorydelta / q->MemCoreRatio,
									 q->QueueInfo->Name);

						addResourceBundleData(&(q->TotalAllocated),
											  memorydelta,
											  1.0 * memorydelta / q->MemCoreRatio);
						leftmemory2 -= memorydelta;
					}
					break;
				}

				/* Try next queue in next iteration. */
				p = lnext(p);
				if ( p == NULL )
				{
					mctrack->QueueIndexForLeftResource = 0;
					p = list_head(mctrack->QueueTrackers);
				}
				else
				{
					mctrack->QueueIndexForLeftResource++;
				}
			}

			mctrack->QueueIndexForLeftResource++;
		}

		/*
		 * Dispatch resource to queries. We firstly handle the queues having
		 * resource fragment problem. Then the left queues.
		 */
		for ( int i = 0 ; i < toallocqueues.NodeCount ; ++i )
		{
			DynResourceQueueTrack track = (DynResourceQueueTrack)
										  (removeDQueueHeadNode(&toallocqueues));
			if ( !track->troubledByFragment )
			{
				insertDQueueTailNode(&toallocqueues, track);
				continue;
			}
			int oldreqnum = track->QueryResRequests.NodeCount;
			hasrequest = oldreqnum > 0;
			dispatchResourceToQueriesInOneQueue(track);
			int newreqnum = track->QueryResRequests.NodeCount;
			if ( newreqnum != oldreqnum )
			{
				hasresourceallocated = true;
			}
		}

		while( toallocqueues.NodeCount > 0 )
		{
			DynResourceQueueTrack track = (DynResourceQueueTrack)
										  (removeDQueueHeadNode(&toallocqueues));
			int oldreqnum = track->QueryResRequests.NodeCount;
			hasrequest = oldreqnum > 0;
			dispatchResourceToQueriesInOneQueue(track);
			int newreqnum = track->QueryResRequests.NodeCount;
			if ( newreqnum != oldreqnum )
			{
				hasresourceallocated = true;
			}
		}
		Assert(toallocqueues.NodeCount == 0);
		cleanDQueue(&toallocqueues);

	}

	PQUEMGR->toRunQueryDispatch = !hasrequest || hasresourceallocated;
	if ( !PQUEMGR->toRunQueryDispatch )
	{
		elog(DEBUG3, "Resource manager pauses allocating resource to query because of "
				     "lack of resource.");
	}
}

/*----------------------------------------------------------------------------*/
/*                    RESOURCE QUEUE MANAGER INTERNAL APIs                    */
/*----------------------------------------------------------------------------*/

/**
 * Create new resource queue tracker instance for one resource queue.
 */
DynResourceQueueTrack createDynResourceQueueTrack(DynResourceQueue queue)
{
	DynResourceQueueTrack newtrack =
		(DynResourceQueueTrack)rm_palloc0(PCONTEXT,
										  sizeof(DynResourceQueueTrackData));

	initializeDQueue(&(newtrack->QueryResRequests), PCONTEXT);

	newtrack->QueueInfo   		  = queue;
	newtrack->ParentTrack 		  = NULL;
	newtrack->ChildrenTracks 	  = NULL;
	newtrack->CurConnCounter 	  = 0;
	newtrack->RatioIndex          = -1;
	newtrack->ClusterSegNumber    = 0;
	newtrack->ClusterSegNumberMax = 0;
	newtrack->ClusterMemoryMaxMB  = 0;
	newtrack->ClusterVCoreMax     = 0;
	newtrack->ClusterMemoryActPer = 0;
	newtrack->ClusterMemoryMaxPer = 0;
	newtrack->ClusterVCoreActPer  = 0;
	newtrack->ClusterVCoreMaxPer  = 0;
	newtrack->trackedMemCoreRatio = false;
	newtrack->isBusy			  = false;
	newtrack->pauseAllocation	  = false;
	newtrack->troubledByFragment  = false;
	newtrack->NumOfRunningQueries = 0;

	resetResourceBundleData(&(newtrack->TotalAllocated), 0, 0.0, 0);
	resetResourceBundleData(&(newtrack->TotalRequest)  , 0, 0.0, 0);
	resetResourceBundleData(&(newtrack->TotalUsed)     , 0, 0.0, 0);

	initializeResqueueDeadLockDetector(&(newtrack->DLDetector), newtrack);
	return newtrack;
}

/**
 * Free one resource queue tracker instance, expect that this tracker has no
 * active connection information saved, the connection with the other tracker
 * instance has be cut.
 */
void shallowFreeResourceQueueTrack(DynResourceQueueTrack track)
{
	Assert( list_length(track->ChildrenTracks) == 0 );
	Assert( track->QueryResRequests.NodeCount == 0 );
	cleanDQueue(&(track->QueryResRequests));
	rm_pfree(PCONTEXT, track);
}

void deepFreeResourceQueueTrack(DynResourceQueueTrack track)
{
	Assert( list_length(track->ChildrenTracks) == 0 );
	Assert( track->ParentTrack == NULL );

	resetResourceDeadLockDetector(&(track->DLDetector));

	while(track->QueryResRequests.NodeCount > 0)
	{
		ConnectionTrack conn = (ConnectionTrack)
							   (removeDQueueHeadNode(&(track->QueryResRequests)));
		freeUsedConnectionTrack(conn);
	}

	rm_pfree(PCONTEXT, track->QueueInfo);
	shallowFreeResourceQueueTrack(track);
}

int getRSQTBLAttributeNameIndex(SimpStringPtr attrname)
{
	for ( int i = 0 ; i < RSQ_TBL_ATTR_COUNT ; ++i ) {
		if ( SimpleStringComp(attrname, RSQTBLAttrNames[i]) == 0 ) {
			return i;
		}
	}
	return -1;
}

int getRSQDDLAttributeNameIndex(SimpStringPtr attrname)
{
	for ( int i = 0 ; i < RSQ_DDL_ATTR_COUNT ; ++i )
	{
		if ( SimpleStringComp(attrname, RSQDDLAttrNames[i]) == 0 )
		{
			return i;
		}
	}
	return -1;
}

int getUSRTBLAttributeNameIndex(SimpStringPtr attrname)
{
	for ( int i = 0 ; i < USR_TBL_ATTR_COUNT ; ++i ) {
		if ( SimpleStringComp(attrname, USRTBLAttrNames[i]) == 0 ) {
			return i;
		}
	}
	return -1;
}

const char *getUSRTBLAttributeName(int attrindex)
{
	Assert( attrindex >= 0 && attrindex < USR_TBL_ATTR_COUNT );
	return USRTBLAttrNames[attrindex];
}

void resetResourceBundleData(ResourceBundle detail,
							 uint32_t mem,
							 double core,
							 uint32_t ratio)
{
	detail->MemoryMB = mem;
	detail->Core     = core;
	detail->Ratio	 = ratio;
}

void addResourceBundleData(ResourceBundle detail, int32_t mem, double core)
{
	detail->MemoryMB += mem;
	detail->Core += core;
}

void minusResourceBundleData(ResourceBundle detail, int32_t mem, double core)
{
	detail->MemoryMB -= mem;
	detail->Core -= core;
	if (IS_DOUBLE_EQ(detail->Core, 0)) {
		// this setting is to avoid accumulating double precision problem
		detail->Core = 0.0;
	}
}

void resetResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle source)
{
	resetResourceBundleData(detail, source->MemoryMB, source->Core, source->Ratio);
}

void addResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle source)
{
	addResourceBundleData(detail, source->MemoryMB, source->Core);
}

void minusResourceBundleDataByBundle(ResourceBundle detail, ResourceBundle source)
{
	minusResourceBundleData(detail, source->MemoryMB, source->Core);
}

/**
 * Compute the query quota.
 */
int computeQueryQuota(ConnectionTrack conn, char *errorbuf, int errorbufsize)
{
	Assert( conn != NULL );
	Assert( conn->QueueTrack != NULL );

	int					  res		= FUNC_RETURN_OK;
	int					  policy	= 0;
	DynResourceQueueTrack track		= (DynResourceQueueTrack)(conn->QueueTrack);

	policy = track->QueueInfo->AllocatePolicy;
	Assert( policy >= 0 && policy < RSQ_ALLOCATION_POLICY_COUNT );

	/*--------------------------------------------------------------------------
	 * Get one segment resource quota. If statement level resource quota is not
	 * specified, the queue vseg resource quota is derived, otherwise, statement
	 * level resource quota. The resource memory/core ratio is not changed, thus
	 * code has to calculate the adjusted vcore quota for each vseg in case
	 * statement level resource quota is active.
	 *--------------------------------------------------------------------------
	 */
	if ( conn->StatNVSeg > 0 )
	{
		conn->SegMemoryMB = conn->StatVSegMemoryMB;
		conn->SegCore	  = track->QueueInfo->SegResourceQuotaVCore *
							conn->StatVSegMemoryMB /
							track->QueueInfo->SegResourceQuotaMemoryMB;
		conn->SegNum	  = conn->StatNVSeg;
		conn->SegNumMin	  = conn->StatNVSeg;

		/* Check if the resource capacity is more than the capacity of queue. */
		conn->SegNumEqual = ceil(1.0 * conn->SegMemoryMB * conn->SegNumMin /
						  	     track->QueueInfo->SegResourceQuotaMemoryMB);
		Assert( conn->SegNumEqual > 0 );
		if ( conn->SegNumEqual > track->ClusterSegNumberMax )
		{
			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
			snprintf(errorbuf, errorbufsize,
					 "statement resource quota %d MB x %d vseg exceeds resource "
					 "queue maximum capacity %d MB",
					 conn->StatVSegMemoryMB,
					 conn->StatNVSeg,
					 track->ClusterSegNumberMax *
					 track->QueueInfo->SegResourceQuotaMemoryMB);

			elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
			return res;
		}
	}
	else
	{
		conn->SegMemoryMB = track->QueueInfo->SegResourceQuotaMemoryMB;
		conn->SegCore 	  = track->QueueInfo->SegResourceQuotaVCore;

		int vseglimit = 0;
		/*----------------------------------------------------------------------
		 * The limit of vseg number per segment is valid only when query does
		 * not have fixed vseg number to request.
		 *----------------------------------------------------------------------
		 */
		if ( conn->MinSegCountFixed != conn->MaxSegCountFixed )
		{
			vseglimit = conn->VSegLimitPerSeg * PRESPOOL->AvailNodeCount;
			vseglimit = conn->VSegLimit < vseglimit? conn->VSegLimit : vseglimit;
		}
		else
		{
			vseglimit = conn->VSegLimit;
		}

		/*----------------------------------------------------------------------
		 * Compute total resource quota. This calculation already considers the
		 * query vseg limit and vseg perseg limit.
		 *----------------------------------------------------------------------
		 */
		res = AllocationPolicy[policy] (track,
										&(conn->SegNum),
										&(conn->SegNumMin),
										vseglimit ,
										errorbuf,
										errorbufsize);
		if ( res != FUNC_RETURN_OK )
		{
			/* No setting error buffer here. We expect this is already set. */
			return res;
		}

		if ( conn->SegNum < conn->MinSegCountFixed )
		{
			res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;
			snprintf(errorbuf, errorbufsize,
					 "minimum expected number of virtual segment %d is more than "
					 "maximum possible number %d in queue %s",
					 conn->MinSegCountFixed,
					 conn->SegNum,
					 track->QueueInfo->Name);

			elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
			return res;
		}

		elog(RMLOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) x %d "
				    "(MIN %d) after checking queue capacity.",
					conn->ConnID,
					conn->SegMemoryMB,
					conn->SegCore,
					conn->SegNum,
					conn->SegNumMin);

		/*------------------------------------------------------------------
		 * The following logic consider the actual resource requirement from
		 * dispatcher based on table size, workload, etc. The requirement is
		 * described by (MinSegCountFixed, MaxSegCountFixed). The requirement
		 * can be satisfied only when there is a non-empty intersect between
		 * (MinSegCountFixed, MaxSegCountFixed) and (SegNumMin, SegNum).
		 *------------------------------------------------------------------
		 */
		if ( conn->MinSegCountFixed < conn->MaxSegCountFixed )
		{
			conn->SegNumMin = conn->MaxSegCountFixed < conn->SegNumMin ?
							  conn->MinSegCountFixed :
							  max(conn->SegNumMin, conn->MinSegCountFixed);
			conn->SegNum = min(conn->SegNum, conn->MaxSegCountFixed);
		}
		else
		{
			Assert(conn->SegNum >= conn->MaxSegCountFixed);
			conn->SegNumMin = conn->MinSegCountFixed;
			conn->SegNum	= conn->MaxSegCountFixed;
		}

		elog(RMLOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) x %d "
				    "(MIN %d) after checking query expectation %d (MIN %d).",
					conn->ConnID,
					conn->SegMemoryMB,
					conn->SegCore,
					conn->SegNum,
					conn->SegNumMin,
					conn->MaxSegCountFixed,
					conn->MinSegCountFixed);

	}

	/*--------------------------------------------------------------------------
	 * Decide vseg number and minimum runnable vseg number. User may set guc
	 * rm_nvseg_perquery_limit at session level, this must be followed. Even
	 * in case the vseg number is set by statement level resource quota.
	 *
	 * Another guc rm_nvseg_perquery_perseg_limit can also limit the number of
	 * vseg for one statement execution.
	 *--------------------------------------------------------------------------
	 */
	if ( conn->SegNumMin > conn->VSegLimit )
	{
		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;

		snprintf(errorbuf, errorbufsize,
				 "expected minimum number of virtual segments %d exceeds the "
				 "limit of number of virtual segments per query %d",
				 conn->SegNumMin,
				 conn->VSegLimit);

		elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
		return res;
	}

	/*--------------------------------------------------------------------------
	 * The vseg number per segment limit is valid only when required vseg num
	 * is a range containing more than one validate values. Generally, in case
	 * querying one hash distributed table, hash bucket number of vseg is
	 * required.
	 *--------------------------------------------------------------------------
	 */
	if (conn->StatNVSeg == 0 &&
		conn->MinSegCountFixed != conn->MaxSegCountFixed &&
		conn->SegNumMin > conn->VSegLimitPerSeg * PRESPOOL->AvailNodeCount )
	{
		res = RESQUEMGR_TOO_MANY_FIXED_SEGNUM;

		snprintf(errorbuf, errorbufsize,
				 "expected minimum number of virtual segments %d exceeds the "
				 "limit of number of virtual segments per query per segment %d "
				 "in cluster having %d available segments",
				 conn->SegNumMin,
				 conn->VSegLimitPerSeg,
				 PRESPOOL->AvailNodeCount);

		elog(WARNING, "ConnID %d. %s", conn->ConnID, errorbuf);
		return res;
	}
	return FUNC_RETURN_OK;
}

/* Implementation of even resource allocation. */
int computeQueryQuota_EVEN(DynResourceQueueTrack	track,
						   int32_t			   	   *segnum,
						   int32_t			   	   *segnummin,
						   int32_t					segnumlimit,
						   char				   	   *errorbuf,
						   int						errorbufsize)
{
	DynResourceQueue queue = track->QueueInfo;

	/* Decide one connection should have how many virtual segments reserved. */
	int reservsegnum = trunc(track->ClusterSegNumber / queue->ParallelCount);
	reservsegnum = reservsegnum <= 0 ? 1 : reservsegnum;

	*segnum = track->ClusterSegNumberMax;
	*segnum = segnumlimit < *segnum ? segnumlimit : *segnum;

	*segnummin = reservsegnum;
	*segnummin = *segnummin > *segnum ? *segnum : *segnummin;

	Assert( *segnummin >= 0 && *segnummin <= *segnum );
	return FUNC_RETURN_OK;
}

int addQueryResourceRequestToQueue(DynResourceQueueTrack queuetrack,
								   ConnectionTrack		 conntrack)
{
	DynResourceQueueTrack queue = queuetrack->ShadowQueueTrack == NULL ?
								  queuetrack :
								  queuetrack->ShadowQueueTrack;

	insertDQueueTailNode(&(queue->QueryResRequests), conntrack);

	/* Add resource request counter. */
	addResourceBundleData(&(queue->TotalRequest),
						  conntrack->SegMemoryMB * conntrack->SegNum,
						  conntrack->SegCore * conntrack->SegNum);

	/*
	 * Set session tracker and make its corresponding session in-use resource
	 * locked.
	 */
	createAndLockSessionResource(&(queue->DLDetector), conntrack->SessionID);

	/* The following logic is triggered only when it is not in a shadow. */
	if ( queue == queuetrack )
	{
		if ( queue->DLDetector.LockedTotal.MemoryMB > 0 )
		{
			PQUEMGR->ForcedReturnGRMContainerCount = 0;
			elog(LOG, "Locking resource and stop forced GRM container breathe out.");
		}

		/*
		 * If this causes the queue to be busy, refresh the limits and weights
		 * of each memory/core ratio tracker.
		 */
		if ( !queue->isBusy )
		{
			queue->isBusy = true;
			refreshMemoryCoreRatioLimits();
			refreshMemoryCoreRatioWaterMark();
		}
		PQUEMGR->toRunQueryDispatch = true;
	}
	return FUNC_RETURN_OK;
}

/*
 * Update the overall resource queue percentage capacity.
 */
void refreshResourceQueuePercentageCapacityInternal(uint32_t clustermemmb,
													uint32_t clustercore,
													bool	 queuechanged)
{
	static uint32_t prevclustermemmb = 0;
	static uint32_t prevclustercore  = 0;

	if ( (!queuechanged) &&
		 (prevclustermemmb == clustermemmb && prevclustercore  == clustercore) )
	{
		elog(DEBUG3, "Resource manager skips updating resource queue capacities "
					 "because the total resource quota does not change.");
		return;
	}

	/*
	 * STEP 1. Decide the limit ranges of memory and core, decide the
	 * 		   memory/core ratio.
	 */
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack track     = lfirst(cell);
		DynResourceQueueTrack origtrack = track;

		/* If this resource queue track has a shadow, the shadow is updated. */
		track = track->ShadowQueueTrack == NULL ? track : track->ShadowQueueTrack;

		if ( RESQUEUE_IS_PERCENT(track->QueueInfo) &&
			 RESQUEUE_IS_LEAF(track->QueueInfo) )
		{
			track->ClusterMemoryActPer = track->QueueInfo->ClusterMemoryPer;
			track->ClusterVCoreActPer  = track->QueueInfo->ClusterVCorePer;

			/* If track references a shadow, we should find back the real parent. */
			DynResourceQueueTrack ptrack =
				(track->ParentTrack == NULL && list_length(track->ChildrenTracks) == 0 )?
				track->ShadowQueueTrack->ParentTrack :
				track->ParentTrack;

			while( ptrack != NULL )
			{
				/*
				 * If the queue track has a shadow, we should calculate based on
				 * the data in the shadow instance.
				 */
				DynResourceQueueTrack ptrackcalc = ptrack->ShadowQueueTrack == NULL ?
												   ptrack :
												   ptrack->ShadowQueueTrack;

				if ( !RESQUEUE_IS_PERCENT(ptrackcalc->QueueInfo) )
				{
					break;
				}

				track->ClusterMemoryActPer = track->ClusterMemoryActPer *
											 ptrackcalc->QueueInfo->ClusterMemoryPer /
											 100;
				track->ClusterVCoreActPer  = track->ClusterVCoreActPer *
											 ptrackcalc->QueueInfo->ClusterVCorePer /
											 100;

				ptrack = ptrack->ParentTrack;
			}

			track->ClusterMemoryMaxPer = track->ClusterMemoryActPer *
										 track->QueueInfo->ResourceOvercommit;
			track->ClusterMemoryMaxPer = track->ClusterMemoryMaxPer > 100 ?
										 100.0 :
										 track->ClusterMemoryMaxPer;
			track->ClusterVCoreMaxPer = track->ClusterVCoreActPer *
										track->QueueInfo->ResourceOvercommit;
			track->ClusterVCoreMaxPer = track->ClusterVCoreMaxPer > 100 ?
										100.0 :
										track->ClusterVCoreMaxPer;

			uint32_t tmpratio = 0;

			/*
			 * All the queues from the root to this queue are expressed by
			 * percentage, and the memory limit has the same limit with core
			 * limit.
			 */
			Assert( ptrack == NULL );
			Assert( track->ClusterMemoryActPer == track->ClusterVCoreActPer );

			tmpratio = clustermemmb / clustercore;
			track->QueueInfo->ClusterMemoryMB =
				1.0 * clustermemmb * track->ClusterMemoryActPer / 100;
			track->QueueInfo->ClusterVCore    =
				1.0 * clustercore * track->ClusterVCoreActPer / 100;

			track->ClusterMemoryMaxMB =
				1.0 * clustermemmb * track->ClusterMemoryMaxPer / 100;
			track->ClusterVCoreMax =
				1.0 * clustercore * track->ClusterVCoreMaxPer / 100;

			if ( tmpratio != track->MemCoreRatio && track->trackedMemCoreRatio )
			{
				removeResourceQueueRatio(track);
			}

			if ( !track->trackedMemCoreRatio )
			{
				track->MemCoreRatio = tmpratio;
				addResourceQueueRatio(origtrack);
			}
		}
	}

	/*
	 * STEP 2. Decide the maximum limit of memory and core of each leaf queue.
	 * 		   That means for each leaf queue, its maximum memory limit adding
	 * 		   all other leaf queues' minimum limits can not exceed cluster
	 * 		   capacity. Maximum core limit follows the same logic.
	 */
	cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack track = lfirst(cell);

		track = track->ShadowQueueTrack == NULL ? track : track->ShadowQueueTrack;

		if ( !(RESQUEUE_IS_LEAF(track->QueueInfo)) )
		{
			continue;
		}

		/* Follows the memory/core ratio to adjust the maximum limits. */
		if ( track->ClusterMemoryMaxMB / track->ClusterVCoreMax > track->MemCoreRatio )
		{
			track->ClusterMemoryMaxMB = track->ClusterVCoreMax * track->MemCoreRatio;
		}
		else
		{
			track->ClusterVCoreMax = 1.0 * track->ClusterMemoryMaxMB / track->MemCoreRatio;
		}

		/* Decide cluster segment resource quota. */
		track->QueueInfo->SegResourceQuotaMemoryMB =
				track->QueueInfo->SegResourceQuotaMemoryMB == -1 ?
				track->QueueInfo->SegResourceQuotaVCore * track->MemCoreRatio :
				track->QueueInfo->SegResourceQuotaMemoryMB;
		track->QueueInfo->SegResourceQuotaVCore =
				track->QueueInfo->SegResourceQuotaVCore == -1 ?
				1.0 * track->QueueInfo->SegResourceQuotaMemoryMB / track->MemCoreRatio :
				track->QueueInfo->SegResourceQuotaVCore;

		/* Decide the cluster segment number quota. */
		track->ClusterSegNumber = trunc(track->QueueInfo->ClusterMemoryMB /
										track->QueueInfo->SegResourceQuotaMemoryMB);

		track->ClusterSegNumberMax = trunc(track->ClusterMemoryMaxMB /
										   track->QueueInfo->SegResourceQuotaMemoryMB);

		Assert( track->ClusterSegNumber <= track->ClusterSegNumberMax );

		elog(DEBUG3, "Resource manager refreshed resource queue capacity : %s "
				  	 "(%d MB, %lf CORE) x %d. MAX %d. FACTOR:%lf",
					 track->QueueInfo->Name,
					 track->QueueInfo->SegResourceQuotaMemoryMB,
					 track->QueueInfo->SegResourceQuotaVCore,
					 track->ClusterSegNumber,
					 track->ClusterSegNumberMax,
					 track->QueueInfo->ResourceOvercommit);
	}
}

void refreshMemoryCoreRatioLimits(void)
{
	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i ) {

		PQUEMGR->RatioTrackers[i]->ClusterMemoryMaxMB = 0;
		PQUEMGR->RatioTrackers[i]->ClusterVCoreMax    = 0;
		PQUEMGR->RatioTrackers[i]->ClusterMemory	  = 0;
		PQUEMGR->RatioTrackers[i]->ClusterVCore		  = 0;

		ListCell *cell = NULL;
		foreach(cell, PQUEMGR->RatioTrackers[i]->QueueTrackers)
		{
			DynResourceQueueTrack track = lfirst(cell);

			/* We calculate only the queues having connections, which means
			 * potential resource request. */
			if ( !track->isBusy )
			{
				continue;
			}

			PQUEMGR->RatioTrackers[i]->ClusterMemory += track->QueueInfo->ClusterMemoryMB;
			PQUEMGR->RatioTrackers[i]->ClusterVCore  += track->QueueInfo->ClusterVCore;
			PQUEMGR->RatioTrackers[i]->ClusterMemoryMaxMB += track->ClusterMemoryMaxMB;
			PQUEMGR->RatioTrackers[i]->ClusterVCoreMax    += track->ClusterVCoreMax;
		}

		elog(DEBUG3, "Limit of memory/core ratio[%d] %d MBPCORE "
					 "is (%d MB, %lf CORE) maximum (%d MB, %lf CORE).",
					 i,
					 PQUEMGR->RatioTrackers[i]->MemCoreRatio,
					 PQUEMGR->RatioTrackers[i]->ClusterMemory,
					 PQUEMGR->RatioTrackers[i]->ClusterVCore,
					 PQUEMGR->RatioTrackers[i]->ClusterMemoryMaxMB,
					 PQUEMGR->RatioTrackers[i]->ClusterVCoreMax);
	}
}

/* TODO: Not useful yet. */
void refreshMemoryCoreRatioWaterMark(void)
{
	double totalweightmem = 0;
	double totalweightcore = 0;
	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i ) {
		totalweightmem  += PQUEMGR->RatioTrackers[i]->ClusterMemory;
		totalweightcore += PQUEMGR->RatioTrackers[i]->ClusterVCore;
	}

	double overcommitmem  = 1;
	double overcommitcore = 1;
	double overcommit     = 1;
	if ( DRMGlobalInstance->ImpType == YARN_LIBYARN ) {
		overcommitmem  = totalweightmem  / PRESPOOL->GRMTotal.MemoryMB;
		overcommitcore = totalweightcore / PRESPOOL->GRMTotal.Core;
	}
	else if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 ) {
		overcommitmem  = totalweightmem  / PRESPOOL->FTSTotal.MemoryMB;
		overcommitcore = totalweightcore / PRESPOOL->FTSTotal.Core;
	}
	else {
		Assert(false);
	}

	overcommit = overcommitmem > overcommitcore ? overcommitmem : overcommitcore;
	overcommit = overcommit > 1 ? overcommit : 1;
	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i ) {
		PQUEMGR->RatioTrackers[i]->ClusterWeightMarker =
			PQUEMGR->RatioTrackers[i]->ClusterMemoryMaxMB / overcommit;
		elog(DEBUG5, "HAWQ RM :: Weight balance marker of memory/core ratio "
					 "[%d] %d MBPCORE is %lf MB with overcommit %lf",
					 i,
					 PQUEMGR->RatioTrackers[i]->MemCoreRatio,
					 PQUEMGR->RatioTrackers[i]->ClusterWeightMarker,
					 overcommit);
	}
}

void dispatchResourceToQueriesInOneQueue(DynResourceQueueTrack track)
{
	int			policy			= 0;
	Assert( track != NULL );

	elog(DEBUG3, "Resource manager dispatch resource in queue %s",
				 track->QueueInfo->Name);

	if ( track->QueryResRequests.NodeCount > 0 )
	{
		ConnectionTrack topwaiter = getDQueueHeadNodeData(&(track->QueryResRequests));
		if ( topwaiter->HeadQueueTime == 0 )
		{
			topwaiter->HeadQueueTime = gettime_microsec();
			elog(DEBUG3, "Set timestamp of waiting at head of queue.");
		}
	}

	policy = track->QueueInfo->AllocatePolicy;
	Assert( policy >= 0 && policy < RSQ_ALLOCATION_POLICY_COUNT );
	DispatchPolicy[policy] (track);

	/* Check if the queue has resource fragment problem. */
	track->troubledByFragment = false;
	if ( track->QueryResRequests.NodeCount > 0 )
	{
		ConnectionTrack topwaiter = getDQueueHeadNodeData(&(track->QueryResRequests));
		track->troubledByFragment = topwaiter->troubledByFragment;
	}
}

int addNewResourceToResourceManagerByBundle(ResourceBundle bundle)
{
	return addNewResourceToResourceManager(bundle->MemoryMB, bundle->Core);
}

int addNewResourceToResourceManager(int32_t memorymb, double core)
{
	elog(RMLOG, "addNewResourceToResourceManager input (%d MB, %lf CORE)",
			    memorymb,
				core);

	if ( memorymb == 0 && core == 0 ) {
		return FUNC_RETURN_OK;
	}
	Assert( memorymb != 0 && core != 0 );

	/* Expect integer cores to add. */
	Assert( trunc(core) == core );
	uint32_t ratio = trunc(1.0 * memorymb / core);
	int32_t  ratioindex = getResourceQueueRatioIndex(ratio);

	if ( ratioindex >= 0 )
	{
		elog(RMLOG, "Allocated resource for ratio %d (%d MB, %lf CORE) plus "
					"(%d MB, %lf CORE)",
					ratio,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.MemoryMB,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.Core,
					memorymb,
					core);

		addResourceBundleData(&(PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated),
							  memorymb,
							  core);

		elog(RMLOG, "Current allocated resource for ratio %d (%d MB, %lf CORE)",
					ratio,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.MemoryMB,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.Core);

		/* New resource is added. Try to dispatch resource to queries. */
		PQUEMGR->toRunQueryDispatch = true;
		return FUNC_RETURN_OK;
	}
	else
	{
		return RESOURCEPOOL_NO_RATIO;
	}
}

int minusResourceFromResourceManagerByBundle(ResourceBundle bundle)
{
	return minusResourceFromResourceManager(bundle->MemoryMB, bundle->Core);
}

int minusResourceFromResourceManager(int32_t memorymb, double core)
{
	elog(RMLOG, "minusResourceFromResourceManager input (%d MB, %lf CORE)",
			    memorymb,
				core);

	if ( memorymb == 0 && core ==0 )
	{
		return FUNC_RETURN_OK;
	}

	/* Expect integer cores to add. */
	Assert( trunc(core) == core );
	uint32_t ratio = trunc(1.0 * memorymb / core);
	int32_t  ratioindex = getResourceQueueRatioIndex(ratio);

	if ( ratioindex >= 0 )
	{
		elog(RMLOG, "Allocated resource for ratio %d (%d MB, %lf CORE) minus "
					"(%d MB, %lf CORE)",
					ratio,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.MemoryMB,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.Core,
					memorymb,
					core);

		minusResourceBundleData(&(PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated),
								memorymb,
								core);

		elog(RMLOG, "Current allocated resource for ratio %d (%d MB, %lf CORE)",
					ratio,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.MemoryMB,
					PQUEMGR->RatioTrackers[ratioindex]->TotalAllocated.Core);
		return FUNC_RETURN_OK;
	}
	else
	{
		return RESOURCEPOOL_NO_RATIO;
	}
}

void returnAllocatedResourceToLeafQueue(DynResourceQueueTrack track,
										int32_t			  	  memorymb,
										double				  core)
{
	minusResourceBundleData(&(track->TotalUsed), memorymb, core);

	elog(DEBUG3, "Return resource to queue %s (%d MB, %lf CORE).",
			  track->QueueInfo->Name,
			  memorymb, core);
}

void removePendingResourceRequestInRootQueue(int32_t 	memorymb,
											 uint32_t 	core,
											 bool 		updatependingtime)
{
	if ( memorymb ==0 && core == 0 )
		return;
	Assert(memorymb > 0 && core > 0);

	uint32_t ratio 	    = memorymb / core;
	int32_t  ratioindex = 0;

	/* Get ratio index. */
	PAIR ratiopair = getHASHTABLENode(&(PQUEMGR->RatioIndex),
									  TYPCONVERT(void *, ratio));
	Assert( ratiopair != NULL );
	ratioindex = TYPCONVERT(int, ratiopair->Value);

	/* Add resource quota to free resource statistics. */
	minusResourceBundleData(&(PQUEMGR->RatioTrackers[ratioindex]->TotalPending),
							memorymb,
							core);
	Assert(PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB >= 0 &&
		   PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core >= 0);

	if ( updatependingtime )
	{
		if ( PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB == 0 &&
			 PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core == 0 )
		{
			PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = 0;
			elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
						 PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
		}
		else if ( memorymb > 0 && core > 0 )
		{
			PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime = gettime_microsec();
			elog(DEBUG3, "Global resource total pending start time is updated to "UINT64_FORMAT,
						 PQUEMGR->RatioTrackers[ratioindex]->TotalPendingStartTime);
		}
	}

	elog(LOG, "Removed pending GRM request from root resource queue by "
			  "(%d MB, %lf CORE) to (%d MB, %lf CORE)",
			  memorymb,
			  core * 1.0,
			  PQUEMGR->RatioTrackers[ratioindex]->TotalPending.MemoryMB,
			  PQUEMGR->RatioTrackers[ratioindex]->TotalPending.Core);
}


void clearPendingResourceRequestInRootQueue(void)
{
	for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
	{
		if ( PQUEMGR->RatioTrackers[i]->TotalPending.MemoryMB > 0 )
		{
			removePendingResourceRequestInRootQueue(
					PQUEMGR->RatioTrackers[i]->TotalPending.MemoryMB,
					PQUEMGR->RatioTrackers[i]->TotalPending.Core,
					true);
		}
	}
}

/*
 * Dispatching allocated resource to queuing queries.
 */
int dispatchResourceToQueries_EVEN(DynResourceQueueTrack track)
{
	/* Check how many segments are available to dispatch. */
	int availsegnum = trunc((track->TotalAllocated.MemoryMB -
							 track->TotalUsed.MemoryMB) /
					  	  	track->QueueInfo->SegResourceQuotaMemoryMB);
	int counter = 0;
	int segcounter = 0;
	int segmincounter = 0;

	elog(DEBUG3, "Resource queue %s expects full parallel count %d, "
				 "current running count %d.",
				 track->QueueInfo->Name,
				 track->QueueInfo->ParallelCount,
				 track->NumOfRunningQueries);

	DQUEUE_LOOP_BEGIN(&(track->QueryResRequests), iter, ConnectionTrack, conntrack)
		/* Consider concurrency no more than defined parallel count. */
		/* TODO: Consider more here... */
		if ( counter + track->NumOfRunningQueries >= track->QueueInfo->ParallelCount )
		{
			elog(RMLOG, "Parallel count limit is encountered, to run %d more",
						counter);
			break;
		}

		int equalsegnummin = conntrack->StatNVSeg <= 0 ?
							 conntrack->SegNumMin :
							 conntrack->SegNumEqual;

		/* Check if the minimum segment requirement is met. */
		if ( segmincounter + equalsegnummin > availsegnum )
		{
			elog(RMLOG, "Resource allocated is up, available vseg num %d, "
						"to run %d more",
						availsegnum,
						counter);
			break;
		}

		segcounter += conntrack->StatNVSeg <= 0 ?
					  conntrack->SegNum :
					  conntrack->SegNumEqual;

		segmincounter += conntrack->StatNVSeg <= 0 ?
						 conntrack->SegNumMin :
						 conntrack->SegNumEqual;
		counter++;
	DQUEUE_LOOP_END

	if ( counter == 0 )
	{
		detectAndDealWithDeadLock(track);
		return FUNC_RETURN_OK; /* Expect requests are processed in next loop. */
	}

	/* Dispatch segments */
	DQueueData todisp;
	initializeDQueue(&todisp, PCONTEXT);
	for ( int i = 0 ; i < counter ; ++i )
	{
		ConnectionTrack conn = removeDQueueHeadNode(&(track->QueryResRequests));
		conn->SegNumActual = conn->SegNumMin;
		insertDQueueTailNode(&todisp, conn);
		availsegnum -= conn->StatNVSeg <= 0 ? conn->SegNumMin : conn->SegNumEqual;
	}

	DQueueNode pnode = getDQueueContainerHead(&todisp);
	int fullcount = 0;
	while(availsegnum > 0)
	{
		ConnectionTrack conn = (ConnectionTrack)(pnode->Data);
		if ( conn->StatNVSeg == 0 && conn->SegNum > conn->SegNumActual )
		{
			conn->SegNumActual++;
			availsegnum--;
			fullcount=0;
		}
		else
		{
			fullcount++;
		}
		if ( fullcount == counter )
		{
			break;
		}
		pnode = pnode->Next == NULL ? getDQueueContainerHead(&todisp) : pnode->Next;
	}

	/* Actually allocate segments from hosts in resource pool and send response.*/
	for ( int processidx = 0 ; processidx < counter ; ++processidx )
	{
		ConnectionTrack conn = removeDQueueHeadNode(&todisp);
		elog(DEBUG3, "Resource manager tries to dispatch resource to connection %d. "
		   		  	 "Expect (%d MB, %lf CORE) x %d(max %d min %d) segment(s). "
		   		  	 "Original vseg %d(min %d). "
		   		  	 "VSeg limit per segment %d VSeg limit per query %d.",
					 conn->ConnID,
				     conn->SegMemoryMB,
					 conn->SegCore,
					 conn->SegNumActual,
					 conn->SegNum,
					 conn->SegNumMin,
					 conn->MaxSegCountFixed,
					 conn->MinSegCountFixed,
					 conn->VSegLimitPerSeg,
					 conn->VSegLimit);

		if ( conn->StatNVSeg > 0 )
		{
			elog(LOG, "Resource manager tries to dispatch resource to connection %d. "
					  "Statement level resource quota is active. "
					  "Total %d vsegs, each vseg has %d MB memory quota.",
					  conn->ConnID,
					  conn->StatNVSeg,
					  conn->StatVSegMemoryMB);
		}

		/* Build resource. */
		int32_t segnumact = 0;
		allocateResourceFromResourcePool(conn->SegNumActual,
										 conn->SegNumMin,
										 conn->SegMemoryMB,
										 conn->SegCore,
										 conn->IOBytes,
										 conn->SliceSize,
										 conn->VSegLimitPerSeg,
										 conn->SegPreferredHostCount,
										 conn->SegPreferredHostNames,
										 conn->SegPreferredScanSizeMB,
										 /* If the segment count is fixed. */
										 conn->MinSegCountFixed == conn->MaxSegCountFixed,
										 &(conn->Resource),
										 &segnumact,
										 &(conn->SegIOBytes));

		RESOURCEPROBLEM accepted = isResourceAcceptable(conn, segnumact);
		if ( accepted == RESPROBLEM_NO )
		{
			elog(DEBUG3, "Resource manager dispatched %d segment(s) to connection %d",
						 segnumact,
						 conn->ConnID);
			conn->SegNumActual = segnumact;

			/* Mark resource used in resource queue. */
			addResourceBundleData(&(track->TotalUsed),
								  conn->SegMemoryMB * conn->SegNumActual,
								  conn->SegCore     * conn->SegNumActual);
			minusResourceBundleData(&(track->TotalRequest),
									conn->SegMemoryMB * conn->SegNum,
									conn->SegCore     * conn->SegNum);
			track->NumOfRunningQueries++;

			/* Unlock and update session resource usage. */
			unlockSessionResource(&(track->DLDetector), conn->SessionID);
			addSessionInUseResource(&(track->DLDetector),
									conn->SessionID,
									conn->SegMemoryMB * conn->SegNumActual,
									conn->SegCore     * conn->SegNumActual);

			/* Transform the connection track status */
			transformConnectionTrackProgress(conn, CONN_PP_RESOURCE_QUEUE_ALLOC_DONE);

			/* Build response message and send it out. */
			buildAcquireResourceResponseMessage(conn);
		}
		else
		{
			PQUEMGR->hasResourceProblem[accepted] = true;
			/*
			 * In case we have 0 segments allocated. This may occur because we
			 * have too many resource small pieces. In this case, we treat the
			 * resource allocation failed, and HAWQ RM tries this connection
			 * again later. We expect some other connections return the resource
			 * back later or some more resource allocated from GRM.
		     */
			elog(WARNING, "HAWQ RM :: Can not find enough number of hosts "
						  "containing sufficient resource for the connection %d.",
						  conn->ConnID);
			elog(WARNING, "HAWQ RM :: Found %d vsegments allocated", segnumact);
			if ( segnumact > 0 )
			{
				Assert(!conn->isOld);
				returnResourceToResourcePool(conn->SegMemoryMB,
											 conn->SegCore,
											 conn->SegIOBytes,
											 conn->SliceSize,
											 &(conn->Resource),
											 conn->isOld);
			}

			/* Mark the request has resource fragment problem. */
			if ( !conn->troubledByFragment && accepted == RESPROBLEM_FRAGMENT )
			{
				conn->troubledByFragmentTimestamp = gettime_microsec();
				conn->troubledByFragment 		  = true;

				elog(LOG, "Resource fragment problem is probably encountered. "
						  "Session "INT64_FORMAT" expects minimum %d virtual segments.",
						  conn->SessionID,
						  conn->SegNumMin);
			}

			/* Decide whether continue to process next query request. */
			if ( rm_force_fifo_queue )
			{
				insertDQueueHeadNode(&todisp, conn);
				break;
			}
			else
			{
				insertDQueueTailNode(&todisp, conn);
			}
		}
	}

	/* Return the request not completed yet. */
	while( todisp.NodeCount > 0 ) {
		ConnectionTrack conn = (ConnectionTrack)(removeDQueueTailNode(&todisp));
		insertDQueueHeadNode(&(track->QueryResRequests), (void *)conn);
	}
	cleanDQueue(&todisp);

	return FUNC_RETURN_OK;
}

RESOURCEPROBLEM isResourceAcceptable(ConnectionTrack conn, int segnumact)
{
	/*--------------------------------------------------------------------------
	 * Enough number of vsegments. If resource queue has enough quota, but
	 * resource pool does not provide enough virtual segments allocated, we
	 * consider this a resource fragment problem.
	 *--------------------------------------------------------------------------
	 */
	if ( segnumact < conn->SegNumMin )
	{
		return RESPROBLEM_FRAGMENT;
	}

	/*
	 *--------------------------------------------------------------------------
	 * Spread wide enough. If there is at least one segment containing 2 or more
	 * vsegments, the number of segments should be not be too small, i.e. the
	 * vsegments should not be assigned in a few segments.
	 *--------------------------------------------------------------------------
	 */
	if ( segnumact > list_length(conn->Resource) )
	{
		int limit = ceil(PRESPOOL->SlavesHostCount * rm_tolerate_nseg_limit);
		if ( PRESPOOL->SlavesHostCount - limit > list_length(conn->Resource) )
		{
			elog(WARNING, "Find virtual segments are dispatched to %d segments in "
						  "the cluster containing %d segments defined in slaves file. "
						  "The number of excluded segments are more than %d.",
						  list_length(conn->Resource),
						  PRESPOOL->SlavesHostCount,
						  limit);
			return RESPROBLEM_TOOFEWSEG;
		}
	}

	/*
	 *--------------------------------------------------------------------------
	 * Spread even enough. If the size of vsegments in each segment varies too
	 * much, the allocation result is not accepted.
	 *--------------------------------------------------------------------------
	 */
	if ( segnumact > list_length(conn->Resource) )
	{
		int minval = segnumact;
		int maxval = 0;
		ListCell *cell = NULL;
		foreach(cell, conn->Resource)
		{
			VSegmentCounterInternal vsegcnt = lfirst(cell);
			minval = minval < vsegcnt->VSegmentCount ? minval : vsegcnt->VSegmentCount;
			maxval = maxval > vsegcnt->VSegmentCount ? maxval : vsegcnt->VSegmentCount;
		}
		if ( rm_nvseg_variance_among_seg_limit < maxval - minval )
		{
			elog(WARNING, "Find virtual segments are not evenly dispatched to segments, "
						  "maximum virtual segment size is %d, "
						  "minimum virtual segment size is %d.",
						  maxval,
						  minval);
			return RESPROBLEM_UNEVEN;
		}
	}
	return RESPROBLEM_NO;
}

void buildAcquireResourceResponseMessage(ConnectionTrack conn)
{
	ListCell *cell = NULL;

	Assert( conn != NULL );
	resetSelfMaintainBuffer(&(conn->MessageBuff));

	/* Set message head. */
	RPCResponseHeadAcquireResourceFromRMData response;
	response.Result 	  			= FUNC_RETURN_OK;
	response.Reserved1	  			= 0;
	response.SegCount	  			= conn->SegNumActual;
	response.SegMemoryMB  			= conn->SegMemoryMB;
	response.SegCore	  			= conn->SegCore;
	response.HostCount				= list_length(conn->Resource);
	response.Reserved2				= 0;
	appendSMBVar(&(conn->MessageBuff), response);

	/* Append HDFS host name index values. */
	uint32_t hdfsidxsize = __SIZE_ALIGN64(sizeof(uint32_t) * conn->SegNumActual);
	prepareSelfMaintainBuffer(&(conn->MessageBuff), hdfsidxsize, true);
	uint32_t *indexarray = (uint32_t *)getSMBCursor(&(conn->MessageBuff));

	int segi = 0;
	foreach(cell, conn->Resource)
	{
		VSegmentCounterInternal vsegcnt = (VSegmentCounterInternal)lfirst(cell);
		for ( int i = 0 ; i < vsegcnt->VSegmentCount ; ++i )
		{
			indexarray[segi] = vsegcnt->HDFSNameIndex;
			segi++;
		}
	}
	jumpforwardSelfMaintainBuffer(&(conn->MessageBuff), hdfsidxsize);

	/* Prepare machine id information. */
	uint32_t messagecursize = getSMBContentSize(&(conn->MessageBuff));
	uint32_t hoffsetsize    = __SIZE_ALIGN64(sizeof(uint32_t) * conn->SegNumActual);
	uint32_t *hoffsetarray  = (uint32_t *)rm_palloc0(PCONTEXT, hoffsetsize);

	/* Temporary buffer containing all distinct machineid instances. */
	SelfMaintainBufferData machineids;
	initializeSelfMaintainBuffer(&machineids, PCONTEXT);

	segi = 0;
	foreach(cell, conn->Resource)
	{
		VSegmentCounterInternal vsegcnt = (VSegmentCounterInternal)lfirst(cell);
		/* Set host offset. */
		for ( int i = 0 ; i < vsegcnt->VSegmentCount ; ++i ) {
			hoffsetarray[segi] = messagecursize +
							     hoffsetsize +
								 getSMBContentSize(&(machineids));
			segi++;
		}

		/* Append machine id. */
		appendSelfMaintainBuffer(&machineids,
								 (char *)(&(vsegcnt->Resource->Stat->Info)),
								 vsegcnt->Resource->Stat->Info.Size);

		elog(DEBUG3, "Resource manager added machine %s:%d containing %d segment(s) "
					 "in response of acquiring resource.",
					 GET_SEGRESOURCE_HOSTNAME(vsegcnt->Resource),
					 vsegcnt->Resource->Stat->Info.port,
					 vsegcnt->VSegmentCount);
	}

	/* Build complete message. */
	appendSelfMaintainBuffer(&(conn->MessageBuff),
							 (char *)hoffsetarray,
							 hoffsetsize);
	appendSelfMaintainBuffer(&(conn->MessageBuff),
							 machineids.Buffer,
							 machineids.Cursor + 1);

	conn->MessageSize  = conn->MessageBuff.Cursor + 1;
	conn->MessageID    = RESPONSE_QD_ACQUIRE_RESOURCE;
	conn->ResAllocTime = gettime_microsec();

	elog(LOG, "Latency of getting resource allocated is "UINT64_FORMAT "us",
			  conn->ResAllocTime - conn->ResRequestTime);

	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conn);
	MEMORY_CONTEXT_SWITCH_BACK

	/* Clean up temporary variables. */
	destroySelfMaintainBuffer(&machineids);
	rm_pfree(PCONTEXT, hoffsetarray);
}

void detectAndDealWithDeadLock(DynResourceQueueTrack track)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	uint32_t availmemorymb = track->ClusterMemoryMaxMB -
						     track->DLDetector.LockedTotal.MemoryMB;
	double   availcore     = track->ClusterVCoreMax -
						     track->DLDetector.LockedTotal.Core;

	ConnectionTrack firstreq = (ConnectionTrack)
							   getDQueueHeadNodeData(&(track->QueryResRequests));
	if ( firstreq == NULL )
	{
		return;
	}

	uint32_t expmemorymb = firstreq->SegMemoryMB * firstreq->SegNumMin;
	double   expcore     = firstreq->SegCore     * firstreq->SegNumMin;

	if ( expmemorymb > track->ClusterMemoryMaxMB &&
		 expcore     > track->ClusterVCoreMax )
	{
		/* It is impossible to satisfy the request by checking deadlock. */
		return;
	}

	while((availmemorymb < expmemorymb || availcore < expcore) &&
		  track->QueryResRequests.NodeCount > 0 )
	{
		DQueueNode tail = getDQueueContainerTail(&(track->QueryResRequests));
		SessionTrack strack = NULL;
		while(tail != NULL)
		{
			strack = findSession(&(track->DLDetector),
								 ((ConnectionTrack)(tail->Data))->SessionID);
			if ( strack != NULL && strack->InUseTotal.MemoryMB > 0 )
			{
				break;
			}
			tail = tail->Prev;
		}
		if ( tail != NULL )
		{
			ConnectionTrack canceltrack = (ConnectionTrack)
										  removeDQueueNode(&(track->QueryResRequests),
												  	  	   tail);

			snprintf(errorbuf, sizeof(errorbuf),
					 "session "INT64_FORMAT" deadlock is detected",
					 canceltrack->SessionID);

			Assert(canceltrack != NULL);
			availmemorymb += strack->InUseTotal.MemoryMB;
			availcore     += strack->InUseTotal.Core;

			/* Unlock the resource. */
			unlockSessionResource(&(track->DLDetector), canceltrack->SessionID);

			/* Cancel this request. */
			buildAcquireResourceErrorResponse(canceltrack,
											  RESQUEMGR_DEADLOCK_DETECTED,
											  errorbuf);

			transformConnectionTrackProgress(canceltrack,
											 CONN_PP_RESOURCE_QUEUE_ALLOC_FAIL);

			canceltrack->ResponseSent = false;
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, canceltrack);
			MEMORY_CONTEXT_SWITCH_BACK

			/* Recycle connection track instance. */
			Assert(track->CurConnCounter > 0);

			elog(DEBUG3, "Resource queue %s has %d connections before removing "
						 "deadlocked connection ConnID %d.",
						 track->QueueInfo->Name,
						 track->CurConnCounter,
						 canceltrack->ConnID);

			track->CurConnCounter--;
			if ( track->CurConnCounter == 0 )
			{
				elog(RMLOG, "Resource queue %s becomes idle after deadlock checking.",
							track->QueueInfo->Name);
				track->isBusy = false;
				refreshMemoryCoreRatioLimits();
				refreshMemoryCoreRatioWaterMark();
			}
		}
	}
}

void timeoutDeadResourceAllocation(void)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	uint64_t curmsec = gettime_microsec();

	if ( curmsec - PQUEMGR->LastCheckingDeadAllocationTime <
		 1000000LL * rm_request_timeoutcheck_interval )
	{
		return;
	}

	/* Go through all current allocated connection tracks. */
	List     *allcons = NULL;
	ListCell *cell	  = NULL;

	getAllPAIRRefIntoList(&(PCONTRACK->Connections), &allcons);

	foreach(cell, allcons)
	{
		ConnectionTrack curcon = (ConnectionTrack)(((PAIR)lfirst(cell))->Value);

		switch(curcon->Progress)
		{

		case CONN_PP_RESOURCE_QUEUE_ALLOC_DONE:
		{
			elog(DEBUG5, "Find allocated resource that should check timeout. "
						 "ConnID %d",
						 curcon->ConnID);

			if ( curmsec - curcon->LastActTime >
				 1000000L * rm_session_lease_timeout )
			{
				elog(WARNING, "ConnID %d. The allocated resource timeout is "
							  "detected. Last action time is "UINT64_FORMAT
							  "msec ago.",
						      curcon->ConnID,
						      curmsec - curcon->LastActTime);
				returnResourceToResQueMgr(curcon);
				returnConnectionToQueue(curcon, true);
				if ( curcon->CommBuffer != NULL )
				{
					forceCloseFileDesc(curcon->CommBuffer);
				}
				else
				{
					returnConnectionTrack(curcon);
				}
			}
			break;
		}

		case CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT:
		{
			if ( curmsec - curcon->LastActTime >
				 1000000L * rm_session_lease_timeout )
			{
				elog(WARNING, "ConnID %d. The queued resource request timeout is "
						      "detected. Last action time is "UINT64_FORMAT
							  "msec ago",
						      curcon->ConnID,
						      curmsec - curcon->LastActTime);

				snprintf(errorbuf, sizeof(errorbuf),
						 "queued resource request is timed out due to no session "
						 "lease heart-beat received");

				cancelResourceAllocRequest(curcon, errorbuf, true);
				returnConnectionToQueue(curcon, true);
				if ( curcon->CommBuffer != NULL )
				{
					forceCloseFileDesc(curcon->CommBuffer);
				}
				else
				{
					returnConnectionTrack(curcon);
				}
			}
			break;
		}

		case CONN_PP_REGISTER_DONE:
		{
			if ( curmsec - curcon->LastActTime >
				 1000000L * rm_session_lease_timeout )
			{
				elog(WARNING, "The registered connection timeout is detected. "
						  	  "ConnID %d. Last action time is "UINT64_FORMAT
							  "msec ago",
							  curcon->ConnID,
							  curmsec - curcon->LastActTime);
				returnConnectionToQueue(curcon, true);
				if ( curcon->CommBuffer != NULL )
				{
					forceCloseFileDesc(curcon->CommBuffer);
				}
				else
				{
					returnConnectionTrack(curcon);
				}
			}
			break;
		}
		}

	}
	freePAIRRefList(&(PCONTRACK->Connections), &allcons);

	PQUEMGR->LastCheckingDeadAllocationTime = curmsec;
}

void timeoutQueuedRequest(void)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	uint64_t curmsec = gettime_microsec();

	if ( curmsec - PQUEMGR->LastCheckingQueuedTimeoutTime <
		 1000000LL * rm_request_timeoutcheck_interval )
	{
		return;
	}

	/* Go through all to be processed requests. */
	ConnectionTrack  ct    		= NULL;
	List			*tryagain	= NULL;

	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	while( list_length(PCONTRACK->ConnHavingRequests) > 0)
	{
		ct = (ConnectionTrack)lfirst(list_head(PCONTRACK->ConnHavingRequests));
		PCONTRACK->ConnHavingRequests = list_delete_first(PCONTRACK->ConnHavingRequests);

		/*
		 * Case 1. RM has no available cluster built yet, the request is not
		 * 		   added into resource queue manager queues.
		 */
		elog(DEBUG3, "Deferred connection track is found. "
					 " Request Time " UINT64_FORMAT
					 " Curr Time " UINT64_FORMAT
					 " Delta " UINT64_FORMAT,
					 ct->RequestTime,
					 curmsec,
					 curmsec - ct->RequestTime);

		if ( curmsec - ct->RequestTime > 1000000L * rm_resource_allocation_timeout )
		{
			snprintf(errorbuf, sizeof(errorbuf),
					 "resource request is timed out due to no available cluster");

			elog(WARNING, "ConnID %d. %s", ct->ConnID, errorbuf);

			/* Build timeout response. */
			transformConnectionTrackProgress(ct, CONN_PP_TIMEOUT_FAIL);
			buildAcquireResourceErrorResponseAndSend(ct,
												 	 RESQUEMGR_NOCLUSTER_TIMEOUT,
													 errorbuf);

		}
		else
		{
			tryagain = lappend(tryagain, ct);
		}
	}

	while( list_length(tryagain) > 0 )
	{
		void *move = lfirst(list_head(tryagain));
		tryagain = list_delete_first(tryagain);
		PCONTRACK->ConnHavingRequests = lappend(PCONTRACK->ConnHavingRequests, move);
	}

	/* Go through all current allocated connection tracks. */
	curmsec = gettime_microsec();

	List     *allcons = NULL;
	ListCell *cell	  = NULL;

	getAllPAIRRefIntoList(&(PCONTRACK->Connections), &allcons);
	foreach(cell, allcons)
	{
		ConnectionTrack curcon = (ConnectionTrack)(((PAIR)lfirst(cell))->Value);

		if ( curcon->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
		{
			/*
			 * Check if corresponding mem core ratio tracker has long enough
			 * time to waiting for GRM containers.
			 */
			DynResourceQueueTrack queuetrack = (DynResourceQueueTrack)
											   (curcon->QueueTrack);
			int index = getResourceQueueRatioIndex(queuetrack->MemCoreRatio);
			Assert(PQUEMGR->RootTrack != NULL);

			/*
			 * Set the head waiting timestamp if this request is a head request
			 * in the target queue.
			 */
			if ( queuetrack->QueryResRequests.NodeCount > 0 )
			{
				ConnectionTrack topwaiter =
						getDQueueHeadNodeData(&(queuetrack->QueryResRequests));
				if ( topwaiter == curcon && topwaiter->HeadQueueTime == 0 )
				{
					topwaiter->HeadQueueTime = curmsec;
					elog(DEBUG3, "Set timestamp of waiting at head of queue.");
				}
			}

			elog(DEBUG3, "Check waiting connection track: ConnID %d "
						 "Head time "UINT64_FORMAT " "
						 "Global resource pending time "UINT64_FORMAT " ",
						 curcon->ConnID,
						 curmsec - curcon->HeadQueueTime,
						 curmsec - PQUEMGR->RatioTrackers[index]->TotalPendingStartTime);

			bool tocancel = false;

			/*
			 * Case 1. No available cluster for executing.
			 *
			 * Case 2. No enough resource to run, resource manager is still
			 * 		   acquiring resource from global resource manager, and the
			 * 		   request is at the head of the queue.
			 */
			if ( ( (PQUEMGR->RootTrack->ClusterSegNumberMax == 0) &&
				   (curmsec - curcon->ResRequestTime >
						1000000L * rm_resource_allocation_timeout ) ) ||
				 ( (PQUEMGR->RatioTrackers[index]->TotalPendingStartTime > 0) &&
				   (curmsec - PQUEMGR->RatioTrackers[index]->TotalPendingStartTime >
						1000000L * rm_resource_allocation_timeout) &&
				   (curcon->HeadQueueTime > 0) &&
				   (curmsec - curcon->HeadQueueTime >
				 	 	1000000L * rm_resource_allocation_timeout) ) )
			{
				elog(LOG, "ConnID %d. The queued resource request no resource "
						  "timeout is detected, the waiting time in head of the"
						  "queue is "UINT64_FORMAT " global resource pending "
						  "start time is "UINT64_FORMAT
						  ", current time is "UINT64_FORMAT".",
						  curcon->ConnID,
						  curmsec - curcon->HeadQueueTime,
						  PQUEMGR->RatioTrackers[index]->TotalPendingStartTime,
						  curmsec);

				snprintf(errorbuf, sizeof(errorbuf),
						 "queued resource request is timed out due to no resource");
				tocancel = true;
			}

			/* Case 3. Check if resource fragment problem lasts too long time. */
			if ( curcon->troubledByFragment &&
			     curmsec - curcon->troubledByFragmentTimestamp >
					 1000000L * rm_resource_allocation_timeout &&
				 ((DynResourceQueueTrack)(curcon->QueueTrack))->NumOfRunningQueries == 0 )
			{
				elog(LOG, "ConnID %d. The queued resource request timeout is "
						  "detected due to resource fragment problem.",
						  curcon->ConnID);

				snprintf(errorbuf, sizeof(errorbuf),
						 "queued resource request is timed out due to resource "
						 "fragment problem");
				tocancel = true;
			}

			if ( tocancel )
			{
				cancelResourceAllocRequest(curcon, errorbuf, true);
				returnConnectionToQueue(curcon, true);
			}
		}
	}
	freePAIRRefList(&(PCONTRACK->Connections), &allcons);
	PQUEMGR->LastCheckingQueuedTimeoutTime = curmsec;
	MEMORY_CONTEXT_SWITCH_BACK
}

void buildAcquireResourceErrorResponseAndSend(ConnectionTrack  conntrack,
										  	  int 		  	   errorcode,
											  char			  *errorbuf)
{
	buildAcquireResourceErrorResponse(conntrack, errorcode, errorbuf);

	conntrack->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
	MEMORY_CONTEXT_SWITCH_BACK
}

void buildAcquireResourceErrorResponse(ConnectionTrack	conntrack,
									   int				errorcode,
									   char			   *errorbuf)
{
	SelfMaintainBufferData responsedata;
	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);

	RPCResponseAcquireResourceFromRMERRORData response;
	response.Result   = errorcode;
	response.Reserved = 0;

	appendSMBVar(&responsedata, response);
	appendSMBStr(&responsedata, errorbuf);
	appendSelfMaintainBufferTill64bitAligned(&responsedata);

	buildResponseIntoConnTrack(conntrack,
							   SMBUFF_CONTENT(&responsedata),
							   getSMBContentSize(&responsedata),
							   conntrack->MessageMark1,
							   conntrack->MessageMark2,
							   RESPONSE_QD_ACQUIRE_RESOURCE);
	destroySelfMaintainBuffer(&responsedata);
}

bool isAllResourceQueueIdle(void)
{
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack quetrack = lfirst(cell);
		if ( quetrack->TotalUsed.MemoryMB > 0 || quetrack->TotalUsed.Core > 0 )
		{
			return false;
		}
	}
	return true;
}

void resetAllDeadLockDetector(void)
{
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack quetrack = lfirst(cell);
		resetResourceDeadLockDetector(&(quetrack->DLDetector));
	}
}

void getIdleResourceRequest(int32_t *mem, double *core)
{
	Assert(PRESPOOL->ClusterMemoryCoreRatio > 0);
	*mem  = PRESPOOL->ClusterMemoryCoreRatio *
			PRESPOOL->AvailNodeCount *
			PQUEMGR->ActualMinGRMContainerPerSeg;
	*core = 1.0 *
			PRESPOOL->AvailNodeCount *
			PQUEMGR->ActualMinGRMContainerPerSeg;
}

void setForcedReturnGRMContainerCount(void)
{
	/* If some queue has locked resource, dont do GRM container breathe. */
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack quetrack = lfirst(cell);

		if ( quetrack->DLDetector.LockedTotal.MemoryMB > 0 )
		{
			elog(LOG, "Queue %s has potential resource deadlock, cancel breathe.",
					  quetrack->QueueInfo->Name);
			PQUEMGR->GRMQueueCurCapacity   = 1.0;
			PQUEMGR->GRMQueueResourceTight = false;
			return;
		}
	}

	/* Get current GRM container size. */
	int clusterctnsize = getClusterGRMContainerSize();
	int toretctnsize = 0;
	double curabscapacity = PQUEMGR->GRMQueueCurCapacity *
							PQUEMGR->GRMQueueCapacity;

	if ( curabscapacity > PQUEMGR->GRMQueueCapacity )
	{
		/*
		 * We would like to return as many containers as possible to make queue
		 * usage lower than expected capacity.
		 */
		double r = (curabscapacity - PQUEMGR->GRMQueueCapacity) / curabscapacity;
		elog(LOG, "GRM queue is over-using, cur capacity %lf*%lf, "
				  "ratio %lf, curent GRM container size %d",
				  PQUEMGR->GRMQueueCurCapacity,
				  PQUEMGR->GRMQueueCapacity,
				  r,
				  clusterctnsize);
		toretctnsize = ceil(r * clusterctnsize);

		if ( rm_return_percentage_on_overcommit > 0 )
		{
			double r = 1.0 * clusterctnsize * rm_return_percentage_on_overcommit / 100;
			int toretctnsize2 = ceil(r);
			elog(DEBUG3, "Calculated r %lf based on return percentage.", r);
			toretctnsize = toretctnsize < toretctnsize2 ? toretctnsize : toretctnsize2;
		}
	}

	elog(LOG, "Resource manager expects to breathe out %d GRM containers. "
			  "Total %d GRM containers, ",
			  toretctnsize,
			  clusterctnsize);

	/* Restore queue report to avoid force return again. */
	PQUEMGR->ForcedReturnGRMContainerCount = toretctnsize;
	PQUEMGR->GRMQueueCurCapacity		   = 1.0;
	PQUEMGR->GRMQueueResourceTight 		   = false;
}

void buildQueueTrackShadows(DynResourceQueueTrack	toaltertrack,
							List 				  **qhavingshadow)
{
	/*--------------------------------------------------------------------------
	 * Iteratively go through each descendant including the queue to be altered.
	 * the queue to be altered should have a shadow, the all the descendant
	 * queue having query running or queued should have a shadow. We care only
	 * the busy queues, if the queue is not in busy status, no need to build
	 * shadow to care of the status, we just need to alter the original instance
	 * directly.
	 *--------------------------------------------------------------------------
	 */

	Assert(qhavingshadow != NULL);

	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)

	List *q = NULL;
	q = lappend(q, toaltertrack);
	while( list_length(q) > 0 )
	{
		/* Pop the first one in the queue. */
		DynResourceQueueTrack curtrack = (DynResourceQueueTrack)linitial(q);
		q = list_delete_first(q);

		elog(DEBUG5, "Resource manager checks queue %s for building shadow "
					 "instance.",
					 curtrack->QueueInfo->Name);

		/*
		 * Add the queue to alter and descendant in use leaf queues to the
		 * result list.
		 */
		if( (curtrack == toaltertrack) ||
			(curtrack->isBusy && list_length(curtrack->ChildrenTracks) == 0) )
		{
			*qhavingshadow = lappend(*qhavingshadow, curtrack);
			buildQueueTrackShadow(curtrack);

			elog(RMLOG, "Resource queue %s has shadow instance built.",
					  	curtrack->QueueInfo->Name);
		}

		/* Add children queue tracks into the queue. */
		ListCell *cell = NULL;
		foreach(cell, curtrack->ChildrenTracks)
		{
			q = lappend(q, lfirst(cell));
		}
	}

	MEMORY_CONTEXT_SWITCH_BACK
}

void buildQueueTrackShadow(DynResourceQueueTrack toaltertrack)
{
	Assert(toaltertrack!= NULL);
	/* It is not acceptable to have another shadow already created. */
	Assert(toaltertrack->ShadowQueueTrack == NULL);
	/* Create shadow instance and build up reference. */
	toaltertrack->ShadowQueueTrack = rm_palloc0(PCONTEXT,
												sizeof(DynResourceQueueTrackData));
	DynResourceQueueTrack shadowtrack = toaltertrack->ShadowQueueTrack;
	shadowtrack->QueueInfo = rm_palloc0(PCONTEXT, sizeof(DynResourceQueueData));
	memcpy(shadowtrack->QueueInfo,
		   toaltertrack->QueueInfo,
		   sizeof(DynResourceQueueData));
	shadowtrack->ShadowQueueTrack = toaltertrack;

	/* Shadow resource queue track does not hold tree structure. */
	shadowtrack->ParentTrack = NULL;
	shadowtrack->ChildrenTracks = NULL;

	shadowtrack->ClusterMemoryActPer	= toaltertrack->ClusterMemoryActPer;
	shadowtrack->ClusterMemoryMaxMB		= toaltertrack->ClusterMemoryMaxMB;
	shadowtrack->ClusterMemoryMaxPer	= toaltertrack->ClusterMemoryMaxPer;

	shadowtrack->ClusterVCoreActPer		= toaltertrack->ClusterVCoreActPer;
	shadowtrack->ClusterVCoreMax		= toaltertrack->ClusterVCoreMax;
	shadowtrack->ClusterVCoreMaxPer		= toaltertrack->ClusterVCoreMaxPer;

	shadowtrack->ClusterSegNumber		= toaltertrack->ClusterSegNumber;
	shadowtrack->ClusterSegNumberMax	= toaltertrack->ClusterSegNumberMax;

	shadowtrack->CurConnCounter			= toaltertrack->CurConnCounter;
	shadowtrack->NumOfRunningQueries	= toaltertrack->NumOfRunningQueries;

	shadowtrack->MemCoreRatio			= toaltertrack->MemCoreRatio;
	shadowtrack->RatioIndex				= toaltertrack->RatioIndex;

	shadowtrack->trackedMemCoreRatio	= toaltertrack->trackedMemCoreRatio;
	shadowtrack->isBusy					= toaltertrack->isBusy;

	initializeResqueueDeadLockDetector(&(shadowtrack->DLDetector), shadowtrack);

	initializeDQueue(&(shadowtrack->QueryResRequests), PCONTEXT);

	resetResourceBundleDataByBundle(&(shadowtrack->TotalUsed),
									&(toaltertrack->TotalUsed));
	resetResourceBundleDataByBundle(&(shadowtrack->TotalAllocated),
									&(toaltertrack->TotalAllocated));

	resetResourceBundleData(&(shadowtrack->TotalRequest), 0, 0.0, 0);
	shadowtrack->expectMoreResource		= false;
	shadowtrack->pauseAllocation		= false;
	shadowtrack->troubledByFragment		= false;
}

void cleanupQueueTrackShadows(List **qhavingshadow)
{
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	while(list_length(*qhavingshadow) > 0)
	{
		DynResourceQueueTrack track = (DynResourceQueueTrack)
									  linitial(*qhavingshadow);
		*qhavingshadow = list_delete_first(*qhavingshadow);
		DynResourceQueueTrack shadowtrack = track->ShadowQueueTrack;
		deepFreeResourceQueueTrack(shadowtrack);
		track->ShadowQueueTrack = NULL;
	}
	MEMORY_CONTEXT_SWITCH_BACK
}

int rebuildAllResourceQueueTrackDynamicStatusInShadow(List *quehavingshadow,
													  bool  queuechanged,
													  char *errorbuf,
													  int	errorbufsize)
{
	int res = FUNC_RETURN_OK;

	ListCell *cell = NULL;
	foreach(cell, quehavingshadow)
	{
		DynResourceQueueTrack quetrack = (DynResourceQueueTrack)lfirst(cell);
		res = rebuildResourceQueueTrackDynamicStatusInShadow(quetrack,
															 queuechanged,
															 errorbuf,
															 errorbufsize);
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager failed to rebuild resource queue %s "
						  "dynamic status in its shadow to reflect altered "
						  "resource queues.",
						  quetrack->QueueInfo->Name);
			return res;
		}
		else
		{
			elog(RMLOG, "Resource manager passed rebuilding resource queue %s "
					    "dynamic status in its shadow.",
					    quetrack->QueueInfo->Name);
		}

		res = detectAndDealWithDeadLockInShadow(quetrack, queuechanged);
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager failed to rebuild resource queue %s "
						  "dynamic status to reflect altered resource queues due "
						  "to the deadlock issue introduced by altering resource "
						  "queue.",
						  quetrack->QueueInfo->Name);
			return res;
		}
		else
		{
			elog(RMLOG, "Resource manager passed detecting deadlock issues in the "
					    "shadow of resource queue %s",
					    quetrack->QueueInfo->Name);
		}
	}

	elog(RMLOG, "Resource manager finished rebuilding resource queues' dynamic "
				"status");
	return FUNC_RETURN_OK;
}

int rebuildResourceQueueTrackDynamicStatusInShadow(DynResourceQueueTrack  quetrack,
												   bool					  queuechanged,
												   char 				 *errorbuf,
												   int					  errorbufsize)
{
	int res = FUNC_RETURN_OK;

	elog(RMLOG, "Rebuild resource queue %s dynamic status in its shadow.",
			    quetrack->QueueInfo->Name);

	DynResourceQueueTrack shadowtrack = quetrack->ShadowQueueTrack;
	/* Get deadlock detector ready in the shadow instance. */
	copyResourceDeadLockDetectorWithoutLocking(&(quetrack->DLDetector),
											   &(shadowtrack->DLDetector));

	elog(DEBUG3, "Deadlock detector in shadow has %d MB in use %d MB locked.",
				 shadowtrack->DLDetector.InUseTotal.MemoryMB,
				 quetrack->DLDetector.LockedTotal.MemoryMB);

	/* Go through all queued query resource requests, recalculate the request. */
	DQUEUE_LOOP_BEGIN(&(quetrack->QueryResRequests), iter, ConnectionTrack, conn)

		ConnectionTrack newconn = NULL;
		createEmptyConnectionTrack(&newconn);
		/* Process only requests waiting here. */
		Assert(conn->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT);
		copyAllocWaitingConnectionTrack(conn, newconn);
		/* Make new connection track referencing the shadow instance */
		newconn->QueueTrack = shadowtrack;
		/*
		 * Calculate the resource request again based on new resource queue
		 * definition, i.e. the attributes updated in the shadow instance.
		 */
		res = computeQueryQuota(newconn, errorbuf, errorbufsize);

		if ( res == FUNC_RETURN_OK )
		{
			if ( newconn->StatNVSeg == 0 )
			{
				/*--------------------------------------------------------------
				 * Adjust the number of virtual segments again based on
				 * NVSEG_*_LIMITs and NVSEG_*_LIMIT_PERSEGs. This adjustment
				 * must succeed.
				 *--------------------------------------------------------------
				 */
				res = adjustResourceExpectsByQueueNVSegLimits(newconn,
															  errorbuf,
															  errorbufsize);

				if (res == FUNC_RETURN_OK )
				{
					elog(LOG, "ConnID %d. Expect query resource (%d MB, %lf CORE) "
							  "x %d ( MIN %d ) resource after adjusting based on "
							  "queue NVSEG limits.",
							  newconn->ConnID,
							  newconn->SegMemoryMB,
							  newconn->SegCore,
							  newconn->SegNum,
							  newconn->SegNumMin);
				}
			}
		}

		if ( res == FUNC_RETURN_OK )
		{
			/* Add request to the resource queue and return. */
			addQueryResourceRequestToQueue(quetrack, newconn);
		}
		else
		{
			/*------------------------------------------------------------------
			 * Here we find the request unable to be adjusted based on new
			 * resource queue resource limits. If we force resource manager to
			 * cancel the request, we will cancel this request and generate
			 * error message as response. If we dont, the rebuilding phase should
			 * be stopped, all shadows are removed, then ALTER RESOURCE QUEUE
			 * statement is canceled.
			 *------------------------------------------------------------------
			 */
			elog(WARNING, "ConnID %d. %s", newconn->ConnID, errorbuf);

			if ( !queuechanged || rm_force_alterqueue_cancel_queued_request )
			{
				buildAcquireResourceErrorResponse(newconn, res, errorbuf);
				transformConnectionTrackProgress(newconn,
												 CONN_PP_RESOURCE_QUEUE_ALLOC_FAIL);
				/*
				 * We still add this failed connection track into the shadow
				 * instance, we will remove them later.
				 */
				insertDQueueTailNode(&(shadowtrack->QueryResRequests), newconn);
			}
			else
			{
				elog(WARNING, "Resource manager finds conflict between at least "
							  "one queued query resource and new definition of "
							  "resource queue %s",
							  quetrack->QueueInfo->Name);
				freeUsedConnectionTrack(newconn);
				return RESQUEMGR_ALTERQUEUE_CONFILICT;
			}
		}
	DQUEUE_LOOP_END

	elog(DEBUG3, "Deadlock detector in shadow has %d MB in use %d MB locked "
				 "after rebuilding.",
				 shadowtrack->DLDetector.InUseTotal.MemoryMB,
				 shadowtrack->DLDetector.LockedTotal.MemoryMB);

	elog(RMLOG, "Finished rebuilding resource queue %s dynamic status in its "
				"shadow.",
			    quetrack->QueueInfo->Name);

	return FUNC_RETURN_OK;
}

int detectAndDealWithDeadLockInShadow(DynResourceQueueTrack quetrack,
									  bool					queuechanged)
{
	Assert(quetrack != NULL);
	Assert(quetrack->ShadowQueueTrack != NULL);
	DynResourceQueueTrack shadowtrack = quetrack->ShadowQueueTrack;

	elog(DEBUG3, "Deadlock detector in shadow has %d MB in use, %d MB locked",
				 shadowtrack->DLDetector.InUseTotal.MemoryMB,
				 shadowtrack->DLDetector.LockedTotal.MemoryMB);

	/* Assume more available resource unlocked queued requests. */
	int32_t pavailmemorymb = 0;

	/* Go through all queued query resource requests, recalculate the request. */
	DQUEUE_LOOP_BEGIN(&(shadowtrack->QueryResRequests), iter, ConnectionTrack, conn)

		/* Maybe there are some connection track has FAIL status. */
		if ( conn->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
		{
			continue;
		}

		/* Check if this connection has deadlock issue. */
		int32_t expmemorymb   = conn->SegMemoryMB * conn->SegNumMin;
		int32_t availmemorymb = shadowtrack->ClusterMemoryMaxMB -
								shadowtrack->DLDetector.LockedTotal.MemoryMB +
								pavailmemorymb;

		/*----------------------------------------------------------------------
		 * If the queue already uses more resource than its maximum capability,
		 * we cannot count the resource available more than maximum possible
		 * limit.
		 *----------------------------------------------------------------------
		 */
		availmemorymb = availmemorymb > shadowtrack->ClusterMemoryMaxMB ?
						shadowtrack->ClusterMemoryMaxMB :
						availmemorymb;

		/* NOTE: availmemorymb maybe less than 0. */
		if ( expmemorymb > availmemorymb )
		{
			/* We encounter a deadlock issue. */
			if ( !queuechanged || rm_force_alterqueue_cancel_queued_request )
			{
				cancelQueryRequestToBreakDeadLockInShadow(shadowtrack,
														  iter,
														  expmemorymb,
														  availmemorymb);
			}
			else
			{
				elog(WARNING, "Resource manager finds at least one deadlock issue"
							  "due to new definition of resource queue %s",
							  quetrack->QueueInfo->Name);
				return RESQUEMGR_ALTERQUEUE_CONFILICT;
			}
		}

		/*----------------------------------------------------------------------
		 * When we try next request in the queue, we should assume the previous
		 * sessions can release resource. This works because we assume that the
		 * sessions waiting for resource allocation have unique session id values.
		 *----------------------------------------------------------------------
		 */
		SessionTrack strack = findSession(&(shadowtrack->DLDetector),
										  conn->SessionID);
		if ( strack != NULL )
		{
			pavailmemorymb += strack->InUseTotal.MemoryMB;
		}

	DQUEUE_LOOP_END

	return FUNC_RETURN_OK;
}

void cancelQueryRequestToBreakDeadLockInShadow(DynResourceQueueTrack shadowtrack,
											   DQueueNode			 iter,
											   int32_t				 expmemorymb,
											   int32_t				 availmemorymb)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	DQueueNode tailiter = getDQueueContainerTail(&(shadowtrack->QueryResRequests));

	elog(DEBUG3, "ConnID %d. Try to deal with deadlock issue.",
				 ((ConnectionTrack)(iter->Data))->ConnID);

	/*--------------------------------------------------------------------------
	 * Loop to try all subsequent requests.
	 *
	 * NOTE: If canceling all subsequent requests, we still can not satisfy this
	 * request we cancel current requests.
	 *--------------------------------------------------------------------------
	 */
	while((availmemorymb < expmemorymb) && tailiter != iter->Prev)
	{
		ConnectionTrack	curconn	= (ConnectionTrack)(tailiter->Data);

		if ( curconn->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
		{
			tailiter = tailiter->Prev;
			continue;
		}

		SessionTrack 	strack	= NULL;
		strack = findSession(&(shadowtrack->DLDetector), curconn->SessionID);
		if ( strack != NULL )
		{
			/* This session has locked resource, cancel it to release resource. */
			snprintf(errorbuf, sizeof(errorbuf),
					 "session "INT64_FORMAT" deadlock is detected",
					 curconn->SessionID);

			elog(WARNING, "ConnID %d, %s", curconn->ConnID, errorbuf);

			availmemorymb += strack->InUseTotal.MemoryMB;

			availmemorymb = availmemorymb > shadowtrack->ClusterMemoryMaxMB ?
							shadowtrack->ClusterMemoryMaxMB :
							availmemorymb;

			/* Unlock the resource. */
			unlockSessionResource(&(shadowtrack->DLDetector), curconn->SessionID);

			/* Cancel this request. */
			buildAcquireResourceErrorResponse(curconn,
											  RESQUEMGR_DEADLOCK_DETECTED,
											  errorbuf);

			transformConnectionTrackProgress(curconn,
											 CONN_PP_RESOURCE_QUEUE_ALLOC_FAIL);
		}
		tailiter = tailiter->Prev;
	}
}

void applyResourceQueueTrackChangesFromShadows(List *quehavingshadow)
{
	ListCell *cell = NULL;
	foreach(cell, quehavingshadow)
	{
		DynResourceQueueTrack quetrack = (DynResourceQueueTrack)lfirst(cell);
		DynResourceQueueTrack shadowtrack = quetrack->ShadowQueueTrack;

		/* Update resource queue info. */
		memcpy(quetrack->QueueInfo,
			   shadowtrack->QueueInfo,
			   sizeof(DynResourceQueueData));

		/* Update resource queue track info. */
		quetrack->ClusterMemoryActPer	= shadowtrack->ClusterMemoryActPer;
		quetrack->ClusterMemoryMaxMB	= shadowtrack->ClusterMemoryMaxMB;
		quetrack->ClusterMemoryMaxPer	= shadowtrack->ClusterMemoryMaxPer;

		quetrack->ClusterVCoreActPer	= shadowtrack->ClusterVCoreActPer;
		quetrack->ClusterVCoreMax		= shadowtrack->ClusterVCoreMax;
		quetrack->ClusterVCoreMaxPer	= shadowtrack->ClusterVCoreMaxPer;

		quetrack->ClusterSegNumber		= shadowtrack->ClusterSegNumber;
		quetrack->ClusterSegNumberMax	= shadowtrack->ClusterSegNumberMax;

		quetrack->CurConnCounter		= shadowtrack->CurConnCounter;
		quetrack->NumOfRunningQueries	= shadowtrack->NumOfRunningQueries;

		quetrack->MemCoreRatio			= shadowtrack->MemCoreRatio;
		quetrack->RatioIndex			= shadowtrack->RatioIndex;

		quetrack->trackedMemCoreRatio	= shadowtrack->trackedMemCoreRatio;
		quetrack->isBusy				= shadowtrack->isBusy;

		/* The deadlock detector should use the new one completely. */
		resetResourceDeadLockDetector(&(quetrack->DLDetector));
		copyResourceDeadLockDetectorWithoutLocking(&(shadowtrack->DLDetector),
												   &(quetrack->DLDetector));

		resetResourceBundleDataByBundle(&(quetrack->TotalUsed),
										&(shadowtrack->TotalUsed));
		resetResourceBundleDataByBundle(&(quetrack->TotalAllocated),
										&(shadowtrack->TotalAllocated));

		resetResourceBundleDataByBundle(&(quetrack->TotalRequest),
										&(shadowtrack->TotalRequest));

		quetrack->expectMoreResource	= false;
		quetrack->pauseAllocation 		= false;
		quetrack->troubledByFragment	= false;

		if ( quetrack->TotalUsed.MemoryMB > quetrack->ClusterMemoryMaxMB )
		{
			quetrack->pauseAllocation = true;
		}

		/*
		 * Update queued resource requests. We should lock resource again and
		 * handle the failed requests if there are some found in the queue.
		 * The canceled requests should have the error message sent out.
		 */

		DQueueNode shadowiter = shadowtrack->QueryResRequests.Head;
		DQueueNode iter = quetrack->QueryResRequests.Head;
		while(iter != NULL)
		{
			ConnectionTrack conn = (ConnectionTrack)(iter->Data);
			ConnectionTrack shadowconn = (ConnectionTrack)(shadowiter->Data);
			if ( shadowconn->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
			{
				/* Remove failed connection track and adjust iterator */
				DQueueNode nextiter = iter->Next;
				removeDQueueNode(&(quetrack->QueryResRequests), iter);
				iter = nextiter;

				/* Set fail status of this connection track. */
				conn->Progress = shadowconn->Progress;

				/* Get built error message. */
				conn->MessageID    = shadowconn->MessageID;
				conn->MessageMark1 = shadowconn->MessageMark1;
				conn->MessageMark2 = shadowconn->MessageMark2;
				conn->MessageSize  = shadowconn->MessageSize;

				setConnectionTrackMessageBuffer(
					conn,
					SMBUFF_CONTENT(&(shadowconn->MessageBuff)),
					getSMBContentSize(&(shadowconn->MessageBuff)));

				/* Send out the message. */
				conn->ResponseSent = false;
				MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
				PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conn);
				MEMORY_CONTEXT_SWITCH_BACK

				/* Recycle connection track instance. */
				Assert(quetrack->CurConnCounter > 0);

				elog(DEBUG3, "Resource queue %s has %d connections before handling "
							 "deadlocked connection ConnID %d detected from a "
							 "shadow.",
							 quetrack->QueueInfo->Name,
							 quetrack->CurConnCounter,
							 conn->ConnID);

				quetrack->CurConnCounter--;
				if ( quetrack->CurConnCounter == 0 )
				{
					elog(RMLOG, "Resource queue %s becomes idle after applying "
								"change from its shadow.",
								quetrack->QueueInfo->Name);
					quetrack->isBusy = false;
					refreshMemoryCoreRatioLimits();
					refreshMemoryCoreRatioWaterMark();
				}
			}
			else
			{
				/* Get updated resource quota. */
				copyResourceQuotaConnectionTrack(shadowconn, conn);

				/* Lock corresponding sessions. */
				createAndLockSessionResource(&(quetrack->DLDetector),
											 conn->SessionID);

				/* Adjust iterator. */
				iter = iter->Next;
			}
			shadowiter = shadowiter->Next;
		}
	}
}

void dumpResourceQueueStatus(const char *filename)
{
	if ( filename == NULL ) { return; }
	FILE *fp = fopen(filename, "w");

	fprintf(fp, "Maximum capacity of queue in global resource manager cluster %lf",
				PQUEMGR->GRMQueueMaxCapacity);
	fprintf(fp, "Number of resource queues : %d\n", list_length(PQUEMGR->Queues));

	/* Output each resource queue. */
	ListCell *cell = NULL;
	foreach(cell, PQUEMGR->Queues)
	{
		DynResourceQueueTrack quetrack = lfirst(cell);

		fprintf(fp, "QUEUE(name=%s:parent=%s:children=%d:busy=%d:paused=%d),",
				    quetrack->QueueInfo->Name,
					quetrack->ParentTrack != NULL ?
							quetrack->ParentTrack->QueueInfo->Name :
							"NULL",
					list_length(quetrack->ChildrenTracks),
					quetrack->isBusy ? 1 : 0,
					quetrack->pauseAllocation ? 1 : 0);

		fprintf(fp, "REQ(conn=%d:request=%d:running=%d),",
					quetrack->CurConnCounter,
					quetrack->QueryResRequests.NodeCount,
					quetrack->NumOfRunningQueries);

		fprintf(fp, "SEGCAP(ratio=%u:ratioidx=%d:segmem=%dMB:segcore=%lf:"
					"segnum=%d:segnummax=%d),",
					quetrack->MemCoreRatio,
					quetrack->RatioIndex,
					quetrack->QueueInfo->SegResourceQuotaMemoryMB,
					quetrack->QueueInfo->SegResourceQuotaVCore,
					quetrack->ClusterSegNumber,
					quetrack->ClusterSegNumberMax);

		fprintf(fp, "QUECAP(memmax=%u:coremax=%lf:"
					"memper=%lf:mempermax=%lf:coreper=%lf:corepermax=%lf),",
					quetrack->ClusterMemoryMaxMB,
					quetrack->ClusterVCoreMax,
					quetrack->ClusterMemoryActPer,
					quetrack->ClusterMemoryMaxPer,
					quetrack->ClusterVCoreActPer,
					quetrack->ClusterVCoreMaxPer);

		fprintf(fp, "QUEUSE(alloc=(%u MB,%lf CORE):"
					       "request=(%u MB,%lf CORE):"
					       "inuse=(%u MB,%lf CORE))\n",
					quetrack->TotalAllocated.MemoryMB,
					quetrack->TotalAllocated.Core,
					quetrack->TotalRequest.MemoryMB,
					quetrack->TotalRequest.Core,
					quetrack->TotalUsed.MemoryMB,
					quetrack->TotalUsed.Core);
	}

	fprintf(fp, "Number of mem/core ratios : %d\n", PQUEMGR->RatioCount);

	/* Output each mem/core ratio. */
	for( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
	{
		fprintf(fp, "RATIO(ratio=%u:",
					PQUEMGR->RatioReverseIndex[i]);

		if ( PQUEMGR->RatioWaterMarks[i].NodeCount == 0 )
		{
			fprintf(fp, "mem=0MB:core=0.0:time=NULL)\n");
		}
		else
		{
			DynMemoryCoreRatioWaterMark mark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueHeadNodeData(&(PQUEMGR->RatioWaterMarks[i]));
			fprintf(fp, "mem=%uMB:core=%lf:time=%s)\n",
						mark->ClusterMemoryMB,
						mark->ClusterVCore,
						format_time_microsec(mark->LastRecordTime*1000000));
		}
	}

	fclose(fp);
}
