| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "envswitch.h" |
| #include "dynrm.h" |
| #include "utils/hashtable.h" |
| |
| #include "resourcemanager/resourcemanager.h" |
| #include "resourceenforcer/resourceenforcer.h" |
| |
| #include "storage/ipc.h" |
| #include "postmaster/postmaster.h" |
| #include "postmaster/fork_process.h" |
| #include "postmaster/syslogger.h" |
| #include "utils/kvproperties.h" |
| #include "utils/network_utils.h" |
| |
| #include "miscadmin.h" |
| |
| #include "communication/rmcomm_QD2RM.h" |
| #include "communication/rmcomm_AsyncComm.h" |
| #include "communication/rmcomm_MessageHandler.h" |
| #include "communication/rmcomm_MessageServer.h" |
| #include "communication/rmcomm_RM2GRM_libyarn.h" |
| #include "communication/rmcomm_RM2GRM_none.h" |
| #include "communication/rmcomm_RMSEG2RM.h" |
| #include "communication/rmcomm_RM2RMSEG.h" |
| #include "storage/proc.h" |
| #include "catalog/pg_database.h" |
| #include "catalog/pg_tablespace.h" |
| #include "catalog/catalog.h" |
| #include "utils/syscache.h" |
| #include "storage/backendid.h" |
| #include "storage/sinvaladt.h" |
| |
| #include "catalog/pg_authid.h" |
| #include "catalog/pg_resqueue.h" |
| #include "access/htup.h" |
| #include "utils/tqual.h" |
| |
| #include "access/xact.h" |
| |
| #include "resourcebroker/resourcebroker_API.h" |
| |
| #include <executor/spi.h> |
| |
| #include "gp-libpq-fe.h" |
| #include "gp-libpq-int.h" |
| |
| extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace); |
| static char *probeDatabase = "template1"; |
| |
| int loadAllQueueAndUser(void); |
| int loadHostInformationIntoResourcePool(void); |
| |
| /** Functions for signal handling. **/ |
| void quitResManager(SIGNAL_ARGS); |
| void handleChildSignal(SIGNAL_ARGS); |
| void notifyPostmasterResManagerStarted(SIGNAL_ARGS); |
| int generateAllocRequestToBroker(void); |
| void completeAllocRequestToBroker(int32_t *reqmem, |
| int32_t *reqcore, |
| List **preferred); |
| void processResourceBrokerTasks(void); |
| void generateResourceRequestToResourceBroker(void); |
| void cleanupAllGRMContainers(void); |
| bool cleanedAllGRMContainers(void); |
| |
/**
 * Macros for loading resource queue and user definitions.
 *
 * They read catalog columns into <col>Datum / <col>IsNull variable pairs and
 * append properties built from them to a property list. Most of the macros
 * carry their own trailing ';' (DEC_VAR_PG_CATCOL even expands to two
 * declarations), so each one is meant to be used as a stand-alone statement.
 *
 * Common arguments of the BLDPROP_* family:
 *   list     property list; list->Context is passed to createProperty*()
 *   tag1     first tag passed to createProperty*()
 *   attrtag  selects the get<attrtag>AttributeName() lookup
 *   attrid   attribute id passed to that lookup
 *   index    lvalue whose address is passed to createProperty*()
 **/

/* Declare the <name>Datum / <name>IsNull pair receiving one column value. */
#define DEC_VAR_PG_CATCOL(name) Datum name##Datum; bool name##IsNull = false;

/* Read column <colname> of table <tbname> from tuple <tupvar>. */
#define GETATTR_PG_CATCOL(tbname,colname,relvar,tupvar) \
	colname##Datum = heap_getattr(tupvar, \
								  Anum_##tbname##_##colname, \
								  relvar->rd_att, \
								  &colname##IsNull);

