blob: 168fe44d661ee98317fa765321532f96dea735d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "dynrm.h"
#include "envswitch.h"
#include "utils/linkedlist.h"
#include "utils/memutilities.h"
#include "utils/network_utils.h"
#include "utils/kvproperties.h"
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_QD2RM.h"
#include "communication/rmcomm_QD_RM_Protocol.h"
#include "communication/rmcomm_RMSEG_RM_Protocol.h"
#include "communication/rmcomm_RM2RMSEG.h"
#include "resourceenforcer/resourceenforcer.h"
#include "resourcemanager.h"
/*
* The MAIN ENTRY of request handler.
* The request handlers are implemented in:
* requesthandler.c : handlers for QD-RM resource negotiation RPC
* requesthandler_ddl.c : handlers for QD-RM DDL manipulations
* requesthandler_RMSEG.c : handlers for QE-RMSEG, RM-RMSEG RPC
*/
/*
 * RETRIEVE_CONNTRACK(res, connid, conntrack)
 *
 * 'conntrack' is a ConnectionTrack * (it is dereferenced as *conntrack).
 * If the track it points to does not carry a valid ConnID yet, look up the
 * tracked connection 'connid' with retrieveConnectionTrack() and store the
 * result code in 'res'. On failure, log and jump to a label named
 * 'sendresponse', which must exist in every function that expands this
 * macro. On success, log the fetched ConnID and Progress at DEBUG5.
 *
 * NOTE(review): the expansion is a bare 'if { ... }' rather than
 * 'do { ... } while (0)'. Used as the body of an un-braced if/else, the
 * caller's 'else' would bind to this 'if'. The macro arguments are also not
 * parenthesized. Call sites are not visible here; check them before
 * changing the expansion shape.
 */
#define RETRIEVE_CONNTRACK(res, connid, conntrack) \
if ( (*conntrack)->ConnID == INVALID_CONNID ) { \
res = retrieveConnectionTrack((*conntrack), connid); \
if ( res != FUNC_RETURN_OK ) { \
elog(LOG, "invalid resource context with id %d.", connid); \
goto sendresponse; \
} \
elog(DEBUG5, "HAWQ RM :: Fetched existing connection track " \
"ID=%d, Progress=%d.", \
(*conntrack)->ConnID, \
(*conntrack)->Progress); \
}
/*
 * Handle a REGISTER CONNECTION request whose payload is the user name string.
 *
 * Steps:
 *   1. Copy the user name (truncated and NUL-terminated) into the
 *      connection track.
 *   2. Register the connection for that user.
 *   3. On success, reserve a ConnID and start tracking the connection. If no
 *      ConnID is left, give the resource queue occupation back.
 *
 * The reply carries the result code and the ConnID (INVALID_CONNID on
 * failure), followed by the error text when the result is not
 * FUNC_RETURN_OK. It is built into the connection track, which is then
 * queued on PCONTRACK->ConnToSend.
 *
 * Always returns true.
 */
bool handleRMRequestConnectionReg(void **arg)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	ConnectionTrack track = (ConnectionTrack )(*arg);
	SelfMaintainBufferData replybuf;
	RPCResponseRegisterConnectionInRMByStrData reply;
	size_t useridcap = sizeof(track->UserID);
	int status = FUNC_RETURN_OK;

	initializeSelfMaintainBuffer(&replybuf, PCONTEXT);

	/* The request payload is the user name; always keep it NUL-terminated. */
	strncpy(track->UserID, SMBUFF_CONTENT(&(track->MessageBuff)), useridcap - 1);
	track->UserID[useridcap - 1] = '\0';

	status = registerConnectionByUserID(track, errorbuf, sizeof(errorbuf));
	if ( status != FUNC_RETURN_OK )
	{
		elog(WARNING, "ConnID %d. %s", track->ConnID, errorbuf);
	}
	else
	{
		/* Registration accepted; now reserve a ConnID for this connection. */
		status = useConnectionID(&(track->ConnID));
		if ( status == FUNC_RETURN_OK )
		{
			trackConnectionTrack(track);
			elog(LOG, "ConnID %d. Resource manager tracked connection.",
				 track->ConnID);
		}
		else
		{
			Assert( status == CONNTRACK_CONNID_FULL );
			snprintf(errorbuf, sizeof(errorbuf),
					 "cannot accept more resource context instance");
			elog(WARNING, "ConnID %d. %s", track->ConnID, errorbuf);
			/* Out of ConnIDs: release the occupation in the resource queue. */
			returnConnectionToQueue(track, false);
		}
	}

	/* Only a fully successful registration hands a ConnID back. */
	reply.Result = status;
	reply.ConnID = (status == FUNC_RETURN_OK) ? track->ConnID : INVALID_CONNID;

	/* Serialize: fixed head, plus the aligned error text on failure. */
	appendSMBVar(&replybuf, reply);
	if ( reply.Result != FUNC_RETURN_OK )
	{
		appendSMBStr(&replybuf, errorbuf);
		appendSelfMaintainBufferTill64bitAligned(&replybuf);
	}
	buildResponseIntoConnTrack(track,
							   SMBUFF_CONTENT(&replybuf),
							   getSMBContentSize(&replybuf),
							   track->MessageMark1,
							   track->MessageMark2,
							   RESPONSE_QD_CONNECTION_REG);
	destroySelfMaintainBuffer(&replybuf);

	elog(DEBUG3, "ConnID %d. One connection register result %d.",
		 track->ConnID,
		 reply.Result);

	/* Hand the track to the sender queue. */
	track->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, track);
	MEMORY_CONTEXT_SWITCH_BACK
	return true;
}
/**
 * Handle the request of REGISTER CONNECTION, with the user given by OID.
 *
 * Steps:
 *   1. Resolve the user OID in the request head to a user.
 *   2. Store the user name (NUL-terminated) in the connection track.
 *   3. Register the connection for that user.
 *   4. On success, reserve a ConnID and track the connection. If no ConnID
 *      is left, give the resource queue occupation back.
 *
 * The reply carries the result code and the ConnID (INVALID_CONNID on any
 * failure), followed by the error text when the result is not
 * FUNC_RETURN_OK. It is built into the connection track, which is then
 * queued on PCONTRACK->ConnToSend.
 *
 * Always returns true.
 */
