/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "envswitch.h"
#include "dynrm.h"
#include "utils/hashtable.h"

#include "resourcemanager/resourcemanager.h"
#include "resourceenforcer/resourceenforcer.h"

#include "storage/ipc.h"
#include "postmaster/postmaster.h"
#include "postmaster/fork_process.h"
#include "postmaster/syslogger.h"
#include "utils/kvproperties.h"
#include "utils/network_utils.h"

#include "miscadmin.h"

#include "communication/rmcomm_QD2RM.h"
#include "communication/rmcomm_AsyncComm.h"
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_MessageServer.h"
#include "communication/rmcomm_RM2GRM_libyarn.h"
#include "communication/rmcomm_RM2GRM_none.h"
#include "communication/rmcomm_RMSEG2RM.h"
#include "communication/rmcomm_RM2RMSEG.h"
#include "storage/proc.h"
#include "catalog/pg_database.h"
#include "catalog/pg_tablespace.h"
#include "catalog/catalog.h"
#include "utils/syscache.h"
#include "storage/backendid.h"
#include "storage/sinvaladt.h"

#include "catalog/pg_authid.h"
#include "catalog/pg_resqueue.h"
#include "access/htup.h"
#include "utils/tqual.h"

#include "access/xact.h"

#include "resourcebroker/resourcebroker_API.h"

#include <executor/spi.h>

#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"

/*
 * Provided by the backend elsewhere. ResManagerMain() uses it to resolve a
 * database name to its OID and tablespace; a false return is treated as
 * "database does not exist".
 */
extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
/* Database the master RM looks up to get catalog access (see ResManagerMain). */
static char *probeDatabase = "template1";

/* Load resource queue / user definitions and host information. */
int  loadAllQueueAndUser(void);
int  loadHostInformationIntoResourcePool(void);

/** Functions for signal handling. 											 **/
void quitResManager(SIGNAL_ARGS);
void handleChildSignal(SIGNAL_ARGS);
void notifyPostmasterResManagerStarted(SIGNAL_ARGS);
/* Resource broker interaction and GRM container cleanup helpers. */
int  generateAllocRequestToBroker(void);
void completeAllocRequestToBroker(int32_t 	 *reqmem,
								  int32_t 	 *reqcore,
								  List 		**preferred);
void processResourceBrokerTasks(void);
void generateResourceRequestToResourceBroker(void);
void cleanupAllGRMContainers(void);
bool cleanedAllGRMContainers(void);

/** Functions for loading resource queue and user definition. **/
/*
 * Helper macros for reading catalog tuples and turning column values into
 * property objects appended to a DQueue:
 *  - DEC_VAR_PG_CATCOL(name) declares the locals "<name>Datum" and
 *    "<name>IsNull" (the latter initialized to false).
 *  - GETATTR_PG_CATCOL fetches column Anum_<tbname>_<colname> of tuple
 *    'tupvar' into those locals using heap_getattr() and relvar->rd_att.
 *  - The BLDPROP_PG_CATCOL* macros call createProperty<type>() with
 *    list->Context, tag1, get<attrtag>AttributeName(attrid) and &index, and
 *    append the new property to the tail of 'list'.
 *      * _OID, _TXT, _FLOAT, _INT32 and _INT64 substitute a default when the
 *        column is NULL (InvalidOid, NULL, -1.0, -1 and -1 respectively).
 *      * The plain BLDPROP_PG_CATCOL does NOT check <colname>IsNull.
 *      * _SOID, _SET and _STR take the value as an argument instead of
 *        reading a column.
 * The macros expand to plain statements (not do { } while (0)), so use them
 * only as standalone statements.
 */
#define DEC_VAR_PG_CATCOL(name) Datum name##Datum; bool name##IsNull = false;

#define GETATTR_PG_CATCOL(tbname,colname,relvar,tupvar)						   \
		colname##Datum = heap_getattr(tupvar,								   \
									  Anum_##tbname##_##colname,			   \
									  relvar->rd_att, 						   \
									  &colname##IsNull);

