blob: 45206bca9d0ff1cb5237ba883e6fd8b7c2662eac [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PTOperator.HostOperatorSet;
/**
 * Handle mapping from physical plan locality groupings to resource allocation requests. Monitors available resources
 * through node reports.
 * <p>
 * The handler keeps four pieces of state, all plain hash maps with no synchronization inside this class:
 * the latest {@link NodeReport} per host, the rack reported for each host, the host chosen for each node-local
 * operator set, and the host chosen for each container (used to evaluate anti-affinity constraints).
 *
 * @since 0.3.4
 */
public class ResourceRequestHandler
{
  private static final Logger LOG = LoggerFactory.getLogger(ResourceRequestHandler.class);

  /** Sentinel returned by {@link #getHost} when constraints apply but no node can currently satisfy them. */
  private static final String INVALID_HOST = "INVALID_HOST";

  /** Number of loop iterations a request may stay unallocated before it is re-issued. */
  protected static final int NUMBER_MISSED_HEARTBEATS = 30;

  /** Latest node report, keyed by the host of the report's node id. */
  private final Map<String, NodeReport> nodeReportMap = Maps.newHashMap();

  /** Host selected for each set of node-local operators. */
  private final Map<Set<PTOperator>, String> nodeLocalMapping = Maps.newHashMap();

  /** Rack name reported for each host. Written by {@link #updateNodeReports}; not read within this class. */
  private final Map<String, String> nodeToRack = Maps.newHashMap();

  /** Host selected for each container; consulted when resolving anti-affinity constraints. */
  private final Map<PTContainer, String> antiAffinityMapping = Maps.newHashMap();

  public ResourceRequestHandler()
  {
  }

  /**
   * Issue requests to AM RM Client again if previous container requests expired and were not allocated by Yarn.
   * <p>
   * For every entry whose recorded loop counter is more than {@link #NUMBER_MISSED_HEARTBEATS} iterations old, the
   * previous request is queued for removal, a new request is created (without the first-attempt host lookup) and
   * the entry is updated with the current loop counter and the new request.
   *
   * @param amRmClient               not used by this method
   * @param requestedResources       pending start requests mapped to (loop counter at issue time, issued request);
   *                                 the values are updated in place
   * @param loopCounter              current loop counter
   * @param resourceRequestor        handler used to create the replacement requests
   * @param containerRequests        output list that receives the newly created requests
   * @param removedContainerRequests output list that receives the timed out requests
   */
  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient,
      Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources,
      int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests,
      List<ContainerRequest> removedContainerRequests)
  {
    for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry
        : requestedResources.entrySet()) {
      MutablePair<Integer, ContainerRequest> pending = entry.getValue();
      if (loopCounter - pending.getLeft() <= NUMBER_MISSED_HEARTBEATS) {
        // still within the allowed waiting period
        continue;
      }

      StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
      LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);

      // A previous re-issue may have produced no request (see below), so there may be nothing to remove.
      ContainerRequest expired = pending.getRight();
      if (expired != null) {
        removedContainerRequests.add(expired);
      }

      // createContainerRequest returns null when no host currently satisfies the constraints. Keep null out of
      // the output lists (presumably they are handed to the AMRMClient by the caller - verify) and let the entry
      // time out again so that allocation is retried later.
      ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
      pending.setLeft(loopCounter);
      pending.setRight(cr);
      if (cr != null) {
        containerRequests.add(cr);
      }
    }
  }

  /**
   * Add container request to list of issued requests to Yarn along with current loop counter.
   *
   * @param requestedResources map of pending start requests; receives {@code csr -> (loopCounter, cr)}
   * @param loopCounter        current loop counter, recorded as the issue time of the request
   * @param containerRequests  output list that receives {@code cr}
   * @param csr                container start request the Yarn request was created for
   * @param cr                 the Yarn container request
   */
  public void addContainerRequest(
      Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources,
      int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr,
      ContainerRequest cr)
  {
    requestedResources.put(csr, new MutablePair<>(loopCounter, cr));
    containerRequests.add(cr);
  }

  /**
   * Setup the request(s) that will be sent to the RM for the container ask.
   *
   * @param csr   start request describing the container (memory, vcores, priority)
   * @param first passed through to {@link #getHost}; {@code true} enables the lookup of an already mapped or
   *              explicitly requested host
   * @return the container request, or {@code null} when {@link #getHost} could not find a host that satisfies the
   *         container's constraints
   */
  public ContainerRequest createContainerRequest(ContainerStartRequest csr, boolean first)
  {
    int priority = csr.container.getResourceRequestPriority();
    String host = getHost(csr, first);
    // Compare by value; relying on reference identity of the sentinel is fragile.
    if (INVALID_HOST.equals(host)) {
      return null;
    }

    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(csr.container.getRequiredMemoryMB());
    capability.setVirtualCores(csr.container.getRequiredVCores());

    String[] nodes = null;
    String[] racks = null;
    if (host != null) {
      // Node locality constraint: ask for exactly this host. The rack is intentionally left unset and the last
      // constructor argument (relax locality) is false.
      nodes = new String[]{host};
      return new ContainerRequest(capability, nodes, racks, Priority.newInstance(priority), false);
    }
    // No locality constraint: request by capability (memory and vcores) and priority only.
    return new ContainerRequest(capability, nodes, racks, Priority.newInstance(priority));
  }

  /**
   * Forget all hosts previously selected for node-local operator sets.
   */
  public void clearNodeMapping()
  {
    nodeLocalMapping.clear();
  }

  /**
   * Tracks update to available resources. Resource availability is used to make decisions about where to request new
   * containers.
   *
   * @param nodeReports node reports to record; each report replaces the previous one for the same host
   */
  public void updateNodeReports(List<NodeReport> nodeReports)
  {
    for (NodeReport nr : nodeReports) {
      LOG.debug("Node report: rackName={}, nodeid={}, numContainers={}, capability={}, used={}, state={}", nr.getRackName(), nr.getNodeId(), nr.getNumContainers(), nr.getCapability(), nr.getUsed(), nr.getNodeState());
      String nodeHost = nr.getNodeId().getHost();
      nodeReportMap.put(nodeHost, nr);
      nodeToRack.put(nodeHost, nr.getRackName());
    }
  }

  /**
   * List the known nodes that are not contained in the given host names.
   *
   * @param hostNames hosts to exclude; a node is excluded when either its full key or the part before the first
   *                  {@code ':'} matches
   * @return names (part before the first {@code ':'}) of all other nodes seen in node reports
   */
  public List<String> getNodesExceptHost(List<String> hostNames)
  {
    Set<String> excluded = Sets.newHashSet(hostNames);
    List<String> nodesList = new ArrayList<>();
    for (String host : nodeReportMap.keySet()) {
      // Split node name and port
      String[] parts = host.split(":");
      if (parts.length == 0) {
        continue;
      }
      if (excluded.contains(parts[0]) || excluded.contains(host)) {
        continue;
      }
      nodesList.add(parts[0]);
    }
    return nodesList;
  }

  /**
   * Determine the host on which the container of the given start request should be requested.
   * <p>
   * When {@code first} is set, a host already mapped to one of the container's node-local operator sets is returned
   * directly; otherwise the first host explicitly set on a node-local group is used if its node report shows enough
   * free memory and vcores. If that does not yield a host (or {@code first} is not set), every operator whose
   * node-local set has more than one member, or all operators when the container has anti-affinity preferences,
   * triggers a search over the reported nodes via {@link #assignHost}.
   *
   * @param csr   container start request
   * @param first whether to try the already mapped / explicitly requested host before searching
   * @return the selected host, {@code null} when the container has no constraint that requires a specific host, or
   *         the {@code INVALID_HOST} sentinel when a search was required but no node qualified
   */
  public String getHost(ContainerStartRequest csr, boolean first)
  {
    PTContainer c = csr.container;
    if (first) {
      String requestedHost = null;
      for (PTOperator oper : c.getOperators()) {
        HostOperatorSet grpObj = oper.getNodeLocalOperators();
        String mappedHost = nodeLocalMapping.get(grpObj.getOperatorSet());
        if (mappedHost != null) {
          antiAffinityMapping.put(c, mappedHost);
          return mappedHost;
        }
        if (grpObj.getHost() != null) {
          // using the 1st host value as host for container
          requestedHost = grpObj.getHost();
          break;
        }
      }
      if (requestedHost != null && nodeReportMap.get(requestedHost) != null) {
        for (PTOperator oper : c.getOperators()) {
          Set<PTOperator> nodeLocalSet = oper.getNodeLocalOperators().getOperatorSet();
          int[] required = aggregateRequirements(c, nodeLocalSet);
          if (hasCapacity(nodeReportMap.get(requestedHost), required[0], required[1])) {
            nodeLocalMapping.put(nodeLocalSet, requestedHost);
            antiAffinityMapping.put(c, requestedHost);
            return requestedHost;
          }
        }
      }
    }

    // the host requested didn't have the resources so looking for other hosts
    String host = null;
    List<String> antiHosts = new ArrayList<>();
    List<String> antiPreferredHosts = new ArrayList<>();
    if (!c.getStrictAntiPrefs().isEmpty()) {
      // Check if containers are allocated already for the anti-affinity containers
      populateAntiHostList(c, antiHosts);
    }
    if (!c.getPreferredAntiPrefs().isEmpty()) {
      // Preferred hosts must come from the preferred anti-affinity containers, not from the strict ones.
      collectAntiHosts(c.getPreferredAntiPrefs(), antiPreferredHosts);
    }
    boolean hasAntiPrefs = !c.getStrictAntiPrefs().isEmpty() || !c.getPreferredAntiPrefs().isEmpty();

    LOG.info("Strict anti-affinity = {} for container with operators {}", antiHosts, StringUtils.join(c.getOperators(), ","));
    for (PTOperator oper : c.getOperators()) {
      HostOperatorSet grpObj = oper.getNodeLocalOperators();
      Set<PTOperator> nodeLocalSet = grpObj.getOperatorSet();
      if (nodeLocalSet.size() <= 1 && !hasAntiPrefs) {
        // neither node locality nor anti-affinity applies to this operator
        continue;
      }
      LOG.info("Finding new host for {}", nodeLocalSet);
      // aggregate resources required for all containers of the node-local set
      int[] required = aggregateRequirements(c, nodeLocalSet);
      host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, required[0], required[1]);
      if (host == null && !antiPreferredHosts.isEmpty()) {
        // Drop the preferred constraint and try allocation. A preference must not block placement, regardless of
        // whether strict anti-affinity hosts exist as well.
        antiPreferredHosts.clear();
        host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, required[0], required[1]);
      }
      if (host != null) {
        antiAffinityMapping.put(c, host);
      } else {
        host = INVALID_HOST;
      }
    }
    LOG.info("Found host {}", host);
    return host;
  }

  /**
   * Populate list of nodes where container cannot be allocated due to strict anti-affinity constraints.
   *
   * @param c         container
   * @param antiHosts list that receives the nodes where the container cannot be allocated
   */
  public void populateAntiHostList(PTContainer c, List<String> antiHosts)
  {
    collectAntiHosts(c.getStrictAntiPrefs(), antiHosts);
  }

  /**
   * Get host name where container would be allocated given node local constraints.
   *
   * @param container container to inspect
   * @return for the first operator that has one, the host mapped to its node-local operator set or else the host
   *         set on its node-local group; {@code null} when no operator has either
   */
  public String getHostForContainer(PTContainer container)
  {
    for (PTOperator oper : container.getOperators()) {
      HostOperatorSet grpObj = oper.getNodeLocalOperators();
      String mappedHost = nodeLocalMapping.get(grpObj.getOperatorSet());
      if (mappedHost != null) {
        return mappedHost;
      }
      if (grpObj.getHost() != null) {
        return grpObj.getHost();
      }
    }
    return null;
  }

  /**
   * Assign host to container given affinity and anti-affinity constraints and resource availability on node.
   * <p>
   * The first node in RUNNING state with enough free memory and vcores that is in neither exclusion list is chosen.
   * On success the host is set on {@code grpObj} and recorded for {@code nodeLocalSet}.
   *
   * @param host               ignored on input; kept for interface compatibility
   * @param antiHosts          hosts excluded by strict anti-affinity
   * @param antiPreferredHosts hosts excluded by preferred anti-affinity
   * @param grpObj             node-local group that receives the selected host
   * @param nodeLocalSet       operators of the node-local group, used as key for the node-local mapping
   * @param aggrMemory         memory in MB required on the node
   * @param vCores             vcores required on the node
   * @return the selected host or {@code null} when no node qualifies
   */
  public String assignHost(String host, List<String> antiHosts, List<String> antiPreferredHosts, HostOperatorSet grpObj, Set<PTOperator> nodeLocalSet, int aggrMemory, int vCores)
  {
    for (Map.Entry<String, NodeReport> nodeEntry : nodeReportMap.entrySet()) {
      NodeReport report = nodeEntry.getValue();
      if (report.getNodeState() != NodeState.RUNNING) {
        continue;
      }
      String candidate = nodeEntry.getKey();
      if (hasCapacity(report, aggrMemory, vCores) && !antiHosts.contains(candidate) && !antiPreferredHosts.contains(candidate)) {
        grpObj.setHost(candidate);
        nodeLocalMapping.put(nodeLocalSet, candidate);
        return candidate;
      }
    }
    return null;
  }

  /**
   * Add the host of every given anti-affinity container to the list, using the host already selected for the
   * container or, failing that, the host implied by its node-local constraints.
   */
  private void collectAntiHosts(Iterable<? extends PTContainer> antiPrefs, List<String> hosts)
  {
    for (PTContainer container : antiPrefs) {
      String antiHost = antiAffinityMapping.get(container);
      if (antiHost == null) {
        // Check if there is an anti-affinity with host locality
        antiHost = getHostForContainer(container);
      }
      if (antiHost != null) {
        hosts.add(antiHost);
      }
    }
  }

  /**
   * Sum memory and vcores of the container and of every distinct other container hosting an operator of the set.
   *
   * @return two element array: required memory in MB, required vcores
   */
  private static int[] aggregateRequirements(PTContainer c, Set<PTOperator> nodeLocalSet)
  {
    int memoryMB = c.getRequiredMemoryMB();
    int vCores = c.getRequiredVCores();
    Set<PTContainer> counted = Sets.newHashSet();
    counted.add(c);
    for (PTOperator nodeLocalOper : nodeLocalSet) {
      PTContainer other = nodeLocalOper.getContainer();
      if (!counted.contains(other)) {
        memoryMB += other.getRequiredMemoryMB();
        vCores += other.getRequiredVCores();
        counted.add(other);
      }
    }
    return new int[]{memoryMB, vCores};
  }

  /**
   * Check whether capability minus usage of the reported node covers the given memory and vcores.
   */
  private static boolean hasCapacity(NodeReport report, int memoryMB, int vCores)
  {
    int memAvailable = report.getCapability().getMemory() - report.getUsed().getMemory();
    int vCoresAvailable = report.getCapability().getVirtualCores() - report.getUsed().getVirtualCores();
    return memAvailable >= memoryMB && vCoresAvailable >= vCores;
  }
}