blob: 9b18e90d47328b591e19687bbfe21c3c257023cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashSet;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A planning algorithm consisting of two main phases. The algorithm iterates
* over the job stages in ascending/descending order, depending on the flag
* allocateLeft. For each stage, the algorithm: 1. Determines an interval
* [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an
* allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
* sets the allocation window of each stage to be [jobArrival, jobDeadline]. For
* ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
* successorStartTime - the starting time of its succeeding stage (or
* jobDeadline if it is the last stage). The phases are set using the two
* functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator
*/
public class IterativePlanner extends PlanningAlgorithm {
// Modifications performed by the algorithm that have not yet been reflected
// in the actual plan while a request is still pending.
private RLESparseResourceAllocation planModifications;
// Data extracted from plan
private RLESparseResourceAllocation planLoads;
private Resource capacity;
private long step;
// Job parameters
private ReservationRequestInterpreter jobType;
private long jobArrival;
private long jobDeadline;
// Phase algorithms
private StageExecutionInterval algStageExecutionInterval = null;
private StageAllocator algStageAllocator = null;
private final boolean allocateLeft;
/**
 * Creates a planner from its two phase algorithms and an iteration direction.
 *
 * @param algStageExecutionInterval algorithm that determines the
 *          [stageArrival, stageDeadline) interval of each stage
 * @param algStageAllocator algorithm that computes the allocation of a stage
 *          inside its interval
 * @param allocateLeft if true the stages are visited from the first to the
 *          last one, otherwise from the last to the first
 */
public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
    StageAllocator algStageAllocator, boolean allocateLeft) {
  this.allocateLeft = allocateLeft;
  // Assign the fields directly instead of going through the public setters:
  // those are overridable, and an override would otherwise be invoked on a
  // partially-constructed instance.
  this.algStageExecutionInterval = algStageExecutionInterval;
  this.algStageAllocator = algStageAllocator;
}
/**
 * Computes the allocation of a job by visiting its stages one at a time, in
 * the order produced by {@link StageProvider} for the configured
 * allocateLeft direction, and accumulating the per-stage allocations.
 *
 * For every stage the method validates the request, asks
 * setStageExecutionInterval() for the stage interval and then asks
 * computeStageAllocation() for an allocation inside it. For an R_ANY job a
 * stage without allocation is skipped and the first stage that can be
 * allocated ends the iteration; for every other job type a stage without
 * allocation aborts the whole request.
 *
 * @param plan the plan the job is allocated against
 * @param reservationId the id of the reservation being planned
 * @param reservation the definition of the job (stages, arrival, deadline,
 *          recurrence expression)
 * @param user the user passed through to the stage allocator
 * @return the accumulated (non-empty) allocation of the job
 * @throws PlanningException if a stage cannot be allocated (non R_ANY jobs),
 *           if an allocation violates ORDER_NO_GAP, or if no allocation was
 *           found at all; exceptions raised by stage validation propagate
 *           unchanged
 */
@Override
public RLESparseResourceAllocation computeJobAllocation(Plan plan,
    ReservationId reservationId, ReservationDefinition reservation,
    String user) throws PlanningException {

  // Load the plan and job state shared by the two phase algorithms
  initialize(plan, reservationId, reservation);

  // Accumulates the allocations of the stages placed so far
  RLESparseResourceAllocation allocations =
      new RLESparseResourceAllocation(plan.getResourceCalculator());

  StageProvider stageProvider = new StageProvider(allocateLeft, reservation);

  // Periodicity of the reservation; stays 0 when no recurrence is specified
  long period = 0;
  String recurrenceExpression = reservation.getRecurrenceExpression();
  if (recurrenceExpression != null) {
    period = Long.parseLong(recurrenceExpression);
  }

  // Iterate over the stages (forward or backward, depending on allocateLeft)
  while (stageProvider.hasNext()) {

    ReservationRequest currentReservationStage = stageProvider.next();

    // Validate that the ReservationRequest respects basic constraints
    validateInputStage(plan, currentReservationStage);

    // Phase 1: determine the interval in which the stage may be allocated.
    // Primitive longs are used since the bounds are only forwarded to
    // computeStageAllocation(), which takes long parameters.
    ReservationInterval stageInterval =
        setStageExecutionInterval(plan, reservation, currentReservationStage,
            allocations);
    long stageArrival = stageInterval.getStartTime();
    long stageDeadline = stageInterval.getEndTime();

    // Phase 2: compute the allocation of the stage inside that interval
    Map<ReservationInterval, Resource> curAlloc =
        computeStageAllocation(plan, currentReservationStage, stageArrival,
            stageDeadline, period, user, reservationId);

    if (curAlloc == null) {
      // For an ANY job another stage may still fit, so try the next one
      if (jobType == ReservationRequestInterpreter.R_ANY) {
        continue;
      }
      // For every other job type a missing stage makes the job infeasible
      throw new PlanningException("The request cannot be satisfied");
    }

    // ORDER_NO_GAP additionally constrains how stages line up in time
    if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
        && !validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
      throw new PlanningException(
          "The allocation found does not respect ORDER_NO_GAP");
    }

    // Record the allocation found for this stage
    for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
      allocations.addInterval(entry.getKey(), entry.getValue());
    }

    // A single allocated stage is enough to satisfy an ANY job
    if (jobType == ReservationRequestInterpreter.R_ANY) {
      break;
    }
  }

  // No stage could be placed (e.g. every alternative of an ANY job failed)
  if (allocations.isEmpty()) {
    throw new PlanningException("The request cannot be satisfied");
  }

  return allocations;
}
/**
 * Checks whether a newly computed stage allocation is compatible with the
 * ORDER_NO_GAP semantics, given the allocations accumulated so far.
 *
 * @param allocations the allocations of the stages already placed
 * @param curAlloc the allocation computed for the current stage
 * @param allocateLeft true when stages are placed left-to-right, false when
 *          they are placed right-to-left
 * @return false if a gap separates the current stage from the previously
 *         placed stages, or if the stage allocation itself is not accepted
 *         by isNonPreemptiveAllocation(); true otherwise
 */
