blob: 73d4c0b75727185c214da20f8cc3edd3c91b65a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "dynrm.h"
#include "miscadmin.h"
#include "getaddrinfo.h"
#include "libpq/ip.h"
#include "utils/kvproperties.h"
#include "resourcebroker/resourcebroker_LIBYARN.h"
#include "resourcebroker/resourcebroker_RM_RB_Protocol.h"
#include "resourcemanager.h"
#include "libyarn/LibYarnClientC.h"
#include <krb5.h>
#include "cdb/cdbfilesystemcredential.h"
/*
*------------------------------------------------------------------------------
* Internal functions
*------------------------------------------------------------------------------
*/
/* Returns a strdup()-allocated copy of the principal name stored in the given
 * Kerberos ticket cache, or NULL when it cannot be extracted. */
char * ExtractPrincipalFromTicketCache(const char* cache);
/* Request-serving loop of the resource broker; called from ResBrokerMain(). */
int ResBrokerMainInternal(void);
/* Parses the YARN connection settings into the file-scope variables below. */
int loadParameters(void);
/*
* Functions for communicating between RB process and YARN resource manager.
* They are only declared here; their definitions are not part of this section.
*/
int RB2YARN_initializeConnection(void);
int RB2YARN_connectToYARN(void);
int RB2YARN_registerYARNApplication(void);
int RB2YARN_getClusterReport(DQueue hosts);
int RB2YARN_getQueueReport(char *queuename,
double *cap,
double *curcap,
double *maxcap,
bool *haschildren);
int RB2YARN_acquireResource(uint32_t memorymb,
uint32_t core,
uint32_t count,
LibYarnNodeInfo_t *preferredArray,
uint32_t preferredSize,
DQueue containerids,
DQueue containerhosts);
int RB2YARN_returnResource(int64_t *contids, int contcount);
int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size);
int RB2YARN_finishYARNApplication(void);
int RB2YARN_disconnectFromYARN(void);
void RB2YARN_freeContainersInMemory(DQueue containerids, DQueue containerhosts);
const char *RB2YARN_getErrorMessage(void);
/*
* Functions for handling request from HAWQ resource manager.
* Each handleRM2RB_* function serves one request message id; the matching
* sendRB*ErrorData function writes an error-only response of the same type.
*/
int handleRM2RB_GetClusterReport(void);
int handleRM2RB_AllocateResource(void);
int handleRM2RB_ReturnResource(void);
int handleRM2RB_GetContainerReport(void);
int sendRBGetClusterReportErrorData(int errorcode);
int sendRBAllocateResourceErrorData(int errorcode,
RPCRequestRBAllocateResourceContainers request);
int sendRBReturnResourceErrorData(int errorcode);
int sendRBGetContainerReportErrorData(int errorcode);
/* Signal handler that clears ResBrokerKeepRun. */
void quitResBroker(SIGNAL_ARGS);
/* Copied into the SystemStartTimestamp field of responses; reset to 0 when
 * ResBrokerMainInternal() starts. */
uint64_t ResBrokerStartTime;
/*
 * The user who submits the HAWQ application to Hadoop YARN. The default is
 * "postgres"; if Kerberos is enabled, it should be the principal name.
 * YARNUserShouldFree is true only when YARNUser points to heap memory that
 * must be released with free().
 */
char* YARNUser;
bool YARNUserShouldFree;
/* YARN connection settings filled in by loadParameters(). */
SimpString YARNServer;
SimpString YARNPort;
SimpString YARNSchedulerServer;
SimpString YARNSchedulerPort;
char YARNAMServer[] = "0.0.0.0";
int32_t YARNAMPort = 0;
char YARNTRKUrl[] = "url";
SimpString YARNAppName;
SimpString YARNQueueName;
/* The request handlers treat a NULL YARNJobID as "not registered in YARN" and
 * answer with RESBROK_ERROR_GRM. */
char *YARNJobID;
LibYarnClient_t *LIBYARNClient;
/* Result of the latest queue tightness probe done while building a cluster
 * report: true when the probe allocation returned no container. */
bool YARNResourceTight;
/* Container quota (memory in MB, cores) of the latest allocation request;
 * reused as the size of the tightness probe. Zero disables the probe. */
uint32_t YARNResourceTightTestMemoryMB;
uint32_t YARNResourceTightTestCore;
/**
 * Signal handler that asks the resource broker to stop.
 *
 * ResBrokerMain() installs it for both SIGINT and SIGTERM. It only clears
 * ResBrokerKeepRun; the loop in ResBrokerMainInternal() tests that flag at the
 * top of every iteration and then goes into its exit phase.
 * NOTE(review): the previous comment said SIGINT is sent by the HAWQ RM -
 * confirm against the RM side which signal is actually used.
 */
void quitResBroker(SIGNAL_ARGS)
{
ResBrokerKeepRun = false;
}
/**
 * Entry point of the YARN mode resource broker process.
 *
 * Sets the ps display (only when the resource manager is forked), loads the
 * YARN connection parameters, installs the signal dispositions and then runs
 * ResBrokerMainInternal() until it returns.
 *
 * A failure to load the parameters is only reported as a warning; the broker
 * still enters its main loop.
 *
 * @return the value returned by ResBrokerMainInternal().
 */
int ResBrokerMain(void)
{
    int status;

    /* Change process ps display. */
    if (ResourceManagerIsForked)
    {
        /* gp_session_id is temporarily set to -1 around init_ps_display(). */
        int savedSessionId = gp_session_id;

        gp_session_id = -1;
        init_ps_display("yarn resource broker", "", "", "");
        gp_session_id = savedSessionId;
    }

    MyProcPid        = getpid();
    ResBrokerKeepRun = true;

    /* Load parameters */
    status = loadParameters();
    if (status != FUNC_RETURN_OK)
    {
        elog(WARNING, "Resource broker loads invalid yarn parameters");
    }

    /* Set signal behavior while all signals are blocked. */
    PG_SETMASK(&BlockSig);

    pqsignal(SIGHUP,  SIG_IGN);
    pqsignal(SIGINT,  quitResBroker);
    pqsignal(SIGTERM, quitResBroker);
    pqsignal(SIGQUIT, SIG_DFL);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, SIG_IGN);
    pqsignal(SIGUSR2, SIG_IGN);
    /* call system() needs set SIG_DFL for SIGCHLD */
    pqsignal(SIGCHLD, SIG_DFL);
    pqsignal(SIGTTIN, SIG_IGN);
    pqsignal(SIGTTOU, SIG_IGN);

    PG_SETMASK(&UnBlockSig);

    status = ResBrokerMainInternal();

    /* The main loop is over; child exits are ignored from here on. */
    pqsignal(SIGCHLD, SIG_IGN);

    elog(LOG, "YARN mode resource broker goes into exit phase.");
    return status;
}
/**
 * Main loop of the YARN mode resource broker process.
 *
 * Each iteration:
 *   1. stops when the parent process is no longer ResBrokerParentPID;
 *   2. refreshes the Kerberos ticket when enable_secure_filesystem is set;
 *   3. tries to set up the YARN connection when neither LIBYARNClient nor
 *      YARNJobID is set;
 *   4. waits up to 100ms for a request on ResBrokerRequestPipe[0] and
 *      dispatches it to the matching handleRM2RB_* function.
 *
 * A handler result of RESBROK_ERROR_GRM kills the YARN job and disconnects, so
 * that the next iteration registers again. Any other handler failure, a pipe
 * error, or an unknown message id ends the loop.
 *
 * On exit the YARN application is finished and the connection closed when one
 * exists. YARNUser is released whether or not a connection exists.
 *
 * @return always FUNC_RETURN_OK.
 */