#define BLDPROP_PG_CATCOL(colname,list,tag1,attrtag,attrid,attrtyp,index)	   \
		insertDQueueTailNode(list,											   \
							 createProperty##attrtyp(list->Context,			   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 DatumGet##attrtyp(colname##Datum)));

#define BLDPROP_PG_CATCOL_OID(colname,list,tag1,attrtag,attrid,index)	   	   \
		insertDQueueTailNode(list,											   \
							 createPropertyOID(list->Context,			  	   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 colname##IsNull ? InvalidOid :					   \
								DatumGetObjectId(colname##Datum)));

#define BLDPROP_PG_CATCOL_SOID(list,tag1,attrtag,attrid,index,oid)	   		   \
		insertDQueueTailNode(list,											   \
							 createPropertyOID(list->Context,			   	   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 oid));

#define BLDPROP_PG_CATCOL_SET(list,tag1,attrtag,attrid,attrtyp,index,val)  	   \
		insertDQueueTailNode(list,											   \
							 createProperty##attrtyp(list->Context,			   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 val));

#define BLDPROP_PG_CATCOL_TXT(colname,list,tag1,attrtag,attrid,index)	       \
		insertDQueueTailNode(list,											   \
							 createPropertyText(list->Context,			 	   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 colname##IsNull ? NULL :					   	   \
								DatumGetTextP(colname##Datum)));

#define BLDPROP_PG_CATCOL_FLOAT(colname,list,tag1,attrtag,attrid,index)        \
		insertDQueueTailNode(list,                                             \
						     createPropertyFloat(list->Context,                \
						     tag1,                                             \
                             get##attrtag##AttributeName(attrid),              \
							 &index,                                           \
							 colname##IsNull ? -1.0 :                          \
								DatumGetFloat4(colname##Datum)));

#define BLDPROP_PG_CATCOL_STR(list,tag1,attrtag,attrid,index,val)	   	  	   \
		insertDQueueTailNode(list,											   \
							 createPropertyString(list->Context,			   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 val));

#define BLDPROP_PG_CATCOL_INT32(colname,list,tag1,attrtag,attrid,index)	       \
		insertDQueueTailNode(list,											   \
							 createPropertyInt32(list->Context,			 	   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 colname##IsNull ? -1 :					   	   \
								DatumGetInt32(colname##Datum)));

/*
 * NOTE(review): despite its name this expands to createPropertyInt32() fed by
 * DatumGetInt64(), so the 64-bit column value is presumably narrowed to
 * int32. Confirm whether a 64-bit property constructor was intended.
 */
#define BLDPROP_PG_CATCOL_INT64(colname,list,tag1,attrtag,attrid,index)	       \
		insertDQueueTailNode(list,											   \
							 createPropertyInt32(list->Context,			 	   \
							 tag1,											   \
							 get##attrtag##AttributeName(attrid),			   \
							 &index,										   \
							 colname##IsNull ? -1 :					   	   \
								DatumGetInt64(colname##Datum)));

/* Load user / resource queue definitions from the catalog into the lists. */
int	 loadUserPropertiesFromCatalog(List **users);
int	 loadQueuePropertiesFromCatalog(List **queues);

/******************************************************************************
 * The global instance of dynamic resource manager.
 ******************************************************************************/
DynRMGlobal   DRMGlobalInstance;/* The global instance for HAWQ RM logic. 	  */
volatile bool DRMStartup;		/* This variable is set only in postmaster
								   process by signal handler SIG_USR2. This is
								   used to check if HAWQ RM is ready to accept
								   request.									  */

/*
 * Resource enforcement (cgroup) state. Not referenced in the code visible
 * here; presumably shared with the enforcement thread -- TODO confirm.
 * NOTE(review): a volatile bool is not a thread synchronization primitive; if
 * g_enforcement_thread_quited is read/written across threads, an atomic or a
 * lock should be used.
 */
pthread_t	t_move_cgroup;
queue		*g_queue_cgroup;
GHash		g_ghash_cgroup;
uint64		rm_enforce_last_cleanup_time;
volatile bool	g_enforcement_thread_quited = false;

/* 3 * 10^6, presumably microseconds (3 s) like gettime_microsec() values. */
#define SEGMENT_HEARTBEAT_INTERVAL (3LL * 1000000LL)
/*
 * ResManagerMain
 *   Main entry of the HAWQ resource manager (RM) process.
 *
 * It is called in the forked RM child with argv = {"postgres", "-role", role}
 * (see ResManagerProcessStartup()), and its result is passed to proc_exit().
 *
 * Steps:
 *  1. create the global instance and its memory context and initialize it;
 *  2. parse the command line (sets the role) and load the configuration;
 *  3. initialize the core data structures;
 *  4. install signal handlers and, on the master only, initialize the
 *     backend machinery needed for catalog access (database "template1");
 *  5. run the role specific processing loop and return its result.
 *
 * Any initialization failure raises FATAL.
 */
int ResManagerMain(int argc, char *argv[])
{
    int 	 	 res 		= FUNC_RETURN_OK;

    /*******************************************************************/
    /* Step 1. Prepare memory context, log and create global instance. */
    /*******************************************************************/
	IsUnderPostmaster 	= true;
	MyProcPid 			= getpid();
	SetProcessingMode(InitProcessing);

    res = createDRMInstance();
	if ( res != FUNC_RETURN_OK ) {
		elog(FATAL, "HAWQ RM Can not create resource manager global instance.");
	}
	elog(DEBUG5, "HAWQ RM :: created dynamic resource manager instance.");

    res = createDRMMemoryContext();
    if ( res != FUNC_RETURN_OK ) {
    	/* Report the step that actually failed. */
    	elog(FATAL, "HAWQ RM Can not create resource manager memory context.");
    }
	elog(DEBUG5, "HAWQ RM :: created resource manager memory context.");

	res = initializeDRMInstance(PCONTEXT);
	if ( res != FUNC_RETURN_OK ) {
		elog(FATAL, "HAWQ RM Can not initialize global instance.");
	}

	elog(DEBUG5, "HAWQ RM :: initialized resource manager instance.");

	/**************************************************************************
	 * STEP 2. Load configuration.
	 **************************************************************************/

	/* Command line parser. */
    res = parseCommandLine(argc, argv);
    if ( res != FUNC_RETURN_OK ) {
    	elog(FATAL, "Wrong resource manager command line arguments.");
    }

	elog(DEBUG5, "HAWQ RM ::passed command line arguments.");

	/* Recognize all loaded configure properties. */
	res = loadDynamicResourceManagerConfigure();
	if ( res != FUNC_RETURN_OK ) {
		elog(FATAL, "Fail to load valid properties from configure files.");
	}

	elog(DEBUG5, "HAWQ RM :: passed loading configuration.");

    /*******************************************************/
    /* Step 3. Post-initialization of core data structure. */
    /*******************************************************/
    initializeDRMCore();

	elog(DEBUG5, "HAWQ RM :: Passed initializing core data structure.");

	/**************************************************************************/
	/* STEP 4. INIT for making RM process access catalog by CAQL etc.         */
	/**************************************************************************/
	/*
	 * Block signals while the handlers are installed and the backend is
	 * initialized; they are unblocked again after this step.
	 */
	PG_SETMASK(&BlockSig);
	pqsignal(SIGHUP , SIG_IGN);
	pqsignal(SIGINT , quitResManager);
	pqsignal(SIGTERM, quitResManager);
	pqsignal(SIGQUIT, quitResManager);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, quitResManager);
	pqsignal(SIGCHLD, handleChildSignal);
	pqsignal(SIGTTIN, SIG_IGN);
	pqsignal(SIGTTOU, SIG_IGN);

	if ( DRMGlobalInstance->Role == START_RM_ROLE_MASTER ) {
		CurrentResourceOwner = ResourceOwnerCreate(NULL, "Resource Manager");

		BaseInit();
		InitProcess();
		InitBufferPoolBackend();
		InitXLOGAccess();

		SetProcessingMode(NormalProcessing);

		MyDatabaseId = TemplateDbOid;
		MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
		if (!FindMyDatabase(probeDatabase, &MyDatabaseId, &MyDatabaseTableSpace))
			/* Name the database that was actually probed. */
			ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE),
				errmsg("database '%s' does not exist", probeDatabase)));

		char *fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);

		SetDatabasePath(fullpath);

		InitProcessPhase2();

		MyBackendId = InvalidBackendId;

		SharedInvalBackendInit(false);

		if (MyBackendId > MaxBackends || MyBackendId <= 0)
			elog(FATAL, "bad backend id: %d", MyBackendId);

		InitBufferPoolBackend();
		RelationCacheInitialize();
		InitCatalogCache();
		RelationCacheInitializePhase2();
	}
	/* END: INIT for making RM process access catalog by caql etc.            */
	/**************************************************************************/
	PG_SETMASK(&UnBlockSig);

    /* Save process fork mode. */
	DRMGlobalInstance->ThisPID	 = getpid();
	DRMGlobalInstance->ParentPID = getppid();

	/* Initialize socket connection pool. */
	initializeSocketConnectionPool();

	elog(DEBUG5, "HAWQ RM :: starts as role %d.", DRMGlobalInstance->Role);

	/*******************************************/
	/* STEP 5. Start concrete processing loop. */
	/*******************************************/
    switch(DRMGlobalInstance->Role) {
    case START_RM_ROLE_MASTER:
    {
    	/*
    	 * Mark as master side resource manager. gp_session_id is temporarily
    	 * set to -1 while the ps display is initialized.
    	 */
    	int oldid = gp_session_id;
    	gp_session_id = -1;
		init_ps_display("master resource manager", "     ", "", "");
		gp_session_id = oldid;

		/* Initialize resource broker service. */
		RB_prepareImplementation(DRMGlobalInstance->ImpType);
		res = RB_start(true);
		if ( res != FUNC_RETURN_OK ) {
			elog(FATAL, "Fail to create resource broker service.");
		}
		res = ResManagerMainServer2ndPhase();
		elog(LOG, "Master RM exits.\n");
		break;
    }
    case START_RM_ROLE_SEGMENT:
    {
		init_ps_display("segment resource manager", "", "","");
		res = ResManagerMainSegment2ndPhase();
		elog(LOG, "Segment RM exits.\n");
		break;
    }
    default:
    	Assert(false);	/* Should never come here. */
    	break;
    }
    return res;
}


/*
 * Start the resource manager sub-process from postmaster.
 *
 * The RM role is not a parameter; it is derived from AmIMaster():
 *   master  -> child runs ResManagerMain() with "-role server"
 *              (START_RM_ROLE_MASTER);
 *   segment -> child runs ResManagerMain() with "-role segment"
 *              (START_RM_ROLE_SEGMENT).
 *
 * Returns:
 *   0 in gp_upgrade_mode, where no RM is started.
 *   The child's PID in the postmaster once DRMStartup has become true.
 *
 * A failed fork, or an RM that does not report readiness within 180 counted
 * one-second sleeps, raises FATAL. The child never returns from here: it
 * calls proc_exit() with ResManagerMain()'s result.
 */
int ResManagerProcessStartup(void)
{
	pid_t ResMgrPID = 0;
	char *arguments[3] = {NULL, NULL, NULL};
	arguments[0] = "postgres";
	arguments[1] = "-role";

	int rmrole = AmIMaster() ? START_RM_ROLE_MASTER : START_RM_ROLE_SEGMENT;
    // if in upgrade mode, at the first calling, the old pg_resqueue data can't let resource manager works,
    // And also we only need to upgrade master node, no segment is enabled at this time, so no need for resource manager.
    if(gp_upgrade_mode) 
        return 0;
	/*
	 * Prepare temporary signal handler for SIGUSR2 to let postmaster process
	 * able to know when HAWQ RM is ready to accept requests.
	 */
	DRMStartup = false;
	pqsignal(SIGUSR2, notifyPostmasterResManagerStarted);
	PG_SETMASK(&UnBlockSig);

	/* Fork resource manager process. */
    switch ((ResMgrPID = fork_process()))
    {
        case -1:
        	/* Restore postmaster signal state before reporting the failure. */
        	pqsignal(SIGUSR2, SIG_IGN);
        	PG_SETMASK(&BlockSig);
            ereport(FATAL,
                    (errmsg("could not fork resource manager collector: %m")));
            return 0;
        case 0:
        	/* The child must not keep the postmaster's temporary handler. */
        	pqsignal(SIGUSR2, SIG_IGN);
            /* in postmaster child ... */
            /* Close the postmaster's sockets */
            ClosePostmasterPorts(false);

            /* Lose the postmaster's on-exit routines */
            on_exit_reset();

            ResourceManagerIsForked = true;

            switch(rmrole) {
            case START_RM_ROLE_MASTER:
            	arguments[2] = "server";
            	proc_exit(ResManagerMain(3, arguments));
            	break;
            case START_RM_ROLE_SEGMENT:
            	arguments[2] = "segment";
            	proc_exit(ResManagerMain(3, arguments));
            	break;
            }

            break;

        default:
        {
        	/*******************************************************************
        	 * IN POSTMASTER. Postmaster here waits for the signal SIGUSR2 sent
        	 * from HAWQ RM to guarantee that HAWQ RM can accept the request after
        	 * the return of this function.
        	 ******************************************************************/
        	/*
        	 * Wait to ensure that resource manager works basically. A sleep
        	 * interrupted by a signal (usleep() != 0) is not counted, so the
        	 * total wait can exceed 180 seconds.
        	 */
        	for ( int i = 0 ; i < 180 ; ++i ) {
        		if ( usleep(1000000) != 0 )
        			i--;
        		elog(LOG, "Wait for HAWQ RM %d", i);
        		if ( DRMStartup )
        			break;
        	}
        	/* Drop the temporary handler and restore the blocked mask. */
        	pqsignal(SIGUSR2, SIG_IGN);
        	PG_SETMASK(&BlockSig);

        	if ( DRMStartup ) {
        		elog(LOG, "HAWQ :: Received signal notification that HAWQ RM "
        				  "works now.");
        	}
        	else {
        		elog(FATAL, "HAWQ RM can not work. Please check HAWQ RM log.");
        	}
            return ResMgrPID;
        }
    }

    /* shouldn't get here */
    return 0;
}

/*
 * Second phase of the master-side RM, after basic initialization.
 *
 * Registers the request message handlers and starts the socket server. It
 * then sends SIGUSR2 to the parent (postmaster) to report readiness. Next it
 * cleans up gp_segment_configuration (and its history) and registers the
 * master row. It loads queue and user definitions, checks the slaves file,
 * and optionally loads test host information. Finally it runs
 * MainHandlerLoop(); the resource broker is stopped after the loop returns.
 *
 * Returns MainHandlerLoop()'s result. Failing to start the socket server or
 * to load the queue/user definitions raises FATAL.
 */
int ResManagerMainServer2ndPhase(void)
{
	int res = FUNC_RETURN_OK;

	/* Register message handlers */
	registerMessageHandler(REQUEST_QD_CONNECTION_REG        , handleRMRequestConnectionReg);
	registerMessageHandler(REQUEST_QD_CONNECTION_REG_OID    , handleRMRequestConnectionRegByOID);
	registerMessageHandler(REQUEST_QD_CONNECTION_UNREG      , handleRMRequestConnectionUnReg);
	registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE      , handleRMRequestAcquireResource);
	registerMessageHandler(REQUEST_QD_RETURN_RESOURCE       , handleRMRequestReturnResource);
	registerMessageHandler(REQUEST_RM_IMALIVE               , handleRMSEGRequestIMAlive);
	registerMessageHandler(REQUEST_QD_DDL_MANIPULATERESQUEUE, handleRMDDLRequestManipulateResourceQueue);
	registerMessageHandler(REQUEST_QD_DDL_MANIPULATEROLE	, handleRMDDLRequestManipulateRole);
	registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE_QUOTA, handleRMRequestAcquireResourceQuota);
	registerMessageHandler(REQUEST_QD_REFRESH_RESOURCE      , handleRMRequestRefreshResource);
	registerMessageHandler(REQUEST_QD_SEGMENT_ISDOWN        , handleRMRequestSegmentIsDown);
	registerMessageHandler(REQUEST_QD_DUMP_STATUS           , handleRMRequestDumpStatus);
    registerMessageHandler(REQUEST_QD_DUMP_RESQUEUE_STATUS  , handleRMRequestDumpResQueueStatus);
	registerMessageHandler(REQUEST_DUMMY                    , handleRMRequestDummy);
	registerMessageHandler(REQUEST_QD_QUOTA_CONTROL 		, handleRMRequestQuotaControl);
	/* New socket facility poll based server.*/
	res = initializeSocketServer();
	if ( res != FUNC_RETURN_OK ) {
		elog(FATAL, "Fail to initialize socket server.");
	}

	/*
	 * Notify postmaster that HAWQ RM is ready. Ignore the possible problem that
	 * the parent process quits. HAWQ RM will automatically detect if its parent
	 * dies, then HAWQ RM should exit normally.
	 */
	kill(DRMGlobalInstance->ParentPID, SIGUSR2);

	/* Clean up gp_segment_configuration table. */
	cleanup_segment_config();

	cleanup_segment_config_history();

	/*
	 * register itself into gp_segment_configuration table
	 * master internal id is 0, segment id starts from 1
	 */
	add_segment_config_row(MASTER_ORDER_ID,
                           DRMGlobalInstance->SocketLocalHostName.Str,
                           DRMGlobalInstance->SocketLocalHostName.Str,
                           PostPortNumber,
                           SEGMENT_ROLE_MASTER_CONFIG,
                           SEGMENT_STATUS_UP,
                           "");

	/* Load queue and user definition as no DDL now. */
	res = loadAllQueueAndUser();
	if ( res != FUNC_RETURN_OK ) {
		elog(FATAL, "Fail to load queue and user definition.");
	}

	elog(DEBUG5, "HAWQ RM :: passed loading queue and user definition.");

	/* Check slaves file firstly to ensure we have expected cluster size. */
	checkSlavesFile();

	/* A configured test file replaces real segment reports with fixed hosts. */
	if ( rm_resourcepool_test_filename != NULL &&
		 rm_resourcepool_test_filename[0] != '\0' ) {
		loadHostInformationIntoResourcePool();
	}

	/******* TILL NOW, resource manager starts providing services *******/
	elog(LOG, "HAWQ RM process works now.");

    /* Start request handler to provide services. */
    res = MainHandlerLoop();
    /* res is returned to the caller. */


    /******* TILL NOW, resource manager goes into EXIT PHASE *******/
    elog(LOG, "HAWQ RM server goes into exit phase.");

    /* Shutdown resource broker. */
    RB_stop();

    return res;
}

/**
 * The main event loop of the master HAWQ RM process.
 *
 * Records the start time in DRMGlobalInstance->ResourceManagerStartTime, then
 * executes the numbered steps below once per iteration while
 * DRMGlobalInstance->ResManagerMainKeepRun is true. The loop also stops when
 * the postmaster is no longer alive (checked first in every iteration).
 *
 * Returns the error code of processAllCommFileDescs() if its failure is what
 * stopped the loop (the rest of that iteration still runs), otherwise
 * FUNC_RETURN_OK.
 */
int MainHandlerLoop(void)
{
	int res = FUNC_RETURN_OK;

	DRMGlobalInstance->ResourceManagerStartTime = gettime_microsec();

	while( DRMGlobalInstance->ResManagerMainKeepRun )
	{
		/* STEP 0. Check if postmaster process exists. */
		if (!PostmasterIsAlive(true)) {
			DRMGlobalInstance->ResManagerMainKeepRun = false;
			elog(LOG, "Postmaster is not alive, resource manager exits");
			break;
		}

		/* STEP 1. Check resource broker status. */
		RB_start(true);

		/* STEP 2. Check if resource manager is in clean up status. */
		if ( isCleanGRMResourceStatus() )
		{
			/*
			 * Firstly we move all currently accepted GRM containers into resource
			 * pool. We will drop them at once.
			 */
			moveAllAcceptedGRMContainersToResPool();

			/*
			 * Mark all current active connection tracks that they are in fact
			 * using old application's resource. When resource is returned, the
			 * resource is calculated in old resource level.
			 */
			setAllAllocatedResourceInConnectionTracksOld();

			/* Validate that all in-use resource are marked to old resource. */
			Assert(isAllResourceQueueIdle());

			/*
			 * Reset deadlock detector. The deadlock detector works for new
			 * resource only.
			 */
			resetAllDeadLockDetector();

			/*
			 * Mark all segments DOWN due to no GRM cluster report
			 */
			setAllNodesGRMDown();

			/* Move all resource broker allocated GRM containers to returned. */
			cleanupAllGRMContainers();

			/* Refresh resource queue resource usage and request quota. */
			refreshMemoryCoreRatioLevelUsage(gettime_microsec());

			resetAllSegmentsGRMContainerFailAllocCount();
			resetAllSegmentsNVSeg();

			/* Check if can resume using new available global resource manager.*/
			if ( cleanedAllGRMContainers() )
			{
				unsetCleanGRMResourceStatus();
			}
		}

		/*
		 * STEP 3. Process all communications between resource manager and
		 * 		   resource broker.
		 */
		processResourceBrokerTasks();

		/* STEP 4. Handle socket server inputs. */
		res = processAllCommFileDescs();
		if ( res != FUNC_RETURN_OK )
		{
			/*
			 * The possible error here is the failure of poll(), we won't keep
			 * running HAWQ RM any longer, graceful quit is requested.
			 */
			elog(WARNING, "System error cause resource manager not possible to "
						  "track network communications. Exit resource manager "
						  "process normally.");
			DRMGlobalInstance->ResManagerMainKeepRun = false;
		}

		/* STEP 5. Handle all submitted requests through socket clients. */
		processSubmittedRequests();

		/* STEP 6. Generate possible resource request to resource broker. */
		generateResourceRequestToResourceBroker();

        /* STEP 7. Check timeout resource allocation and timeout queuing requests. */
        timeoutDeadResourceAllocation();
        timeoutQueuedRequest();

		/*
		 * STEP 8. Check the status of all segment nodes, mark down if hasn't got
		 * 		   IMAlive message for a pre-defined period.
		 * 		   Skipped when a resource pool test file is configured; otherwise
		 * 		   it runs once more than rm_segment_heartbeat_timeout seconds
		 * 		   (compared in microseconds) have passed since the last check.
		 */
        uint64_t curtime = gettime_microsec();
		if ((rm_resourcepool_test_filename == NULL ||
			rm_resourcepool_test_filename[0] == '\0') &&
			(curtime - PRESPOOL->LastCheckTime >
        	 1000000LL * rm_segment_heartbeat_timeout))
		{
			updateStatusOfAllNodes();
			PRESPOOL->LastCheckTime = curtime;
		}


		/* STEP 9. Move all accepted GRM containers into resource pool. */
		moveAllAcceptedGRMContainersToResPool();

		/*
		 * STEP 10. Check if should pause dispatching resource to queries to
		 * collect resource back in order to return GRM containers. This is only
		 * considered with a real GRM (not NONE_HAWQ2), when no container add or
		 * return is pending, no forced return is already in progress, and the
		 * GRM queue is over capacity (> 1) and marked as tight.
		 */
		if ( DRMGlobalInstance->ImpType != NONE_HAWQ2 &&
			 PRESPOOL->AddPendingContainerCount == 0 &&
			 PRESPOOL->RetPendingContainerCount == 0 &&
			 PQUEMGR->ForcedReturnGRMContainerCount == 0 &&
			 PQUEMGR->GRMQueueCurCapacity > 1 &&
			 PQUEMGR->GRMQueueResourceTight )
		{
			elog(LOG, "Resource manager decides to breathe out resource. "
					  "Current relative GRM queue capacity %lf, "
					  "Expect GRM queue capacity %lf, "
					  "Estimae GRM queue %s",
					  PQUEMGR->GRMQueueCurCapacity,
					  PQUEMGR->GRMQueueCapacity,
					  PQUEMGR->GRMQueueResourceTight ? "busy" : "not busy");

			/* Calculate how many GRM containers should be returned. */
			setForcedReturnGRMContainerCount();
		}

		/*
		 * STEP 11. Dispatch resource to queries and send the messages out.
		 * Dispatching is suspended while a forced return of GRM containers is
		 * pending; in that case resource is returned to the broker instead.
		 */

        if ( PRESPOOL->Segments.NodeCount > 0 && PQUEMGR->RatioCount > 0 &&
			 PQUEMGR->toRunQueryDispatch &&
			 PQUEMGR->ForcedReturnGRMContainerCount == 0 &&
			 PRESPOOL->SlavesHostCount > 0 )
        {
    		dispatchResourceToQueries();
        }
        else if ( PQUEMGR->ForcedReturnGRMContainerCount > 0 )
        {
        	forceReturnGRMResourceToRB();
        }
        else
        {
        	elog(DEBUG3, "Loop dummy. %d, %d, %d, %d",
        				 PRESPOOL->Segments.NodeCount,
						 PQUEMGR->RatioCount,
						 PQUEMGR->toRunQueryDispatch ? 1 : 0,
						 PQUEMGR->ForcedReturnGRMContainerCount);
        }

        /* STEP 12. Generate output content to client connections. */
        sendResponseToClients();

        /*
         * STEP 13. Return containers to global resource manager if there is
         * 			some idle resource.
         */
        timeoutIdleGRMResourceToRB();

		/* STEP 14. Notify segments to increase resource quota. */
		notifyToBeAcceptedGRMContainersToRMSEG();

        /* STEP 15. Notify segments to decrease resource. */
        notifyToBeKickedGRMContainersToRMSEG();

        /*
         * STEP 16. Check slaves file if the content is not checked or is
         * 			updated.
         */
        checkSlavesFile();
	}

	elog(LOG, "Resource manager main event handler exits.");

	return res;
}

/**
 * Parse passed in command line arguments. The acceptable argument keys and
 * values are all listed below which are locally used in parseCommandLine().
 */

/* -role + rolename */
#define HAWQDRM_COMMANDLINE_ROLE				"-role"

#define HAWQDRM_COMMANDLINE_ROLE_VALUE_SERVER   "server"
#define HAWQDRM_COMMANDLINE_ROLE_VALUE_SEGMENT  "segment"

/*
 * parseCommandLine
 *   Scan argv for "-role <name>" and set DRMGlobalInstance->Role:
 *   "server"  -> START_RM_ROLE_MASTER
 *   "segment" -> START_RM_ROLE_SEGMENT
 *
 * Returns FUNC_RETURN_OK on success. Returns MAIN_WRONG_COMMANDLINE when
 * "-role" is the last argument or is followed by an unknown role name. Parsing
 * stops at the first error so a later, valid "-role" cannot mask it.
 */
int parseCommandLine(int argc, char **argv)
{
	int 		res = FUNC_RETURN_OK;
	SimpString 	value;

	initSimpleString(&value, PCONTEXT);
	for ( int i = 0 ; i < argc ; ++i ) {
		if ( strcmp(argv[i], HAWQDRM_COMMANDLINE_ROLE) != 0 )
			continue;

		res = getStringValue(argc, argv, i+1, &value);
		if ( res != FUNC_RETURN_OK ) {
			/*
			 * No value follows the key, so 'value' was never filled in and its
			 * content must not be printed.
			 */
			elog( WARNING,
				  "Missing command line argument value behind %s.",
				  HAWQDRM_COMMANDLINE_ROLE);
			goto exit;
		}

		/* The role name has been consumed; do not rescan it as a key. */
		i++;

		if ( SimpleStringComp(&value,
							  HAWQDRM_COMMANDLINE_ROLE_VALUE_SERVER) == 0 ) {
			DRMGlobalInstance->Role = START_RM_ROLE_MASTER;
		}
		else if ( SimpleStringComp(&value,
								   HAWQDRM_COMMANDLINE_ROLE_VALUE_SEGMENT) == 0 ) {
			DRMGlobalInstance->Role = START_RM_ROLE_SEGMENT;
		}
		else {
			elog(WARNING, "Wrong argument value '%s' for %s",
						  value.Str, HAWQDRM_COMMANDLINE_ROLE);
			res = MAIN_WRONG_COMMANDLINE;
			goto exit;
		}

		elog(DEBUG5, "HAWQ RM :: Command line sets RM role: %d\n",
					 DRMGlobalInstance->Role);
	}

exit:
	freeSimpleStringContent(&value);
	return res;
}

/*
 * Copy argv[pos] into 'val'.
 *
 * Returns MAIN_WRONG_COMMANDLINE, leaving 'val' untouched, when 'pos' does not
 * address an existing, non-NULL argument. Otherwise returns FUNC_RETURN_OK.
 */
int getStringValue(int argc, char **argv, int pos, SimpString *val)
{
	/* Reject negative positions too; argv[pos] would be out of bounds. */
	if ( pos < 0 || pos >= argc || argv[pos] == NULL )
		return MAIN_WRONG_COMMANDLINE;
	setSimpleStringNoLen(val, argv[pos]);
	return FUNC_RETURN_OK;
}

int createDRMInstance(void)
{
	int res = FUNC_RETURN_OK;

	/* Create dynamic resource manager global instance holding all data. */
    DRMGlobalInstance = NULL;
	DRMGlobalInstance = (DynRMGlobal)
						rm_palloc0(TopMemoryContext,
								   sizeof(struct DynRMGlobalData));
	Assert( DRMGlobalInstance != NULL );
	return res;
}

/*
 * Initialize the dynamic resource manager instance. Return 0 if succeed, other-
 * wise, -1 is returned. Resource manager's memory context is created as one
 * child context which is referenced by DRMGlobalInstance->Context. 
 *
 *         TopMemoryContext ( global instance is allocated here )
 *                 |
 *                 V
 *      Resource manager context ( all consequent objects are allocated here )
 *
 */
int createDRMMemoryContext(void)
{
	int res = FUNC_RETURN_OK;

	Assert( DRMGlobalInstance != NULL );

	/* Create dynamic resource manager memory context,the new memory context is
	   created as the child of top memory context. */
	DRMGlobalInstance->Context = NULL;

	DRMGlobalInstance->Context = AllocSetContextCreate(
                                            TopMemoryContext,
										  	DRMGLOBAL_MEMORY_CONTEXT_NAME,
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE );
	Assert( PCONTEXT != NULL );

	return res;
}

/*
 * Reset the fields of the already allocated DRMGlobalInstance to their
 * initial values for the RM process.
 *
 * context[in]	Memory context for the objects created here (the local
 *				temporary directory queue and the local host name string).
 *
 * Returns the result of getLocalHostName(); a failure there is also logged as
 * a WARNING. All other fields are set unconditionally.
 */
int initializeDRMInstance(MCTYPE context)
{
	int res = FUNC_RETURN_OK;

	DRMGlobalInstance->ResourceQueueManager     = NULL;
	DRMGlobalInstance->ConnTrackManager			= NULL;
	DRMGlobalInstance->ResourcePoolInstance		= NULL;
	DRMGlobalInstance->ImpType					= NONE_HAWQ2;

	DRMGlobalInstance->LocalHostLastUpdateTime	= 0;
	DRMGlobalInstance->HeartBeatLastSentTime    = 0;
	DRMGlobalInstance->TmpDirLastCheckTime      = 0;
	DRMGlobalInstance->LocalHostStat			= NULL;

	initializeDQueue(&(DRMGlobalInstance->LocalHostTempDirectories), context);
	DRMGlobalInstance->LocalHostFailedTmpDirList = NULL;

	/* Tell the working threads keep running. */
	DRMGlobalInstance->ResManagerMainKeepRun 	= true;

	/* Whether send IMAlive message */
	DRMGlobalInstance->SendIMAlive              = true;

	/* Where to send the heart-beat. */
	DRMGlobalInstance->SendToStandby            = false;

	DRMGlobalInstance->ResBrokerAppTimeStamp    = 0;
	DRMGlobalInstance->ResBrokerTriggerCleanup  = false;

	/* Get local host name here to make all components able to use this info. */
	initSimpleString(&(DRMGlobalInstance->SocketLocalHostName), context);
	res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName));
	if ( res != FUNC_RETURN_OK ) {
		elog(WARNING, "Fail to get local host name.");
	}

	/* Set resource manager server startup time to 0, i.e. not started yet. */
	DRMGlobalInstance->ResourceManagerStartTime = 0;

	return res;
}

int initializeDRMInstanceForQD(void)
{
	int res = FUNC_RETURN_OK;

	/* Get local host name here to make all components able to use this info. */
	initSimpleString(&(DRMGlobalInstance->SocketLocalHostName),
			         DRMGlobalInstance->Context);
	res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName));
	if ( res != FUNC_RETURN_OK ) {
		elog(WARNING, "Fail to get local host name.");
	}
	return res;
}

/* Name of the memory context created by initializeDRMInstanceForQE(). */
#define DRMQE2RM_MEMORY_CONTEXT_NAME  "QE to RM communication"

/* Set once initializeDRMInstanceForQE() completes; later calls return early. */
bool 		  QE2RM_Initialized = false;
/* Created under TopMemoryContext; becomes DRMGlobalInstance->Context on QEs. */
MemoryContext QE2RM_CommContext	= NULL;

void initializeDRMInstanceForQE(void)
{
	if ( QE2RM_Initialized )
		return;

	int res = FUNC_RETURN_OK;

    /* create dynamic resource manager instance to contain config data. */
    res = createDRMInstance();
    if ( res != FUNC_RETURN_OK ) {
    	elog(ERROR, "Fail to initialize data structure for communicating with "
                	"resource manager.");
    }

    MEMORY_CONTEXT_SWITCH_TO(TopMemoryContext)
    QE2RM_CommContext = AllocSetContextCreate( CurrentMemoryContext,
            DRMQE2RM_MEMORY_CONTEXT_NAME,
            ALLOCSET_DEFAULT_MINSIZE,
            ALLOCSET_DEFAULT_INITSIZE,
            ALLOCSET_DEFAULT_MAXSIZE );
    Assert( QE2RM_CommContext != NULL );
    MEMORY_CONTEXT_SWITCH_BACK

	DRMGlobalInstance->Context = QE2RM_CommContext;

	/* Get local host name here to make all components able to use this info. */
	initSimpleString(&(DRMGlobalInstance->SocketLocalHostName),
			         DRMGlobalInstance->Context);
	res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName));
	if ( res != FUNC_RETURN_OK ) {
		elog(WARNING, "Fail to get local host name.");
	}

	/****** Resource enforcement GUCs begins ******/

	/* Get resource enforcement enablement flag */
	DRMGlobalInstance->ResourceEnforcerCpuEnable = rm_enforce_cpu_enable;

	/* Get resource enforcement CGroup mount point */
	initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint),
			 PCONTEXT);
	setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint),
			     rm_enforce_cgrp_mnt_pnt);

	/* Get resource enforcement CGroup hierarchy name */
	initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName),
			 PCONTEXT);
	setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName),
			     rm_enforce_cgrp_hier_name);

	/* Get resource enforcement mapping between hawq weight and yarn weight */
	DRMGlobalInstance->ResourceEnforcerCpuWeight = rm_enforce_cpu_weight;

	/* Get resource enforcement vcore-pcore ratio */
	DRMGlobalInstance->ResourceEnforcerVcorePcoreRatio = rm_enforce_core_vpratio;

	/* Get resource enforcement CGroup cleanup period */
	DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period;

	/****** Resource enforcement GUCs ends ******/

    QE2RM_Initialized = true;
}

