| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * This is a run length encoded sparse data structure that maintains resource |
| * allocations over time. |
| */ |
| public class RLESparseResourceAllocation { |
| |
| private static final int THRESHOLD = 100; |
| private static final Resource ZERO_RESOURCE = Resources.none(); |
| |
| @SuppressWarnings("checkstyle:visibilitymodifier") |
| protected NavigableMap<Long, Resource> cumulativeCapacity = |
| new TreeMap<Long, Resource>(); |
| |
| private final ReentrantReadWriteLock readWriteLock = |
| new ReentrantReadWriteLock(); |
| @SuppressWarnings("checkstyle:visibilitymodifier") |
| protected final Lock readLock = readWriteLock.readLock(); |
| private final Lock writeLock = readWriteLock.writeLock(); |
| |
| private final ResourceCalculator resourceCalculator; |
| |
/**
 * Creates an allocation with no steps.
 *
 * @param resourceCalculator the calculator associated with this allocation
 *          (returned by {@link #getResourceCalculator()})
 */
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) {
  this.resourceCalculator = resourceCalculator;
}

/**
 * Creates an allocation backed by the given step map. The map is used
 * directly (it is not copied) and is not checked for consecutive entries
 * that repeat the same value.
 *
 * @param out the time-to-capacity steps to wrap; {@code null} is treated as
 *          "no steps"
 * @param resourceCalculator the calculator associated with this allocation
 *          (returned by {@link #getResourceCalculator()})
 */
