blob: f67973faefea1c3ca76a16a9344dfe1062b688a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A stage allocator that iteratively allocates containers in the
* {@link DurationInterval} with lowest overall cost. The algorithm only
* considers non-overlapping intervals of length 'duration'. This guarantees
* that the allocations are aligned. If 'allocateLeft == true', the intervals
* considered by the algorithm are aligned to stageArrival; otherwise, they are
* aligned to stageDeadline. The smoothnessFactor parameter controls the number
* of containers that are simultaneously allocated in each iteration of the
* algorithm.
*/
public class StageAllocatorLowCostAligned implements StageAllocator {
private final boolean allocateLeft;
// Smoothness factor
private int smoothnessFactor = 10;
// Constructor
public StageAllocatorLowCostAligned(boolean allocateLeft) {
this.allocateLeft = allocateLeft;
}
// Constructor
public StageAllocatorLowCostAligned(int smoothnessFactor,
boolean allocateLeft) {
this.allocateLeft = allocateLeft;
this.smoothnessFactor = smoothnessFactor;
}
@Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageArrival, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
user, oldId, stageArrival, stageDeadline, period);
long step = plan.getStep();
// Create allocationRequestsearlies
RLESparseResourceAllocation allocationRequests =
new RLESparseResourceAllocation(plan.getResourceCalculator());
// Initialize parameters
long duration = stepRoundUp(rr.getDuration(), step);
int windowSizeInDurations =
(int) ((stageDeadline - stageArrival) / duration);
int totalGangs = rr.getNumContainers() / rr.getConcurrency();
int numContainersPerGang = rr.getConcurrency();
Resource gang =
Resources.multiply(rr.getCapability(), numContainersPerGang);
// Set maxGangsPerUnit
int maxGangsPerUnit = (int) Math
.max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
// If window size is too small, return null
if (windowSizeInDurations <= 0) {
return null;
}
final int preferLeft = allocateLeft ? 1 : -1;
// Initialize tree sorted by costs
TreeSet<DurationInterval> durationIntervalsSortedByCost =
new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
@Override
public int compare(DurationInterval val1, DurationInterval val2) {
int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
if (cmp != 0) {
return cmp;
}
return preferLeft
* Long.compare(val1.getEndTime(), val2.getEndTime());
}
});
List<Long> intervalEndTimes =
computeIntervalEndTimes(stageArrival, stageDeadline, duration);
// Add durationIntervals that end at (endTime - n*duration) for some n.
for (long intervalEnd : intervalEndTimes) {
long intervalStart = intervalEnd - duration;
// Get duration interval [intervalStart,intervalEnd)
DurationInterval durationInterval =
getDurationInterval(intervalStart, intervalEnd, planLoads,
planModifications, capacity, netRLERes, resCalc, step, gang);
// If the interval can fit a gang, add it to the tree
if (durationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(durationInterval);
}
}
// Allocate
int remainingGangs = totalGangs;
while (remainingGangs > 0) {
// If no durationInterval can fit a gang, break and return null
if (durationIntervalsSortedByCost.isEmpty()) {
break;
}
// Get best duration interval
DurationInterval bestDurationInterval =
durationIntervalsSortedByCost.first();
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
numGangsToAllocate =
Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
// Add it
remainingGangs -= numGangsToAllocate;
ReservationInterval reservationInt =
new ReservationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getEndTime());
Resource reservationRes = Resources.multiply(rr.getCapability(),
rr.getConcurrency() * numGangsToAllocate);
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.addInterval(reservationInt, reservationRes);
// Remove from tree
durationIntervalsSortedByCost.remove(bestDurationInterval);
// Get updated interval
DurationInterval updatedDurationInterval =
getDurationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getStartTime() + duration, planLoads,
planModifications, capacity, netRLERes, resCalc, step, gang);
// Add to tree, if possible
if (updatedDurationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(updatedDurationInterval);
}
}
// Get the final allocation
Map<ReservationInterval, Resource> allocations =
allocationRequests.toIntervalMap();
// If no gangs are left to place we succeed and return the allocation
if (remainingGangs <= 0) {
return allocations;
} else {
// If we are here is because we did not manage to satisfy this
// request.
// We remove unwanted side-effect from planModifications (needed for
// ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations
.entrySet()) {
planModifications.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// Return null to signal failure in this allocation
return null;
}
}
private List<Long> computeIntervalEndTimes(long stageEarliestStart,
long stageDeadline, long duration) {
List<Long> intervalEndTimes = new ArrayList<Long>();
if (!allocateLeft) {
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ duration; intervalEnd -= duration) {
intervalEndTimes.add(intervalEnd);
}
} else {
for (long intervalStart =
stageEarliestStart; intervalStart <= stageDeadline
- duration; intervalStart += duration) {
intervalEndTimes.add(intervalStart + duration);
}
}
return intervalEndTimes;
}
protected static DurationInterval getDurationInterval(long startTime,
long endTime, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
long step, Resource requestedResources) throws PlanningException {
// Get the total cost associated with the duration interval
double totalCost = getDurationIntervalTotalCost(startTime, endTime,
planLoads, planModifications, capacity, resCalc, step);
// Calculate how many gangs can fit, i.e., how many times can 'capacity'
// be allocated within the duration interval [startTime, endTime)
int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
planModifications, capacity, netRLERes, resCalc, requestedResources);
// Return the desired durationInterval
return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
}
protected static double getDurationIntervalTotalCost(long startTime,
long endTime, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) throws PlanningException {
// Compute the current resource load within the interval [startTime,endTime)
// by adding planLoads (existing load) and planModifications (load that
// corresponds to the current job).
RLESparseResourceAllocation currentLoad =
RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
planModifications, RLEOperator.add, startTime, endTime);
// Convert load from RLESparseResourceAllocation to a Map representation
NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();
// Initialize auxiliary variables
double totalCost = 0.0;
Long tPrev = -1L;
Resource loadPrev = Resources.none();
double cost = 0.0;
// Iterate over time points. For each point 't', accumulate the total cost
// that corresponds to the interval [tPrev, t). The cost associated within
// this interval is fixed for each of the time steps, therefore the cost of
// a single step is multiplied by (t - tPrev) / step.
for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
Long t = e.getKey();
Resource load = e.getValue();
if (tPrev != -1L) {
tPrev = Math.max(tPrev, startTime);
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
totalCost = totalCost + cost * (t - tPrev) / step;
}
tPrev = t;
loadPrev = load;
}
// Add the cost associated with the last interval (the for loop does not
// calculate it).
if (loadPrev != null) {
// This takes care of the corner case of a single entry
tPrev = Math.max(tPrev, startTime);
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
totalCost = totalCost + cost * (endTime - tPrev) / step;
}
// Return the overall cost
return totalCost;
}
protected static int getDurationIntervalGangsCanFit(long startTime,
long endTime, RLESparseResourceAllocation planModifications,
Resource capacity, RLESparseResourceAllocation netRLERes,
ResourceCalculator resCalc, Resource requestedResources)
throws PlanningException {
// Initialize auxiliary variables
int gangsCanFit = Integer.MAX_VALUE;
int curGangsCanFit;
// Calculate the total amount of available resources between startTime
// and endTime, by subtracting planModifications from netRLERes
RLESparseResourceAllocation netAvailableResources =
RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
planModifications, RLEOperator.subtractTestNonNegative, startTime,
endTime);
// Convert result to a map
NavigableMap<Long, Resource> mapAvailableCapacity =
netAvailableResources.getCumulative();
// Iterate over the map representation.
// At each point, calculate how many times does 'requestedResources' fit.
// The result is the minimum over all time points.
for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
Long t = e.getKey();
Resource curAvailable = e.getValue();
if (t >= endTime) {
break;
}
if (curAvailable == null) {
gangsCanFit = 0;
} else {
curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity,
curAvailable, requestedResources));
if (curGangsCanFit < gangsCanFit) {
gangsCanFit = curGangsCanFit;
}
}
}
return gangsCanFit;
}
protected double calcCostOfInterval(long startTime, long endTime,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) {
// Sum costs in the interval [startTime,endTime)
double totalCost = 0.0;
for (long t = startTime; t < endTime; t += step) {
totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
resCalc);
}
// Return sum
return totalCost;
}
protected double calcCostOfTimeSlot(long t,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc) {
// Get the current load at time t
Resource load = getLoadAtTime(t, planLoads, planModifications);
// Return cost
return calcCostOfLoad(load, capacity, resCalc);
}
protected Resource getLoadAtTime(long t,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications) {
Resource planLoad = planLoads.getCapacityAtTime(t);
return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
}
protected static double calcCostOfLoad(Resource load, Resource capacity,
ResourceCalculator resCalc) {
return resCalc.ratio(load, capacity);
}
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
}
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;
}
/**
* An inner class that represents an interval, typically of length duration.
* The class holds the total cost of the interval and the maximal load inside
* the interval in each dimension (both calculated externally).
*/
protected static class DurationInterval {
private long startTime;
private long endTime;
private double cost;
private final int gangsCanFit;
// Constructor
public DurationInterval(long startTime, long endTime, double cost,
int gangsCanfit) {
this.startTime = startTime;
this.endTime = endTime;
this.cost = cost;
this.gangsCanFit = gangsCanfit;
}
// canAllocate() - boolean function, returns whether requestedResources
// can be allocated during the durationInterval without
// violating capacity constraints
public boolean canAllocate() {
return (gangsCanFit > 0);
}
// numCanFit() - returns the maximal number of requestedResources can be
// allocated during the durationInterval without violating
// capacity constraints
public int numCanFit() {
return gangsCanFit;
}
public long getStartTime() {
return this.startTime;
}
public void setStartTime(long value) {
this.startTime = value;
}
public long getEndTime() {
return this.endTime;
}
public void setEndTime(long value) {
this.endTime = value;
}
public double getTotalCost() {
return this.cost;
}
public void setTotalCost(double value) {
this.cost = value;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" start: " + startTime).append(" end: " + endTime)
.append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);
return sb.toString();
}
}
}