/**
 * Initialize resource manager core structure to get ready for services.
 */
void initializeDRMCore(void)
{
	/* Initialize resource queue management part. */
	DRMGlobalInstance->ResourceQueueManager =
			(DynResourceQueueManager)
			rm_palloc0(PCONTEXT, sizeof(DynResourceQueueManagerData));
	initializeResourceQueueManager();

	/* Initialize connection track manager part. */
	DRMGlobalInstance->ConnTrackManager =
			(ConnectionTrackManager)
			rm_palloc0(PCONTEXT, sizeof(ConnectionTrackManagerData));
	initializeConnectionTrackManager();

	/* Initialize node resource management part. */
	PRESPOOL = (ResourcePool) rm_palloc0(PCONTEXT, sizeof(ResourcePoolData));
	initializeResourcePoolManager();
}

/*
 * Split the comma separated list tmpdirs_string and append every entry, in
 * order, to the tail of tmpdirs_list as a new SimpString created in PCONTEXT.
 * The text following the last comma (or the whole string when it contains no
 * comma) is always appended as the final entry.
 */
static void InitTemporaryDirs(DQueue tmpdirs_list, char *tmpdirs_string)
{
	int           commaidx  = -1;
	SimpString    rest;
	SimpStringPtr lastentry = NULL;

	initSimpleString(&rest, PCONTEXT);
	setSimpleStringNoLen(&rest, tmpdirs_string);

	for (;;)
	{
		SimpStringPtr entry = NULL;
		SimpString    tail;

		if ( SimpleStringLocateChar(&rest, ',', &commaidx) != FUNC_RETURN_OK )
		{
			break;
		}

		/* Everything before the comma is one directory entry. */
		entry = createSimpleString(PCONTEXT);
		SimpleStringSubstring(&rest, 0, commaidx, entry);
		insertDQueueTailNode(tmpdirs_list, entry);

		/* Continue scanning with the text after the comma. */
		initSimpleString(&tail, PCONTEXT);
		SimpleStringSubstring(&rest, commaidx + 1, -1, &tail);
		freeSimpleStringContent(&rest);
		SimpleStringCopy(&rest, &tail);
		freeSimpleStringContent(&tail);
	}

	/* The remainder after the last comma is the final entry. */
	lastentry = createSimpleString(PCONTEXT);
	setSimpleStringWithContent(lastentry, rest.Str, rest.Len);
	insertDQueueTailNode(tmpdirs_list, lastentry);
	freeSimpleStringContent(&rest);
}


/**
 * Read loaded config properties and fill the verified values into global
 * instance.
 */
int  loadDynamicResourceManagerConfigure(void)
{
	elog(DEBUG3, "Resource manager loads Socket Listening Port %d",
				 rm_master_port);
	elog(DEBUG3, "Resource manager loads Segment Socket Listening Port %d",
				 rm_segment_port);

	/* Decide global resource manager mode. */
	if ( strcasecmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 )
	{
		DRMGlobalInstance->ImpType = YARN_LIBYARN;
	}
	else if ( strcasecmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 )
	{
		DRMGlobalInstance->ImpType = NONE_HAWQ2;
	}
	else
	{
		elog(WARNING, "Wrong global resource manager type set in %s.",
				  	  HAWQDRM_CONFFILE_SERVER_TYPE);
		return MAIN_CONF_UNSET_ROLE;
	}
	elog(DEBUG3, "Resource manager loads resource broker implement mode : %d",
				 DRMGlobalInstance->ImpType);

	SimpString segmem;
	if ( rm_seg_memory_use[0] == '\0' )
	{
		elog(WARNING, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
		return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
	}

	setSimpleStringRefNoLen(&segmem, rm_seg_memory_use);
	int res = SimpleStringToStorageSizeMB(&segmem,
										  &(DRMGlobalInstance->SegmentMemoryMB));
	if ( res != FUNC_RETURN_OK)
	{
		elog(WARNING, "Can not understand the value '%s' of property %s.",
				  	  rm_seg_memory_use,
					  HAWQDRM_CONFFILE_LIMIT_MEMORY_USE);
		return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE;
	}

	DRMGlobalInstance->SegmentCore = rm_seg_core_use;

	elog(DEBUG3, "HAWQ RM :: Accepted NONE mode resource management setting, "
				 "each host has (%d MB,%lf) resource capacity.\n",
				 DRMGlobalInstance->SegmentMemoryMB,
				 DRMGlobalInstance->SegmentCore);

    // For temporary directories
    InitTemporaryDirs(&DRMGlobalInstance->LocalHostTempDirectories, rm_seg_tmp_dirs);

	DQUEUE_LOOP_BEGIN(&DRMGlobalInstance->LocalHostTempDirectories, iter, SimpStringPtr, value)
		elog(LOG, "HAWQ Segment RM :: Temporary directory %s", value->Str);
	DQUEUE_LOOP_END

	checkAndBuildFailedTmpDirList();

	/****** Resource enforcement GUCs begins ******/

	/* Get resource enforcement enablement flag */
	DRMGlobalInstance->ResourceEnforcerCpuEnable = rm_enforce_cpu_enable;

	/* Get resource enforcement CGroup mount point */
	initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint),
			 PCONTEXT);
	setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint),
			     rm_enforce_cgrp_mnt_pnt);

	/* Get resource enforcement CGroup hierarchy name */
	initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName),
			 PCONTEXT);
	setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName),
			     rm_enforce_cgrp_hier_name);

	/* Get resource enforcement mapping between hawq weight and yarn weight */
	DRMGlobalInstance->ResourceEnforcerCpuWeight = rm_enforce_cpu_weight;

	/* Get resource enforcement vcore-pcore ratio */
	DRMGlobalInstance->ResourceEnforcerVcorePcoreRatio = rm_enforce_core_vpratio;

	/* Get resource enforcement CGroup cleanup period */
	DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period;

	/****** Resource enforcement GUCs ends ******/
    return FUNC_RETURN_OK;
}