int ResBrokerMainInternal(void)
{
    uint32_t        messagehead[2];
    uint32_t        messageid;
    fd_set          rfds;
    struct timeval  timeout;
    int             res     = FUNC_RETURN_OK;
    int             yarnres = FUNCTION_SUCCEEDED;
    int             fd      = ResBrokerRequestPipe[0];

    YARNJobID                     = NULL;
    LIBYARNClient                 = NULL;
    ResBrokerStartTime            = 0;
    YARNResourceTight             = false;
    YARNResourceTightTestMemoryMB = 0;
    YARNResourceTightTestCore     = 0;

    while( ResBrokerKeepRun )
    {
        /*
         * If the parent process (HAWQ RM process) does not exist, no need to
         * run anymore. The process goes to the exit phase directly.
         */
        if ( getppid() != ResBrokerParentPID ) {
            elog(WARNING, "Parent process of YARN mode resource broker quit. "
                          "Resource broker process will actively close.");
            ResBrokerKeepRun = false;
            continue;
        }

        /* refresh kerberos ticket */
        if (enable_secure_filesystem && !login())
        {
            elog(WARNING, "Resource broker failed to refresh kerberos ticket.");
        }

        /*
         * If the connection between YARN and YARN resource broker is not
         * created, try to build up connection and register application.
         */
        if ( LIBYARNClient == NULL && YARNJobID == NULL ) {
            /* Get ready to connect to global resource manager. */
            yarnres = RB2YARN_initializeConnection();
            if ( yarnres != FUNCTION_SUCCEEDED ) {
                elog(WARNING, "YARN mode resource broker failed to register YARN "
                              "application. Resource broker will retry soon.");
            }
        }

        /*
         * Resource broker always handle requests from resource manager process.
         * If the connection between resource broker and YARN is not created,
         * error information is sent back as the response.
         */

        /* fd to select. */
        FD_ZERO(&rfds);
        FD_SET(fd, &rfds);

        /* select timeout setting: 100ms. */
        timeout.tv_sec  = 0;
        timeout.tv_usec = 100000;

        res = select(fd + 1, &rfds, NULL, NULL, &timeout);

        /* Something passed from HAWQ RM process. */
        if ( res > 0 && FD_ISSET(fd, &rfds) )
        {
            /* read request from ResBrokerRequestPipe */
            int readres = readPipe(fd, messagehead, sizeof(messagehead));
            if (readres == -1) {
                elog(WARNING, "YARN mode resource broker pipe has error raised.");
                ResBrokerKeepRun = false;
                continue;
            }
            else if ( readres != sizeof(messagehead) ) {
                elog(WARNING, "YARN mode resource broker pipe cannot read expect "
                              "data.");
                ResBrokerKeepRun = false;
                continue;
            }

            /* Only the first word of the head is used: the message id. */
            messageid = messagehead[0];
            elog(DEBUG3, "YARN mode resource broker gets request %d from "
                         "resource manager main process.",
                         messageid);

            res = FUNC_RETURN_OK;
            switch(messageid) {
            case RM2RB_GET_CLUSTERREPORT:
                res = handleRM2RB_GetClusterReport();
                break;
            case RM2RB_ALLOC_RESOURCE:
                res = handleRM2RB_AllocateResource();
                break;
            case RM2RB_RETURN_RESOURCE:
                res = handleRM2RB_ReturnResource();
                break;
            case RM2RB_GET_CONTAINERREPORT:
                res = handleRM2RB_GetContainerReport();
                break;
            default:
                elog(WARNING, "YARN mode resource broker received wrong message "
                              "id %d",
                              messageid);
                ResBrokerKeepRun = false;
                res = RESBROK_WRONG_MESSAGE_ID;
            }

            if ( res != FUNC_RETURN_OK ) {
                elog(WARNING, "YARN mode resource broker failed to process request. "
                              "Message id = %d, result = %d.",
                              messageid,
                              res);

                /*
                 * A YARN side error drops the YARN registration so that the
                 * next iteration registers again. Every other failure (for
                 * example a pipe error between RM and RB) ends the loop and
                 * lets RM restart the RB process.
                 */
                if ( res == RESBROK_ERROR_GRM )
                {
                    if ( LIBYARNClient != NULL && YARNJobID != NULL ) {
                        forceKillJob(LIBYARNClient, YARNJobID);
                        RB2YARN_disconnectFromYARN();
                        elog(LOG, "YARN mode resource broker disconnects from YARN. "
                                  "Resource broker will retry to register soon.");
                    }
                }
                else
                {
                    ResBrokerKeepRun = false;
                }
            }
        }
        else if ( res < 0 ) {
            if ( errno != EAGAIN && errno != EINTR ) {
                elog(WARNING, "YARN mode resource broker got select() error, "
                              "(errno %d).", errno);
                /* We treat this case as pipe error. */
                ResBrokerKeepRun = false;
            }
        }
    }

    elog(LOG, "YARN mode resource broker finish YARN application now.");

    /* Decide once whether there is a YARN registration to tear down. */
    bool connected = ( LIBYARNClient != NULL && YARNJobID != NULL );

    if ( connected ) {
        /*
         * Unregister the application in YARN, this also makes all allocated
         * resource containers for HAWQ returned. Here, we ignore the result
         * of the action.
         */
        yarnres = RB2YARN_finishYARNApplication();
        elog(LOG, "YARN mode resource broker get result of finish yarn application "
                  "through libYARN %d",
                  yarnres);
    }

    /*
     * Release the user name whether or not a YARN connection exists, and clear
     * the ownership flag together with the pointer so the pair stays
     * consistent.
     */
    if (YARNUser != NULL && YARNUserShouldFree)
    {
        free(YARNUser);
    }
    YARNUser           = NULL;
    YARNUserShouldFree = false;

    if ( connected ) {
        yarnres = RB2YARN_disconnectFromYARN();
        elog(LOG, "YARN mode resource broker get result of disconnecting YARN "
                  "through libYARN %d",
                  yarnres);
    }

    return FUNC_RETURN_OK;
}
/*
 * Extract the principal name from a Kerberos ticket cache.
 *
 * cache : name of the ticket cache. When not NULL it is exported through the
 *         KRB5CCNAME environment variable (overwriting any previous value)
 *         before the default cache is opened. When NULL the current default
 *         cache is used.
 *
 * Returns a copy of the unparsed principal name allocated with strdup(); the
 * caller releases it with free(). Returns NULL and logs a WARNING when the
 * principal cannot be extracted.
 */