bool handleRMRequestConnectionRegByOID(void **arg)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	int res = FUNC_RETURN_OK;
	ConnectionTrack conntrack = (ConnectionTrack )(*arg);
	bool exist = false;
	SelfMaintainBufferData responsedata;
	RPCResponseRegisterConnectionInRMByOIDData response;
	initializeSelfMaintainBuffer(&responsedata, PCONTEXT);

	RPCRequestRegisterConnectionInRMByOID request =
		SMBUFF_HEAD(RPCRequestRegisterConnectionInRMByOID,
					&(conntrack->MessageBuff));

	/* Get user name from oid. */
	UserInfo reguser = getUserByUserOID(request->UseridOid, &exist);
	if ( !exist )
	{
		res = RESQUEMGR_NO_USERID;
		snprintf(errorbuf, sizeof(errorbuf),
				 "user oid " INT64_FORMAT " does not exist",
				 request->UseridOid);
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		/*
		 * The response is serialized below. Set it here so the requester
		 * gets this failure code instead of uninitialized stack contents.
		 */
		response.Result = res;
		response.ConnID = INVALID_CONNID;
		goto sendresponse;
	}

	/*
	 * Set the user id string in the connection track. strncpy() does not
	 * terminate when the name fills the buffer, so terminate explicitly.
	 */
	strncpy(conntrack->UserID, reguser->Name, sizeof(conntrack->UserID)-1);
	conntrack->UserID[sizeof(conntrack->UserID)-1] = '\0';

	/* Handle the request. */
	res = registerConnectionByUserID(conntrack, errorbuf, sizeof(errorbuf));
	if ( res == FUNC_RETURN_OK )
	{
		/* Allocate connection id and track this connection. */
		res = useConnectionID(&(conntrack->ConnID));
		if ( res == FUNC_RETURN_OK )
		{
			trackConnectionTrack(conntrack);
			elog(RMLOG, "ConnID %d. Resource manager tracked connection.",
				 conntrack->ConnID);
			response.Result = FUNC_RETURN_OK;
			response.ConnID = conntrack->ConnID;
		}
		else
		{
			Assert( res == CONNTRACK_CONNID_FULL );
			snprintf(errorbuf, sizeof(errorbuf),
					 "cannot accept more resource context instance");
			elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
			/* No connection id resource. Return occupation in resource queue. */
			returnConnectionToQueue(conntrack, false);
			response.Result = res;
			response.ConnID = INVALID_CONNID;
		}
	}
	else
	{
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		response.Result = res;
		response.ConnID = INVALID_CONNID;
	}