/*
 * Load resource queue and user information into HAWQ RM memory.
 *
 * Role properties are read from catalog table pg_authid and queue properties
 * from pg_resqueue. Both property lists are then used to create the resource
 * queues and users. The temporary property lists are always released before
 * returning.
 *
 * Returns FUNC_RETURN_OK on success, otherwise the error code of the first
 * step that failed.
 */
int  loadAllQueueAndUser(void)
{
	int   res 		 = FUNC_RETURN_OK;
	List *queueprops = NULL;			/* Save all queue properties.    */
	List *userprops  = NULL;			/* Save all role/user properties.*/

	res = loadUserPropertiesFromCatalog(&userprops);
	if ( res != FUNC_RETURN_OK )
	{
		goto exit;
	}

	res = loadQueuePropertiesFromCatalog(&queueprops);
	if ( res != FUNC_RETURN_OK )
	{
		goto exit;
	}

	/* Add resource queue and user into resource pool data structure. */
	res = addResourceQueueAndUserFromProperties(queueprops, userprops);
	if ( res == FUNC_RETURN_OK )
	{
		/*
		 * Only a successful load is expected to have built the root and
		 * default queues; on failure res is reported below instead of
		 * aborting a debug build here.
		 */
		Assert(PQUEMGR->RootTrack != NULL);
		Assert(PQUEMGR->DefaultTrack != NULL);
	}

exit:
	cleanPropertyList(PCONTEXT, &queueprops);
	cleanPropertyList(PCONTEXT, &userprops);

	if ( res != FUNC_RETURN_OK )
	{
		elog( LOG, "Fail to load queue and user definition.");
	}
	return res;
}

/*****************************************************************************
 * Load user information from catalog table pg_authid.
 *
 * Connects in utility mode to database template1 on port master_addr_port,
 * reads oid, rolname, rolsuper and rolresqueue of every role, and appends
 * five "user" properties per role to *users, all created in PCONTEXT:
 *   - oid
 *   - name
 *   - target queue oid (InvalidOid when rolresqueue is empty)
 *   - superuser flag
 *   - fixed priority of 3
 * The row number is passed as the property index, so all properties of one
 * role share the same index.
 *
 * Returns FUNC_RETURN_OK on success. Returns LIBPQ_FAIL_EXECUTE when
 * connecting, allocating the SQL buffer, or running the query fails.
 *****************************************************************************/
int	 loadUserPropertiesFromCatalog(List **users)
{
	int			libpqres 		= CONNECTION_OK;
	int 		ret 	 		= FUNC_RETURN_OK;
	PGconn 	   *conn 	 		= NULL;
	char		conninfo[1024];
	PQExpBuffer sql 	 		= NULL;
	PGresult   *result 	 		= NULL;
	int 		ntups 	 		= 0;
	int 		i_oid 	 		= 0,
				i_rolname 		= 0,
				i_rolsuper 		= 0,
				i_rolresqueue 	= 0;

	Oid 		roloid 			= 0,
				rolresqueueOID  = 0;

	char 	   *rolname 		= NULL,
			   *rolresqueue 	= NULL;
	bool 		rolsuper 		= false;
	int8_t 		DummyPriority 	= 3;

	/* Bounded formatting so conninfo can never be overflowed. */
	snprintf(conninfo, sizeof(conninfo),
			 "options='-c gp_session_role=UTILITY' "
			 "dbname=template1 port=%d connect_timeout=%d",
			 master_addr_port,
			 LIBPQ_CONNECT_TIMEOUT);

	conn = PQconnectdb(conninfo);
	if ((libpqres = PQstatus(conn)) != CONNECTION_OK)
	{
		elog(WARNING, "Resource manager failed to connect database when loading "
					  "role specifications from pg_authid, error code %d, "
					  "reason: %s",
					  libpqres,
					  PQerrorMessage(conn));
		PQfinish(conn);
		return LIBPQ_FAIL_EXECUTE;
	}

	sql = createPQExpBuffer();
	if ( sql == NULL )
	{
		elog(WARNING, "Resource manager failed to allocate buffer for building "
					  "sql statement.");
		/* Report the failure instead of returning success with no roles. */
		ret = LIBPQ_FAIL_EXECUTE;
		goto cleanup;
	}
	appendPQExpBuffer(sql, "SELECT oid,rolname,rolsuper,rolresqueue FROM pg_authid");
	result = PQexec(conn, sql->data);
	if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		/* A NULL result carries no message; the reason is on the connection. */
		elog(WARNING, "Resource manager failed to run SQL: %s "
					  "when loading role specifications from pg_authid, "
					  "reason : %s",
					  sql->data,
					  result != NULL ?
						  PQresultErrorMessage(result) :
						  PQerrorMessage(conn));
		ret = LIBPQ_FAIL_EXECUTE;
		goto cleanup;
	}

	ntups 		  = PQntuples(result);
	i_oid 		  = PQfnumber(result, "oid");
	i_rolname 	  = PQfnumber(result, "rolname");
	i_rolsuper 	  = PQfnumber(result, "rolsuper");
	i_rolresqueue = PQfnumber(result, "rolresqueue");

	for (int i = 0; i < ntups; i++)
	{
	    roloid 		= (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10);
	    rolname 	= PQgetvalue(result, i, i_rolname);
	    rolresqueue = PQgetvalue(result, i, i_rolresqueue);

	    if (rolresqueue == NULL || strlen(rolresqueue) == 0)
	    {
	    	rolresqueueOID = InvalidOid;
	    }
	    else
	    {
	    	rolresqueueOID = (Oid)strtoul(rolresqueue, NULL, 10);
	    }

	    /* Boolean columns are returned as "t" / "f" in text format. */
	    rolsuper = (PQgetvalue(result, i, i_rolsuper)[0] == 't');

	    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	    *users = lappend(*users,
	    				 createPropertyOID(
	    					 PCONTEXT,
							 "user",
							 getUSRTBLAttributeName(USR_TBL_ATTR_OID),
							 &i,
							 roloid));
	    *users = lappend(*users,
	    				 createPropertyName(
	    					 PCONTEXT,
							 "user",
							 getUSRTBLAttributeName(USR_TBL_ATTR_NAME),
							 &i,
							 (Name)rolname));
	    *users = lappend(*users,
	    				 createPropertyOID(
	    					 PCONTEXT,
							 "user",
							 getUSRTBLAttributeName(USR_TBL_ATTR_TARGET_QUEUE),
							 &i,
							 rolresqueueOID));
	    *users = lappend(*users,
	    				 createPropertyBool(
	    					 PCONTEXT,
							 "user",
							 getUSRTBLAttributeName(USR_TBL_ATTR_IS_SUPERUSER),
							 &i,
							 rolsuper));
	    *users = lappend(*users,
	    				 createPropertyInt8(
	    					 PCONTEXT,
							 "user",
							 getUSRTBLAttributeName(USR_TBL_ATTR_PRIORITY),
							 &i,
							 DummyPriority));
	    MEMORY_CONTEXT_SWITCH_BACK
	}

	ListCell *cell = NULL;
	foreach(cell, *users)
	{
		KVProperty property = lfirst(cell);
		elog(RMLOG, "Resource manager loaded role specifications from pg_authid : "
				    "[%s]=[%s]",
					property->Key.Str,
					property->Val.Str);
	}
	elog(LOG, "Resource manager successfully loaded role specifications.");

cleanup:
	if(sql != NULL)
	{
		destroyPQExpBuffer(sql);
	}
	if(result != NULL)
	{
		PQclear(result);
	}
	PQfinish(conn);
	return ret;
}

/*****************************************************************************
 * Load resource queue information from catalog table pg_resqueue.
 *
 * Connects in utility mode to database template1 on port master_addr_port,
 * reads every row of pg_resqueue, and appends one "queue" property per
 * selected column to *queues. All properties are created in PCONTEXT. The
 * row number is passed as the property index, so all properties of one
 * queue share the same index. An empty parentoid is loaded as InvalidOid.
 *
 * Returns FUNC_RETURN_OK on success. Returns LIBPQ_FAIL_EXECUTE when
 * connecting, allocating the SQL buffer, or running the query fails.
 *****************************************************************************/
int	 loadQueuePropertiesFromCatalog(List **queues)
{
	int		 	 libpqres 			= CONNECTION_OK;
	int 	 	 ret 	 			= FUNC_RETURN_OK;
	PGconn 		*conn 				= NULL;
	char		 conninfo[1024];
	PQExpBuffer  sql 				= NULL;
	PGresult	*result 			= NULL;
	int ntups 						= 0;
	int i_oid 						= 0,
		i_name 						= 0,
		i_parent 					= 0,
		i_active_stats_cluster  	= 0,
		i_memory_limit_cluster 		= 0,
		i_core_limit_cluster 		= 0,
		i_resource_overcommit 		= 0,
		i_allocation_policy 		= 0,
		i_vseg_resource_quota 		= 0,
		i_nvseg_upper_limit			= 0,
		i_nvseg_lower_limit			= 0,
		i_nvseg_upper_limit_perseg 	= 0,
		i_nvseg_lower_limit_perseg 	= 0,
		i_creation_time 			= 0,
		i_update_time 				= 0,
		i_status 					= 0;

	Oid oid 						= 0,
		parentoid 					= 0;

	char *name 						= NULL,
		 *parent 					= NULL,
		 *memory_limit_cluster 		= NULL,
		 *core_limit_cluster 		= NULL,
		 *allocation_policy 		= NULL,
		 *vseg_resource_quota 		= NULL,
		 *status 					= NULL;

	int active_stats_cluster 		= 0,
		nvseg_upper_limit			= 0,
		nvseg_lower_limit			= 0;

	float nvseg_upper_limit_perseg	= 0.0,
		  nvseg_lower_limit_perseg	= 0.0,
		  resource_overcommit		= 0.0;

	int64 creation_time 			= 0,
		  update_time 				= 0;

	snprintf(conninfo, sizeof(conninfo),
			 "options='-c gp_session_role=UTILITY' "
			 "dbname=template1 port=%d connect_timeout=%d",
			 master_addr_port,
			 LIBPQ_CONNECT_TIMEOUT);

	conn = PQconnectdb(conninfo);
	if ((libpqres = PQstatus(conn)) != CONNECTION_OK)
	{
		elog(WARNING, "Resource manager failed to connect database when loading "
					  "resource queue specifications from pg_resqueue, "
					  "error code: %d, reason: %s",
					  libpqres,
					  PQerrorMessage(conn));
		PQfinish(conn);
		return LIBPQ_FAIL_EXECUTE;
	}

	sql = createPQExpBuffer();
	if ( sql == NULL )
	{
		elog(WARNING, "Resource manager failed to allocate buffer for building "
					  "sql statement.");
		/* Report the failure instead of returning success with no queues. */
		ret = LIBPQ_FAIL_EXECUTE;
		goto cleanup;
	}

	appendPQExpBuffer(sql,"SELECT oid,"
								 "rsqname,"
								 "parentoid,"
								 "activestats,"
								 "memorylimit, "
								 "corelimit, "
								 "resovercommit,"
								 "allocpolicy, "
								 "vsegresourcequota, "
								 "nvsegupperlimit, "
								 "nvseglowerlimit, "
								 "nvsegupperlimitperseg, "
								 "nvseglowerlimitperseg, "
								 "creationtime, "
								 "updatetime, "
								 "status "
								 "FROM pg_resqueue");
	result = PQexec(conn, sql->data);

	if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		/* A NULL result carries no message; the reason is on the connection. */
		elog(WARNING, "Resource manager failed to run SQL: %s when loading "
					  "resource queue specifications from pg_resqueue, "
					  "reason : %s",
					  sql->data,
					  result != NULL ?
						  PQresultErrorMessage(result) :
						  PQerrorMessage(conn));
		ret = LIBPQ_FAIL_EXECUTE;
		goto cleanup;
	}

	ntups = PQntuples(result);

	i_oid 						= PQfnumber(result, PG_RESQUEUE_COL_OID);
	i_name 						= PQfnumber(result, PG_RESQUEUE_COL_RSQNAME);
	i_parent 					= PQfnumber(result, PG_RESQUEUE_COL_PARENTOID);
	i_active_stats_cluster 		= PQfnumber(result, PG_RESQUEUE_COL_ACTIVESTATS);
	i_memory_limit_cluster 		= PQfnumber(result, PG_RESQUEUE_COL_MEMORYLIMIT);
	i_core_limit_cluster  		= PQfnumber(result, PG_RESQUEUE_COL_CORELIMIT);
	i_resource_overcommit 		= PQfnumber(result, PG_RESQUEUE_COL_RESOVERCOMMIT);
	i_allocation_policy 		= PQfnumber(result, PG_RESQUEUE_COL_ALLOCPOLICY);
	i_vseg_resource_quota 		= PQfnumber(result, PG_RESQUEUE_COL_VSEGRESOURCEQUOTA);
	i_nvseg_upper_limit			= PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMIT);
	i_nvseg_lower_limit			= PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMIT);
	i_nvseg_upper_limit_perseg	= PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMITPERSEG);
	i_nvseg_lower_limit_perseg	= PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMITPERSEG);
	i_creation_time 			= PQfnumber(result, PG_RESQUEUE_COL_CREATIONTIME);
	i_update_time 				= PQfnumber(result, PG_RESQUEUE_COL_UPDATETIME);
	i_status 					= PQfnumber(result, PG_RESQUEUE_COL_STATUS);

	for (int i = 0; i < ntups; i++)
	{
	    oid 					 = (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10);
	    name 				  	 = 			 PQgetvalue(result, i, i_name);
	    parent 					 = 			 PQgetvalue(result, i, i_parent);
	    /* An empty parentoid means the queue has no parent. */
	    if (parent == NULL || strlen(parent) == 0)
	    {
	    	parentoid = InvalidOid;
	    }
	    else
	    {
	    	parentoid = (Oid)strtoul(parent, NULL, 10);
	    }

	    active_stats_cluster     = 		atoi(PQgetvalue(result, i, i_active_stats_cluster));
	    memory_limit_cluster     = 			 PQgetvalue(result, i, i_memory_limit_cluster);
	    core_limit_cluster 	     = 			 PQgetvalue(result, i, i_core_limit_cluster);
	    resource_overcommit      = 		atof(PQgetvalue(result, i, i_resource_overcommit));
	    allocation_policy 	     = 			 PQgetvalue(result, i, i_allocation_policy);
	    vseg_resource_quota      = 			 PQgetvalue(result, i, i_vseg_resource_quota);
	    nvseg_upper_limit	     = 		atoi(PQgetvalue(result, i, i_nvseg_upper_limit));
	    nvseg_lower_limit	     = 		atoi(PQgetvalue(result, i, i_nvseg_lower_limit));
	    nvseg_upper_limit_perseg = 		atof(PQgetvalue(result, i, i_nvseg_upper_limit_perseg));
	    nvseg_lower_limit_perseg = 		atof(PQgetvalue(result, i, i_nvseg_lower_limit_perseg));
	    creation_time 		     = 		atol(PQgetvalue(result, i, i_creation_time));
	    update_time 		     = 		atol(PQgetvalue(result, i, i_update_time));
	    status 				     = 			 PQgetvalue(result, i, i_status);

	    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	    *queues = lappend(*queues,
	    				  createPropertyOID(
	    					  PCONTEXT,
							  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_OID),
							  &i,
							  oid));

	    *queues = lappend(*queues,
	    				  createPropertyName(
	    					  PCONTEXT,
							  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME),
							  &i,
							  (Name)name));

	    *queues = lappend(*queues,
	    				  createPropertyOID(
	    					  PCONTEXT,
	    					  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_PARENT),
							  &i,
							  parentoid));

	    *queues = lappend(*queues,
	    				  createPropertyInt32(
	    					  PCONTEXT,
	 						  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_ACTIVE_STATMENTS),
							  &i,
							  active_stats_cluster));

	    *queues = lappend(*queues,
	    				  createPropertyString(
	    					  PCONTEXT,
	 						  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER),
	 						  &i,
	 						  memory_limit_cluster));

	    *queues = lappend(*queues,
	    				  createPropertyString(
	    					  PCONTEXT,
	 						  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER),
	 						  &i,
							  core_limit_cluster));

	    *queues = lappend(*queues,
	    				  createPropertyString(
	    					  PCONTEXT,
	 						  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_ALLOCATION_POLICY),
	 						  &i,
							  allocation_policy));

	    *queues = lappend(*queues,
	    				  createPropertyFloat(
	    					  PCONTEXT,
							  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR),
							  &i,
							  resource_overcommit));

	    *queues = lappend(*queues,
	    				  createPropertyString(
	    					  PCONTEXT,
							  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA),
							  &i,
							  vseg_resource_quota));

	    *queues = lappend(*queues,
	    				  createPropertyInt32(
	    					  PCONTEXT,
	 						  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT),
							  &i,
							  nvseg_upper_limit));

	    *queues = lappend(*queues,
	    				  createPropertyInt32(
	    					  PCONTEXT,
	 						  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT),
							  &i,
							  nvseg_lower_limit));

	    *queues = lappend(*queues,
	    				  createPropertyFloat(
	    					  PCONTEXT,
							  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG),
							  &i,
							  nvseg_upper_limit_perseg));

	    *queues = lappend(*queues,
	    				  createPropertyFloat(
	    					  PCONTEXT,
							  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG),
							  &i,
							  nvseg_lower_limit_perseg));

	    /*
	     * NOTE(review): creation_time/update_time are int64 but are passed to
	     * createPropertyInt32, and atol() only parses leading digits. Confirm
	     * the column types and whether a 64-bit property helper should be used.
	     */
	    *queues = lappend(*queues,
	    				  createPropertyInt32(
	    					  PCONTEXT,
	 						  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_CREATION_TIME),
							  &i,
							  creation_time));

	    *queues = lappend(*queues,
	    				  createPropertyInt32(
	    					  PCONTEXT,
	 						  "queue",
							  getRSQTBLAttributeName(RSQ_TBL_ATTR_UPDATE_TIME),
							  &i,
							  update_time));

	    *queues = lappend(*queues,
	    				  createPropertyString(
	    					  PCONTEXT,
							  "queue",
	 						  getRSQTBLAttributeName(RSQ_TBL_ATTR_STATUS),
							  &i,
							  status));
		MEMORY_CONTEXT_SWITCH_BACK
	}

	ListCell *cell = NULL;
	foreach(cell, *queues)
	{
		KVProperty property = lfirst(cell);
		elog(RMLOG, "Resource manager loaded resource queue specifications from "
				  	"pg_resqueue : [%s]=[%s]",
					property->Key.Str,
					property->Val.Str);
	}
	elog(LOG, "Resource manager successfully loaded resource queue specifications");