char * ExtractPrincipalFromTicketCache(const char* cache)
{
    krb5_context    cxt       = NULL;
    krb5_ccache     ccache    = NULL;
    krb5_principal  principal = NULL;
    krb5_error_code ec        = 0;
    char           *priName   = NULL;
    char           *retval    = NULL;

    if (cache) {
        if (0 != setenv("KRB5CCNAME", cache, 1)) {
            elog(WARNING, "Cannot set env parameter \"KRB5CCNAME\" when extract principal from cache:%s", cache);
            return NULL;
        }
    }

    /* Stop at the first failing step; ec keeps its error code. */
    do {
        if (0 != (ec = krb5_init_context(&cxt))) {
            break;
        }
        if (0 != (ec = krb5_cc_default(cxt, &ccache))) {
            break;
        }
        if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) {
            break;
        }
        if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) {
            break;
        }
    } while (0);

    if (ec == 0) {
        retval = strdup(priName);
        if (retval == NULL) {
            elog(WARNING, "Fail to extract principal from cache, because : %s",
                          "out of memory");
        }
    } else if (cxt != NULL) {
        /*
         * The message must be fetched, logged and released while the context
         * is still alive; it is owned by the caller and has to be handed back
         * through krb5_free_error_message().
         */
        const char *errorMsg = krb5_get_error_message(cxt, ec);
        elog(WARNING, "Fail to extract principal from cache, because : %s",
                      errorMsg != NULL ? errorMsg : "unknown kerberos error");
        if (errorMsg != NULL) {
            krb5_free_error_message(cxt, errorMsg);
        }
    } else {
        elog(WARNING, "Fail to extract principal from cache, because : %s",
                      "Cannot initialize kerberos context");
    }

    /* Release the kerberos objects in reverse order of acquisition. */
    if (priName != NULL) {
        krb5_free_unparsed_name(cxt, priName);
    }
    if (principal != NULL) {
        krb5_free_principal(cxt, principal);
    }
    if (ccache != NULL) {
        krb5_cc_close(cxt, ccache);
    }
    if (cxt != NULL) {
        krb5_free_context(cxt);
    }

    return retval;
}
int loadParameters(void)
{
int res = FUNC_RETURN_OK;
initSimpleString(&YARNServer, PCONTEXT);
initSimpleString(&YARNPort, PCONTEXT);
initSimpleString(&YARNSchedulerServer, PCONTEXT);
initSimpleString(&YARNSchedulerPort, PCONTEXT);
initSimpleString(&YARNQueueName, PCONTEXT);
initSimpleString(&YARNAppName, PCONTEXT);
YARNUser = NULL;
YARNUserShouldFree = false;
/* Get server and port */
char *pcolon = NULL;
if ( rm_grm_yarn_rm_addr == NULL ||
(pcolon = strchr(rm_grm_yarn_rm_addr, ':')) ==NULL ) {
res = RESBROK_WRONG_GLOB_MGR_ADDRESS;
elog(LOG, "The format of property %s must be <address>:<port>.",
HAWQDRM_CONFFILE_YARN_SERVERADDR);
goto exit;
}
/* YARNServer:YARNPort */
setSimpleStringWithContent(&YARNServer,
rm_grm_yarn_rm_addr,
pcolon - rm_grm_yarn_rm_addr);
setSimpleStringNoLen(&YARNPort, pcolon + 1);
int32_t testport;
res = SimpleStringToInt32(&YARNPort, &testport);
if ( res != FUNC_RETURN_OK ) {
res = RESBROK_WRONG_GLOB_MGR_ADDRESS;
elog(LOG, "The port number in property %s can not be parsed.",
HAWQDRM_CONFFILE_YARN_SERVERADDR);
goto exit;
}
/* Get scheduler server and port */
if ( rm_grm_yarn_sched_addr == NULL ||
(pcolon = strchr(rm_grm_yarn_sched_addr, ':')) ==NULL ) {
res = RESBROK_WRONG_GLOB_MGR_ADDRESS;
elog(LOG, "The format of property %s must be <address>:<port>.",
HAWQDRM_CONFFILE_YARN_SCHEDULERADDR);
goto exit;
}
setSimpleStringWithContent(&YARNSchedulerServer,
rm_grm_yarn_sched_addr,
pcolon - rm_grm_yarn_sched_addr);
setSimpleStringNoLen(&YARNSchedulerPort, pcolon + 1);
int32_t testschedport;
res = SimpleStringToInt32(&YARNSchedulerPort, &testschedport);
if ( res != FUNC_RETURN_OK ) {
res = RESBROK_WRONG_GLOB_MGR_ADDRESS;
elog(LOG, "The port number in property %s can not be parsed.",
HAWQDRM_CONFFILE_YARN_SCHEDULERADDR);
goto exit;
}
/* Get YARN queue name. */
if ( rm_grm_yarn_queue == NULL || rm_grm_yarn_queue[0] == '\0' ) {
res = RESBROK_WRONG_GLOB_MGR_QUEUE;
elog(LOG, "Can not find property %s.", HAWQDRM_CONFFILE_YARN_QUEUE);
goto exit;
}
setSimpleStringNoLen(&YARNQueueName, rm_grm_yarn_queue);
/* Get YARN HAWQ application name string. */
if ( rm_grm_yarn_app_name == NULL || rm_grm_yarn_app_name[0] == '\0' ) {
res = RESBROK_WRONG_GLOB_MGR_APPNAME;
elog(LOG, "Can not find property %s.", HAWQDRM_CONFFILE_YARN_APP_NAME);
goto exit;
}
setSimpleStringNoLen(&YARNAppName, rm_grm_yarn_app_name);
/* If kerberos is enable, fetch the principal from ticket cache file. */
if (enable_secure_filesystem)
{
if (!login())
{
elog(WARNING, "Resource broker failed to refresh kerberos ticket.");
}
YARNUser = ExtractPrincipalFromTicketCache(krb5_ccname);
YARNUserShouldFree = true;
}
if (YARNUser == NULL)
{
YARNUser = "postgres";
YARNUserShouldFree = false;
}
elog(LOG, "YARN mode resource broker accepted YARN connection arguments : "
"YARN Server %s:%s "
"Scheduler server %s:%s "
"Queue %s Application name %s, "
"by user:%s",
YARNServer.Str,
YARNPort.Str,
YARNSchedulerServer.Str,
YARNSchedulerPort.Str,
YARNQueueName.Str,
YARNAppName.Str,
YARNUser);
exit:
if ( res != FUNC_RETURN_OK ) {
elog(WARNING, "YARN mode resource broker failed to load YARN connection arguments.");
}
return res;
}
/**
* Handle the request from RM to RB for getting cluster report.
*/
int handleRM2RB_GetClusterReport(void)
{
int res = FUNC_RETURN_OK;
DQueueData clusterreport;
uint32_t Reserved = 0;
uint32_t MessageID = RB2RM_CLUSTERREPORT;
int piperes = 0;
/* Get request header. */
RPCRequestRBGetClusterReportHeadData requesthead;
piperes = readPipe(ResBrokerRequestPipe[0], &requesthead, sizeof(requesthead));
if ( piperes != sizeof(requesthead) ) {
elog(WARNING, "YARN mode resource broker failed to read cluster report "
"request message from pipe. Read length %d, expected "
"length %lu",
piperes,
sizeof(requesthead));
return RESBROK_PIPE_ERROR;
}
/* This is bad assumption... */
Assert( (requesthead.QueueNameLen % 8) == 0 );
/* Get resource queue name string. */
SelfMaintainBufferData resqueuename;
initializeSelfMaintainBuffer(&resqueuename, PCONTEXT);
prepareSelfMaintainBuffer(&resqueuename, requesthead.QueueNameLen, true);
piperes = readPipe(ResBrokerRequestPipe[0],
resqueuename.Buffer,
requesthead.QueueNameLen);
if ( piperes != requesthead.QueueNameLen ) {
elog(WARNING, "YARN mode resource broker failed to read cluster report "
"request message from pipe (queue name). Read length %d, "
"expected length %d",
piperes,
requesthead.QueueNameLen);
return RESBROK_PIPE_ERROR;
}
if ( YARNJobID == NULL ) {
return sendRBGetClusterReportErrorData(RESBROK_ERROR_GRM);
}
/* Get resource queue capacity. */
double cap = 0.0;
double curcap = 0.0;
double maxcap = 0.0;
bool haschildren = false;
res = RB2YARN_getQueueReport(resqueuename.Buffer,
&cap,
&curcap,
&maxcap,
&haschildren);
if ( res != FUNCTION_SUCCEEDED ) {
return sendRBGetClusterReportErrorData(RESBROK_ERROR_GRM);
}
elog(LOG, "Get YARN resource queue %s report: "
"Capacity %lf, Current Capacity %lf, Maximum Capacity %lf.",
resqueuename.Buffer,
cap,
curcap,
maxcap);
destroySelfMaintainBuffer(&resqueuename);
/* Get cluster report. */
initializeDQueue(&clusterreport, PCONTEXT);
res = RB2YARN_getClusterReport(&clusterreport);
if ( res != FUNCTION_SUCCEEDED )
{
return sendRBGetClusterReportErrorData(RESBROK_ERROR_GRM);
}
elog(DEBUG3, "YARN resource tight test (%d MB, %d CORE)",
YARNResourceTightTestMemoryMB,
YARNResourceTightTestCore);
/* Test queue resource tightness. */
if ( YARNResourceTightTestMemoryMB > 0 && YARNResourceTightTestCore > 0 )
{
/* Try to allocate 1 container. */
DQueueData ctnids;
DQueueData ctnhosts;
initializeDQueue(&ctnids, PCONTEXT);
initializeDQueue(&ctnhosts, PCONTEXT);
res = RB2YARN_acquireResource(YARNResourceTightTestMemoryMB,
YARNResourceTightTestCore,
1,
NULL,
0,
&ctnids,
&ctnhosts);
if ( res != FUNCTION_SUCCEEDED )
{
RB2YARN_freeContainersInMemory(&ctnids, &ctnhosts);
return sendRBGetClusterReportErrorData(RESBROK_ERROR_GRM);
}
if ( ctnids.NodeCount > 0 )
{
/* Return it at once. */
int64_t *ctnidarr = rm_palloc0(PCONTEXT,
sizeof(int64_t) * ctnids.NodeCount);
int idx = 0;
DQUEUE_LOOP_BEGIN(&ctnids, iter, int64_t *, pctnid)
ctnidarr[idx] = *pctnid;
elog(DEBUG3, "YARN mode resource broker returns container ID "
INT64_FORMAT".",
ctnidarr[idx]);
idx++;
DQUEUE_LOOP_END
RB2YARN_freeContainersInMemory(&ctnids, &ctnhosts);
res = RB2YARN_returnResource(ctnidarr, idx);
if ( res != FUNCTION_SUCCEEDED )
{
rm_pfree(PCONTEXT, ctnidarr);
return sendRBReturnResourceErrorData(RESBROK_ERROR_GRM);
}
YARNResourceTight = false;
elog(DEBUG3, "YARN mode resource broker consider the target queue not busy.");
}
else
{
YARNResourceTight = true;
elog(DEBUG3, "YARN mode resource broker consider the target queue busy.");
}
}
/*
* Build response message.
*/
/* Prepare buffer for the content to send. */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
RPCResponseRBGetClusterReportHeadData responsehead;
responsehead.Result = FUNC_RETURN_OK;
responsehead.MachineCount = clusterreport.NodeCount;
responsehead.QueueCapacity = cap;
responsehead.QueueCurCapacity = curcap;
responsehead.QueueMaxCapacity = maxcap;
responsehead.Reserved = 0;
responsehead.ResourceTight = (YARNResourceTightTestMemoryMB > 0 &&
YARNResourceTightTestCore > 0 &&
YARNResourceTight) ? 1 : 0;
appendSMBVar(&sendBuffer, responsehead);
elog(DEBUG3, "YARN cluster report includes %d hosts.", clusterreport.NodeCount);
/* all machines */
DQUEUE_LOOP_BEGIN(&clusterreport, iter, SegStat, segstat)
/* calculate segent stat size. */
uint32_t datasize = offsetof(SegStatData, Info) + segstat->Info.Size;
appendSMBVar(&sendBuffer, datasize);
appendSMBVar(&sendBuffer, Reserved);
appendSelfMaintainBuffer(&sendBuffer, (char *)segstat, datasize);
DQUEUE_LOOP_END
/*
* Send Message.
*/
int buffsize = getSMBContentSize(&sendBuffer);
piperes = writePipe(ResBrokerNotifyPipe[1], sendBuffer.Buffer, buffsize);
if ( piperes != buffsize )
{
res = RESBROK_PIPE_ERROR;
elog(WARNING, "YARN mode resource broker failed to write out cluster "
"report response message from pipe. "
"Wrote length %d, expected length %d, errno %m.",
piperes,
buffsize);
}
else {
elog(DEBUG3, "YARN mode resource broker sent YARN cluster report "
"response to resource manager.");
}
/* Free send buffer. */
destroySelfMaintainBuffer(&sendBuffer);
/* Free the result. */
while( clusterreport.NodeCount > 0 ) {
SegStat tofree = removeDQueueHeadNode(&clusterreport);
rm_pfree(PCONTEXT, tofree);
}
cleanDQueue(&clusterreport);
return res;
}
int sendRBGetClusterReportErrorData(int errorcode)
{
uint32_t MessageID = RB2RM_CLUSTERREPORT;
/* Prepare buffer for the content to send. */
SelfMaintainBufferData sendBuffer;
initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);
appendSMBVar(&sendBuffer, MessageID);
appendSelfMaintainBufferTill64bitAligned(&sendBuffer);
RPCResponseRBGetClusterReportHeadData responsehead;
responsehead.Result = errorcode;
responsehead.MachineCount = 0;
responsehead.QueueCapacity = 0.0;
responsehead.QueueCurCapacity = 0.0;
responsehead.QueueMaxCapacity = 0.0;
appendSMBVar(&sendBuffer, responsehead);
elog(DEBUG3, "YARN mode resource broker sends cluster report "
"error response %d.", errorcode);
int buffsize = getSMBContentSize(&sendBuffer);
int piperes = writePipe(ResBrokerNotifyPipe[1], sendBuffer.Buffer, buffsize);
/* Free send buffer. */
destroySelfMaintainBuffer(&sendBuffer);
if ( piperes != buffsize )
{
elog(WARNING, "YARN mode resource broker failed to write out cluster "
"report response message from pipe. "
"Wrote length %d, expected length %d, errno %m.",
piperes,
buffsize);
return RESBROK_PIPE_ERROR;
}
return errorcode;
}
/* Release the buffers built while decoding an allocation request. */
static void RB_freeAllocateRequestBuffers(void *msgbuffer,
                                          LibYarnNodeInfo_t *preferred)
{
    if (msgbuffer != NULL) {
        rm_pfree(PCONTEXT, msgbuffer);
    }
    if (preferred != NULL) {
        rm_pfree(PCONTEXT, preferred);
    }
}