protected static boolean validateOrderNoGap(
    RLESparseResourceAllocation allocations,
    Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {

  // A boundary value of -1 (presumably "nothing allocated yet") disables the
  // gap check in both directions.
  boolean gapFound;
  if (allocateLeft) {
    // Left to right: the new stage must not start after the allocated tail
    Long newStageStart = findEarliestTime(curAlloc);
    Long allocatedUntil = allocations.getLatestNonNullTime();
    gapFound = (allocatedUntil != -1) && (allocatedUntil < newStageStart);
  } else {
    // Right to left: the new stage must not end before the allocated head
    Long newStageEnd = findLatestTime(curAlloc);
    Long allocatedFrom = allocations.getEarliestStartTime();
    gapFound = (allocatedFrom != -1) && (newStageEnd < allocatedFrom);
  }

  // Legal only without a gap and with a non-preemptive stage allocation
  return !gapFound && isNonPreemptiveAllocation(curAlloc);
}
/**
 * Resets the per-request state of the planner from the plan and from the
 * reservation being planned: capacity, step, job type, job arrival and
 * deadline (rounded with stepRoundUp()/stepRoundDown()), an empty set of
 * plan modifications and the plan load over the job window.
 *
 * @param plan the plan the job is allocated against
 * @param reservationId the id of the reservation being planned; if the plan
 *          already holds a reservation with this id, its resources are
 *          subtracted from the plan load
 * @param reservation the definition of the job
 * @throws PlanningException declared for the calls made on the plan
 */
protected void initialize(Plan plan, ReservationId reservationId,
    ReservationDefinition reservation) throws PlanningException {

  // Plan-wide settings
  this.capacity = plan.getTotalCapacity();
  this.step = plan.getStep();

  // Job-wide settings: interpreter type and step-rounded time window
  this.jobType = reservation.getReservationRequests().getInterpreter();
  this.jobArrival = stepRoundUp(reservation.getArrival(), this.step);
  this.jobDeadline = stepRoundDown(reservation.getDeadline(), this.step);

  // Start from an empty set of pending modifications
  this.planModifications =
      new RLESparseResourceAllocation(plan.getResourceCalculator());

  // Dirty read of the plan load. planLoads are not used by other
  // StageAllocators... and don't deal well with huge reservation ranges.
  this.planLoads =
      plan.getCumulativeLoadOverTime(this.jobArrival, this.jobDeadline);

  ReservationAllocation previousAllocation =
      plan.getReservationById(reservationId);
  if (previousAllocation == null) {
    // Nothing to discount: the reservation is not in the plan yet
    return;
  }

  // Remove the contribution of the existing reservation from the load
  this.planLoads = RLESparseResourceAllocation.merge(
      plan.getResourceCalculator(), plan.getTotalCapacity(), this.planLoads,
      previousAllocation.getResourcesOverTime(this.jobArrival,
          this.jobDeadline),
      RLEOperator.subtract, this.jobArrival, this.jobDeadline);
}
/**
 * Verifies that a stage request satisfies the basic contract: a gang size
 * of at least one, a positive number of containers that is a multiple of
 * the gang size, and a per-container capability that does not exceed the
 * plan's maximum allocation.
 *
 * @param plan the plan providing the resource calculator and the maximum
 *          allocation
 * @param rr the stage request to validate
 * @throws ContractValidationException if any of the checks fails
 */
private void validateInputStage(Plan plan, ReservationRequest rr)
    throws ContractValidationException {

  // Gang size (concurrency) must be at least one
  final int gangSize = rr.getConcurrency();
  if (gangSize < 1) {
    throw new ContractValidationException("Gang Size should be >= 1");
  }

  // At least one container must be requested
  final int numContainers = rr.getNumContainers();
  if (numContainers <= 0) {
    throw new ContractValidationException("Num containers should be > 0");
  }

  // The containers must split evenly into gangs
  final boolean unevenGangs = (numContainers % gangSize) != 0;
  if (unevenGangs) {
    throw new ContractValidationException(
        "Parallelism must be an exact multiple of gang size");
  }

  // A single container must fit within the cluster-wide container limit
  final boolean exceedsMaxAlloc =
      Resources.greaterThan(plan.getResourceCalculator(), capacity,
          rr.getCapability(), plan.getMaximumAllocation());
  if (exceedsMaxAlloc) {
    throw new ContractValidationException(
        "Individual capability requests should not exceed cluster's "
            + "maxAlloc");
  }
}
/**
 * Checks whether a stage allocation is non-preemptive, i.e. whether its
 * non-empty intervals chain into one contiguous span.
 *
 * Assumption: the intervals are non-intersecting (as returned by
 * computeStageAllocation()). Under that assumption every interior boundary
 * of a contiguous span is seen twice (once as an end, once as a start),
 * while the two outer boundaries are seen exactly once.
 *
 * @param curAlloc the allocation of a stage, as interval to resource pairs
 * @return true if exactly two end points appear an odd number of times;
 *         false otherwise (including when every interval is empty)
 */
private static boolean isNonPreemptiveAllocation(
    Map<ReservationInterval, Resource> curAlloc) {

  // Loop-invariant zero resource used to recognise empty intervals
  final Resource noResource = Resource.newInstance(0, 0);

  // End points that have been seen an odd number of times so far
  Set<Long> unmatchedEndPoints = new HashSet<Long>(2 * curAlloc.size());

  for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {

    // Ignore intervals with no allocation
    if (Resources.equals(entry.getValue(), noResource)) {
      continue;
    }

    ReservationInterval interval = entry.getKey();
    Long left = interval.getStartTime();
    Long right = interval.getEndTime();

    // Toggle membership: add() returns false when the end point is already
    // present, in which case the second sighting cancels the first one.
    if (!unmatchedEndPoints.add(left)) {
      unmatchedEndPoints.remove(left);
    }
    if (!unmatchedEndPoints.add(right)) {
      unmatchedEndPoints.remove(right);
    }
  }

  // Non-preemptive only if the two outer boundaries are all that is left
  return unmatchedEndPoints.size() == 2;
}
/**
 * Delegates to the configured StageExecutionInterval algorithm to obtain
 * the interval in which the given stage is to be allocated.
 *
 * @param plan the plan the job is allocated against
 * @param reservation the definition of the job
 * @param currentReservationStage the stage being planned
 * @param allocations the allocations of the stages already placed
 * @return the interval computed by the algorithm for the stage
 */
protected ReservationInterval setStageExecutionInterval(Plan plan,
    ReservationDefinition reservation,
    ReservationRequest currentReservationStage,
    RLESparseResourceAllocation allocations) {
  final ReservationInterval stageWindow =
      algStageExecutionInterval.computeExecutionInterval(plan, reservation,
          currentReservationStage, allocateLeft, allocations);
  return stageWindow;
}
/**
 * Delegates to the configured StageAllocator algorithm to compute the
 * allocation of a stage, handing it the plan loads and the pending plan
 * modifications held by this planner.
 *
 * @param plan the plan the job is allocated against
 * @param rr the stage request
 * @param stageArrivalTime start of the interval the stage may use
 * @param stageDeadline end of the interval the stage may use
 * @param period the periodicity of the reservation
 * @param user the user passed through to the allocator
 * @param oldId the id of the reservation being planned
 * @return the allocation produced by the allocator; callers treat null as
 *         "no allocation found"
 * @throws PlanningException if the allocator fails
 */
protected Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
    ReservationRequest rr, long stageArrivalTime, long stageDeadline,
    long period, String user, ReservationId oldId) throws PlanningException {
  final Map<ReservationInterval, Resource> stageAllocation =
      algStageAllocator.computeStageAllocation(plan, planLoads,
          planModifications, rr, stageArrivalTime, stageDeadline, period,
          user, oldId);
  return stageAllocation;
}
/**
 * Replaces the algorithm used to determine the execution interval of each
 * stage.
 *
 * @param alg the new stage execution interval algorithm
 * @return this planner, so that setAlg*() calls can be chained
 */
