| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; |
| |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| /** |
| * Temporary data-structure tracking resource availability, pending resource |
| * need, current utilization. This is per-queue-per-partition data structure |
| */ |
| public class TempQueuePerPartition extends AbstractPreemptionEntity { |
| // Following fields are copied from scheduler |
| final String partition; |
| |
| private final Resource killable; |
| private final float absCapacity; |
| private final float absMaxCapacity; |
| final Resource totalPartitionResource; |
| |
| // Following fields are settled and used by candidate selection policies |
| Resource untouchableExtra; |
| Resource preemptableExtra; |
| |
| double normalizedGuarantee; |
| |
| final ArrayList<TempQueuePerPartition> children; |
| private Collection<TempAppPerPartition> apps; |
| LeafQueue leafQueue; |
| boolean preemptionDisabled; |
| |
| protected Resource pendingDeductReserved; |
| |
| // Relative priority of this queue to its parent |
| // If parent queue's ordering policy doesn't respect priority, |
| // this will be always 0 |
| int relativePriority = 0; |
| TempQueuePerPartition parent = null; |
| |
| // This will hold a temp user data structure and will hold userlimit, |
| // idealAssigned, used etc. |
| Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>(); |
| |
| TempQueuePerPartition(String queueName, Resource current, |
| boolean preemptionDisabled, String partition, Resource killable, |
| float absCapacity, float absMaxCapacity, Resource totalPartitionResource, |
| Resource reserved, CSQueue queue) { |
| super(queueName, current, Resource.newInstance(0, 0), reserved, |
| Resource.newInstance(0, 0)); |
| |
| if (queue instanceof LeafQueue) { |
| LeafQueue l = (LeafQueue) queue; |
| pending = l.getTotalPendingResourcesConsideringUserLimit( |
| totalPartitionResource, partition, false); |
| pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit( |
| totalPartitionResource, partition, true); |
| leafQueue = l; |
| } else { |
| pending = Resources.createResource(0); |
| pendingDeductReserved = Resources.createResource(0); |
| } |
| |
| this.normalizedGuarantee = Float.NaN; |
| this.children = new ArrayList<>(); |
| this.apps = new ArrayList<>(); |
| this.untouchableExtra = Resource.newInstance(0, 0); |
| this.preemptableExtra = Resource.newInstance(0, 0); |
| this.preemptionDisabled = preemptionDisabled; |
| this.partition = partition; |
| this.killable = killable; |
| this.absCapacity = absCapacity; |
| this.absMaxCapacity = absMaxCapacity; |
| this.totalPartitionResource = totalPartitionResource; |
| } |
| |
| public void setLeafQueue(LeafQueue l) { |
| assert children.size() == 0; |
| this.leafQueue = l; |
| } |
| |
| /** |
| * When adding a child we also aggregate its pending resource needs. |
| * |
| * @param q |
| * the child queue to add to this queue |
| */ |
| public void addChild(TempQueuePerPartition q) { |
| assert leafQueue == null; |
| children.add(q); |
| Resources.addTo(pending, q.pending); |
| Resources.addTo(pendingDeductReserved, q.pendingDeductReserved); |
| } |
| |
| public ArrayList<TempQueuePerPartition> getChildren() { |
| return children; |
| } |
| |
| // This function "accepts" all the resources it can (pending) and return |
| // the unused ones |
| Resource offer(Resource avail, ResourceCalculator rc, |
| Resource clusterResource, boolean considersReservedResource) { |
| Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( |
| Resources.subtract(getMax(), idealAssigned), |
| Resource.newInstance(0, 0)); |
| // accepted = min{avail, |
| // max - assigned, |
| // current + pending - assigned, |
| // # Make sure a queue will not get more than max of its |
| // # used/guaranteed, this is to make sure preemption won't |
| // # happen if all active queues are beyond their guaranteed |
| // # This is for leaf queue only. |
| // max(guaranteed, used) - assigned} |
| // remain = avail - accepted |
| Resource accepted = Resources.min(rc, clusterResource, |
| absMaxCapIdealAssignedDelta, |
| Resources.min(rc, clusterResource, avail, Resources |
| /* |
| * When we're using FifoPreemptionSelector (considerReservedResource |
| * = false). |
| * |
| * We should deduct reserved resource from pending to avoid excessive |
| * preemption: |
| * |
| * For example, if an under-utilized queue has used = reserved = 20. |
| * Preemption policy will try to preempt 20 containers (which is not |
| * satisfied) from different hosts. |
| * |
| * In FifoPreemptionSelector, there's no guarantee that preempted |
| * resource can be used by pending request, so policy will preempt |
| * resources repeatly. |
| */ |
| .subtract(Resources.add(getUsed(), |
| (considersReservedResource ? pending : pendingDeductReserved)), |
| idealAssigned))); |
| |
| // For leaf queue: accept = min(accept, max(guaranteed, used) - assigned) |
| // Why only for leaf queue? |
| // Because for a satisfied parent queue, it could have some under-utilized |
| // leaf queues. Such under-utilized leaf queue could preemption resources |
| // from over-utilized leaf queue located at other hierarchies. |
| if (null == children || children.isEmpty()) { |
| Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract( |
| Resources.max(rc, clusterResource, getUsed(), getGuaranteed()), |
| idealAssigned); |
| maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource, |
| maxOfGuranteedAndUsedDeductAssigned, Resources.none()); |
| accepted = Resources.min(rc, clusterResource, accepted, |
| maxOfGuranteedAndUsedDeductAssigned); |
| } |
| Resource remain = Resources.subtract(avail, accepted); |
| Resources.addTo(idealAssigned, accepted); |
| return remain; |
| } |
| |
| public Resource getGuaranteed() { |
| return Resources.multiply(totalPartitionResource, absCapacity); |
| } |
| |
| public Resource getMax() { |
| return Resources.multiply(totalPartitionResource, absMaxCapacity); |
| } |
| |
| public void updatePreemptableExtras(ResourceCalculator rc) { |
| // Reset untouchableExtra and preemptableExtra |
| untouchableExtra = Resources.none(); |
| preemptableExtra = Resources.none(); |
| |
| Resource extra = Resources.subtract(getUsed(), getGuaranteed()); |
| if (Resources.lessThan(rc, totalPartitionResource, extra, |
| Resources.none())) { |
| extra = Resources.none(); |
| } |
| |
| if (null == children || children.isEmpty()) { |
| // If it is a leaf queue |
| if (preemptionDisabled) { |
| untouchableExtra = extra; |
| } else { |
| preemptableExtra = extra; |
| } |
| } else { |
| // If it is a parent queue |
| Resource childrensPreemptable = Resource.newInstance(0, 0); |
| for (TempQueuePerPartition child : children) { |
| Resources.addTo(childrensPreemptable, child.preemptableExtra); |
| } |
| // untouchableExtra = max(extra - childrenPreemptable, 0) |
| if (Resources.greaterThanOrEqual(rc, totalPartitionResource, |
| childrensPreemptable, extra)) { |
| untouchableExtra = Resource.newInstance(0, 0); |
| } else { |
| untouchableExtra = Resources.subtract(extra, childrensPreemptable); |
| } |
| preemptableExtra = Resources.min(rc, totalPartitionResource, |
| childrensPreemptable, extra); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(" NAME: " + queueName).append(" CUR: ").append(current) |
| .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved) |
| .append(" GAR: ").append(getGuaranteed()).append(" NORM: ") |
| .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ") |
| .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted) |
| .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted()) |
| .append(" UNTOUCHABLE: ").append(untouchableExtra) |
| .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n"); |
| |
| return sb.toString(); |
| } |
| |
| public void assignPreemption(float scalingFactor, ResourceCalculator rc, |
| Resource clusterResource) { |
| Resource usedDeductKillable = Resources.subtract(getUsed(), killable); |
| Resource totalResource = Resources.add(getUsed(), pending); |
| |
| // The minimum resource that we need to keep for a queue is: |
| // max(idealAssigned, min(used + pending, guaranteed)). |
| // |
| // Doing this because when we calculate ideal allocation doesn't consider |
| // reserved resource, ideal-allocation calculated could be less than |
| // guaranteed and total. We should avoid preempt from a queue if it is |
| // already |
| // <= its guaranteed resource. |
| Resource minimumQueueResource = Resources.max(rc, clusterResource, |
| Resources.min(rc, clusterResource, totalResource, getGuaranteed()), |
| idealAssigned); |
| |
| if (Resources.greaterThan(rc, clusterResource, usedDeductKillable, |
| minimumQueueResource)) { |
| toBePreempted = Resources.multiply( |
| Resources.subtract(usedDeductKillable, minimumQueueResource), |
| scalingFactor); |
| } else { |
| toBePreempted = Resources.none(); |
| } |
| } |
| |
| public void deductActuallyToBePreempted(ResourceCalculator rc, |
| Resource cluster, Resource toBeDeduct) { |
| if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(), |
| toBeDeduct)) { |
| Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); |
| } |
| setActuallyToBePreempted(Resources.max(rc, cluster, |
| getActuallyToBePreempted(), Resources.none())); |
| } |
| |
| void appendLogString(StringBuilder sb) { |
| sb.append(queueName).append(", ").append(current.getMemorySize()) |
| .append(", ").append(current.getVirtualCores()).append(", ") |
| .append(pending.getMemorySize()).append(", ") |
| .append(pending.getVirtualCores()).append(", ") |
| .append(getGuaranteed().getMemorySize()).append(", ") |
| .append(getGuaranteed().getVirtualCores()).append(", ") |
| .append(idealAssigned.getMemorySize()).append(", ") |
| .append(idealAssigned.getVirtualCores()).append(", ") |
| .append(toBePreempted.getMemorySize()).append(", ") |
| .append(toBePreempted.getVirtualCores()).append(", ") |
| .append(getActuallyToBePreempted().getMemorySize()).append(", ") |
| .append(getActuallyToBePreempted().getVirtualCores()); |
| } |
| |
| public void addAllApps(Collection<TempAppPerPartition> orderedApps) { |
| this.apps = orderedApps; |
| } |
| |
| public Collection<TempAppPerPartition> getApps() { |
| return apps; |
| } |
| |
| public void addUserPerPartition(String userName, |
| TempUserPerPartition tmpUser) { |
| this.usersPerPartition.put(userName, tmpUser); |
| } |
| |
| public Map<String, TempUserPerPartition> getUsersPerPartition() { |
| return usersPerPartition; |
| } |
| } |