blob: 246d450668d06973712d5e393429ce59cf4e66a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest;
/**
* This encapsulates application specific information used by the
* Opportunistic Container Allocator to allocate containers.
*/
public class OpportunisticContainerContext {

  private static final Logger LOG = LoggerFactory
      .getLogger(OpportunisticContainerContext.class);

  // Never reassigned; only mutated through updateAllocationParams().
  private final AllocationParams appParams = new AllocationParams();
  private ContainerIdGenerator containerIdGenerator =
      new ContainerIdGenerator();

  // The node list most recently handed to updateNodeList(). It is compared by
  // reference to decide whether nodeMap has to be rebuilt.
  private volatile List<RemoteNode> nodeList = new LinkedList<>();
  // Host name -> node, rebuilt from nodeList in the list's iteration order.
  private final LinkedHashMap<String, RemoteNode> nodeMap =
      new LinkedHashMap<>();

  private final Set<String> blacklist = new HashSet<>();

  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Keyed first by
  // SchedulerRequestKey and then by capability. This mapping is required
  // to match a received Container to an outstanding OPPORTUNISTIC
  // ResourceRequest (ask).
  private final TreeMap
      <SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
      outstandingOpReqs = new TreeMap<>();

  /**
   * @return the allocation parameters held by this context. The returned
   *         object is the live instance that
   *         {@link #updateAllocationParams} modifies.
   */
  public AllocationParams getAppParams() {
    return appParams;
  }

  /**
   * @return the container id generator currently set on this context
   */
  public ContainerIdGenerator getContainerIdGenerator() {
    return containerIdGenerator;
  }

  /**
   * Replaces the container id generator used by this context.
   *
   * @param containerIdGenerator the generator to use from now on
   */
  public void setContainerIdGenerator(
      ContainerIdGenerator containerIdGenerator) {
    this.containerIdGenerator = containerIdGenerator;
  }

  /**
   * Returns a read-only view of the host name to node mapping. This is a view
   * and not a copy, so a later {@link #updateNodeList} call is reflected in
   * it.
   * NOTE(review): the view is read without the lock that updateNodeList
   * holds; confirm callers do not iterate it concurrently with an update.
   *
   * @return unmodifiable view of the node map
   */
  public Map<String, RemoteNode> getNodeMap() {
    return Collections.unmodifiableMap(nodeMap);
  }

  /**
   * Sets the list of nodes and rebuilds the host name to node map from it.
   *
   * @param newNodeList the node list to use; if it is the same instance as
   *                    the one passed previously, the call is a no-op
   */
  public synchronized void updateNodeList(List<RemoteNode> newNodeList) {
    // This is an optimization for centralized placement. The
    // OppContainerAllocatorAMService has a cached list of nodes which it sets
    // here. The nodeMap needs to be updated only if the backing node list is
    // modified. The identity comparison (not equals()) is intentional: it is
    // what makes the check cheap.
    if (newNodeList != nodeList) {
      nodeList = newNodeList;
      nodeMap.clear();
      for (RemoteNode n : nodeList) {
        nodeMap.put(n.getNodeId().getHost(), n);
      }
    }
  }

  /**
   * Overwrites the allocation parameters held by this context.
   *
   * @param minResource value passed to {@code setMinResource}
   * @param maxResource value passed to {@code setMaxResource}
   * @param incrResource value passed to {@code setIncrementResource}
   * @param containerTokenExpiryInterval value passed to
   *                                     {@code setContainerTokenExpiryInterval}
   */
  public void updateAllocationParams(Resource minResource, Resource maxResource,
      Resource incrResource, int containerTokenExpiryInterval) {
    appParams.setMinResource(minResource);
    appParams.setMaxResource(maxResource);
    appParams.setIncrementResource(incrResource);
    appParams.setContainerTokenExpiryInterval(containerTokenExpiryInterval);
  }

  /**
   * @return the live, mutable blacklist set held by this context (not a copy)
   */
  public Set<String> getBlacklist() {
    return blacklist;
  }

  /**
   * @return the live, mutable map of outstanding OPPORTUNISTIC requests held
   *         by this context (not a copy)
   */
  public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
      getOutstandingOpReqs() {
    return outstandingOpReqs;
  }

  /**
   * Takes a list of ResourceRequests (asks), extracts the key information viz.
   * (Priority, ResourceName, Capability) and adds to the outstanding
   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
   * the current YARN constraint that only a single ResourceRequest can exist at
   * a given Priority and Capability.
   *
   * @param resourceAsks the list with the {@link ResourceRequest}s
   */
  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
    for (ResourceRequest request : resourceAsks) {
      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);

      Map<Resource, EnrichedResourceRequest> reqMap =
          outstandingOpReqs.get(schedulerKey);
      if (reqMap == null) {
        reqMap = new HashMap<>();
        outstandingOpReqs.put(schedulerKey, reqMap);
      }

      EnrichedResourceRequest eReq = reqMap.get(request.getCapability());
      if (eReq == null) {
        eReq = new EnrichedResourceRequest(request);
        reqMap.put(request.getCapability(), eReq);
      }

      // Set numContainers only for ANY request; any other resource name is
      // recorded as a location on the enriched request.
      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
        eReq.getRequest().setResourceName(ResourceRequest.ANY);
        eReq.getRequest().setNumContainers(request.getNumContainers());
        LOG.info("# of outstandingOpReqs in ANY (at priority = {}, "
            + "allocationReqId = {}, with capability = {}, "
            + "with location = {}) : numContainers = {}",
            schedulerKey.getPriority(),
            schedulerKey.getAllocationRequestId(),
            request.getCapability(),
            request.getResourceName(),
            eReq.getRequest().getNumContainers());
      } else {
        eReq.addLocation(request.getResourceName(), request.getNumContainers());
      }
    }
  }

  /**
   * This method matches a returned list of Container Allocations to any
   * outstanding OPPORTUNISTIC ResourceRequest. For every allocation that
   * matches an outstanding ask, the ask's container count is decremented; an
   * ask whose count is exhausted is removed, and so is its scheduler key once
   * no asks remain under it. Allocations without a matching ask are ignored.
   *
   * @param capability Capability
   * @param allocations Allocations.
   */
  public void matchAllocationToOutstandingRequest(Resource capability,
      List<Allocation> allocations) {
    for (Allocation allocation : allocations) {
      SchedulerRequestKey schedulerKey =
          SchedulerRequestKey.extractFrom(allocation.getContainer());
      Map<Resource, EnrichedResourceRequest> asks =
          outstandingOpReqs.get(schedulerKey);
      if (asks == null) {
        continue;
      }

      EnrichedResourceRequest err = asks.get(capability);
      if (err == null) {
        continue;
      }

      int numContainers = err.getRequest().getNumContainers() - 1;
      err.getRequest().setNumContainers(numContainers);
      // "<= 0" rather than "== 0": if the stored count was already 0, the
      // decrement goes negative and an equality test would never remove the
      // exhausted ask.
      if (numContainers <= 0) {
        asks.remove(capability);
        if (asks.isEmpty()) {
          outstandingOpReqs.remove(schedulerKey);
        }
      } else if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) {
        err.removeLocation(allocation.getResourceName());
      }
    }
  }
}