| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceSizing; |
| import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Basic placement algorithm. |
| * Supports different Iterators at SchedulingRequest level including: |
| * Serial, PopularTags |
| */ |
| public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(DefaultPlacementAlgorithm.class); |
| |
| // Number of times to re-attempt placing a single scheduling request. |
| private static final int RE_ATTEMPT_COUNT = 2; |
| |
| private LocalAllocationTagsManager tagsManager; |
| private PlacementConstraintManager constraintManager; |
| private NodeCandidateSelector nodeSelector; |
| private ResourceCalculator resourceCalculator; |
| |
| @Override |
| public void init(RMContext rmContext) { |
| this.tagsManager = new LocalAllocationTagsManager( |
| rmContext.getAllocationTagsManager()); |
| this.constraintManager = rmContext.getPlacementConstraintManager(); |
| this.resourceCalculator = rmContext.getScheduler().getResourceCalculator(); |
| this.nodeSelector = |
| filter -> ((AbstractYarnScheduler) (rmContext).getScheduler()) |
| .getNodes(filter); |
| } |
| |
| boolean attemptPlacementOnNode(ApplicationId appId, |
| Resource availableResources, SchedulingRequest schedulingRequest, |
| SchedulerNode schedulerNode, boolean ignoreResourceCheck) |
| throws InvalidAllocationTagsQueryException { |
| boolean fitsInNode = ignoreResourceCheck || |
| Resources.fitsIn(resourceCalculator, |
| schedulingRequest.getResourceSizing().getResources(), |
| availableResources); |
| boolean constraintsSatisfied = |
| PlacementConstraintsUtil.canSatisfyConstraints(appId, |
| schedulingRequest, schedulerNode, constraintManager, tagsManager); |
| return fitsInNode && constraintsSatisfied; |
| } |
| |
| |
| @Override |
| public void place(ConstraintPlacementAlgorithmInput input, |
| ConstraintPlacementAlgorithmOutputCollector collector) { |
| BatchedRequests requests = (BatchedRequests) input; |
| int placementAttempt = requests.getPlacementAttempt(); |
| ConstraintPlacementAlgorithmOutput resp = |
| new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); |
| List<SchedulerNode> allNodes = nodeSelector.selectNodes(null); |
| |
| List<SchedulingRequest> rejectedRequests = new ArrayList<>(); |
| Map<NodeId, Resource> availResources = new HashMap<>(); |
| int rePlacementCount = RE_ATTEMPT_COUNT; |
| while (rePlacementCount > 0) { |
| doPlacement(requests, resp, allNodes, rejectedRequests, availResources); |
| // Double check if placement constraints are really satisfied |
| validatePlacement(requests.getApplicationId(), resp, |
| rejectedRequests, availResources); |
| if (rejectedRequests.size() == 0 || rePlacementCount == 1) { |
| break; |
| } |
| requests = new BatchedRequests(requests.getIteratorType(), |
| requests.getApplicationId(), rejectedRequests, |
| requests.getPlacementAttempt()); |
| rejectedRequests = new ArrayList<>(); |
| rePlacementCount--; |
| } |
| |
| resp.getRejectedRequests().addAll( |
| rejectedRequests.stream().map( |
| x -> new SchedulingRequestWithPlacementAttempt( |
| placementAttempt, x)).collect(Collectors.toList())); |
| collector.collect(resp); |
| // Clean current temp-container tags |
| this.tagsManager.cleanTempContainers(requests.getApplicationId()); |
| } |
| |
| private void doPlacement(BatchedRequests requests, |
| ConstraintPlacementAlgorithmOutput resp, |
| List<SchedulerNode> allNodes, |
| List<SchedulingRequest> rejectedRequests, |
| Map<NodeId, Resource> availableResources) { |
| Iterator<SchedulingRequest> requestIterator = requests.iterator(); |
| Iterator<SchedulerNode> nIter = allNodes.iterator(); |
| SchedulerNode lastSatisfiedNode = null; |
| while (requestIterator.hasNext()) { |
| if (allNodes.isEmpty()) { |
| LOG.warn("No nodes available for placement at the moment !!"); |
| break; |
| } |
| SchedulingRequest schedulingRequest = requestIterator.next(); |
| PlacedSchedulingRequest placedReq = |
| new PlacedSchedulingRequest(schedulingRequest); |
| placedReq.setPlacementAttempt(requests.getPlacementAttempt()); |
| resp.getPlacedRequests().add(placedReq); |
| CircularIterator<SchedulerNode> nodeIter = |
| new CircularIterator(lastSatisfiedNode, nIter, allNodes); |
| int numAllocs = |
| schedulingRequest.getResourceSizing().getNumAllocations(); |
| while (nodeIter.hasNext() && numAllocs > 0) { |
| SchedulerNode node = nodeIter.next(); |
| try { |
| String tag = schedulingRequest.getAllocationTags() == null ? "" : |
| schedulingRequest.getAllocationTags().iterator().next(); |
| Resource unallocatedResource = |
| availableResources.computeIfAbsent(node.getNodeID(), |
| x -> Resource.newInstance(node.getUnallocatedResource())); |
| if (!requests.getBlacklist(tag).contains(node.getNodeID()) && |
| attemptPlacementOnNode( |
| requests.getApplicationId(), unallocatedResource, |
| schedulingRequest, node, false)) { |
| schedulingRequest.getResourceSizing() |
| .setNumAllocations(--numAllocs); |
| Resources.addTo(unallocatedResource, |
| schedulingRequest.getResourceSizing().getResources()); |
| placedReq.getNodes().add(node); |
| numAllocs = |
| schedulingRequest.getResourceSizing().getNumAllocations(); |
| // Add temp-container tags for current placement cycle |
| this.tagsManager.addTempTags(node.getNodeID(), |
| requests.getApplicationId(), |
| schedulingRequest.getAllocationTags()); |
| lastSatisfiedNode = node; |
| } |
| } catch (InvalidAllocationTagsQueryException e) { |
| LOG.warn("Got exception from TagManager !", e); |
| } |
| } |
| } |
| // Add all requests whose numAllocations still > 0 to rejected list. |
| requests.getSchedulingRequests().stream() |
| .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0) |
| .forEach(rejReq -> rejectedRequests.add(cloneReq(rejReq))); |
| } |
| |
| /** |
| * During the placement phase, allocation tags are added to the node if the |
| * constraint is satisfied, But depending on the order in which the |
| * algorithm sees the request, it is possible that a constraint that happened |
| * to be valid during placement of an earlier-seen request, might not be |
| * valid after all subsequent requests have been placed. |
| * |
| * For eg: |
| * Assume nodes n1, n2, n3, n4 and n5 |
| * |
| * Consider the 2 constraints: |
| * 1) "foo", anti-affinity with "foo" |
| * 2) "bar", anti-affinity with "foo" |
| * |
| * And 2 requests |
| * req1: NumAllocations = 4, allocTags = [foo] |
| * req2: NumAllocations = 1, allocTags = [bar] |
| * |
| * If "req1" is seen first, the algorithm can place the 4 containers in |
| * n1, n2, n3 and n4. And when it gets to "req2", it will see that 4 nodes |
| * with the "foo" tag and will place on n5. |
| * But if "req2" is seem first, then "bar" will be placed on any node, |
| * since no node currently has "foo", and when it gets to "req1", since |
| * "foo" has not anti-affinity with "bar", the algorithm can end up placing |
| * "foo" on a node with "bar" violating the second constraint. |
| * |
| * To prevent the above, we need a validation step: after the placements for a |
| * batch of requests are made, for each req, we remove its tags from the node |
| * and try to see of constraints are still satisfied if the tag were to be |
| * added back on the node. |
| * |
| * When applied to the example above, after "req2" and "req1" are placed, |
| * we remove the "bar" tag from the node and try to add it back on the node. |
| * This time, constraint satisfaction will fail, since there is now a "foo" |
| * tag on the node and "bar" cannot be added. The algorithm will then |
| * retry placing "req2" on another node. |
| * |
| * @param applicationId |
| * @param resp |
| * @param rejectedRequests |
| * @param availableResources |
| */ |
| private void validatePlacement(ApplicationId applicationId, |
| ConstraintPlacementAlgorithmOutput resp, |
| List<SchedulingRequest> rejectedRequests, |
| Map<NodeId, Resource> availableResources) { |
| Iterator<PlacedSchedulingRequest> pReqIter = |
| resp.getPlacedRequests().iterator(); |
| while (pReqIter.hasNext()) { |
| PlacedSchedulingRequest pReq = pReqIter.next(); |
| Iterator<SchedulerNode> nodeIter = pReq.getNodes().iterator(); |
| // Assuming all reqs were satisfied. |
| int num = 0; |
| while (nodeIter.hasNext()) { |
| SchedulerNode node = nodeIter.next(); |
| try { |
| // Remove just the tags for this placement. |
| this.tagsManager.removeTempTags(node.getNodeID(), |
| applicationId, pReq.getSchedulingRequest().getAllocationTags()); |
| Resource availOnNode = availableResources.get(node.getNodeID()); |
| if (!attemptPlacementOnNode(applicationId, availOnNode, |
| pReq.getSchedulingRequest(), node, true)) { |
| nodeIter.remove(); |
| num++; |
| Resources.subtractFrom(availOnNode, |
| pReq.getSchedulingRequest().getResourceSizing().getResources()); |
| } else { |
| // Add back the tags if everything is fine. |
| this.tagsManager.addTempTags(node.getNodeID(), |
| applicationId, pReq.getSchedulingRequest().getAllocationTags()); |
| } |
| } catch (InvalidAllocationTagsQueryException e) { |
| LOG.warn("Got exception from TagManager !", e); |
| } |
| } |
| if (num > 0) { |
| SchedulingRequest sReq = cloneReq(pReq.getSchedulingRequest()); |
| sReq.getResourceSizing().setNumAllocations(num); |
| rejectedRequests.add(sReq); |
| } |
| if (pReq.getNodes().isEmpty()) { |
| pReqIter.remove(); |
| } |
| } |
| } |
| |
| private static SchedulingRequest cloneReq(SchedulingRequest sReq) { |
| return SchedulingRequest.newInstance( |
| sReq.getAllocationRequestId(), sReq.getPriority(), |
| sReq.getExecutionType(), sReq.getAllocationTags(), |
| ResourceSizing.newInstance( |
| sReq.getResourceSizing().getNumAllocations(), |
| sReq.getResourceSizing().getResources()), |
| sReq.getPlacementConstraint()); |
| } |
| |
| } |