cleanup:
	if(sql != NULL)
	{
		destroyPQExpBuffer(sql);
	}
	if(result != NULL)
	{
		PQclear(result);
	}
	PQfinish(conn);
	return ret;
}


/*
 * Signal handler that clears DRMGlobalInstance->ResManagerMainKeepRun,
 * presumably polled by the resource manager main loop to decide when to exit.
 * NOTE(review): a flag written from a signal handler should be declared
 * volatile sig_atomic_t (or equivalent); confirm the field's declaration.
 */
void quitResManager(SIGNAL_ARGS)
{
	DRMGlobalInstance->ResManagerMainKeepRun = false;
}

/*
 * Child-process signal handler; all work is delegated to
 * RB_handleSignalSIGCHLD(). Presumably registered for SIGCHLD — confirm at
 * the registration site.
 */
void handleChildSignal(SIGNAL_ARGS)
{
	/* The resource broker process quits. */
	RB_handleSignalSIGCHLD();
}

/*
 * Signal handler that only sets DRMStartup to true. Per its name it is
 * presumably how the postmaster learns that the resource manager has
 * started; confirm at the registration site.
 */
void notifyPostmasterResManagerStarted(SIGNAL_ARGS)
{
	DRMStartup = true;
}

/*
 * Group flat properties whose keys are formatted as
 *   <prefix><attr>.<index>=<value>
 * into a list of attribute lists, one list per run of consecutive properties
 * sharing the same <index>. Each grouped property is a new KVProperty created
 * in PCONTEXT whose key is only <attr>. Properties not starting with prefix
 * are logged and ignored; properties whose key can not be split are skipped.
 *
 * props[in]	: list of KVProperty
 * prefix[in]	: required key prefix, e.g. "queue." or "user."
 * propkind[in]	: word used in the per-property log lines
 * objkind[in]	: word used in the per-attribute log line
 *
 * Returns the list (allocated in PCONTEXT) of attribute lists.
 */
static List *groupIndexedProperties(List *props,
									char *prefix,
									char *propkind,
									char *objkind)
{
	List	 *attrlists	   = NULL;
	List	 *currentattrs = NULL;
	int		  currentindex = -1;
	ListCell *cell		   = NULL;

	foreach(cell, props)
	{
		KVProperty	value	   = lfirst(cell);
		char	   *attrstart  = NULL;
		int			attrlen	   = 0;
		char	   *indexstart = NULL;
		int			indexlen   = 0;
		int32_t		index	   = 0;
		SimpString	indexstr;

		elog(RMLOG, "Loads %s property %s=%s",
					propkind, value->Key.Str, value->Val.Str);

		if ( SimpleStringStartWith(&(value->Key), prefix) != FUNC_RETURN_OK )
		{
			elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str);
			continue;
		}

		/*
		 *----------------------------------------------------------------------
		 * The key string is formatted as
		 *   <prefix><attr>.<index>=<value>
		 *     (0)    (1)     (2)
		 *----------------------------------------------------------------------
		 */
		if ( PropertyKeySubstring(&(value->Key), 1, &attrstart, &attrlen) != FUNC_RETURN_OK ||
			 PropertyKeySubstring(&(value->Key), 2, &indexstart, &indexlen) != FUNC_RETURN_OK )
		{
			Assert(false);
			/* Release builds: never use unparsed attr/index values. */
			elog(WARNING, "Resource manager ignores malformed property %s=%s",
						  value->Key.Str, value->Val.Str);
			continue;
		}

		setSimpleStringRef(&indexstr, indexstart, indexlen);
		if ( SimpleStringToInt32(&indexstr, &index) != FUNC_RETURN_OK )
		{
			Assert(false);
			elog(WARNING, "Resource manager ignores malformed property %s=%s",
						  value->Key.Str, value->Val.Str);
			continue;
		}

		/* Keep only <attr> as the new key. */
		KVProperty newprop = createPropertyEmpty(PCONTEXT);
		setSimpleStringWithContent(&(newprop->Key), attrstart, attrlen);
		setSimpleStringNoLen(&(newprop->Val), value->Val.Str);

		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		/* A new index closes the attribute list of the previous object. */
		if ( index != currentindex )
		{
			if ( currentattrs != NULL )
			{
				attrlists = lappend(attrlists, currentattrs);
			}
			currentattrs = NULL;
			currentindex = index;
		}

		elog(RMLOG, "Resource manager loaded attribute for creating %s %s=%s",
				    objkind,
				    newprop->Key.Str,
					newprop->Val.Str);

		currentattrs = lappend(currentattrs, newprop);
		MEMORY_CONTEXT_SWITCH_BACK
	}

	if ( currentattrs != NULL )
	{
		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		attrlists = lappend(attrlists, currentattrs);
		MEMORY_CONTEXT_SWITCH_BACK
	}

	return attrlists;
}

/*
 * Read properties and generate the corresponding resource queues and users.
 *
 * queueprops[in]	: list of KVProperty, keys "queue.<attr>.<index>"
 * userprops[in]	: list of KVProperty, keys "user.<attr>.<index>"
 *
 * Queues are created parent-first, so a queue's parent exists whenever it is
 * added. The following are reported with a WARNING and skipped:
 *   - queues whose parent can not be found;
 *   - queues whose attributes fail parsing, checking or creation;
 *   - users whose attributes fail parsing or checking.
 *
 * Returns FUNC_RETURN_OK. Failures of individual queues or users are
 * warned about and skipped rather than failing the whole load.
 */
int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops)
{
	static char	  errorbuf[1024];
	int			  res			= FUNC_RETURN_OK;
	List 	  	 *queueattrlist = NULL;
	List		 *userattrlist	= NULL;
	ListCell	 *cell			= NULL;

	/*
	 * STEP 1. Group properties into one attribute list per queue / user.
	 */
	queueattrlist = groupIndexedProperties(queueprops, "queue.", "queue", "queue");
	userattrlist  = groupIndexedProperties(userprops,  "user.",  "user",  "role");

	/*
	 * STEP 2. Add resource queues.
	 */

	/* STEP 2.1. Build resource queue object with parsed properties set.     */
	List *rawrsqs = NULL;
	foreach(cell, queueattrlist)
	{
		List 	 *attrs = lfirst(cell);
		ListCell *cell2 = NULL;
		foreach(cell2, attrs)
		{
			KVProperty attrkv = lfirst(cell2);
			elog(RMLOG, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str);
		}

		DynResourceQueue newqueue = rm_palloc0(PCONTEXT,
		   	   	   	   	   	   	   			   sizeof(DynResourceQueueData));

		res = parseResourceQueueAttributes(attrs,
										   newqueue,
										   false,
										   true,
										   errorbuf,
										   sizeof(errorbuf));
		if ( res != FUNC_RETURN_OK )
		{
			rm_pfree(PCONTEXT, newqueue);
			elog(WARNING, "Resource manager can not create resource queue with its "
						  "attributes because %s",
						  errorbuf);
			continue;
		}

		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		rawrsqs = lappend(rawrsqs, newqueue);
		MEMORY_CONTEXT_SWITCH_BACK
	}

	/*
	 * STEP 2.2. Reorder the resource queue sequence to ensure that every time
	 * the queue is added into the manager, its parent queue can be found.
	 * Each pass moves exactly one loadable queue (no parent, or parent already
	 * ordered) from rawrsqs to orderedrsqs; stop when a pass moves nothing.
	 */
	List *orderedrsqs  = NULL;
	bool  orderchanged = true;
	while( orderchanged )
	{
		DynResourceQueue  toreordrsq = NULL;
		ListCell 		 *rawcell    = NULL;
		ListCell 		 *prevcell   = NULL;
		foreach(rawcell, rawrsqs)
		{
			DynResourceQueue rawqueue = lfirst(rawcell);
			bool			 loadable = rawqueue->ParentOID == InvalidOid;

			if ( !loadable )
			{
				ListCell *ordcell = NULL;
				foreach(ordcell, orderedrsqs)
				{
					DynResourceQueue ordqueue = lfirst(ordcell);
					if ( ordqueue->OID == rawqueue->ParentOID )
					{
						/* Stop at the first match: the cell is deleted once. */
						loadable = true;
						break;
					}
				}
			}

			if ( loadable )
			{
				toreordrsq = rawqueue;
				MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
				rawrsqs = list_delete_cell(rawrsqs, rawcell, prevcell);
				MEMORY_CONTEXT_SWITCH_BACK
				/* rawcell is freed; leave the loop before advancing it. */
				break;
			}
			prevcell = rawcell;
		}

		orderchanged = false;
		if ( toreordrsq != NULL )
		{
			elog(RMLOG, "Find one resource queue valid to continue loading %s.",
					    toreordrsq->Name);
			orderchanged = true;
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			orderedrsqs = lappend(orderedrsqs, toreordrsq);
			MEMORY_CONTEXT_SWITCH_BACK
		}
	}

	/*
	 * Some resource queues may reference parent queue ids that are not present
	 * in the pg_resqueue table or external configure file. These resource
	 * queues are logged as warnings, and HAWQ RM ignores them.
	 *
	 * Through catalog, this should be impossible.
	 */
	if ( list_length(rawrsqs) > 0 )
	{
		elog(WARNING, "Invalid resource queues are detected due to no valid "
					  "parent resource queue id.");

		while( list_length(rawrsqs) > 0 )
		{
			DynResourceQueue rawqueue = lfirst(list_head(rawrsqs));
			MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
			rawrsqs = list_delete_first(rawrsqs);
			MEMORY_CONTEXT_SWITCH_BACK
			elog(WARNING, "Invalid resource queue : [%s]", rawqueue->Name);
			rm_pfree(PCONTEXT, rawqueue);
		}
	}

	/*
	 * STEP 2.3. List of DynResourceQueue, save partially filled attributes.
	 * Actually check and complete the resource queue definition and save it
	 * in the resource queue manager.
	 */
	while( list_length(orderedrsqs) > 0 )
	{
		DynResourceQueue partqueue = lfirst(list_head(orderedrsqs));

		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		orderedrsqs = list_delete_first(orderedrsqs);
		MEMORY_CONTEXT_SWITCH_BACK

		elog(RMLOG, "Load queue %s.", partqueue->Name);

		res = checkAndCompleteNewResourceQueueAttributes(partqueue,
														 errorbuf,
														 sizeof(errorbuf));
		if ( res != FUNC_RETURN_OK )
		{
			elog(RMLOG, "res=%d error=%s, after check and complete queue %s.",
						res,
						errorbuf,
						partqueue->Name);

			rm_pfree(PCONTEXT, partqueue);
			elog(WARNING, "Resource manager can not complete resource queue's "
						  "attributes because %s",
						  errorbuf);
			continue;
		}

		elog(RMLOG, "Checked and completed queue %s.", partqueue->Name);

		DynResourceQueueTrack newtrack = NULL;
		res = createQueueAndTrack(partqueue, &newtrack, errorbuf, sizeof(errorbuf));

		if ( res != FUNC_RETURN_OK )
		{
			/* Log before releasing: the message reads partqueue->Name. */
			elog( WARNING, "Resource manager can not create resource queue %s "
						   "because %s",
						   partqueue->Name,
						   errorbuf);

			rm_pfree(PCONTEXT, partqueue);
			if ( newtrack != NULL )
			{
				rm_pfree(PCONTEXT, newtrack);
			}
			continue;
		}

		elog(RMLOG, "Created queue %s.", partqueue->Name);

		char buffer[1024];
		generateQueueReport(partqueue->OID, buffer, sizeof(buffer));
		elog(LOG, "Resource manager created resource queue instance : %s",
				  buffer);
	}

	/*
	 * STEP 3. Add users.
	 */
	while( list_length(userattrlist) > 0 )
	{
		List *attrs = lfirst(list_head(userattrlist));

		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		userattrlist = list_delete_first(userattrlist);
		MEMORY_CONTEXT_SWITCH_BACK

		UserInfo newuser = rm_palloc0(PCONTEXT,
									  sizeof(UserInfoData));

		res = parseUserAttributes(attrs,newuser, errorbuf, sizeof(errorbuf));
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Can not create user with its attributes because %s",
						  errorbuf);
			rm_pfree(PCONTEXT, newuser);
			continue;
		}

		res = checkUserAttributes(newuser, errorbuf, sizeof(errorbuf));
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "User is not valid because %s", errorbuf);
			rm_pfree(PCONTEXT, newuser);
			continue;
		}

		createUser(newuser);

		char buffer[256];
		generateUserReport(newuser->Name,
						   strlen(newuser->Name),
						   buffer,
						   sizeof(buffer));

		elog(LOG, "Resource manager created user. %s", buffer);
	}

	/*
	 * Every per-queue / per-user failure above was warned about and skipped,
	 * so the status of whichever item happened to be processed last must not
	 * decide the result of the whole load.
	 */
	return FUNC_RETURN_OK;
}