/**
 * Handle the request from RM to RB for allocating a group of containers as
 * allocated resource to HAWQ.
 *
 * Reads the request (and the optional preferred host list) from
 * ResBrokerRequestPipe[0], acquires the containers from YARN and writes the
 * response (head, container ids, host name strings) to ResBrokerNotifyPipe[1].
 *
 * The requested container quota is remembered in
 * YARNResourceTightTestMemoryMB / YARNResourceTightTestCore for the queue
 * tightness probe done while building a cluster report.
 *
 * @return FUNC_RETURN_OK on success, RESBROK_PIPE_ERROR when the pipe cannot
 *         be read or written, otherwise the value returned by
 *         sendRBAllocateResourceErrorData().
 */
int handleRM2RB_AllocateResource(void)
{
    uint32_t            MessageID      = RB2RM_ALLOCATED_RESOURCE;
    int                 res            = FUNC_RETURN_OK;
    int                 libyarnres     = FUNCTION_SUCCEEDED;
    int                 piperes        = 0;
    void               *pBuffer        = NULL;
    LibYarnNodeInfo_t  *preferredArray = NULL;
    DQueueData          acquiredcontids;
    DQueueData          acquiredconthosts;

    /* Get request content. */
    RPCRequestRBAllocateResourceContainersData request;
    piperes = readPipe(ResBrokerRequestPipe[0], &request, sizeof(request));
    if ( piperes != sizeof(request) )
    {
        elog(WARNING, "YARN mode resource broker failed to read resource "
                      "allocation request message from pipe. "
                      "Read length %d, expected length %lu",
                      piperes,
                      sizeof(request));
        return RESBROK_PIPE_ERROR;
    }

    elog(DEBUG3, "Resource manager acquires (%d MB, %d CORE) x %d containers "
                 "from YARN.",
                 request.MemoryMB,
                 request.Core,
                 request.ContainerCount);

    /* build preferred host list */
    if (request.MsgLength > 0 && request.PreferredSize > 0) {
        /* read preferred host message */
        pBuffer = rm_palloc0(PCONTEXT, request.MsgLength);
        piperes = readPipe(ResBrokerRequestPipe[0], pBuffer, request.MsgLength);
        if ( piperes != request.MsgLength)
        {
            elog(WARNING, "YARN mode resource broker failed to read resource "
                          "allocation request message with preferred host list from pipe."
                          "Read length %d, expected length %d.",
                          piperes,
                          request.MsgLength);
            RB_freeAllocateRequestBuffers(pBuffer, preferredArray);
            return RESBROK_PIPE_ERROR;
        }

        preferredArray = (LibYarnNodeInfo_t*)rm_palloc0(PCONTEXT,
                             request.PreferredSize * sizeof(LibYarnNodeInfo_t));

        /*
         * Each entry starts with a uint32 entry length, followed at offset 4
         * by the container number, and at offset 8 by the host name and rack
         * name as two consecutive NUL terminated strings. hostname/rackname
         * point into pBuffer, so pBuffer must outlive preferredArray.
         * NOTE(review): offsets and lengths are taken from the message as-is,
         * no bounds check against MsgLength is done here.
         */
        uint32_t preferredOffset = 0;
        for (int i = 0; i < request.PreferredSize; i++) {
            preferredArray[i].num_containers = *(uint16_t*)((char*)pBuffer + preferredOffset + sizeof(uint32_t));
            preferredArray[i].hostname = (char*)pBuffer + preferredOffset + sizeof(uint32_t)*2;
            preferredArray[i].rackname = (char*)pBuffer + preferredOffset + sizeof(uint32_t)*2 +
                                         strlen(preferredArray[i].hostname) + 1;
            preferredOffset += *(uint32_t*)((char*)pBuffer + preferredOffset);
            elog(LOG, "YARN mode resource broker acquire resource from YARN with "
                      "preferred host, hostname:%s, rackname:%s, container number:%d.",
                      preferredArray[i].hostname, preferredArray[i].rackname, preferredArray[i].num_containers);
        }
    }

    /* The sum has type size_t; convert it to match the %d conversion. */
    elog(DEBUG3, "LIBYARN mode resource broker process read %d bytes in.",
                 (int)(request.MsgLength + sizeof(RPCRequestRBAllocateResourceContainersData)));

    /* Not registered in YARN: the request is consumed, answer with an error. */
    if ( YARNJobID == NULL )
    {
        RB_freeAllocateRequestBuffers(pBuffer, preferredArray);
        return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
    }

    /*
     * Mark latest used YARN container resource quota for testing if YARN
     * resource queue is busy.
     */
    YARNResourceTightTestMemoryMB = request.MemoryMB;
    YARNResourceTightTestCore     = request.Core;

    /* Never announce preferred hosts without an array that holds them. */
    uint32_t preferredSize = (preferredArray != NULL) ? request.PreferredSize : 0;

    /* Acquire resource from global resource manager. */
    initializeDQueue(&acquiredcontids, PCONTEXT);
    initializeDQueue(&acquiredconthosts, PCONTEXT);
    libyarnres = RB2YARN_acquireResource(request.MemoryMB,
                                         request.Core,
                                         request.ContainerCount,
                                         preferredArray,
                                         preferredSize,
                                         &acquiredcontids,
                                         &acquiredconthosts);
    if ( libyarnres != FUNCTION_SUCCEEDED )
    {
        /* Drop whatever was collected before the failure. */
        RB2YARN_freeContainersInMemory(&acquiredcontids, &acquiredconthosts);
        RB_freeAllocateRequestBuffers(pBuffer, preferredArray);
        return sendRBAllocateResourceErrorData(RESBROK_ERROR_GRM, &request);
    }

    /*
     * Build response message.
     */
    SelfMaintainBufferData sendBuffer;
    initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);

    appendSMBVar(&sendBuffer, MessageID);
    appendSelfMaintainBufferTill64bitAligned(&sendBuffer);

    Assert( acquiredcontids.NodeCount == acquiredconthosts.NodeCount );

    /* Build response head information. */
    RPCResponseRBAllocateResourceContainersHeadData responsehead;
    responsehead.Result                 = FUNC_RETURN_OK;
    responsehead.MemoryMB               = request.MemoryMB;
    responsehead.Core                   = request.Core;
    responsehead.ContainerCount         = acquiredcontids.NodeCount;
    responsehead.ExpectedContainerCount = request.ContainerCount;
    responsehead.HostNameStringLen      = 0;   /* patched below */
    responsehead.SystemStartTimestamp   = ResBrokerStartTime;
    appendSMBVar(&sendBuffer, responsehead);

    /* Append each container id. */
    DQUEUE_LOOP_BEGIN(&acquiredcontids, iter, int64_t *, pcontid)
        appendSMBVar(&sendBuffer, *pcontid);
    DQUEUE_LOOP_END

    appendSelfMaintainBufferTill64bitAligned(&sendBuffer);

    int cursize = sendBuffer.Cursor + 1;

    /* Append each host name string. */
    DQUEUE_LOOP_BEGIN(&acquiredconthosts, iter, char *, hostname)
        appendSMBStr(&sendBuffer, hostname);
    DQUEUE_LOOP_END

    appendSelfMaintainBufferTill64bitAligned(&sendBuffer);

    /*
     * Set host name string total length. The head is looked up in the buffer
     * again, after all appends, and sits behind the message id and 4 bytes of
     * padding.
     */
    RPCResponseRBAllocateResourceContainersHead phead =
        (RPCResponseRBAllocateResourceContainersHead)
        (sendBuffer.Buffer + sizeof(MessageID) + sizeof(uint32_t));
    phead->HostNameStringLen = sendBuffer.Cursor + 1 - cursize;

    /*
     * Send message.
     */
    int buffsize = getSMBContentSize(&sendBuffer);
    piperes = writePipe(ResBrokerNotifyPipe[1], sendBuffer.Buffer, buffsize);
    if ( piperes != buffsize )
    {
        res = RESBROK_PIPE_ERROR;
        elog(WARNING, "YARN mode resource broker failed to write out resource "
                      "allocation response message from pipe. "
                      "Wrote length %d, expected length %d.",
                      piperes,
                      buffsize);
    }
    else
    {
        elog(DEBUG3, "Sent YARN resource allocation response to resource manager.");
    }

    RB_freeAllocateRequestBuffers(pBuffer, preferredArray);

    /* Free the result. */
    RB2YARN_freeContainersInMemory(&acquiredcontids, &acquiredconthosts);

    /* Free send buffer. */
    destroySelfMaintainBuffer(&sendBuffer);

    return res;
}
/**
 * Send an error-only resource allocation response to the resource manager.
 *
 * The response echoes the requested quota and container count, reports zero
 * allocated containers and carries no host name strings.
 *
 * @param errorcode  result code to place in the response head.
 * @param request    the allocation request being answered.
 * @return errorcode when the response was written completely, otherwise
 *         RESBROK_PIPE_ERROR.
 */
