| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "communication/rmcomm_QD2RM.h" |
| #include "dynrm.h" |
| #include "utils/memutilities.h" |
| #include "utils/simplestring.h" |
| #include "utils/linkedlist.h" |
| #include "nodes/pg_list.h" |
| #include "commands/defrem.h" |
| #include "pgstat.h" |
| |
| #include "communication/rmcomm_MessageHandler.h" |
| #include "communication/rmcomm_SyncComm.h" |
| #include "resourcemanager/resourcemanager.h" |
| #include "resourcemanager/conntrack.h" |
| #include "resourcemanager/communication/rmcomm_MessageProtocol.h" |
| |
| #include "funcapi.h" |
| #include "fmgr.h" |
| #include "miscadmin.h" |
| |
| #define DRMQD2RM_MEMORY_CONTEXT_NAME "QD to RM communication" |
| |
| #define DRM_IPC_RETRY_TIMES 10 |
| #define DRM_IPC_RETRY_SLEEP_US 100000 |
| |
| #define DRM_IPC_RESOURCE_SET_DEF_SIZE 8 |
| #define DRM_IPC_RESOURCE_SET_MAX_SIZE 1024 /* maximum 1024 internal |
| parallel portals */ |
| |
| #define VALIDATE_RESOURCE_SET_INDEX(index, errorbuf, errorbufsize) \ |
| if ( (index) < 0 || (index) >= QD2RM_ResourceSetSize || \ |
| QD2RM_ResourceSets[(index)] == NULL ) \ |
| { \ |
| snprintf((errorbuf), (errorbufsize), \ |
| "wrong resource set index %d", (index)); \ |
| return COMM2RM_CLIENT_WRONG_INPUT; \ |
| } |
| |
| #define RPC_QD_2_RM_HEAD \ |
| initializeQD2RMComm(); \ |
| VALIDATE_RESOURCE_SET_INDEX(index, errorbuf, errorbufsize) \ |
| int res = FUNC_RETURN_OK; \ |
| static char errorbuf2[ERRORMESSAGE_SIZE]; \ |
| SelfMaintainBuffer sendbuffer = &(QD2RM_ResourceSets[index]->SendBuffer);\ |
| SelfMaintainBuffer recvbuffer = &(QD2RM_ResourceSets[index]->RecvBuffer);\ |
| errorbuf2[0] = '\0'; |
| |
| void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer, |
| uint32_t connid, |
| char *queuename, |
| uint16_t action, |
| List *options); |
| |
| struct HeartBeatThreadArgData |
| { |
| int HostAddrLength; |
| int HostAddrSize; |
| char **HostAddrs; |
| }; |
| typedef struct HeartBeatThreadArgData HeartBeatThreadArgData; |
| typedef struct HeartBeatThreadArgData *HeartBeatThreadArg; |
| |
| void freeHeartBeatThreadArg(HeartBeatThreadArg *arg); |
| void *generateResourceRefreshHeartBeat(void *arg); |
| |
| int callSyncRPCToRM(const char *sendbuff, |
| int sendbuffsize, |
| uint16_t sendmsgid, |
| uint16_t exprecvmsgid, |
| SelfMaintainBuffer recvsmb, |
| char *errorbuf, |
| int errorbufsize); |
| |
| #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) |
| /* |
| *------------------------------------------------------------------------------ |
| * Functions for testing resource manager by playing actions. |
| *------------------------------------------------------------------------------ |
| */ |
| struct TestActionConnData |
| { |
| char ConnectionName[64]; |
| int ResourceID; |
| int32_t ConnectionID; |
| ListCell *CurAction; |
| List *Actions; |
| }; |
| typedef struct TestActionConnData TestActionConnData; |
| typedef struct TestActionConnData *TestActionConn; |
| |
| struct TestActionItemData |
| { |
| char ActionName[64]; |
| List *Arguments; |
| int ResultCode; |
| char *ResultMessage; |
| }; |
| typedef struct TestActionItemData TestActionItemData; |
| typedef struct TestActionItemData *TestActionItem; |
| |
| struct TestActionPlayData |
| { |
| List *ActionConns; |
| ListCell *ActionConnCell; |
| ListCell *ActionItemCell; |
| }; |
| |
| typedef struct TestActionPlayData TestActionPlayData; |
| typedef struct TestActionPlayData *TestActionPlay; |
| |
| #define RESOURCE_ACTION_PLAY_REGISTER "register" |
| #define RESOURCE_ACTION_PLAY_ALLOCATE "allocate" |
| #define RESOURCE_ACTION_PLAY_RETURN "return" |
| #define RESOURCE_ACTION_PLAY_UNREGISTER "unregister" |
| |
| #define RESOURCE_ACTION_PLAY_WAIT "wait" |
| #define RESOURCE_ACTION_PLAY_CREATE "create" |
| #define RESOURCE_ACTION_PLAY_REMOVE "remove" |
| |
| #define RESOURCE_ACTION_RPC_FAULT "rpcfault" |
| #define RESOURCE_ACTION_RPC_FAULT_RM "rpcrmfault" |
| #define RESOURCE_ACTION_QUOTA_PAUSE "quotapause" |
| |
| #define PG_PLAY_RESOURCE_ACTION_COLUMNS 5 |
| #define PG_PLAY_RESOURCE_ACTION_BUFSIZE 1024 |
| |
| #define RESOURCE_ACTION_PLAY_ALLOCATE_OUT "/tmp/allocate" |
| |
| int loadTestActionScript(const char *filename, List **actions); |
| int runTestActionScript(List *actions, const char *filename); |
| int findFile(const char *filename); |
| int createFile(const char *filename); |
| int removeFile(const char *filename); |
| int setResourceManagerQuotaControl(bool pause, |
| int phase, |
| char *errorbuf, |
| int errorbufsize); |
| |
| void outputAllcatedResourceToFile(const char *filename, int resourceid); |
| void *buildResourceActionPlayRowData(MCTYPE context, List *actions); |
| void freeResourceActionPlayRowData(MCTYPE context, TestActionPlay *actplay); |
| /* |
| *------------------------------------------------------------------------------ |
| * Functions for UDF of explaining resource distribution. |
| *------------------------------------------------------------------------------ |
| */ |
| |
| struct ResourceDistRowData { |
| void *pointerkey; |
| char *hostname; |
| int32_t segcount; |
| int32_t segmem; |
| double segcore; |
| char *mappedname; |
| int32_t splitcount; |
| }; |
| |
| typedef struct ResourceDistRowData ResourceDistRowData; |
| typedef struct ResourceDistRowData *ResourceDistRow; |
| |
| #define PG_EXPLAIN_RESOURCE_DISTRIBUTION_COLUMNS 6 |
| #define PG_EXPLAIN_RESOURCE_DISTRIBUTION_BUFSIZE 256 |
| |
| DQueue buildResourceDistRowData(MCTYPE context, |
| int resourceid, |
| HostnameVolumeInfo *volinfo, |
| int infosize); |
| |
| /* |
| *------------------------------------------------------------------------------ |
| * Global Variables. |
| * |
| * Postmaster side global variables saving the data not necessarily always sent |
| * from resource manager. |
| *------------------------------------------------------------------------------ |
| */ |
| |
| MemoryContext QD2RM_CommContext = NULL; |
| QDResourceContext *QD2RM_ResourceSets = NULL; |
| int QD2RM_ResourceSetSize = 0; |
| int QD2RM_ResourceSetCount = 0; |
| uint64_t QD2RM_LastRefreshResourceTime = 0; |
| |
| bool QD2RM_Initialized = false; |
| |
| pthread_t ResourceHeartBeatThreadHandle; |
| bool ResourceHeartBeatRunning = false; |
| pthread_mutex_t ResourceSetsMutex; |
| uint64_t LastSendResourceRefreshHeartBeatTime = 0; |
| |
| /** |
| * Do necessary initialization for coming RPC communication between QD and RM. |
| */ |
| void initializeQD2RMComm(void) |
| { |
| if ( QD2RM_Initialized ) |
| return; |
| |
| int res = FUNC_RETURN_OK; |
| |
| /* create dynamic resource manager instance to contain config data. */ |
| res = createDRMInstance(); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "Fail to initialize data structure for communicating with " |
| "resource manager."); |
| } |
| |
| |
| MEMORY_CONTEXT_SWITCH_TO(TopMemoryContext) |
| QD2RM_CommContext = AllocSetContextCreate( CurrentMemoryContext, |
| DRMQD2RM_MEMORY_CONTEXT_NAME, |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE ); |
| Assert( QD2RM_CommContext != NULL ); |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| DRMGlobalInstance->Context = QD2RM_CommContext; |
| |
| res = initializeDRMInstanceForQD(); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "Failed to initialize data structure for communicating with " |
| "resource manager."); |
| } |
| |
| /* Initialize global variables for maintaining a list of resource sets. */ |
| QD2RM_ResourceSets = rm_palloc0(QD2RM_CommContext, |
| sizeof(QDResourceContext) * |
| DRM_IPC_RESOURCE_SET_DEF_SIZE); |
| QD2RM_ResourceSetSize = DRM_IPC_RESOURCE_SET_DEF_SIZE; |
| QD2RM_ResourceSetCount = 0; |
| for ( int i = 0 ; i < QD2RM_ResourceSetSize ; ++i ) |
| { |
| QD2RM_ResourceSets[i] = NULL; |
| } |
| |
| initializeSyncRPCComm(); |
| |
| /* Init mutex for accessing resource sets. */ |
| if ( pthread_mutex_init(&ResourceSetsMutex, NULL) != 0 ) |
| { |
| elog(ERROR, "Fail to build mutex for communication with resource manager."); |
| } |
| |
| /* Start resource heart-beat thread. */ |
| if ( rm_session_lease_heartbeat_enable ) |
| { |
| /* Resolve resource manager server address here before creating thread. */ |
| struct hostent *rmserver = gethostbyname(master_addr_host); |
| if ( rmserver == NULL ) |
| { |
| elog(ERROR, "failed to resolve resource manager hostname %s. herror %s", |
| master_addr_host, |
| hstrerror(h_errno)); |
| } |
| |
| HeartBeatThreadArg tharg = malloc(sizeof(HeartBeatThreadArgData)); |
| tharg->HostAddrLength = rmserver->h_length; |
| tharg->HostAddrs = NULL; |
| tharg->HostAddrSize = 0; |
| |
| /* Get total and INET address count. */ |
| int addrcnt = 0; |
| while( rmserver->h_addr_list[addrcnt] != NULL ) |
| { |
| addrcnt++; |
| if ( rmserver->h_addrtype == AF_INET ) |
| { |
| tharg->HostAddrSize++; |
| } |
| } |
| elog(DEBUG3, "Resolved resource manager host %s to %d INET addresses.", |
| master_addr_host, |
| tharg->HostAddrSize); |
| |
| if ( tharg->HostAddrSize <= 0 ) |
| { |
| freeHeartBeatThreadArg(&tharg); |
| elog(ERROR, "Resource manager host %s does not have available INET " |
| "address.", |
| master_addr_host); |
| } |
| |
| tharg->HostAddrs = malloc(sizeof(char *) * tharg->HostAddrSize); |
| |
| int ineti = 0; |
| for ( int i = 0 ; i < addrcnt ; ++i ) |
| { |
| if ( rmserver->h_addrtype != AF_INET ) |
| { |
| continue; |
| } |
| tharg->HostAddrs[ineti] = malloc(sizeof(char) * tharg->HostAddrLength); |
| memcpy(tharg->HostAddrs[ineti], |
| rmserver->h_addr_list[i], |
| tharg->HostAddrLength); |
| ineti++; |
| } |
| |
| |
| /* Start heart-beat thread. */ |
| ResourceHeartBeatRunning = true; |
| if ( pthread_create(&ResourceHeartBeatThreadHandle, |
| NULL, |
| generateResourceRefreshHeartBeat, |
| tharg) != 0) |
| { |
| ResourceHeartBeatRunning = false; |
| freeHeartBeatThreadArg(&tharg); |
| elog(ERROR, "failed to create background thread for communication with " |
| "resource manager."); |
| } |
| } |
| |
| initializeMessageHandlers(); |
| |
| initializeSocketConnectionPool(); |
| |
| QD2RM_Initialized = true; |
| |
| on_proc_exit(cleanupQD2RMComm, 0); |
| } |
| |
| int createNewResourceContext(int *index) |
| { |
| initializeQD2RMComm(); |
| |
| pthread_mutex_lock(&ResourceSetsMutex); |
| /* Decide if should extend the array. The size is always doubled. */ |
| /* TODO: Limit the maximum size of the array QD2RM_ResourceSets. */ |
| if ( QD2RM_ResourceSetCount >= QD2RM_ResourceSetSize ) |
| { |
| if ( QD2RM_ResourceSetSize >= DRM_IPC_RESOURCE_SET_MAX_SIZE ) |
| { |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| return COMM2RM_CLIENT_FULL_RESOURCECONTEXT; |
| } |
| QD2RM_ResourceSets = rm_repalloc(QD2RM_CommContext, |
| QD2RM_ResourceSets, |
| sizeof(QDResourceContext) * |
| QD2RM_ResourceSetSize * 2); |
| |
| for ( int i = QD2RM_ResourceSetSize ; i < QD2RM_ResourceSetSize * 2 ; ++i ) |
| { |
| QD2RM_ResourceSets[i] = NULL; |
| } |
| |
| QD2RM_ResourceSetSize = QD2RM_ResourceSetSize << 1; |
| } |
| |
| /* Find one available slot with NULL set. */ |
| int availableIndex = 0; |
| while(availableIndex < QD2RM_ResourceSetSize && |
| QD2RM_ResourceSets[availableIndex] != NULL) |
| { |
| availableIndex++; |
| } |
| |
| /* Build new instance and initialize the properties. */ |
| QDResourceContext newrs = rm_palloc0(QD2RM_CommContext, |
| sizeof(QDResourceContextData)); |
| QD2RM_ResourceSets[availableIndex] = newrs; |
| |
| newrs->QD_Conn_ID = INVALID_CONNID; |
| newrs->QD_Resource = NULL; |
| newrs->QD_SegCore = 0.0; |
| newrs->QD_SegMemoryMB = 0; |
| newrs->QD_SegCount = 0; |
| newrs->QD_HdfsHostNames = NULL; |
| newrs->QD_HostCount = 0; |
| newrs->QD_ResourceList = NULL; |
| |
| initializeSelfMaintainBuffer(&(newrs->SendBuffer), QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&(newrs->RecvBuffer), QD2RM_CommContext); |
| |
| QD2RM_ResourceSetCount++; |
| *index = availableIndex; |
| |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| |
| return FUNC_RETURN_OK; |
| } |
| |
| int releaseResourceContext(int index) |
| { |
| initializeQD2RMComm(); |
| |
| pthread_mutex_lock(&ResourceSetsMutex); |
| |
| if ( index < 0 || index >= QD2RM_ResourceSetSize || |
| QD2RM_ResourceSets[index] == NULL ) |
| { |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| return COMM2RM_CLIENT_WRONG_INPUT; |
| } |
| |
| |
| destroySelfMaintainBuffer(&(QD2RM_ResourceSets[index]->SendBuffer)); |
| destroySelfMaintainBuffer(&(QD2RM_ResourceSets[index]->RecvBuffer)); |
| |
| rm_pfree(QD2RM_CommContext, QD2RM_ResourceSets[index]); |
| QD2RM_ResourceSets[index] = NULL; |
| |
| QD2RM_ResourceSetCount--; |
| |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| |
| return FUNC_RETURN_OK; |
| |
| } |
| |
| void releaseResourceContextWithErrorReport(int index) |
| { |
| int res = releaseResourceContext(index); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("can not release resource context"))); |
| } |
| } |
| |
| int getAllocatedResourceContext(int index, QDResourceContext *rescontext) |
| { |
| initializeQD2RMComm(); |
| |
| if ( index < 0 || index >= QD2RM_ResourceSetSize ) |
| { |
| return COMM2RM_CLIENT_WRONG_INPUT; |
| } |
| *rescontext = QD2RM_ResourceSets[index]; |
| return FUNC_RETURN_OK; |
| } |
| |
| int cleanupQD2RMComm(void) |
| { |
| int res = FUNC_RETURN_OK; |
| char errorbuf[ERRORMESSAGE_SIZE]; |
| |
| elog(LOG, "Clean up communication to resource manager now."); |
| |
| initializeQD2RMComm(); |
| |
| pthread_mutex_lock(&ResourceSetsMutex); |
| for ( int i = 0 ; i < QD2RM_ResourceSetSize ; ++i ) |
| { |
| if ( QD2RM_ResourceSets[i] != NULL ) |
| { |
| if ( QD2RM_ResourceSets[i]->QD_ResourceList != NULL ) |
| { |
| elog(WARNING, "Un-returned resource is probed, will be returned. " |
| "(%d MB, %lf CORE) x %d. Conn ID=%d", |
| QD2RM_ResourceSets[i]->QD_SegMemoryMB, |
| QD2RM_ResourceSets[i]->QD_SegCore, |
| QD2RM_ResourceSets[i]->QD_SegCount, |
| QD2RM_ResourceSets[i]->QD_Conn_ID); |
| errorbuf[0] = '\0'; |
| res = returnResource(i, errorbuf, sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "%s", errorbuf); |
| } |
| errorbuf[0] = '\0'; |
| res = unregisterConnectionInRM(i, errorbuf, sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "%s", errorbuf); |
| } |
| } |
| } |
| } |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| pthread_mutex_destroy(&ResourceSetsMutex); |
| |
| if (ResourceHeartBeatRunning) |
| { |
| ResourceHeartBeatRunning = false; |
| res = pthread_join(ResourceHeartBeatThreadHandle, NULL); |
| if ( res != FUNC_RETURN_OK ) { |
| elog(WARNING, "Fail to cancel resource heartbeat thread."); |
| } |
| } |
| |
| return FUNC_RETURN_OK; |
| } |
| |
| /* |
| * REGISTER CONNECTION by USER NAME. |
| */ |
| int registerConnectionInRMByStr(int index, |
| const char *userid, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| /* Build request. */ |
| resetSelfMaintainBuffer(sendbuffer); |
| appendSMBStr(sendbuffer, userid); |
| appendSelfMaintainBufferTill64bitAligned(sendbuffer); |
| |
| /* Call RPC. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_CONNECTION_REG, |
| RESPONSE_QD_CONNECTION_REG, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to register in resource manager, %s", |
| errorbuf2); |
| return res; |
| } |
| |
| /* Parse response. */ |
| RPCResponseRegisterConnectionInRMByStr response = |
| SMBUFF_HEAD(RPCResponseRegisterConnectionInRMByStr, |
| recvbuffer); |
| |
| QD2RM_ResourceSets[index]->QD_Conn_ID = response->ConnID; |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseRegisterConnectionInRMByStrData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to register in resource manager, %s", |
| errorstr); |
| return response->Result; |
| } |
| |
| elog(LOG, "ConnID %d. Registered in HAWQ resource manager.", |
| QD2RM_ResourceSets[index]->QD_Conn_ID); |
| return FUNC_RETURN_OK; |
| } |
| |
| /* |
| * REGISTER CONNECTION by USER OID. |
| */ |
| int registerConnectionInRMByOID(int index, |
| uint64_t useridoid, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| /* Build request. */ |
| resetSelfMaintainBuffer(sendbuffer); |
| RPCRequestRegisterConnectionInRMByOIDData requesthead; |
| requesthead.UseridOid = useridoid; |
| |
| appendSMBVar(sendbuffer,requesthead); |
| |
| /* Call RPC to get response. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_CONNECTION_REG_OID, |
| RESPONSE_QD_CONNECTION_REG_OID, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to register in resource manager, %s", |
| errorbuf2); |
| return res; |
| } |
| |
| /* Parse response. */ |
| RPCResponseRegisterConnectionInRMByOID response = |
| SMBUFF_HEAD(RPCResponseRegisterConnectionInRMByOID, |
| recvbuffer); |
| |
| QD2RM_ResourceSets[index]->QD_Conn_ID = response->ConnID; |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseRegisterConnectionInRMByOIDData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to register in resource manager, %s", |
| errorstr); |
| return response->Result; |
| } |
| |
| elog(LOG, "ConnID %d. Registered in HAWQ resource manager (By OID)", |
| QD2RM_ResourceSets[index]->QD_Conn_ID); |
| return FUNC_RETURN_OK; |
| } |
| |
| /* |
| * UNREGISTER CONNECTION. |
| */ |
| int unregisterConnectionInRM(int index, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| /* Build request. */ |
| RPCRequestHeadUnregisterConnectionInRMData request; |
| request.ConnID =QD2RM_ResourceSets[index]->QD_Conn_ID; |
| |
| resetSelfMaintainBuffer(sendbuffer); |
| appendSMBVar(sendbuffer,request); |
| |
| /* Call RPC to get response. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_CONNECTION_UNREG, |
| RESPONSE_QD_CONNECTION_UNREG, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to unregister from resource manager, %s", |
| errorbuf2); |
| return res; |
| } |
| |
| /* Parse response. */ |
| |
| RPCResponseUnregisterConnectionInRM response = |
| SMBUFF_HEAD(RPCResponseUnregisterConnectionInRM, |
| recvbuffer); |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseUnregisterConnectionInRMData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to unregister from resource manager, %s", |
| errorstr); |
| return response->Result; |
| } |
| |
| elog(LOG, "ConnID %d. Unregistered from HAWQ resource manager.", |
| QD2RM_ResourceSets[index]->QD_Conn_ID); |
| |
| QD2RM_ResourceSets[index]->QD_Conn_ID = INVALID_CONNID; |
| return FUNC_RETURN_OK; |
| } |
| |
| void unregisterConnectionInRMWithErrorReport(int index) |
| { |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| errorbuf[0] = '\0'; |
| int res = unregisterConnectionInRM(index, errorbuf, sizeof(errorbuf)); |
| if (res != FUNC_RETURN_OK) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s",errorbuf))); |
| } |
| } |
| |
| /* |
| * ACQUIRE QUERY RESOURCE |
| */ |
| int acquireResourceFromRM(int index, |
| int sessionid, |
| int slice_size, |
| int64_t iobytes, |
| HostnameVolumeInfo *preferred_nodes, |
| int preferred_nodes_size, |
| uint32_t max_seg_count_fix, |
| uint32_t min_seg_count_fix, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| QDResourceContext curcontext = QD2RM_ResourceSets[index]; |
| uint32_t nodecount = (preferred_nodes == NULL || |
| preferred_nodes_size == 0) ? |
| 0 : |
| preferred_nodes_size; |
| |
| elog(RMLOG, "ConnID: %d. Acquire resource request for index %d. " |
| "Max vseg size %d Min vseg size %d Estimated slice size %d " |
| "estimated IO bytes size " INT64_FORMAT " Preferred node count %d.", |
| curcontext->QD_Conn_ID, |
| index, |
| max_seg_count_fix, |
| min_seg_count_fix, |
| slice_size, |
| iobytes, |
| nodecount); |
| |
| /* Build request. */ |
| resetSelfMaintainBuffer(sendbuffer); |
| /********** STEP1. Request message head ***********************************/ |
| RPCRequestHeadAcquireResourceFromRMData requesthead; |
| requesthead.SessionID = sessionid; |
| requesthead.ConnID = curcontext->QD_Conn_ID; |
| requesthead.NodeCount = nodecount; |
| requesthead.MaxSegCountFix = max_seg_count_fix; |
| requesthead.MinSegCountFix = min_seg_count_fix; |
| requesthead.SliceSize = slice_size; |
| requesthead.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit; |
| requesthead.VSegLimit = rm_nvseg_perquery_limit; |
| requesthead.Reserved = 0; |
| requesthead.IOBytes = iobytes; |
| requesthead.StatNVSeg = rm_stmt_nvseg; |
| |
| requesthead.StatVSegMemoryMB = 0; |
| int parseres = FUNC_RETURN_OK; |
| SimpString valuestr; |
| setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str)); |
| parseres = SimpleStringToStorageSizeMB(&valuestr, |
| &(requesthead.StatVSegMemoryMB)); |
| Assert(parseres == FUNC_RETURN_OK); |
| |
| appendSMBVar(sendbuffer,requesthead); |
| |
| /********** STEP2. Preferred node scan size in MB *************************/ |
| /* Send each host scan size in MB. */ |
| for ( int i = 0 ; i < nodecount ; ++i ) |
| { |
| appendSMBVar(sendbuffer,preferred_nodes[i].datavolume); |
| } |
| |
| /********** STEP3. Preferred node host names ******************************/ |
| /* Send host names. splitted by '\0' */ |
| for ( int i = 0 ; i < nodecount ; ++i ) |
| { |
| appendSMBStr(sendbuffer,preferred_nodes[i].hostname); |
| } |
| /* send pad to ensure 64-bit aligned. */ |
| appendSelfMaintainBufferTill64bitAligned(sendbuffer); |
| |
| pgstat_report_waiting_resource(true); |
| |
| /* Call RPC to get response. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_ACQUIRE_RESOURCE, |
| RESPONSE_QD_ACQUIRE_RESOURCE, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to acquire resource from resource manager, %s", |
| errorbuf2); |
| pgstat_report_waiting_resource(false); |
| return res; |
| } |
| pgstat_report_waiting_resource(false); |
| |
| RPCResponseAcquireResourceFromRMERROR errres = |
| SMBUFF_HEAD(RPCResponseAcquireResourceFromRMERROR, |
| recvbuffer); |
| |
| if ( errres->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseAcquireResourceFromRMERRORData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to acquire resource from resource manager, %s", |
| errorstr); |
| return errres->Result; |
| } |
| |
| /* Parse response. */ |
| RPCResponseHeadAcquireResourceFromRM response = |
| SMBUFF_HEAD(RPCResponseHeadAcquireResourceFromRM, |
| recvbuffer); |
| |
| curcontext->QD_SegCount = response->SegCount; |
| curcontext->QD_SegMemoryMB = response->SegMemoryMB; |
| curcontext->QD_SegCore = response->SegCore; |
| curcontext->QD_Resource = (char *)response; |
| curcontext->QD_HostCount = response->HostCount; |
| |
| if ( curcontext->QD_SegCount > 0 ) |
| { |
| /* Build local HDFS hostname array that will be referenced by QDMachineId |
| * instances. */ |
| if ( nodecount > 0 ) |
| { |
| curcontext->QD_HdfsHostNames = |
| (char **)rm_palloc0(QD2RM_CommContext, |
| sizeof(char *) * nodecount); |
| int hnameidx = 0; |
| for ( int i = 0 ; i < nodecount ; ++i ) |
| { |
| HostnameVolumeInfo *info = &preferred_nodes[i]; |
| int hostnamelen = strlen(info->hostname); |
| curcontext->QD_HdfsHostNames[hnameidx] = |
| (char *)rm_palloc0(QD2RM_CommContext, hostnamelen + 1); |
| strncpy(curcontext->QD_HdfsHostNames[hnameidx], |
| info->hostname, |
| hostnamelen); |
| hnameidx++; |
| } |
| } |
| |
| /* Get block of hdfs hostname index array. */ |
| uint32_t *hnameidxarray = |
| (uint32_t *)(SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseHeadAcquireResourceFromRMData)); |
| uint32_t hnameidxarraysize = |
| __SIZE_ALIGN64(sizeof(uint32_t) * curcontext->QD_SegCount); |
| |
| /* Get block of machine id instance offset array. */ |
| uint32_t *hoffsetarray = |
| (uint32_t *)(SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseHeadAcquireResourceFromRMData) + |
| hnameidxarraysize); |
| |
| /* This is an array of pointers of MachineId. */ |
| curcontext->QD_ResourceList = (QDSegInfo *) |
| rm_palloc0(QD2RM_CommContext, |
| sizeof(QDSegInfo) * |
| curcontext->QD_SegCount); |
| |
| for ( int i = 0 ; i < curcontext->QD_SegCount ; ++i ) |
| { |
| QDSegInfo newqdseg = (QDSegInfo) |
| rm_palloc0(QD2RM_CommContext, |
| sizeof(QDSegInfoData)); |
| newqdseg->QD_HdfsHostName = (hnameidxarray[i] < nodecount) ? |
| curcontext->QD_HdfsHostNames[hnameidxarray[i]] : |
| NULL; |
| newqdseg->QD_SegInfo = (SegInfo)(SMBUFF_CONTENT(recvbuffer) + |
| hoffsetarray[i]); |
| curcontext->QD_ResourceList[i] = newqdseg; |
| |
| if ( log_min_messages == DEBUG5 ) |
| { |
| SelfMaintainBufferData segreport; |
| initializeSelfMaintainBuffer(&segreport, QD2RM_CommContext); |
| generateSegInfoReport(curcontext->QD_ResourceList[i]->QD_SegInfo, |
| &segreport); |
| elog(RMLOG, "Recognized resource on host. %s. " |
| "Mapped original HDFS host name %s", |
| SMBUFF_CONTENT(&segreport), |
| (curcontext->QD_ResourceList[i]->QD_HdfsHostName != NULL ? |
| curcontext->QD_ResourceList[i]->QD_HdfsHostName : |
| "UNSET")); |
| destroySelfMaintainBuffer(&segreport); |
| } |
| } |
| elog(LOG, "ConnID %d. Acquired resource from resource manager, " |
| "(%d MB, %lf CORE) x %d.", |
| curcontext->QD_Conn_ID, |
| curcontext->QD_SegMemoryMB, |
| curcontext->QD_SegCore, |
| curcontext->QD_SegCount); |
| } |
| else |
| { |
| Assert( false ); |
| } |
| return FUNC_RETURN_OK; |
| } |
| |
| bool alreadyReturnedResource(int index) |
| { |
| initializeQD2RMComm(); |
| |
| Assert( index >= 0 && index < QD2RM_ResourceSetSize ); |
| return QD2RM_ResourceSets[index] == NULL; |
| } |
| |
| /* |
| * RETURN QUERY RESOURCE |
| */ |
| int returnResource(int index, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| /* Build request. */ |
| RPCRequestHeadReturnResourceData requesthead; |
| requesthead.ConnID = QD2RM_ResourceSets[index]->QD_Conn_ID; |
| requesthead.Reserved = 0; |
| |
| resetSelfMaintainBuffer(sendbuffer); |
| appendSMBVar(sendbuffer,requesthead); |
| appendSelfMaintainBufferTill64bitAligned(sendbuffer); |
| |
| /* Call RPC to get response. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_RETURN_RESOURCE, |
| RESPONSE_QD_RETURN_RESOURCE, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to return resource to resource manager, %s", |
| errorbuf2); |
| return res; |
| } |
| |
| /* Parse response. */ |
| RPCResponseHeadReturnResource response = |
| SMBUFF_HEAD(RPCResponseHeadReturnResource, |
| recvbuffer); |
| |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(recvbuffer) + |
| sizeof(RPCResponseHeadReturnResourceData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to return resource to resource manager, %s", |
| errorstr); |
| return response->Result; |
| } |
| |
| QD2RM_ResourceSets[index]->QD_SegMemoryMB = 0; |
| QD2RM_ResourceSets[index]->QD_SegCore = 0.0; |
| QD2RM_ResourceSets[index]->QD_SegCount = 0; |
| |
| if ( QD2RM_ResourceSets[index]->QD_ResourceList != NULL ) |
| { |
| rm_pfree(QD2RM_CommContext, QD2RM_ResourceSets[index]->QD_ResourceList); |
| } |
| |
| QD2RM_ResourceSets[index]->QD_Resource = NULL; |
| QD2RM_ResourceSets[index]->QD_ResourceList = NULL; |
| |
| elog(LOG, "ConnID %d. Returned resource to resource manager.", |
| QD2RM_ResourceSets[index]->QD_Conn_ID); |
| |
| return FUNC_RETURN_OK; |
| } |
| |
| int hasAllocatedResource(int index, bool *allocated) |
| { |
| initializeQD2RMComm(); |
| |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| /* Validate index */ |
| errorbuf[0] = '\0'; |
| VALIDATE_RESOURCE_SET_INDEX(index, errorbuf, sizeof(errorbuf)) |
| |
| *allocated = QD2RM_ResourceSets[index]->QD_SegCount > 0; |
| return FUNC_RETURN_OK; |
| } |
| |
| /* |
| * MANIPULATE RESOURCE QUEUE |
| * For CREATE RESORUCE QUEUE, "withoutliststart" is the last one. |
| * For ALTER RESOURCE QUEUE, "withoutliststart" may has more consequent nodes. |
| */ |
| int manipulateResourceQueue(int index, |
| char *queuename, |
| uint16_t action, |
| List *options, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| RPC_QD_2_RM_HEAD |
| |
| /* Build request. */ |
| buildManipulateResQueueRequest(sendbuffer, |
| QD2RM_ResourceSets[index]->QD_Conn_ID, |
| queuename, |
| action, |
| options); |
| |
| /* Call RPC to get response. */ |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_DDL_MANIPULATERESQUEUE, |
| RESPONSE_QD_DDL_MANIPULATERESQUEUE, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, "%s", errorbuf2); |
| return res; |
| } |
| |
| /*Start parsing response. */ |
| RPCResponseHeadManipulateResQueue response = |
| SMBUFF_HEAD(RPCResponseHeadManipulateResQueue, recvbuffer); |
| |
| /* CASE 1. The response contains error message. */ |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| RPCResponseHeadManipulateResQueueERROR error = |
| SMBUFF_HEAD(RPCResponseHeadManipulateResQueueERROR, recvbuffer); |
| |
| elog(LOG, "Fail to manipulate resource queue because %s", |
| error->ErrorText); |
| snprintf(errorbuf, errorbufsize, "%s", error->ErrorText); |
| } |
| |
| elog(DEBUG3, "Manipulated resource queue and got result %d", response->Result); |
| |
| return response->Result; |
| } |
| |
| int manipulateRoleForResourceQueue (int index, |
| Oid roleid, |
| Oid queueid, |
| uint16_t action, |
| uint8_t isSuperUser, |
| char *rolename, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| |
| initializeQD2RMComm(); |
| |
| Assert(queueid != -1); |
| Assert(action == MANIPULATE_ROLE_RESQUEUE_CREATE || |
| action == MANIPULATE_ROLE_RESQUEUE_ALTER || |
| action == MANIPULATE_ROLE_RESQUEUE_DROP); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBuffer sendbuffer = &(QD2RM_ResourceSets[index]->SendBuffer); |
| SelfMaintainBuffer recvbuffer = &(QD2RM_ResourceSets[index]->RecvBuffer); |
| |
| RPCRequestHeadManipulateRoleData request; |
| |
| resetSelfMaintainBuffer(sendbuffer); |
| prepareSelfMaintainBuffer(sendbuffer, |
| sizeof(RPCRequestHeadManipulateRoleData), |
| true); |
| |
| memset(&request, 0, sizeof(RPCRequestHeadManipulateRoleData)); |
| request.QueueOID = queueid; |
| request.RoleOID = roleid; |
| request.isSuperUser = isSuperUser; |
| request.Action = action; |
| if (strlen(rolename) < sizeof(request.Name)) |
| { |
| strncpy(request.Name, rolename, strlen(rolename)); |
| } |
| else |
| { |
| elog(WARNING, "Resource manager finds in valid role name %s.", rolename); |
| snprintf(errorbuf, errorbufsize, "invalid role name %s.", rolename); |
| return RESQUEMGR_NO_USERID; |
| } |
| |
| elog(DEBUG3, "Resource manager (manipulateRoleForResourceQueue) " |
| "role oid:%d, queueID:%d, isSuper:%d, roleName:%s, action:%d", |
| request.RoleOID, request.QueueOID, request.isSuperUser, |
| request.Name, request.Action); |
| |
| appendSMBVar(sendbuffer, request); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(sendbuffer), |
| getSMBContentSize(sendbuffer), |
| REQUEST_QD_DDL_MANIPULATEROLE, |
| RESPONSE_QD_DDL_MANIPULATEROLE, |
| recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, "%s", errorbuf2); |
| return res; |
| } |
| |
| /* Start parsing response. */ |
| RPCResponseHeadManipulateRole response = |
| SMBUFF_HEAD(RPCResponseHeadManipulateRole, recvbuffer); |
| |
| /* The response contains error message. */ |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| RPCResponseHeadManipulateRoleERROR error = |
| SMBUFF_HEAD(RPCResponseHeadManipulateRoleERROR, recvbuffer); |
| |
| elog(WARNING, "Resource manager failed to manipulate role %s. %s", |
| rolename, |
| error->ErrorText); |
| snprintf(errorbuf, errorbufsize, "%s", error->ErrorText); |
| } |
| return response->Result; |
| } |
| |
| void buildManipulateResQueueRequest(SelfMaintainBuffer sendbuffer, |
| uint32_t connid, |
| char *queuename, |
| uint16_t action, |
| List *options) |
| { |
| Assert( sendbuffer != NULL ); |
| Assert( connid != 0XFFFFFFFF ); |
| Assert( queuename != NULL ); |
| Assert( action >= MANIPULATE_RESQUEUE_CREATE && |
| action <= MANIPULATE_RESQUEUE_DROP ); |
| |
| uint16_t withlength = 0; |
| bool nowIsWithOption = false; |
| bool need_free_value = false; |
| ListCell *option = NULL; |
| |
| resetSelfMaintainBuffer(sendbuffer); |
| prepareSelfMaintainBuffer(sendbuffer, |
| sizeof(RPCRequestHeadManipulateResQueueData), |
| true); |
| |
| /* Build request head information. */ |
| RPCRequestHeadManipulateResQueue requestheadptr = |
| SMBUFF_HEAD(RPCRequestHeadManipulateResQueue, sendbuffer); |
| |
| requestheadptr->ConnID = connid; |
| requestheadptr->ManipulateAction = action; |
| |
| jumpforwardSelfMaintainBuffer(sendbuffer, |
| sizeof(RPCRequestHeadManipulateResQueueData)); |
| |
| /* Build queue name string. */ |
| appendSMBStr(sendbuffer,queuename); |
| |
| /* Build request with and without attribute list information. In case DROP |
| * RESOURCE QUEUE, there is no options passed in. */ |
| if ( options != NULL ) { |
| foreach(option, options) |
| { |
| DefElem *defel = (DefElem *) lfirst(option); |
| if ( strcmp(defel->defname, WITHLISTSTART_TAG) == 0 ) { |
| nowIsWithOption = true; |
| continue; |
| } |
| |
| if ( strcmp(defel->defname, WITHOUTLISTSTART_TAG) == 0 ) { |
| nowIsWithOption = false; |
| continue; |
| } |
| |
| /* Count how many options are WITH options. The left ones must be |
| * WITHOUT options. This is not checked here, Parser guarantees this |
| * order. */ |
| withlength = withlength + (nowIsWithOption ? 1 : 0); |
| |
| /* Convert defname to lower case. */ |
| for ( int i = 0 ; defel->defname[i] != '\0' ; ++i ) { |
| defel->defname[i] = tolower(defel->defname[i]); |
| } |
| |
| /* Append attribute keyword string. */ |
| appendSMBStr(sendbuffer,defel->defname); |
| |
| /* Append attribute value string. */ |
| if ( nowIsWithOption ) { |
| char *attrvalue = defGetString(defel, &need_free_value); |
| appendSMBStr(sendbuffer, attrvalue); |
| elog(DEBUG3, "added attribute value string %s", attrvalue); |
| } |
| } |
| } |
| |
| appendSelfMaintainBufferTill64bitAligned(sendbuffer); |
| |
| /* Update with actual with list size. */ |
| requestheadptr = SMBUFF_HEAD(RPCRequestHeadManipulateResQueue, sendbuffer); |
| requestheadptr->WithAttrLength = withlength; |
| |
| elog(DEBUG3, "WITH length is %d.", withlength); |
| |
| Assert((getSMBContentSize(sendbuffer) & 0X7) == 0 ); |
| } |
| |
| void sendFailedNodeToResourceManager(int hostNum, char **pghost) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| |
| initializeQD2RMComm(); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| for ( int i = 0 ; i < hostNum ; ++i ) |
| { |
| appendSMBStr(&sendbuffer, pghost[i]); |
| elog(LOG, "Dispatcher thinks %s is down.", pghost[i]); |
| } |
| appendSelfMaintainBufferTill64bitAligned(&sendbuffer); |
| |
| elog(LOG, "Dispatcher sends %d failed host(s) to resource manager.", |
| hostNum); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_SEGMENT_ISDOWN, |
| RESPONSE_QD_SEGMENT_ISDOWN, |
| &recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(WARNING, "Fail to get response from resource manager RPC. %s", |
| errorbuf2); |
| goto exit; |
| } |
| |
| elog(LOG, "Succeed in sending failed host to resource manager."); |
| |
| exit: |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| } |
| |
| /* |
| int getLocalTmpDirFromMasterRM(char *errorbuf, int errorbufsize) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| initializeQD2RMComm(); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| RPCRequestTmpDirForQDData request; |
| request.Reserved = 0; |
| appendSMBVar(&sendbuffer, request); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_TMPDIR, |
| RESPONSE_QD_TMPDIR, |
| &recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to get temporary directory from resource manager, %s", |
| errorbuf2); |
| goto exit; |
| } |
| |
| RPCResponseTmpDirForQD response = SMBUFF_HEAD(RPCResponseTmpDirForQD, |
| &recvbuffer); |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(&recvbuffer) + |
| sizeof(RPCResponseTmpDirForQDData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to get temporary directory from resource manager, %s", |
| errorstr); |
| res = response->Result; |
| goto exit; |
| } |
| |
| LocalTempPath = pstrdup(response->tmpdir); |
| elog(LOG, "Got temporary directory %s", LocalTempPath); |
| |
| exit: |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| return res; |
| } |
| */ |
| |
| int acquireResourceQuotaFromRM(int64_t user_oid, |
| uint32_t max_seg_count_fix, |
| uint32_t min_seg_count_fix, |
| char *errorbuf, |
| int errorbufsize, |
| uint32_t *seg_num, |
| uint32_t *seg_num_min, |
| uint32_t *seg_memory_mb, |
| double *seg_core) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| |
| initializeQD2RMComm(); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| RPCRequestHeadAcquireResourceQuotaFromRMByOIDData request; |
| request.UseridOid = user_oid; |
| request.MaxSegCountFix = max_seg_count_fix; |
| request.MinSegCountFix = min_seg_count_fix; |
| request.VSegLimitPerSeg = rm_nvseg_perquery_perseg_limit; |
| request.VSegLimit = rm_nvseg_perquery_limit; |
| request.StatNVSeg = rm_stmt_nvseg; |
| |
| request.StatVSegMemoryMB = 0; |
| int parseres = FUNC_RETURN_OK; |
| SimpString valuestr; |
| setSimpleStringRef(&valuestr, rm_stmt_vseg_mem_str, strlen(rm_stmt_vseg_mem_str)); |
| parseres = SimpleStringToStorageSizeMB(&valuestr, |
| &(request.StatVSegMemoryMB)); |
| Assert(parseres == FUNC_RETURN_OK); |
| |
| appendSMBVar(&sendbuffer, request); |
| |
| elog(DEBUG3, "HAWQ RM :: Acquire resource quota for query with %d splits, " |
| "%d preferred virtual segments by user "INT64_FORMAT, |
| max_seg_count_fix, |
| min_seg_count_fix, |
| user_oid); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_ACQUIRE_RESOURCE_QUOTA, |
| RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA, |
| &recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to get resource quota from resource manager, %s", |
| errorbuf2); |
| goto exit; |
| } |
| |
| RPCResponseHeadAcquireResourceQuotaFromRMByOID response = |
| SMBUFF_HEAD(RPCResponseHeadAcquireResourceQuotaFromRMByOID, &recvbuffer); |
| if ( response->Result == FUNC_RETURN_OK ) |
| { |
| *seg_num = response->SegNum; |
| *seg_num_min = response->SegNumMin; |
| *seg_memory_mb = response->SegMemoryMB; |
| *seg_core = response->SegCore; |
| } |
| else |
| { |
| res = response->Result; |
| char *errorstr = SMBUFF_CONTENT(&recvbuffer) + |
| sizeof(RPCResponseHeadAcquireResourceQuotaFromRMByOIDData); |
| |
| snprintf(errorbuf, errorbufsize, |
| "failed to get resource quota from resource manager, %s", |
| errorstr); |
| } |
| |
| exit: |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| return res; |
| } |
| |
| #define DEFAULT_HEARTBEAT_BUFFER 4096 |
| |
| void *generateResourceRefreshHeartBeat(void *arg) |
| { |
| static char messagehead[16] = {'M' ,'S' ,'G' ,'S' ,'T' ,'A' ,'R' ,'T' , |
| '\0','\0','\0','\0','\0','\0','\0','\0'}; |
| static char messagetail[8] = {'M' ,'S' ,'G' ,'E' ,'N' ,'D' ,'S' ,'!' }; |
| |
| int fd = -1; |
| int sleepTimes = 0; |
| |
| HeartBeatThreadArg tharg = arg; |
| Assert(arg != NULL); |
| |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData contbuffer; |
| |
| gp_set_thread_sigmasks(); |
| |
| initializeSelfMaintainBuffer(&sendbuffer, NULL); |
| initializeSelfMaintainBuffer(&contbuffer, NULL); |
| prepareSelfMaintainBuffer(&sendbuffer, DEFAULT_HEARTBEAT_BUFFER, true); |
| prepareSelfMaintainBuffer(&contbuffer, DEFAULT_HEARTBEAT_BUFFER, true); |
| |
| while( ResourceHeartBeatRunning ) |
| { |
| #define SLEEP_TIME (0.1) // in seconds |
| // the following check is used to avoid sleeping too long time |
| // without checking ResourceHeartBeatRunning |
| if (sleepTimes * SLEEP_TIME < rm_session_lease_heartbeat_interval) { |
| pg_usleep(SLEEP_TIME * 1000000L); |
| sleepTimes++; |
| } else { |
| sleepTimes = 0; |
| |
| resetSelfMaintainBuffer(&sendbuffer); |
| resetSelfMaintainBuffer(&contbuffer); |
| bool sendcontent = false; |
| |
| /* Lock to access array of resource sets */ |
| pthread_mutex_lock(&ResourceSetsMutex); |
| |
| RPCRequestHeadRefreshResourceHeartBeatData request; |
| request.ConnIDCount = QD2RM_ResourceSetCount; |
| request.Reserved = 0; |
| appendSMBVar(&contbuffer, request); |
| |
| /* Get all current in-use resource set IDs and build into request. */ |
| for ( int i = 0 ; i < QD2RM_ResourceSetSize ; ++i ) { |
| if ( QD2RM_ResourceSets[i] == NULL || |
| QD2RM_ResourceSets[i]->QD_Conn_ID == INVALID_CONNID ) |
| { |
| continue; |
| } |
| appendSMBVar(&contbuffer, QD2RM_ResourceSets[i]->QD_Conn_ID); |
| sendcontent = true; |
| } |
| /* Unlock */ |
| pthread_mutex_unlock(&ResourceSetsMutex); |
| |
| /* Build final request content and send out. */ |
| appendSelfMaintainBufferTill64bitAligned(&contbuffer); |
| |
| if ( sendcontent ) |
| { |
| /* Connect to server only when necessary. */ |
| if ( fd < 0 ) |
| { |
| /* Connect to resource manager server. */ |
| struct sockaddr_in server_addr; |
| fd = socket(AF_INET, SOCK_STREAM, 0); |
| if ( fd < 0 ) |
| { |
| write_log("ERROR generateResourceRefreshHeartBeat failed to open " |
| "socket (errno %d)", errno); |
| break; |
| } |
| memset(&server_addr, 0, sizeof(server_addr)); |
| server_addr.sin_family = AF_INET; |
| memcpy(&(server_addr.sin_addr.s_addr), |
| tharg->HostAddrs[0], |
| tharg->HostAddrLength); |
| server_addr.sin_port = htons(rm_master_port); |
| |
| int sockres = 0; |
| while(true) |
| { |
| int on; |
| sockres = connect(fd, |
| (struct sockaddr *)&server_addr, |
| sizeof(server_addr)); |
| if (sockres < 0) |
| { |
| if (errno == EINTR) |
| { |
| continue; |
| } |
| else |
| { |
| write_log("ERROR generateResourceRefreshHeartBeat " |
| "failed to connect to resource manager, " |
| "fd %d (errno %d)", fd, errno); |
| close(fd); |
| fd = -1; |
| } |
| } |
| #ifdef TCP_NODELAY |
| on = 1; |
| if (sockres == 0 && |
| setsockopt(fd, |
| IPPROTO_TCP, TCP_NODELAY, |
| (char *) &on, sizeof(on)) < 0) |
| { |
| write_log("ERROR setsockopt(TCP_NODELAY) failed: %m"); |
| close(fd); |
| fd = -1; |
| sockres = -1; |
| } |
| #endif |
| on = 1; |
| if (sockres == 0 && |
| setsockopt(fd, |
| SOL_SOCKET, SO_KEEPALIVE, |
| (char *) &on, sizeof(on)) < 0) |
| { |
| write_log("ERROR setsockopt(SO_KEEPALIVE) failed: %m"); |
| close(fd); |
| fd = -1; |
| sockres = -1; |
| } |
| |
| break; |
| } |
| |
| if ( sockres < 0 ) |
| { |
| pg_usleep(1000000L); |
| continue; |
| } |
| |
| } |
| |
| RMMessageHead phead = (RMMessageHead)messagehead; |
| RMMessageTail ptail = (RMMessageTail)messagetail; |
| phead->Mark1 = 0; |
| phead->Mark2 = 0; |
| phead->MessageID = REQUEST_QD_REFRESH_RESOURCE; |
| phead->MessageSize = contbuffer.Cursor + 1; |
| |
| appendSelfMaintainBuffer(&sendbuffer, (char *)phead, sizeof(*phead)); |
| appendSelfMaintainBuffer(&sendbuffer, |
| SMBUFF_CONTENT(&contbuffer), |
| getSMBContentSize(&contbuffer)); |
| appendSelfMaintainBuffer(&sendbuffer, (char *)ptail, sizeof(*ptail)); |
| |
| if ( sendWithRetry(fd, |
| SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| false) == FUNC_RETURN_OK) |
| { |
| RPCResponseRefreshResourceHeartBeatData response; |
| /* Do not care response at all. */ |
| char recvbuf[sizeof(messagehead) + |
| sizeof(messagetail) + |
| sizeof(response)]; |
| |
| if ( recvWithRetry(fd, |
| recvbuf, |
| sizeof(recvbuf), |
| false) != FUNC_RETURN_OK) |
| { |
| write_log("ERROR generateResourceRefreshHeartBeat recv error " |
| "(errno %d)", errno); |
| close(fd); |
| fd = -1; |
| } |
| } |
| else |
| { |
| write_log("ERROR generateResourceRefreshHeartBeat send error " |
| "(errno %d)", errno); |
| close(fd); |
| fd = -1; |
| } |
| |
| if ( log_min_messages <= DEBUG3 ) |
| { |
| write_log("generateResourceRefreshHeartBeat sent heart-beat."); |
| } |
| } |
| } |
| } |
| |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&contbuffer); |
| freeHeartBeatThreadArg(&tharg); |
| write_log("generateResourceRefreshHeartBeat exits."); |
| return 0; |
| } |
| |
| void freeHeartBeatThreadArg(HeartBeatThreadArg *arg) |
| { |
| if ( *arg == NULL ) |
| { |
| return; |
| } |
| |
| for ( int i = 0 ; i < (*arg)->HostAddrSize ; ++i ) |
| { |
| free((*arg)->HostAddrs[i]); |
| } |
| free((*arg)->HostAddrs); |
| free(*arg); |
| *arg = NULL; |
| } |
| |
| #define PG_RESQUEUE_STATUS_COLUMNS 10 |
| #define PG_RESQUEUE_STATUS_BUFSIZE 1024 |
| |
| Datum pg_resqueue_status(PG_FUNCTION_ARGS) |
| { |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| FuncCallContext *funcctx = NULL; |
| Datum result; |
| MemoryContext oldcontext = NULL; |
| HeapTuple tuple = NULL; |
| int res = FUNC_RETURN_OK; |
| |
| if (SRF_IS_FIRSTCALL()) |
| { |
| |
| funcctx = SRF_FIRSTCALL_INIT(); |
| |
| /* Switch context when allocating stuff to be used in later calls */ |
| oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| |
| /* |
| * Call RPC begin |
| */ |
| initializeQD2RMComm(); |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| RPCRequestResQueueStatusData request; |
| request.Reserved = 0; |
| appendSMBVar(&sendbuffer, request); |
| |
| errorbuf[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_DUMP_RESQUEUE_STATUS, |
| RESPONSE_QD_DUMP_RESQUEUE_STATUS, |
| &recvbuffer, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| funcctx->max_calls = 0; |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", errorbuf))); |
| } |
| |
| RPCResponseResQueueStatus response = SMBUFF_HEAD(RPCResponseResQueueStatus, |
| &recvbuffer); |
| Assert(response->Result == FUNC_RETURN_OK); |
| |
| DQueue funcdata = createDQueue(funcctx->multi_call_memory_ctx); |
| for (int i=0;i<response->queuenum;i++) |
| { |
| ResQueueStatus resq = rm_palloc(funcctx->multi_call_memory_ctx, |
| sizeof(ResQueueStatusData)); |
| sprintf(resq->name, "%s", response->queuedata[i].name); |
| resq->segmem = response->queuedata[i].segmem; |
| resq->segcore = response->queuedata[i].segcore; |
| resq->segsize = response->queuedata[i].segsize; |
| resq->segsizemax = response->queuedata[i].segsizemax; |
| resq->inusemem = response->queuedata[i].inusemem; |
| resq->inusecore = response->queuedata[i].inusecore; |
| resq->holders = response->queuedata[i].holders; |
| resq->waiters = response->queuedata[i].waiters; |
| resq->pausedispatch = response->queuedata[i].pausedispatch; |
| insertDQueueTailNode(funcdata, resq); |
| } |
| |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| /* |
| * Call RPC end |
| */ |
| funcctx->user_fctx = (void *)funcdata; |
| |
| TupleDesc tupledesc = CreateTemplateTupleDesc( |
| PG_RESQUEUE_STATUS_COLUMNS, |
| false); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 1, "rsqname", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 2, "segmem", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 3, "segcore", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 4, "segsize", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 5, "segsizemax",TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 6, "inusemem", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 7, "inusecore", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 8, "rsqholders",TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 9, "rsqwaiters",TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 10, "paused", TEXTOID, -1, 0); |
| |
| funcctx->tuple_desc = BlessTupleDesc(tupledesc); |
| |
| /* Return to original context when allocating transient memory */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| funcctx->max_calls = ((DQueue)(funcctx->user_fctx))->NodeCount; |
| } |
| |
| funcctx = SRF_PERCALL_SETUP(); |
| |
| if (funcctx->call_cntr < funcctx->max_calls) |
| { |
| Datum values[PG_RESQUEUE_STATUS_COLUMNS]; |
| bool nulls[PG_RESQUEUE_STATUS_COLUMNS]; |
| char buf[PG_RESQUEUE_STATUS_BUFSIZE]; |
| |
| for (int i=0;i<PG_RESQUEUE_STATUS_COLUMNS;i++) |
| { |
| nulls[i] = false; |
| } |
| |
| DQueue funcdata = (DQueue)(funcctx->user_fctx); |
| ResQueueStatus resq = (ResQueueStatus)getDQueueNodeDataByIndex(funcdata, funcctx->call_cntr); |
| |
| values[0] = PointerGetDatum(cstring_to_text(resq->name)); |
| snprintf(buf, sizeof(buf), "%d", resq->segmem); |
| values[1] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%f", resq->segcore); |
| values[2] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resq->segsize); |
| values[3] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resq->segsizemax); |
| values[4] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resq->inusemem); |
| values[5] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%f", resq->inusecore); |
| values[6] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resq->holders); |
| values[7] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resq->waiters); |
| values[8] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%c", resq->pausedispatch); |
| values[9] = PointerGetDatum(cstring_to_text(buf)); |
| |
| /* Build and return the tuple. */ |
| tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); |
| result = HeapTupleGetDatum(tuple); |
| |
| SRF_RETURN_NEXT(funcctx, result); |
| } |
| else { |
| SRF_RETURN_DONE(funcctx); |
| } |
| } |
| |
| Datum pg_resqueue_status_kv(PG_FUNCTION_ARGS) |
| { |
| return 0; |
| } |
| |
| int dumpResourceManagerStatus(uint32_t type, |
| const char *dump_file, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| initializeQD2RMComm(); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| RPCRequestDumpStatusData request; |
| request.type = type; |
| request.Reserved = 0; |
| strncpy(request.dump_file, dump_file, sizeof(request.dump_file) - 1); |
| appendSMBVar(&sendbuffer, request); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_DUMP_STATUS, |
| RESPONSE_QD_DUMP_STATUS, |
| &recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to dump resource manager status, %s", |
| errorbuf2); |
| goto exit; |
| } |
| |
| RPCResponseDumpStatus response = |
| SMBUFF_HEAD(RPCResponseDumpStatus, &recvbuffer); |
| |
| if ( response->Result != FUNC_RETURN_OK ) |
| { |
| char *errorstr = SMBUFF_CONTENT(&recvbuffer) + |
| sizeof(RPCResponseDumpStatusData); |
| snprintf(errorbuf, errorbufsize, |
| "failed to dump resource manager status, %s", |
| errorstr); |
| goto exit; |
| } |
| |
| exit: |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| return res; |
| } |
| |
| extern Datum pg_explain_resource_distribution(PG_FUNCTION_ARGS) |
| { |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| FuncCallContext *funcctx = NULL; |
| Datum result; |
| MemoryContext oldcontext = NULL; |
| HeapTuple tuple = NULL; |
| SimpString role; |
| int fixsegcountmin = 0; |
| int fixsegcountmax = 0; |
| int splitsize; |
| int slicesize; |
| SimpString locality; |
| |
| if (SRF_IS_FIRSTCALL()) |
| { |
| |
| funcctx = SRF_FIRSTCALL_INIT(); |
| |
| /* Switch context when allocating stuff to be used in later calls */ |
| oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| |
| /* |
| * Get arguments |
| * pg_explain_resource_distribution('gpadmin',-1,1200, |
| * ‘host1,500,host2,300,host3,100,...’) |
| * |
| */ |
| if ( PG_ARGISNULL(0) ) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_NULL_VALUE_NOT_ALLOWED), |
| errmsg("Role name must be specified."), |
| errhint("Restart the server and try again"))); |
| } |
| |
| initSimpleString(&role, funcctx->multi_call_memory_ctx); |
| setSimpleStringNoLen(&role, GET_STR(PG_GETARG_TEXT_P(0))); |
| |
| slicesize = 0; |
| if ( !PG_ARGISNULL(1) ) |
| { |
| slicesize = PG_GETARG_INT32(1); |
| } |
| |
| splitsize = 0; |
| if ( !PG_ARGISNULL(2) ) |
| { |
| splitsize = PG_GETARG_INT32(2); |
| } |
| |
| fixsegcountmin = 0; |
| if ( !PG_ARGISNULL(3) ) |
| { |
| fixsegcountmin = PG_GETARG_INT32(3); |
| } |
| |
| fixsegcountmax = 0; |
| if ( !PG_ARGISNULL(4) ) |
| { |
| fixsegcountmax = PG_GETARG_INT32(4); |
| } |
| |
| initSimpleString(&locality, funcctx->multi_call_memory_ctx); |
| if ( !PG_ARGISNULL(5) ) |
| { |
| setSimpleStringNoLen(&locality, GET_STR(PG_GETARG_TEXT_P(5))); |
| } |
| |
| /* Call HAWQ RM RPC to get expected result. */ |
| int ret; |
| int resourceId = -1; |
| /* STEP 1. Create Context */ |
| ret = createNewResourceContext(&resourceId); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "Fail to create resource context. %d", ret); |
| } |
| /* STEP 2. Register. */ |
| errorbuf[0] = '\0'; |
| ret = registerConnectionInRMByStr(resourceId, |
| role.Str, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "%s", errorbuf); |
| } |
| /* STEP 3. Acquire resource. */ |
| |
| /* Build locality array. */ |
| int infosize = 0; |
| HostnameVolumeInfo *volinfo = NULL; |
| int toksize = 0; |
| SimpStringPtr tokens = NULL; |
| SimpleStringTokens(&locality, ',', &tokens, &toksize); |
| if (toksize > 1 && toksize % 2 == 0) |
| { |
| infosize = toksize / 2; |
| volinfo = rm_palloc(funcctx->multi_call_memory_ctx, |
| sizeof(HostnameVolumeInfo) * infosize); |
| for ( int i = 0 ; i < infosize ; ++i ) |
| { |
| SimpStringPtr token1 = &(tokens[(i<<1)]); |
| SimpStringPtr token2 = &(tokens[(i<<1)+1]); |
| strcpy(volinfo[i].hostname, token1->Str); |
| SimpleStringToInt64(token2, &(volinfo[i].datavolume)); |
| } |
| } |
| freeSimpleStringTokens(&locality, &tokens, toksize); |
| |
| ret = acquireResourceFromRM(resourceId, |
| gp_session_id, |
| slicesize, |
| splitsize, |
| volinfo, |
| infosize, |
| fixsegcountmax, |
| fixsegcountmin, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "%s", errorbuf); |
| } |
| |
| /* Build result. */ |
| funcctx->user_fctx = (void *)buildResourceDistRowData( |
| funcctx->multi_call_memory_ctx, |
| resourceId, |
| volinfo, |
| infosize); |
| |
| /* STEP 4. Return resource. */ |
| ret = returnResource(resourceId, errorbuf, sizeof(errorbuf)); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "%s", errorbuf); |
| } |
| |
| /* STEP 5. Unregister. */ |
| ret = unregisterConnectionInRM(resourceId, errorbuf, sizeof(errorbuf)); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "%s", errorbuf); |
| } |
| |
| /* STEP 6. Remove Context */ |
| releaseResourceContext(resourceId); |
| |
| /* STEP 7. Construct a tuple descriptor for the result rows. */ |
| TupleDesc tupledesc = CreateTemplateTupleDesc( |
| PG_EXPLAIN_RESOURCE_DISTRIBUTION_COLUMNS, |
| false); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 1, "hostname", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 2, "vsegcount", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 3, "memory_mb", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 4, "core", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 5, "mapped_hostname", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 6, "splitcount", TEXTOID, -1, 0); |
| |
| funcctx->tuple_desc = BlessTupleDesc(tupledesc); |
| |
| /* Return to original context when allocating transient memory */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| funcctx->max_calls = ((DQueue)(funcctx->user_fctx))->NodeCount; |
| } |
| |
| funcctx = SRF_PERCALL_SETUP(); |
| |
| if (funcctx->call_cntr < funcctx->max_calls) |
| { |
| Datum values[PG_EXPLAIN_RESOURCE_DISTRIBUTION_COLUMNS]; |
| bool nulls[PG_EXPLAIN_RESOURCE_DISTRIBUTION_COLUMNS]; |
| char buf[PG_EXPLAIN_RESOURCE_DISTRIBUTION_BUFSIZE]; |
| |
| nulls[0] = false; |
| nulls[1] = false; |
| nulls[2] = false; |
| nulls[3] = false; |
| nulls[4] = true; |
| nulls[5] = true; |
| |
| /* Go to the expected row to return. */ |
| DQueue restable = (DQueue)(funcctx->user_fctx); |
| ResourceDistRow resrow = (ResourceDistRow) |
| getDQueueNodeDataByIndex(restable, funcctx->call_cntr); |
| |
| values[0] = PointerGetDatum(cstring_to_text(resrow->hostname)); |
| snprintf(buf, sizeof(buf), "%d", resrow->segcount); |
| values[1] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", resrow->segmem); |
| values[2] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%f", resrow->segcore); |
| values[3] = PointerGetDatum(cstring_to_text(buf)); |
| if ( resrow->mappedname != NULL ) |
| { |
| nulls[4] = false; |
| nulls[5] = false; |
| values[4] = PointerGetDatum(cstring_to_text(resrow->mappedname)); |
| snprintf(buf, sizeof(buf), "%d", resrow->splitcount); |
| values[5] = PointerGetDatum(cstring_to_text(buf)); |
| } |
| |
| /* Build and return the tuple. */ |
| tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); |
| result = HeapTupleGetDatum(tuple); |
| |
| SRF_RETURN_NEXT(funcctx, result); |
| } |
| else |
| { |
| SRF_RETURN_DONE(funcctx); |
| } |
| } |
| |
| DQueue buildResourceDistRowData(MCTYPE context, |
| int resourceid, |
| HostnameVolumeInfo *volinfo, |
| int infosize) |
| { |
| QDResourceContext rescontext = NULL; |
| getAllocatedResourceContext(resourceid, &rescontext); |
| |
| DQueue result = createDQueue(context); |
| |
| for ( int i = 0 ; i < rescontext->QD_SegCount ; ++i ) { |
| bool foundold = false; |
| DQUEUE_LOOP_BEGIN(result, iter, ResourceDistRow, resrow) |
| if ( resrow->pointerkey == rescontext->QD_ResourceList[i]->QD_SegInfo ) { |
| resrow->segcount++; |
| resrow->segmem += rescontext->QD_SegMemoryMB; |
| resrow->segcore += rescontext->QD_SegCore; |
| foundold = true; |
| break; |
| } |
| DQUEUE_LOOP_END |
| |
| if ( foundold ) |
| continue; |
| ResourceDistRow newrow = rm_palloc0(context, sizeof(ResourceDistRowData)); |
| newrow->pointerkey = rescontext->QD_ResourceList[i]->QD_SegInfo; |
| newrow->hostname = (char *) |
| rm_palloc0(context, |
| rescontext->QD_ResourceList[i]->QD_SegInfo->HostNameLen + 1); |
| strcpy(newrow->hostname, |
| GET_SEGINFO_HOSTNAME(rescontext->QD_ResourceList[i]->QD_SegInfo)); |
| newrow->segcount = 1; |
| newrow->segmem = rescontext->QD_SegMemoryMB; |
| newrow->segcore = rescontext->QD_SegCore; |
| newrow->mappedname = NULL; |
| |
| if ( rescontext->QD_ResourceList[i]->QD_HdfsHostName != NULL ) { |
| newrow->mappedname = |
| rm_palloc0(context, |
| strlen(rescontext->QD_ResourceList[i]->QD_HdfsHostName) + 1); |
| strcpy(newrow->mappedname, |
| rescontext->QD_ResourceList[i]->QD_HdfsHostName); |
| newrow->splitcount = 0; |
| |
| for ( int j = 0 ; j < infosize ; ++j ) { |
| if ( strcmp(volinfo[j].hostname, newrow->mappedname) == 0 ) { |
| newrow->splitcount = volinfo[j].datavolume; |
| break; |
| } |
| } |
| } |
| insertDQueueTailNode(result, newrow); |
| } |
| |
| return result; |
| } |
| |
| void *buildResourceActionPlayRowData(MCTYPE context, |
| List *actions) |
| { |
| TestActionPlay userdata = (TestActionPlay) |
| rm_palloc0(context, sizeof(TestActionPlayData)); |
| userdata->ActionConns = actions; |
| userdata->ActionConnCell = list_head(actions); |
| if (userdata->ActionConnCell != NULL) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(userdata->ActionConnCell); |
| userdata->ActionItemCell = list_head(actconn->Actions); |
| } |
| return userdata; |
| } |
| |
| void freeResourceActionPlayRowData(MCTYPE context, TestActionPlay *actplay) |
| { |
| MEMORY_CONTEXT_SWITCH_TO(context) |
| /* Free each action item and then its action conn instance. */ |
| ListCell *cell = NULL; |
| foreach(cell, (*actplay)->ActionConns) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(cell); |
| ListCell *itemcell = NULL; |
| foreach(itemcell, actconn->Actions) |
| { |
| TestActionItem actitem = (TestActionItem)lfirst(itemcell); |
| if ( actitem->ResultMessage != NULL ) |
| { |
| rm_pfree(context, actitem->ResultMessage); |
| } |
| |
| ListCell *argcell = NULL; |
| foreach(argcell, actitem->Arguments) |
| { |
| char *argstr = (char *)lfirst(argcell); |
| rm_pfree(context, argstr); |
| } |
| list_free(actitem->Arguments); |
| actitem->Arguments = NULL; |
| rm_pfree(context, actitem); |
| } |
| list_free(actconn->Actions); |
| actconn->Actions = NULL; |
| rm_pfree(context, actconn); |
| } |
| rm_pfree(context, *actplay); |
| *actplay = NULL; |
| |
| MEMORY_CONTEXT_SWITCH_BACK |
| } |
| extern Datum pg_play_resource_action(PG_FUNCTION_ARGS) |
| { |
| FuncCallContext *funcctx = NULL; |
| Datum result = 0; |
| MemoryContext oldcontext = NULL; |
| SimpString actfile; |
| SimpString outfile; |
| List *actions = NULL; |
| HeapTuple tuple = NULL; |
| |
| if (SRF_IS_FIRSTCALL()) |
| { |
| funcctx = SRF_FIRSTCALL_INIT(); |
| /* Switch context when allocating stuff to be used in later calls. */ |
| oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| /* Get arguments. */ |
| if ( PG_ARGISNULL(0) ) { |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_NULL_VALUE_NOT_ALLOWED), |
| errmsg("input action file name must be specified."), |
| errhint("Specify correct file name."))); |
| } |
| |
| initSimpleString(&actfile, funcctx->multi_call_memory_ctx); |
| setSimpleStringNoLen(&actfile, GET_STR(PG_GETARG_TEXT_P(0))); |
| initSimpleString(&outfile, funcctx->multi_call_memory_ctx); |
| setSimpleStringNoLen(&outfile, GET_STR(PG_GETARG_TEXT_P(1))); |
| |
| initializeQD2RMComm(); |
| |
| /* Load action script. */ |
| int res = loadTestActionScript(actfile.Str, &actions); |
| if ( res != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "Fail to load resource play actions from %s.", actfile.Str); |
| } |
| |
| /* Perform action script. */ |
| runTestActionScript(actions, outfile.Str); |
| |
| /* Collect results. connname, action, actionfull, resultcode, resultmessage. */ |
| ListCell *cell = NULL; |
| int actsize = 0; |
| foreach(cell, actions) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(cell); |
| actsize += list_length(actconn->Actions); |
| } |
| |
| elog(LOG, "Total action item size is %d", actsize); |
| |
| /* STEP 7. Construct a tuple descriptor for the result rows. */ |
| TupleDesc tupledesc = CreateTemplateTupleDesc(PG_PLAY_RESOURCE_ACTION_COLUMNS, false); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 1, "conn", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 2, "action", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 3, "actionfull", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 4, "result", TEXTOID, -1, 0); |
| TupleDescInitEntry(tupledesc, (AttrNumber) 5, "message", TEXTOID, -1, 0); |
| |
| funcctx->tuple_desc = BlessTupleDesc(tupledesc); |
| |
| /* Return to original context when allocating transient memory */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* Build result. */ |
| funcctx->max_calls = actsize; |
| |
| /* Initialize pointer to get ready for returning rows. */ |
| funcctx->user_fctx = buildResourceActionPlayRowData(QD2RM_CommContext, |
| actions); |
| |
| |
| } |
| |
| funcctx = SRF_PERCALL_SETUP(); |
| |
| if (funcctx->call_cntr < funcctx->max_calls) |
| { |
| Datum values[PG_PLAY_RESOURCE_ACTION_COLUMNS]; |
| bool nulls[PG_PLAY_RESOURCE_ACTION_COLUMNS]; |
| char buf[PG_PLAY_RESOURCE_ACTION_BUFSIZE]; |
| |
| nulls[0] = false; |
| nulls[1] = false; |
| nulls[2] = false; |
| nulls[3] = false; |
| nulls[4] = true; |
| |
| /* Go to the expected row to return. */ |
| TestActionPlay userdata = (TestActionPlay)(funcctx->user_fctx); |
| |
| if ( userdata->ActionItemCell == NULL ) |
| { |
| freeResourceActionPlayRowData(QD2RM_CommContext, &userdata); |
| SRF_RETURN_DONE(funcctx); |
| } |
| |
| TestActionConn conn = (TestActionConn)lfirst(userdata->ActionConnCell); |
| TestActionItem item = (TestActionItem)lfirst(userdata->ActionItemCell); |
| |
| values[0] = PointerGetDatum(cstring_to_text(conn->ConnectionName)); |
| values[1] = PointerGetDatum(cstring_to_text(item->ActionName)); |
| |
| strcpy(buf, item->ActionName); |
| ListCell *cell = NULL; |
| foreach(cell, item->Arguments) |
| { |
| char * argstr = (char *)lfirst(cell); |
| if ( strlen(argstr) + strlen(buf) + 1 < sizeof(buf) ) |
| { |
| strcat(buf, "$"); |
| strcat(buf, argstr); |
| } |
| } |
| |
| values[2] = PointerGetDatum(cstring_to_text(buf)); |
| snprintf(buf, sizeof(buf), "%d", item->ResultCode); |
| values[3] = PointerGetDatum(cstring_to_text(buf)); |
| if ( item->ResultMessage != NULL ) |
| { |
| nulls[4] = false; |
| values[4] = PointerGetDatum(cstring_to_text(item->ResultMessage)); |
| } |
| |
| /* Build and return the tuple. */ |
| tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); |
| result = HeapTupleGetDatum(tuple); |
| |
| /* Try next action item. */ |
| userdata->ActionItemCell = lnext(userdata->ActionItemCell); |
| if ( userdata->ActionItemCell == NULL ) |
| { |
| userdata->ActionConnCell = lnext(userdata->ActionConnCell); |
| if( userdata->ActionConnCell != NULL ) |
| { |
| TestActionConn conn = (TestActionConn)lfirst(userdata->ActionConnCell); |
| userdata->ActionItemCell = list_head(conn->Actions); |
| } |
| } |
| SRF_RETURN_NEXT(funcctx, result); |
| } |
| else |
| { |
| TestActionPlay userdata = (TestActionPlay)(funcctx->user_fctx); |
| freeResourceActionPlayRowData(QD2RM_CommContext, &userdata); |
| SRF_RETURN_DONE(funcctx); |
| } |
| } |
| |
| |
| int findFile(const char *filename) |
| { |
| struct stat buff; |
| return stat(filename, &buff) == 0 ? FUNC_RETURN_OK : FUNC_RETURN_FAIL; |
| } |
| |
| int createFile(const char *filename) |
| { |
| int fd = 0; |
| fd = open(filename, O_RDWR|O_CREAT, S_IRUSR | S_IWUSR); |
| int res = fd > 0 ? FUNC_RETURN_OK : FUNC_RETURN_FAIL; |
| if ( fd >= 0 ) |
| { |
| close(fd); |
| } |
| return res; |
| } |
| |
| int removeFile(const char *filename) |
| { |
| int res = unlink(filename); |
| return res == 0 ? FUNC_RETURN_OK : FUNC_RETURN_FAIL; |
| } |
| |
| int setResourceManagerQuotaControl(bool pause, |
| int phase, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| static char errorbuf2[ERRORMESSAGE_SIZE]; |
| |
| initializeQD2RMComm(); |
| |
| int res = FUNC_RETURN_OK; |
| SelfMaintainBufferData sendbuffer; |
| SelfMaintainBufferData recvbuffer; |
| initializeSelfMaintainBuffer(&sendbuffer, QD2RM_CommContext); |
| initializeSelfMaintainBuffer(&recvbuffer, QD2RM_CommContext); |
| |
| RPCRequestQuotaControlData request; |
| request.Pause = pause; |
| request.Phase = phase; |
| |
| appendSMBVar(&sendbuffer, request); |
| |
| elog(LOG, "Request GRM container life cycle phase %d %s", |
| phase, |
| pause?"paused":"resumed"); |
| |
| errorbuf2[0] = '\0'; |
| res = callSyncRPCToRM(SMBUFF_CONTENT(&sendbuffer), |
| getSMBContentSize(&sendbuffer), |
| REQUEST_QD_QUOTA_CONTROL, |
| RESPONSE_QD_QUOTA_CONTROL, |
| &recvbuffer, |
| errorbuf2, |
| sizeof(errorbuf2)); |
| |
| if ( res != FUNC_RETURN_OK ) |
| { |
| snprintf(errorbuf, errorbufsize, |
| "failed to set resource manager resource quota control, %s", |
| errorbuf2); |
| goto exit; |
| } |
| |
| RPCResponseQuotaControl response = SMBUFF_HEAD(RPCResponseQuotaControl, |
| &recvbuffer); |
| Assert( response->Result == FUNC_RETURN_OK ); |
| elog(LOG, "Succeeded in setting container life cycle phase %d %s", |
| phase, |
| pause?"paused":"resumed"); |
| exit: |
| destroySelfMaintainBuffer(&sendbuffer); |
| destroySelfMaintainBuffer(&recvbuffer); |
| return res; |
| } |
| |
| int loadTestActionScript(const char *filename, List **actions) |
| { |
| Assert(actions != NULL && *actions == NULL); |
| |
| ListCell *cell = NULL; |
| ListCell *itemcell = NULL; |
| ListCell *argcell = NULL; |
| |
| char line[1024]; |
| FILE *fp = fopen(filename, "r"); |
| if ( fp == NULL ) |
| { |
| return FUNC_RETURN_FAIL; |
| } |
| |
| elog(LOG, "Start loading action file %s", filename); |
| |
| MEMORY_CONTEXT_SWITCH_TO(QD2RM_CommContext) |
| |
| while( fgets(line, sizeof(line)-1, fp) != NULL ) |
| { |
| /* Remove white space at the end of the line. */ |
| int linesize = strlen(line); |
| while( line[linesize-1] == '\n' || |
| line[linesize-1] == '\r' || |
| line[linesize-1] == '\t' || |
| line[linesize-1] == ' ' ) |
| { |
| line[linesize-1] = '\0'; |
| linesize--; |
| } |
| |
| elog(LOG, "Loaded action line : %s", line); |
| |
| TestActionItem newitem = (TestActionItem) |
| rm_palloc0(QD2RM_CommContext, |
| sizeof(TestActionItemData)); |
| newitem->Arguments = NULL; |
| newitem->ResultCode = FUNC_NOT_EXECUTED; |
| newitem->ResultMessage = NULL; |
| |
| /* Split based on ':' and build action item. */ |
| char *brk = NULL; |
| char *word = NULL; |
| int argidx = 0; |
| char connname[64]; |
| for ( word = strtok_r(line, "$", &brk) ; |
| word ; |
| word = strtok_r(NULL, "$", &brk), argidx++ ) |
| { |
| if ( argidx == 0 ) |
| { |
| /* it is a connection name. */ |
| strncpy(connname, word, sizeof(connname)-1); |
| elog(LOG, "Get action play argument connection name %s", connname); |
| } |
| else if ( argidx == 1 ) |
| { |
| /* it is an action name. */ |
| if ( strlen(word) < sizeof(newitem->ActionName) ) |
| { |
| strcpy(newitem->ActionName, word); |
| elog(LOG, "Get action play argument action name %s", newitem->ActionName); |
| } |
| else |
| { |
| elog(ERROR, "Too long action name %s that is not legal.", |
| newitem->ActionName); |
| } |
| } |
| else |
| { |
| /* it is an argument string. */ |
| char *newarg = rm_palloc0(QD2RM_CommContext, strlen(word) + 1); |
| strcpy(newarg, word); |
| newitem->Arguments = lappend(newitem->Arguments, newarg); |
| elog(LOG, "Appended action play argument %s", newarg); |
| } |
| } |
| |
| /* Add action item into the action list. */ |
| bool found = false; |
| foreach(cell, *actions) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(cell); |
| if ( strcmp(actconn->ConnectionName, connname) == 0 ) |
| { |
| actconn->Actions = lappend(actconn->Actions, newitem); |
| if ( actconn->CurAction == NULL ) |
| { |
| actconn->CurAction = list_head(actconn->Actions); |
| } |
| found = true; |
| break; |
| } |
| } |
| |
| if ( !found ) |
| { |
| TestActionConn newactconn = rm_palloc0(QD2RM_CommContext, |
| sizeof(TestActionConnData)); |
| newactconn->Actions = NULL; |
| newactconn->CurAction = NULL; |
| newactconn->ConnectionID = -1; |
| newactconn->Actions = lappend(newactconn->Actions, newitem); |
| strcpy(newactconn->ConnectionName, connname); |
| *actions = lappend(*actions, newactconn); |
| elog(LOG, "Build action play connection %s", connname); |
| } |
| } |
| |
| MEMORY_CONTEXT_SWITCH_BACK |
| |
| fclose(fp); |
| |
| /* Output all action items to log. */ |
| foreach(cell, *actions) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(cell); |
| foreach(itemcell, actconn->Actions) |
| { |
| TestActionItem actitem = (TestActionItem)lfirst(itemcell); |
| SelfMaintainBufferData smb; |
| initializeSelfMaintainBuffer(&smb, QD2RM_CommContext); |
| appendSMBStr(&smb, actitem->ActionName); |
| appendSMBStr(&smb, "::"); |
| |
| foreach(argcell, actitem->Arguments) |
| { |
| char *argstr = (char *)lfirst(argcell); |
| appendSMBStr(&smb, argstr); |
| appendSMBStr(&smb, ","); |
| } |
| |
| appendSMBStr(&smb, "$"); |
| elog(LOG, "Loaded action play :: %s", SMBUFF_CONTENT(&smb)); |
| destroySelfMaintainBuffer(&smb); |
| } |
| } |
| |
| return FUNC_RETURN_OK; |
| } |
| |
| int runTestActionScript(List *actions, const char *filename) |
| { |
| static char errorbuf[ERRORMESSAGE_SIZE]; |
| int errorcode = FUNC_RETURN_OK; |
| int ret = FUNC_RETURN_OK; |
| ListCell *conncell = NULL; |
| bool alldone = false; |
| |
| errorbuf[0] = '\0'; |
| |
| while( !alldone ) |
| { |
| foreach(conncell, actions) |
| { |
| errorcode = FUNC_RETURN_OK; |
| ret = FUNC_RETURN_OK; |
| |
| /* Try one action connection. */ |
| TestActionConn actconn = (TestActionConn)lfirst(conncell); |
| if ( actconn->CurAction == NULL ) |
| { |
| continue; |
| } |
| |
| /* Try the first action item. */ |
| TestActionItem actitem = (TestActionItem)lfirst(actconn->CurAction); |
| if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_WAIT) == 0 ) |
| { |
| const char *filename = (const char *)lfirst(list_head(actitem->Arguments)); |
| if ( findFile(filename) != FUNC_RETURN_OK ) |
| { |
| /* Can not find the file yet, so , try next possible action |
| * connection. */ |
| continue; |
| } |
| } |
| |
| /* Move to next action. */ |
| actconn->CurAction = lnext(actconn->CurAction); |
| |
| if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_WAIT) == 0 ) |
| { |
| /* We have processed the action. */ |
| ret = FUNC_RETURN_OK; |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_REGISTER) == 0 ) |
| { |
| /* Create Context */ |
| ret = createNewResourceContext(&(actconn->ResourceID)); |
| if ( ret != FUNC_RETURN_OK ) |
| { |
| elog(ERROR, "Fail to create resource context. %d", ret); |
| } |
| /* STEP 2. Register. */ |
| char *role = (char *)lfirst(list_head(actitem->Arguments)); |
| errorbuf[0] = '\0'; |
| ret = registerConnectionInRMByStr(actconn->ResourceID, |
| role, |
| errorbuf, |
| sizeof(errorbuf)); |
| if ( ret == FUNC_RETURN_OK ) |
| { |
| actconn->ConnectionID = QD2RM_ResourceSets[actconn->ResourceID]->QD_Conn_ID; |
| actitem->ResultMessage = rm_palloc0(QD2RM_CommContext, |
| PG_PLAY_RESOURCE_ACTION_BUFSIZE); |
| snprintf(actitem->ResultMessage, |
| PG_PLAY_RESOURCE_ACTION_BUFSIZE - 1, |
| "ResourceID:%d,ConnID:%d", |
| actconn->ResourceID, |
| actconn->ConnectionID); |
| } |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_ALLOCATE) == 0 ) |
| { |
| int sessionid = 0; |
| int slice_size = 0; |
| int split_size = 0; |
| int max_seg_count = 0; |
| int min_seg_count = 0; |
| int infosize = 0; |
| int i = 0; |
| char outfilename[512]; |
| |
| |
| HostnameVolumeInfo *volinfo = NULL; |
| ListCell *cell = NULL; |
| |
| outfilename[0] = '\0'; |
| |
| foreach(cell, actitem->Arguments) |
| { |
| char *argstr = (char *)lfirst(cell); |
| switch(i) |
| { |
| case 0: |
| sessionid = atoi(argstr); |
| break; |
| case 1: |
| slice_size = atoi(argstr); |
| break; |
| case 2: |
| split_size = atoi(argstr); |
| break; |
| case 3: |
| min_seg_count = atoi(argstr); |
| break; |
| case 4: |
| max_seg_count = atoi(argstr); |
| break; |
| case 5: |
| { |
| /* Build locality array. */ |
| SimpString locality; |
| SimpStringPtr tokens = NULL; |
| int toksize = 0; |
| initSimpleString(&locality, QD2RM_CommContext); |
| setSimpleStringNoLen(&locality, argstr); |
| SimpleStringTokens(&locality, ',', &tokens, &toksize); |
| if (toksize > 1 && toksize % 2 == 0) |
| { |
| infosize = toksize / 2; |
| volinfo = rm_palloc(QD2RM_CommContext, |
| sizeof(HostnameVolumeInfo) * infosize); |
| for ( int i = 0 ; i < infosize ; ++i ) |
| { |
| SimpStringPtr token1 = &(tokens[(i<<1)]); |
| SimpStringPtr token2 = &(tokens[(i<<1)+1]); |
| strcpy(volinfo[i].hostname, token1->Str); |
| SimpleStringToInt64(token2, &(volinfo[i].datavolume)); |
| elog(LOG, "locality data host %s with %s splits.", |
| token1->Str, |
| token2->Str); |
| } |
| elog(LOG, "Parse locality data %d hosts", infosize); |
| } |
| freeSimpleStringTokens(&locality, &tokens, toksize); |
| freeSimpleStringContent(&locality); |
| break; |
| } |
| case 6: |
| strncpy(outfilename, argstr, sizeof(outfilename)-1); |
| } |
| i++; |
| } |
| |
| errorbuf[0] = '\0'; |
| ret = acquireResourceFromRM(actconn->ResourceID, |
| sessionid, |
| slice_size, |
| split_size, |
| volinfo, |
| infosize, |
| max_seg_count, |
| min_seg_count, |
| errorbuf, |
| sizeof(errorbuf)); |
| |
| if ( ret == FUNC_RETURN_OK ) |
| { |
| if ( outfilename[0] == '\0') |
| { |
| /* Set default output file name. */ |
| snprintf(outfilename, sizeof(outfilename)-1, |
| "%s." UINT64_FORMAT, |
| RESOURCE_ACTION_PLAY_ALLOCATE_OUT, |
| gettime_microsec()); |
| } |
| |
| actitem->ResultMessage = rm_palloc0(QD2RM_CommContext, |
| PG_PLAY_RESOURCE_ACTION_BUFSIZE); |
| snprintf(actitem->ResultMessage, |
| PG_PLAY_RESOURCE_ACTION_BUFSIZE - 1, |
| "Acquired resource in %s", |
| outfilename); |
| |
| /* Output the acquired resource details into the specified file. */ |
| outputAllcatedResourceToFile(outfilename, actconn->ResourceID); |
| } |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_RETURN) == 0 ) |
| { |
| errorbuf[0] = '\0'; |
| /* STEP 4. Return resource. */ |
| ret = returnResource(actconn->ResourceID, |
| errorbuf, |
| sizeof(errorbuf)); |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_UNREGISTER) == 0 ) |
| { |
| errorbuf[0] = '\0'; |
| /* STEP 5. Unregister. */ |
| ret = unregisterConnectionInRM(actconn->ResourceID, |
| errorbuf, |
| sizeof(errorbuf)); |
| |
| /* STEP 6. Remove Context */ |
| releaseResourceContext(actconn->ResourceID); |
| if ( ret == FUNC_RETURN_OK ) |
| { |
| actconn->ResourceID = -1; |
| actconn->ConnectionID = -1; |
| } |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_CREATE) == 0 ) |
| { |
| const char *filename = (const char *) |
| lfirst(list_head(actitem->Arguments)); |
| ret = createFile(filename); |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_PLAY_REMOVE) == 0 ) |
| { |
| const char *filename = (const char *) |
| lfirst(list_head(actitem->Arguments)); |
| ret = removeFile(filename); |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_RPC_FAULT) == 0 ) |
| { |
| /* Set the action into message handler error inject. */ |
| ListCell *cell = list_head(actitem->Arguments); |
| const char *messageidstr = (const char *)lfirst(cell); |
| cell = lnext(cell); |
| const char *actionstr = (const char *)lfirst(cell); |
| cell = lnext(cell); |
| int countthread = atoi((const char *)lfirst(cell)); |
| |
| setMessageErrorInject(messageidstr, actionstr, countthread); |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_RPC_FAULT_RM) == 0 ) |
| { |
| } |
| else if ( strcmp(actitem->ActionName, RESOURCE_ACTION_QUOTA_PAUSE) == 0 ) |
| { |
| /* |
| * The first argument should be pause or resume quota changes; |
| * The second argument should be which phase should be paused or |
| * resumed. |
| */ |
| ListCell *cell = list_head(actitem->Arguments); |
| const char *action = (const char *)lfirst(cell); |
| cell = lnext(cell); |
| int phase = atoi((const char *)lfirst(cell)); |
| ret = setResourceManagerQuotaControl(strcmp(action, "pause") == 0 ? |
| true: |
| false, |
| phase, |
| errorbuf, |
| sizeof(errorbuf)); |
| } |
| |
| actitem->ResultCode = ret; |
| actitem->ResultCode = actitem->ResultCode == FUNC_RETURN_OK ? |
| errorcode : |
| actitem->ResultCode; |
| if ( actitem->ResultCode != FUNC_RETURN_OK ) |
| { |
| actitem->ResultMessage = rm_palloc(QD2RM_CommContext, |
| strlen(errorbuf)+1); |
| strcpy(actitem->ResultMessage, errorbuf); |
| } |
| |
| break; |
| } |
| |
| /* Check if all actions are done. */ |
| alldone = true; |
| foreach(conncell, actions) |
| { |
| TestActionConn actconn = (TestActionConn)lfirst(conncell); |
| if ( actconn->CurAction != NULL ) |
| { |
| alldone = false; |
| break; |
| } |
| |
| } |
| |
| pg_usleep(100000); |
| } |
| return FUNC_RETURN_OK; |
| } |
| |
| void outputAllcatedResourceToFile(const char *filename, int resourceid) |
| { |
| QDResourceContext rescontext = NULL; |
| FILE *fp = fopen(filename, "w"); |
| if ( fp == NULL ) |
| { |
| return; |
| } |
| |
| getAllocatedResourceContext(resourceid, &rescontext); |
| if (rescontext != NULL) |
| { |
| /* Output memory quota of one virtual segment. */ |
| fprintf(fp, "%d\n", rescontext->QD_SegMemoryMB); |
| /* Output core quota of one virtual segment. */ |
| fprintf(fp, "%lf\n", rescontext->QD_SegCore); |
| /* Output virtual segment list. */ |
| |
| int i; |
| for (i = 0 ; i < rescontext->QD_SegCount ; ++i) |
| { |
| QDSegInfo qdseginfo = rescontext->QD_ResourceList[i]; |
| |
| fprintf(fp, "%d,%s:%d,", |
| i, |
| GET_SEGINFO_HOSTNAME(qdseginfo->QD_SegInfo), |
| qdseginfo->QD_SegInfo->port); |
| /* |
| * Select the first ip address here as that is reported by HAWQ FTS |
| * component. |
| */ |
| AddressString paddr1 = NULL; |
| getSegInfoHostAddrStr(qdseginfo->QD_SegInfo, 0, &paddr1); |
| Assert(paddr1 != NULL); |
| fprintf(fp, "%s,", paddr1->Address); |
| fprintf(fp, "%s\n", qdseginfo->QD_HdfsHostName == NULL ? |
| "NULL" : |
| qdseginfo->QD_HdfsHostName); |
| } |
| } |
| |
| fclose(fp); |
| } |
| |
| int callSyncRPCToRM(const char *sendbuff, |
| int sendbuffsize, |
| uint16_t sendmsgid, |
| uint16_t exprecvmsgid, |
| SelfMaintainBuffer recvsmb, |
| char *errorbuf, |
| int errorbufsize) |
| { |
| return callSyncRPCRemote(master_addr_host, |
| rm_master_port, |
| sendbuff, |
| sendbuffsize, |
| sendmsgid, |
| exprecvmsgid, |
| recvsmb, |
| errorbuf, |
| errorbufsize); |
| } |