| /******************************************************************************* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| *******************************************************************************/ |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| |
| /** |
| * This policy enforces a time-extended notion of Capacity. In particular it |
| * guarantees that the allocation received in input when combined with all |
| * previous allocation for the user does not violate an instantaneous max limit |
| * on the resources received, and that for every window of time of length |
| * validWindow, the integral of the allocations for a user (sum of the currently |
| * submitted allocation and all prior allocations for the user) does not exceed |
| * validWindow * maxAvg. |
| * |
| * This allows flexibility, in the sense that an allocation can instantaneously |
| * use large portions of the available capacity, but prevents abuses by bounding |
| * the average use over time. |
| * |
| * By controlling maxInst, maxAvg, validWindow the administrator configuring |
| * this policy can obtain a behavior ranging from instantaneously enforced |
| * capacity (akin to existing queues), or fully flexible allocations (likely |
| * reserved to super-users, or trusted systems). |
| */ |
| @LimitedPrivate("yarn") |
| @Unstable |
| public class CapacityOverTimePolicy extends NoOverCommitPolicy { |
| |
| private ReservationSchedulerConfiguration conf; |
| private long validWindow; |
| private float maxInst; |
| private float maxAvg; |
| |
| @Override |
| public void init(String reservationQueuePath, |
| ReservationSchedulerConfiguration conf) { |
| this.conf = conf; |
| validWindow = this.conf.getReservationWindow(reservationQueuePath); |
| maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; |
| maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; |
| } |
| |
| /** |
| * The validation algorithm walks over the RLE encoded allocation and |
| * checks that for all transition points (when the start or end of the |
| * checking window encounters a value in the RLE). At this point it |
| * checkes whether the integral computed exceeds the quota limit. Note that |
| * this might not find the exact time of a violation, but if a violation |
| * exists it will find it. The advantage is a much lower number of checks |
| * as compared to time-slot by time-slot checks. |
| * |
| * @param plan the plan to validate against |
| * @param reservation the reservation allocation to test. |
| * @throws PlanningException if the validation fails. |
| */ |
| @Override |
| public void validate(Plan plan, ReservationAllocation reservation) |
| throws PlanningException { |
| |
| |
| // rely on NoOverCommitPolicy to check for: 1) user-match, 2) physical |
| // cluster limits, and 3) maxInst (via override of available) |
| try { |
| super.validate(plan, reservation); |
| } catch (PlanningException p) { |
| //wrap it in proper quota exception |
| throw new PlanningQuotaException(p); |
| } |
| |
| long checkStart = reservation.getStartTime() - validWindow; |
| long checkEnd = reservation.getEndTime() + validWindow; |
| |
| //---- check for integral violations of capacity -------- |
| |
| // Gather a view of what to check (curr allocation of user, minus old |
| // version of this reservation, plus new version) |
| RLESparseResourceAllocation consumptionForUserOverTime = |
| plan.getConsumptionForUserOverTime(reservation.getUser(), |
| checkStart, checkEnd); |
| |
| ReservationAllocation old = |
| plan.getReservationById(reservation.getReservationId()); |
| if (old != null) { |
| consumptionForUserOverTime = |
| RLESparseResourceAllocation.merge(plan.getResourceCalculator(), |
| plan.getTotalCapacity(), consumptionForUserOverTime, |
| old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add, |
| checkStart, checkEnd); |
| } |
| |
| RLESparseResourceAllocation resRLE = |
| reservation.getResourcesOverTime(checkStart, checkEnd); |
| |
| RLESparseResourceAllocation toCheck = RLESparseResourceAllocation |
| .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), |
| consumptionForUserOverTime, resRLE, RLEOperator.add, Long.MIN_VALUE, |
| Long.MAX_VALUE); |
| |
| NavigableMap<Long, Resource> integralUp = new TreeMap<>(); |
| NavigableMap<Long, Resource> integralDown = new TreeMap<>(); |
| |
| long prevTime = toCheck.getEarliestStartTime(); |
| IntegralResource prevResource = new IntegralResource(0L, 0L); |
| IntegralResource runningTot = new IntegralResource(0L, 0L); |
| |
| // add intermediate points |
| Map<Long, Resource> temp = new TreeMap<>(); |
| for (Map.Entry<Long, Resource> pointToCheck : toCheck.getCumulative() |
| .entrySet()) { |
| |
| Long timeToCheck = pointToCheck.getKey(); |
| Resource resourceToCheck = pointToCheck.getValue(); |
| |
| Long nextPoint = toCheck.getCumulative().higherKey(timeToCheck); |
| if (nextPoint == null || toCheck.getCumulative().get(nextPoint) == null) { |
| continue; |
| } |
| for (int i = 1; i <= (nextPoint - timeToCheck) / validWindow; i++) { |
| temp.put(timeToCheck + (i * validWindow), resourceToCheck); |
| } |
| } |
| temp.putAll(toCheck.getCumulative()); |
| |
| // compute point-wise integral for the up-fronts and down-fronts |
| for (Map.Entry<Long, Resource> currPoint : temp.entrySet()) { |
| |
| Long currTime = currPoint.getKey(); |
| Resource currResource = currPoint.getValue(); |
| |
| //add to running total current contribution |
| prevResource.multiplyBy(currTime - prevTime); |
| runningTot.add(prevResource); |
| integralUp.put(currTime, normalizeToResource(runningTot, validWindow)); |
| integralDown.put(currTime + validWindow, |
| normalizeToResource(runningTot, validWindow)); |
| |
| if (currResource != null) { |
| prevResource.memory = currResource.getMemorySize(); |
| prevResource.vcores = currResource.getVirtualCores(); |
| } else { |
| prevResource.memory = 0L; |
| prevResource.vcores = 0L; |
| } |
| prevTime = currTime; |
| } |
| |
| // compute final integral as delta of up minus down transitions |
| RLESparseResourceAllocation intUp = |
| new RLESparseResourceAllocation(integralUp, |
| plan.getResourceCalculator()); |
| RLESparseResourceAllocation intDown = |
| new RLESparseResourceAllocation(integralDown, |
| plan.getResourceCalculator()); |
| |
| RLESparseResourceAllocation integral = RLESparseResourceAllocation |
| .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), intUp, |
| intDown, RLEOperator.subtract, Long.MIN_VALUE, Long.MAX_VALUE); |
| |
| // define over-time integral limit |
| // note: this is aligned with the normalization done above |
| NavigableMap<Long, Resource> tlimit = new TreeMap<>(); |
| Resource maxAvgRes = Resources.multiply(plan.getTotalCapacity(), maxAvg); |
| tlimit.put(toCheck.getEarliestStartTime() - validWindow, maxAvgRes); |
| RLESparseResourceAllocation targetLimit = |
| new RLESparseResourceAllocation(tlimit, plan.getResourceCalculator()); |
| |
| // compare using merge() limit with integral |
| try { |
| |
| RLESparseResourceAllocation.merge(plan.getResourceCalculator(), |
| plan.getTotalCapacity(), targetLimit, integral, |
| RLEOperator.subtractTestNonNegative, checkStart, checkEnd); |
| |
| } catch (PlanningException p) { |
| throw new PlanningQuotaException( |
| "Integral (avg over time) quota capacity " + maxAvg |
| + " over a window of " + validWindow / 1000 + " seconds, " |
| + " would be exceeded by accepting reservation: " + reservation |
| .getReservationId(), p); |
| } |
| } |
| |
| private Resource normalizeToResource(IntegralResource runningTot, |
| long window) { |
| // normalize to fit in windows. Rounding should not impact more than |
| // sub 1 core average allocations. This will all be removed once |
| // Resource moves to long. |
| int memory = (int) Math.round((double) runningTot.memory / window); |
| int vcores = (int) Math.round((double) runningTot.vcores / window); |
| return Resource.newInstance(memory, vcores); |
| } |
| |
| @Override |
| public RLESparseResourceAllocation availableResources( |
| RLESparseResourceAllocation available, Plan plan, String user, |
| ReservationId oldId, long start, long end) throws PlanningException { |
| |
| // this only propagates the instantaneous maxInst properties, while |
| // the time-varying one depends on the current allocation as well |
| // and are not easily captured here |
| Resource planTotalCapacity = plan.getTotalCapacity(); |
| Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); |
| NavigableMap<Long, Resource> instQuota = new TreeMap<Long, Resource>(); |
| instQuota.put(start, maxInsRes); |
| |
| RLESparseResourceAllocation instRLEQuota = |
| new RLESparseResourceAllocation(instQuota, |
| plan.getResourceCalculator()); |
| |
| RLESparseResourceAllocation used = |
| plan.getConsumptionForUserOverTime(user, start, end); |
| |
| // add back in old reservation used resources if any |
| ReservationAllocation old = plan.getReservationById(oldId); |
| if (old != null) { |
| used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), |
| Resources.clone(plan.getTotalCapacity()), used, |
| old.getResourcesOverTime(start, end), RLEOperator.subtract, start, |
| end); |
| } |
| |
| instRLEQuota = RLESparseResourceAllocation |
| .merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota, |
| used, RLEOperator.subtract, start, end); |
| |
| instRLEQuota = RLESparseResourceAllocation |
| .merge(plan.getResourceCalculator(), planTotalCapacity, available, |
| instRLEQuota, RLEOperator.min, start, end); |
| |
| return instRLEQuota; |
| } |
| |
| @Override |
| public long getValidWindow() { |
| return validWindow; |
| } |
| |
| /** |
| * This class provides support for Resource-like book-keeping, based on |
| * long(s), as using Resource to store the "integral" of the allocation over |
| * time leads to integer overflows for large allocations/clusters. (Evolving |
| * Resource to use long is too disruptive at this point.) |
| * |
| * The comparison/multiplication behaviors of IntegralResource are consistent |
| * with the DefaultResourceCalculator. |
| */ |
| private static class IntegralResource { |
| long memory; |
| long vcores; |
| |
| public IntegralResource(Resource resource) { |
| this.memory = resource.getMemorySize(); |
| this.vcores = resource.getVirtualCores(); |
| } |
| |
| public IntegralResource(long mem, long vcores) { |
| this.memory = mem; |
| this.vcores = vcores; |
| } |
| |
| public void add(Resource r) { |
| memory += r.getMemorySize(); |
| vcores += r.getVirtualCores(); |
| } |
| |
| public void add(IntegralResource r) { |
| memory += r.memory; |
| vcores += r.vcores; |
| } |
| |
| public void subtract(Resource r) { |
| memory -= r.getMemorySize(); |
| vcores -= r.getVirtualCores(); |
| } |
| |
| public IntegralResource negate() { |
| return new IntegralResource(-memory, -vcores); |
| } |
| |
| public void multiplyBy(long window) { |
| memory = memory * window; |
| vcores = vcores * window; |
| } |
| |
| public long compareTo(IntegralResource other) { |
| long diff = memory - other.memory; |
| if (diff == 0) { |
| diff = vcores - other.vcores; |
| } |
| return diff; |
| } |
| |
| @Override |
| public String toString() { |
| return "<memory:" + memory + ", vCores:" + vcores + ">"; |
| } |
| |
| } |
| |
| } |