int sendRBAllocateResourceErrorData(int errorcode,
                                    RPCRequestRBAllocateResourceContainers request)
{
    uint32_t                                        msgid = RB2RM_ALLOCATED_RESOURCE;
    SelfMaintainBufferData                          reply;
    RPCResponseRBAllocateResourceContainersHeadData head;
    int                                             replysize;
    int                                             written;

    /* Message id first, padded to a 64-bit boundary. */
    initializeSelfMaintainBuffer(&reply, PCONTEXT);
    appendSMBVar(&reply, msgid);
    appendSelfMaintainBufferTill64bitAligned(&reply);

    /* Response head: nothing was allocated. */
    head.Result                 = errorcode;
    head.MemoryMB               = request->MemoryMB;
    head.Core                   = request->Core;
    head.ContainerCount         = 0;
    head.ExpectedContainerCount = request->ContainerCount;
    head.HostNameStringLen      = 0;
    head.SystemStartTimestamp   = ResBrokerStartTime;
    appendSMBVar(&reply, head);

    elog(DEBUG3, "YARN mode resource broker sends resource allocation "
                 "error response %d.", errorcode);

    replysize = getSMBContentSize(&reply);
    written   = writePipe(ResBrokerNotifyPipe[1], reply.Buffer, replysize);

    /* The buffer is not needed any more, whatever the write result is. */
    destroySelfMaintainBuffer(&reply);

    if (written == replysize)
    {
        return errorcode;
    }

    elog(WARNING, "YARN mode resource broker failed to write out resource "
                  "allocation response error message from pipe. "
                  "Wrote length %d, expected length %d.",
                  written,
                  replysize);
    return RESBROK_PIPE_ERROR;
}
/**
 * Handle the request from RM to RB for returning containers to YARN.
 *
 * Reads the request head and the container id list from
 * ResBrokerRequestPipe[0], returns the containers through libyarn and writes
 * the response to ResBrokerNotifyPipe[1].
 *
 * @return FUNC_RETURN_OK on success, RESBROK_PIPE_ERROR when the pipe cannot
 *         be read or written, otherwise the value returned by
 *         sendRBReturnResourceErrorData().
 */
int handleRM2RB_ReturnResource(void)
{
    uint32_t  MessageID    = RB2RM_RETURNED_RESOURCE;
    int       res          = FUNC_RETURN_OK;
    int       libyarnres   = FUNCTION_SUCCEEDED;
    int       piperes      = 0;
    int       actualsize   = 0;
    int64_t  *containerids = NULL;

    /* Read request content. */
    RPCRequestRBReturnResourceContainersHeadData request;
    piperes = readPipe(ResBrokerRequestPipe[0], &request, sizeof(request));
    if ( piperes != sizeof(request) ) {
        elog(WARNING, "YARN mode resource broker failed to read resource return "
                      "request message from pipe. "
                      "Read length %d, expected length %lu, errno %m.",
                      piperes,
                      sizeof(request));
        return RESBROK_PIPE_ERROR;
    }

    /* Read the container id list. */
    actualsize = request.ContainerCount;
    size_t idsbytes = sizeof(int64_t) * actualsize;
    containerids = rm_palloc(PCONTEXT, idsbytes);
    piperes = readPipe(ResBrokerRequestPipe[0], containerids, idsbytes);
    if ( piperes < 0 || (size_t) piperes != idsbytes )
    {
        /* Report the size of the id list, which is what was expected here. */
        elog(WARNING, "YARN mode resource broker failed to read resource return "
                      "request message (container ids) from pipe. "
                      "Read length %d, expected length %lu, errno %m.",
                      piperes,
                      (unsigned long) idsbytes);
        if( containerids != NULL )
        {
            rm_pfree(PCONTEXT, containerids);
        }
        return RESBROK_PIPE_ERROR;
    }

    elog(LOG, "YARN mode resource broker returns %d Containers to YARN.",
              request.ContainerCount);

    for ( int i = 0 ; i < request.ContainerCount ; ++i )
    {
        elog(RMLOG, "YARN mode resource broker tries to return container of id "
                    INT64_FORMAT,
                    containerids[i]);
    }

    /*
     * Return resource to global resource manager. Without a YARN registration
     * nothing can be returned, which is answered like a libyarn failure.
     */
    if ( YARNJobID != NULL )
    {
        libyarnres = RB2YARN_returnResource(containerids, request.ContainerCount);
    }

    /* The id list is not referenced after this point. */
    if( containerids != NULL )
    {
        rm_pfree(PCONTEXT, containerids);
        containerids = NULL;
    }

    if ( YARNJobID == NULL || libyarnres != FUNCTION_SUCCEEDED )
    {
        return sendRBReturnResourceErrorData(RESBROK_ERROR_GRM);
    }

    /*
     * Build response message.
     */

    /* Prepare buffer for the content to send. */
    SelfMaintainBufferData sendBuffer;
    initializeSelfMaintainBuffer(&sendBuffer, PCONTEXT);

    appendSMBVar(&sendBuffer, MessageID);
    appendSelfMaintainBufferTill64bitAligned(&sendBuffer);

    RPCResponseRBReturnResourceContainersData response;
    response.Result               = FUNC_RETURN_OK;
    response.Reserved             = 0;
    response.SystemStartTimestamp = ResBrokerStartTime;
    appendSMBVar(&sendBuffer, response);

    /*
     * Send Message.
     */
    int buffsize = getSMBContentSize(&sendBuffer);
    piperes = writePipe(ResBrokerNotifyPipe[1], sendBuffer.Buffer, buffsize);
    if ( piperes != buffsize )
    {
        res = RESBROK_PIPE_ERROR;
        elog(WARNING, "YARN mode resource broker failed to write out resource "
                      "return response message from pipe. "
                      "Wrote length %d, expected length %d, errno %m.",
                      piperes,
                      buffsize);
    }
    else
    {
        elog(DEBUG3, "Sent YARN resource return response to resource manager.");
    }

    /* Free send buffer. */
    destroySelfMaintainBuffer(&sendBuffer);

    return res;
}
/**
 * Send an error-only resource return response to the resource manager.
 *
 * @param errorcode  result code to place in the response.
 * @return errorcode when the response was written completely, otherwise
 *         RESBROK_PIPE_ERROR.
 */
int sendRBReturnResourceErrorData(int errorcode)
{
    uint32_t                                  msgid = RB2RM_RETURNED_RESOURCE;
    SelfMaintainBufferData                    reply;
    RPCResponseRBReturnResourceContainersData body;
    int                                       replysize;
    int                                       written;

    /* Message id first, padded to a 64-bit boundary. */
    initializeSelfMaintainBuffer(&reply, PCONTEXT);
    appendSMBVar(&reply, msgid);
    appendSelfMaintainBufferTill64bitAligned(&reply);

    body.Result               = errorcode;
    body.Reserved             = 0;
    body.SystemStartTimestamp = ResBrokerStartTime;
    appendSMBVar(&reply, body);

    elog(DEBUG3, "YARN mode resource broker sends resource return "
                 "error response %d.", errorcode);

    replysize = getSMBContentSize(&reply);
    written   = writePipe(ResBrokerNotifyPipe[1], reply.Buffer, replysize);

    /* The buffer is not needed any more, whatever the write result is. */
    destroySelfMaintainBuffer(&reply);

    if (written == replysize)
    {
        return errorcode;
    }

    elog(WARNING, "YARN mode resource broker failed to write out resource "
                  "return response error message from pipe. "
                  "Wrote length %d, expected length %d.",
                  written,
                  replysize);
    return RESBROK_PIPE_ERROR;
}
/**
 * Handle the request from RM to RB for getting the container report.
 *
 * Nothing beyond the message head (already consumed by the caller) is read
 * from the request pipe. The report fetched through libyarn is written to
 * ResBrokerNotifyPipe[1] as a response head followed by one
 * RB_GRMContainerStatData per container.
 *
 * @return FUNC_RETURN_OK on success, RESBROK_PIPE_ERROR when the response
 *         cannot be written, otherwise the value returned by
 *         sendRBGetContainerReportErrorData().
 */
int handleRM2RB_GetContainerReport(void)
{
    uint32_t            msgid  = RB2RM_CONTAINERREPORT;
    RB_GRMContainerStat report = NULL;
    int                 count  = 0;
    int                 result;

    /* Not registered in YARN: answer with an error. */
    if (YARNJobID == NULL)
    {
        return sendRBGetContainerReportErrorData(RESBROK_ERROR_GRM);
    }

    /* Get container report from yarn resource manager. */
    if (RB2YARN_getContainerReport(&report, &count) != FUNCTION_SUCCEEDED)
    {
        if (report != NULL)
        {
            rm_pfree(PCONTEXT, report);
        }
        return sendRBGetContainerReportErrorData(RESBROK_ERROR_GRM);
    }

    elog(LOG, "YARN mode resource broker got total %d containers", count);

    int idx = 0;
    while (idx < count)
    {
        elog(RMLOG, "Container report ID:"INT64_FORMAT", isActive:%d",
                    report[idx].ContainerID,
                    report[idx].isActive);
        ++idx;
    }

    /* Build response message: id, padding, head, then the raw stat array. */
    SelfMaintainBufferData reply;
    initializeSelfMaintainBuffer(&reply, PCONTEXT);
    appendSMBVar(&reply, msgid);
    appendSelfMaintainBufferTill64bitAligned(&reply);

    RPCResponseRBGetContainerReportHeadData head;
    head.Result               = FUNC_RETURN_OK;
    head.ContainerCount       = count;
    head.SystemStartTimestamp = ResBrokerStartTime;
    appendSMBVar(&reply, head);

    appendSelfMaintainBuffer(&reply,
                             (char *)report,
                             count * sizeof(RB_GRMContainerStatData));

    /* Send Message. */
    int replysize = getSMBContentSize(&reply);
    int written   = writePipe(ResBrokerNotifyPipe[1], reply.Buffer, replysize);

    if (written == getSMBContentSize(&reply))
    {
        result = FUNC_RETURN_OK;
        elog(DEBUG3, "Sent YARN container report response to resource manager.");
    }
    else
    {
        result = RESBROK_PIPE_ERROR;
        elog(WARNING, "YARN mode resource broker failed to write out container "
                      "report response message to pipe. "
                      "Wrote length %d, expected length %d, errno %m.",
                      written,
                      replysize);
    }

    /* Free send buffer and the report. */
    destroySelfMaintainBuffer(&reply);
    if (report != NULL)
    {
        rm_pfree(PCONTEXT, report);
    }

    return result;
}
/*
 * Send a container report response that carries only an error code.
 *
 * The message written to ResBrokerNotifyPipe[1] has the same layout as a
 * successful container report (RB2RM_CONTAINERREPORT id, alignment padding,
 * RPCResponseRBGetContainerReportHeadData head) but with ContainerCount set
 * to 0 and no container entries.
 *
 * Returns errorcode when the message is written completely, otherwise
 * RESBROK_PIPE_ERROR.
 */