/* Append a property of type <attrtyp> converted with DatumGet<attrtyp>(). */
#define BLDPROP_PG_CATCOL(colname,list,tag1,attrtag,attrid,attrtyp,index) \
	insertDQueueTailNode(list, \
						 createProperty##attrtyp(list->Context, \
												 tag1, \
												 get##attrtag##AttributeName(attrid), \
												 &index, \
												 DatumGet##attrtyp(colname##Datum)));

/* Append an OID property; a NULL column becomes InvalidOid. */
#define BLDPROP_PG_CATCOL_OID(colname,list,tag1,attrtag,attrid,index) \
	insertDQueueTailNode(list, \
						 createPropertyOID(list->Context, \
										   tag1, \
										   get##attrtag##AttributeName(attrid), \
										   &index, \
										   colname##IsNull ? InvalidOid : \
										   DatumGetObjectId(colname##Datum)));

/* Append an OID property from an explicitly supplied <oid> value. */
#define BLDPROP_PG_CATCOL_SOID(list,tag1,attrtag,attrid,index,oid) \
	insertDQueueTailNode(list, \
						 createPropertyOID(list->Context, \
										   tag1, \
										   get##attrtag##AttributeName(attrid), \
										   &index, \
										   oid));

/* Append a property of type <attrtyp> from an explicitly supplied <val>. */
#define BLDPROP_PG_CATCOL_SET(list,tag1,attrtag,attrid,attrtyp,index,val) \
	insertDQueueTailNode(list, \
						 createProperty##attrtyp(list->Context, \
												 tag1, \
												 get##attrtag##AttributeName(attrid), \
												 &index, \
												 val));

/* Append a text property; a NULL column is passed on as NULL. */
#define BLDPROP_PG_CATCOL_TXT(colname,list,tag1,attrtag,attrid,index) \
	insertDQueueTailNode(list, \
						 createPropertyText(list->Context, \
											tag1, \
											get##attrtag##AttributeName(attrid), \
											&index, \
											colname##IsNull ? NULL : \
											DatumGetTextP(colname##Datum)));

/* Append a float property; a NULL column becomes -1.0. */
#define BLDPROP_PG_CATCOL_FLOAT(colname,list,tag1,attrtag,attrid,index) \
	insertDQueueTailNode(list, \
						 createPropertyFloat(list->Context, \
											 tag1, \
											 get##attrtag##AttributeName(attrid), \
											 &index, \
											 colname##IsNull ? -1.0 : \
											 DatumGetFloat4(colname##Datum)));

/* Append a string property from an explicitly supplied <val>. */
#define BLDPROP_PG_CATCOL_STR(list,tag1,attrtag,attrid,index,val) \
	insertDQueueTailNode(list, \
						 createPropertyString(list->Context, \
											  tag1, \
											  get##attrtag##AttributeName(attrid), \
											  &index, \
											  val));

/* Append a 32-bit integer property; a NULL column becomes -1. */
#define BLDPROP_PG_CATCOL_INT32(colname,list,tag1,attrtag,attrid,index) \
	insertDQueueTailNode(list, \
						 createPropertyInt32(list->Context, \
											 tag1, \
											 get##attrtag##AttributeName(attrid), \
											 &index, \
											 colname##IsNull ? -1 : \
											 DatumGetInt32(colname##Datum)));

/*
 * Append an integer property read with DatumGetInt64(); a NULL column becomes
 * -1.
 * NOTE(review): the value is handed to createPropertyInt32(), not to a 64-bit
 * variant -- confirm that the narrowing is intended.
 */
#define BLDPROP_PG_CATCOL_INT64(colname,list,tag1,attrtag,attrid,index) \
	insertDQueueTailNode(list, \
						 createPropertyInt32(list->Context, \
											 tag1, \
											 get##attrtag##AttributeName(attrid), \
											 &index, \
											 colname##IsNull ? -1 : \
											 DatumGetInt64(colname##Datum)));
| |
| int loadUserPropertiesFromCatalog(List **users); |
| int loadQueuePropertiesFromCatalog(List **queues); |
| |
| /****************************************************************************** |
| * The global instance of dynamic resource manager. |
| ******************************************************************************/ |
| DynRMGlobal DRMGlobalInstance;/* The global instance for HAWQ RM logic. */ |
| volatile bool DRMStartup; /* This variable is set only in postmaster |
| process by signal handler SIG_USR2. This is |
| used to check if HAWQ RM is ready to accept |
| request. */ |
| |
| pthread_t t_move_cgroup; |
| queue *g_queue_cgroup; |
| GHash g_ghash_cgroup; |
| uint64 rm_enforce_last_cleanup_time; |
| volatile bool g_enforcement_thread_quited = false; |
| |
| #define SEGMENT_HEARTBEAT_INTERVAL (3LL * 1000000LL) |
| /*************************** |
| * The main entry of HAWQ RM |
| ***************************/ |
| int ResManagerMain(int argc, char *argv[]) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| /*******************************************************************/ |
| /* Step 1. Prepare memory context, log and create global instance. */ |
| /*******************************************************************/ |
| IsUnderPostmaster = true; |
| MyProcPid = getpid(); |
| SetProcessingMode(InitProcessing); |
| |
| res = createDRMInstance(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "HAWQ RM Can not create resource manager global instance."); |
| } |
| elog(DEBUG5, "HAWQ RM :: created dynamic resource manager instance."); |
| |
| res = createDRMMemoryContext(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "HAWQ RM Can not create resource manager global instance."); |
| } |
| elog(DEBUG5, "HAWQ RM :: created resource manager memory context."); |
| |
| res = initializeDRMInstance(PCONTEXT); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "HAWQ RM Can not initialize global instance."); |
| } |
| |
| elog(DEBUG5, "HAWQ RM :: initialized resource manager instance."); |
| |
| /************************************************************************** |
| * STEP 2. Load configuration. |
| **************************************************************************/ |
| |
| /* Command line parser. */ |
| res = parseCommandLine(argc, argv); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "Wrong resource manager command line arguments."); |
| } |
| |
| elog(DEBUG5, "HAWQ RM ::passed command line arguments."); |
| |
| /* Recognize all loaded configure properties. */ |
| res = loadDynamicResourceManagerConfigure(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "Fail to load valid properties from configure files."); |
| } |
| |
| elog(DEBUG5, "HAWQ RM :: passed loading configuration."); |
| |
| /*******************************************************/ |
| /* Step 3. Post-initialization of core data structure. */ |
| /*******************************************************/ |
| initializeDRMCore(); |
| |
| elog(DEBUG5, "HAWQ RM :: Passed initializing core data structure."); |
| |
| /**************************************************************************/ |
| /* STEP 4. INIT for making RM process access catalog by CAQL etc. */ |
| /**************************************************************************/ |
| /* BLOCK signal behavior. Only another specific thread has the capability to |
| * process the signal. */ |
| PG_SETMASK(&BlockSig); |
| pqsignal(SIGHUP , SIG_IGN); |
| pqsignal(SIGINT , quitResManager); |
| pqsignal(SIGTERM, quitResManager); |
| pqsignal(SIGQUIT, quitResManager); |
| pqsignal(SIGPIPE, SIG_IGN); |
| pqsignal(SIGUSR1, SIG_IGN); |
| pqsignal(SIGUSR2, quitResManager); |
| pqsignal(SIGCHLD, handleChildSignal); |
| pqsignal(SIGTTIN, SIG_IGN); |
| pqsignal(SIGTTOU, SIG_IGN); |
| |
| if ( DRMGlobalInstance->Role == START_RM_ROLE_MASTER ) { |
| CurrentResourceOwner = ResourceOwnerCreate(NULL, "Resource Manager"); |
| |
| BaseInit(); |
| InitProcess(); |
| InitBufferPoolBackend(); |
| InitXLOGAccess(); |
| |
| SetProcessingMode(NormalProcessing); |
| |
| MyDatabaseId = TemplateDbOid; |
| MyDatabaseTableSpace = DEFAULTTABLESPACE_OID; |
| if (!FindMyDatabase(probeDatabase, &MyDatabaseId, &MyDatabaseTableSpace)) |
| ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE), |
| errmsg("database 'postgres' does not exist"))); |
| |
| char *fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); |
| |
| SetDatabasePath(fullpath); |
| |
| InitProcessPhase2(); |
| |
| MyBackendId = InvalidBackendId; |
| |
| SharedInvalBackendInit(false); |
| |
| if (MyBackendId > MaxBackends || MyBackendId <= 0) |
| elog(FATAL, "bad backend id: %d", MyBackendId); |
| |
| InitBufferPoolBackend(); |
| RelationCacheInitialize(); |
| InitCatalogCache(); |
| RelationCacheInitializePhase2(); |
| } |
| /* END: INIT for making RM process access catalog by caql etc. */ |
| /**************************************************************************/ |
| PG_SETMASK(&UnBlockSig); |
| |
| /* Save process fork mode. */ |
| DRMGlobalInstance->ThisPID = getpid(); |
| DRMGlobalInstance->ParentPID = getppid(); |
| |
| /* Initialize socket connection pool. */ |
| initializeSocketConnectionPool(); |
| |
| elog(DEBUG5, "HAWQ RM :: starts as role %d.", DRMGlobalInstance->Role); |
| |
| /*******************************************/ |
| /* STEP 5. Start concrete processing loop. */ |
| /*******************************************/ |
| switch(DRMGlobalInstance->Role) { |
| case START_RM_ROLE_MASTER: |
| { |
| /* Mark as master side resource manager. */ |
| int oldid = gp_session_id; |
| gp_session_id = -1; |
| init_ps_display("master resource manager", " ", "", ""); |
| gp_session_id = oldid; |
| |
| /* Initialize resource broker service. */ |
| RB_prepareImplementation(DRMGlobalInstance->ImpType); |
| res = RB_start(true); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "Fail to create resource broker service."); |
| } |
| res = ResManagerMainServer2ndPhase(); |
| elog(LOG, "Master RM exits.\n"); |
| break; |
| } |
| case START_RM_ROLE_SEGMENT: |
| { |
| init_ps_display("segment resource manager", "", "",""); |
| res = ResManagerMainSegment2ndPhase(); |
| elog(LOG, "Segment RM exits.\n"); |
| break; |
| } |
| default: |
| Assert(false); /* Should never come here. */ |
| } |
| return res; |
| } |
| |
| |
| /* |
| * Start sub-process as resource manager from postmaster. |
| * |
| * rmrole[in] Specify the role of RM process, possible values are : |
| * START_RM_ROLE_SERVER : RM server |
| * START_RM_ROLE_SEGMENT: RM agent. |
| */ |
| int ResManagerProcessStartup(void) |
| { |
| pid_t ResMgrPID = 0; |
| char *arguments[3] = {NULL, NULL, NULL}; |
| arguments[0] = "postgres"; |
| arguments[1] = "-role"; |
| |
| int rmrole = AmIMaster() ? START_RM_ROLE_MASTER : START_RM_ROLE_SEGMENT; |
| // if in upgrade mode, at the first calling, the old pg_resqueue data can't let resource manager works, |
| // And also we only need to upgrade master node, no segment is enabled at this time, so no need for resource manager. |
| if(gp_upgrade_mode) |
| return 0; |
| /* |
| * Prepare temporary signal handler for SIGUSR2 to let postmaster process |
| * able to know when HAWQ RM is ready to accept requests. |
| */ |
| DRMStartup = false; |
| pqsignal(SIGUSR2, notifyPostmasterResManagerStarted); |
| PG_SETMASK(&UnBlockSig); |
| |
| /* Fork resource manager process. */ |
| switch ((ResMgrPID = fork_process())) |
| { |
| case -1: |
| pqsignal(SIGUSR2, SIG_IGN); |
| PG_SETMASK(&BlockSig); |
| ereport(FATAL, |
| (errmsg("could not fork resource manager collector: %m"))); |
| return 0; |
| case 0: |
| pqsignal(SIGUSR2, SIG_IGN); |
| /* in postmaster child ... */ |
| /* Close the postmaster's sockets */ |
| ClosePostmasterPorts(false); |
| |
| /* Lose the postmaster's on-exit routines */ |
| on_exit_reset(); |
| |
| ResourceManagerIsForked = true; |
| |
| switch(rmrole) { |
| case START_RM_ROLE_MASTER: |
| arguments[2] = "server"; |
| proc_exit(ResManagerMain(3, arguments)); |
| break; |
| case START_RM_ROLE_SEGMENT: |
| arguments[2] = "segment"; |
| proc_exit(ResManagerMain(3, arguments)); |
| break; |
| } |
| |
| break; |
| |
| default: |
| { |
| /******************************************************************* |
| * IN POSTMASTER. Postmaster here waits for the signal SIGUSR2 sent |
| * from HAWQ RM to gurantee that HAWQ RM can accept the request after |
| * the return of this function. |
| ******************************************************************/ |
| /* wait to ensure that resource manager works basically. */ |
| for ( int i = 0 ; i < 180 ; ++i ) { |
| if ( usleep(1000000) != 0 ) |
| i--; |
| elog(LOG, "Wait for HAWQ RM %d", i); |
| if ( DRMStartup ) |
| break; |
| } |
| pqsignal(SIGUSR2, SIG_IGN); |
| PG_SETMASK(&BlockSig); |
| |
| if ( DRMStartup ) { |
| elog(LOG, "HAWQ :: Received signal notification that HAWQ RM " |
| "works now."); |
| } |
| else { |
| elog(FATAL, "HAWQ RM can not work. Please check HAWQ RM log."); |
| } |
| return ResMgrPID; |
| } |
| } |
| |
| /* shouldn't get here */ |
| return 0; |
| } |
| |
| int ResManagerMainServer2ndPhase(void) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| /* Register message handlers */ |
| registerMessageHandler(REQUEST_QD_CONNECTION_REG , handleRMRequestConnectionReg); |
| registerMessageHandler(REQUEST_QD_CONNECTION_REG_OID , handleRMRequestConnectionRegByOID); |
| registerMessageHandler(REQUEST_QD_CONNECTION_UNREG , handleRMRequestConnectionUnReg); |
| registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE , handleRMRequestAcquireResource); |
| registerMessageHandler(REQUEST_QD_RETURN_RESOURCE , handleRMRequestReturnResource); |
| registerMessageHandler(REQUEST_RM_IMALIVE , handleRMSEGRequestIMAlive); |
| registerMessageHandler(REQUEST_QD_DDL_MANIPULATERESQUEUE, handleRMDDLRequestManipulateResourceQueue); |
| registerMessageHandler(REQUEST_QD_DDL_MANIPULATEROLE , handleRMDDLRequestManipulateRole); |
| registerMessageHandler(REQUEST_QD_ACQUIRE_RESOURCE_QUOTA, handleRMRequestAcquireResourceQuota); |
| registerMessageHandler(REQUEST_QD_REFRESH_RESOURCE , handleRMRequestRefreshResource); |
| registerMessageHandler(REQUEST_QD_SEGMENT_ISDOWN , handleRMRequestSegmentIsDown); |
| registerMessageHandler(REQUEST_QD_DUMP_STATUS , handleRMRequestDumpStatus); |
| registerMessageHandler(REQUEST_QD_DUMP_RESQUEUE_STATUS , handleRMRequestDumpResQueueStatus); |
| registerMessageHandler(REQUEST_DUMMY , handleRMRequestDummy); |
| registerMessageHandler(REQUEST_QD_QUOTA_CONTROL , handleRMRequestQuotaControl); |
| /* New socket facility poll based server.*/ |
| res = initializeSocketServer(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "Fail to initialize socket server."); |
| } |
| |
| /* |
| * Notify postmaster that HAWQ RM is ready. Ignore the possible problem that |
| * the parent process quits. HAWQ RM will automatically detect if its parent |
| * dies, then HAWQ RM should exit normally. |
| */ |
| kill(DRMGlobalInstance->ParentPID, SIGUSR2); |
| |
| /* Clean up gp_segment_configuration table. */ |
| cleanup_segment_config(); |
| |
| cleanup_segment_config_history(); |
| |
| /* |
| * register itself into gp_segment_configuration table |
| * master internal id is 0, segment id starts from 1 |
| */ |
| add_segment_config_row(MASTER_ORDER_ID, |
| DRMGlobalInstance->SocketLocalHostName.Str, |
| DRMGlobalInstance->SocketLocalHostName.Str, |
| PostPortNumber, |
| SEGMENT_ROLE_MASTER_CONFIG, |
| SEGMENT_STATUS_UP, |
| ""); |
| |
| /* Load queue and user definition as no DDL now. */ |
| res = loadAllQueueAndUser(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(FATAL, "Fail to load queue and user definition."); |
| } |
| |
| elog(DEBUG5, "HAWQ RM :: passed loading queue and user definition."); |
| |
| /* Check slaves file firstly to ensure we have expected cluster size. */ |
| checkSlavesFile(); |
| |
| if ( rm_resourcepool_test_filename != NULL && |
| rm_resourcepool_test_filename[0] != '\0' ) { |
| loadHostInformationIntoResourcePool(); |
| } |
| |
| /******* TILL NOW, resource manager starts providing services *******/ |
| elog(LOG, "HAWQ RM process works now."); |
| |
| /* Start request handler to provide services. */ |
| res = MainHandlerLoop(); |
| /* res is returned to the caller. */ |
| |
| |
| /******* TILL NOW, resource manager goes into EXIT PHASE *******/ |
| elog(LOG, "HAWQ RM server goes into exit phase."); |
| |
| /* Shutdown resource broker. */ |
| RB_stop(); |
| |
| return res; |
| } |
| |
| /** |
| * The main loop of HAWQ RM process. |
| */ |
| int MainHandlerLoop(void) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| DRMGlobalInstance->ResourceManagerStartTime = gettime_microsec(); |
| |
| while( DRMGlobalInstance->ResManagerMainKeepRun ) |
| { |
| /* STEP 0. Check if postmaster process exists. */ |
| if (!PostmasterIsAlive(true)) { |
| DRMGlobalInstance->ResManagerMainKeepRun = false; |
| elog(LOG, "Postmaster is not alive, resource manager exits"); |
| break; |
| } |
| |
| /* STEP 1. Check resource broker status. */ |
| RB_start(true); |
| |
| /* STEP 2. Check if resource manager is in clean up status. */ |
| if ( isCleanGRMResourceStatus() ) |
| { |
| /* |
| * Fisrtly we move all currently accepted GRM containers into resource |
| * pool. We will drop them at once. |
| */ |
| moveAllAcceptedGRMContainersToResPool(); |
| |
| /* |
| * Mark all current active connection tracks that they are in fact |
| * using old application's resource. When resource is returned, the |
| * resource is calculated in old resource level. |
| */ |
| setAllAllocatedResourceInConnectionTracksOld(); |
| |
| /* Validate that all in-use resource are marked to old resource. */ |
| Assert(isAllResourceQueueIdle()); |
| |
| /* |
| * Reset deadlock detector. The deadlock detector works for new |
| * resource only. |
| */ |
| resetAllDeadLockDetector(); |
| |
| /* |
| * Mark all segments DOWN due to no GRM cluster report |
| */ |
| setAllNodesGRMDown(); |
| |
| /* Move all resource broker allocated GRM containers to returned. */ |
| cleanupAllGRMContainers(); |
| |
| /* Refresh resource queue resource usage and request quota. */ |
| refreshMemoryCoreRatioLevelUsage(gettime_microsec()); |
| |
| resetAllSegmentsGRMContainerFailAllocCount(); |
| resetAllSegmentsNVSeg(); |
| |
| /* Check if can resume using new available global resource manager.*/ |
| if ( cleanedAllGRMContainers() ) |
| { |
| unsetCleanGRMResourceStatus(); |
| } |
| } |
| |
| /* |
| * STEP 3. Process all communications between resource manager and |
| * resource broker. |
| */ |
| processResourceBrokerTasks(); |
| |
| /* STEP 4. Handle socket server inputs. */ |
| res = processAllCommFileDescs(); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| /* |
| * The possible error here is the failure of poll(), we won't keep |
| * running HAWQ RM any longer, graceful quit is requested. |
| */ |
| elog(WARNING, "System error cause resource manager not possible to " |
| "track network communications. Exit resource manager " |
| "process normally."); |
| DRMGlobalInstance->ResManagerMainKeepRun = false; |
| } |
| |
| /* STEP 5. Handle all submitted requests through socket clients. */ |
| processSubmittedRequests(); |
| |
| /* STEP 6. Generate possible resource request to resource broker. */ |
| generateResourceRequestToResourceBroker(); |
| |
| /* STEP 7. Check timeout resource allocation and timeout queuing requests. */ |
| timeoutDeadResourceAllocation(); |
| timeoutQueuedRequest(); |
| |
| /* |
| * STEP 8. Check the status of all segment nodes, mark down if hasn't got |
| * IMAlive message for a pre-defined period. |
| */ |
| uint64_t curtime = gettime_microsec(); |
| if ((rm_resourcepool_test_filename == NULL || |
| rm_resourcepool_test_filename[0] == '\0') && |
| (curtime - PRESPOOL->LastCheckTime > |
| 1000000LL * rm_segment_heartbeat_timeout)) |
| { |
| updateStatusOfAllNodes(); |
| PRESPOOL->LastCheckTime = curtime; |
| } |
| |
| |
| /* STEP 9. Move all accepted GRM containers into resource pool. */ |
| moveAllAcceptedGRMContainersToResPool(); |
| |
| /* |
| * STEP 10. Check if should pause dispatching resource to queries to |
| * collect resource back in order to return GRM containers. |
| */ |
| if ( DRMGlobalInstance->ImpType != NONE_HAWQ2 && |
| PRESPOOL->AddPendingContainerCount == 0 && |
| PRESPOOL->RetPendingContainerCount == 0 && |
| PQUEMGR->ForcedReturnGRMContainerCount == 0 && |
| PQUEMGR->GRMQueueCurCapacity > 1 && |
| PQUEMGR->GRMQueueResourceTight ) |
| { |
| elog(LOG, "Resource manager decides to breathe out resource. " |
| "Current relative GRM queue capacity %lf, " |
| "Expect GRM queue capacity %lf, " |
| "Estimae GRM queue %s", |
| PQUEMGR->GRMQueueCurCapacity, |
| PQUEMGR->GRMQueueCapacity, |
| PQUEMGR->GRMQueueResourceTight ? "busy" : "not busy"); |
| |
| /* Calculate how many GRM containers should be returned. */ |
| setForcedReturnGRMContainerCount(); |
| } |
| |
| /* STEP 11. Dispatch resource to queries and send the messages out.*/ |
| |
| if ( PRESPOOL->Segments.NodeCount > 0 && PQUEMGR->RatioCount > 0 && |
| PQUEMGR->toRunQueryDispatch && |
| PQUEMGR->ForcedReturnGRMContainerCount == 0 && |
| PRESPOOL->SlavesHostCount > 0 ) |
| { |
| dispatchResourceToQueries(); |
| } |
| else if ( PQUEMGR->ForcedReturnGRMContainerCount > 0 ) |
| { |
| forceReturnGRMResourceToRB(); |
| } |
| else |
| { |
| elog(DEBUG3, "Loop dummy. %d, %d, %d, %d", |
| PRESPOOL->Segments.NodeCount, |
| PQUEMGR->RatioCount, |
| PQUEMGR->toRunQueryDispatch ? 1 : 0, |
| PQUEMGR->ForcedReturnGRMContainerCount); |
| } |
| |
| /* STEP 12. Generate output content to client connections. */ |
| sendResponseToClients(); |
| |
| /* |
| * STEP 13. Return containers to global resource manager if there some |
| * some idle resource. |
| */ |
| timeoutIdleGRMResourceToRB(); |
| |
| /* STEP 14. Notify segments to increase resource quota. */ |
| notifyToBeAcceptedGRMContainersToRMSEG(); |
| |
| /* STEP 15. Notify segments to decrease resource. */ |
| notifyToBeKickedGRMContainersToRMSEG(); |
| |
| /* |
| * STEP 16. Check slaves file if the content is not checked or is |
| * updated. |
| */ |
| checkSlavesFile(); |
| } |
| |
| elog(LOG, "Resource manager main event handler exits."); |
| |
| return res; |
| } |
| |
| /** |
| * Parse passed in command line arguments. The acceptable argument keys and |
| * values are all listed below which are locally used in parseCommandLine(). |
| */ |
| |
| /* -role + rolename */ |
| #define HAWQDRM_COMMANDLINE_ROLE "-role" |
| |
| #define HAWQDRM_COMMANDLINE_ROLE_VALUE_SERVER "server" |
| #define HAWQDRM_COMMANDLINE_ROLE_VALUE_SEGMENT "segment" |
| |
| int parseCommandLine(int argc, char **argv) |
| { |
| int res = FUNC_RETURN_OK; |
| SimpString value; |
| |
| initSimpleString(&value, PCONTEXT); |
| for ( int i = 0 ; i < argc ; ++i ) { |
| if ( strcmp(argv[i], HAWQDRM_COMMANDLINE_ROLE) == 0 ) { |
| res = getStringValue(argc, argv, i+1, &value); |
| if ( res != FUNC_RETURN_OK ) { |
| elog( WARNING, |
| "Wrong command line argument %s behind %s.", |
| value.Str, |
| HAWQDRM_COMMANDLINE_ROLE); |
| goto exit; |
| } |
| |
| if ( SimpleStringComp(&value, |
| HAWQDRM_COMMANDLINE_ROLE_VALUE_SERVER) == 0 ) { |
| DRMGlobalInstance->Role = START_RM_ROLE_MASTER; |
| } |
| else if ( SimpleStringComp(&value, |
| HAWQDRM_COMMANDLINE_ROLE_VALUE_SEGMENT) == 0 ) { |
| DRMGlobalInstance->Role = START_RM_ROLE_SEGMENT; |
| } |
| else { |
| elog(WARNING, "Wrong argument value '%s' for %s", |
| value.Str, HAWQDRM_COMMANDLINE_ROLE); |
| res = MAIN_WRONG_COMMANDLINE; |
| } |
| |
| elog(DEBUG5, "HAWQ RM :: Command line sets RM role: %d\n", |
| DRMGlobalInstance->Role); |
| } |
| } |
| |
| exit: |
| freeSimpleStringContent(&value); |
| return res; |
| } |
| |
| int getStringValue(int argc, char **argv, int pos, SimpString *val) |
| { |
| if ( pos >= argc ) |
| return MAIN_WRONG_COMMANDLINE; |
| setSimpleStringNoLen(val, argv[pos]); |
| return FUNC_RETURN_OK; |
| } |
| |
| int createDRMInstance(void) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| /* Create dynamic resource manager global instance holding all data. */ |
| DRMGlobalInstance = NULL; |
| DRMGlobalInstance = (DynRMGlobal) |
| rm_palloc0(TopMemoryContext, |
| sizeof(struct DynRMGlobalData)); |
| Assert( DRMGlobalInstance != NULL ); |
| return res; |
| } |
| |
| /* |
| * Initialize the dynamic resource manager instance. Return 0 if succeed, other- |
| * wise, -1 is returned. Resource manager's memory context is created as one |
| * child context which is referenced by DRMGlobalInstance->Context. |
| * |
| * TopMemoryContext ( global instance is allocated here ) |
| * | |
| * V |
| * Resource manager context ( all consequent objects are allocated here ) |
| * |
| */ |
| int createDRMMemoryContext(void) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| Assert( DRMGlobalInstance != NULL ); |
| |
| /* Create dynamic resource manager memory context,the new memory context is |
| created as the child of top memory context. */ |
| DRMGlobalInstance->Context = NULL; |
| |
| DRMGlobalInstance->Context = AllocSetContextCreate( |
| TopMemoryContext, |
| DRMGLOBAL_MEMORY_CONTEXT_NAME, |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE ); |
| Assert( PCONTEXT != NULL ); |
| |
| return res; |
| } |
| |
| int initializeDRMInstance(MCTYPE context) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| DRMGlobalInstance->ResourceQueueManager = NULL; |
| DRMGlobalInstance->ConnTrackManager = NULL; |
| DRMGlobalInstance->ResourcePoolInstance = NULL; |
| DRMGlobalInstance->ImpType = NONE_HAWQ2; |
| |
| DRMGlobalInstance->LocalHostLastUpdateTime = 0; |
| DRMGlobalInstance->HeartBeatLastSentTime = 0; |
| DRMGlobalInstance->TmpDirLastCheckTime = 0; |
| DRMGlobalInstance->LocalHostStat = NULL; |
| |
| initializeDQueue(&(DRMGlobalInstance->LocalHostTempDirectories), context); |
| DRMGlobalInstance->LocalHostFailedTmpDirList = NULL; |
| |
| HASHCTL ctl; |
| ctl.keysize = sizeof(TmpDirKey); |
| ctl.entrysize = sizeof(TmpDirEntry); |
| ctl.hcxt = context; |
| |
| /* Tell the working threads keep running. */ |
| DRMGlobalInstance->ResManagerMainKeepRun = true; |
| |
| /* Whether send IMAlive message */ |
| DRMGlobalInstance->SendIMAlive = true; |
| |
| /* Where to send the heart-beat. */ |
| DRMGlobalInstance->SendToStandby = false; |
| |
| DRMGlobalInstance->ResBrokerAppTimeStamp = 0; |
| DRMGlobalInstance->ResBrokerTriggerCleanup = false; |
| |
| /* Get local host name here to make all components able to use this info. */ |
| initSimpleString(&(DRMGlobalInstance->SocketLocalHostName), context); |
| res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName)); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(WARNING, "Fail to get local host name."); |
| } |
| |
| /* Set resource manager server startup time to 0, i.e. not started yet. */ |
| DRMGlobalInstance->ResourceManagerStartTime = 0; |
| |
| return res; |
| } |
| |
| int initializeDRMInstanceForQD(void) |
| { |
| int res = FUNC_RETURN_OK; |
| |
| /* Get local host name here to make all components able to use this info. */ |
| initSimpleString(&(DRMGlobalInstance->SocketLocalHostName), |
| DRMGlobalInstance->Context); |
| res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName)); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(WARNING, "Fail to get local host name."); |
| } |
| return res; |
| } |
| |
| #define DRMQE2RM_MEMORY_CONTEXT_NAME "QE to RM communication" |
| |
| bool QE2RM_Initialized = false; |
| MemoryContext QE2RM_CommContext = NULL; |
| |
| void initializeDRMInstanceForQE(void) |
| { |
| if ( QE2RM_Initialized ) |
| return; |
| |
| int res = FUNC_RETURN_OK; |
| |
| /* create dynamic resource manager instance to contain config data. */ |
| res = createDRMInstance(); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(ERROR, "Fail to initialize data structure for communicating with " |
| "resource manager."); |
| } |
| |
| MEMORY_CONTEXT_SWITCH_TO(TopMemoryContext) |
| QE2RM_CommContext = AllocSetContextCreate( CurrentMemoryContext, |
| DRMQE2RM_MEMORY_CONTEXT_NAME, |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE ); |
| Assert( QE2RM_CommContext != NULL ); |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| DRMGlobalInstance->Context = QE2RM_CommContext; |
| |
| /* Get local host name here to make all components able to use this info. */ |
| initSimpleString(&(DRMGlobalInstance->SocketLocalHostName), |
| DRMGlobalInstance->Context); |
| res = getLocalHostName(&(DRMGlobalInstance->SocketLocalHostName)); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(WARNING, "Fail to get local host name."); |
| } |
| |
| /****** Resource enforcement GUCs begins ******/ |
| |
| /* Get resource enforcement enablement flag */ |
| DRMGlobalInstance->ResourceEnforcerCpuEnable = rm_enforce_cpu_enable; |
| |
| /* Get resource enforcement CGroup mount point */ |
| initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint), |
| PCONTEXT); |
| setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint), |
| rm_enforce_cgrp_mnt_pnt); |
| |
| /* Get resource enforcement CGroup hierarchy name */ |
| initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName), |
| PCONTEXT); |
| setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName), |
| rm_enforce_cgrp_hier_name); |
| |
| /* Get resource enforcement mapping between hawq weight and yarn weight */ |
| DRMGlobalInstance->ResourceEnforcerCpuWeight = rm_enforce_cpu_weight; |
| |
| /* Get resource enforcement vcore-pcore ratio */ |
| DRMGlobalInstance->ResourceEnforcerVcorePcoreRatio = rm_enforce_core_vpratio; |
| |
| /* Get resource enforcement CGroup cleanup period */ |
| DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period; |
| |
| /****** Resource enforcement GUCs ends ******/ |
| |
| QE2RM_Initialized = true; |
| } |
| |
| /** |
| * Initialize resource manager core structure to get ready for services. |
| */ |
| void initializeDRMCore(void) |
| { |
| /* Initialize resource queue management part. */ |
| DRMGlobalInstance->ResourceQueueManager = |
| (DynResourceQueueManager) |
| rm_palloc0(PCONTEXT, sizeof(DynResourceQueueManagerData)); |
| initializeResourceQueueManager(); |
| |
| /* Initialize connection track manager part. */ |
| DRMGlobalInstance->ConnTrackManager = |
| (ConnectionTrackManager) |
| rm_palloc0(PCONTEXT, sizeof(ConnectionTrackManagerData)); |
| initializeConnectionTrackManager(); |
| |
| /* Initialize node resource management part. */ |
| PRESPOOL = (ResourcePool) rm_palloc0(PCONTEXT, sizeof(ResourcePoolData)); |
| initializeResourcePoolManager(); |
| } |
| |
| static void InitTemporaryDirs(DQueue tmpdirs_list, char *tmpdirs_string) |
| { |
| int templocation = -1; |
| SimpString tmpdirs; |
| initSimpleString(&tmpdirs, PCONTEXT); |
| setSimpleStringNoLen(&tmpdirs, tmpdirs_string); |
| while (FUNC_RETURN_OK == SimpleStringLocateChar(&tmpdirs, ',', &templocation)) |
| { |
| SimpStringPtr tempdir = createSimpleString(PCONTEXT); |
| SimpleStringSubstring(&tmpdirs, 0, templocation, tempdir); |
| insertDQueueTailNode(tmpdirs_list, tempdir); |
| |
| SimpString tmpstr; |
| initSimpleString(&tmpstr, PCONTEXT); |
| SimpleStringSubstring(&tmpdirs, templocation+1, -1, &tmpstr); |
| freeSimpleStringContent(&tmpdirs); |
| SimpleStringCopy(&tmpdirs, &tmpstr); |
| freeSimpleStringContent(&tmpstr); |
| } |
| |
| SimpStringPtr tempdirlast = createSimpleString(PCONTEXT); |
| setSimpleStringWithContent(tempdirlast, tmpdirs.Str, tmpdirs.Len); |
| insertDQueueTailNode(tmpdirs_list, tempdirlast); |
| freeSimpleStringContent(&tmpdirs); |
| } |
| |
| |
| /** |
| * Read loaded config properties and fill the verified values into global |
| * instance. |
| */ |
| int loadDynamicResourceManagerConfigure(void) |
| { |
| elog(DEBUG3, "Resource manager loads Socket Listening Port %d", |
| rm_master_port); |
| elog(DEBUG3, "Resource manager loads Segment Socket Listening Port %d", |
| rm_segment_port); |
| |
| /* Decide global resource manager mode. */ |
| if ( strcasecmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_YARN) == 0 ) |
| { |
| DRMGlobalInstance->ImpType = YARN_LIBYARN; |
| } |
| else if ( strcasecmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 ) |
| { |
| DRMGlobalInstance->ImpType = NONE_HAWQ2; |
| } |
| else |
| { |
| elog(WARNING, "Wrong global resource manager type set in %s.", |
| HAWQDRM_CONFFILE_SERVER_TYPE); |
| return MAIN_CONF_UNSET_ROLE; |
| } |
| elog(DEBUG3, "Resource manager loads resource broker implement mode : %d", |
| DRMGlobalInstance->ImpType); |
| |
| SimpString segmem; |
| if ( rm_seg_memory_use[0] == '\0' ) |
| { |
| elog(WARNING, "%s is not set", HAWQDRM_CONFFILE_LIMIT_MEMORY_USE); |
| return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE; |
| } |
| |
| setSimpleStringRefNoLen(&segmem, rm_seg_memory_use); |
| int res = SimpleStringToStorageSizeMB(&segmem, |
| (uint32_t *) &(DRMGlobalInstance->SegmentMemoryMB)); |
| if ( res != FUNC_RETURN_OK) |
| { |
| elog(WARNING, "Can not understand the value '%s' of property %s.", |
| rm_seg_memory_use, |
| HAWQDRM_CONFFILE_LIMIT_MEMORY_USE); |
| return MAIN_CONF_UNSET_SEGMENT_MEMORY_USE; |
| } |
| |
| DRMGlobalInstance->SegmentCore = rm_seg_core_use; |
| |
| elog(DEBUG3, "HAWQ RM :: Accepted NONE mode resource management setting, " |
| "each host has (%d MB,%lf) resource capacity.\n", |
| DRMGlobalInstance->SegmentMemoryMB, |
| DRMGlobalInstance->SegmentCore); |
| |
| if ( DRMGlobalInstance->Role == START_RM_ROLE_SEGMENT ) |
| { |
| // For temporary directories |
| InitTemporaryDirs(&DRMGlobalInstance->LocalHostTempDirectories, rm_seg_tmp_dirs); |
| |
| DQUEUE_LOOP_BEGIN(&DRMGlobalInstance->LocalHostTempDirectories, iter, SimpStringPtr, value) |
| elog(LOG, "HAWQ Segment RM :: Temporary directory %s", value->Str); |
| DQUEUE_LOOP_END |
| |
| checkAndBuildFailedTmpDirList(); |
| } |
| |
| /****** Resource enforcement GUCs begins ******/ |
| |
| /* Get resource enforcement enablement flag */ |
| DRMGlobalInstance->ResourceEnforcerCpuEnable = rm_enforce_cpu_enable; |
| |
| /* Get resource enforcement CGroup mount point */ |
| initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint), |
| PCONTEXT); |
| setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupMountPoint), |
| rm_enforce_cgrp_mnt_pnt); |
| |
| /* Get resource enforcement CGroup hierarchy name */ |
| initSimpleString(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName), |
| PCONTEXT); |
| setSimpleStringNoLen(&(DRMGlobalInstance->ResourceEnforcerCgroupHierarchyName), |
| rm_enforce_cgrp_hier_name); |
| |
| /* Get resource enforcement mapping between hawq weight and yarn weight */ |
| DRMGlobalInstance->ResourceEnforcerCpuWeight = rm_enforce_cpu_weight; |
| |
| /* Get resource enforcement vcore-pcore ratio */ |
| DRMGlobalInstance->ResourceEnforcerVcorePcoreRatio = rm_enforce_core_vpratio; |
| |
| /* Get resource enforcement CGroup cleanup period */ |
| DRMGlobalInstance->ResourceEnforcerCleanupPeriod = rm_enforce_cleanup_period; |
| |
| /****** Resource enforcement GUCs ends ******/ |
| return FUNC_RETURN_OK; |
| } |
| |
| /* |
| * Load resource queue and user information into HAWQ RM memory. If independent |
| * mode, the definition is from xml file saving a list of properties. If normal |
| * forked RM process, this can be loaded from catalog table: pg_authid and |
| * pg_resqueue. |
| * |
| */ |
| int loadAllQueueAndUser(void) |
| { |
| int res = FUNC_RETURN_OK; |
| List *queueprops = NULL; /* Save all queue properties. */ |
| List *userprops = NULL; /* Save all role/user properties.*/ |
| |
| res = loadUserPropertiesFromCatalog(&userprops); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| goto exit; |
| } |
| |
| res = loadQueuePropertiesFromCatalog(&queueprops); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| goto exit; |
| } |
| |
| /* Add resource queue and user into resource pool data structure. */ |
| res = addResourceQueueAndUserFromProperties(queueprops, userprops); |
| /* res is checked before returning. */ |
| Assert(PQUEMGR->RootTrack != NULL); |
| Assert(PQUEMGR->DefaultTrack != NULL); |
| |
| exit: |
| cleanPropertyList(PCONTEXT, &queueprops); |
| cleanPropertyList(PCONTEXT, &userprops); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog( LOG, "Fail to load queue and user definition."); |
| } |
| return res; |
| } |
| |
| /***************************************************************************** |
| * Load user information from catalog table pg_authid. |
| *****************************************************************************/ |
| int loadUserPropertiesFromCatalog(List **users) |
| { |
| int libpqres = CONNECTION_OK; |
| int ret = FUNC_RETURN_OK; |
| PGconn *conn = NULL; |
| static char conninfo[1024]; |
| PQExpBuffer sql = NULL; |
| PGresult *result = NULL; |
| int ntups = 0; |
| int i_oid = 0, |
| i_rolname = 0, |
| i_rolsuper = 0, |
| i_rolresqueue = 0; |
| |
| Oid roloid = 0, |
| rolresqueueOID = 0; |
| |
| char *rolname = NULL, |
| *rolresqueue = NULL; |
| bool rolsuper = false; |
| int8_t DummyPriority = 3; |
| |
| sprintf(conninfo, "options='-c gp_session_role=UTILITY' " |
| "dbname=template1 port=%d connect_timeout=%d", |
| master_addr_port, |
| LIBPQ_CONNECT_TIMEOUT); |
| |
| conn = PQconnectdb(conninfo); |
| if ((libpqres = PQstatus(conn)) != CONNECTION_OK) |
| { |
| elog(WARNING, "Resource manager failed to connect database when loading" |
| "role specifications from pg_authid, error code %d, " |
| "reason: %s", |
| libpqres, |
| PQerrorMessage(conn)); |
| PQfinish(conn); |
| return LIBPQ_FAIL_EXECUTE; |
| } |
| |
| sql = createPQExpBuffer(); |
| if ( sql == NULL ) |
| { |
| elog(WARNING, "Resource manager failed to allocate buffer for building " |
| "sql statement."); |
| goto cleanup; |
| } |
| appendPQExpBuffer(sql, "SELECT oid,rolname,rolsuper,rolresqueue FROM pg_authid"); |
| result = PQexec(conn, sql->data); |
| if (!result || PQresultStatus(result) != PGRES_TUPLES_OK) |
| { |
| elog(WARNING, "Resource manager failed to run SQL: %s " |
| "when loading role specifications from pg_authid, " |
| "reason : %s", |
| sql->data, |
| PQresultErrorMessage(result)); |
| ret = LIBPQ_FAIL_EXECUTE; |
| goto cleanup; |
| } |
| |
| ntups = PQntuples(result); |
| i_oid = PQfnumber(result, "oid"); |
| i_rolname = PQfnumber(result, "rolname"); |
| i_rolsuper = PQfnumber(result, "rolsuper"); |
| i_rolresqueue = PQfnumber(result, "rolresqueue"); |
| |
| for (int i = 0; i < ntups; i++) |
| { |
| roloid = (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10); |
| rolname = PQgetvalue(result, i, i_rolname); |
| rolresqueue = PQgetvalue(result, i, i_rolresqueue); |
| |
| if (rolresqueue == NULL || strlen(rolresqueue) == 0) |
| { |
| rolresqueueOID = InvalidOid; |
| } |
| else |
| { |
| rolresqueueOID = (Oid)strtoul(PQgetvalue(result, i, i_rolresqueue), NULL, 10); |
| } |
| |
| if (PQgetvalue(result, i, i_rolsuper)[0] == 't') |
| { |
| rolsuper = true; |
| } |
| else |
| { |
| rolsuper = false; |
| } |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| *users = lappend(*users, |
| createPropertyOID( |
| PCONTEXT, |
| "user", |
| getUSRTBLAttributeName(USR_TBL_ATTR_OID), |
| &i, |
| roloid)); |
| *users = lappend(*users, |
| createPropertyName( |
| PCONTEXT, |
| "user", |
| getUSRTBLAttributeName(USR_TBL_ATTR_NAME), |
| &i, |
| (Name)rolname)); |
| *users = lappend(*users, |
| createPropertyOID( |
| PCONTEXT, |
| "user", |
| getUSRTBLAttributeName(USR_TBL_ATTR_TARGET_QUEUE), |
| &i, |
| rolresqueueOID)); |
| *users = lappend(*users, |
| createPropertyBool( |
| PCONTEXT, |
| "user", |
| getUSRTBLAttributeName(USR_TBL_ATTR_IS_SUPERUSER), |
| &i, |
| rolsuper)); |
| *users = lappend(*users, |
| createPropertyInt8( |
| PCONTEXT, |
| "user", |
| getUSRTBLAttributeName(USR_TBL_ATTR_PRIORITY), |
| &i, |
| DummyPriority)); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| ListCell *cell = NULL; |
| foreach(cell, *users) |
| { |
| KVProperty property = lfirst(cell); |
| elog(RMLOG, "Resource manager loaded role specifications from pg_authid : " |
| "[%s]=[%s]", |
| property->Key.Str, |
| property->Val.Str); |
| } |
| elog(LOG, "Resource manager successfully loaded role specifications."); |
| |
| cleanup: |
| if(sql != NULL) |
| { |
| destroyPQExpBuffer(sql); |
| } |
| if(result != NULL) |
| { |
| PQclear(result); |
| } |
| PQfinish(conn); |
| return ret; |
| } |
| |
| /***************************************************************************** |
| * Load resource queue information from catalog table pg_resqueue. |
| *****************************************************************************/ |
| int loadQueuePropertiesFromCatalog(List **queues) |
| { |
| int libpqres = CONNECTION_OK; |
| int ret = FUNC_RETURN_OK; |
| PGconn *conn = NULL; |
| static char conninfo[1024]; |
| PQExpBuffer sql = NULL; |
| PGresult* result = NULL; |
| int ntups = 0; |
| int i_oid = 0, |
| i_name = 0, |
| i_parent = 0, |
| i_active_stats_cluster = 0, |
| i_memory_limit_cluster = 0, |
| i_core_limit_cluster = 0, |
| i_resource_overcommit = 0, |
| i_allocation_policy = 0, |
| i_vseg_resource_quota = 0, |
| i_nvseg_upper_limit = 0, |
| i_nvseg_lower_limit = 0, |
| i_nvseg_upper_limit_perseg = 0, |
| i_nvseg_lower_limit_perseg = 0, |
| i_creation_time = 0, |
| i_update_time = 0, |
| i_status = 0; |
| |
| Oid oid = 0, |
| parentoid = 0; |
| |
| char *name = NULL, |
| *parent = NULL, |
| *memory_limit_cluster = NULL, |
| *core_limit_cluster = NULL, |
| *allocation_policy = NULL, |
| *vseg_resource_quota = NULL, |
| *status = NULL; |
| |
| int active_stats_cluster = 0, |
| nvseg_upper_limit = 0, |
| nvseg_lower_limit = 0; |
| |
| float nvseg_upper_limit_perseg = 0.0, |
| nvseg_lower_limit_perseg = 0.0, |
| resource_overcommit = 0.0; |
| |
| int64 creation_time = 0, |
| update_time = 0; |
| |
| snprintf(conninfo, sizeof(conninfo), |
| "options='-c gp_session_role=UTILITY' " |
| "dbname=template1 port=%d connect_timeout=%d", |
| master_addr_port, |
| LIBPQ_CONNECT_TIMEOUT); |
| |
| conn = PQconnectdb(conninfo); |
| if ((libpqres = PQstatus(conn)) != CONNECTION_OK) |
| { |
| elog(WARNING, "Resource manager failed to connect database when loading " |
| "resource queue specifications from pg_resqueue, " |
| "error code: %d, reason: %s", |
| libpqres, |
| PQerrorMessage(conn)); |
| PQfinish(conn); |
| return LIBPQ_FAIL_EXECUTE; |
| } |
| |
| sql = createPQExpBuffer(); |
| if ( sql == NULL ) |
| { |
| elog(WARNING, "Resource manager failed to allocate buffer for building " |
| "sql statement."); |
| goto cleanup; |
| } |
| |
| appendPQExpBuffer(sql,"SELECT oid," |
| "rsqname," |
| "parentoid," |
| "activestats," |
| "memorylimit, " |
| "corelimit, " |
| "resovercommit," |
| "allocpolicy, " |
| "vsegresourcequota, " |
| "nvsegupperlimit, " |
| "nvseglowerlimit, " |
| "nvsegupperlimitperseg, " |
| "nvseglowerlimitperseg, " |
| "creationtime, " |
| "updatetime, " |
| "status " |
| "FROM pg_resqueue"); |
| result = PQexec(conn, sql->data); |
| |
| if (!result || PQresultStatus(result) != PGRES_TUPLES_OK) |
| { |
| elog(WARNING, "Resource manager failed to run SQL: %s when loading " |
| "resource queue specifications from pg_resqueue, " |
| "reason : %s", |
| sql->data, |
| PQresultErrorMessage(result)); |
| ret = LIBPQ_FAIL_EXECUTE; |
| goto cleanup; |
| } |
| |
| ntups = PQntuples(result); |
| |
| i_oid = PQfnumber(result, PG_RESQUEUE_COL_OID); |
| i_name = PQfnumber(result, PG_RESQUEUE_COL_RSQNAME); |
| i_parent = PQfnumber(result, PG_RESQUEUE_COL_PARENTOID); |
| i_active_stats_cluster = PQfnumber(result, PG_RESQUEUE_COL_ACTIVESTATS); |
| i_memory_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_MEMORYLIMIT); |
| i_core_limit_cluster = PQfnumber(result, PG_RESQUEUE_COL_CORELIMIT); |
| i_resource_overcommit = PQfnumber(result, PG_RESQUEUE_COL_RESOVERCOMMIT); |
| i_allocation_policy = PQfnumber(result, PG_RESQUEUE_COL_ALLOCPOLICY); |
| i_vseg_resource_quota = PQfnumber(result, PG_RESQUEUE_COL_VSEGRESOURCEQUOTA); |
| i_nvseg_upper_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMIT); |
| i_nvseg_lower_limit = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMIT); |
| i_nvseg_upper_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGUPPERLIMITPERSEG); |
| i_nvseg_lower_limit_perseg = PQfnumber(result, PG_RESQUEUE_COL_NVSEGLOWERLIMITPERSEG); |
| i_creation_time = PQfnumber(result, PG_RESQUEUE_COL_CREATIONTIME); |
| i_update_time = PQfnumber(result, PG_RESQUEUE_COL_UPDATETIME); |
| i_status = PQfnumber(result, PG_RESQUEUE_COL_STATUS); |
| |
| for (int i = 0; i < ntups; i++) |
| { |
| oid = (Oid)strtoul(PQgetvalue(result, i, i_oid), NULL, 10); |
| name = PQgetvalue(result, i, i_name); |
| parent = PQgetvalue(result, i, i_parent); |
| if (parent == NULL || strlen(parent) == 0) |
| { |
| parentoid = InvalidOid; |
| } |
| else |
| { |
| parentoid = (Oid)strtoul(parent, NULL, 10); |
| } |
| |
| parentoid = (Oid)strtoul(PQgetvalue(result, i, i_parent), NULL, 10); |
| active_stats_cluster = atoi(PQgetvalue(result, i, i_active_stats_cluster)); |
| memory_limit_cluster = PQgetvalue(result, i, i_memory_limit_cluster); |
| core_limit_cluster = PQgetvalue(result, i, i_core_limit_cluster); |
| resource_overcommit = atof(PQgetvalue(result, i, i_resource_overcommit)); |
| allocation_policy = PQgetvalue(result, i, i_allocation_policy); |
| vseg_resource_quota = PQgetvalue(result, i, i_vseg_resource_quota); |
| nvseg_upper_limit = atoi(PQgetvalue(result, i, i_nvseg_upper_limit)); |
| nvseg_lower_limit = atoi(PQgetvalue(result, i, i_nvseg_lower_limit)); |
| nvseg_upper_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_upper_limit_perseg)); |
| nvseg_lower_limit_perseg = atof(PQgetvalue(result, i, i_nvseg_lower_limit_perseg)); |
| creation_time = atol(PQgetvalue(result, i, i_creation_time)); |
| update_time = atol(PQgetvalue(result, i, i_update_time)); |
| status = PQgetvalue(result, i, i_status); |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| *queues = lappend(*queues, |
| createPropertyOID( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_OID), |
| &i, |
| oid)); |
| |
| *queues = lappend(*queues, |
| createPropertyName( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_NAME), |
| &i, |
| (Name)name)); |
| |
| *queues = lappend(*queues, |
| createPropertyOID( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_PARENT), |
| &i, |
| parentoid)); |
| |
| *queues = lappend(*queues, |
| createPropertyInt32( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_ACTIVE_STATMENTS), |
| &i, |
| active_stats_cluster)); |
| |
| *queues = lappend(*queues, |
| createPropertyString( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_MEMORY_LIMIT_CLUSTER), |
| &i, |
| memory_limit_cluster)); |
| |
| *queues = lappend(*queues, |
| createPropertyString( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_CORE_LIMIT_CLUSTER), |
| &i, |
| core_limit_cluster)); |
| |
| *queues = lappend(*queues, |
| createPropertyString( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_ALLOCATION_POLICY), |
| &i, |
| allocation_policy)); |
| |
| *queues = lappend(*queues, |
| createPropertyFloat( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_RESOURCE_OVERCOMMIT_FACTOR), |
| &i, |
| resource_overcommit)); |
| |
| *queues = lappend(*queues, |
| createPropertyString( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_VSEG_RESOURCE_QUOTA), |
| &i, |
| vseg_resource_quota)); |
| |
| *queues = lappend(*queues, |
| createPropertyInt32( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT), |
| &i, |
| nvseg_upper_limit)); |
| |
| *queues = lappend(*queues, |
| createPropertyInt32( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT), |
| &i, |
| nvseg_lower_limit)); |
| |
| *queues = lappend(*queues, |
| createPropertyFloat( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_UPPER_LIMIT_PERSEG), |
| &i, |
| nvseg_upper_limit_perseg)); |
| |
| *queues = lappend(*queues, |
| createPropertyFloat( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_NVSEG_LOWER_LIMIT_PERSEG), |
| &i, |
| nvseg_lower_limit_perseg)); |
| |
| *queues = lappend(*queues, |
| createPropertyInt32( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_CREATION_TIME), |
| &i, |
| creation_time)); |
| |
| *queues = lappend(*queues, |
| createPropertyInt32( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_UPDATE_TIME), |
| &i, |
| update_time)); |
| |
| *queues = lappend(*queues, |
| createPropertyString( |
| PCONTEXT, |
| "queue", |
| getRSQTBLAttributeName(RSQ_TBL_ATTR_STATUS), |
| &i, |
| status)); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| ListCell *cell = NULL; |
| foreach(cell, *queues) |
| { |
| KVProperty property = lfirst(cell); |
| elog(RMLOG, "Resource manger loaded resource queue specifications from " |
| "pg_resqueue : [%s]=[%s]", |
| property->Key.Str, |
| property->Val.Str); |
| } |
| elog(LOG, "Resource manger successfully loaded resource queue specifications"); |
| |
| cleanup: |
| if(sql != NULL) |
| { |
| destroyPQExpBuffer(sql); |
| } |
| if(result != NULL) |
| { |
| PQclear(result); |
| } |
| PQfinish(conn); |
| return ret; |
| } |
| |
| |
| void quitResManager(SIGNAL_ARGS) |
| { |
| DRMGlobalInstance->ResManagerMainKeepRun = false; |
| } |
| |
/*
 * Signal handler for a terminated child: the resource broker process has
 * quit, so the event is forwarded to RB_handleSignalSIGCHLD().
 */
