/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
* <p>
* The OpportunisticContainerAllocator allocates containers on a given list of
* nodes, after modifying the container sizes to respect the limits set by the
* ResourceManager. It tries to distribute the containers as evenly as possible.
 * It also uses the NM's <code>NMContainerTokenSecretManager</code> to
 * generate the container tokens required by the allocated containers.
* </p>
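 * <p>
 * A rough usage sketch; appParams, idCounter, asks, blacklist, appAttemptId,
 * knownNodes and user are assumed to be supplied by the caller, and the call
 * shape below is illustrative only:
 * </p>
 * <pre>{@code
 * OpportunisticContainerAllocator allocator =
 *     new OpportunisticContainerAllocator(nodeStatusUpdater, context, port);
 * Map<Resource, List<Container>> allocated = allocator.allocate(
 *     appParams, idCounter, asks, blacklist, appAttemptId, knownNodes, user);
 * }</pre>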
*/
public class OpportunisticContainerAllocator {
private static final Log LOG =
LogFactory.getLog(OpportunisticContainerAllocator.class);
private static final ResourceCalculator RESOURCE_CALCULATOR =
new DominantResourceCalculator();
static class ContainerIdCounter {
final AtomicLong containerIdCounter = new AtomicLong(1);
void resetContainerIdCounter(long containerIdStart) {
this.containerIdCounter.set(containerIdStart);
}
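    // Ids are handed out counting *down* from the start value installed via
    // resetContainerIdCounter() (presumably so locally generated ids do not
    // collide with container ids assigned by the RM).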
long generateContainerId() {
return this.containerIdCounter.decrementAndGet();
}
}
private final NodeStatusUpdater nodeStatusUpdater;
private final Context context;
private int webpagePort;
public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
Context context, int webpagePort) {
this.nodeStatusUpdater = nodeStatusUpdater;
this.context = context;
this.webpagePort = webpagePort;
}
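  /**
   * Allocate opportunistic containers for each resource ask, spreading them
   * round-robin across the given nodes (blacklisted nodes excluded).
   *
   * @param appParams RM-supplied limits for this application (min / max /
   *                  increment resource and container token expiry interval)
   * @param idCounter source of container ids for this allocator
   * @param resourceAsks the outstanding resource requests
   * @param blacklist node names that must not be used for scheduling
   * @param appAttId application attempt the containers belong to
   * @param allNodes candidate nodes, keyed by the node name that is matched
   *                 against the blacklist
   * @param userName user the containers will run as
   * @return the allocated containers, grouped by requested capability
   * @throws YarnException if a container cannot be built
   */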
public Map<Resource, List<Container>> allocate(
DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
allNodes, userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
}
return containers;
}
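  /**
   * Allocate containers for a single resource ask, round-robin across the
   * non-blacklisted nodes, and record them in the per-capability map.
   */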
private void allocateOpportunisticContainers(
DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, NodeId> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
throws YarnException {
    // Subtract any containers already allocated for this capability; the map
    // may not have an entry for it yet.
    List<Container> alreadyAllocated = containers.get(anyAsk.getCapability());
    int toAllocate = anyAsk.getNumContainers()
        - (alreadyAllocated == null ? 0 : alreadyAllocated.size());
List<NodeId> nodesForScheduling = new ArrayList<>();
for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
}
nodesForScheduling.add(nodeEntry.getValue());
}
    if (nodesForScheduling.isEmpty()) {
      LOG.warn("No nodes available for allocating opportunistic containers; "
          + "all nodes are blacklisted or the node list is empty.");
      return;
    }
    int numAllocated = 0;
    // Round-robin over the candidate nodes so containers are spread as
    // evenly as possible.
    int nextNodeToSchedule = 0;
    for (int numCont = 0; numCont < toAllocate; numCont++) {
      nextNodeToSchedule++;
      nextNodeToSchedule %= nodesForScheduling.size();
      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(appParams, idCounter, anyAsk, id,
userName, nodeId);
List<Container> cList = containers.get(anyAsk.getCapability());
if (cList == null) {
cList = new ArrayList<>();
containers.put(anyAsk.getCapability(), cList);
}
cList.add(container);
numAllocated++;
LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
}
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
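  /**
   * Build a single opportunistic container: normalize the requested
   * capability, mint a container token for it, and wrap both in a
   * {@link Container} record that points at the target node.
   */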
private Container buildContainer(DistributedSchedulerParams appParams,
ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
String userName, NodeId nodeId) throws YarnException {
ContainerId cId =
ContainerId.newContainerId(id, idCounter.generateContainerId());
    // Normalize the resource ask (similar to what the RM scheduler does
    // before accepting an ask).
Resource capability = normalizeCapability(appParams, rr);
long currTime = System.currentTimeMillis();
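    // The token identifier carries everything the NM needs to verify the
    // container: id, target node address, user, normalized capability,
    // expiry (now + the RM-configured interval), the current master key id
    // and the RM identifier. The execution type is marked OPPORTUNISTIC.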
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
context.getContainerTokenSecretManager().createPassword(
containerTokenIdentifier);
Token containerToken = newContainerToken(nodeId, pwd,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
capability, rr.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
rr.getAllocationRequestId());
return container;
}
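  /**
   * Round the requested capability up to the configured increment and clamp
   * it to the min / max limits, mirroring the normalization the RM scheduler
   * applies before accepting an ask.
   */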
private Resource normalizeCapability(DistributedSchedulerParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,
appParams.incrementResource);
}
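  /**
   * Create a container {@link Token} whose service name is the
   * {@code ip:port} of the target node, as expected by the RPC layer.
   */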
public static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
nodeId.getPort());
// NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
.buildTokenService(addr).toString());
return containerToken;
}
}