blob: 455f15645bad75a78abb51c36357137afbde05ca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PTOperator.HostOperatorSet;
/**
 * Handle mapping from physical plan locality groupings to resource allocation requests. Monitors available resources
 * through node reports.
 * <p>
 * The handler keeps the most recent {@link NodeReport} per host (see {@link #updateNodeReports(List)}) and remembers
 * which host was picked for each node-local operator group, so that subsequent requests for containers of the same
 * group resolve to the same host until {@link #clearNodeMapping()} is called.
 *
 * @since 0.3.4
 */
public class ResourceRequestHandler
{
  private static final Logger LOG = LoggerFactory.getLogger(ResourceRequestHandler.class);

  /** Latest node report received for each host name. */
  private final Map<String, NodeReport> nodeReportMap = Maps.newHashMap();
  /** Host chosen for each node-local operator group. */
  private final Map<Set<PTOperator>, String> nodeLocalMapping = Maps.newHashMap();
  /** Rack name reported for each host name; populated from node reports, not consulted when building requests. */
  private final Map<String, String> nodeToRack = Maps.newHashMap();

  public ResourceRequestHandler()
  {
  }

  /**
   * Setup the request(s) that will be sent to the RM for the container ask.
   * <p>
   * The requested capability carries both the memory and the virtual cores required by the container. When
   * {@link #getHost(ContainerStartRequest, boolean)} yields a host, the request names that single node, leaves the
   * racks unset and passes {@code false} as the trailing locality flag of {@link ContainerRequest}; otherwise the
   * request carries no node or rack constraint.
   *
   * @param csr the container start request to translate
   * @param first forwarded unchanged to {@link #getHost(ContainerStartRequest, boolean)}
   * @return the container request to hand to the resource manager client
   */
  public ContainerRequest createContainerRequest(ContainerStartRequest csr, boolean first)
  {
    int priority = csr.container.getResourceRequestPriority();
    // check for node locality constraint
    String host = getHost(csr, first);

    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(csr.container.getRequiredMemoryMB());
    capability.setVirtualCores(csr.container.getRequiredVCores());

    if (host != null) {
      // in order to request a host, we don't have to set the rack if the locality is false
      return new ContainerRequest(capability, new String[]{host}, null, Priority.newInstance(priority), false);
    }
    // no host constraint: let the RM place the container anywhere
    return new ContainerRequest(capability, null, null, Priority.newInstance(priority));
  }

  /**
   * Forgets all hosts previously chosen for node-local operator groups.
   */
  public void clearNodeMapping()
  {
    nodeLocalMapping.clear();
  }

  /**
   * Tracks update to available resources. Resource availability is used to make decisions about where to request new
   * containers.
   * <p>
   * Each report is logged and replaces any earlier report (and rack name) stored for the same host.
   *
   * @param nodeReports the updated node reports
   */
  public void updateNodeReports(List<NodeReport> nodeReports)
  {
    for (NodeReport nr : nodeReports) {
      StringBuilder sb = new StringBuilder();
      sb.append("rackName=").append(nr.getRackName())
          .append(",nodeid=").append(nr.getNodeId())
          .append(",numContainers=").append(nr.getNumContainers())
          .append(",capability=").append(nr.getCapability())
          // every field is comma separated so that the individual values remain distinguishable in the log
          .append(",used=").append(nr.getUsed())
          .append(",state=").append(nr.getNodeState());
      LOG.info("Node report: {}", sb);

      String host = nr.getNodeId().getHost();
      nodeReportMap.put(host, nr);
      nodeToRack.put(host, nr.getRackName());
    }
  }

  /**
   * Determines the host, if any, on which the container of the given request should be allocated.
   * <p>
   * When {@code first} is set, a host already recorded for one of the container's node-local groups is returned
   * as is. Failing that, the first host set on any of the groups is used, provided a node report is known for it
   * and the node has enough free memory and virtual cores for the whole group.
   * <p>
   * Otherwise (or when {@code first} is not set) the known nodes are searched for one that can accommodate a
   * node-local group with more than one operator; the first match is recorded on the group and returned.
   *
   * @param csr the container start request
   * @param first whether the host configured on the node-local group should be considered before searching
   * @return the host name to request, or {@code null} when no host constraint applies or no node qualifies
   */
  public String getHost(ContainerStartRequest csr, boolean first)
  {
    PTContainer c = csr.container;
    if (first) {
      String host = null;
      for (PTOperator oper : c.getOperators()) {
        HostOperatorSet grpObj = oper.getNodeLocalOperators();
        String mappedHost = nodeLocalMapping.get(grpObj.getOperatorSet());
        if (mappedHost != null) {
          return mappedHost;
        }
        if (grpObj.getHost() != null) {
          host = grpObj.getHost();
          // using the 1st host value as host for container
          break;
        }
      }
      NodeReport report = (host == null) ? null : nodeReportMap.get(host);
      if (report != null) {
        for (PTOperator oper : c.getOperators()) {
          Set<PTOperator> nodeLocalSet = oper.getNodeLocalOperators().getOperatorSet();
          if (aggregateDemand(c, nodeLocalSet).fitsOn(report)) {
            nodeLocalMapping.put(nodeLocalSet, host);
            return host;
          }
        }
      }
    }

    // the host requested didn't have the resources so looking for other hosts
    for (PTOperator oper : c.getOperators()) {
      HostOperatorSet grpObj = oper.getNodeLocalOperators();
      Set<PTOperator> nodeLocalSet = grpObj.getOperatorSet();
      if (nodeLocalSet.size() > 1) {
        LOG.debug("Finding new host for {}", nodeLocalSet);
        ResourceDemand demand = aggregateDemand(c, nodeLocalSet);
        for (Map.Entry<String, NodeReport> nodeEntry : nodeReportMap.entrySet()) {
          if (demand.fitsOn(nodeEntry.getValue())) {
            String host = nodeEntry.getKey();
            grpObj.setHost(host);
            nodeLocalMapping.put(nodeLocalSet, host);
            return host;
          }
        }
      }
    }
    return null;
  }

  /**
   * Sums the memory and virtual cores required by the given container and by every other distinct container that
   * hosts an operator of the node-local set.
   */
  private static ResourceDemand aggregateDemand(PTContainer container, Set<PTOperator> nodeLocalSet)
  {
    int memoryMB = container.getRequiredMemoryMB();
    int vCores = container.getRequiredVCores();
    Set<PTContainer> counted = Sets.newHashSet();
    counted.add(container);
    for (PTOperator nodeLocalOper : nodeLocalSet) {
      PTContainer other = nodeLocalOper.getContainer();
      // add() reports whether the container is new, so each container is counted once
      if (counted.add(other)) {
        memoryMB += other.getRequiredMemoryMB();
        vCores += other.getRequiredVCores();
      }
    }
    return new ResourceDemand(memoryMB, vCores);
  }

  /**
   * Aggregate memory and virtual core requirement of a group of containers.
   */
  private static final class ResourceDemand
  {
    private final int memoryMB;
    private final int vCores;

    ResourceDemand(int memoryMB, int vCores)
    {
      this.memoryMB = memoryMB;
      this.vCores = vCores;
    }

    /**
     * Checks whether the capability of the reported node minus its used resources covers this demand.
     */
    boolean fitsOn(NodeReport report)
    {
      int memAvailable = report.getCapability().getMemory() - report.getUsed().getMemory();
      int vCoresAvailable = report.getCapability().getVirtualCores() - report.getUsed().getVirtualCores();
      return memAvailable >= memoryMB && vCoresAvailable >= vCores;
    }
  }
}