/*
 * Free a preferred host list built by generateAllocRequestToBroker() that was
 * never handed to the resource broker. Each element is a PAIR allocated with
 * rm_palloc0(PCONTEXT) whose Value is a ResourceBundle also allocated in
 * PCONTEXT; the Key is a SegResource owned by the resource pool and is NOT
 * freed here. *preferred is reset to NULL.
 */
static void freeUnsubmittedPreferredHostList(List **preferred)
{
	ListCell *cell = NULL;

	foreach(cell, *preferred)
	{
		PAIR pair = (PAIR)lfirst(cell);
		rm_pfree(PCONTEXT, pair->Value);
		rm_pfree(PCONTEXT, pair);
	}
	list_free(*preferred);
	*preferred = NULL;
}

/*
 * Decide how many GRM containers to acquire from the resource broker and
 * submit the request.
 *
 * A per-segment water level is chosen: ActualMinGRMContainerPerSeg when there
 * is current workload (or the memory/core ratio water mark shows recent
 * workload), 0 otherwise, and unlimited for the NONE_HAWQ2 broker. Every FTS
 * available segment without pending resource decrease that is below that
 * level contributes an entry to a preferred host list, which
 * completeAllocRequestToBroker() may extend further.
 *
 * Returns FUNC_RETURN_OK, or the error returned by RB_acquireResource()
 * (RESBROK_PIPE_BUSY is treated as "retry later" and mapped to OK).
 */
int generateAllocRequestToBroker(void)
{
	int  res	   = FUNC_RETURN_OK;
	bool submitted = false;		/* Was 'preferred' passed to the broker? */

	/*--------------------------------------------------------------------------
	 * This is a temporary restrict that HAWQ RM supports only one memory/core
	 * ratio in current version.
	 *--------------------------------------------------------------------------
	 */
	Assert( PQUEMGR->RatioCount == 1 );

	DynMemoryCoreRatioTrack mctrack = PQUEMGR->RatioTrackers[0];

	bool hasWorkload = mctrack->TotalUsed.MemoryMB +
					   mctrack->TotalRequest.MemoryMB > 0;
	if ( !hasWorkload )
	{
		/* Check if resource manager has workload recently. */
		if ( PQUEMGR->RatioCount == 1 &&
			 PQUEMGR->RatioWaterMarks[0].NodeCount > 0 )
		{
			DynMemoryCoreRatioWaterMark mark =
				(DynMemoryCoreRatioWaterMark)
				getDQueueHeadNodeData(&(PQUEMGR->RatioWaterMarks[0]));
			hasWorkload = mark->ClusterMemoryMB > 0 ;
		}
	}

	/* Decide water level of resource. */
	int wlevel = hasWorkload ? PQUEMGR->ActualMinGRMContainerPerSeg : 0;
	switch( DRMGlobalInstance->ImpType )
	{
	case YARN_LIBYARN:
		/* Do nothing. */
		break;
	case NONE_HAWQ2:
		/* We always expect all resource allocated. */
		wlevel = INT_MAX;
		break;
	default:
		Assert(false);
		break;
	}

	/*
	 * Go through each segment, decide how many containers should be allocated.
	 * And generate preferred host list based on minimum water level.
	 */
	int		  grmctnsize = 0;
	List	 *preferred	 = NULL;
	List 	 *ressegl	 = NULL;
	ListCell *cell		 = NULL;
	getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl);

	foreach(cell, ressegl)
	{
		PAIR pair = (PAIR)lfirst(cell);
		SegResource segres = (SegResource)(pair->Value);

		/*
		 * Resource manager skips this segment if
		 * 1) Not FTS available;
		 * 2) Having resource decrease pending.
		 */
		if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat) ||
			(segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0))
		{
			continue;
		}

		/* Get segment resource capacity. */
		uint32_t memcap  = getSegResourceCapacityMemory(segres);
		uint32_t corecap = getSegResourceCapacityCore(segres);

		/*
		 * The segment capacity mem/core ratio maybe not equal to the cluster
		 * mem/core ratio.
		 */
		int ctnsize = memcap / mctrack->MemCoreRatio;
		ctnsize = ctnsize < corecap ? ctnsize : corecap;

		/* Follow the water level calculated. */
		ctnsize = ctnsize < wlevel ? ctnsize : wlevel;

		elog(RMLOG, "Host %s has %d GRM containers, (%d MB, %lf CORE) increase pending.",
					 GET_SEGRESOURCE_HOSTNAME(segres),
					 (segres->ContainerSets[0] == NULL ?
					  0 :
					  list_length(segres->ContainerSets[0]->Containers)),
					 segres->IncPending.MemoryMB,
					 segres->IncPending.Core);

		/* Containers still missing = target - held - already pending. */
		int ctnsizeneed = ctnsize -
						  (segres->ContainerSets[0] == NULL ?
						   0 :
						   list_length(segres->ContainerSets[0]->Containers)) -
						  segres->IncPending.MemoryMB / mctrack->MemCoreRatio;
		if ( ctnsizeneed <= 0 )
		{
			/*
			 * If the segment contains more resource than expected resource
			 * water level.
			 */
			continue;
		}

		/* Build preferred host information. */
		PAIR newpair = rm_palloc0(PCONTEXT, sizeof(PAIRData));
		newpair->Key = segres;
		ResourceBundle resource = rm_palloc0(PCONTEXT, sizeof(ResourceBundleData));
		resource->MemoryMB = mctrack->MemCoreRatio * (ctnsizeneed);
		resource->Core = ctnsizeneed;
		newpair->Value = resource;
		preferred = lappend(preferred, newpair);

		elog(DEBUG3, "Expect resource from resource broker with "
				     "preferred host, hostname:%s, container number:%lf.",
				     GET_SEGRESOURCE_HOSTNAME(segres),
				     resource->Core);


		grmctnsize += ctnsizeneed;
	}
	freePAIRRefList(&(PRESPOOL->Segments), &ressegl);

	elog(RMLOG, "Resource manager needs minimum %d GRM containers.", grmctnsize);

	/* Decide how much resource to acquire from global resource manager. */
	int32_t reqmem  = 0;
	int32_t reqcore = ceil(mctrack->TotalRequest.Core   +
			  	   	       mctrack->TotalUsed.Core      -
						   mctrack->TotalAllocated.Core -
						   mctrack->TotalPending.Core);

	elog(RMLOG, "Memory Core Track has %lf requested, %lf used, %lf allocated, "
				 "%lf pending.",
				 mctrack->TotalRequest.Core,
				 mctrack->TotalUsed.Core,
				 mctrack->TotalAllocated.Core,
				 mctrack->TotalPending.Core);
	/*
	 * If after checking water level of segment resource, we expect more resource
	 * containers, we expect more as well.
	 */
	reqcore = reqcore < grmctnsize - mctrack->TotalPending.Core ?
			  grmctnsize - mctrack->TotalPending.Core :
			  reqcore;
	reqmem = reqcore * mctrack->MemCoreRatio;

	elog(RMLOG, "Resource manager now needs %d GRM containers.", reqcore);

	/*
	 * Check if should raise water level to deal with resource fragment or
	 * resource uneven problems. We trigger this logic only when no resource
	 * request caused by lack of resource, and no pending resource are waited
	 * for.
	 */
	if ( reqcore <= 0 &&
		 mctrack->TotalPending.Core <= 0 &&
		 (PQUEMGR->hasResourceProblem[RESPROBLEM_FRAGMENT] ||
		  PQUEMGR->hasResourceProblem[RESPROBLEM_UNEVEN]   ||
		  PQUEMGR->hasResourceProblem[RESPROBLEM_TOOFEWSEG]) )
	{
		/* Check if it is possible to raise water level. */
		if ( mctrack->TotalAllocated.Core + 1 <=
			 PRESPOOL->GRMTotal.Core * PQUEMGR->GRMQueueMaxCapacity )
		{
			/*
			 * We only add one more GRM container to acquire, this will trigger
			 * the following logic to raise the water level.
			 */
			reqcore = 1;
			reqmem = reqcore * mctrack->MemCoreRatio;

			PQUEMGR->hasResourceProblem[RESPROBLEM_FRAGMENT]  = false;
			PQUEMGR->hasResourceProblem[RESPROBLEM_UNEVEN]    = false;
			PQUEMGR->hasResourceProblem[RESPROBLEM_TOOFEWSEG] = false;

			elog(LOG, "Resource manager raises segment resource water level.");
		}
	}

	/* Call resource broker to request resource. */
	if ( reqmem > 0 && reqcore > 0 )
	{
		/*
		 * Here we know that we have to allocate more resource from GRM, we should
		 * check again to enrich the preferred host list for new locality data
		 * of expected additional GRM containers.
		 *
		 * The expected size of GRM containers may vary because we expect GRM
		 * to have even number of GRM containers in each available segments.
		 */
		completeAllocRequestToBroker(&reqmem, &reqcore, &preferred);

		elog(RMLOG, "Resource manager needs %d GRM containers after adjusting "
					"overall water level.",
					reqcore);

		if ( reqcore > 0 )
		{
			PRESPOOL->LastResAcqTime = gettime_microsec();

			addResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore);
			uint64_t oldtime = mctrack->TotalPendingStartTime;
			if ( mctrack->TotalPendingStartTime == 0 )
			{
				mctrack->TotalPendingStartTime = gettime_microsec();
				elog(DEBUG3, "Global resource total pending start time is updated "
							 "to "UINT64_FORMAT,
							 mctrack->TotalPendingStartTime);
			}

			/* Generate request to resource broker now. */
			submitted = true;
			res = RB_acquireResource(reqmem, reqcore, preferred);
			if ( res != FUNC_RETURN_OK && res != RESBROK_PIPE_BUSY )
			{
				minusResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore);
				mctrack->TotalPendingStartTime = oldtime;
				elog(WARNING, "Resource manager failed to allocate resource from "
							  "resource broker (%d MB, %d CORE).",
							  reqmem, reqcore);
			}
			else if ( res == RESBROK_PIPE_BUSY )
			{
				minusResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore);
				elog(DEBUG3, "Resource manager should retry to submit request.");
				res = FUNC_RETURN_OK;
			}
			else
			{
				elog(RMLOG, "Resource manager finished submitting resource "
							"allocation request to global resource manager for "
							"(%d MB, %d CORE).",
							reqmem,
							reqcore);
			}
		}
	}

	/*
	 * When no request was submitted, the preferred host list and everything
	 * it owns is referenced only by this local variable, so release it here
	 * instead of leaking it on every call.
	 * NOTE(review): ownership after RB_acquireResource() is not visible from
	 * this file section, so the submitted path is deliberately left untouched.
	 */
	if ( !submitted )
	{
		freeUnsubmittedPreferredHostList(&preferred);
	}

	return res;
}

/*
 * Extend an allocation request so GRM containers are spread evenly.
 *
 * The lowest per-segment water level (held containers + pending increase +
 * containers already requested in *preferred) is raised step by step. Each
 * segment below the new level gets more containers, up to its core capacity,
 * until the request is covered or no segment can grow. New preferred-host
 * entries are appended to *preferred; existing ones are enlarged in place.
 *
 * On return, *reqcore is the number of containers actually placed, which may
 * exceed the input. *reqmem is *reqcore times the input mem/core ratio.
 * The caller must pass *reqcore > 0 and *reqmem a multiple of *reqcore.
 */
