blob: 97d7334f2e6d65d42d17de004a78d567b9cc6b05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "postmaster/fork_process.h"
#include "resourcebroker/resourcebroker_LIBYARN.h"
#include "resourcebroker/resourcebroker_RM_RB_Protocol.h"
#include "resourcemanager.h"
#include "resourcepool.h"
#include "nodes/pg_list.h"
#include "utils/kvproperties.h"
int handleRB2RM_ClusterReport(void);
int handleRB2RM_AllocatedResource(void);
int handleRB2RM_ReturnedResource(void);
int handleRB2RM_ContainerReport(void);
void buildToReturnNotTrackedGRMContainers(RB_GRMContainerStat ctnstats, int size);
void freeResourceBundle(void *resbundle);
/*
*------------------------------------------------------------------------------
* Global variables.
*------------------------------------------------------------------------------
*/
int ResBrokerRequestPipe[2];/* Pipe to send request from RM to RB.*/
int ResBrokerNotifyPipe[2]; /* Pipe to receive response from RB. */
volatile bool ResBrokerKeepRun; /* Tell RB process to run or quit. */
volatile bool ResBrokerExits; /* RB process exit signal */
volatile pid_t ResBrokerPID; /* Running RB process PID. */
pid_t ResBrokerParentPID; /* Parent PID. */
volatile bool PipeReceivePending; /* If pipe is waiting for response. */
/*
*------------------------------------------------------------------------------
* RB YARN implementation.
*------------------------------------------------------------------------------
*/
void RB_LIBYARN_createEntries(RB_FunctionEntries entries)
{
entries->start = RB_LIBYARN_start;
entries->stop = RB_LIBYARN_stop;
entries->acquireResource = RB_LIBYARN_acquireResource;
entries->returnResource = RB_LIBYARN_returnResource;
entries->getContainerReport = RB_LIBYARN_getContainerReport;
entries->handleNotification = RB_LIBYARN_handleNotification;
entries->getClusterReport = RB_LIBYARN_getClusterReport;
entries->handleSigSIGCHLD = RB_LIBYARN_handleSignalSIGCHLD;
entries->handleError = RB_LIBYARN_handleError;
ResBrokerPID = -1;
ResBrokerExits = false;
ResBrokerRequestPipe[0] = -1;
ResBrokerRequestPipe[1] = -1;
ResBrokerNotifyPipe[0] = -1;
ResBrokerNotifyPipe[1] = -1;
}
int RB_LIBYARN_start(bool isforked)
{
int res = FUNC_RETURN_OK;
/* If SIGCHLD is received, ResBrokerExits is set true, which means resource
* broker process might quits, so before starting new one, we have to do some
* clean up. */
if ( ResBrokerExits )
{
/* Check if current resource broker process exits. */
if ( ResBrokerPID > 0 )
{
int status = 0;
if( ResBrokerPID != waitpid(ResBrokerPID, &status, WNOHANG) )
{
return FUNC_RETURN_OK;
}
if (WIFEXITED(status) && WEXITSTATUS(status))
{
elog(FATAL, "YARN mode resource broker failed to start resource "
"broker process. error=%d",
WEXITSTATUS(status));
}
}
/* Resource broker quits, clean up the pipes and status. */
ResBrokerPID = -1;
ResBrokerExits = false;
/* Check and clear resource broker pending status. */
clearPendingResourceRequestInRootQueue();
PipeReceivePending = false;
}
/* Start the resource broker process. */
if ( ResBrokerPID == -1 )
{
/* Close possible open pipes. */
if ( ResBrokerRequestPipe[0] != -1 )
{
close(ResBrokerRequestPipe[0]);
close(ResBrokerRequestPipe[1]);
}
if ( ResBrokerNotifyPipe[0] != -1 )
{
close(ResBrokerNotifyPipe[0]);
close(ResBrokerNotifyPipe[1]);
}
/* Create pipe for communication between RM and RB. */
ResBrokerRequestPipe[0] = -1;
ResBrokerRequestPipe[1] = -1;
ResBrokerNotifyPipe[0] = -1;
ResBrokerNotifyPipe[1] = -1;
res = pgpipe(ResBrokerRequestPipe);
if ( res < 0 )
{
elog(FATAL, "YARN mode resource broker failed to create pipe between "
"resource manager and resource broker. errno %d",
errno);
}
res = pgpipe(ResBrokerNotifyPipe);
if ( res < 0 )
{
elog(FATAL, "YARN mode resource broker failed to create pipe between "
"resource manager and resource broker. errno %d",
errno);
}
ResBrokerExits = false;
ResBrokerParentPID = getpid();
ResBrokerPID = fork_process();
switch(ResBrokerPID)
{
case 0:
exit(ResBrokerMain());
default:
PipeReceivePending = false;
elog(LOG, "YARN mode resource broker created resource broker process "
"PID=%d.",
ResBrokerPID);
}
return FUNC_RETURN_OK;
}
return FUNC_RETURN_OK;
}
/*
* Resource broker process notifies resource manager that it quited.
*/
void RB_LIBYARN_handleSignalSIGCHLD(void)
{
/* Mark the resource broker exited. */
ResBrokerExits = true;
}
int RB_LIBYARN_stop(void)
{
int waitres = 0;
if ( ResBrokerPID > 0 )
{
/* Send signal to resource broker process. */
kill(ResBrokerPID, SIGINT);
elog(LOG, "YARN mode resource broker sent SIGINT signal to stop resource "
"broker process.");
retry:
/* Wait the exit of resource broker process. */
waitres = waitpid(ResBrokerPID, NULL, 0);
if ( waitres == -1 )
{
if ( errno == EINTR )
{
/* Wait is interrupted by signal. */
goto retry;
}
else
{
elog(WARNING, "Fail to wait for the exit of resource broker "
"process %d. errno %d.",
ResBrokerPID,
errno);
return FUNC_RETURN_FAIL;
}
}
}
ResBrokerPID = -1;
elog(LOG, "YARN mode resource broker process exited.");
return FUNC_RETURN_OK;
}
/*
* Return cluster report including all node status. The response should be
* handled asynchronously.
*/
int RB_LIBYARN_getClusterReport(const char *quename,
List **machines,
double *maxcapacity)
{
int res = FUNC_RETURN_OK;
uint32_t MessageID = RM2RB_GET_CLUSTERREPORT;
int piperes = 0;
if ( PipeReceivePending )
{
elog(DEBUG5, "YARN resource broker skip sending cluster report due to "
"not getting response of last request.");
return FUNC_RETURN_OK;
}
/* Write request to pipe */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
RPCRequestRBGetClusterReportHeadData request;
request.QueueNameLen = __SIZE_ALIGN64(strlen(quename) + 1);
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
appendSMBStr(&sendBuffer, quename);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
piperes = pipewrite(ResBrokerRequestPipe[1],
sendBuffer.Buffer,
sendBuffer.Cursor + 1);
if ( piperes != sendBuffer.Cursor + 1 )
{
elog(WARNING, "YARN mode resource broker failed to generate cluster "
"report request to resource broker process through pipe. "
"Wrote length %d, expected length %d, errno %d",
piperes,
sendBuffer.Cursor + 1,
errno);
res = RESBROK_PIPE_ERROR;
}
destroySelfMaintainBuffer(&sendBuffer);
elog(LOG, "YARN mode resource broker generated cluster report request to "
"resource broker process.");
PipeReceivePending = res == FUNC_RETURN_OK;
return res;
}
/*
* Acquire resource from hosts in LIBYARN mode.
*
* This function use round-robin sequence to select available hosts in HAWQ RM
* resource pool and choose suitable host to allocate containers.
*/
int RB_LIBYARN_acquireResource(uint32_t memorymb, uint32_t core, List *preferred)
{
int res = FUNC_RETURN_OK;
uint32_t MessageID = RM2RB_ALLOC_RESOURCE;
ListCell *cell = NULL;
if ( PipeReceivePending )
{
return RESBROK_PIPE_BUSY;
}
elog(LOG, "Allocating container request is triggered.");
/* Write request to pipe */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
RPCRequestRBAllocateResourceContainersData request;
request.ContainerCount = core;
request.MemoryMB = memorymb / core;
request.Core = 1;
request.PreferredSize = list_length(preferred);
request.MsgLength = 0;
foreach(cell, preferred)
{
PAIR pair = (PAIR)lfirst(cell);
SegResource segres = (SegResource)(pair->Key);
char *hostname = GET_SEGINFO_GRMHOSTNAME(&(segres->Stat->Info));
char *rackname = GET_SEGINFO_GRMRACKNAME(&(segres->Stat->Info));
uint32_t preferredLen = sizeof(uint32_t)*2 +
__SIZE_ALIGN64((strlen(hostname) + 1 + strlen(rackname) + 1));
request.MsgLength += preferredLen;
}
appendSMBVar(&sendBuffer, request);
/*
* parse preferred list and
* append (hostname, rackname, requested container number) into the message
*/
foreach(cell, preferred)
{
PAIR pair = (PAIR)lfirst(cell);
SegResource segres = (SegResource)(pair->Key);
ResourceBundle resource = (ResourceBundle)(pair->Value);
int16_t num_containers = resource->Core;
int16_t flag = 0; /* not used yet */
char *hostname = GET_SEGINFO_GRMHOSTNAME(&(segres->Stat->Info));
char *rackname = GET_SEGINFO_GRMRACKNAME(&(segres->Stat->Info));
uint32_t preferredLen = sizeof(preferredLen) + sizeof(num_containers) + sizeof(flag) +
__SIZE_ALIGN64((strlen(hostname) + 1 + strlen(rackname) + 1));
appendSMBVar(&sendBuffer, preferredLen);
appendSMBVar(&sendBuffer, num_containers);
appendSMBVar(&sendBuffer, flag);
appendSMBStr(&sendBuffer, hostname);
appendSMBStr(&sendBuffer, rackname);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
elog(LOG, "YARN mode resource broker build a preferred request."
"host:%s, rack:%s, container number:%d",
hostname, rackname, num_containers);
}
int piperes = pipewrite(ResBrokerRequestPipe[1],
sendBuffer.Buffer,
sendBuffer.Cursor + 1);
if ( piperes != sendBuffer.Cursor + 1 )
{
elog(WARNING, "YARN mode resource broker failed to write resource "
"allocation request to resource broker process. errno %d.",
errno);
res = RESBROK_PIPE_ERROR;
}
elog(DEBUG3, "LIBYARN mode resource broker wrote %d bytes out.", sendBuffer.Cursor+1);
destroySelfMaintainBuffer(&sendBuffer);
elog(LOG, "YARN mode resource broker wrote resource allocation request to "
"resource broker process.");
PipeReceivePending = res == FUNC_RETURN_OK;
return res;
}
/*
* Return resource to YARN.
*/
int RB_LIBYARN_returnResource(List **ctnl)
{
int res = FUNC_RETURN_OK;
uint32_t MessageID = RM2RB_RETURN_RESOURCE;
if ( *ctnl == NULL )
{
return FUNC_RETURN_OK;
}
if ( PipeReceivePending )
{
return FUNC_RETURN_OK;
}
/* Write request to pipe */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
RPCRequestRBReturnResourceContainersHeadData request;
request.ContainerCount = list_length(*ctnl);
request.Reserved = 0;
appendSMBVar(&sendBuffer, request);
while( (*ctnl) != NULL )
{
GRMContainer ctn = (GRMContainer)lfirst(list_head(*ctnl));
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
(*ctnl) = list_delete_first(*ctnl);
MEMORY_CONTEXT_SWITCH_BACK
appendSMBVar(&sendBuffer, ctn->ID);
elog(LOG, "YARN mode resource broker returned resource container "INT64_FORMAT
"(%d MB, %d CORE) to host %s",
ctn->ID,
ctn->MemoryMB,
ctn->Core,
ctn->HostName == NULL ? "NULL" : ctn->HostName);
if ( ctn->CalcDecPending )
{
minusResourceBundleData(&(ctn->Resource->DecPending), ctn->MemoryMB, ctn->Core);
Assert( ctn->Resource->DecPending.Core >= 0 );
Assert( ctn->Resource->DecPending.MemoryMB >= 0 );
}
/* Destroy resource container. */
freeGRMContainer(ctn);
PRESPOOL->RetPendingContainerCount--;
}
int piperes = pipewrite(ResBrokerRequestPipe[1],
sendBuffer.Buffer,
sendBuffer.Cursor + 1);
if ( piperes != sendBuffer.Cursor + 1 )
{
elog(WARNING, "YARN mode resource broker failed to write resource return "
"request to resource broker process. errno %d.",
errno);
res = RESBROK_PIPE_ERROR;
}
destroySelfMaintainBuffer(&sendBuffer);
elog(LOG, "YARN mode resource broker wrote resource return request to "
"resource broker process.");
PipeReceivePending = res == FUNC_RETURN_OK;
return res;
}
int RB_LIBYARN_getContainerReport(List **ctnstatl)
{
int res = FUNC_RETURN_OK;
uint32_t MessageID = RM2RB_GET_CONTAINERREPORT;
if ( PipeReceivePending )
{
return FUNC_RETURN_OK;
}
elog(LOG, "Getting container report request is triggered.");
/* Write request to pipe. */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
int piperes = pipewrite(ResBrokerRequestPipe[1],
sendBuffer.Buffer,
sendBuffer.Cursor + 1);
if ( piperes != sendBuffer.Cursor + 1 )
{
elog(WARNING, "YARN mode resource broker failed to write get container "
"status request to resource broker process. errno %d.",
errno);
res = RESBROK_PIPE_ERROR;
}
destroySelfMaintainBuffer(&sendBuffer);
elog(RMLOG, "YARN mode resource broker wrote get container status request to "
"resource broker process.");
PipeReceivePending = res == FUNC_RETURN_OK;
return res;
}
int RB_LIBYARN_handleNotification(void)
{
/* Check if the pipe has notifications. */
fd_set rfds;
struct timeval timeout;
int res;
int fd = ResBrokerNotifyPipe[0];
uint32_t messagehead[2];
uint32_t messageid;
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
res = select(fd + 1, &rfds, NULL, NULL, &timeout);
/* Something passes to RM process. */
if ( res > 0 && FD_ISSET(fd, &rfds) )
{
/* Read message id. */
res = readPipe(fd, messagehead, sizeof(messagehead));
if ( res != sizeof(messagehead) )
{
return RESBROK_PIPE_ERROR;
}
messageid = messagehead[0];
switch( messageid )
{
case RB2RM_CLUSTERREPORT:
res = handleRB2RM_ClusterReport();
break;
case RB2RM_ALLOCATED_RESOURCE:
res = handleRB2RM_AllocatedResource();
break;
case RB2RM_RETURNED_RESOURCE:
res = handleRB2RM_ReturnedResource();
break;
case RB2RM_CONTAINERREPORT:
res = handleRB2RM_ContainerReport();
break;
default:
res = RESBROK_PIPE_ERROR;
}
PipeReceivePending = false;
elog(RMLOG, "Finish processing message %d", messageid);
if ( res != FUNC_RETURN_OK )
{
elog(WARNING, "Resource broker failed to receive correct message "
"from YARN mode resource broker process. message id %d, "
"result of handling the message %d.",
messageid,
res);
return res;
}
}
else if ( res < 0 && (errno != EAGAIN && errno != EINTR) )
{
elog(WARNING, "select system call has error raised. errno %d", errno);
return RESBROK_PIPE_ERROR;
}
return FUNC_RETURN_OK;
}
int handleRB2RM_ClusterReport(void)
{
int res = FUNC_RETURN_OK;
SegStat segstat = NULL;
uint32_t segsize;
int fd = ResBrokerNotifyPipe[0];
int piperes = 0;
List *segstats = NULL;
List *allsegres = NULL;
ListCell *cell = NULL;
PRESPOOL->RBClusterReportCounter++;
/* Read whole result head. */
RPCResponseRBGetClusterReportHeadData response;
piperes = readPipe(fd, (char *)&response, sizeof(response));
if ( piperes != sizeof(response) )
{
elog(WARNING, "YARN mode resource broker failed to read cluster report "
"response (head) from pipe. "
"Read length %d, expected length %lu.",
piperes,
sizeof(response));
return RESBROK_PIPE_ERROR;
}
if ( response.Result != FUNC_RETURN_OK )
{
elog(WARNING, "YARN mode resource broker received error information of "
"cluster report response from pipe. error %d",
response.Result);
/*
* Resource broker has something wrong. Can not receive concrete node
* information.
*/
return response.Result;
}
elog(LOG, "YARN mode resource broker got cluster report having %d host(s), "
"queue capacity %lf (MAX %lf) current capacity %lf.",
response.MachineCount,
response.QueueCapacity,
response.QueueMaxCapacity,
response.QueueCurCapacity);
/* Read machines. */
for( int i = 0 ; i < response.MachineCount ; ++i )
{
/* Machine instance size. */
piperes = readPipe(fd, &segsize, sizeof(segsize));
if ( piperes != sizeof(segsize) )
{
elog(WARNING, "YARN mode resource broker failed to read cluster "
"report response message (machine info size) from pipe. "
"Read length %d, expected length %lu.",
piperes,
sizeof(segsize));
res = RESBROK_PIPE_ERROR;
break;
}
/* Skip reserved 32bit tag. */
uint32_t reserved = 0;
piperes = readPipe(fd, &reserved, sizeof(reserved));
if ( piperes != sizeof(reserved) )
{
elog(WARNING, "YARN mode resource broker failed to read cluster "
"report response message (machine info reserved) from "
"pipe. Read length %d, expected length %lu.",
piperes,
sizeof(reserved));
res = RESBROK_PIPE_ERROR;
break;
}
/* Read segment stat. */
segstat = rm_palloc0(PCONTEXT, segsize);
piperes = readPipe(fd, segstat, segsize);
if ( piperes != segsize )
{
elog(WARNING, "YARN mode resource broker failed to read cluster "
"report response message (machine info) from pipe. "
"Read length %d, expected length %d.",
piperes,
segsize);
if (segstat != NULL)
{
rm_pfree(PCONTEXT, segstat);
}
res = RESBROK_PIPE_ERROR;
break;
}
/* Hold the received segment stat instances. */
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
segstats = lappend(segstats, (void *)segstat);
MEMORY_CONTEXT_SWITCH_BACK
}
/*
* TILL NOW, the whole message content is received.
*/
if ( res == FUNC_RETURN_OK )
{
/* Check if the YARN resource queue report is valid, i.e. maximum
* capacity and capacity are all greater than 0.
*/
if ( response.QueueCapacity <= 0 || response.QueueMaxCapacity <= 0 )
{
elog(WARNING, "YARN mode resource broker got invalid cluster report");
res = RESBROK_WRONG_GLOB_MGR_QUEUEREPORT;
}
}
/* If something wrong, no need to keep the received content, free them. */
if ( res != FUNC_RETURN_OK )
{
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
while( list_length(segstats) > 0 )
{
rm_pfree(PCONTEXT, lfirst(list_head(segstats)));
segstats = list_delete_first(segstats);
}
MEMORY_CONTEXT_SWITCH_BACK
return res;
}
setAllSegResourceGRMUnhandled();
ResourceBundleData oldGRMTotalAll;
ResourceBundleData oldGRMTotal;
resetResourceBundleDataByBundle(&oldGRMTotalAll,
&(PRESPOOL->GRMTotalHavingNoHAWQNode));
resetResourceBundleDataByBundle(&oldGRMTotal,
&(PRESPOOL->GRMTotal));
/*
* Start to update resource pool content. The YARN cluster total size is
* also counted the same time.
*/
resetResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode), 0, 0.0, 0);
int updatedcnt = 0;
int skipedcnt = 0;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
while( list_length(segstats) > 0 )
{
SegStat segstat = (SegStat)lfirst(list_head(segstats));
addResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode),
segstat->GRMTotalMemoryMB,
segstat->GRMTotalCore);
res = updateHAWQSegWithGRMSegStat(segstat);
if ( res == FUNC_RETURN_OK )
{
SelfMaintainBufferData buffer;
initializeSelfMaintainBuffer(&buffer, PCONTEXT);
generateSegStatReport(segstat, &buffer);
elog(RMLOG, "YARN mode resource broker updated segment configuration "
"in resource pool. %s", buffer.Buffer);
destroySelfMaintainBuffer(&buffer);
updatedcnt++;
}
else
{
elog(WARNING, "YARN mode resource broker skipped segment configuration "
"from host %s",
GET_SEGINFO_GRMHOSTNAME(&(segstat->Info)));
skipedcnt++;
}
rm_pfree(PCONTEXT, segstat);
segstats = list_delete_first(segstats);
}
elog(LOG, "YARN mode resource broker updated %d %s, skiped %d %s",
updatedcnt,
updatedcnt <= 1 ? "segment" : "segments",
skipedcnt,
skipedcnt <= 1 ? "segment" : "segments");
/*
* iterate all segments without GRM report,
* and update its status.
*/
getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegres);
foreach(cell, allsegres)
{
SegResource segres = (SegResource)(((PAIR)lfirst(cell))->Value);
bool statusDescChange = false;
/*
* skip segments handled in GRM report list
*/
if (segres->Stat->GRMHandled)
continue;
/*
* Set no GRM node report flag for this segment.
*/
if ((segres->Stat->StatusDesc & SEG_STATUS_NO_GRM_NODE_REPORT) == 0)
{
segres->Stat->StatusDesc |= SEG_STATUS_NO_GRM_NODE_REPORT;
statusDescChange = true;
}
if (IS_SEGSTAT_FTSAVAILABLE(segres->Stat))
{
/*
* This segment is FTS available, but master hasn't
* gotten its GRM node report, so set this segment to DOWN.
*/
setSegResHAWQAvailability(segres, RESOURCE_SEG_STATUS_UNAVAILABLE);
}
Assert(!IS_SEGSTAT_FTSAVAILABLE(segres->Stat));
if (statusDescChange && Gp_role != GP_ROLE_UTILITY)
{
SimpStringPtr description = build_segment_status_description(segres->Stat);
update_segment_status(segres->Stat->Info.ID + REGISTRATION_ORDER_OFFSET,
SEGMENT_STATUS_DOWN,
(description->Len > 0)?description->Str:"");
add_segment_history_row(segres->Stat->Info.ID + REGISTRATION_ORDER_OFFSET,
GET_SEGRESOURCE_HOSTNAME(segres),
description->Str);
elog(LOG, "Resource manager hasn't gotten GRM node report for segment(%s),"
"updates its status:'%c', description:%s",
GET_SEGRESOURCE_HOSTNAME(segres),
SEGMENT_STATUS_DOWN,
(description->Len > 0)?description->Str:"");
freeSimpleStringContent(description);
rm_pfree(PCONTEXT, description);
}
}
freePAIRRefList(&(PRESPOOL->Segments), &allsegres);
MEMORY_CONTEXT_SWITCH_BACK
if ( oldGRMTotalAll.MemoryMB != PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB ||
oldGRMTotalAll.Core != PRESPOOL->GRMTotalHavingNoHAWQNode.Core )
{
elog(LOG, "Resource manager YARN resource broker counted YARN cluster "
"having total resource (%d MB, %lf CORE).",
PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB,
PRESPOOL->GRMTotalHavingNoHAWQNode.Core);
}
if ( oldGRMTotal.MemoryMB != PRESPOOL->GRMTotal.MemoryMB ||
oldGRMTotal.Core != PRESPOOL->GRMTotal.Core )
{
elog(LOG, "Resource manager YARN resource broker counted HAWQ cluster now "
"having (%d MB, %lf CORE) in a YARN cluster of total resource "
"(%d MB, %lf CORE).",
PRESPOOL->GRMTotal.MemoryMB,
PRESPOOL->GRMTotal.Core,
PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB,
PRESPOOL->GRMTotalHavingNoHAWQNode.Core);
}
/*
* If the segment is GRM unavailable or FTS unavailable,
* RM should return all containers located upon them.
*/
returnAllGRMResourceFromUnavailableSegments();
/* Refresh available node count. */
refreshAvailableNodeCount();
/* Update GRM resource queue capacity. */
PQUEMGR->GRMQueueCapacity = response.QueueCapacity;
PQUEMGR->GRMQueueCurCapacity = response.QueueCurCapacity;
PQUEMGR->GRMQueueMaxCapacity = (response.QueueMaxCapacity > 0 &&
response.QueueMaxCapacity <= 1) ?
response.QueueMaxCapacity :
PQUEMGR->GRMQueueMaxCapacity;
PQUEMGR->GRMQueueResourceTight = response.ResourceTight > 0 ? true : false;
refreshResourceQueueCapacity(false);
refreshActualMinGRMContainerPerSeg();
PRESPOOL->LastUpdateTime = gettime_microsec();
return FUNC_RETURN_OK;
}
/*
* Handle the notification from resource broker LIBYARN process, when new
* resource containers are allocated.
*
* NOTE: The request usually can not be met completed, therefore, this function
* has to clean up pending resource quantity to make HAWQ RM able to
* resubmit request if possible.
*/
int handleRB2RM_AllocatedResource(void)
{
int fd = ResBrokerNotifyPipe[0];
int64_t *containerids = NULL;
char *buffer = NULL;
int acceptedcount = 0;
int piperes = 0;
List *newcontainers = NULL;
/* Read result. */
RPCResponseRBAllocateResourceContainersHeadData response;
piperes = readPipe(fd, (char *)&response, sizeof(response));
if ( piperes != sizeof(response) )
{
elog(WARNING, "YARN mode resource broker failed to read resource "
"allocation response message from pipe. "
"Read length %d, expected length %lu.",
piperes,
sizeof(response));
return RESBROK_PIPE_ERROR;
}
if ( response.Result == FUNC_RETURN_OK )
{
elog(LOG, "YARN mode resource broker got allocated resource containers "
"from resource broker. (%d MB, %d CORE) x %d, expected %d "
"containers",
response.MemoryMB,
response.Core,
response.ContainerCount,
response.ExpectedContainerCount);
}
else
{
elog(WARNING, "YARN mode resource broker failed to get resource containers.");
}
if ( response.ContainerCount > 0 )
{
/* Read container ids */
int contidsize = __SIZE_ALIGN64(sizeof(int64_t) * response.ContainerCount);
containerids = (int64_t *)rm_palloc0(PCONTEXT, contidsize);
piperes = readPipe(fd, containerids, contidsize);
if ( piperes != contidsize )
{
elog(WARNING, "YARN mode resource broker failed to read resource "
"allocation response message (container ids) from pipe. "
"Read length %d, expected length %d, errno %d.",
piperes,
contidsize,
errno);
if ( containerids != NULL )
{
rm_pfree(PCONTEXT, containerids);
}
return RESBROK_PIPE_ERROR;
}
/* Read host names */
buffer = rm_palloc(PCONTEXT, response.HostNameStringLen);
piperes = readPipe(fd, buffer, response.HostNameStringLen);
if ( piperes != response.HostNameStringLen )
{
elog(WARNING, "YARN mode resource broker failed to read resource "
"allocation response message (host names) from pipe. "
"Read length %d, expected length %d.",
piperes,
response.HostNameStringLen);
if ( containerids != NULL )
{
rm_pfree(PCONTEXT, containerids);
}
if ( buffer != NULL )
{
rm_pfree(PCONTEXT, buffer);
}
return RESBROK_PIPE_ERROR;
}
/* Build result and add into the resource pool as a pending container. */
char *phostname = buffer;
for ( int i = 0 ; i < response.ContainerCount ; ++i )
{
GRMContainer newcontainer = createGRMContainer(containerids[i],
response.MemoryMB,
response.Core,
phostname,
NULL);
/* Try next host name. */
phostname += strlen(phostname) + 1;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
newcontainers = lappend(newcontainers, newcontainer);
MEMORY_CONTEXT_SWITCH_BACK
}
}
/* Should go into GRM container clean phase. */
if (!isCleanGRMResourceStatus() &&
DRMGlobalInstance->ResBrokerAppTimeStamp > 0 &&
response.SystemStartTimestamp > DRMGlobalInstance->ResBrokerAppTimeStamp)
{
setCleanGRMResourceStatus();
}
/* We always update resource broker work time stamp here. */
DRMGlobalInstance->ResBrokerAppTimeStamp = response.SystemStartTimestamp;
ListCell *cell = NULL;
foreach(cell, newcontainers)
{
GRMContainer newcontainer = (GRMContainer)lfirst(cell);
if ( isCleanGRMResourceStatus() )
{
/*
* If resource manager is now in clean phase, return new containers
* directly. We want to allocate again after clean phase.
*/
addGRMContainerToKicked(newcontainer);
}
else
{
/*
* The container is added to to be accepted container list, if this
* container is not recognized, it is directly kicked.
*/
if ( addGRMContainerToToBeAccepted(newcontainer) == FUNC_RETURN_OK )
{
acceptedcount++;
}
}
}
/*
* Clean up pending resource quantity, if failed to allocate resource, the
* acceptedcount is 0.
*/
removePendingResourceRequestInRootQueue(
response.MemoryMB * (response.ExpectedContainerCount - acceptedcount),
response.Core * (response.ExpectedContainerCount - acceptedcount),
(response.Result == FUNC_RETURN_OK) && (acceptedcount > 0));
elog(LOG, "Resource manager accepted YARN containers (%d MB, %d CORE) x %d "
"from resource broker, expected %d containers, skipped %d containers.",
response.MemoryMB,
response.Core,
acceptedcount,
response.ExpectedContainerCount,
response.ContainerCount - acceptedcount);
/*
* Buildup hash table for updating each segment if it can not have expected
* resources allocated.
*/
HASHTABLEData seghavingres;
initializeHASHTABLE(&seghavingres,
PCONTEXT,
HASHTABLE_SLOT_VOLUME_DEFAULT,
HASHTABLE_SLOT_VOLUME_DEFAULT_MAX,
HASHTABLE_KEYTYPE_VOIDPT,
freeResourceBundle);
while( list_length(newcontainers) > 0 )
{
GRMContainer newcontainer = (GRMContainer)
lfirst(list_head(newcontainers));
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
newcontainers = list_delete_first(newcontainers);
MEMORY_CONTEXT_SWITCH_BACK
if ( isCleanGRMResourceStatus() )
{
continue;
}
int32_t segid = -1;
int res = getSegIDByGRMHostName(newcontainer->HostName,
strlen(newcontainer->HostName),
&segid);
if ( res != FUNC_RETURN_OK )
{
elog(WARNING, "Resource manager finds not recognized YARN "
"container on host %s, container id is "INT64_FORMAT,
newcontainer->HostName,
newcontainer->ID);
continue;
}
SegResource segres = getSegResource(segid);
PAIR oldsegpair = getHASHTABLENode(&seghavingres, segres);
ResourceBundle resbundle = NULL;
if ( oldsegpair != NULL )
{
resbundle = (ResourceBundle)(oldsegpair->Value);
}
else
{
resbundle = rm_palloc0(PCONTEXT, sizeof(ResourceBundleData));
resbundle->MemoryMB = 0;
resbundle->Core = 0;
setHASHTABLENode(&seghavingres, segres, resbundle, false);
}
addResourceBundleData(resbundle,
newcontainer->MemoryMB,
newcontainer->Core);
}
if ( !isCleanGRMResourceStatus() )
{
RB_updateSegmentsHavingNoExpectedGRMContainers(&seghavingres);
}
cleanHASHTABLE(&seghavingres);
if ( containerids != NULL )
{
rm_pfree(PCONTEXT, containerids);
}
if ( buffer != NULL )
{
rm_pfree(PCONTEXT, buffer);
}
return response.Result;
}
void freeResourceBundle(void *resbundle)
{
rm_pfree(PCONTEXT, resbundle);
}
int handleRB2RM_ReturnedResource(void)
{
int fd = ResBrokerNotifyPipe[0];
int piperes = 0;
/* Read result code. */
RPCResponseRBReturnResourceContainersData response;
piperes = readPipe(fd, &response, sizeof(response));
if ( piperes != sizeof(response) )
{
elog(WARNING, "YARN mode resource broker failed to read resource return "
"response message from pipe. Read length %d, expected "
"length %lu.",
piperes,
sizeof(response));
return RESBROK_PIPE_ERROR;
}
if ( !isCleanGRMResourceStatus() &&
DRMGlobalInstance->ResBrokerAppTimeStamp > 0 &&
response.SystemStartTimestamp > DRMGlobalInstance->ResBrokerAppTimeStamp )
{
DRMGlobalInstance->ResBrokerAppTimeStamp = response.SystemStartTimestamp;
setCleanGRMResourceStatus();
return response.Result;
}
elog(LOG, "YARN mode resource broker returned resource container(s) to YARN.");
return FUNC_RETURN_OK;
}
int handleRB2RM_ContainerReport(void)
{
int fd = ResBrokerNotifyPipe[0];
int piperes = 0;
/* Read result code. */
RPCResponseRBGetContainerReportHeadData response;
piperes = readPipe(fd, &response, sizeof(response));
if ( piperes != sizeof(response) )
{
elog(WARNING, "YARN mode resource broker failed to read container report "
"message from pipe. Read length %d, expected length %lu.",
piperes,
sizeof(response));
return RESBROK_PIPE_ERROR;
}
if ( response.Result != FUNC_RETURN_OK )
{
elog(WARNING, "YARN mode resource broker received error information of "
"container report response from pipe. error %d",
response.Result);
/*
* Resource broker has something wrong. Can not resume.
*/
return response.Result;
}
RB_GRMContainerStat ctnstats = NULL;
if ( response.ContainerCount > 0 )
{
/* Read all container status. */
int totalsize = sizeof(RB_GRMContainerStatData) * response.ContainerCount;
ctnstats = rm_palloc0(PCONTEXT, totalsize);
piperes = readPipe(fd, ctnstats, totalsize);
if ( piperes != totalsize)
{
elog(WARNING, "YARN mode resource broker failed to read container "
"report container status array. "
"Read length %d, expected length %d",
piperes,
totalsize);
rm_pfree(PCONTEXT, ctnstats);
return RESBROK_PIPE_ERROR;
}
}
if ( !isCleanGRMResourceStatus() &&
DRMGlobalInstance->ResBrokerAppTimeStamp > 0 &&
response.SystemStartTimestamp > DRMGlobalInstance->ResBrokerAppTimeStamp )
{
DRMGlobalInstance->ResBrokerAppTimeStamp = response.SystemStartTimestamp;
setCleanGRMResourceStatus();
return response.Result;
}
/* Pass the container report to resource pool. */
checkGRMContainerStatus(ctnstats, response.ContainerCount);
/*
* Check all containers that not tracked by HAWQ resource manager. We have
* to explicitly return all not tracked GRM containers.
*/
buildToReturnNotTrackedGRMContainers(ctnstats, response.ContainerCount);
/* Free instance. */
if ( ctnstats != NULL )
{
rm_pfree(PCONTEXT, ctnstats);
}
PRESPOOL->LastCheckContainerTime = gettime_microsec();
return response.Result;
}
void buildToReturnNotTrackedGRMContainers(RB_GRMContainerStat ctnstats, int size)
{
for ( int i = 0 ; i < size ; ++i )
{
if ( ctnstats[i].isFound )
{
continue;
}
/* Here we simulate one GRM container instance for the untracked one. */
GRMContainer ctn = rm_palloc0(PCONTEXT, sizeof(GRMContainerData));
ctn->CalcDecPending = false;
ctn->ID = ctnstats[i].ContainerID;
ctn->HostName = NULL;
ctn->MemoryMB = 0;
ctn->Core = 0;
ctn->Life = 0;
ctn->Resource = NULL;
elog(DEBUG3, "YARN mode resource broker creates dummy GRM container "
INT64_FORMAT".",
ctn->ID);
addGRMContainerToKicked(ctn);
}
}
void RB_LIBYARN_handleError(int errorcode)
{
/* Pipe error raised, this needs resource broker process to be resarted. */
if ( errorcode == RESBROK_PIPE_ERROR )
{
DRMGlobalInstance->ResBrokerAppTimeStamp = 0;
elog(WARNING, "YARN mode resource broker pipe does not work well. "
"Try to restart it.");
if ( RB_stop() != FUNC_RETURN_OK )
{
DRMGlobalInstance->ResManagerMainKeepRun = false;
return;
}
}
/*
* If global resource manager has error raised, resource manager has to go
* into clean GRM container phase.
*/
else if ( errorcode == RESBROK_ERROR_GRM )
{
DRMGlobalInstance->ResBrokerAppTimeStamp = 0;
elog(WARNING, "YARN mode resource broker reports YARN exceptions. "
"Reset resource manager allocated resource.");
setCleanGRMResourceStatus();
}
else
{
Assert(errorcode == FUNC_RETURN_OK ||
errorcode == RESBROK_WRONG_GLOB_MGR_QUEUEREPORT);
}
}