blob: 58688f76f002a9dc96ad11a76dc93d19e7ff84a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "resourcebroker/resourcebroker_API.h"
#include "resourcebroker/resourcebroker_NONE.h"
#include "resourcebroker/resourcebroker_LIBYARN.h"
/*
 * NOTE(review): presumably true when the resource manager runs as a forked
 * process; it is neither read nor written in this part of the file -- confirm
 * where it is maintained.
 */
bool ResourceManagerIsForked;
/*
 * Handler table of the currently selected resource broker implementation.
 * Reset and filled by RB_prepareImplementation(); every RB_* wrapper below
 * dispatches through it.
 */
RB_FunctionEntriesData CurrentRBImp;
/*
 * Preferred host list handed to the most recent RB_acquireResource() call.
 * Elements are PAIRs (Key: SegResource, Value: expected ResourceBundle).
 * Retained so that expected and allocated resources can be compared later;
 * released by RB_freePreferedHostsForGRMContainers().
 */
List *PreferedHostsForGRMContainers;
/*
 * Select and install a resource broker implementation.
 *
 * Every handler slot of CurrentRBImp is cleared first. Any slot the chosen
 * implementation does not fill therefore stays NULL, and the RB_* wrappers
 * fall back to their defaults (a no-op or an Assert). The retained preferred
 * host list pointer is also reset.
 *
 * imptype: NONE_HAWQ2 or YARN_LIBYARN; any other value trips an Assert.
 */
void RB_prepareImplementation(enum RB_IMP_TYPE imptype)
{
	/*
	 * Initialize resource broker implement handlers. getContainerReport and
	 * handleError are cleared as well: RB_getContainerReport() and
	 * RB_handleError() read them, so a stale pointer from an earlier
	 * selection must not survive.
	 */
	CurrentRBImp.acquireResource    = NULL;
	CurrentRBImp.freeClusterReport  = NULL;
	CurrentRBImp.getClusterReport   = NULL;
	CurrentRBImp.getContainerReport = NULL;
	CurrentRBImp.handleError        = NULL;
	CurrentRBImp.handleNotification = NULL;
	CurrentRBImp.handleSigSIGCHLD   = NULL;
	CurrentRBImp.returnResource     = NULL;
	CurrentRBImp.start              = NULL;
	CurrentRBImp.stop               = NULL;

	PreferedHostsForGRMContainers = NULL;

	switch(imptype)
	{
	case NONE_HAWQ2:
		RB_NONE_createEntries(&CurrentRBImp);
		break;
	case YARN_LIBYARN:
		RB_LIBYARN_createEntries(&CurrentRBImp);
		break;
	default:
		/* Unknown implementation type: programming error. */
		Assert(false);
		break;
	}
}
/*
 * Start the resource broker service by calling the installed implementation's
 * start handler with `isforked`. With no handler installed there is nothing
 * to start, and FUNC_RETURN_OK is returned.
 */
int RB_start(bool isforked)
{
	if (CurrentRBImp.start == NULL)
	{
		return FUNC_RETURN_OK;
	}
	return CurrentRBImp.start(isforked);
}
/*
 * Stop the resource broker service through the installed implementation's
 * stop handler. Without a handler this is a no-op that reports
 * FUNC_RETURN_OK.
 */
int RB_stop(void)
{
	if (CurrentRBImp.stop == NULL)
	{
		return FUNC_RETURN_OK;
	}
	return CurrentRBImp.stop();
}
/*
 * Obtain the cluster report for queue `queuename`.
 *
 * When the implementation provides getClusterReport, its result is returned
 * unchanged. Otherwise the fallback does only three things: it sets
 * *maxcapacity to 1, increments PRESPOOL->RBClusterReportCounter and returns
 * FUNC_RETURN_OK. *machines is left untouched in that case.
 */
int RB_getClusterReport(const char *queuename, List **machines, double *maxcapacity)
{
	if ( CurrentRBImp.getClusterReport == NULL )
	{
		*maxcapacity = 1;
		PRESPOOL->RBClusterReportCounter++;
		return FUNC_RETURN_OK;
	}

	return CurrentRBImp.getClusterReport(queuename, machines, maxcapacity);
}
/*
 * Ask the installed resource broker implementation for GRM containers.
 *
 * memorymb, core: requested resource quota, passed through unchanged.
 * preferred:      preferred host list (PAIRs of SegResource -> expected
 *                 ResourceBundle). It is passed to the implementation and then
 *                 retained in PreferedHostsForGRMContainers, so this module
 *                 takes ownership of it. The previously retained list is
 *                 released. Elements are later freed from PCONTEXT, so
 *                 presumably they must be allocated there -- confirm with
 *                 callers.
 *
 * Returns the implementation's result code. An acquireResource handler must
 * be installed (asserted).
 */