public IterativePlanner setAlgStageExecutionInterval(
    StageExecutionInterval alg) {
  algStageExecutionInterval = alg;
  return this;
}
/**
 * Replaces the algorithm used to compute the allocation of each stage.
 *
 * @param alg the new stage allocator algorithm
 * @return this planner, so that setAlg*() calls can be chained
 */
public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
  algStageAllocator = alg;
  return this;
}
/**
 * Helper class that walks the list of ReservationRequests of a reservation,
 * forward when allocating left-to-right and backward when allocating
 * right-to-left.
 */
public static class StageProvider {

  // Direction of the walk: true = first to last, false = last to first
  private final boolean allocateLeft;
  // Cursor over the stages; advanced with next() or previous() depending on
  // the direction
  private final ListIterator<ReservationRequest> li;

  /**
   * Positions the cursor before the first stage (left-to-right) or after
   * the last stage (right-to-left).
   *
   * @param allocateLeft true to iterate forward, false to iterate backward
   * @param reservation the reservation whose stages are iterated
   */
  public StageProvider(boolean allocateLeft,
      ReservationDefinition reservation) {
    this.allocateLeft = allocateLeft;
    final int startingIndex = allocateLeft
        ? 0
        : reservation.getReservationRequests().getReservationResources()
            .size();
    this.li = reservation.getReservationRequests().getReservationResources()
        .listIterator(startingIndex);
  }

  /**
   * @return true if there is another stage in the direction of the walk
   */
  public boolean hasNext() {
    return allocateLeft ? li.hasNext() : li.hasPrevious();
  }

  /**
   * @return the next stage in the direction of the walk
   */
  public ReservationRequest next() {
    return allocateLeft ? li.next() : li.previous();
  }

  /**
   * @return the list index of the stage most recently returned by next()
   */
  public int getCurrentIndex() {
    return allocateLeft ? li.nextIndex() - 1 : li.previousIndex() + 1;
  }
}
}