void completeAllocRequestToBroker(int32_t 	 *reqmem,
								  int32_t 	 *reqcore,
								  List 		**preferred)
{
	/*
	 * Go through each segment to get minimum water level. The idea of completing
	 * the request is to keep pulling up the lowest water level in the cluster
	 * until equal or more GRM containers are requested.
	 */
	Assert(*reqmem % *reqcore == 0);
	uint32_t ratio = *reqmem / *reqcore;

	/* Step 1. Get lowest water level and build up index. */
	List *ressegl = NULL;
	getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl);
	/* Index of each segment in current preferred host list. */
	PAIR *reqidx = rm_palloc0(PCONTEXT, sizeof(PAIR) * list_length(ressegl));
	int llevel = INT_MAX;
	int totalcount = 0;
	int index = 0;
	bool allunavail = true;
	ListCell *cell = NULL;
	foreach(cell, ressegl)
	{
		reqidx[index] = NULL;

		PAIR pair = (PAIR)lfirst(cell);
		SegResource segres = (SegResource)(pair->Value);

		/*
		 * Resource manager skips this segment if
		 * 1) Not FTS available;
		 * 2) Having resource decrease pending.
		 */
		if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat)  ||
			(segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0))
		{
			index++;
			continue;
		}

		allunavail = false;

		int clevel = (segres->ContainerSets[0] == NULL ?
					  0 :
					  list_length(segres->ContainerSets[0]->Containers)) +
					 segres->IncPending.MemoryMB / ratio;

		ListCell *pcell = NULL;
		foreach(pcell, *preferred)
		{
			PAIR existpair = (PAIR)lfirst(pcell);
			if ( existpair->Key == segres )
			{
				reqidx[index] = existpair;
				totalcount += ((ResourceBundle)(reqidx[index]->Value))->MemoryMB /
							  ratio;
				break;
			}
		}

		int creqsize = reqidx[index] == NULL ?
					   0 :
					   ((ResourceBundle)(reqidx[index]->Value))->MemoryMB / ratio;

		llevel = (clevel+creqsize) < llevel ? (clevel+creqsize) : llevel;
		index++;
	}

	/* If no segment available for adjusting the request, no need to keep going */
	if ( allunavail )
	{
		rm_pfree(PCONTEXT, reqidx);
		/* Release the segment reference list built above (it was leaked). */
		freePAIRRefList(&(PRESPOOL->Segments), &ressegl);
		return;
	}

	for ( int i = 0 ; i < list_length(ressegl) ; ++i )
	{
		if ( reqidx[i] == NULL )
		{
			continue;
		}

		elog(RMLOG, "Expect resource from resource broker with "
				    "preferred host, hostname:%s, container number:%lf, "
				    "increase pending %d.",
				    GET_SEGRESOURCE_HOSTNAME(((SegResource)(reqidx[i]->Key))),
					((ResourceBundle)(reqidx[i]->Value))->Core,
					((SegResource)(reqidx[i]->Key))->IncPending.MemoryMB/ratio);
	}

	elog(RMLOG, "Lowest water level before adjusting is %d.", llevel);

	/* Step 2. Adjust request. */
	int32_t reqcoreleft = *reqcore - totalcount;
	bool keeplooping = true;
	while( reqcoreleft > 0 && keeplooping )
	{
		llevel++;
		index = 0;
		keeplooping = false;
		foreach(cell, ressegl)
		{
			PAIR pair = (PAIR)lfirst(cell);
			SegResource segres = (SegResource)(pair->Value);

			/*
			 * Resource manager skips this segment if
			 * 1) Not FTS available;
			 * 2) Having resource decrease pending.
			 */
			if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat) ||
				(segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0))
			{
				index++;
				continue;
			}

			/* Check total capacity of the segment. */
			uint32_t corecap = getSegResourceCapacityCore(segres);

			int clevel = (segres->ContainerSets[0] == NULL ?
						  0 :
						  list_length(segres->ContainerSets[0]->Containers)) +
						 segres->IncPending.MemoryMB / ratio;

			/* Level including what this request already asks for. */
			int aclevel = reqidx[index] == NULL ?
						  clevel :
						  (clevel + ((ResourceBundle)(reqidx[index]->Value))->MemoryMB / ratio);

			if ( llevel > aclevel && llevel <= corecap )
			{
				if ( reqidx[index] == NULL )
				{
					reqidx[index] = rm_palloc0(PCONTEXT, sizeof(PAIRData));
					reqidx[index]->Key = segres;
					ResourceBundle resource = rm_palloc0(PCONTEXT,
														 sizeof(ResourceBundleData));
					resetResourceBundleData(resource, 0, 0, ratio);
					reqidx[index]->Value = resource;
					*preferred = lappend(*preferred, reqidx[index]);
				}
				addResourceBundleData((ResourceBundle)(reqidx[index]->Value),
									  (llevel-aclevel) * ratio,
									  llevel-aclevel);
				reqcoreleft -= llevel-aclevel;
				keeplooping = true;

				elog(RMLOG, "Resource manager acquires %lf GRM containers on "
						    "host %s. Current level(having pending) %d, "
						    "expect level %d, acquired in current request %d.",
						    ((ResourceBundle)(reqidx[index]->Value))->Core,
						    GET_SEGRESOURCE_HOSTNAME(((SegResource)(reqidx[index]->Key))),
							clevel,
							llevel,
							aclevel-clevel);
			}
			index++;
		}
	}

	/* Adjust total mem and core request (may overshoot; reqcoreleft <= 0). */
	*reqcore -= reqcoreleft;
	*reqmem = *reqcore * ratio;

	rm_pfree(PCONTEXT, reqidx);
	/* Release the segment reference list, matching generateAllocRequestToBroker. */
	freePAIRRefList(&(PRESPOOL->Segments), &ressegl);
}
/*
 * Create all socket servers to accept connection from QD or RM agents.
 *
 * Listening sockets are opened on rm_master_port for "0.0.0.0" (AF_UNSPEC).
 * Every valid socket is registered for asynchronous reading with the
 * message-server handlers.
 *
 * Returns FUNC_RETURN_OK on success, or REQUESTHANDLER_FAIL_START_SOCKET_SERVER
 * when:
 *  - the port cannot be opened, or
 *  - a socket cannot be registered. In this case every opened listening
 *    socket is closed before returning.
 */
int	 initializeSocketServer(void)
{
	pgsocket		listenfds[HAWQRM_SERVER_PORT_COUNT];
	char		   *anyaddr	   = "0.0.0.0";
	AsyncCommBuffer commbuf	   = NULL;
	int				registered = 0;
	bool			failed	   = false;

	for ( int k = 0 ; k < HAWQRM_SERVER_PORT_COUNT ; k++ )
	{
		listenfds[k] = PGINVALID_SOCKET;
	}

	if ( StreamServerPort(AF_UNSPEC,
						  anyaddr,
						  rm_master_port,
						  NULL,
						  listenfds,
						  HAWQRM_SERVER_PORT_COUNT) != STATUS_OK )
	{
		elog(LOG, "Resource manager cannot create socket server. Port=%d",
				  rm_master_port);
		return REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
	}

	/* Prepare the poller before handing it any descriptor. */
	initializeAsyncComm();

	for ( int k = 0 ; k < HAWQRM_SERVER_PORT_COUNT ; k++ )
	{
		if ( listenfds[k] == PGINVALID_SOCKET )
		{
			continue;
		}

		if ( registerFileDesc(listenfds[k],
							  ASYNCCOMM_READ,
							  &AsyncCommBufferHandlersMsgServer,
							  NULL,
							  &commbuf) != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager cannot track socket server.");
			failed = true;
			break;
		}

		registered++;
		InitHandler_Message(commbuf);
	}

	if ( failed )
	{
		/* Give back every listening socket obtained from StreamServerPort. */
		for ( int k = 0 ; k < HAWQRM_SERVER_PORT_COUNT ; k++ )
		{
			if ( listenfds[k] != PGINVALID_SOCKET )
			{
				close(listenfds[k]);
			}
		}
		return REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
	}

	elog(LOG, "Resource manager starts accepting resource request. "
			  "Listening normal socket port %d. "
			  "Total listened %d FDs.",
			  rm_master_port,
			  registered);
	return FUNC_RETURN_OK;
}


/*
 * Send built response content back to RM. The sending is asynchronous.
 *
 * If the progress is error, close the connection.
 */
void sendResponseToClients(void)
{
	while( list_length(PCONTRACK->ConnToSend) > 0 )
	{
		ConnectionTrack conntrack = (ConnectionTrack)
									lfirst(list_head(PCONTRACK->ConnToSend));
		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		PCONTRACK->ConnToSend = list_delete_first(PCONTRACK->ConnToSend);
		MEMORY_CONTEXT_SWITCH_BACK

		if ( conntrack->CommBuffer != NULL )
		{
			buildMessageToCommBuffer(conntrack->CommBuffer,
									 conntrack->MessageBuff.Buffer,
									 conntrack->MessageSize,
									 conntrack->MessageID,
									 conntrack->MessageMark1,
									 conntrack->MessageMark2);
		}
	}
}

/*
 * Check and set the nodes down that are not updated by IMAlive heart-beat for a
 * long time.
 *
 * A segment qualifies when its last heartbeat is more than
 * rm_segment_heartbeat_timeout seconds old and it is not already flagged.
 * Such a segment gets SEG_STATUS_HEARTBEAT_TIMEOUT set in its status
 * description. If it was available, it is also marked unavailable and all of
 * its GRM resource is returned. Outside utility mode, the segment status and
 * history rows are updated as well. Queue capacities are refreshed when any
 * availability changed, and the resource pool status is validated at the end.
 */
void updateStatusOfAllNodes(void)
{
	bool	 changedstatus = false;
	uint64_t curtime	   = gettime_microsec();

	for ( uint32_t idx = 0 ; idx < PRESPOOL->SegmentIDCounter ; idx++ )
	{
		SegResource node = getSegResource(idx);
		Assert(node != NULL);
		uint8_t oldStatus = node->Stat->FTSAvailable;

		/*
		 * The subtraction below is unsigned. If the wall clock stepped
		 * backwards so that LastUpdateTime is "in the future", it would wrap
		 * to a huge value and wrongly declare the segment timed out. Such a
		 * segment is treated as freshly updated instead.
		 */
		if ( curtime <= node->LastUpdateTime ||
			 curtime - node->LastUpdateTime <=
			 1000000LL * rm_segment_heartbeat_timeout ||
			 (node->Stat->StatusDesc & SEG_STATUS_HEARTBEAT_TIMEOUT) != 0 )
		{
			continue;
		}

		/*
		 * This segment is heartbeat timeout, update its description
		 * and set it to unavailable if needed.
		 */
		if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE)
		{
			/*
			 * This call makes resource manager able to adjust queue and mem/core
			 * trackers' capacity.
			 */
			setSegResHAWQAvailability(node, RESOURCE_SEG_STATUS_UNAVAILABLE);
			/*
			 * This call makes resource pool remove unused containers.
			 */
			returnAllGRMResourceFromSegment(node);
			changedstatus = true;
		}

		node->Stat->StatusDesc |= SEG_STATUS_HEARTBEAT_TIMEOUT;
		if (Gp_role != GP_ROLE_UTILITY)
		{
			SimpStringPtr description = build_segment_status_description(node->Stat);
			update_segment_status(idx + REGISTRATION_ORDER_OFFSET,
								  SEGMENT_STATUS_DOWN,
								  (description->Len > 0)?description->Str:"");
			add_segment_history_row(idx + REGISTRATION_ORDER_OFFSET,
									GET_SEGRESOURCE_HOSTNAME(node),
									(description->Len > 0)?description->Str:"");

			freeSimpleStringContent(description);
			rm_pfree(PCONTEXT, description);
		}

		elog(WARNING, "Resource manager sets host %s heartbeat timeout.",
					  GET_SEGRESOURCE_HOSTNAME(node));
	}

	if ( changedstatus )
	{
		refreshResourceQueueCapacity(false);
		refreshActualMinGRMContainerPerSeg();
	}

	validateResourcePoolStatus(true);
}

/*
 * Set all nodes DOWN due to no GRM cluster report
 */
void setAllNodesGRMDown()
{
	SegResource node = NULL;

	bool changedstatus = false;
	for(uint32_t idx = 0; idx < PRESPOOL->SegmentIDCounter; idx++)
	{
		node = getSegResource(idx);
		Assert(node != NULL);
		uint8_t oldStatus = node->Stat->FTSAvailable;
		uint32_t oldDesc  = node->Stat->StatusDesc;

		if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE)
		{
			/*
			 * This call makes resource manager able to adjust queue and mem/core
			 * trackers' capacity.
			 */
			setSegResHAWQAvailability(node, RESOURCE_SEG_STATUS_UNAVAILABLE);
			changedstatus = true;
		}

		node->Stat->StatusDesc |= SEG_STATUS_NO_GRM_NODE_REPORT;
		if (Gp_role != GP_ROLE_UTILITY && oldDesc != node->Stat->StatusDesc)
		{
			SimpStringPtr description = build_segment_status_description(node->Stat);
			update_segment_status(idx + REGISTRATION_ORDER_OFFSET,
									SEGMENT_STATUS_DOWN,
									(description->Len > 0)?description->Str:"");
			add_segment_history_row(idx + REGISTRATION_ORDER_OFFSET,
									GET_SEGRESOURCE_HOSTNAME(node),
									(description->Len > 0)?description->Str:"");

			freeSimpleStringContent(description);
			rm_pfree(PCONTEXT, description);
		}

		if (oldDesc != node->Stat->StatusDesc)
		{
			elog(WARNING, "Resource manager sets host %s down in cleanup phase "
						  "for resource broker error.",
						  GET_SEGRESOURCE_HOSTNAME(node));
		}
	}

	if (changedstatus)
	{
		refreshResourceQueueCapacity(false);
		refreshActualMinGRMContainerPerSeg();
	}
}

/**
 * Load segment information from file.
 *
 * NOTE: This is a test facility.
 *
 * Each line of rm_resourcepool_test_filename is split on ',' into hostname,
 * port and ip; trailing CR/LF/tab/space is trimmed from the ip. A line is
 * skipped if:
 *  - it has fewer than three fields,
 *  - its port does not parse, or
 *  - its hostname is already known.
 * Every other host is added as an available segment. Its FTS capacity is
 * DRMGlobalInstance->SegmentMemoryMB / SegmentCore, and its segment id is
 * stored in the HDFS and GRM host name indexes. Queue capacities and
 * memory/core ratio limits and water marks are refreshed once at the end.
 *
 * Always returns FUNC_RETURN_OK, including when the file cannot be opened.
 */