int RB_acquireResource(uint32_t memorymb, uint32_t core, List *preferred)
{
	Assert(CurrentRBImp.acquireResource != NULL);
	int res = CurrentRBImp.acquireResource(memorymb, core, preferred);
	/*--------------------------------------------------------------------------
	 * We hold preferred host list here, when resource broker gets the resource
	 * allocation result, this is the reference for updating the counter in each
	 * segment for failing of getting GRM containers from GRM.
	 *
	 * Release the previous list only when it is a different list. If the
	 * caller passed the retained list again, freeing it would leave
	 * PreferedHostsForGRMContainers pointing at freed memory.
	 *--------------------------------------------------------------------------
	 */
	if ( PreferedHostsForGRMContainers != NULL &&
		 PreferedHostsForGRMContainers != preferred )
	{
		RB_freePreferedHostsForGRMContainers();
	}
	PreferedHostsForGRMContainers = preferred;
	return res;
}
/*
 * Return GRM containers through the installed implementation. The
 * returnResource handler is mandatory (asserted). Its result code is
 * propagated to the caller.
 */
int RB_returnResource(List **containers)
{
	Assert(CurrentRBImp.returnResource != NULL);

	int result = CurrentRBImp.returnResource(containers);
	return result;
}
/*
 * Fetch the container status report into *ctnstat via the implementation's
 * getContainerReport handler. With no handler installed, *ctnstat is not
 * touched and FUNC_RETURN_OK is returned.
 */
int RB_getContainerReport(List **ctnstat)
{
	return CurrentRBImp.getContainerReport == NULL ?
		   FUNC_RETURN_OK :
		   CurrentRBImp.getContainerReport(ctnstat);
}
/*
 * Let the implementation process pending notifications. Implementations
 * without a handleNotification handler report FUNC_RETURN_OK.
 */
int RB_handleNotifications(void)
{
	if ( CurrentRBImp.handleNotification == NULL )
	{
		return FUNC_RETURN_OK;
	}
	return CurrentRBImp.handleNotification();
}
/*
 * Forward SIGCHLD handling to the implementation, if it registered a
 * handleSigSIGCHLD handler; otherwise do nothing.
 */
void RB_handleSignalSIGCHLD(void)
{
	if ( CurrentRBImp.handleSigSIGCHLD == NULL )
	{
		return;
	}
	CurrentRBImp.handleSigSIGCHLD();
}
/*
 * Pass `errorcode` to the implementation's handleError handler.
 * Unlike the optional hooks above, this handler must be installed; its
 * absence is caught only by the Assert.
 */
void RB_handleError(int errorcode)
{
	Assert(CurrentRBImp.handleError != NULL);
	(*CurrentRBImp.handleError)(errorcode);
}
/*
 * Report whether the resource broker has put the resource manager into the
 * cleanup phase (DRMGlobalInstance->ResBrokerTriggerCleanup is true).
 */
bool isCleanGRMResourceStatus(void)
{
	bool incleanup;

	Assert(DRMGlobalInstance != NULL);
	incleanup = (DRMGlobalInstance->ResBrokerTriggerCleanup == true);
	return incleanup;
}
/*
 * Enter the cleanup phase: raise ResBrokerTriggerCleanup on the global
 * resource manager instance, then log a WARNING recording the transition.
 */
void setCleanGRMResourceStatus(void)
{
	Assert(DRMGlobalInstance != NULL);

	DRMGlobalInstance->ResBrokerTriggerCleanup = true;
	elog(WARNING,
		 "Resource manager went into cleanup phase "
		 "due to error from resource broker.");
}
/*
 * Leave the cleanup phase: clear ResBrokerTriggerCleanup on the global
 * resource manager instance, then log a WARNING recording the transition.
 */
void unsetCleanGRMResourceStatus(void)
{
	Assert(DRMGlobalInstance != NULL);

	DRMGlobalInstance->ResBrokerTriggerCleanup = false;
	elog(WARNING,
		 "Resource manager left "
		 "cleanup phase.");
}
/*
 * Drop every GRM container in *ctnl. On return *ctnl is NULL (empty list).
 *
 * For each container, taken from the head of the list:
 *  - its list cell is removed, with the list operation run in PCONTEXT;
 *  - a LOG line records the dropped container;
 *  - if CalcDecPending is set, its memory/core is subtracted from the
 *    owning segment's DecPending bundle;
 *  - the container is destroyed with freeGRMContainer();
 *  - PRESPOOL->RetPendingContainerCount is decremented (presumably every
 *    container in *ctnl was counted there as pending return -- confirm).
 *
 * NOTE(review): ctn->Resource is dereferenced without a NULL check when
 * CalcDecPending is set; presumably always valid in that case -- confirm.
 */