int sendRBGetContainerReportErrorData(int errorcode)
{
    uint32_t               msgid = RB2RM_CONTAINERREPORT;
    SelfMaintainBufferData outbuf;

    /* Message id first, padded so that the head starts 64-bit aligned. */
    initializeSelfMaintainBuffer(&outbuf, PCONTEXT);
    appendSMBVar(&outbuf, msgid);
    appendSelfMaintainBufferTill64bitAligned(&outbuf);

    RPCResponseRBGetContainerReportHeadData head;
    head.Result               = errorcode;
    head.ContainerCount       = 0;
    head.SystemStartTimestamp = ResBrokerStartTime;
    appendSMBVar(&outbuf, head);

    elog(DEBUG3, "YARN mode resource broker sends container report error "
                 "response %d.",
                 errorcode);

    int expected = getSMBContentSize(&outbuf);
    int written  = writePipe(ResBrokerNotifyPipe[1], outbuf.Buffer, expected);

    /* The buffer is no longer needed once the write has been attempted. */
    destroySelfMaintainBuffer(&outbuf);

    if ( written == expected )
    {
        return errorcode;
    }

    elog(WARNING, "YARN mode resource broker failed to write out container "
                  "report response error message from pipe. "
                  "Wrote length %d, expected length %d.",
                  written,
                  expected);
    return RESBROK_PIPE_ERROR;
}
/*******************************************************************************
* Wrapper of libyarn APIs to negotiate resource with YARN.
*
* RETURN VALUES follow the definition of libyarn APIs.
******************************************************************************/
/*
 * Set up the connection to the global resource manager before any resource
 * negotiation service is provided: connect to YARN, then register the YARN
 * application.
 *
 * When registration fails, the libYARN client created by the connect step is
 * released again through RB2YARN_disconnectFromYARN(). On success
 * YARNResourceTight is reset to false.
 *
 * Returns FUNCTION_SUCCEEDED, or the failing step's result code.
 */
int RB2YARN_initializeConnection(void)
{
    /* Connect to YARN. */
    int rc = RB2YARN_connectToYARN();

    if ( rc == FUNCTION_SUCCEEDED )
    {
        /* Register in global resource manager. */
        rc = RB2YARN_registerYARNApplication();
        if ( rc != FUNCTION_SUCCEEDED )
        {
            /* Free libYARN client instance. */
            RB2YARN_disconnectFromYARN();
        }
    }

    if ( rc != FUNCTION_SUCCEEDED )
    {
        return rc;
    }

    elog(LOG, "YARN mode resource broker is ready to access YARN resource manager.");
    YARNResourceTight = false;
    return FUNCTION_SUCCEEDED;
}
/* Application master heartbeat interval, in seconds. */
#define HAWQ_YARN_AM_HEARTBEAT_INTERVAL 5

/*
 * Create the libYARN client instance (LIBYARNClient) from the configured
 * YARN resource manager, scheduler and application master settings.
 *
 * Returns the result code of newLibYarnClient().
 */
int RB2YARN_connectToYARN(void)
{
    /* The heartbeat interval is handed over as 5 sec scaled by 1000. */
    return newLibYarnClient(YARNUser,
                            YARNServer.Str,
                            YARNPort.Str,
                            YARNSchedulerServer.Str,
                            YARNSchedulerPort.Str,
                            YARNAMServer,
                            YARNAMPort,
                            YARNTRKUrl,
                            &LIBYARNClient,
                            HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000 /* Hard code 5 sec */);
}
/*
 * Create the YARN application (job) for this resource broker and wait until
 * YARN reports it as registered.
 *
 * After createJob() succeeds, the application report is polled up to five
 * times, sleeping one heartbeat interval between polls. Registration counts
 * as complete once a report is available whose progress is not below 0.5.
 * If the last poll still yields no report, or a progress below 0.5, the job
 * is force-killed and FUNCTION_FAILED is returned.
 *
 * On success ResBrokerStartTime is set to the current time and the result of
 * createJob() (FUNCTION_SUCCEEDED) is returned. A createJob() failure is
 * returned as is.
 *
 * NOTE(review): the application report obtained from getApplicationReport()
 * is not released anywhere in this function - confirm whether libyarn expects
 * the caller to free it.
 */
int RB2YARN_registerYARNApplication(void)
{
    int yarnres = createJob(LIBYARNClient,
                            YARNAppName.Str,
                            YARNQueueName.Str,
                            &YARNJobID);
    if ( yarnres != FUNCTION_SUCCEEDED )
    {
        elog(WARNING, "YARN mode resource broker failed to create application "
                      "in YARN resource manager. %s",
                      getErrorMessage());
        return yarnres;
    }

    elog(LOG, "YARN mode resource broker created job in YARN resource "
              "manager %s as new application %s assigned to queue %s.",
              YARNJobID,
              YARNAppName.Str,
              YARNQueueName.Str);

    /*
     * Check if hawq is registered successfully in Hadoop Yarn. If not, kill
     * the application from Hadoop Yarn. pollsleft counts the polls that remain
     * after the current one, so five polls are made in total.
     */
    LibYarnApplicationReport_t *report = NULL;
    for ( int pollsleft = 4 ; ; --pollsleft )
    {
        int  pollres  = getApplicationReport(LIBYARNClient, YARNJobID, &report);
        bool noreport = pollres != FUNCTION_SUCCEEDED || report == NULL;

        /* Registered: a report exists and its progress is not below 0.5. */
        if ( !noreport && !(report->progress < 0.5) )
        {
            break;
        }

        /* Not there yet; wait one heartbeat interval and poll again. */
        if ( pollsleft > 0 )
        {
            usleep(HAWQ_YARN_AM_HEARTBEAT_INTERVAL*1000*1000L);
            continue;
        }

        /* Out of polls: explain why, kill the job and give up. */
        if ( noreport )
        {
            elog(WARNING, "YARN mode resource broker failed to get application report, "
                          "so kill it from Hadoop Yarn.");
        }
        else
        {
            elog(WARNING, "YARN mode resource broker failed to register itself in Hadoop Yarn."
                          "Got progress:%f, and try to kill application from Hadoop Yarn",
                          report->progress);
        }

        if ( forceKillJob(LIBYARNClient, YARNJobID) != FUNCTION_SUCCEEDED )
        {
            elog(WARNING, "YARN mode resource broker kill job failed.");
        }
        return FUNCTION_FAILED;
    }

    ResBrokerStartTime = gettime_microsec();
    elog(LOG, "YARN mode resource broker registered new "
              "YARN application. Progress:%f, Start time stamp "UINT64_FORMAT,
              report->progress, ResBrokerStartTime);
    return yarnres;
}
/**
 * Get YARN cluster report.
 *
 * Fetches the nodes in state NODE_STATE_RUNNING from YARN and appends one
 * newly allocated SegStat per node to the tail of hosts. Each SegStat embeds
 * a SegInfoData head followed by, in this order: the address attributes
 * (offset/attribute pairs), the address contents, the GRM host name and the
 * GRM rack name. The latter two are each given a 64-bit aligned slot.
 *
 * Returns the result code of getClusterNodes(); on failure hosts is left
 * untouched.
 */