int  loadHostInformationIntoResourcePool(void)
{
	int                     res             = FUNC_RETURN_OK;
	SelfMaintainBufferData  seginfobuff;
	FILE				   *phostlist 		= NULL;

	/* Open the file */
	elog(LOG, "HAWQ RM :: To load file %s", rm_resourcepool_test_filename);

	phostlist = fopen(rm_resourcepool_test_filename, "r");
	if ( phostlist == NULL )
	{
		return FUNC_RETURN_OK;
	}

	/* This buffer is used for building machine id instance. */
	initializeSelfMaintainBuffer(&seginfobuff, PCONTEXT);

	/*
	 * Loop each host, we identify hosts by hostname only. The first segment in
	 * that host is selected as the target machine.
	 */
	char line[1024];
	while( fgets(line, sizeof(line)-1, phostlist) != NULL )
	{
		elog(LOG, "HAWQ RM :: Load line %s", line);

		/* Parse line. */
		char *phostname = strtok(line, ",");
		if ( phostname == NULL ) continue;
		char *phostport = strtok(NULL, ",");
		if ( phostport == NULL ) continue;
		char *phostip = strtok(NULL, ",");
		if ( phostip == NULL ) continue;

		/* "%u" must be paired with unsigned int; "%d" into uint32_t was UB. */
		unsigned int port = 0;
		if ( sscanf(phostport, "%u", &port) != 1 )
		{
			elog(LOG, "HAWQ RM :: Invalid port, skip machine %s", phostname);
			continue;
		}

		/* If the host is new, add new host instance in resource pool */
		int32_t segtid = SEGSTAT_ID_INVALID;
		if ( getSegIDByHostName(phostname,strlen(phostname), &segtid) ==
			 FUNC_RETURN_OK )
		{
			elog(LOG, "HAWQ RM :: Skip machine %s", phostname);
			continue;
		}

		elog(LOG, "HAWQ RM :: Find machine to add. %s", phostname);

		/* Reserve fixed head part of the machine id instance. */
		resetSelfMaintainBuffer(&seginfobuff);
		prepareSelfMaintainBuffer(&seginfobuff, sizeof(SegInfoData),true);
		SegInfo newhost = (SegInfo)(seginfobuff.Buffer);
		newhost->master     = 0;
		newhost->standby    = 0;
		newhost->port       = port;
		newhost->alive      = 1;

		jumpforwardSelfMaintainBuffer(&seginfobuff, sizeof(SegInfoData));

		/* Add address offset and attributes. Currently, only one address. */
		newhost->HostAddrCount = 1;
		newhost->AddressAttributeOffset = sizeof(SegInfoData);
		/* Jump offset and reserved pad. */
		newhost->AddressContentOffset   = sizeof(SegInfoData) +
				sizeof(uint32_t) * (((newhost->HostAddrCount + 1) >> 1) << 1);
		uint16_t addrattr   = HOST_ADDRESS_CONTENT_STRING;
		uint16_t addroffset = newhost->AddressContentOffset;

		appendSMBVar(&seginfobuff, addroffset);
		appendSMBVar(&seginfobuff, addrattr);
		appendSelfMaintainBufferTill64bitAligned(&seginfobuff);

		/* Add hostip content, without trailing whitespace / line ending. */
		uint32_t length   = strlen(phostip);
		while( length > 0 &&
			   (phostip[length-1] == '\r' ||
				phostip[length-1] == '\n' ||
				phostip[length-1] == '\t' ||
				phostip[length-1] == ' ') )
		{
			length--;
		}
		int     addrsize  = __SIZE_ALIGN64(offsetof(AddressStringData, Address) +
										   length + 1);
		prepareSelfMaintainBuffer(&seginfobuff, addrsize, true);
		AddressString straddr = (AddressString)(seginfobuff.Buffer +
												seginfobuff.Cursor + 1);
		straddr->Length = length;
		memcpy(straddr->Address, phostip, length+1);
		straddr->Address[length] = '\0';
		jumpforwardSelfMaintainBuffer(&seginfobuff, addrsize);

		newhost = (SegInfo)(seginfobuff.Buffer);

		/* Append hostname string. */
		newhost->HostNameLen = strlen(phostname);
		newhost->HostNameOffset = seginfobuff.Cursor + 1;
		appendSMBStr(&seginfobuff,phostname);
		appendSelfMaintainBufferTill64bitAligned(&seginfobuff);

		/* Update total machine id instance size. */
		newhost = (SegInfo)(seginfobuff.Buffer);
		newhost->Size = seginfobuff.Cursor + 1;

		/* Build machine info instance. */
		SegStat segstat = (SegStat)rm_palloc0(PCONTEXT,
											  offsetof(SegStatData, Info) +
											  seginfobuff.Cursor + 1);
		segstat->ID                = SEGSTAT_ID_INVALID;
		segstat->FTSAvailable      = RESOURCE_SEG_STATUS_AVAILABLE;
		segstat->FTSTotalMemoryMB  = DRMGlobalInstance->SegmentMemoryMB;
		segstat->FTSTotalCore      = DRMGlobalInstance->SegmentCore;
		segstat->GRMTotalMemoryMB  = 0;
		segstat->GRMTotalCore      = 0;
		segstat->FailedTmpDirNum   = 0;

		memcpy((char *)segstat + offsetof(SegStatData, Info),
				seginfobuff.Buffer,
				seginfobuff.Cursor+1);

		SelfMaintainBufferData segreport;
		initializeSelfMaintainBuffer(&segreport,PCONTEXT);
		generateSegStatReport(segstat, &segreport);
		elog(LOG, "Resource manager builds available segment from file: %s",
				  segreport.Buffer);
		destroySelfMaintainBuffer(&segreport);

		bool capstatchanged = false;
		res = addHAWQSegWithSegStat(segstat, &capstatchanged);
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager failed to add machine from file.");
			rm_pfree(PCONTEXT, segstat);
			/*
			 * segstat is freed; the cache filling below dereferences it, so
			 * move on to the next line (previously a use-after-free).
			 */
			continue;
		}

		/* Fill HDFS and GRM host name cache */
		int32_t id = SEGSTAT_ID_INVALID;
		SimpString key;
		setSimpleStringRefNoLen(&key, GET_SEGINFO_HOSTNAME(&(segstat->Info)));
		getSegIDByHostName(GET_SEGINFO_HOSTNAME(&(segstat->Info)),
						   segstat->Info.HostNameLen,
						   &id);
		setHASHTABLENode(&(PRESPOOL->HDFSHostNameIndexed),
						 &key,
						 TYPCONVERT(void *, id),
						 false);
		setHASHTABLENode(&(PRESPOOL->GRMHostNameIndexed),
						 &key,
						 TYPCONVERT(void *, id),
						 false);
	}

	/* Refresh resource queue capacities. */
	refreshResourceQueueCapacity(false);
	refreshActualMinGRMContainerPerSeg();
	/* Recalculate all memory/core ratio instances' limits. */
	refreshMemoryCoreRatioLimits();
	/* Refresh memory/core ratio level water mark. */
	refreshMemoryCoreRatioWaterMark();

	/* Free buffer if used. */
	destroySelfMaintainBuffer(&seginfobuff);

	fclose(phostlist);

	return FUNC_RETURN_OK;
}

/*
 * SQL-callable function that writes one part of the resource manager status
 * to a fixed file under /tmp.
 *
 * The int32 argument selects the part:
 *  - 1: connection tracks;
 *  - 2: resource queues;
 *  - 3: resource pool.
 * For these values it returns a text message naming the dump file. For any
 * other value it returns a usage message and dumps nothing.
 */
extern Datum dump_resource_manager_status(PG_FUNCTION_ARGS)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	int			dumptype		 = PG_GETARG_INT32(0);
	char		result[1024]	 = {0};
	char		targetfile[1024] = {0};

	/* Unknown selector: report usage and do not touch any file. */
	if ( dumptype < 1 || dumptype > 3 )
	{
		snprintf(result, sizeof(result),
				 "Dump resource manager status failed.\n"
				 "1 -> dump resource manager connection track status\n"
				 "2 -> dump resource manager resource queue status\n"
				 "3 -> dump resource manager resource pool status");
		PG_RETURN_TEXT_P(cstring_to_text(result));
	}

	if ( dumptype == 1 )
	{
		snprintf(targetfile, sizeof(targetfile), "%s",
				 "/tmp/resource_manager_conntrack_status");
		snprintf(result, sizeof(result),
				 "Dump resource manager connection track status to %s",
				 targetfile);
	}
	else if ( dumptype == 2 )
	{
		snprintf(targetfile, sizeof(targetfile), "%s",
				 "/tmp/resource_manager_resqueue_status");
		snprintf(result, sizeof(result),
				 "Dump resource manager resource queue status to %s",
				 targetfile);
	}
	else
	{
		snprintf(targetfile, sizeof(targetfile), "%s",
				 "/tmp/resource_manager_respool_status");
		snprintf(result, sizeof(result),
				 "Dump resource manager resource pool status to %s",
				 targetfile);
	}

	dumpResourceManagerStatus(dumptype, targetfile, errorbuf, sizeof(errorbuf));

	PG_RETURN_TEXT_P(cstring_to_text(result));
}

/*
 * Run the periodic resource broker steps in order, stopping at the first
 * step that fails. Returns FUNC_RETURN_OK or that step's error code.
 */
static int runResourceBrokerTaskSteps(void)
{
	int		 res = FUNC_RETURN_OK;
	uint64_t now = gettime_microsec();

	/*
	 * STEP 1. Check if should generate request to periodically check
	 * 		   GRM cluster status.
	 *
	 * Generate request to the global resource manager ( YARN, etc.) to get
	 * cluster report. This is a asynchronous request, therefore, we don't
	 * need to handle the real cluster report right now. The possibility is
	 * reserved that if one fake global resource manager is implemented in
	 * HAWQ RM process and the cluster report can be returned directly in
	 * this function.
	 */
	bool reportdue = PRESPOOL->Segments.NodeCount > 0 &&
					 (now - PRESPOOL->LastUpdateTime >
					  rm_cluster_report_period * 1000000LL ||
					  hasSegmentGRMCapacityNotUpdated()) &&
					 now - PRESPOOL->LastRequestTime > 5LL * 1000000LL;
	if ( reportdue )
	{
		double  maxcap = 0.0;
		List   *report = NULL;

		PRESPOOL->LastRequestTime = now;
		res = RB_getClusterReport(rm_grm_yarn_queue, &report, &maxcap);
		Assert(report == NULL);
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager fails to fresh cluster report "
						  "from global resource manager.");
			return res;
		}
	}

	/* STEP 2. Check if should periodically check container status. */
	now = gettime_microsec();
	bool containercheckdue =
		PRESPOOL->AddPendingContainerCount == 0 &&
		PRESPOOL->RetPendingContainerCount == 0 &&
		now - PRESPOOL->LastCheckContainerTime >
			rm_cluster_report_period * 1000000LL &&
		now - PRESPOOL->LastRequestContainerTime > 5LL * 1000000LL;
	if ( containercheckdue )
	{
		List *report = NULL;

		PRESPOOL->LastRequestContainerTime = now;
		res = RB_getContainerReport(&report);
		Assert( report == NULL );
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager fails to get container report "
						  "from global resource manager.");
			return res;
		}
	}

	/* STEP 3. Return kicked GRM containers, unless that phase is paused. */
	if ( PRESPOOL->pausePhase[QUOTA_PHASE_KICKED_TO_RETURN] )
	{
		elog(LOG, "Paused returning GRM containers kicked to GRM.");
	}
	else
	{
		res = RB_returnResource(&(PRESPOOL->KickedContainers));
		if ( res != FUNC_RETURN_OK )
		{
			elog(WARNING, "Resource manager failed to return kicked container to "
						  "global resource manager.");
			return res;
		}
	}

	/*
	 * STEP 4. Handle resource broker input as new allocated resource or
	 * 		   cluster report, container report etc. The allocated resource
	 * 		   from resource broker will be added to the resource queue
	 * 		   manager and resource pool.
	 */
	res = RB_handleNotifications();
	if ( res != FUNC_RETURN_OK )
	{
		elog(WARNING, "Resource manager fails to handle response from global "
					  "resource broker");
	}
	return res;
}

/*
 * One round of resource broker housekeeping.
 *
 * While GRM resource is being cleaned, kicked containers are only cleared.
 * Otherwise the periodic steps (cluster report, container report, return of
 * kicked containers, notification handling) run. In both cases the final
 * result is passed to RB_handleError().
 */
void processResourceBrokerTasks(void)
{
	int res = FUNC_RETURN_OK;

	if ( isCleanGRMResourceStatus() )
	{
		RB_clearResource(&(PRESPOOL->KickedContainers));
	}
	else
	{
		res = runResourceBrokerTaskSteps();
	}

	RB_handleError(res);
}

/*
 * Periodically ask the resource broker for more GRM containers.
 *
 * Nothing is requested while GRM resource is being cleaned, or when there is
 * no segment or no memory/core ratio. Otherwise:
 *  - memory/core ratio level usage is refreshed on every call;
 *  - generateAllocRequestToBroker() runs when more than one second has passed
 *    since PRESPOOL->LastResAcqTime.
 * The outcome is always passed to RB_handleError().
 */
void generateResourceRequestToResourceBroker(void)
{
	int result = FUNC_RETURN_OK;

	if ( !isCleanGRMResourceStatus() )
	{
		uint64_t now = gettime_microsec();
		bool canrequest = PRESPOOL->Segments.NodeCount > 0 &&
						  PQUEMGR->RatioCount > 0;

		if ( canrequest )
		{
			refreshMemoryCoreRatioLevelUsage(now);

			bool acquiredue = now - PRESPOOL->LastResAcqTime > 1000000LL;
			if ( acquiredue )
			{
				result = generateAllocRequestToBroker();
				if ( result != FUNC_RETURN_OK )
				{
					elog(WARNING, "Resource manager fails to allocate container "
								  "from global resource manager.");
				}
			}
		}
	}

	RB_handleError(result);
}

/*
 * Drop every GRM container out of the resource pool and out of the
 * to-be-accepted set by calling the two drop helpers below, in this order.
 * NOTE(review): the helper names suggest the dropped containers end up in the
 * kicked container list that processResourceBrokerTasks() returns to GRM via
 * RB_returnResource() -- confirm in the helpers.
 */
void cleanupAllGRMContainers(void)
{
	dropAllResPoolGRMContainersToToBeKicked();
	dropAllToAcceptGRMContainersToKicked();
}

/*
 * Report whether GRM container cleanup has finished.
 *
 * Cleanup is finished when no segment holds a GRM container and no container
 * increase or decrease is still pending. Non-zero pending counts are logged
 * at RMLOG level.
 */
bool cleanedAllGRMContainers(void)
{
	/* Condition 1. All segments have no resource allocated. */
	bool noneallocated = allSegmentHasNoGRMContainersAllocated();
	if ( !noneallocated )
	{
		return false;
	}

	bool haspending = PRESPOOL->AddPendingContainerCount > 0 ||
					  PRESPOOL->RetPendingContainerCount > 0;
	if ( haspending )
	{
		elog(RMLOG, "Pending GRM container count inc %d, dec %d.",
					PRESPOOL->AddPendingContainerCount,
					PRESPOOL->RetPendingContainerCount);
	}

	/* Condition 2. No on-the-fly GRM containers for increasing and decreasing.*/
	return !(PRESPOOL->AddPendingContainerCount != 0 ||
			 PRESPOOL->RetPendingContainerCount != 0);
}