void handleChildSignal(SIGNAL_ARGS)
{
	RB_handleSignalSIGCHLD();
}
| |
| void notifyPostmasterResManagerStarted(SIGNAL_ARGS) |
| { |
| DRMStartup = true; |
| } |
| |
| /* |
| * Read properties and generate corresponding queue and user definitions. |
| * |
| * properties[in] : list of KVPropertyData |
| * queues[out] : list of DynResourceQueue |
| * users[out] : list of UserInfo |
| */ |
| int addResourceQueueAndUserFromProperties(List *queueprops, List *userprops) |
| { |
| static char errorbuf[1024]; |
| int res = FUNC_RETURN_OK; |
| char *substrstart = NULL; |
| int substrlen = 0; |
| char *substr2start = NULL; |
| int substr2len = 0; |
| List *queueattrlist = NULL; |
| List *userattrlist = NULL; |
| ListCell *cell = NULL; |
| |
| /* |
| * STEP 1. Process properties and build resource queue and user structure. |
| */ |
| int currentindex = -1; |
| List *currentattrs = NULL; |
| foreach(cell, queueprops) |
| { |
| KVProperty value = lfirst(cell); |
| |
| elog(RMLOG, "Loads queue property %s=%s", value->Key.Str, value->Val.Str); |
| |
| /* Split key string into (attribute, index) */ |
| if ( SimpleStringStartWith(&(value->Key), "queue.") != FUNC_RETURN_OK ) |
| { |
| elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str); |
| continue; |
| } |
| |
| /* |
| *---------------------------------------------------------------------- |
| * The key string is formatted as |
| * queue.<queueattrstr>.<queueindexstr>=<value> |
| * (0) (1) (2) |
| *---------------------------------------------------------------------- |
| */ |
| int32_t queueindex; |
| SimpString queueattrstr; |
| SimpString queueindexstr; |
| |
| /* Fill queueindex, queueattrstr */ |
| int parseres1 = PropertyKeySubstring(&(value->Key), |
| 1, |
| &substrstart, |
| &substrlen); |
| int parseres2 = PropertyKeySubstring(&(value->Key), |
| 2, |
| &substr2start, |
| &substr2len); |
| |
| if( parseres1 != FUNC_RETURN_OK || parseres2 != FUNC_RETURN_OK ) |
| { |
| Assert(false); |
| } |
| |
| setSimpleStringRef(&queueindexstr, substr2start, substr2len); |
| setSimpleStringRef(&queueattrstr, substrstart, substrlen); |
| |
| parseres1 = SimpleStringToInt32(&queueindexstr, &queueindex); |
| if ( parseres1 != FUNC_RETURN_OK ) |
| { |
| Assert(false); |
| } |
| |
| /* Fill queue attribute to corresponding list. */ |
| |
| KVProperty newprop = createPropertyEmpty(PCONTEXT); |
| setSimpleStringWithContent(&(newprop->Key), substrstart, substrlen); |
| setSimpleStringNoLen(&(newprop->Val), value->Val.Str); |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| if ( queueindex != currentindex ) |
| { |
| if ( currentattrs != NULL ) |
| { |
| queueattrlist = lappend(queueattrlist, currentattrs); |
| } |
| currentattrs = NULL; |
| currentindex = queueindex; |
| } |
| |
| elog(RMLOG, "Resource manager loaded attribute for creating queue %s=%s", |
| newprop->Key.Str, |
| newprop->Val.Str); |
| |
| currentattrs = lappend(currentattrs, newprop); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| if ( currentattrs != NULL ) |
| { |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| queueattrlist = lappend(queueattrlist, currentattrs); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| currentindex = -1; |
| currentattrs = NULL; |
| |
| foreach(cell, userprops) |
| { |
| KVProperty value = lfirst(cell); |
| |
| elog(RMLOG, "Loads user property %s=%s", value->Key.Str, value->Val.Str); |
| |
| /* Split key string into (attribute, index) */ |
| if ( SimpleStringStartWith(&(value->Key), "user.") != FUNC_RETURN_OK ) |
| { |
| elog(RMLOG, "Ignore property %s=%s", value->Key.Str, value->Val.Str); |
| continue; |
| } |
| |
| /* |
| *---------------------------------------------------------------------- |
| * The key string is formatted as |
| * user.<userattrstr>.<userindexstr>=<value> |
| * (0) (1) (2) |
| *---------------------------------------------------------------------- |
| */ |
| int32_t userindex; |
| SimpString userattrstr; |
| SimpString userindexstr; |
| |
| /* Fill userindex, userattrstr */ |
| int parseres1 = PropertyKeySubstring(&(value->Key), |
| 1, |
| &substrstart, |
| &substrlen); |
| int parseres2 = PropertyKeySubstring(&(value->Key), |
| 2, |
| &substr2start, |
| &substr2len); |
| |
| if ( parseres1 != FUNC_RETURN_OK || parseres2 != FUNC_RETURN_OK ) |
| { |
| Assert(false); |
| } |
| |
| setSimpleStringRef(&userindexstr, substr2start, substr2len); |
| setSimpleStringRef(&userattrstr, substrstart, substrlen); |
| |
| parseres1 = SimpleStringToInt32(&userindexstr, &userindex); |
| Assert(parseres1 == FUNC_RETURN_OK); |
| |
| /* Fill user attribute to corresponding list. */ |
| KVProperty newprop = createPropertyEmpty(PCONTEXT); |
| setSimpleStringWithContent(&(newprop->Key), substrstart, substrlen); |
| setSimpleStringNoLen(&(newprop->Val), value->Val.Str); |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| if ( userindex != currentindex ) |
| { |
| if ( currentattrs != NULL ) |
| { |
| userattrlist = lappend(userattrlist, currentattrs); |
| } |
| currentattrs = NULL; |
| currentindex = userindex; |
| } |
| |
| elog(RMLOG, "Resource manager loaded attribute for creating role %s=%s", |
| newprop->Key.Str, |
| newprop->Val.Str); |
| |
| currentattrs = lappend(currentattrs, newprop); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| if ( currentattrs != NULL ) |
| { |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| userattrlist = lappend(userattrlist, currentattrs); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| /* |
| * STEP 2. Add resource queues. |
| */ |
| |
| /* STEP 2.1. Build resource queue object with parsed properties set. */ |
| List *rawrsqs = NULL; |
| foreach(cell, queueattrlist) |
| { |
| List *attrs = lfirst(cell); |
| ListCell *cell2 = NULL; |
| foreach(cell2, attrs) |
| { |
| KVProperty attrkv = lfirst(cell2); |
| elog(RMLOG, "To parse : %s=%s", attrkv->Key.Str, attrkv->Val.Str); |
| } |
| |
| DynResourceQueue newqueue = rm_palloc0(PCONTEXT, |
| sizeof(DynResourceQueueData)); |
| |
| res = parseResourceQueueAttributes(attrs, |
| newqueue, |
| false, |
| true, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| rm_pfree(PCONTEXT, newqueue); |
| elog(WARNING, "Resource manager can not create resource queue with its " |
| "attributes because %s", |
| errorbuf); |
| continue; |
| } |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| rawrsqs = lappend(rawrsqs, newqueue); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| |
| /* |
| * STEP 2.2. Reorder the resource queue sequence to ensure that every time |
| * the queue is added into the manager, its parent queue can be found. |
| */ |
| List *orderedrsqs = NULL; |
| bool orderchanged = true; |
| while( orderchanged ) |
| { |
| DynResourceQueue toreordrsq = NULL; |
| ListCell *cell = NULL; |
| ListCell *prevcell = NULL; |
| foreach(cell, rawrsqs) |
| { |
| DynResourceQueue rawqueue = lfirst(cell); |
| if ( rawqueue->ParentOID == InvalidOid ) |
| { |
| toreordrsq = rawqueue; |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| rawrsqs = list_delete_cell(rawrsqs, cell, prevcell); |
| MEMORY_CONTEXT_SWITCH_BACK |
| break; |
| } |
| |
| ListCell *cell2 = NULL; |
| foreach(cell2, orderedrsqs) |
| { |
| DynResourceQueue ordqueue = lfirst(cell2); |
| if ( ordqueue->OID == rawqueue->ParentOID ) |
| { |
| toreordrsq = rawqueue; |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| rawrsqs = list_delete_cell(rawrsqs, cell, prevcell); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| } |
| |
| if ( toreordrsq != NULL ) |
| { |
| break; |
| } |
| prevcell = cell; |
| } |
| |
| orderchanged = false; |
| if ( toreordrsq != NULL ) |
| { |
| elog(RMLOG, "Find one resource queue valid to continue loading %s.", |
| toreordrsq->Name); |
| orderchanged = true; |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| orderedrsqs = lappend(orderedrsqs, toreordrsq); |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| } |
| |
| /* |
| * There is one possibility that we have some resource queues whose parent |
| * queue ids are not present in the pg_resqueue table or external configure |
| * file. In this case, these resource queues are logged out out as warnings, |
| * and HAWQ RM will ignore them. |
| * |
| * Through catalog, this should be impossible. |
| */ |
| if ( list_length(rawrsqs) > 0 ) |
| { |
| elog(WARNING, "Invalid resource queues are detected due to no valid " |
| "parent resource queue id."); |
| |
| while( list_length(rawrsqs) > 0 ) |
| { |
| DynResourceQueue rawqueue = lfirst(list_head(rawrsqs)); |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| rawrsqs = list_delete_first(rawrsqs); |
| MEMORY_CONTEXT_SWITCH_BACK |
| elog(WARNING, "Invalid resource queue : [%s]", rawqueue->Name); |
| rm_pfree(PCONTEXT, rawqueue); |
| } |
| } |
| |
| /* |
| * STEP 2.3. List of DynResourceQueue, save partially filled attributes. |
| * Actually check and complete the resource queue definition and save it |
| * in the resource queue manager. |
| */ |
| while( list_length(orderedrsqs) > 0 ) |
| { |
| DynResourceQueue partqueue = lfirst(list_head(orderedrsqs)); |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| orderedrsqs = list_delete_first(orderedrsqs); |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| elog(RMLOG, "Load queue %s.", partqueue->Name); |
| |
| res = checkAndCompleteNewResourceQueueAttributes(partqueue, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(RMLOG, "res=%d error=%s, after check and complete queue %s.", |
| res, |
| errorbuf, |
| partqueue->Name); |
| |
| rm_pfree(PCONTEXT, partqueue); |
| elog(WARNING, "Resource manager can not complete resource queue's " |
| "attributes because %s", |
| errorbuf); |
| continue; |
| } |
| |
| elog(RMLOG, "Checked and completed queue %s.", partqueue->Name); |
| |
| DynResourceQueueTrack newtrack = NULL; |
| res = createQueueAndTrack(partqueue, &newtrack, errorbuf, sizeof(errorbuf)); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| rm_pfree(PCONTEXT, partqueue); |
| if ( newtrack != NULL ) |
| { |
| rm_pfree(PCONTEXT, newtrack); |
| } |
| |
| elog( WARNING, "Resource manager can not create resource queue %s " |
| "because %s", |
| partqueue->Name, |
| errorbuf); |
| continue; |
| } |
| |
| elog(RMLOG, "Created queue %s.", partqueue->Name); |
| |
| char buffer[1024]; |
| generateQueueReport(partqueue->OID, buffer, sizeof(buffer)); |
| elog(LOG, "Resource manager created resource queue instance : %s", |
| buffer); |
| } |
| |
| /* |
| * STEP 3. Add users. |
| */ |
| while( list_length(userattrlist) > 0 ) |
| { |
| List *attrs = lfirst(list_head(userattrlist)); |
| |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| userattrlist = list_delete_first(userattrlist); |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| UserInfo newuser = rm_palloc0(PCONTEXT, |
| sizeof(UserInfoData)); |
| |
| res = parseUserAttributes(attrs,newuser, errorbuf, sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Can not create user with its attributes because %s", |
| errorbuf); |
| rm_pfree(PCONTEXT, newuser); |
| continue; |
| } |
| |
| res = checkUserAttributes(newuser, errorbuf, sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "User is not valid because %s", errorbuf); |
| rm_pfree(PCONTEXT, newuser); |
| continue; |
| } |
| |
| createUser(newuser); |
| |
| char buffer[256]; |
| generateUserReport(newuser->Name, |
| strlen(newuser->Name), |
| buffer, |
| sizeof(buffer)); |
| |
| elog(LOG, "Resource manager created user. %s", buffer); |
| } |
| return res; |
| } |
| |
| int generateAllocRequestToBroker(void) |
| { |
| int res = FUNC_RETURN_OK; |
| /*-------------------------------------------------------------------------- |
| * This is a temporary restrict that HAWQ RM supports only one memory/core |
| * ratio in current version. |
| *-------------------------------------------------------------------------- |
| */ |
| Assert( PQUEMGR->RatioCount == 1 ); |
| |
| DynMemoryCoreRatioTrack mctrack = PQUEMGR->RatioTrackers[0]; |
| |
| bool hasWorkload = mctrack->TotalUsed.MemoryMB + |
| mctrack->TotalRequest.MemoryMB > 0; |
| if ( !hasWorkload ) |
| { |
| /* Check if resource manager has workload recently. */ |
| if ( PQUEMGR->RatioCount == 1 && |
| PQUEMGR->RatioWaterMarks[0].NodeCount > 0 ) |
| { |
| DynMemoryCoreRatioWaterMark mark = |
| (DynMemoryCoreRatioWaterMark) |
| getDQueueHeadNodeData(&(PQUEMGR->RatioWaterMarks[0])); |
| hasWorkload = mark->ClusterMemoryMB > 0 ; |
| } |
| } |
| |
| /* Decide water level of resource. */ |
| int wlevel = hasWorkload ? PQUEMGR->ActualMinGRMContainerPerSeg : 0; |
| switch( DRMGlobalInstance->ImpType ) |
| { |
| case YARN_LIBYARN: |
| /* Do nothing. */ |
| break; |
| case NONE_HAWQ2: |
| /* We always expect all resource allocated. */ |
| wlevel = INT_MAX; |
| break; |
| default: |
| Assert(false); |
| } |
| |
| /* |
| * Go through each segment, decide how many containers should be allocated. |
| * And generate preferred host list based on minimum water level. |
| */ |
| int grmctnsize = 0; |
| List *preferred = NULL; |
| List *ressegl = NULL; |
| ListCell *cell = NULL; |
| getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl); |
| |
| foreach(cell, ressegl) |
| { |
| PAIR pair = (PAIR)lfirst(cell); |
| SegResource segres = (SegResource)(pair->Value); |
| |
| /* |
| * Resource manager skips this segment if |
| * 1) Not FTS available; |
| * 2) Having resource decrease pending. |
| */ |
| if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat) || |
| (segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0)) |
| { |
| continue; |
| } |
| |
| /* Get segment resource capacity. */ |
| uint32_t memcap = getSegResourceCapacityMemory(segres); |
| uint32_t corecap = getSegResourceCapacityCore(segres); |
| |
| /* |
| * The segment capacity mem/core ratio maybe not equal to the cluster |
| * mem/core ratio. |
| */ |
| int ctnsize = memcap / mctrack->MemCoreRatio; |
| ctnsize = ctnsize < corecap ? ctnsize : corecap; |
| |
| /* Follow the water level calculated. */ |
| ctnsize = ctnsize < wlevel ? ctnsize : wlevel; |
| |
| elog(RMLOG, "Host %s has %d GRM containers, (%d MB, %lf CORE) increase pending.", |
| GET_SEGRESOURCE_HOSTNAME(segres), |
| (segres->ContainerSets[0] == NULL ? |
| 0 : |
| list_length(segres->ContainerSets[0]->Containers)), |
| segres->IncPending.MemoryMB, |
| segres->IncPending.Core); |
| |
| int ctnsizeneed = ctnsize - |
| (segres->ContainerSets[0] == NULL ? |
| 0 : |
| list_length(segres->ContainerSets[0]->Containers)) - |
| segres->IncPending.MemoryMB / mctrack->MemCoreRatio; |
| if ( ctnsizeneed <= 0 ) |
| { |
| /* |
| * If the segment contains more resource than expected resource |
| * water level. |
| */ |
| continue; |
| } |
| |
| /* Build preferred host information. */ |
| PAIR newpair = rm_palloc0(PCONTEXT, sizeof(PAIRData)); |
| newpair->Key = segres; |
| ResourceBundle resource = rm_palloc0(PCONTEXT, sizeof(ResourceBundleData)); |
| resource->MemoryMB = mctrack->MemCoreRatio * (ctnsizeneed); |
| resource->Core = ctnsizeneed; |
| newpair->Value = resource; |
| preferred = lappend(preferred, newpair); |
| |
| elog(DEBUG3, "Expect resource from resource broker with " |
| "preferred host, hostname:%s, container number:%lf.", |
| GET_SEGRESOURCE_HOSTNAME(segres), |
| resource->Core); |
| |
| |
| grmctnsize += ctnsizeneed; |
| } |
| freePAIRRefList(&(PRESPOOL->Segments), &ressegl); |
| |
| elog(RMLOG, "Resource manager needs minimum %d GRM containers.", grmctnsize); |
| |
| /* Decide how much resource to acquire from global resource manager. */ |
| int32_t reqmem = 0; |
| int32_t reqcore = ceil(mctrack->TotalRequest.Core + |
| mctrack->TotalUsed.Core - |
| mctrack->TotalAllocated.Core - |
| mctrack->TotalPending.Core); |
| |
| elog(RMLOG, "Memory Core Track has %lf requested, %lf used, %lf allocated, " |
| "%lf pending.", |
| mctrack->TotalRequest.Core, |
| mctrack->TotalUsed.Core, |
| mctrack->TotalAllocated.Core, |
| mctrack->TotalPending.Core); |
| /* |
| * If after checking water level of segment resource, we expect more resource |
| * containers, we expect more as well. |
| */ |
| reqcore = reqcore < grmctnsize - mctrack->TotalPending.Core ? |
| grmctnsize - mctrack->TotalPending.Core : |
| reqcore; |
| reqmem = reqcore * mctrack->MemCoreRatio; |
| |
| elog(RMLOG, "Resource manager now needs %d GRM containers.", reqcore); |
| |
| /* |
| * Check if should raise water level to deal with resource fragment or |
| * resource uneven problems. We trigger this logic only when no resource |
| * request caused by lack of resource, and no pending resource are waited |
| * for. |
| */ |
| if ( reqcore <= 0 && |
| mctrack->TotalPending.Core <= 0 && |
| (PQUEMGR->hasResourceProblem[RESPROBLEM_FRAGMENT] || |
| PQUEMGR->hasResourceProblem[RESPROBLEM_UNEVEN] || |
| PQUEMGR->hasResourceProblem[RESPROBLEM_TOOFEWSEG]) ) |
| { |
| /* Check if it is possible to raise water level. */ |
| if ( mctrack->TotalAllocated.Core + 1 <= |
| PRESPOOL->GRMTotal.Core * PQUEMGR->GRMQueueMaxCapacity ) |
| { |
| /* |
| * We only add one more GRM container to acquire, this will trigger |
| * the following logic to raise the water level. |
| */ |
| reqcore = 1; |
| reqmem = reqcore * mctrack->MemCoreRatio; |
| |
| PQUEMGR->hasResourceProblem[RESPROBLEM_FRAGMENT] = false; |
| PQUEMGR->hasResourceProblem[RESPROBLEM_UNEVEN] = false; |
| PQUEMGR->hasResourceProblem[RESPROBLEM_TOOFEWSEG] = false; |
| |
| elog(LOG, "Resource manager raises segment resource water level."); |
| } |
| } |
| |
| /* Call resource broker to request resource. */ |
| if ( reqmem > 0 && reqcore > 0 ) |
| { |
| /* |
| * Here we know that we have to allocate more resource from GRM, we should |
| * check again to enrich the preferred host list for new locality data |
| * of expected additional GRM containers. |
| * |
| * The expected size of GRM containers may vary because we expect GRM |
| * to have even number of GRM containers in each available segments. |
| */ |
| completeAllocRequestToBroker(&reqmem, &reqcore, &preferred); |
| |
| elog(RMLOG, "Resource manager needs %d GRM containers after adjusting " |
| "overall water level.", |
| reqcore); |
| |
| if ( reqcore > 0 ) |
| { |
| PRESPOOL->LastResAcqTime = gettime_microsec(); |
| |
| addResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore); |
| uint64_t oldtime = mctrack->TotalPendingStartTime; |
| if ( mctrack->TotalPendingStartTime == 0 ) |
| { |
| mctrack->TotalPendingStartTime = gettime_microsec(); |
| elog(DEBUG3, "Global resource total pending start time is updated " |
| "to "UINT64_FORMAT, |
| mctrack->TotalPendingStartTime); |
| } |
| |
| /* Generate request to resource broker now. */ |
| res = RB_acquireResource(reqmem, reqcore, preferred); |
| if ( res != FUNC_RETURN_OK && res != RESBROK_PIPE_BUSY ) |
| { |
| minusResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore); |
| mctrack->TotalPendingStartTime = oldtime; |
| elog(WARNING, "Resource manager failed to allocate resource from " |
| "resource broker (%d MB, %d CORE).", |
| reqmem, reqcore); |
| } |
| else if ( res == RESBROK_PIPE_BUSY ) |
| { |
| minusResourceBundleData(&(mctrack->TotalPending), reqmem, reqcore); |
| elog(DEBUG3, "Resource manager should retry to submit request."); |
| res = FUNC_RETURN_OK; |
| } |
| else |
| { |
| elog(RMLOG, "Resource manager finished submitting resource " |
| "allocation request to global resource manager for " |
| "(%d MB, %d CORE).", |
| reqmem, |
| reqcore); |
| } |
| } |
| } |
| |
| return res; |
| } |
| |
| void completeAllocRequestToBroker(int32_t *reqmem, |
| int32_t *reqcore, |
| List **preferred) |
| { |
| /* |
| * Go through each segment to get minimum water level. The idea of completing |
| * the request is to keep pulling up the lowest water level in the cluster |
| * until equal or more GRM containers are requested. |
| */ |
| Assert(*reqmem % *reqcore == 0); |
| uint32_t ratio = *reqmem / *reqcore; |
| |
| /* Step 1. Get lowest water level and build up index. */ |
| List *ressegl = NULL; |
| getAllPAIRRefIntoList(&(PRESPOOL->Segments), &ressegl); |
| /* Index of each segment in current preferred host list. */ |
| PAIR *reqidx = rm_palloc0(PCONTEXT, sizeof(PAIR) * list_length(ressegl)); |
| int llevel = INT_MAX; |
| int totalcount = 0; |
| int index = 0; |
| bool allunavail = true; |
| ListCell *cell = NULL; |
| foreach(cell, ressegl) |
| { |
| reqidx[index] = NULL; |
| |
| PAIR pair = (PAIR)lfirst(cell); |
| SegResource segres = (SegResource)(pair->Value); |
| |
| /* |
| * Resource manager skips this segment if |
| * 1) Not FTS available; |
| * 2) Having resource decrease pending. |
| */ |
| if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat) || |
| (segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0)) |
| { |
| index++; |
| continue; |
| } |
| |
| allunavail = false; |
| |
| int clevel = (segres->ContainerSets[0] == NULL ? |
| 0 : |
| list_length(segres->ContainerSets[0]->Containers)) + |
| segres->IncPending.MemoryMB / ratio; |
| |
| ListCell *pcell = NULL; |
| foreach(pcell, *preferred) |
| { |
| PAIR existpair = (PAIR)lfirst(pcell); |
| if ( existpair->Key == segres ) |
| { |
| reqidx[index] = existpair; |
| totalcount += ((ResourceBundle)(reqidx[index]->Value))->MemoryMB / |
| ratio; |
| break; |
| } |
| } |
| |
| int creqsize = reqidx[index] == NULL ? |
| 0 : |
| ((ResourceBundle)(reqidx[index]->Value))->MemoryMB / ratio; |
| |
| llevel = (clevel+creqsize) < llevel ? (clevel+creqsize) : llevel; |
| index++; |
| } |
| |
| /* If no segment available for adjusting the request, no need to keep going */ |
| if ( allunavail ) |
| { |
| rm_pfree(PCONTEXT, reqidx); |
| return; |
| } |
| |
| for ( int i = 0 ; i < list_length(ressegl) ; ++i ) |
| { |
| if ( reqidx[i] == NULL ) |
| { |
| continue; |
| } |
| |
| elog(RMLOG, "Expect resource from resource broker with " |
| "preferred host, hostname:%s, container number:%lf, " |
| "increase pending %d.", |
| GET_SEGRESOURCE_HOSTNAME(((SegResource)(reqidx[i]->Key))), |
| ((ResourceBundle)(reqidx[i]->Value))->Core, |
| ((SegResource)(reqidx[i]->Key))->IncPending.MemoryMB/ratio); |
| } |
| |
| elog(RMLOG, "Lowest water level before adjusting is %d.", llevel); |
| |
| /* Step 2. Adjust request. */ |
| int32_t reqcoreleft = *reqcore - totalcount; |
| bool keeplooping = true; |
| while( reqcoreleft > 0 && keeplooping ) |
| { |
| llevel++; |
| index = 0; |
| keeplooping = false; |
| foreach(cell, ressegl) |
| { |
| PAIR pair = (PAIR)lfirst(cell); |
| SegResource segres = (SegResource)(pair->Value); |
| |
| /* |
| * Resource manager skips this segment if |
| * 1) Not FTS available; |
| * 2) Having resource decrease pending. |
| */ |
| if (!IS_SEGSTAT_FTSAVAILABLE(segres->Stat) || |
| (segres->DecPending.MemoryMB > 0 && segres->DecPending.Core > 0)) |
| { |
| index++; |
| continue; |
| } |
| |
| /* Check total capacity of the segment. */ |
| uint32_t corecap = getSegResourceCapacityCore(segres); |
| |
| int clevel = (segres->ContainerSets[0] == NULL ? |
| 0 : |
| list_length(segres->ContainerSets[0]->Containers)) + |
| segres->IncPending.MemoryMB / ratio; |
| |
| int aclevel = reqidx[index] == NULL ? |
| clevel : |
| (clevel + ((ResourceBundle)(reqidx[index]->Value))->MemoryMB / ratio); |
| |
| if ( llevel > aclevel && llevel <= corecap ) |
| { |
| if ( reqidx[index] == NULL ) |
| { |
| reqidx[index] = rm_palloc0(PCONTEXT, sizeof(PAIRData)); |
| reqidx[index]->Key = segres; |
| ResourceBundle resource = rm_palloc0(PCONTEXT, |
| sizeof(ResourceBundleData)); |
| resetResourceBundleData(resource, 0, 0, ratio); |
| reqidx[index]->Value = resource; |
| *preferred = lappend(*preferred, reqidx[index]); |
| } |
| addResourceBundleData((ResourceBundle)(reqidx[index]->Value), |
| (llevel-aclevel) * ratio, |
| llevel-aclevel); |
| reqcoreleft -= llevel-aclevel; |
| keeplooping = true; |
| |
| elog(RMLOG, "Resource manager acquires %lf GRM containers on " |
| "host %s. Current level(having pending) %d, " |
| "expect level %d, acquired in current request %d.", |
| ((ResourceBundle)(reqidx[index]->Value))->Core, |
| GET_SEGRESOURCE_HOSTNAME(((SegResource)(reqidx[index]->Key))), |
| clevel, |
| llevel, |
| aclevel-clevel); |
| } |
| index++; |
| } |
| } |
| |
| /* Adjust total mem and core request. */ |
| *reqcore -= reqcoreleft; |
| *reqmem = *reqcore * ratio; |
| |
| rm_pfree(PCONTEXT, reqidx); |
| } |
| /* |
| * Create all socket servers to accept connection from QD or RM agents. |
| */ |
| int initializeSocketServer(void) |
| { |
| int res = FUNC_RETURN_OK; |
| int netres = 0; |
| char *allip = "0.0.0.0"; |
| pgsocket RMListenSocket[HAWQRM_SERVER_PORT_COUNT]; |
| |
| for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) |
| { |
| RMListenSocket[i] = PGINVALID_SOCKET; |
| } |
| |
| netres = StreamServerPort(AF_UNSPEC, |
| allip, |
| rm_master_port, |
| NULL, |
| RMListenSocket, |
| HAWQRM_SERVER_PORT_COUNT); |
| if ( netres != STATUS_OK ) |
| { |
| res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER; |
| elog(LOG, "Resource manager cannot create socket server. Port=%d", |
| rm_master_port); |
| return res; |
| } |
| |
| /* Initialize array for polling all file descriptors. */ |
| initializeAsyncComm(); |
| int validfdcount = 0; |
| AsyncCommBuffer newbuffer = NULL; |
| for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) |
| { |
| if (RMListenSocket[i] != PGINVALID_SOCKET) |
| { |
| netres = registerFileDesc(RMListenSocket[i], |
| ASYNCCOMM_READ, |
| &AsyncCommBufferHandlersMsgServer, |
| NULL, |
| &newbuffer); |
| if ( netres != FUNC_RETURN_OK ) |
| { |
| res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER; |
| elog(WARNING, "Resource manager cannot track socket server."); |
| break; |
| } |
| validfdcount++; |
| InitHandler_Message(newbuffer); |
| } |
| } |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) |
| { |
| if ( RMListenSocket[i] != PGINVALID_SOCKET ) |
| { |
| close(RMListenSocket[i]); |
| } |
| } |
| return res; |
| } |
| |
| elog(LOG, "Resource manager starts accepting resource request. " |
| "Listening normal socket port %d. " |
| "Total listened %d FDs.", |
| rm_master_port, |
| validfdcount); |
| return res; |
| } |
| |
| |
| /* |
| * Send built response content back to RM. The sending is asynchronous. |
| * |
| * If the progress is error, close the connection. |
| */ |
| void sendResponseToClients(void) |
| { |
| while( list_length(PCONTRACK->ConnToSend) > 0 ) |
| { |
| ConnectionTrack conntrack = (ConnectionTrack) |
| lfirst(list_head(PCONTRACK->ConnToSend)); |
| MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) |
| PCONTRACK->ConnToSend = list_delete_first(PCONTRACK->ConnToSend); |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| if ( conntrack->CommBuffer != NULL ) |
| { |
| buildMessageToCommBuffer(conntrack->CommBuffer, |
| conntrack->MessageBuff.Buffer, |
| conntrack->MessageSize, |
| conntrack->MessageID, |
| conntrack->MessageMark1, |
| conntrack->MessageMark2); |
| } |
| } |
| } |
| |
| /* |
| * Check and set the nodes down that are not updated by IMAlive heart-beat for a |
| * long time. |
| */ |
| void updateStatusOfAllNodes() |
| { |
| SegResource node = NULL; |
| uint64_t curtime = 0; |
| |
| bool changedstatus = false; |
| curtime = gettime_microsec(); |
| for(uint32_t idx = 0; idx < PRESPOOL->SegmentIDCounter; idx++) |
| { |
| node = getSegResource(idx); |
| Assert(node != NULL); |
| uint8_t oldStatus = node->Stat->FTSAvailable; |
| if ( (curtime - node->LastUpdateTime > |
| 1000000LL * rm_segment_heartbeat_timeout) && |
| (node->Stat->StatusDesc & SEG_STATUS_HEARTBEAT_TIMEOUT) == 0) |
| { |
| /* |
| * This segment is heartbeat timeout, update its description |
| * and set it to unavailable if needed. |
| */ |
| if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE) |
| { |
| /* |
| * This call makes resource manager able to adjust queue and mem/core |
| * trackers' capacity. |
| */ |
| setSegResHAWQAvailability(node, RESOURCE_SEG_STATUS_UNAVAILABLE); |
| /* |
| * This call makes resource pool remove unused containers. |
| */ |
| returnAllGRMResourceFromSegment(node); |
| changedstatus = true; |
| } |
| |
| node->Stat->StatusDesc |= SEG_STATUS_HEARTBEAT_TIMEOUT; |
| if (Gp_role != GP_ROLE_UTILITY) |
| { |
| SimpStringPtr description = build_segment_status_description(node->Stat); |
| update_segment_status(idx + REGISTRATION_ORDER_OFFSET, |
| SEGMENT_STATUS_DOWN, |
| (description->Len > 0)?description->Str:""); |
| add_segment_history_row(idx + REGISTRATION_ORDER_OFFSET, |
| GET_SEGRESOURCE_HOSTNAME(node), |
| (description->Len > 0)?description->Str:""); |
| |
| freeSimpleStringContent(description); |
| rm_pfree(PCONTEXT, description); |
| } |
| |
| elog(WARNING, "Resource manager sets host %s heartbeat timeout.", |
| GET_SEGRESOURCE_HOSTNAME(node)); |
| } |
| } |
| |
| if ( changedstatus ) |
| { |
| refreshResourceQueueCapacity(false); |
| refreshActualMinGRMContainerPerSeg(); |
| } |
| |
| validateResourcePoolStatus(true); |
| } |
| |
| /* |
| * Set all nodes DOWN due to no GRM cluster report |
| */ |
| void setAllNodesGRMDown() |
| { |
| SegResource node = NULL; |
| |
| bool changedstatus = false; |
| for(uint32_t idx = 0; idx < PRESPOOL->SegmentIDCounter; idx++) |
| { |
| node = getSegResource(idx); |
| Assert(node != NULL); |
| uint8_t oldStatus = node->Stat->FTSAvailable; |
| uint32_t oldDesc = node->Stat->StatusDesc; |
| |
| if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE) |
| { |
| /* |
| * This call makes resource manager able to adjust queue and mem/core |
| * trackers' capacity. |
| */ |
| setSegResHAWQAvailability(node, RESOURCE_SEG_STATUS_UNAVAILABLE); |
| changedstatus = true; |
| } |
| |
| node->Stat->StatusDesc |= SEG_STATUS_NO_GRM_NODE_REPORT; |
| if (Gp_role != GP_ROLE_UTILITY && oldDesc != node->Stat->StatusDesc) |
| { |
| SimpStringPtr description = build_segment_status_description(node->Stat); |
| update_segment_status(idx + REGISTRATION_ORDER_OFFSET, |
| SEGMENT_STATUS_DOWN, |
| (description->Len > 0)?description->Str:""); |
| add_segment_history_row(idx + REGISTRATION_ORDER_OFFSET, |
| GET_SEGRESOURCE_HOSTNAME(node), |
| (description->Len > 0)?description->Str:""); |
| |
| freeSimpleStringContent(description); |
| rm_pfree(PCONTEXT, description); |
| } |
| |
| if (oldDesc != node->Stat->StatusDesc) |
| { |
| elog(WARNING, "Resource manager sets host %s down in cleanup phase " |
| "for resource broker error.", |
| GET_SEGRESOURCE_HOSTNAME(node)); |
| } |
| } |
| |
| if (changedstatus) |
| { |
| refreshResourceQueueCapacity(false); |
| refreshActualMinGRMContainerPerSeg(); |
| } |
| } |
| |
| /** |
| * Load segment information from file. |
| * |
| * NOTE: This is a test facility. |
| */ |
| int loadHostInformationIntoResourcePool(void) |
| { |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData seginfobuff; |
| FILE *phostlist = NULL; |
| |
| /* Open the file */ |
| |
| elog(LOG, "HAWQ RM :: To load file %s", rm_resourcepool_test_filename); |
| |
| phostlist = fopen(rm_resourcepool_test_filename, "r"); |
| if ( phostlist == NULL ) |
| return FUNC_RETURN_OK; |
| |
| /* This buffer is used for building machine id instance. */ |
| initializeSelfMaintainBuffer(&seginfobuff, PCONTEXT); |
| |
| /* |
| * Loop each host, we identify hosts by hostname only. The first segment in |
| * that host is selected as the target machine. |
| */ |
| char line[1024]; |
| while( fgets(line, sizeof(line)-1, phostlist) != NULL ) { |
| |
| elog(LOG, "HAWQ RM :: Load line %s", line); |
| |
| /* Parse line. */ |
| char *phostname = strtok(line, ","); |
| if ( phostname == NULL ) continue; |
| char *phostport = strtok(NULL, ","); |
| if ( phostport == NULL ) continue; |
| char *phostip = strtok(NULL, ","); |
| if ( phostip == NULL ) continue; |
| uint32_t port = 0; |
| if (sscanf(phostport, "%d", &port) != 1) |
| { |
| elog(LOG, "HAWQ RM :: Invalid port, skip machine %s", phostname); |
| continue; |
| } |
| |
| /* If the host is new, add new host instance in resource pool */ |
| int32_t segtid = SEGSTAT_ID_INVALID; |
| if ( getSegIDByHostName(phostname,strlen(phostname), &segtid) == |
| FUNC_RETURN_OK ) |
| { |
| elog(LOG, "HAWQ RM :: Skip machine %s", phostname); |
| continue; |
| } |
| |
| elog(LOG, "HAWQ RM :: Find machine to add. %s", phostname); |
| |
| /* Reserve fixed head part of the machine id instance. */ |
| resetSelfMaintainBuffer(&seginfobuff); |
| prepareSelfMaintainBuffer(&seginfobuff, sizeof(SegInfoData),true); |
| SegInfo newhost = (SegInfo)(seginfobuff.Buffer); |
| newhost->master = 0; |
| newhost->standby = 0; |
| newhost->port = port; |
| newhost->alive = 1; |
| |
| jumpforwardSelfMaintainBuffer(&seginfobuff, sizeof(SegInfoData)); |
| |
| /* Add address offset and attributes. Currently, only one address. */ |
| newhost->HostAddrCount = 1; |
| newhost->AddressAttributeOffset = sizeof(SegInfoData); |
| /* Jump offset and reserved pad. */ |
| newhost->AddressContentOffset = sizeof(SegInfoData) + |
| sizeof(uint32_t) * (((newhost->HostAddrCount + 1) >> 1) << 1); |
| uint16_t addrattr = HOST_ADDRESS_CONTENT_STRING; |
| uint16_t addroffset = newhost->AddressContentOffset; |
| |
| appendSMBVar(&seginfobuff, addroffset); |
| appendSMBVar(&seginfobuff, addrattr); |
| appendSelfMaintainBufferTill64bitAligned(&seginfobuff); |
| |
| /* Add hostip content. */ |
| uint32_t length = strlen(phostip); |
| while( length > 0 && |
| (phostip[length-1] == '\r' || |
| phostip[length-1] == '\n' || |
| phostip[length-1] == '\t' || |
| phostip[length-1] == ' ') ) { |
| length--; |
| } |
| int addrsize = __SIZE_ALIGN64(offsetof(AddressStringData, Address) + |
| length + 1); |
| prepareSelfMaintainBuffer(&seginfobuff, addrsize, true); |
| AddressString straddr = (AddressString)(seginfobuff.Buffer + |
| seginfobuff.Cursor + 1); |
| straddr->Length = length; |
| memcpy(straddr->Address, phostip, length+1); |
| straddr->Address[length] = '\0'; |
| jumpforwardSelfMaintainBuffer(&seginfobuff, addrsize); |
| |
| newhost = (SegInfo)(seginfobuff.Buffer); |
| |
| /* Append hostname string. */ |
| newhost->HostNameLen = strlen(phostname); |
| newhost->HostNameOffset = seginfobuff.Cursor + 1; |
| appendSMBStr(&seginfobuff,phostname); |
| appendSelfMaintainBufferTill64bitAligned(&seginfobuff); |
| |
| /* Update total machine id instance size. */ |
| newhost = (SegInfo)(seginfobuff.Buffer); |
| newhost->Size = seginfobuff.Cursor + 1; |
| |
| /* Build machine info instance. */ |
| SegStat segstat = (SegStat)rm_palloc0(PCONTEXT, |
| offsetof(SegStatData, Info) + |
| seginfobuff.Cursor + 1); |
| segstat->ID = SEGSTAT_ID_INVALID; |
| segstat->FTSAvailable = RESOURCE_SEG_STATUS_AVAILABLE; |
| segstat->FTSTotalMemoryMB = DRMGlobalInstance->SegmentMemoryMB; |
| segstat->FTSTotalCore = DRMGlobalInstance->SegmentCore; |
| segstat->GRMTotalMemoryMB = 0; |
| segstat->GRMTotalCore = 0; |
| segstat->FailedTmpDirNum = 0; |
| |
| memcpy((char *)segstat + offsetof(SegStatData, Info), |
| seginfobuff.Buffer, |
| seginfobuff.Cursor+1); |
| |
| SelfMaintainBufferData segreport; |
| initializeSelfMaintainBuffer(&segreport,PCONTEXT); |
| generateSegStatReport(segstat, &segreport); |
| elog(LOG, "Resource manager builds available segment from file: %s", |
| segreport.Buffer); |
| destroySelfMaintainBuffer(&segreport); |
| |
| bool capstatchanged = false; |
| res = addHAWQSegWithSegStat(segstat, &capstatchanged); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager failed to add machine from file."); |
| rm_pfree(PCONTEXT, segstat); |
| } |
| |
| /* Fill HDFS and GRM host name cache */ |
| int32_t id = SEGSTAT_ID_INVALID; |
| SimpString key; |
| setSimpleStringRefNoLen(&key, GET_SEGINFO_HOSTNAME(&(segstat->Info))); |
| getSegIDByHostName(GET_SEGINFO_HOSTNAME(&(segstat->Info)), |
| segstat->Info.HostNameLen, |
| &id); |
| setHASHTABLENode(&(PRESPOOL->HDFSHostNameIndexed), |
| &key, |
| TYPCONVERT(void *, id), |
| false); |
| setHASHTABLENode(&(PRESPOOL->GRMHostNameIndexed), |
| &key, |
| TYPCONVERT(void *, id), |
| false); |
| } |
| |
| /* Refresh resource queue capacities. */ |
| refreshResourceQueueCapacity(false); |
| refreshActualMinGRMContainerPerSeg(); |
| /* Recalculate all memory/core ratio instances' limits. */ |
| refreshMemoryCoreRatioLimits(); |
| /* Refresh memory/core ratio level water mark. */ |
| refreshMemoryCoreRatioWaterMark(); |
| |
| /* Free buffer if used. */ |
| destroySelfMaintainBuffer(&seginfobuff); |
| |
| fclose(phostlist); |
| |
| return FUNC_RETURN_OK; |
| } |
| |
| extern Datum dump_resource_manager_status(PG_FUNCTION_ARGS) |
| { |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| int type = PG_GETARG_INT32(0); |
| char message[1024] = {0}; |
| char dump_file[1024] = {0}; |
| |
| switch (type) |
| { |
| case 1: |
| strcpy(dump_file, "/tmp/resource_manager_conntrack_status"); |
| sprintf(message, "Dump resource manager connection track status to %s", dump_file); |
| break; |
| |
| case 2: |
| strcpy(dump_file, "/tmp/resource_manager_resqueue_status"); |
| sprintf(message, "Dump resource manager resource queue status to %s", dump_file); |
| break; |
| |
| case 3: |
| strcpy(dump_file, "/tmp/resource_manager_respool_status"); |
| sprintf(message, "Dump resource manager resource pool status to %s", dump_file); |
| break; |
| |
| default: |
| sprintf(message, "Dump resource manager status failed.\n" |
| "1 -> dump resource manager connection track status\n" |
| "2 -> dump resource manager resource queue status\n" |
| "3 -> dump resource manager resource pool status"); |
| PG_RETURN_TEXT_P(cstring_to_text(message)); |
| } |
| |
| dumpResourceManagerStatus(type, dump_file, errorbuf, sizeof(errorbuf)); |
| |
| PG_RETURN_TEXT_P(cstring_to_text(message)); |
| } |
| |
| void processResourceBrokerTasks(void) |
| { |
| uint64_t curtime = 0; |
| int res = FUNC_RETURN_OK; |
| |
| if ( !isCleanGRMResourceStatus() ) |
| { |
| /* |
| * STEP 1. Check if should generate request to periodically check |
| * GRM cluster status. |
| * |
| * Generate request to the global resource manager ( YARN, etc.) to get |
| * cluster report. This is a asynchronous request, therefore, we don't |
| * need to handle the real cluster report right now. The possibility is |
| * reserved that if one fake global resource manager is implemented in |
| * HAWQ RM process and the cluster report can be returned directly in |
| * this function. |
| */ |
| curtime = gettime_microsec(); |
| |
| if ( (PRESPOOL->Segments.NodeCount > 0 ) && |
| (curtime - PRESPOOL->LastUpdateTime > |
| rm_cluster_report_period * 1000000LL || |
| requireInstantClusterReport() ) && |
| (curtime - PRESPOOL->LastRequestTime > 5LL * 1000000LL) ) |
| { |
| double maxcap = 0.0; |
| List *report = NULL; |
| |
| PRESPOOL->LastRequestTime = curtime; |
| res = RB_getClusterReport(rm_grm_yarn_queue, &report, &maxcap); |
| Assert(report == NULL); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager fails to fresh cluster report " |
| "from global resource manager."); |
| goto exit; |
| } |
| } |
| |
| /* STEP 2. Check if should periodically check container status. */ |
| curtime = gettime_microsec(); |
| |
| if ( PRESPOOL->AddPendingContainerCount == 0 && |
| PRESPOOL->RetPendingContainerCount == 0 && |
| (curtime - PRESPOOL->LastCheckContainerTime > |
| rm_cluster_report_period * 1000000LL) && |
| (curtime - PRESPOOL->LastRequestContainerTime > 5LL * 1000000LL) ) |
| { |
| List *report = NULL; |
| PRESPOOL->LastRequestContainerTime = curtime; |
| res = RB_getContainerReport(&report); |
| Assert( report == NULL ); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager fails to get container report " |
| "from global resource manager."); |
| goto exit; |
| } |
| } |
| |
| /* STEP 3. Return kicked GRM containers. */ |
| curtime = gettime_microsec(); |
| |
| if ( !PRESPOOL->pausePhase[QUOTA_PHASE_KICKED_TO_RETURN] ) |
| { |
| res = RB_returnResource(&(PRESPOOL->KickedContainers)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager failed to return kicked container to " |
| "global resource manager."); |
| goto exit; |
| } |
| } |
| else |
| { |
| elog(LOG, "Paused returning GRM containers kicked to GRM."); |
| } |
| /* |
| * STEP 4. Handle resource broker input as new allocated resource or |
| * cluster report, container report etc. The allocated resource |
| * from resource broker will be added to the resource queue |
| * manager and resource pool. |
| */ |
| res = RB_handleNotifications(); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager fails to handle response from global " |
| "resource broker"); |
| goto exit; |
| } |
| } |
| else |
| { |
| RB_clearResource(&(PRESPOOL->KickedContainers)); |
| } |
| |
| exit: |
| RB_handleError(res); |
| } |
| |
| void generateResourceRequestToResourceBroker(void) |
| { |
| uint64_t curtime = 0; |
| int res = FUNC_RETURN_OK; |
| |
| if ( !isCleanGRMResourceStatus() ) |
| { |
| curtime = gettime_microsec(); |
| |
| if ( PRESPOOL->Segments.NodeCount > 0 && PQUEMGR->RatioCount > 0 ) |
| { |
| refreshMemoryCoreRatioLevelUsage(curtime); |
| if ( curtime - PRESPOOL->LastResAcqTime > 1000000LL) |
| { |
| res = generateAllocRequestToBroker(); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Resource manager fails to allocate container " |
| "from global resource manager."); |
| goto exit; |
| } |
| } |
| } |
| } |
| |
| exit: |
| RB_handleError(res); |
| |
| } |
| |
/*
 * Start cleaning up all GRM containers by calling the two drop routines, first
 * for the containers in the resource pool, then for the containers that are to
 * be accepted.
 */
void cleanupAllGRMContainers(void)
{
	/* Containers held by resource pool. */
	dropAllResPoolGRMContainersToToBeKicked();

	/* Containers waiting for being accepted. */
	dropAllToAcceptGRMContainersToKicked();
}
| |
| bool cleanedAllGRMContainers(void) |
| { |
| /* Condition 1. All segments have no resource allocated. */ |
| if ( !allSegmentHasNoGRMContainersAllocated() ) |
| { |
| return false; |
| } |
| |
| if ( PRESPOOL->AddPendingContainerCount > 0 || |
| PRESPOOL->RetPendingContainerCount > 0 ) |
| { |
| elog(RMLOG, "Pending GRM container count inc %d, dec %d.", |
| PRESPOOL->AddPendingContainerCount, |
| PRESPOOL->RetPendingContainerCount); |
| } |
| /* Condition 2. No on-the-fly GRM containers for increasing and decreasing.*/ |
| return PRESPOOL->AddPendingContainerCount == 0 && |
| PRESPOOL->RetPendingContainerCount == 0; |
| } |