int RB2YARN_getClusterReport(DQueue hosts)
{
    LibYarnNodeReport_t *nodes;
    int                  nodecount;

    int yarnres = getClusterNodes(LIBYARNClient,
                                  NODE_STATE_RUNNING,
                                  &nodes,
                                  &nodecount);
    if ( yarnres != FUNCTION_SUCCEEDED )
    {
        elog(WARNING, "YARN mode resource broker failed to get cluster "
                      "information from YARN. %s",
                      getErrorMessage());
        return yarnres;
    }

    elog(LOG, "YARN mode resource broker got information of %d YARN cluster "
              "nodes.",
              nodecount);

    /* Build result. For LIBYARN implementation, we only get the host name. */
    for ( int n = 0 ; n < nodecount ; ++n )
    {
        LibYarnNodeReport_t   *node       = &(nodes[n]);
        List                  *addrlist   = NULL;
        ListCell              *cell       = NULL;
        uint16_t               addroffset = 0;
        uint16_t               addrattr   = HOST_ADDRESS_CONTENT_STRING;
        SimpString             resolvedname;
        SelfMaintainBufferData attrbuf;
        SelfMaintainBufferData contentbuf;

        initializeSelfMaintainBuffer(&attrbuf, PCONTEXT);
        initializeSelfMaintainBuffer(&contentbuf, PCONTEXT);

        /* Expect to get running nodes only. */
        Assert( node->nodeState == NODE_STATE_RUNNING );

        /*
         * Look up the addresses of the node by its host name. Only the
         * address list is used; the name string filled in by the lookup is
         * dropped right away.
         */
        initSimpleString(&resolvedname, PCONTEXT);
        getHostIPV4AddressesByHostNameAsString(PCONTEXT,
                                               node->host,
                                               &resolvedname,
                                               &addrlist);
        freeSimpleStringContent(&resolvedname);

        int addrcount = list_length(addrlist);
        if ( addrcount > 0 )
        {
            /*
             * Address contents start behind the SegInfo head and the address
             * attributes. All addresses are now passed by string format.
             */
            size_t contentbase = sizeof(SegInfoData) +
                                 __SIZE_ALIGN64(sizeof(uint32_t) * addrcount);
            addroffset = contentbase;

            foreach(cell, addrlist)
            {
                AddressString addr = (AddressString)(lfirst(cell));
                elog(DEBUG3, "YARN mode resource broker gets host address %s.",
                             addr->Address);

                /* One attribute pair per address: content offset, then type. */
                appendSMBVar(&attrbuf, addroffset);
                appendSMBVar(&attrbuf, addrattr);

                /* The address itself, padded to 64-bit alignment. */
                appendSelfMaintainBuffer(&contentbuf,
                                         (char *)addr,
                                         ADDRESS_STRING_ALL_SIZE(addr));
                appendSelfMaintainBufferTill64bitAligned(&contentbuf);

                /* The next address starts where the content now ends. */
                addroffset = contentbase + getSMBContentSize(&contentbuf);
            }
            appendSelfMaintainBufferTill64bitAligned(&attrbuf);
        }

        /* Both buffers are final from here on; take their sizes once. */
        int attrsize    = getSMBContentSize(&attrbuf);
        int contentsize = getSMBContentSize(&contentbuf);
        int hostnamelen = strlen(node->host);
        int racknamelen = strlen(node->rackName);

        int segsize = sizeof(SegInfoData) +             /* machine id head     */
                      attrsize +                        /* addr attribute size */
                      contentsize +                     /* addr content size   */
                      __SIZE_ALIGN64(hostnamelen+1) +   /* host name           */
                      __SIZE_ALIGN64(racknamelen+1);    /* rack name           */

        SegStat seg = (SegStat)rm_palloc0(PCONTEXT,
                                          offsetof(SegStatData, Info) + segsize);

        /* Statistics part: only the GRM (YARN) capacity is known here. */
        seg->ID               = SEGSTAT_ID_INVALID;
        seg->FTSAvailable     = RESOURCE_SEG_STATUS_UNSET;
        seg->GRMTotalMemoryMB = node->memoryCapability;
        seg->GRMTotalCore     = node->vcoresCapability;
        seg->FTSTotalMemoryMB = 0;
        seg->FTSTotalCore     = 0;

        /* Layout of the variable-length part that follows the SegInfo head. */
        seg->Info.HostAddrCount          = addrcount;
        seg->Info.AddressAttributeOffset = sizeof(SegInfoData);
        seg->Info.AddressContentOffset   = sizeof(SegInfoData) + attrsize;
        seg->Info.HostNameLen            = 0;
        seg->Info.HostNameOffset         = 0;
        seg->Info.GRMHostNameLen         = hostnamelen;
        seg->Info.GRMHostNameOffset      = sizeof(SegInfoData) +
                                           attrsize +
                                           contentsize;
        seg->Info.GRMRackNameLen         = racknamelen;
        seg->Info.GRMRackNameOffset      = seg->Info.GRMHostNameOffset +
                                           __SIZE_ALIGN64(hostnamelen+1);
        seg->Info.FailedTmpDirOffset     = 0;
        seg->Info.FailedTmpDirLen        = 0;
        seg->Info.Size                   = segsize;

        /* Copy attributes and contents right behind the SegInfo head. */
        char *payload = (char *)&(seg->Info) + sizeof(SegInfoData);
        memcpy(payload, attrbuf.Buffer, attrsize);
        memcpy(payload + attrsize, contentbuf.Buffer, contentsize);

        destroySelfMaintainBuffer(&attrbuf);
        destroySelfMaintainBuffer(&contentbuf);
        freeHostIPV4AddressesAsString(PCONTEXT, &addrlist);
        Assert( addrlist == NULL );

        strcpy(GET_SEGINFO_GRMHOSTNAME(&(seg->Info)), node->host);
        strcpy(GET_SEGINFO_GRMRACKNAME(&(seg->Info)), node->rackName);

        insertDQueueTailNode(hosts, seg);

        elog(RMLOG, "YARN mode resource broker got YARN cluster host \n"
                    "\thost:%s\n"
                    "\tport:%d\n"
                    "\thttpAddress:%s\n"
                    "\trackName:%s\n"
                    "\tmemoryUsed:%d\n"
                    "\tvcoresUsed:%d\n"
                    "\tmemoryCapability:%d\n"
                    "\tvcoresCapability:%d\n"
                    "\tnumContainers:%d\n"
                    "\tnodeState:%d\n"
                    "\thealthReport:%s\n"
                    "\tlastHealthReportTime:"INT64_FORMAT"\n"
                    "\tmachineidsize:%d",
                    node->host,
                    node->port,
                    node->httpAddress,
                    node->rackName,
                    node->memoryUsed,
                    node->vcoresUsed,
                    node->memoryCapability,
                    node->vcoresCapability,
                    node->numContainers,
                    node->nodeState,
                    node->healthReport,
                    node->lastHealthReportTime,
                    seg->Info.Size);

        elog(RMLOG, "YARN mode reosurce broker built cluster segment %s at rack %s",
                    GET_SEGINFO_GRMHOSTNAME(&(seg->Info)),
                    GET_SEGINFO_GRMRACKNAME(&(seg->Info)));
    }

    freeMemNodeReportArray(nodes, nodecount);
    return yarnres;
}
/*
 * Allocate containers from YARN, activate them, and hand the activated ones
 * back to the caller.
 *
 * memorymb, core      : size of each requested container.
 * count               : number of containers requested.
 * preferredArray,
 * preferredSize       : preferred nodes, passed through to allocateResources().
 * containerids        : receives one rm_palloc0'ed int64_t (the container id)
 *                       per activated container.
 * containerhosts      : receives one rm_palloc0'ed copy of the host name per
 *                       activated container, in the same order as
 *                       containerids.
 *
 * Containers that YARN reports as failed to activate are released back to
 * YARN and are NOT appended to the two queues: the broker no longer owns
 * them, so reporting them as acquired would leave the caller tracking
 * containers that have already been returned.
 *
 * Returns the allocateResources() result when allocation fails or yields no
 * container; otherwise FUNCTION_SUCCEEDED.
 *
 * NOTE(review): a failure of activeResources() or getActiveFailContainerIds()
 * is only logged; the function then still returns FUNCTION_SUCCEEDED with
 * both queues left unchanged. This is kept as it was - confirm with the
 * callers whether the failure should be propagated instead.
 */
int RB2YARN_acquireResource(uint32_t memorymb,
                            uint32_t core,
                            uint32_t count,
                            LibYarnNodeInfo_t *preferredArray,
                            uint32_t preferredSize,
                            DQueue containerids,
                            DQueue containerhosts)
{
    int                yarnres = FUNCTION_SUCCEEDED;
    /* Both black list arguments are passed with a length of 0. */
    char              *blackListAdditions[0];
    char              *blackListRemovals[0];
    LibYarnResource_t *allocatedResourcesArray = NULL;
    int                allocatedResourcesArraySize = 0;

    yarnres = allocateResources(LIBYARNClient,
                                YARNJobID,
                                1,          /* priority */
                                core,       /* core */
                                memorymb,   /* memory */
                                count,
                                blackListAdditions,
                                0,
                                blackListRemovals,
                                0,
                                preferredArray,
                                preferredSize,
                                &allocatedResourcesArray,
                                &allocatedResourcesArraySize);
    if ( yarnres != FUNCTION_SUCCEEDED ) {
        elog(WARNING, "YARN mode resource broker failed to allocate "
                      "containers from YARN. %s",
                      getErrorMessage());
        return yarnres;
    }
    else if ( allocatedResourcesArraySize == 0 ) {
        elog(LOG, "YARN mode resource broker temporarily cannot acquire "
                  "container from YARN.");
        return yarnres;
    }

    elog(LOG, "YARN mode resource broker allocated %d containers from YARN.",
              allocatedResourcesArraySize);

    /* Activate containers. */
    int64_t activeContainerIds[allocatedResourcesArraySize];
    for ( int i = 0 ; i < allocatedResourcesArraySize ; ++i ) {
        activeContainerIds[i] = allocatedResourcesArray[i].containerId;
    }

    /* Ids of the containers that failed to activate, filled in by libyarn. */
    int64_t *activeFailIds  = NULL;
    int      activeFailSize = 0;

    yarnres = activeResources(LIBYARNClient,
                              YARNJobID,
                              activeContainerIds,
                              allocatedResourcesArraySize);
    if ( yarnres != FUNCTION_SUCCEEDED ) {
        elog(WARNING, "YARN mode resource broker failed to activate containers. %s",
                      getErrorMessage());
        goto exit;
    }

    elog(LOG, "YARN mode resource broker submitted to activate %d containers.",
              allocatedResourcesArraySize);

    yarnres = getActiveFailContainerIds(LIBYARNClient,
                                        &activeFailIds,
                                        &activeFailSize);
    if ( yarnres != FUNCTION_SUCCEEDED ) {
        elog(WARNING, "YARN mode resource broker failed to get active-fail "
                      "containers. %s",
                      getErrorMessage());
        goto exit;
    }

    /* Give the containers that failed to activate back to YARN. */
    if ( activeFailSize > 0 ) {
        for ( int i = 0 ; i < activeFailSize ; ++i ) {
            elog(WARNING, "YARN mode resource broker failed to activate "
                          "container "INT64_FORMAT,
                          activeFailIds[i]);
        }

        yarnres = releaseResources(LIBYARNClient,
                                   YARNJobID,
                                   activeFailIds,
                                   activeFailSize);
        if ( yarnres != FUNCTION_SUCCEEDED ) {
            /* Best effort: the activated containers are still reported. */
            elog(WARNING, "YARN mode resource broker failed to return active-fail "
                          "containers. %s",
                          getErrorMessage());
        }
    }

    /* Build result from the containers that were really activated. */
    for ( int i = 0 ; i < allocatedResourcesArraySize ; ++i )
    {
        /*
         * Skip containers on the active-fail list. The list is expected to be
         * short, so a linear scan is enough here.
         */
        bool activatefailed = false;
        for ( int j = 0 ; j < activeFailSize ; ++j )
        {
            if ( activeFailIds[j] == allocatedResourcesArray[i].containerId )
            {
                activatefailed = true;
                break;
            }
        }
        if ( activatefailed )
        {
            continue;
        }

        int64_t *ctnid = (int64_t *)rm_palloc0(PCONTEXT, sizeof(int64_t));
        *ctnid = allocatedResourcesArray[i].containerId;
        insertDQueueTailNode(containerids, ctnid);

        char *hostnamestr =
            (char *)rm_palloc0(PCONTEXT,
                               strlen(allocatedResourcesArray[i].host) + 1);
        strcpy(hostnamestr, allocatedResourcesArray[i].host);
        insertDQueueTailNode(containerhosts, hostnamestr);

        elog(RMLOG, "YARN mode resource broker allocated and activated container. "
                    "ID : "INT64_FORMAT"(%d MB, %d CORE) at %s.",
                    allocatedResourcesArray[i].containerId,
                    allocatedResourcesArray[i].memory,
                    allocatedResourcesArray[i].vCores,
                    allocatedResourcesArray[i].host);
    }

exit:
    /* Free the allocation result handed out by libyarn. */
    freeMemAllocatedResourcesArray(allocatedResourcesArray,
                                   allocatedResourcesArraySize);

    /* Free list of active fail id list; free(NULL) is a no-op. */
    free(activeFailIds);

    return FUNCTION_SUCCEEDED;
}
/*
 * Return (release) containers to YARN.
 *
 * contids   : array of container ids to release.
 * contcount : number of ids in contids; 0 makes the call a no-op.
 *
 * Every id is logged after the release call, whether or not the call
 * succeeded. Returns FUNCTION_SUCCEEDED for an empty request, otherwise the
 * result code of releaseResources().
 */
