/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>
* The CentralizedOpportunisticContainerAllocator allocates opportunistic
* containers by considering all the nodes in the cluster. Requested
* container sizes are first adjusted to respect the limits set by the
* ResourceManager, and the allocator then tries to distribute the
* containers as evenly as possible across nodes.
* </p>
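* <p>
* A minimal usage sketch (the {@code tokenSecretManager},
* {@code loadMonitor}, and per-allocation arguments are assumed to be
* provided by the surrounding ResourceManager code):
* </p>
* <pre>{@code
* CentralizedOpportunisticContainerAllocator allocator =
*     new CentralizedOpportunisticContainerAllocator(
*         tokenSecretManager, maxAllocationsPerAMHeartbeat, loadMonitor);
* List<Container> allocated = allocator.allocateContainers(
*     blacklist, oppResourceReqs, appAttemptId, oppContext,
*     rmIdentifier, appSubmitter);
* }</pre>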
*/
public class CentralizedOpportunisticContainerAllocator extends
OpportunisticContainerAllocator {
private static final Logger LOG =
LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class);
private NodeQueueLoadMonitor nodeQueueLoadMonitor;
private OpportunisticSchedulerMetrics metrics =
OpportunisticSchedulerMetrics.getMetrics();
/**
* Create a new Centralized Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
*/
public CentralizedOpportunisticContainerAllocator(
BaseContainerTokenSecretManager tokenSecretManager) {
super(tokenSecretManager);
}
/**
* Create a new Centralized Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
* @param maxAllocationsPerAMHeartbeat max number of containers to be
* allocated in one AM heartbeat
* @param nodeQueueLoadMonitor the NodeQueueLoadMonitor used to select
* target nodes based on their queue load
*/
public CentralizedOpportunisticContainerAllocator(
BaseContainerTokenSecretManager tokenSecretManager,
int maxAllocationsPerAMHeartbeat,
NodeQueueLoadMonitor nodeQueueLoadMonitor) {
super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
}
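/**
* Sets the {@link NodeQueueLoadMonitor} used to pick target nodes.
* Exposed for testing.
* @param nodeQueueLoadMonitor the load monitor to use
*/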
@VisibleForTesting
void setNodeQueueLoadMonitor(NodeQueueLoadMonitor nodeQueueLoadMonitor) {
this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
}
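/**
* Allocates opportunistic containers for the given application attempt,
* processing outstanding requests one scheduler key at a time and stopping
* once the maximum number of allocations per AM heartbeat (if configured)
* has been reached.
*/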
@Override
public List<Container> allocateContainers(
ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs,
ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext opportContext, long rmIdentifier,
String appSubmitter) throws YarnException {
updateBlacklist(blackList, opportContext);
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs);
Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
List<Container> allocatedContainers = new ArrayList<>();
int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat();
// Each allocation map below is keyed by the requested capability; the
// value is the list of containers allocated for that capability (the
// actual container size might differ from what was requested, which is
// why the requested capability (key) is needed to match against the
// outstanding reqs).
List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
for (SchedulerRequestKey schedulerKey :
opportContext.getOutstandingOpReqs().descendingKeySet()) {
int remAllocs = -1;
if (maxAllocationsPerAMHeartbeat > 0) {
remAllocs =
maxAllocationsPerAMHeartbeat - getTotalAllocations(allocations);
if (remAllocs <= 0) {
LOG.info("Not allocating more containers as we have reached max "
+ "allocations per AM heartbeat {}",
maxAllocationsPerAMHeartbeat);
break;
}
}
Map<Resource, List<Allocation>> allocation = allocatePerSchedulerKey(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
appSubmitter, nodeBlackList, remAllocs);
if (allocation.size() > 0) {
allocations.add(allocation);
}
}
matchAllocation(allocations, allocatedContainers, opportContext);
return allocatedContainers;
}
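/**
* Allocates containers for all outstanding opportunistic requests under a
* single scheduler key, honoring the remaining per-heartbeat allocation
* budget. Returns a map from requested capability to the allocations made
* for that capability.
*/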
private Map<Resource, List<Allocation>> allocatePerSchedulerKey(
long rmIdentifier, OpportunisticContainerContext appContext,
SchedulerRequestKey schedKey, ApplicationAttemptId appAttId,
String userName, Set<String> blackList, int maxAllocations)
throws YarnException {
Map<Resource, List<Allocation>> allocations = new HashMap<>();
int totalAllocated = 0;
for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) {
int remainingAllocs = -1;
if (maxAllocations > 0) {
remainingAllocs = maxAllocations - totalAllocated;
if (remainingAllocs <= 0) {
LOG.info("Not allocating more containers as max allocations per AM "
+ "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat());
break;
}
}
totalAllocated += allocateContainersPerRequest(rmIdentifier,
appContext.getAppParams(),
appContext.getContainerIdGenerator(), blackList,
appAttId, userName, allocations, enrichedAsk,
remainingAllocs);
ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!allocations.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, "
+ "allocationRequestId={}, num_containers={}, capability={}] "
+ "allocated = {}", anyAsk.getPriority(),
anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
anyAsk.getCapability(), allocations.keySet());
}
}
return allocations;
}
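/**
* Allocates containers for a single outstanding request, trying node-local
* placements first, then rack-local, and finally ANY, until either the
* request is satisfied or {@code maxAllocations} is exhausted. Returns the
* number of containers allocated.
*/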
@SuppressWarnings("checkstyle:parameternumber")
private int allocateContainersPerRequest(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist,
ApplicationAttemptId id,
String userName, Map<Resource, List<Allocation>> allocations,
EnrichedResourceRequest enrichedAsk, int maxAllocations)
throws YarnException {
ResourceRequest anyAsk = enrichedAsk.getRequest();
int totalAllocated = 0;
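// Containers still needed for this ask: the requested count minus any
// already allocated at the same capability.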
int maxToAllocate = anyAsk.getNumContainers()
- (allocations.isEmpty() ? 0 :
allocations.get(anyAsk.getCapability()).size());
if (maxAllocations >= 0) {
maxToAllocate = Math.min(maxAllocations, maxToAllocate);
}
// allocate node local
if (maxToAllocate > 0) {
Map<String, AtomicInteger> nodeLocations = enrichedAsk.getNodeMap();
for (Map.Entry<String, AtomicInteger> nodeLocation :
nodeLocations.entrySet()) {
int numContainers = nodeLocation.getValue().get();
numContainers = Math.min(numContainers, maxToAllocate);
List<Container> allocatedContainers =
allocateNodeLocal(enrichedAsk, nodeLocation.getKey(),
numContainers, rmIdentifier, appParams, idCounter, blacklist,
id, userName, allocations);
totalAllocated += allocatedContainers.size();
maxToAllocate -= allocatedContainers.size();
// no more containers to allocate
if (maxToAllocate <= 0) {
break;
}
}
}
// if still left, allocate rack local
if (maxToAllocate > 0) {
Map<String, AtomicInteger> rackLocations = enrichedAsk.getRackMap();
for (Map.Entry<String, AtomicInteger> rack : rackLocations.entrySet()) {
int numContainers = rack.getValue().get();
numContainers = Math.min(numContainers, maxToAllocate);
List<Container> allocatedContainers =
allocateRackLocal(enrichedAsk, rack.getKey(), numContainers,
rmIdentifier, appParams, idCounter, blacklist, id,
userName, allocations);
totalAllocated += allocatedContainers.size();
maxToAllocate -= allocatedContainers.size();
// no more containers to allocate
if (maxToAllocate <= 0) {
break;
}
}
}
// if still left, try on ANY
if (maxToAllocate > 0) {
List<Container> allocatedContainers = allocateAny(enrichedAsk,
maxToAllocate, rmIdentifier, appParams, idCounter, blacklist,
id, userName, allocations);
totalAllocated += allocatedContainers.size();
}
return totalAllocated;
}
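/**
* Allocates up to {@code toAllocate} containers on the requested node,
* stopping early if the load monitor cannot find a suitable node.
*/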
@SuppressWarnings("checkstyle:parameternumber")
private List<Container> allocateNodeLocal(
EnrichedResourceRequest enrichedAsk,
String nodeLocation,
int toAllocate, long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist,
ApplicationAttemptId id,
String userName, Map<Resource, List<Allocation>> allocations)
throws YarnException {
List<Container> allocatedContainers = new ArrayList<>();
while (toAllocate > 0) {
RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation,
blacklist);
if (node != null) {
toAllocate--;
Container container = createContainer(rmIdentifier, appParams,
idCounter, id, userName, allocations, nodeLocation,
enrichedAsk.getRequest(), convertToRemoteNode(node));
allocatedContainers.add(container);
LOG.info("Allocated [{}] as opportunistic at location [{}]",
container.getId(), nodeLocation);
metrics.incrNodeLocalOppContainers();
} else {
// we couldn't allocate any - break the loop.
break;
}
}
return allocatedContainers;
}
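/**
* Allocates up to {@code toAllocate} containers on nodes within the
* requested rack, stopping early if the load monitor cannot find a
* suitable node.
*/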
@SuppressWarnings("checkstyle:parameternumber")
private List<Container> allocateRackLocal(EnrichedResourceRequest enrichedAsk,
String rackLocation, int toAllocate, long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist,
ApplicationAttemptId id,
String userName, Map<Resource, List<Allocation>> allocations)
throws YarnException {
List<Container> allocatedContainers = new ArrayList<>();
while (toAllocate > 0) {
RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation,
blacklist);
if (node != null) {
toAllocate--;
Container container = createContainer(rmIdentifier, appParams,
idCounter, id, userName, allocations, rackLocation,
enrichedAsk.getRequest(), convertToRemoteNode(node));
allocatedContainers.add(container);
metrics.incrRackLocalOppContainers();
LOG.info("Allocated [{}] as opportunistic at location [{}]",
container.getId(), rackLocation);
} else {
// we couldn't allocate any - break the loop.
break;
}
}
return allocatedContainers;
}
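/**
* Allocates up to {@code toAllocate} containers on any node in the cluster
* (off-switch), stopping early if the load monitor cannot find a suitable
* node.
*/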
@SuppressWarnings("checkstyle:parameternumber")
private List<Container> allocateAny(EnrichedResourceRequest enrichedAsk,
int toAllocate, long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist,
ApplicationAttemptId id,
String userName, Map<Resource, List<Allocation>> allocations)
throws YarnException {
List<Container> allocatedContainers = new ArrayList<>();
while (toAllocate > 0) {
RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist);
if (node != null) {
toAllocate--;
Container container = createContainer(rmIdentifier, appParams,
idCounter, id, userName, allocations, ResourceRequest.ANY,
enrichedAsk.getRequest(), convertToRemoteNode(node));
allocatedContainers.add(container);
metrics.incrOffSwitchOppContainers();
LOG.info("Allocated [{}] as opportunistic at location [{}]",
container.getId(), ResourceRequest.ANY);
} else {
// we couldn't allocate any - break the loop.
break;
}
}
return allocatedContainers;
}
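/**
* Converts an {@link RMNode} into the {@link RemoteNode} form used when
* creating containers, or returns {@code null} if the node is null.
*/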
private RemoteNode convertToRemoteNode(RMNode rmNode) {
if (rmNode != null) {
RemoteNode rNode = RemoteNode.newInstance(rmNode.getNodeID(),
rmNode.getHttpAddress());
rNode.setRackName(rmNode.getRackName());
return rNode;
}
return null;
}
}