| /******************************************************************************* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| *******************************************************************************/ |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import java.util.Date; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * This policy enforces a time-extended notion of Capacity. In particular it |
| * guarantees that the allocation received in input when combined with all |
| * previous allocation for the user does not violate an instantaneous max limit |
| * on the resources received, and that for every window of time of length |
| * validWindow, the integral of the allocations for a user (sum of the currently |
| * submitted allocation and all prior allocations for the user) does not exceed |
| * validWindow * maxAvg. |
| * |
| * This allows flexibility, in the sense that an allocation can instantaneously |
| * use large portions of the available capacity, but prevents abuses by bounding |
| * the average use over time. |
| * |
| * By controlling maxInst, maxAvg, validWindow the administrator configuring |
| * this policy can obtain a behavior ranging from instantaneously enforced |
| * capacity (akin to existing queues), or fully flexible allocations (likely |
| * reserved to super-users, or trusted systems). |
| */ |
| @LimitedPrivate("yarn") |
| @Unstable |
| public class CapacityOverTimePolicy implements SharingPolicy { |
| |
| private CapacitySchedulerConfiguration conf; |
| private long validWindow; |
| private float maxInst; |
| private float maxAvg; |
| |
| // For now this is CapacityScheduler specific, but given a hierarchy in the |
| // configuration structure of the schedulers (e.g., SchedulerConfiguration) |
| // it should be easy to remove this limitation |
| @Override |
| public void init(String reservationQueuePath, Configuration conf) { |
| if (!(conf instanceof CapacitySchedulerConfiguration)) { |
| throw new IllegalArgumentException("Unexpected conf type: " |
| + conf.getClass().getSimpleName() + " only supported conf is: " |
| + CapacitySchedulerConfiguration.class.getSimpleName()); |
| } |
| this.conf = (CapacitySchedulerConfiguration) conf; |
| validWindow = this.conf.getReservationWindow(reservationQueuePath); |
| maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; |
| maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; |
| }; |
| |
| @Override |
| public void validate(Plan plan, ReservationAllocation reservation) |
| throws PlanningException { |
| |
| // this is entire method invoked under a write-lock on the plan, no need |
| // to synchronize accesses to the plan further |
| |
| // Try to verify whether there is already a reservation with this ID in |
| // the system (remove its contribution during validation to simulate a |
| // try-n-swap |
| // update). |
| ReservationAllocation oldReservation = |
| plan.getReservationById(reservation.getReservationId()); |
| |
| // sanity check that the update of a reservation is not changing username |
| if (oldReservation != null |
| && !oldReservation.getUser().equals(reservation.getUser())) { |
| throw new MismatchedUserException( |
| "Updating an existing reservation with mismatched user:" |
| + oldReservation.getUser() + " != " + reservation.getUser()); |
| } |
| |
| long startTime = reservation.getStartTime(); |
| long endTime = reservation.getEndTime(); |
| long step = plan.getStep(); |
| |
| Resource planTotalCapacity = plan.getTotalCapacity(); |
| |
| Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg); |
| Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); |
| |
| // define variable that will store integral of resources (need diff class to |
| // avoid overflow issues for long/large allocations) |
| IntegralResource runningTot = new IntegralResource(0L, 0L); |
| IntegralResource maxAllowed = new IntegralResource(maxAvgRes); |
| maxAllowed.multiplyBy(validWindow / step); |
| |
| // check that the resources offered to the user during any window of length |
| // "validWindow" overlapping this allocation are within maxAllowed |
| // also enforce instantaneous and physical constraints during this pass |
| for (long t = startTime - validWindow; t < endTime + validWindow; t += step) { |
| |
| Resource currExistingAllocTot = plan.getTotalCommittedResources(t); |
| Resource currExistingAllocForUser = |
| plan.getConsumptionForUser(reservation.getUser(), t); |
| Resource currNewAlloc = reservation.getResourcesAtTime(t); |
| Resource currOldAlloc = Resources.none(); |
| if (oldReservation != null) { |
| currOldAlloc = oldReservation.getResourcesAtTime(t); |
| } |
| |
| // throw exception if the cluster is overcommitted |
| // tot_allocated - old + new > capacity |
| Resource inst = |
| Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc), |
| currOldAlloc); |
| if (Resources.greaterThan(plan.getResourceCalculator(), |
| planTotalCapacity, inst, planTotalCapacity)) { |
| throw new ResourceOverCommitException(" Resources at time " + t |
| + " would be overcommitted (" + inst + " over " |
| + plan.getTotalCapacity() + ") by accepting reservation: " |
| + reservation.getReservationId()); |
| } |
| |
| // throw exception if instantaneous limits are violated |
| // tot_alloc_to_this_user - old + new > inst_limit |
| if (Resources.greaterThan(plan.getResourceCalculator(), |
| planTotalCapacity, Resources.subtract( |
| Resources.add(currExistingAllocForUser, currNewAlloc), |
| currOldAlloc), maxInsRes)) { |
| throw new PlanningQuotaException("Instantaneous quota capacity " |
| + maxInst + " would be passed at time " + t |
| + " by accepting reservation: " + reservation.getReservationId()); |
| } |
| |
| // throw exception if the running integral of utilization over validWindow |
| // is violated. We perform a delta check, adding/removing instants at the |
| // boundary of the window from runningTot. |
| |
| // runningTot = previous_runningTot + currExistingAllocForUser + |
| // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc; |
| |
| // Where: |
| // 1) currNewAlloc, currExistingAllocForUser represent the contribution of |
| // the instant in time added in this pass. |
| // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time |
| // instants that are being retired from the the window |
| // 3) currOldAlloc is the contribution (if any) of the previous version of |
| // this reservation (the one we are updating) |
| |
| runningTot.add(currExistingAllocForUser); |
| runningTot.add(currNewAlloc); |
| runningTot.subtract(currOldAlloc); |
| |
| // expire contributions from instant in time before (t - validWindow) |
| if (t > startTime) { |
| Resource pastOldAlloc = |
| plan.getConsumptionForUser(reservation.getUser(), t - validWindow); |
| Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow); |
| |
| // runningTot = runningTot - pastExistingAlloc - pastNewAlloc; |
| runningTot.subtract(pastOldAlloc); |
| runningTot.subtract(pastNewAlloc); |
| } |
| |
| // check integral |
| // runningTot > maxAvg * validWindow |
| // NOTE: we need to use comparator of IntegralResource directly, as |
| // Resource and ResourceCalculator assume "int" amount of resources, |
| // which is not sufficient when comparing integrals (out-of-bound) |
| if (maxAllowed.compareTo(runningTot) < 0) { |
| throw new PlanningQuotaException( |
| "Integral (avg over time) quota capacity " + maxAvg |
| + " over a window of " + validWindow / 1000 + " seconds, " |
| + " would be passed at time " + t + "(" + new Date(t) |
| + ") by accepting reservation: " |
| + reservation.getReservationId()); |
| } |
| } |
| } |
| |
| @Override |
| public long getValidWindow() { |
| return validWindow; |
| } |
| |
| /** |
| * This class provides support for Resource-like book-keeping, based on |
| * long(s), as using Resource to store the "integral" of the allocation over |
| * time leads to integer overflows for large allocations/clusters. (Evolving |
| * Resource to use long is too disruptive at this point.) |
| * |
| * The comparison/multiplication behaviors of IntegralResource are consistent |
| * with the DefaultResourceCalculator. |
| */ |
| private static class IntegralResource { |
| long memory; |
| long vcores; |
| |
| public IntegralResource(Resource resource) { |
| this.memory = resource.getMemory(); |
| this.vcores = resource.getVirtualCores(); |
| } |
| |
| public IntegralResource(long mem, long vcores) { |
| this.memory = mem; |
| this.vcores = vcores; |
| } |
| |
| public void add(Resource r) { |
| memory += r.getMemory(); |
| vcores += r.getVirtualCores(); |
| } |
| |
| public void subtract(Resource r) { |
| memory -= r.getMemory(); |
| vcores -= r.getVirtualCores(); |
| } |
| |
| public void multiplyBy(long window) { |
| memory = memory * window; |
| vcores = vcores * window; |
| } |
| |
| public long compareTo(IntegralResource other) { |
| long diff = memory - other.memory; |
| if (diff == 0) { |
| diff = vcores - other.vcores; |
| } |
| return diff; |
| } |
| |
| @Override |
| public String toString() { |
| return "<memory:" + memory + ", vCores:" + vcores + ">"; |
| } |
| } |
| } |