int RB2YARN_returnResource(int64_t *contids, int contcount)
{
    /* Nothing to hand back. */
    if ( contcount == 0 )
    {
        return FUNCTION_SUCCEEDED;
    }

    Assert(contids != NULL);
    Assert(contcount > 0);

    int rc = releaseResources(LIBYARNClient,
                              YARNJobID,
                              contids,
                              contcount);
    if ( rc != FUNCTION_SUCCEEDED )
    {
        elog(WARNING, "YARN mode resource broker failed to return containers. %s",
                      getErrorMessage());
    }

    int idx = 0;
    while ( idx < contcount )
    {
        elog(LOG, "YARN mode resource broker returned container of id "INT64_FORMAT,
                  contids[idx]);
        ++idx;
    }

    return rc;
}
/*
 * Build the status list of the containers that belong to the registered YARN
 * application.
 *
 * ctnstats : out; set to an rm_palloc0'ed array of RB_GRMContainerStatData,
 *            or NULL when there is nothing to report. The array is not freed
 *            here, so the caller has to release it.
 * size     : out; number of entries in *ctnstats.
 *
 * The container ids are taken from the container report, while the states
 * come from a separate getContainerStatuses() call (see the TODO below). An
 * entry is marked active when its state is C_RUNNING; isFound starts as 0.
 *
 * Returns the result code of the last libyarn call made. *ctnstats may be set
 * even when that code reports a failure.
 */
int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size)
{
    Assert( ctnstats != NULL );

    LibYarnContainerReport_t *reports     = NULL;
    int                       reportcount = 0;
    LibYarnContainerStatus_t *statuses    = NULL;
    int                       statuscount = 0;

    /* Start with an empty result. */
    *ctnstats = NULL;
    *size     = 0;

    int yarnres = getContainerReports(LIBYARNClient,
                                      YARNJobID,
                                      &reports,
                                      &reportcount);
    if ( yarnres == FUNCTION_SUCCEEDED )
    {
        if ( reportcount > 0 )
        {
            /*
             * TODO:
             * There is a problem here that container report does not get
             * correct container status. The workaround is to call the
             * container status API to get the final container statuses.
             */
            int64_t *ids = (int64_t *)rm_palloc(PCONTEXT,
                                                sizeof(int64_t) * reportcount);
            for ( int r = 0 ; r < reportcount ; ++r )
            {
                ids[r] = reports[r].containerId;
            }

            yarnres = getContainerStatuses(LIBYARNClient,
                                           YARNJobID,
                                           ids,
                                           reportcount,
                                           &statuses,
                                           &statuscount);
            if ( yarnres != FUNCTION_SUCCEEDED )
            {
                elog(WARNING, "YARN mode resource broker failed to get container "
                              "status. %s",
                              getErrorMessage());
            }
            rm_pfree(PCONTEXT, ids);

            /* Convert whatever statuses were handed back. */
            if ( statuscount > 0 )
            {
                RB_GRMContainerStat result =
                    rm_palloc0(PCONTEXT,
                               sizeof(RB_GRMContainerStatData) * statuscount);
                *size     = statuscount;
                *ctnstats = result;

                for ( int s = 0 ; s < statuscount ; ++s )
                {
                    result[s].ContainerID = statuses[s].containerId;
                    result[s].isActive    = (statuses[s].state == C_RUNNING);
                    result[s].isFound     = 0;
                }
            }
            freeContainerStatusArray(statuses, statuscount);
        }
    }
    else
    {
        elog(WARNING, "YARN mode resource broker failed to get container "
                      "report. %s",
                      getErrorMessage());
    }

    if ( reports != NULL && reportcount > 0 )
    {
        freeContainerReportArray(reports, reportcount);
    }
    return yarnres;
}
/*
 * Free the container id and container host entries held in the two queues
 * and clean the queues themselves.
 *
 * Either argument may be NULL, in which case that queue is skipped. Every
 * node payload is released with rm_pfree() before cleanDQueue() is called.
 */
void RB2YARN_freeContainersInMemory(DQueue containerids, DQueue containerhosts)
{
    if ( containerids != NULL )
    {
        /* Drain from the head until the queue reports no node left. */
        for ( ; containerids->NodeCount > 0 ; )
        {
            int64_t *idnode = removeDQueueHeadNode(containerids);
            rm_pfree(PCONTEXT, idnode);
        }
        cleanDQueue(containerids);
    }

    if ( containerhosts != NULL )
    {
        for ( ; containerhosts->NodeCount > 0 ; )
        {
            char *hostnode = removeDQueueHeadNode(containerhosts);
            rm_pfree(PCONTEXT, hostnode);
        }
        cleanDQueue(containerhosts);
    }
}
/*
 * Return the error message currently provided by getErrorMessage().
 * The string is passed through as is; it is not copied here.
 */
const char *RB2YARN_getErrorMessage(void)
{
    const char *message = getErrorMessage();
    return message;
}
/*
 * Get the capacity report of one YARN queue.
 *
 * queuename   : name of the queue to query; must not be NULL.
 * cap         : out; the queue's capacity.
 * curcap      : out; the queue's current capacity.
 * maxcap      : out; the queue's maximum capacity.
 * haschildren : out; true when the queue has at least one child queue.
 *
 * Returns FUNCTION_SUCCEEDED, or the result code of getQueueInfo() on
 * failure, in which case none of the output arguments is written.
 */
int RB2YARN_getQueueReport(char *queuename,
                           double *cap,
                           double *curcap,
                           double *maxcap,
                           bool *haschildren)
{
    LibYarnQueueInfo_t *qinfo = NULL;

    Assert( queuename != NULL );

    int rc = getQueueInfo(LIBYARNClient, queuename, true, true, true, &qinfo);
    if ( rc != FUNCTION_SUCCEEDED )
    {
        elog(WARNING, "YARN mode resource broker failed to get YARN queue report "
                      "of queue %s. %s",
                      queuename,
                      RB2YARN_getErrorMessage());
        return rc;
    }

    Assert( qinfo != NULL );

    /* Copy out the figures before the queue info is released. */
    *cap         = qinfo->capacity;
    *curcap      = qinfo->currentCapacity;
    *maxcap      = qinfo->maximumCapacity;
    *haschildren = (qinfo->childQueueNameArraySize > 0);

    freeMemQueueInfo(qinfo);
    return FUNCTION_SUCCEEDED;
}
/*
 * Finish the registered YARN application, reporting APPLICATION_SUCCEEDED as
 * its final status.
 *
 * Returns the result code of finishJob(); the outcome is logged either way.
 */
int RB2YARN_finishYARNApplication(void)
{
    int rc = finishJob(LIBYARNClient, YARNJobID, APPLICATION_SUCCEEDED);

    if ( rc == FUNCTION_SUCCEEDED )
    {
        elog(LOG, "YARN mode resource broker finished job in YARN.");
    }
    else
    {
        elog(WARNING, "YARN mode resource broker failed to finish job in YARN. %s",
                      getErrorMessage());
    }

    return rc;
}
/*
 * Drop the connection state kept for YARN: delete the libYARN client
 * instance, free the job id string, and reset both globals to NULL so that
 * the function can safely be called again.
 *
 * Always returns FUNCTION_SUCCEEDED.
 */
int RB2YARN_disconnectFromYARN(void)
{
    if ( LIBYARNClient != NULL )
    {
        deleteLibYarnClient(LIBYARNClient);
        LIBYARNClient = NULL;
    }

    if ( YARNJobID != NULL )
    {
        free(YARNJobID);
        YARNJobID = NULL;
    }

    return FUNCTION_SUCCEEDED;
}