blob: 799140de6608b3ddb607f588ce098c49f4df5787 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Class encapsulates all outstanding container increase and decrease
 * requests for an application.
 *
 * <p>Increases (including execution type promotions) are tracked per
 * scheduler key, then per resource, then per node. Decreases are tracked
 * per container id together with the resource recorded when the decrease
 * was registered.
 *
 * <p>The methods that register or remove outstanding updates are
 * {@code synchronized}; {@link #matchContainerToOutstandingIncreaseReq} and
 * {@link #swapContainer} are not.
 * NOTE(review): presumably callers of the unsynchronized methods hold an
 * application level lock - confirm against the call sites.
 */
public class ContainerUpdateContext {

  /**
   * Placeholder container id. It is returned by
   * {@link #matchContainerToOutstandingIncreaseReq} when increases are
   * tracked for a scheduler key but none of them matches, and it is used as
   * the id of the dummy container built when a previous request is
   * cancelled.
   */
  public static final ContainerId UNDEFINED =
      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
          ApplicationId.newInstance(-1, -1), -1), -1);

  protected static final RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  // Keep track of containers that are undergoing a resource increase or an
  // execution type promotion:
  // scheduler key -> resource to increase -> node -> container ids.
  private final Map<SchedulerRequestKey, Map<Resource,
      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();

  // Container id -> resource recorded when the decrease was registered.
  private final Map<ContainerId, Resource> outstandingDecreases =
      new HashMap<>();

  private final AppSchedulingInfo appSchedulingInfo;

  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
    this.appSchedulingInfo = appSchedulingInfo;
  }

  /**
   * Add the container to outstanding decreases.
   *
   * <p>For a {@link ContainerUpdateType#DECREASE_RESOURCE} request any
   * pending ask for the container's update key is cancelled first and the
   * requested capability is recorded. For every other update type the
   * container's current resource is recorded.
   *
   * @param updateReq UpdateContainerRequest.
   * @param schedulerNode SchedulerNode.
   * @param container Container.
   * @return If it was possible to decrease the container; {@code false} when
   *         the container already has an outstanding decrease.
   */
  public synchronized boolean checkAndAddToOutstandingDecreases(
      UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
      Container container) {
    ContainerId containerId = container.getId();
    if (outstandingDecreases.containsKey(containerId)) {
      return false;
    }
    if (ContainerUpdateType.DECREASE_RESOURCE ==
        updateReq.getContainerUpdateType()) {
      SchedulerRequestKey updateKey = new SchedulerRequestKey(
          container.getPriority(),
          container.getAllocationRequestId(), containerId);
      cancelPreviousRequest(schedulerNode, updateKey);
      outstandingDecreases.put(containerId, updateReq.getCapability());
    } else {
      outstandingDecreases.put(containerId, container.getResource());
    }
    return true;
  }

  /**
   * Add the container to outstanding increases.
   *
   * <p>The request is rejected, without modifying any state, when the
   * container already has an outstanding decrease or when the update type is
   * neither {@link ContainerUpdateType#INCREASE_RESOURCE} nor
   * {@link ContainerUpdateType#PROMOTE_EXECUTION_TYPE}. It is also rejected
   * when an update is already tracked for the same scheduler key and the new
   * request is not a resource increase. A repeated resource increase cancels
   * the pending ask of the previous one before the new one is recorded.
   *
   * <p>Resource requests are only issued when the resource to increase is
   * not {@code Resources.none()}.
   *
   * @param rmContainer RMContainer.
   * @param schedulerNode SchedulerNode.
   * @param updateRequest UpdateContainerRequest.
   * @return true if the update to outstanding increases was successful.
   */
  public synchronized boolean checkAndAddToOutstandingIncreases(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      UpdateContainerRequest updateRequest) {
    Container container = rmContainer.getContainer();

    // Reject up front, before any bookkeeping is created or any pending ask
    // is cancelled. Rejecting later would leave empty entries behind in
    // outstandingIncreases, which makes subsequent non-increase updates for
    // the same scheduler key fail.
    if (outstandingDecreases.containsKey(container.getId())) {
      return false;
    }

    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
    if (resToIncrease == null) {
      // Neither an increase nor a promotion: there is nothing to track, and
      // a null resource must not be used as a map key or as the capability
      // of a resource request.
      return false;
    }

    SchedulerRequestKey schedulerKey =
        SchedulerRequestKey.create(updateRequest,
            rmContainer.getAllocatedSchedulerKey());
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap == null) {
      resourceMap = new HashMap<>();
      outstandingIncreases.put(schedulerKey, resourceMap);
    } else if (ContainerUpdateType.INCREASE_RESOURCE ==
        updateRequest.getContainerUpdateType()) {
      // Updating the resource for an existing increase of this container.
      cancelPreviousRequest(schedulerNode, schedulerKey);
    } else {
      return false;
    }

    Map<NodeId, Set<ContainerId>> locationMap =
        resourceMap.get(resToIncrease);
    if (locationMap == null) {
      locationMap = new HashMap<>();
      resourceMap.put(resToIncrease, locationMap);
    }
    Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
    if (containerIds == null) {
      containerIds = new HashSet<>();
      locationMap.put(container.getNodeId(), containerIds);
    }
    containerIds.add(container.getId());

    if (!Resources.isNone(resToIncrease)) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
          new HashMap<>();
      Map<String, ResourceRequest> resMap =
          createResourceRequests(rmContainer, schedulerNode,
              schedulerKey, resToIncrease);
      updateResReqs.put(schedulerKey, resMap);
      appSchedulingInfo.updateResourceRequests(updateResReqs, false);
    }
    return true;
  }

  /**
   * Cancels the pending ask, if any, that is registered for the given
   * scheduler key by reporting a dummy allocation against it.
   *
   * @param schedulerNode node used for the dummy allocation.
   * @param schedulerKey key whose pending ask is to be cancelled.
   */
  private void cancelPreviousRequest(SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey) {
    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
    if (appPlacementAllocator != null) {
      PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk(
          ResourceRequest.ANY);
      // Decrement the pending using a dummy RR with
      // resource = prev update req capability
      if (pendingAsk != null && pendingAsk.getCount() > 0) {
        Container container = Container.newInstance(UNDEFINED,
            schedulerNode.getNodeID(), "host:port",
            pendingAsk.getPerAllocationResource(),
            schedulerKey.getPriority(), null);
        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
            schedulerKey,
            new RMContainerImpl(container, schedulerKey,
                appSchedulingInfo.getApplicationAttemptId(),
                schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
                appSchedulingInfo.getRMContext(),
                appPlacementAllocator.getPrimaryRequestedNodePartition()));
      }
    }
  }

  /**
   * Builds the resource requests used to obtain the additional resource for
   * a container: one for the container's host, one for the rack of the given
   * node and one for {@link ResourceRequest#ANY}.
   *
   * @param rmContainer container being updated.
   * @param schedulerNode node whose rack name is used.
   * @param schedulerKey key supplying the allocation request id.
   * @param resToIncrease capability of each request.
   * @return the requests keyed by resource name.
   */
  private Map<String, ResourceRequest> createResourceRequests(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey, Resource resToIncrease) {
    String hostName = rmContainer.getContainer().getNodeId().getHost();
    String rackName = schedulerNode.getRackName();

    Map<String, ResourceRequest> resMap = new HashMap<>();
    resMap.put(hostName,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, hostName));
    resMap.put(rackName,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, rackName));
    resMap.put(ResourceRequest.ANY,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, ResourceRequest.ANY));
    return resMap;
  }

  /**
   * Computes the resource that has to be requested to satisfy the update.
   *
   * @param updateReq the update request.
   * @param rmContainer the container being updated.
   * @return the container's total resource for a promotion; for a resource
   *         increase, the component-wise maximum of the requested capability
   *         and the current resource, minus the current resource;
   *         {@code null} for any other update type.
   */
  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
      RMContainer rmContainer) {
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
      // For promotion this is equal to the container total resource.
      return rmContainer.getContainer().getResource();
    }
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.INCREASE_RESOURCE) {
      // For container increase this has to equal the Resources in excess
      // of fitsIn().
      Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
          rmContainer.getContainer().getResource());
      return Resources.add(maxCap,
          Resources.negate(rmContainer.getContainer().getResource()));
    }
    return null;
  }

  /**
   * Populates a resource request for a single container with the given
   * capability. Locality is not relaxed and the execution type is
   * {@link ExecutionType#GUARANTEED}.
   *
   * @param schedulerRequestKey key supplying the allocation request id.
   * @param resToIncrease capability to request.
   * @param rr the request record to populate.
   * @param rmContainer container supplying priority and node label
   *                    expression.
   * @param resourceName resource name to set on the request.
   * @return the populated {@code rr}.
   */
  private static ResourceRequest createResourceReqForIncrease(
      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
    rr.setResourceName(resourceName);
    rr.setNumContainers(1);
    rr.setRelaxLocality(false);
    rr.setPriority(rmContainer.getContainer().getPriority());
    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
    rr.setCapability(resToIncrease);
    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
        ExecutionType.GUARANTEED, true));
    return rr;
  }

  /**
   * Remove Container from outstanding increases / decreases. Calling this
   * method essentially completes the update process.
   *
   * <p>The increase entry is looked up using the container's resource and
   * node id; maps that become empty are pruned.
   * NOTE(review): increases are recorded under the resource to increase, so
   * the container passed here is presumably expected to carry that same
   * resource - confirm against the callers.
   *
   * @param schedulerKey SchedulerRequestKey.
   * @param container Container.
   */
  public synchronized void removeFromOutstandingUpdate(
      SchedulerRequestKey schedulerKey, Container container) {
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null && !containerIds.isEmpty()) {
          containerIds.remove(container.getId());
          if (containerIds.isEmpty()) {
            locationMap.remove(container.getNodeId());
          }
        }
        if (locationMap.isEmpty()) {
          resourceMap.remove(container.getResource());
        }
      }
      if (resourceMap.isEmpty()) {
        outstandingIncreases.remove(schedulerKey);
      }
    }
    outstandingDecreases.remove(container.getId());
  }

  /**
   * Check if a new container is to be matched up against an outstanding
   * Container increase request.
   *
   * @param node SchedulerNode.
   * @param schedulerKey SchedulerRequestKey.
   * @param rmContainer RMContainer.
   * @return the id of a container with an outstanding increase for the same
   *         scheduler key, resource and node; {@code null} when nothing is
   *         tracked for the scheduler key; {@link #UNDEFINED} when increases
   *         are tracked for the key but none matches, in which case the
   *         resource requests are submitted again.
   */
  public ContainerId matchContainerToOutstandingIncreaseReq(
      SchedulerNode node, SchedulerRequestKey schedulerKey,
      RMContainer rmContainer) {
    ContainerId retVal = null;
    Container container = rmContainer.getContainer();
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null && !containerIds.isEmpty()) {
          retVal = containerIds.iterator().next();
        }
      }
    }
    // Allocation happened on an NM on the same host, but not on the NM
    // we need. We need to signal that this container has to be released.
    // We also need to add these requests back, to be reallocated.
    if (resourceMap != null && retVal == null) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
          new HashMap<>();
      Map<String, ResourceRequest> resMap = createResourceRequests(
          rmContainer, node, schedulerKey, container.getResource());
      reqsToUpdate.put(schedulerKey, resMap);
      appSchedulingInfo.updateResourceRequests(reqsToUpdate, true);
      return UNDEFINED;
    }
    return retVal;
  }

  /**
   * Swaps the existing RMContainer's and the temp RMContainers internal
   * container references after adjusting the resources in each.
   *
   * <p>The existing RMContainer receives a new container that keeps its id,
   * node, HTTP address, priority, exposed ports, allocation request id and
   * version, but carries the updated resource and the temp container's
   * execution type. The temp container is left holding the resource to be
   * released and the existing container's previous execution type.
   *
   * @param tempRMContainer Temp RMContainer.
   * @param existingRMContainer Existing RMContainer; must be an
   *                            {@link RMContainerImpl}.
   * @param updateType Update Type.
   * @return Existing RMContainer after swapping the container references.
   */
  public RMContainer swapContainer(RMContainer tempRMContainer,
      RMContainer existingRMContainer, ContainerUpdateType updateType) {
    ContainerId matchedContainerId = existingRMContainer.getContainerId();
    // Swap updated container with the existing container
    Container tempContainer = tempRMContainer.getContainer();
    Container existingContainer = existingRMContainer.getContainer();

    Resource updatedResource = createUpdatedResource(
        tempContainer, existingContainer, updateType);
    Resource resourceToRelease = createResourceToRelease(
        existingContainer, updateType);
    Container newContainer = Container.newInstance(matchedContainerId,
        existingContainer.getNodeId(),
        existingContainer.getNodeHttpAddress(),
        updatedResource,
        existingContainer.getPriority(), null,
        tempContainer.getExecutionType());
    newContainer.setExposedPorts(existingContainer.getExposedPorts());
    newContainer.setAllocationRequestId(
        existingContainer.getAllocationRequestId());
    newContainer.setVersion(existingContainer.getVersion());

    // The temp container takes over what has to be given back.
    tempRMContainer.getContainer().setResource(resourceToRelease);
    tempRMContainer.getContainer().setExecutionType(
        existingContainer.getExecutionType());
    ((RMContainerImpl) existingRMContainer).setContainer(newContainer);
    return existingRMContainer;
  }

  /**
   * Returns the resource that the container will finally be assigned with
   * at the end of the update operation.
   *
   * @param tempContainer Temporary Container created for the operation.
   * @param existingContainer Existing Container.
   * @param updateType Update Type.
   * @return Final Resource: the sum of both containers' resources for an
   *         increase, the resource recorded in the outstanding decreases for
   *         a decrease ({@code null} if none is recorded), and the existing
   *         container's resource otherwise.
   */
  private Resource createUpdatedResource(Container tempContainer,
      Container existingContainer, ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.add(existingContainer.getResource(),
          tempContainer.getResource());
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
      return outstandingDecreases.get(existingContainer.getId());
    } else {
      return existingContainer.getResource();
    }
  }

  /**
   * Returns the resources that need to be released at the end of the update
   * operation.
   *
   * @param existingContainer Existing Container.
   * @param updateType Updated type.
   * @return Resources to be released: none for an increase, the existing
   *         resource minus the resource recorded in the outstanding
   *         decreases for a decrease, and the existing container's resource
   *         otherwise.
   */
  private Resource createResourceToRelease(Container existingContainer,
      ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.none();
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
      return Resources.add(existingContainer.getResource(),
          Resources.negate(
              outstandingDecreases.get(existingContainer.getId())));
    } else {
      return existingContainer.getResource();
    }
  }
}