sendresponse:
	/* Build message saved in the connection track instance. */
	appendSMBVar(&responsedata, response);
	if ( response.Result != FUNC_RETURN_OK )
	{
		appendSMBStr(&responsedata, errorbuf);
		appendSelfMaintainBufferTill64bitAligned(&responsedata);
	}
	buildResponseIntoConnTrack(conntrack,
							   SMBUFF_CONTENT(&responsedata),
							   getSMBContentSize(&responsedata),
							   conntrack->MessageMark1,
							   conntrack->MessageMark2,
							   RESPONSE_QD_CONNECTION_REG_OID);
	destroySelfMaintainBuffer(&responsedata);
	elog(DEBUG3, "ConnID %d. One connection register result %d (OID).",
		 conntrack->ConnID,
		 response.Result);
	conntrack->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
	MEMORY_CONTEXT_SWITCH_BACK
	return true;
}
/*
* Handle UNREGISTER request.
*/
bool handleRMRequestConnectionUnReg(void **arg)
{
static char errorbuf[ERRORMESSAGE_SIZE];
int res = FUNC_RETURN_OK;
ConnectionTrack *conntrack = (ConnectionTrack *)arg;
RPCRequestHeadUnregisterConnectionInRM request =
SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
&((*conntrack)->MessageBuff));
elog(DEBUG3, "ConnID %d. Try to unregister.", request->ConnID);
res = retrieveConnectionTrack((*conntrack), request->ConnID);
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, sizeof(errorbuf),
"the resource context is invalid or timed out");
elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
goto sendresponse;
}
elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
(*conntrack)->ConnID,
(*conntrack)->Progress);
/* Get connection ID. */
request = SMBUFF_HEAD(RPCRequestHeadUnregisterConnectionInRM,
&((*conntrack)->MessageBuff));
/*
* If this connection is waiting for resource allocated, cancel the request
* from resource queue.
*/
if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_WAIT )
{
cancelResourceAllocRequest((*conntrack), errorbuf, false);
transformConnectionTrackProgress((*conntrack), CONN_PP_REGISTER_DONE);
}
/* If this connection has resource allocated, return the resource. */
else if ( (*conntrack)->Progress == CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
{
returnResourceToResQueMgr((*conntrack));
}
/*
* If this connection has acquire resource not processed yet, we should
* remove that now. In this case, this connection should have registered.
*/
else if ( (*conntrack)->Progress == CONN_PP_REGISTER_DONE )
{
elog(WARNING, "resource manager finds possible not handled resource "
"request from ConnID %d.",
request->ConnID);
removeResourceRequestInConnHavingReqeusts(request->ConnID);
}
else if ( !canTransformConnectionTrackProgress((*conntrack), CONN_PP_ESTABLISHED) )
{
snprintf(errorbuf, sizeof(errorbuf),
"wrong resource context status for unregistering, %d",
(*conntrack)->Progress);
elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
res = REQUESTHANDLER_WRONG_CONNSTAT;
goto sendresponse;
}
returnConnectionToQueue(*conntrack, false);
elog(LOG, "ConnID %d. Connection is unregistered.", (*conntrack)->ConnID);
sendresponse:
{
SelfMaintainBufferData responsedata;
initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
RPCResponseUnregisterConnectionInRMData response;
response.Result = res;
response.Reserved = 0;
appendSMBVar(&responsedata, response);
if ( response.Result != FUNC_RETURN_OK )
{
appendSMBStr(&responsedata, errorbuf);
appendSelfMaintainBufferTill64bitAligned(&responsedata);
}
/* Build message saved in the connection track instance. */
buildResponseIntoConnTrack((*conntrack),
SMBUFF_CONTENT(&responsedata),
getSMBContentSize(&responsedata),
(*conntrack)->MessageMark1,
(*conntrack)->MessageMark2,
RESPONSE_QD_CONNECTION_UNREG);
destroySelfMaintainBuffer(&responsedata);
if ( res == CONNTRACK_NO_CONNID )
{
transformConnectionTrackProgress((*conntrack),
CONN_PP_TRANSFORM_ERROR);
}
(*conntrack)->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
MEMORY_CONTEXT_SWITCH_BACK
}
return true;
}
/**
* Handle the request of ACQUIRE QUERY RESOURCE.
*
* Returns false without answering (the request is deferred, as the DEBUG3
* messages below say) when either:
*   - no more than rm_nocluster_timeout seconds have passed since the
*     resource manager started (times are in microseconds) and no cluster
*     report has been received yet (PRESPOOL->RBClusterReportCounter == 0); or
*   - the root resource queue has no cluster memory capacity yet.
*
* Otherwise returns true:
*   - On success, the request parameters are copied into the connection
*     track and handed to acquireResourceFromResQueMgr(). No response is
*     built here. NOTE(review): presumably the response is built when the
*     allocation completes; confirm in the resource queue manager.
*   - On any failure (unknown/timed-out context, wrong progress, too many
*     unavailable segments, or acquireResourceFromResQueMgr() failing), the
*     handler builds an RPCResponseAcquireResourceFromRMERRORData head plus
*     the error text into the connection track. It then moves the track to
*     CONN_PP_TRANSFORM_ERROR and queues it on PCONTRACK->ConnToSend.
**/
bool handleRMRequestAcquireResource(void **arg)
{
static char errorbuf[ERRORMESSAGE_SIZE];
int res = FUNC_RETURN_OK;
ConnectionTrack *conntrack = (ConnectionTrack *)arg;
uint64_t reqtime = gettime_microsec();
/*
* Defer the request while no cluster report has arrived, for at most
* rm_nocluster_timeout seconds after the resource manager started. This wait
* is meant for YARN mode, where at least one global resource manager cluster
* report is needed before any segment is known to be available.
* NOTE(review): the condition below does not itself check the mode.
*/
if ( reqtime - DRMGlobalInstance->ResourceManagerStartTime <=
rm_nocluster_timeout * 1000000LL &&
PRESPOOL->RBClusterReportCounter == 0 )
{
elog(DEBUG3, "resource manager defers the resource request.");
return false;
}
/*
* If resource queue has no concrete capacity set yet, no need to handle
* the request.
*/
if ( PQUEMGR->RootTrack->QueueInfo->ClusterMemoryMB <= 0 )
{
elog(DEBUG3, "resource manager defers the resource request because the "
"resource queues have no valid resource capacities yet.");
return false;
}
RPCRequestHeadAcquireResourceFromRM request =
SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
&((*conntrack)->MessageBuff));
elog(DEBUG3, "ConnID %d. Acquires query resource. Session id "INT64_FORMAT,
request->ConnID,
request->SessionID);
/* Look up the connection track registered under the requested ConnID. */
res = retrieveConnectionTrack((*conntrack), request->ConnID);
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, sizeof(errorbuf),
"the resource context may be timed out");
elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
goto sendresponse;
}
elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
(*conntrack)->ConnID,
(*conntrack)->Progress);
/* Re-read the request head from the retrieved track's message buffer. */
request = SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
&((*conntrack)->MessageBuff));
/* Only a registered connection without a pending request may acquire. */
if ( (*conntrack)->Progress != CONN_PP_REGISTER_DONE )
{
res = REQUESTHANDLER_WRONG_CONNSTAT;
snprintf(errorbuf, sizeof(errorbuf),
"the resource context status is invalid");
elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
goto sendresponse;
}
/*--------------------------------------------------------------------------
* We firstly check if the cluster has too many unavailable segments, which
* is measured by rm_rejectrequest_nseg_limit. The expected cluster size is
* loaded from counting hosts in $GPHOME/etc/slaves. Resource manager rejects
* query resource requests at once if currently there are too many segments
* unavailable.
*--------------------------------------------------------------------------
*/
Assert(PRESPOOL->SlavesHostCount > 0);
int rejectlimit = ceil(PRESPOOL->SlavesHostCount * rm_rejectrequest_nseg_limit);
int unavailcount = PRESPOOL->SlavesHostCount - PRESPOOL->AvailNodeCount;
if ( unavailcount > rejectlimit )
{
snprintf(errorbuf, sizeof(errorbuf),
"%d of %d segments %s unavailable, exceeds %.1f%% defined in GUC hawq_rm_rejectrequest_nseg_limit. "
"The allocation request is rejected.",
unavailcount,
PRESPOOL->SlavesHostCount,
unavailcount == 1 ? "is" : "are",
rm_rejectrequest_nseg_limit*100.0);
elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
res = RESOURCEPOOL_TOO_MANY_UAVAILABLE_HOST;
goto sendresponse;
}
/* Get scan size. */
request = SMBUFF_HEAD(RPCRequestHeadAcquireResourceFromRM,
&((*conntrack)->MessageBuff));
/* Copy the request's sizing and virtual-segment limits into the track. */
(*conntrack)->SliceSize = request->SliceSize;
(*conntrack)->IOBytes = request->IOBytes;
(*conntrack)->SegPreferredHostCount = request->NodeCount;
(*conntrack)->MaxSegCountFixed = request->MaxSegCountFix;
(*conntrack)->MinSegCountFixed = request->MinSegCountFix;
(*conntrack)->SessionID = request->SessionID;
(*conntrack)->VSegLimitPerSeg = request->VSegLimitPerSeg;
(*conntrack)->VSegLimit = request->VSegLimit;
(*conntrack)->StatVSegMemoryMB = request->StatVSegMemoryMB;
(*conntrack)->StatNVSeg = request->StatNVSeg;
/* Get preferred nodes. */
buildSegPreferredHostInfo((*conntrack));
elog(RMLOG, "ConnID %d. Session ID " INT64_FORMAT " "
"Scanning "INT64_FORMAT" io bytes "
"by %d slices with %d preferred segments. "
"Expect %d vseg (MIN %d). "
"Each segment has maximum %d vseg. "
"Query has maximum %d vseg. "
"Statement quota %d MB x %d vseg",
(*conntrack)->ConnID,
(*conntrack)->SessionID,
(*conntrack)->IOBytes,
(*conntrack)->SliceSize,
(*conntrack)->SegPreferredHostCount,
(*conntrack)->MaxSegCountFixed,
(*conntrack)->MinSegCountFixed,
(*conntrack)->VSegLimitPerSeg,
(*conntrack)->VSegLimit,
(*conntrack)->StatVSegMemoryMB,
(*conntrack)->StatNVSeg);
if ( (*conntrack)->StatNVSeg > 0 )
{
elog(LOG, "ConnID %d. Statement level resource quota is active. "
"Expect resource ( %d MB ) x %d.",
(*conntrack)->ConnID,
(*conntrack)->StatVSegMemoryMB,
(*conntrack)->StatNVSeg);
}
res = acquireResourceFromResQueMgr((*conntrack), errorbuf, sizeof(errorbuf));
if ( res != FUNC_RETURN_OK )
{
goto sendresponse;
}
/* Accepted: record the request time, which also counts as activity. */
(*conntrack)->ResRequestTime = reqtime;
(*conntrack)->LastActTime = (*conntrack)->ResRequestTime;
return true;
sendresponse:
{
/* Send error message. */
RPCResponseAcquireResourceFromRMERRORData errresponse;
errresponse.Result = res;
errresponse.Reserved = 0;
SelfMaintainBufferData responsedata;
initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
appendSMBVar(&responsedata, errresponse);
appendSMBStr(&responsedata, errorbuf);
appendSelfMaintainBufferTill64bitAligned(&responsedata);
buildResponseIntoConnTrack((*conntrack),
SMBUFF_CONTENT(&responsedata),
getSMBContentSize(&responsedata),
(*conntrack)->MessageMark1,
(*conntrack)->MessageMark2,
RESPONSE_QD_ACQUIRE_RESOURCE);
destroySelfMaintainBuffer(&responsedata);
/* Any failure leaves the connection track in the error state. */
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
(*conntrack)->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
MEMORY_CONTEXT_SWITCH_BACK
}
return true;
}
/**
* Handle RETURN RESOURCE request.
*
*/
bool handleRMRequestReturnResource(void **arg)
{
static char errorbuf[ERRORMESSAGE_SIZE];
int res = FUNC_RETURN_OK;
ConnectionTrack *conntrack = (ConnectionTrack *)arg;
RPCRequestHeadReturnResource request =
SMBUFF_HEAD(RPCRequestHeadReturnResource, &((*conntrack)->MessageBuff));
elog(DEBUG3, "ConnID %d. Returns query resource.", request->ConnID);
res = retrieveConnectionTrack((*conntrack), request->ConnID);
if ( res != FUNC_RETURN_OK )
{
snprintf(errorbuf, sizeof(errorbuf),
"the resource context may be timed out");
elog(WARNING, "ConnID %d. %s", request->ConnID, errorbuf);
goto sendresponse;
}
elog(DEBUG3, "ConnID %d. Fetched existing connection track, progress=%d.",
(*conntrack)->ConnID,
(*conntrack)->Progress);
/* Get connection ID. */
request = SMBUFF_HEAD(RPCRequestHeadReturnResource,
&((*conntrack)->MessageBuff));
if ( (*conntrack)->Progress != CONN_PP_RESOURCE_QUEUE_ALLOC_DONE )
{
res = REQUESTHANDLER_WRONG_CONNSTAT;
snprintf(errorbuf, sizeof(errorbuf),
"the resource context status is invalid");
elog(WARNING, "ConnID %d. %s", (*conntrack)->ConnID, errorbuf);
goto sendresponse;
}
/* Return the resource. */
returnResourceToResQueMgr(*conntrack);
elog(LOG, "ConnID %d. Returned resource.", (*conntrack)->ConnID);
sendresponse:
{
SelfMaintainBufferData responsedata;
initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
RPCResponseHeadReturnResourceData response;
response.Result = res;
response.Reserved = 0;
appendSMBVar(&responsedata, response);
if ( response.Result != FUNC_RETURN_OK )
{
appendSMBStr(&responsedata, errorbuf);
appendSelfMaintainBufferTill64bitAligned(&responsedata);
}
buildResponseIntoConnTrack((*conntrack),
SMBUFF_CONTENT(&responsedata),
getSMBContentSize(&responsedata),
(*conntrack)->MessageMark1,
(*conntrack)->MessageMark2,
RESPONSE_QD_RETURN_RESOURCE );
destroySelfMaintainBuffer(&responsedata);
if ( res == CONNTRACK_NO_CONNID )
{
transformConnectionTrackProgress((*conntrack), CONN_PP_TRANSFORM_ERROR);
}
(*conntrack)->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, *conntrack);
MEMORY_CONTEXT_SWITCH_BACK
}
return true;
}
/*
* Handle I AM ALIVE request.
*/
bool handleRMSEGRequestIMAlive(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
elog(RMLOG, "resource manager receives segment heart-beat information.");
SelfMaintainBufferData machinereport;
initializeSelfMaintainBuffer(&machinereport,PCONTEXT);
SegStat segstat = (SegStat)(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
sizeof(RPCRequestHeadIMAliveData));
generateSegStatReport(segstat, &machinereport);
elog(RMLOG, "resource manager received segment machine information, %s",
SMBUFF_CONTENT(&machinereport));
destroySelfMaintainBuffer(&machinereport);
/* Get hostname and ip address from the connection's sockaddr */
char* fts_client_ip = NULL;
uint32_t fts_client_ip_len = 0;
struct hostent* fts_client_host = NULL;
struct in_addr fts_client_addr;
Assert(conntrack->CommBuffer != NULL);
fts_client_ip = conntrack->CommBuffer->ClientAddrDotStr;
fts_client_ip_len = strlen(fts_client_ip);
inet_aton(fts_client_ip, &fts_client_addr);
fts_client_host = gethostbyaddr(&fts_client_addr, 4, AF_INET);
if (fts_client_host == NULL)
{
elog(WARNING, "failed to reverse DNS lookup for ip %s.", fts_client_ip);
return true;
}
/* Get the received machine id instance start address. */
SegInfo fts_client_seginfo = (SegInfo)(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
sizeof(RPCRequestHeadIMAliveData) +
offsetof(SegStatData, Info));
/* Build new machine id with inserted ip address and modified host name. */
SelfMaintainBufferData newseginfo;
initializeSelfMaintainBuffer(&newseginfo, PCONTEXT);
/* Copy machine id header. */
prepareSelfMaintainBuffer(&newseginfo, sizeof(SegInfoData), false);
memcpy(SMBUFF_CONTENT(&(newseginfo)),
fts_client_seginfo, sizeof(SegInfoData));
jumpforwardSelfMaintainBuffer(&newseginfo, sizeof(SegInfoData));
/* Put client ip address's offset and attribute */
uint16_t addroffset = sizeof(SegInfoData) +
sizeof(uint32_t) *
(((fts_client_seginfo->HostAddrCount + 1 + 1) >> 1) << 1);
uint16_t addrattr = HOST_ADDRESS_CONTENT_STRING;
SegInfo newseginfoptr = SMBUFF_HEAD(SegInfo, &newseginfo);
newseginfoptr->AddressAttributeOffset = sizeof(SegInfoData);
newseginfoptr->AddressContentOffset = addroffset;
newseginfoptr->HostAddrCount = fts_client_seginfo->HostAddrCount + 1;
uint32_t addContentOffset= addroffset;
appendSMBVar(&newseginfo,addroffset);
appendSMBVar(&newseginfo,addrattr);
elog(RMLOG, "resource manager received IMAlive message, this segment's IP "
"address count: %d",
fts_client_seginfo->HostAddrCount);
/* iterate all the offset/attribute in machineIdData from client */
for (int i = 0; i < fts_client_seginfo->HostAddrCount; i++) {
/* Read old address offset data. */
addroffset = *(uint16_t *)((char *)fts_client_seginfo +
fts_client_seginfo->AddressAttributeOffset +
i*sizeof(uint32_t));
/*
* Adjust address offset by counting the size of one AddressString.
* Adding new address attribute and offset content can also causing more
* space enlarged. We have to count it.
*/
addroffset += __SIZE_ALIGN64(sizeof(uint32_t) +
fts_client_ip_len + 1) +
(addContentOffset - fts_client_seginfo->AddressContentOffset);
appendSMBVar(&newseginfo,addroffset);
/* Read old address attribute data. */
addrattr = *(uint16_t *)((char *)fts_client_seginfo +
fts_client_seginfo->AddressAttributeOffset +
i*sizeof(uint32_t) +
sizeof(uint16_t));
/* No need to adjust the value. */
appendSMBVar(&newseginfo,addrattr);
}
/* We may have to add '\0' pads to make the block of address offset and
* attribute 64-bit aligned. */
appendSelfMaintainBufferTill64bitAligned(&newseginfo);
/* Put the connection's client ip into the first position */
appendSMBVar(&newseginfo,fts_client_ip_len);
appendSMBStr(&newseginfo,fts_client_ip);
appendSelfMaintainBufferTill64bitAligned(&newseginfo);
elog(RMLOG, "resource manager received IMAlive message, "
"this segment's IP address: %s\n",
fts_client_ip);
/* Put other ip addresses' content directly. */
appendSelfMaintainBuffer(&newseginfo,
(char *)fts_client_seginfo +
fts_client_seginfo->AddressContentOffset,
fts_client_seginfo->HostNameOffset -
fts_client_seginfo->AddressContentOffset);
/* fill in hostname */
newseginfoptr = SMBUFF_HEAD(SegInfo, &(newseginfo));
newseginfoptr->HostNameOffset = getSMBContentSize(&newseginfo);
appendSMBStr(&newseginfo,fts_client_host->h_name);
appendSelfMaintainBufferTill64bitAligned(&newseginfo);
newseginfoptr = SMBUFF_HEAD(SegInfo, &(newseginfo));
newseginfoptr->HostNameLen = strlen(fts_client_host->h_name);
appendSelfMaintainBufferTill64bitAligned(&newseginfo);
/* fill in failed temporary directory string */
if (fts_client_seginfo->FailedTmpDirLen != 0)
{
newseginfoptr->FailedTmpDirOffset = getSMBContentSize(&newseginfo);
newseginfoptr->FailedTmpDirLen = strlen(GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
appendSMBStr(&newseginfo, GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
elog(RMLOG, "resource manager received IMAlive message, "
"failed temporary directory:%s",
GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
appendSelfMaintainBufferTill64bitAligned(&newseginfo);
}
else
{
newseginfoptr->FailedTmpDirOffset = 0;
newseginfoptr->FailedTmpDirLen = 0;
}
newseginfoptr->Size = getSMBContentSize(&newseginfo);
/* reported by segment, set GRM host/rack as NULL */
newseginfoptr->GRMHostNameLen = 0;
newseginfoptr->GRMHostNameOffset = 0;
newseginfoptr->GRMRackNameLen = 0;
newseginfoptr->GRMRackNameOffset = 0;
elog(RMLOG, "resource manager received IMAlive message, "
"this segment's hostname: %s\n",
fts_client_host->h_name);
/* build segment status information instance and add to resource pool */
SegStat newsegstat = (SegStat)
rm_palloc0(PCONTEXT,
offsetof(SegStatData, Info) +
getSMBContentSize(&newseginfo));
/* Copy old segment status information. */
memcpy((char *)newsegstat, segstat, offsetof(SegStatData, Info));
/* Copy new segment info information. */
memcpy((char *)newsegstat + offsetof(SegStatData, Info),
SMBUFF_CONTENT(&newseginfo),
getSMBContentSize(&newseginfo));
destroySelfMaintainBuffer(&newseginfo);
newsegstat->Info.ID = SEGSTAT_ID_INVALID;
RPCRequestHeadIMAlive header = SMBUFF_HEAD(RPCRequestHeadIMAlive,
&(conntrack->MessageBuff));
newsegstat->FailedTmpDirNum = header->TmpDirBrokenCount;
newsegstat->RMStartTimestamp = header->RMStartTimestamp;
newsegstat->StatusDesc = 0;
bool capstatchanged = false;
if ( addHAWQSegWithSegStat(newsegstat, &capstatchanged) != FUNC_RETURN_OK )
{
/* Should be a duplicate host. */
rm_pfree(PCONTEXT, newsegstat);
}
if ( capstatchanged )
{
/* Refresh resource queue capacities. */
refreshResourceQueueCapacity(false);
refreshActualMinGRMContainerPerSeg();
/* Recalculate all memory/core ratio instances' limits. */
refreshMemoryCoreRatioLimits();
/* Refresh memory/core ratio level water mark. */
refreshMemoryCoreRatioWaterMark();
}
/* Send the response. */
RPCResponseIMAliveData response;
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
buildResponseIntoConnTrack(conntrack,
(char *)&response,
sizeof(response),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_RM_IMALIVE);
elog(DEBUG3, "resource manager accepted segment machine.");
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
return true;
}
/*
 * Handle an ACQUIRE RESOURCE QUOTA request, with the user given by OID.
 *
 * Returns false without answering (the request is deferred, as the DEBUG3
 * messages say) when either:
 *   - no more than rm_nocluster_timeout seconds have passed since start-up
 *     and no cluster report has been received yet; or
 *   - the root queue has no cluster memory capacity yet.
 *
 * Otherwise the request is answered and true is returned:
 *   - RESOURCEPOOL_TOO_MANY_UAVAILABLE_HOST when more than
 *     ceil(SlavesHostCount * rm_rejectrequest_nseg_limit) segments are
 *     unavailable;
 *   - RESQUEMGR_NO_USERID when the user OID is unknown;
 *   - otherwise the result of acquireResourceQuotaFromResQueMgr(). On
 *     success, the reply also carries the segment count/min, per-segment
 *     memory/core, and the user's queue name.
 *
 * Error replies carry an RPCResponseHeadAcquireResourceQuotaFromRMByOIDData
 * head with the result code (all quota fields zeroed), followed by the error
 * text. This gives the requester a well-formed head instead of an empty
 * message.
 */
bool handleRMRequestAcquireResourceQuota(void **arg)
{
	static char errorbuf[ERRORMESSAGE_SIZE];
	int res = FUNC_RETURN_OK;
	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
	bool exist = false;
	uint64_t reqtime = gettime_microsec();

	/*
	 * Defer the request while no cluster report has arrived, for at most
	 * rm_nocluster_timeout seconds after start-up (times are in
	 * microseconds). This wait is meant for YARN mode, where a global
	 * resource manager cluster report is needed before any segment is known
	 * to be available.
	 */
	if ( reqtime - DRMGlobalInstance->ResourceManagerStartTime <=
		 rm_nocluster_timeout * 1000000LL &&
		 PRESPOOL->RBClusterReportCounter == 0 )
	{
		elog(DEBUG3, "resource manager defers the resource request.");
		return false;
	}

	/*
	 * If resource queue has no concrete capacity set yet, no need to handle
	 * the request.
	 */
	if ( PQUEMGR->RootTrack->QueueInfo->ClusterMemoryMB <= 0 )
	{
		elog(DEBUG3, "resource manager defers the resource request because the "
					 "resource queues have no valid resource capacities yet.");
		return false;
	}

	/* Reject at once if too many segments are unavailable. */
	Assert(PRESPOOL->SlavesHostCount > 0);
	int rejectlimit = ceil(PRESPOOL->SlavesHostCount * rm_rejectrequest_nseg_limit);
	int unavailcount = PRESPOOL->SlavesHostCount - PRESPOOL->AvailNodeCount;
	if ( unavailcount > rejectlimit )
	{
		snprintf(errorbuf, sizeof(errorbuf),
				 "%d of %d segments %s unavailable, exceeds %.1f%% defined in "
				 "GUC hawq_rm_rejectrequest_nseg_limit. The resource quota "
				 "request is rejected.",
				 unavailcount,
				 PRESPOOL->SlavesHostCount,
				 unavailcount == 1 ? "is" : "are",
				 rm_rejectrequest_nseg_limit*100.0);
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		res = RESOURCEPOOL_TOO_MANY_UAVAILABLE_HOST;
		goto errorexit;
	}

	RPCRequestHeadAcquireResourceQuotaFromRMByOID request =
		SMBUFF_HEAD(RPCRequestHeadAcquireResourceQuotaFromRMByOID,
					&(conntrack->MessageBuff));

	/* Get user name from oid. */
	UserInfo reguser = getUserByUserOID(request->UseridOid, &exist);
	if ( !exist )
	{
		res = RESQUEMGR_NO_USERID;
		snprintf(errorbuf, sizeof(errorbuf),
				 "user oid " INT64_FORMAT " does not exist",
				 request->UseridOid);
		elog(WARNING, "ConnID %d. %s", conntrack->ConnID, errorbuf);
		goto errorexit;
	}

	/*
	 * Set the user id string in the connection track. strncpy() does not
	 * terminate when the name fills the buffer, so terminate explicitly.
	 */
	strncpy(conntrack->UserID, reguser->Name, sizeof(conntrack->UserID)-1);
	conntrack->UserID[sizeof(conntrack->UserID)-1] = '\0';

	conntrack->MaxSegCountFixed = request->MaxSegCountFix;
	conntrack->MinSegCountFixed = request->MinSegCountFix;
	conntrack->VSegLimitPerSeg = request->VSegLimitPerSeg;
	conntrack->VSegLimit = request->VSegLimit;
	conntrack->StatVSegMemoryMB = request->StatVSegMemoryMB;
	conntrack->StatNVSeg = request->StatNVSeg;

	elog(RMLOG, "ConnID %d. User "INT64_FORMAT" acquires query resource quota. "
				"Expect %d vseg (MIN %d). "
				"Each segment has maximum %d vseg. "
				"Query has maximum %d vseg. "
				"Statement quota %d MB x %d vseg",
		 conntrack->ConnID,
		 request->UseridOid,
		 request->MaxSegCountFix,
		 request->MinSegCountFix,
		 request->VSegLimitPerSeg,
		 request->VSegLimit,
		 request->StatVSegMemoryMB,
		 request->StatNVSeg);

	if ( conntrack->StatNVSeg > 0 )
	{
		elog(LOG, "ConnID %d. Statement level resource quota is active. "
				  "Expect resource ( %d MB ) x %d.",
			 conntrack->ConnID,
			 conntrack->StatVSegMemoryMB,
			 conntrack->StatNVSeg);
	}

	res = acquireResourceQuotaFromResQueMgr(conntrack, errorbuf, sizeof(errorbuf));
	if ( res != FUNC_RETURN_OK )
	{
		elog(WARNING, "%s", errorbuf);
		goto errorexit;
	}

	{
		/* Zero first so no uninitialized stack bytes are sent. */
		RPCResponseHeadAcquireResourceQuotaFromRMByOIDData response;
		memset(&response, 0, sizeof(response));

		DynResourceQueueTrack queuetrack = getQueueTrackByQueueOID(reguser->QueueOID);
		if ( queuetrack != NULL )
		{
			memcpy(response.QueueName,
				   queuetrack->QueueInfo->Name,
				   sizeof(response.QueueName));
			/* Guarantee the copied name is a terminated string. */
			response.QueueName[sizeof(response.QueueName)-1] = '\0';
		}
		/* Otherwise QueueName stays the empty string from the memset. */

		response.Reserved1 = 0;
		response.Result = res;
		response.SegCore = conntrack->SegCore;
		response.SegMemoryMB = conntrack->SegMemoryMB;
		response.SegNum = conntrack->SegNum;
		response.SegNumMin = conntrack->SegNumMin;
		response.Reserved2 = 0;
		buildResponseIntoConnTrack( conntrack,
									(char *)&response,
									sizeof(response),
									conntrack->MessageMark1,
									conntrack->MessageMark2,
									RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA);
		conntrack->ResponseSent = false;
		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
		MEMORY_CONTEXT_SWITCH_BACK
		return true;
	}

errorexit:
	{
		/*
		 * Reply with a response head carrying the failure code (quota fields
		 * zeroed), followed by the aligned error text. Other handlers in
		 * this file report errors the same way.
		 */
		RPCResponseHeadAcquireResourceQuotaFromRMByOIDData errresponse;
		memset(&errresponse, 0, sizeof(errresponse));
		errresponse.Result = res;

		SelfMaintainBufferData responsedata;
		initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
		appendSMBVar(&responsedata, errresponse);
		appendSMBStr(&responsedata, errorbuf);
		appendSelfMaintainBufferTill64bitAligned(&responsedata);
		buildResponseIntoConnTrack(conntrack,
								   SMBUFF_CONTENT(&responsedata),
								   getSMBContentSize(&responsedata),
								   conntrack->MessageMark1,
								   conntrack->MessageMark2,
								   RESPONSE_QD_ACQUIRE_RESOURCE_QUOTA);
		destroySelfMaintainBuffer(&responsedata);
		conntrack->ResponseSent = false;
		MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
		PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
		MEMORY_CONTEXT_SWITCH_BACK
	}
	return true;
}
/*
 * Handle a REFRESH RESOURCE (resource heart-beat) request from QD.
 *
 * The message is an RPCRequestHeadRefreshResourceHeartBeatData head followed
 * by ConnIDCount uint32_t connection ids. Each in-use connection track named
 * there gets its LastActTime set to the current gettime_microsec() value.
 * Unknown ids are only logged.
 *
 * ConnIDCount comes from the peer, so it is clamped to the number of ids the
 * message actually carries; reading past the received message is never
 * allowed. The reply is always FUNC_RETURN_OK.
 *
 * Always returns true.
 */
bool handleRMRequestRefreshResource(void **arg)
{
	int res = FUNC_RETURN_OK;
	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
	ConnectionTrack oldct = NULL;
	uint64_t curusec = gettime_microsec();
	RPCRequestHeadRefreshResourceHeartBeat request =
		SMBUFF_HEAD(RPCRequestHeadRefreshResourceHeartBeat,
					&(conntrack->MessageBuff));
	uint32_t *connids = (uint32_t *)
		(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
		 sizeof(RPCRequestHeadRefreshResourceHeartBeatData));

	/* Bound the declared id count by what the message really contains. */
	int64_t msgsize  = (int64_t)getSMBContentSize(&(conntrack->MessageBuff));
	int64_t headsize = (int64_t)sizeof(RPCRequestHeadRefreshResourceHeartBeatData);
	int64_t idcount  = 0;
	if ( msgsize >= headsize )
	{
		int64_t maxcount = (msgsize - headsize) / (int64_t)sizeof(uint32_t);
		idcount = (int64_t)request->ConnIDCount;
		if ( idcount > maxcount )
		{
			elog(WARNING, "resource manager receives %d ConnIDs to refresh but "
						  "the message carries only %d, ignoring the rest.",
				 (int)idcount,
				 (int)maxcount);
			idcount = maxcount;
		}
	}
	else
	{
		elog(WARNING, "resource manager receives a truncated resource "
					  "refreshing message.");
	}

	elog(DEBUG3, "resource manager refreshes %d ConnIDs.", (int)idcount);

	for ( int64_t i = 0 ; i < idcount ; ++i )
	{
		/* Find connection track identified by ConnID */
		res = getInUseConnectionTrack(connids[i], &oldct);
		if ( res == FUNC_RETURN_OK )
		{
			oldct->LastActTime = curusec;
			elog(RMLOG, "refreshed resource context connection id %d", connids[i]);
		}
		else
		{
			elog(WARNING, "cannot find resource context connection id %d for "
						  "resource refreshing.",
				 connids[i]);
		}
	}

	/* Temporarily, we ignore the wrong conn id inputs. */
	RPCResponseRefreshResourceHeartBeatData response;
	response.Result = FUNC_RETURN_OK;
	response.Reserved = 0;
	buildResponseIntoConnTrack( conntrack,
								(char *)&response,
								sizeof(response),
								conntrack->MessageMark1,
								conntrack->MessageMark2,
								RESPONSE_QD_REFRESH_RESOURCE);
	conntrack->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
	MEMORY_CONTEXT_SWITCH_BACK
	return true;
}
/*
 * Handle a QD report that one or more segment hosts may be down.
 *
 * The message content is a sequence of '\0'-terminated host names. The scan
 * stops at an empty name or at the end of the message. For each known host:
 *   - if it is already not FTS-available, or already RUAlive pending, it is
 *     only logged;
 *   - otherwise an RUAlive probe is sent. If sending fails, the host is
 *     marked unavailable immediately, its GRM resource is returned, and
 *     (outside utility mode) the segment status / history catalog rows are
 *     updated.
 * Afterwards refreshResourceQueueCapacity(false) and
 * refreshActualMinGRMContainerPerSeg() are called.
 *
 * NOTE(review): response.Result carries only the most recent lookup /
 * RUAlive-send result, not an aggregate over all reported hosts.
 *
 * Queues the response on PCONTRACK->ConnToSend. Returns true.
 */
bool handleRMRequestSegmentIsDown(void **arg)
{
	int res = FUNC_RETURN_OK;
	ConnectionTrack conntrack = (ConnectionTrack)(*arg);
	char *content = SMBUFF_CONTENT(&(conntrack->MessageBuff));
	int contentsize = getSMBContentSize(&(conntrack->MessageBuff));
	/* Get host name that is down. */
	char *hostname = content;
	int hostnamelen = 0;
	int32_t segid = SEGSTAT_ID_INVALID;

	while( (hostname - content < contentsize) && *hostname != '\0' )
	{
		/*
		 * Bound the terminator search by the bytes left in the message so a
		 * missing '\0' cannot make us read past the received buffer.
		 */
		char *nameend = memchr(hostname,
							   '\0',
							   (size_t)(contentsize - (hostname - content)));
		if ( nameend == NULL )
		{
			elog(WARNING, "resource manager received an unterminated host name "
						  "in segment down request, skip the rest of the request.");
			break;
		}
		hostnamelen = (int)(nameend - hostname);

		res = getSegIDByHostName(hostname, hostnamelen, &segid);
		if ( res == FUNC_RETURN_OK )
		{
			/* Get resource info of the expected host. */
			SegResource segres = getSegResource(segid);
			Assert( segres != NULL );
			if ( !IS_SEGSTAT_FTSAVAILABLE(segres->Stat) )
			{
				elog(WARNING, "resource manager does not probe the status of "
							  "host %s because it is down already.",
							  hostname);
			}
			else if ( segres->RUAlivePending )
			{
				elog(LOG, "resource manager does not probe the status of host %s "
						  "because it is in RUAlive pending status already.",
						  hostname);
			}
			else
			{
				elog(RMLOG, "resource manager probes the status of host %s by "
							"sending RUAlive request.",
							hostname);
				res = sendRUAlive(hostname);
				/* IN THIS CASE, the segment is considered as down. */
				if (res != FUNC_RETURN_OK)
				{
					/*----------------------------------------------------------
					 * This call makes resource manager able to adjust queue and
					 * mem/core trackers' capacity.
					 *----------------------------------------------------------
					 */
					setSegResHAWQAvailability(segres,
											  RESOURCE_SEG_STATUS_UNAVAILABLE);
					/* Make resource pool remove unused containers */
					returnAllGRMResourceFromSegment(segres);
					/* Set the host down in gp_segment_configuration table */
					segres->Stat->StatusDesc |= SEG_STATUS_FAILED_PROBING_SEGMENT;
					if (Gp_role != GP_ROLE_UTILITY)
					{
						SimpStringPtr description = build_segment_status_description(segres->Stat);
						/*
						 * Use the same empty-string fallback for both catalog
						 * writes; previously only the status update had it.
						 */
						char *descstr = (description->Len > 0) ? description->Str : "";
						update_segment_status(segres->Stat->Info.ID + REGISTRATION_ORDER_OFFSET,
											  SEGMENT_STATUS_DOWN,
											  descstr);
						add_segment_history_row(segres->Stat->Info.ID + REGISTRATION_ORDER_OFFSET,
												hostname,
												descstr);
						freeSimpleStringContent(description);
						rm_pfree(PCONTEXT, description);
					}
					/* Set the host down. */
					elog(LOG, "resource manager sets host %s from up to down "
							  "due to not reaching host.", hostname);
				}
				else
				{
					elog(RMLOG, "resource manager triggered RUAlive request to "
								"host %s.",
								hostname);
				}
			}
		}
		else {
			elog(WARNING, "resource manager cannot find host %s to check status, "
						  "skip it.",
						  hostname);
		}
		hostname = nameend + 1; /* Try next */
	}

	refreshResourceQueueCapacity(false);
	refreshActualMinGRMContainerPerSeg();

	RPCResponseSegmentIsDownData response;
	response.Result = res;
	response.Reserved = 0;
	buildResponseIntoConnTrack( conntrack,
								(char *)&response,
								sizeof(response),
								conntrack->MessageMark1,
								conntrack->MessageMark2,
								RESPONSE_QD_SEGMENT_ISDOWN);
	conntrack->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
	MEMORY_CONTEXT_SWITCH_BACK
	return true;
}
bool handleRMRequestDumpResQueueStatus(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
RPCResponseResQueueStatus response = NULL;
int responseLen = sizeof(RPCResponseResQueueStatusData)
+ sizeof(ResQueueStatusData) * (list_length(PQUEMGR->Queues) - 1);
response = (RPCResponseResQueueStatus)palloc(responseLen);
response->Result = FUNC_RETURN_OK;
response->queuenum = list_length(PQUEMGR->Queues);
int i = 0;
ListCell *cell = NULL;
foreach(cell, PQUEMGR->Queues)
{
DynResourceQueueTrack quetrack = lfirst(cell);
sprintf(response->queuedata[i].name, "%s", quetrack->QueueInfo->Name);
response->queuedata[i].segmem = quetrack->QueueInfo->SegResourceQuotaMemoryMB;
response->queuedata[i].segcore = quetrack->QueueInfo->SegResourceQuotaVCore;
response->queuedata[i].segsize = quetrack->ClusterSegNumber;
response->queuedata[i].segsizemax = quetrack->ClusterSegNumberMax;
response->queuedata[i].inusemem = quetrack->TotalUsed.MemoryMB;
response->queuedata[i].inusecore = quetrack->TotalUsed.Core;
response->queuedata[i].holders = quetrack->NumOfRunningQueries;
response->queuedata[i].waiters = quetrack->QueryResRequests.NodeCount;
/* Generate if resource queue paused dispatching resource. */
if ( quetrack->troubledByFragment )
{
response->queuedata[i].pausedispatch = 'R';
}
else if ( quetrack->pauseAllocation )
{
response->queuedata[i].pausedispatch = 'T';
}
else
{
response->queuedata[i].pausedispatch = 'F';
}
response->queuedata[i].reserved[0] = '\0';
response->queuedata[i].reserved[1] = '\0';
response->queuedata[i].reserved[2] = '\0';
response->queuedata[i].reserved[3] = '\0';
response->queuedata[i].reserved[4] = '\0';
response->queuedata[i].reserved[5] = '\0';
response->queuedata[i].reserved[6] = '\0';
i++;
}
buildResponseIntoConnTrack(conntrack,
(char *)response,
responseLen,
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QD_DUMP_RESQUEUE_STATUS);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
return true;
}
bool handleRMRequestDumpStatus(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
RPCResponseDumpStatusData response;
RPCRequestDumpStatus request = SMBUFF_HEAD(RPCRequestDumpStatus,
&(conntrack->MessageBuff));
elog(DEBUG3, "resource manager dump type %u data to file %s",
request->type,
request->dump_file);
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
switch (request->type)
{
case 1:
dumpConnectionTracks(request->dump_file);
break;
case 2:
dumpResourceQueueStatus(request->dump_file);
break;
case 3:
dumpResourcePoolHosts(request->dump_file);
break;
default:
Assert(false);
break;
}
SelfMaintainBufferData responsedata;
initializeSelfMaintainBuffer(&responsedata, PCONTEXT);
appendSMBVar(&responsedata, response);
buildResponseIntoConnTrack(conntrack,
SMBUFF_CONTENT(&responsedata),
getSMBContentSize(&responsedata),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QD_DUMP_STATUS);
destroySelfMaintainBuffer(&responsedata);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
return true;
}
/*
 * Reply to a dummy request with an OK result.
 *
 * The request content is not inspected. The reply is built into the
 * connection track, ResponseSent is cleared, and the track is appended to
 * PCONTRACK->ConnToSend (presumably drained elsewhere to do the actual send;
 * TODO confirm). Returns true.
 */
bool handleRMRequestDummy(void **arg)
{
	ConnectionTrack track = (ConnectionTrack)(*arg);
	RPCResponseDummyData reply;

	reply.Result   = FUNC_RETURN_OK;
	reply.Reserved = 0;

	buildResponseIntoConnTrack(track, (char *)&reply, sizeof(reply),
							   track->MessageMark1, track->MessageMark2,
							   RESPONSE_DUMMY);

	/* Mark as not yet sent and enqueue for sending. */
	track->ResponseSent = false;
	MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
	PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, track);
	MEMORY_CONTEXT_SWITCH_BACK

	return true;
}
bool handleRMRequestQuotaControl(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
RPCRequestQuotaControl request =
SMBUFF_HEAD(RPCRequestQuotaControl, &(conntrack->MessageBuff));
Assert(request->Phase >= 0 && request->Phase < QUOTA_PHASE_COUNT);
bool oldvalue = PRESPOOL->pausePhase[request->Phase];
PRESPOOL->pausePhase[request->Phase] = request->Pause;
if ( oldvalue != PRESPOOL->pausePhase[request->Phase] )
{
elog(LOG, "resource manager resource quota life cycle pause setting %d "
"changes to %s",
request->Phase,
PRESPOOL->pausePhase[request->Phase]?"paused":"resumed");
}
RPCResponseQuotaControlData response;
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
buildResponseIntoConnTrack(conntrack,
(char *)&response,
sizeof(response),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QD_QUOTA_CONTROL);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
return true;
}