public RLESparseResourceAllocation(NavigableMap<Long, Resource> out,
    ResourceCalculator resourceCalculator) {
  // A null backing map would make almost every accessor of this class throw
  // NullPointerException, so keep the empty map installed by the field
  // initializer in that case.
  if (out != null) {
    this.cumulativeCapacity = out;
  }
  this.resourceCalculator = resourceCalculator;
}
| |
| /** |
| * Add a resource for the specified interval. |
| * |
| * @param reservationInterval the interval for which the resource is to be |
| * added |
| * @param totCap the resource to be added |
| * @return true if addition is successful, false otherwise |
| */ |
| public boolean addInterval(ReservationInterval reservationInterval, |
| Resource totCap) { |
| if (totCap.equals(ZERO_RESOURCE)) { |
| return true; |
| } |
| writeLock.lock(); |
| try { |
| NavigableMap<Long, Resource> addInt = new TreeMap<Long, Resource>(); |
| addInt.put(reservationInterval.getStartTime(), totCap); |
| addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE); |
| try { |
| cumulativeCapacity = |
| merge(resourceCalculator, totCap, cumulativeCapacity, addInt, |
| Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add); |
| } catch (PlanningException e) { |
| // never happens for add |
| } |
| return true; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Removes a resource for the specified interval. |
| * |
| * @param reservationInterval the interval for which the resource is to be |
| * removed |
| * @param totCap the resource to be removed |
| * @return true if removal is successful, false otherwise |
| */ |
| public boolean removeInterval(ReservationInterval reservationInterval, |
| Resource totCap) { |
| if (totCap.equals(ZERO_RESOURCE)) { |
| return true; |
| } |
| writeLock.lock(); |
| try { |
| |
| NavigableMap<Long, Resource> removeInt = new TreeMap<Long, Resource>(); |
| removeInt.put(reservationInterval.getStartTime(), totCap); |
| removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE); |
| try { |
| cumulativeCapacity = |
| merge(resourceCalculator, totCap, cumulativeCapacity, removeInt, |
| Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract); |
| } catch (PlanningException e) { |
| // never happens for subtract |
| } |
| return true; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the capacity, i.e. total resources allocated at the specified point |
| * of time. |
| * |
| * @param tick timeStap at which resource needs to be known |
| * @return the resources allocated at the specified time |
| */ |
| public Resource getCapacityAtTime(long tick) { |
| readLock.lock(); |
| try { |
| Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick); |
| if (closestStep != null) { |
| return Resources.clone(closestStep.getValue()); |
| } |
| return Resources.clone(ZERO_RESOURCE); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get the timestamp of the earliest resource allocation. |
| * |
| * @return the timestamp of the first resource allocation |
| */ |
| public long getEarliestStartTime() { |
| readLock.lock(); |
| try { |
| if (cumulativeCapacity.isEmpty()) { |
| return -1; |
| } else { |
| return cumulativeCapacity.firstKey(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get the timestamp of the latest non-null resource allocation. |
| * |
| * @return the timestamp of the last resource allocation |
| */ |
| public long getLatestNonNullTime() { |
| readLock.lock(); |
| try { |
| if (cumulativeCapacity.isEmpty()) { |
| return -1; |
| } else { |
| // the last entry might contain null (to terminate |
| // the sequence)... return previous one. |
| Entry<Long, Resource> last = cumulativeCapacity.lastEntry(); |
| if (last.getValue() == null) { |
| return cumulativeCapacity.floorKey(last.getKey() - 1); |
| } else { |
| return last.getKey(); |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns true if there are no non-zero entries. |
| * |
| * @return true if there are no allocations or false otherwise |
| */ |
| public boolean isEmpty() { |
| readLock.lock(); |
| try { |
| if (cumulativeCapacity.isEmpty()) { |
| return true; |
| } |
| // Deletion leaves a single zero entry with a null at the end so check for |
| // that |
| if (cumulativeCapacity.size() == 2) { |
| return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE) |
| && cumulativeCapacity.lastEntry().getValue() == null; |
| } |
| return false; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder ret = new StringBuilder(); |
| readLock.lock(); |
| try { |
| if (cumulativeCapacity.size() > THRESHOLD) { |
| ret.append("Number of steps: ").append(cumulativeCapacity.size()) |
| .append(" earliest entry: ").append(cumulativeCapacity.firstKey()) |
| .append(" latest entry: ").append(cumulativeCapacity.lastKey()); |
| } else { |
| for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) { |
| ret.append(r.getKey()).append(": ").append(r.getValue()) |
| .append("\n "); |
| } |
| } |
| return ret.toString(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the representation of the current resources allocated over time as |
| * an interval map (in the defined non-null range). |
| * |
| * @return the representation of the current resources allocated over time as |
| * an interval map. |
| */ |
| public Map<ReservationInterval, Resource> toIntervalMap() { |
| |
| readLock.lock(); |
| try { |
| Map<ReservationInterval, Resource> allocations = |
| new TreeMap<ReservationInterval, Resource>(); |
| |
| // Empty |
| if (isEmpty()) { |
| return allocations; |
| } |
| |
| Map.Entry<Long, Resource> lastEntry = null; |
| for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) { |
| |
| if (lastEntry != null && entry.getValue() != null) { |
| ReservationInterval interval = |
| new ReservationInterval(lastEntry.getKey(), entry.getKey()); |
| Resource resource = lastEntry.getValue(); |
| |
| allocations.put(interval, resource); |
| } |
| |
| lastEntry = entry; |
| } |
| return allocations; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public NavigableMap<Long, Resource> getCumulative() { |
| readLock.lock(); |
| try { |
| return cumulativeCapacity; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public ResourceCalculator getResourceCalculator() { |
| return resourceCalculator; |
| } |
| |
| /** |
| * Merges the range start to end of two {@code RLESparseResourceAllocation} |
| * using a given {@code RLEOperator}. |
| * |
| * @param resCalc the resource calculator |
| * @param clusterResource the total cluster resources (for DRF) |
| * @param a the left operand |
| * @param b the right operand |
| * @param operator the operator to be applied during merge |
| * @param start the start-time of the range to be considered |
| * @param end the end-time of the range to be considered |
| * @return the a merged RLESparseResourceAllocation, produced by applying |
| * "operator" to "a" and "b" |
| * @throws PlanningException in case the operator is subtractTestPositive and |
| * the result would contain a negative value |
| */ |
| public static RLESparseResourceAllocation merge(ResourceCalculator resCalc, |
| Resource clusterResource, RLESparseResourceAllocation a, |
| RLESparseResourceAllocation b, RLEOperator operator, long start, long end) |
| throws PlanningException { |
| NavigableMap<Long, Resource> cumA = |
| a.getRangeOverlapping(start, end).getCumulative(); |
| NavigableMap<Long, Resource> cumB = |
| b.getRangeOverlapping(start, end).getCumulative(); |
| NavigableMap<Long, Resource> out = |
| merge(resCalc, clusterResource, cumA, cumB, start, end, operator); |
| return new RLESparseResourceAllocation(out, resCalc); |
| } |
| |
| private static NavigableMap<Long, Resource> merge(ResourceCalculator resCalc, |
| Resource clusterResource, NavigableMap<Long, Resource> a, |
| NavigableMap<Long, Resource> b, long start, long end, |
| RLEOperator operator) throws PlanningException { |
| |
| // handle special cases of empty input |
| if (a == null || a.isEmpty()) { |
| if (operator == RLEOperator.subtract |
| || operator == RLEOperator.subtractTestNonNegative) { |
| return negate(operator, b); |
| } else { |
| return b; |
| } |
| } |
| if (b == null || b.isEmpty()) { |
| return a; |
| } |
| |
| // define iterators and support variables |
| Iterator<Entry<Long, Resource>> aIt = a.entrySet().iterator(); |
| Iterator<Entry<Long, Resource>> bIt = b.entrySet().iterator(); |
| Entry<Long, Resource> curA = aIt.next(); |
| Entry<Long, Resource> curB = bIt.next(); |
| Entry<Long, Resource> lastA = null; |
| Entry<Long, Resource> lastB = null; |
| boolean aIsDone = false; |
| boolean bIsDone = false; |
| |
| TreeMap<Long, Resource> out = new TreeMap<Long, Resource>(); |
| |
| while (!(curA.equals(lastA) && curB.equals(lastB))) { |
| |
| Resource outRes; |
| long time = -1; |
| |
| // curA is smaller than curB |
| if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) { |
| outRes = combineValue(operator, resCalc, clusterResource, curA, lastB); |
| time = (curA.getKey() < start) ? start : curA.getKey(); |
| lastA = curA; |
| if (aIt.hasNext()) { |
| curA = aIt.next(); |
| } else { |
| aIsDone = true; |
| } |
| |
| } else { |
| // curB is smaller than curA |
| if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) { |
| outRes = |
| combineValue(operator, resCalc, clusterResource, lastA, curB); |
| time = (curB.getKey() < start) ? start : curB.getKey(); |
| lastB = curB; |
| if (bIt.hasNext()) { |
| curB = bIt.next(); |
| } else { |
| bIsDone = true; |
| } |
| |
| } else { |
| // curA is equal to curB |
| outRes = combineValue(operator, resCalc, clusterResource, curA, curB); |
| time = (curA.getKey() < start) ? start : curA.getKey(); |
| lastA = curA; |
| if (aIt.hasNext()) { |
| curA = aIt.next(); |
| } else { |
| aIsDone = true; |
| } |
| lastB = curB; |
| if (bIt.hasNext()) { |
| curB = bIt.next(); |
| } else { |
| bIsDone = true; |
| } |
| } |
| } |
| |
| // add to out if not redundant |
| addIfNeeded(out, time, outRes); |
| } |
| addIfNeeded(out, end, null); |
| |
| return out; |
| } |
| |
| private static NavigableMap<Long, Resource> negate(RLEOperator operator, |
| NavigableMap<Long, Resource> a) throws PlanningException { |
| |
| TreeMap<Long, Resource> out = new TreeMap<Long, Resource>(); |
| for (Entry<Long, Resource> e : a.entrySet()) { |
| Resource val = Resources.negate(e.getValue()); |
| // test for negative value and throws |
| if (operator == RLEOperator.subtractTestNonNegative |
| && (Resources.fitsIn(val, ZERO_RESOURCE) |
| && !Resources.equals(val, ZERO_RESOURCE))) { |
| throw new PlanningException( |
| "RLESparseResourceAllocation: merge failed as the " |
| + "resulting RLESparseResourceAllocation would be negative"); |
| } |
| out.put(e.getKey(), val); |
| } |
| |
| return out; |
| } |
| |
| private static void addIfNeeded(TreeMap<Long, Resource> out, long time, |
| Resource outRes) { |
| |
| if (out.isEmpty() || (out.lastEntry() != null && outRes == null) |
| || (out.lastEntry().getValue() != null |
| && !Resources.equals(out.lastEntry().getValue(), outRes))) { |
| out.put(time, outRes); |
| } |
| |
| } |
| |
| private static Resource combineValue(RLEOperator op, |
| ResourceCalculator resCalc, Resource clusterResource, |
| Entry<Long, Resource> eA, Entry<Long, Resource> eB) |
| throws PlanningException { |
| |
| // deal with nulls |
| if (eA == null || eA.getValue() == null) { |
| if (eB == null || eB.getValue() == null) { |
| return null; |
| } |
| if (op == RLEOperator.subtract) { |
| return Resources.negate(eB.getValue()); |
| } else { |
| return eB.getValue(); |
| } |
| } |
| if (eB == null || eB.getValue() == null) { |
| return eA.getValue(); |
| } |
| |
| Resource a = eA.getValue(); |
| Resource b = eB.getValue(); |
| switch (op) { |
| case add: |
| return Resources.add(a, b); |
| case subtract: |
| return Resources.subtract(a, b); |
| case subtractTestNonNegative: |
| if (!Resources.fitsIn(b, a)) { |
| throw new PlanningException( |
| "RLESparseResourceAllocation: merge failed as the " |
| + "resulting RLESparseResourceAllocation would " |
| + "be negative, when testing: (" + eB + ") > (" + eA + ")"); |
| } else { |
| return Resources.subtract(a, b); |
| } |
| case min: |
| return Resources.min(resCalc, clusterResource, a, b); |
| case max: |
| return Resources.max(resCalc, clusterResource, a, b); |
| default: |
| return null; |
| } |
| |
| } |
| |
| /** |
| * Get a {@link RLESparseResourceAllocation} view of the {@link Resource} |
| * allocations between the specified start and end times. |
| * |
| * @param start the time from which the {@link Resource} allocations are |
| * required |
| * @param end the time upto which the {@link Resource} allocations are |
| * required |
| * @return the overlapping allocations |
| */ |
| public RLESparseResourceAllocation getRangeOverlapping(long start, long end) { |
| readLock.lock(); |
| try { |
| NavigableMap<Long, Resource> a = this.getCumulative(); |
| if (a != null && !a.isEmpty()) { |
| // include the portion of previous entry that overlaps start |
| if (start > a.firstKey()) { |
| long previous = a.floorKey(start); |
| a = a.tailMap(previous, true); |
| } |
| if (end < a.lastKey()) { |
| a = a.headMap(end, true); |
| } |
| } |
| RLESparseResourceAllocation ret = |
| new RLESparseResourceAllocation(a, resourceCalculator); |
| return ret; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * This method shifts all the timestamp of the {@link Resource} entries by the |
| * specified "delta". |
| * |
| * @param delta the time by which to shift the {@link Resource} allocations |
| */ |
| public void shift(long delta) { |
| writeLock.lock(); |
| try { |
| TreeMap<Long, Resource> newCum = new TreeMap<>(); |
| long start; |
| for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) { |
| if (delta > 0) { |
| start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE |
| : entry.getKey() + delta; |
| } else { |
| start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE |
| : entry.getKey() + delta; |
| } |
| newCum.put(start, entry.getValue()); |
| } |
| cumulativeCapacity = newCum; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
/**
 * The operators that a merge of two {@code RLESparseResourceAllocation} can
 * apply to each pair of step values.
 */
public enum RLEOperator {
  /** Sum of the two values. */
  add,
  /** Left value minus right value. */
  subtract,
  /** The smaller of the two values. */
  min,
  /** The larger of the two values. */
  max,
  /** Left minus right, failing if the right value does not fit in the left. */
  subtractTestNonNegative
}
| |
| /** |
| * Get the maximum capacity across specified time instances. The search-space |
| * is specified using the starting value, tick, and the periodic interval for |
| * search. Maximum resource allocation across tick, tick + period, tick + 2 * |
| * period,..., tick + n * period .. is returned. |
| * |
| * @param tick the starting time instance |
| * @param period interval at which capacity is evaluated |
| * @return maximum resource allocation |
| */ |
| public Resource getMaximumPeriodicCapacity(long tick, long period) { |
| Resource maxCapacity = ZERO_RESOURCE; |
| readLock.lock(); |
| try { |
| if (!cumulativeCapacity.isEmpty()) { |
| Long lastKey = cumulativeCapacity.lastKey(); |
| for (long t = tick; t <= lastKey; t = t + period) { |
| maxCapacity = Resources.componentwiseMax(maxCapacity, |
| cumulativeCapacity.floorEntry(t).getValue()); |
| } |
| } |
| return maxCapacity; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get the minimum capacity in the specified time range. |
| * |
| * @param interval the {@link ReservationInterval} to be searched |
| * @return minimum resource allocation |
| */ |
| public Resource getMinimumCapacityInInterval(ReservationInterval interval) { |
| Resource minCapacity = |
| Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE); |
| long start = interval.getStartTime(); |
| long end = interval.getEndTime(); |
| NavigableMap<Long, Resource> capacityRange = |
| getRangeOverlapping(start, end).getCumulative(); |
| if (!capacityRange.isEmpty()) { |
| for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) { |
| if (entry.getValue() != null) { |
| minCapacity = |
| Resources.componentwiseMin(minCapacity, entry.getValue()); |
| } |
| } |
| } |
| return minCapacity; |
| } |
| |
| } |