void RB_clearResource(List **ctnl)
{
while( (*ctnl) != NULL )
{
GRMContainer ctn = (GRMContainer)lfirst(list_head(*ctnl));
/* List cells were allocated in PCONTEXT, so the list is modified there. */
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
(*ctnl) = list_delete_first(*ctnl);
MEMORY_CONTEXT_SWITCH_BACK
elog(LOG, "Resource broker dropped GRM container "INT64_FORMAT
"(%d MB, %d CORE) on host %s",
ctn->ID,
ctn->MemoryMB,
ctn->Core,
ctn->HostName == NULL ? "NULL" : ctn->HostName);
/* Undo this container's contribution to the segment's pending decrease. */
if ( ctn->CalcDecPending )
{
minusResourceBundleData(&(ctn->Resource->DecPending),
ctn->MemoryMB,
ctn->Core);
Assert( ctn->Resource->DecPending.Core >= 0 );
Assert( ctn->Resource->DecPending.MemoryMB >= 0 );
}
/* Destroy resource container. */
freeGRMContainer(ctn);
PRESPOOL->RetPendingContainerCount--;
}
}
/*
 * Release the retained preferred host list (PreferedHostsForGRMContainers)
 * and reset the global to NULL.
 *
 * Each element is a PAIR. The pair and its Value (the expected resource
 * bundle) are freed from PCONTEXT. pair->Key is NOT freed here; presumably it
 * points at a SegResource owned elsewhere -- confirm. The list cells
 * themselves are then freed while PCONTEXT is current.
 * Safe to call when the list is already NULL: the loop does not execute.
 */
void RB_freePreferedHostsForGRMContainers(void)
{
ListCell *cell = NULL;
foreach(cell, PreferedHostsForGRMContainers)
{
PAIR pair = (PAIR)lfirst(cell);
rm_pfree(PCONTEXT, pair->Value);
rm_pfree(PCONTEXT, pair);
}
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
list_free(PreferedHostsForGRMContainers);
MEMORY_CONTEXT_SWITCH_BACK
PreferedHostsForGRMContainers = NULL;
}
/*
 * Compare what each preferred segment actually received against what was
 * expected, and maintain its GRMContainerFailAllocCount.
 *
 * The expectations come from PreferedHostsForGRMContainers: PAIRs of
 * SegResource -> expected ResourceBundle.
 *
 * segments: hash table keyed by SegResource. Its node values are used as
 *           ResourceBundle holding the allocated quota per segment.
 *           NOTE(review): value type inferred from the cast below -- confirm
 *           with callers.
 *
 * A segment fails in either of two cases:
 *  - it is absent from `segments`;
 *  - its allocated memory is below the expected memory. Only memory is
 *    compared; core is logged but not checked.
 * A failing segment's counter is incremented and a WARNING is logged.
 * Otherwise its counter is reset to 0.
 */
void RB_updateSegmentsHavingNoExpectedGRMContainers(HASHTABLE segments)
{
	ListCell *cell = NULL;
	foreach(cell, PreferedHostsForGRMContainers)
	{
		PAIR			pair   = (PAIR)lfirst(cell);
		SegResource		segres = (SegResource)(pair->Key);
		ResourceBundle	expres = (ResourceBundle)(pair->Value);
		bool			failed = false;

		/* Check if the segment exists in the hash table. */
		PAIR pair2 = getHASHTABLENode(segments, segres);
		if ( pair2 == NULL )
		{
			elog(RMLOG, "Resource manager finds segment %s has no resource "
						"container allocated from global resource manager, "
						"expected resource quota (%d MB, %lf CORE)",
						GET_SEGRESOURCE_HOSTNAME(segres),
						expres->MemoryMB,
						expres->Core);
			failed = true;
		}
		else
		{
			/*
			 * The allocated quota is the hash table node's value (pair2).
			 * Reading pair->Value here would make allocres alias expres,
			 * and the shortage check below could never fire.
			 */
			ResourceBundle allocres = (ResourceBundle)(pair2->Value);
			if ( allocres->MemoryMB < expres->MemoryMB )
			{
				elog(RMLOG, "Resource manager finds segment %s hasn't sufficient "
							"resource containers allocated from global resource "
							"manager, expected resource quota (%d MB, %lf CORE), "
							"actual allocated resource (%d MB, %lf CORE)",
							GET_SEGRESOURCE_HOSTNAME(segres),
							expres->MemoryMB,
							expres->Core,
							allocres->MemoryMB,
							allocres->Core);
				failed = true;
			}
		}

		if ( failed )
		{
			segres->GRMContainerFailAllocCount++;
			elog(WARNING, "Resource manager detects segment %s hasn't gotten "
						  "expected quantity of global resource containers for "
						  "%d times.",
						  GET_SEGRESOURCE_HOSTNAME(segres),
						  segres->GRMContainerFailAllocCount);
		}
		else
		{
			segres->GRMContainerFailAllocCount = 0;
		}
	}
}