| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; |
| |
| import java.util.HashSet; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.ReservationRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * A planning algorithm consisting of two main phases. The algorithm iterates |
| * over the job stages in ascending/descending order, depending on the flag |
| * allocateLeft. For each stage, the algorithm: 1. Determines an interval |
| * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an |
| * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1 |
| * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For |
| * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as |
| * succcessorStartTime - the starting time of its succeeding stage (or |
| * jobDeadline if it is the last stage). The phases are set using the two |
| * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator |
| */ |
| public class IterativePlanner extends PlanningAlgorithm { |
| |
| // Modifications performed by the algorithm that are not been reflected in the |
| // actual plan while a request is still pending. |
| private RLESparseResourceAllocation planModifications; |
| |
| // Data extracted from plan |
| private RLESparseResourceAllocation planLoads; |
| private Resource capacity; |
| private long step; |
| |
| // Job parameters |
| private ReservationRequestInterpreter jobType; |
| private long jobArrival; |
| private long jobDeadline; |
| |
| // Phase algorithms |
| private StageExecutionInterval algStageExecutionInterval = null; |
| private StageAllocator algStageAllocator = null; |
| private final boolean allocateLeft; |
| |
| // Constructor |
| public IterativePlanner(StageExecutionInterval algStageExecutionInterval, |
| StageAllocator algStageAllocator, boolean allocateLeft) { |
| |
| this.allocateLeft = allocateLeft; |
| setAlgStageExecutionInterval(algStageExecutionInterval); |
| setAlgStageAllocator(algStageAllocator); |
| |
| } |
| |
| @Override |
| public RLESparseResourceAllocation computeJobAllocation(Plan plan, |
| ReservationId reservationId, ReservationDefinition reservation, |
| String user) throws PlanningException { |
| |
| // Initialize |
| initialize(plan, reservationId, reservation); |
| |
| // Create the allocations data structure |
| RLESparseResourceAllocation allocations = |
| new RLESparseResourceAllocation(plan.getResourceCalculator()); |
| |
| StageProvider stageProvider = new StageProvider(allocateLeft, reservation); |
| |
| // Current stage |
| ReservationRequest currentReservationStage; |
| |
| // initialize periodicity |
| long period = 0; |
| if(reservation.getRecurrenceExpression() != null){ |
| period = Long.parseLong(reservation.getRecurrenceExpression()); |
| } |
| |
| // Iterate the stages in reverse order |
| while (stageProvider.hasNext()) { |
| |
| // Get current stage |
| currentReservationStage = stageProvider.next(); |
| |
| // Validate that the ReservationRequest respects basic constraints |
| validateInputStage(plan, currentReservationStage); |
| |
| // Set the stageArrival and stageDeadline |
| ReservationInterval stageInterval = |
| setStageExecutionInterval(plan, reservation, currentReservationStage, |
| allocations); |
| Long stageArrival = stageInterval.getStartTime(); |
| Long stageDeadline = stageInterval.getEndTime(); |
| |
| // Compute stage allocation |
| Map<ReservationInterval, Resource> curAlloc = |
| computeStageAllocation(plan, currentReservationStage, stageArrival, |
| stageDeadline, period, user, reservationId); |
| |
| // If we did not find an allocation, return NULL |
| // (unless it's an ANY job, then we simply continue). |
| if (curAlloc == null) { |
| |
| // If it's an ANY job, we can move to the next possible request |
| if (jobType == ReservationRequestInterpreter.R_ANY) { |
| continue; |
| } |
| |
| // Otherwise, the job cannot be allocated |
| throw new PlanningException("The request cannot be satisfied"); |
| |
| } |
| |
| // Validate ORDER_NO_GAP |
| if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { |
| if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) { |
| throw new PlanningException( |
| "The allocation found does not respect ORDER_NO_GAP"); |
| } |
| } |
| |
| // If we did find an allocation for the stage, add it |
| for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) { |
| allocations.addInterval(entry.getKey(), entry.getValue()); |
| } |
| |
| // If this is an ANY clause, we have finished |
| if (jobType == ReservationRequestInterpreter.R_ANY) { |
| break; |
| } |
| } |
| |
| // If the allocation is empty, return an error |
| if (allocations.isEmpty()) { |
| throw new PlanningException("The request cannot be satisfied"); |
| } |
| |
| return allocations; |
| } |
| |
| protected static boolean validateOrderNoGap( |
| RLESparseResourceAllocation allocations, |
| Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) { |
| |
| // Left to right |
| if (allocateLeft) { |
| Long stageStartTime = findEarliestTime(curAlloc); |
| Long allocationEndTime = allocations.getLatestNonNullTime(); |
| |
| // Check that there is no gap between stages |
| if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) { |
| return false; |
| } |
| // Right to left |
| } else { |
| Long stageEndTime = findLatestTime(curAlloc); |
| Long allocationStartTime = allocations.getEarliestStartTime(); |
| |
| // Check that there is no gap between stages |
| if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) { |
| return false; |
| } |
| } |
| |
| // Check that the stage allocation does not violate ORDER_NO_GAP |
| if (!isNonPreemptiveAllocation(curAlloc)) { |
| return false; |
| } |
| |
| // The allocation is legal |
| return true; |
| } |
| |
| protected void initialize(Plan plan, ReservationId reservationId, |
| ReservationDefinition reservation) throws PlanningException { |
| |
| // Get plan step & capacity |
| capacity = plan.getTotalCapacity(); |
| step = plan.getStep(); |
| |
| // Get job parameters (type, arrival time & deadline) |
| jobType = reservation.getReservationRequests().getInterpreter(); |
| jobArrival = stepRoundUp(reservation.getArrival(), step); |
| jobDeadline = stepRoundDown(reservation.getDeadline(), step); |
| |
| // Initialize the plan modifications |
| planModifications = |
| new RLESparseResourceAllocation(plan.getResourceCalculator()); |
| |
| // Dirty read of plan load |
| |
| // planLoads are not used by other StageAllocators... and don't deal |
| // well with huge reservation ranges |
| planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline); |
| ReservationAllocation oldRes = plan.getReservationById(reservationId); |
| if (oldRes != null) { |
| planLoads = RLESparseResourceAllocation.merge( |
| plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads, |
| oldRes.getResourcesOverTime(jobArrival, jobDeadline), |
| RLEOperator.subtract, jobArrival, jobDeadline); |
| } |
| } |
| |
| private void validateInputStage(Plan plan, ReservationRequest rr) |
| throws ContractValidationException { |
| |
| // Validate concurrency |
| if (rr.getConcurrency() < 1) { |
| throw new ContractValidationException("Gang Size should be >= 1"); |
| } |
| |
| // Validate number of containers |
| if (rr.getNumContainers() <= 0) { |
| throw new ContractValidationException("Num containers should be > 0"); |
| } |
| |
| // Check that gangSize and numContainers are compatible |
| if (rr.getNumContainers() % rr.getConcurrency() != 0) { |
| throw new ContractValidationException( |
| "Parallelism must be an exact multiple of gang size"); |
| } |
| |
| // Check that the largest container request does not exceed the cluster-wide |
| // limit for container sizes |
| if (Resources.greaterThan(plan.getResourceCalculator(), capacity, |
| rr.getCapability(), plan.getMaximumAllocation())) { |
| |
| throw new ContractValidationException( |
| "Individual capability requests should not exceed cluster's " |
| + "maxAlloc"); |
| |
| } |
| |
| } |
| |
| private static boolean isNonPreemptiveAllocation( |
| Map<ReservationInterval, Resource> curAlloc) { |
| |
| // Checks whether a stage allocation is non preemptive or not. |
| // Assumption: the intervals are non-intersecting (as returned by |
| // computeStageAllocation()). |
| // For a non-preemptive allocation, only two end points appear exactly once |
| |
| Set<Long> endPoints = new HashSet<Long>(2 * curAlloc.size()); |
| for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) { |
| |
| ReservationInterval interval = entry.getKey(); |
| Resource resource = entry.getValue(); |
| |
| // Ignore intervals with no allocation |
| if (Resources.equals(resource, Resource.newInstance(0, 0))) { |
| continue; |
| } |
| |
| // Get endpoints |
| Long left = interval.getStartTime(); |
| Long right = interval.getEndTime(); |
| |
| // Add left endpoint if we haven't seen it before, remove otherwise |
| if (!endPoints.contains(left)) { |
| endPoints.add(left); |
| } else { |
| endPoints.remove(left); |
| } |
| |
| // Add right endpoint if we haven't seen it before, remove otherwise |
| if (!endPoints.contains(right)) { |
| endPoints.add(right); |
| } else { |
| endPoints.remove(right); |
| } |
| } |
| |
| // Non-preemptive only if endPoints is of size 2 |
| return (endPoints.size() == 2); |
| |
| } |
| |
| // Call setStageExecutionInterval() |
| protected ReservationInterval setStageExecutionInterval(Plan plan, |
| ReservationDefinition reservation, |
| ReservationRequest currentReservationStage, |
| RLESparseResourceAllocation allocations) { |
| return algStageExecutionInterval.computeExecutionInterval(plan, |
| reservation, currentReservationStage, allocateLeft, allocations); |
| } |
| |
| // Call algStageAllocator |
| protected Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, |
| ReservationRequest rr, long stageArrivalTime, long stageDeadline, |
| long period, String user, ReservationId oldId) throws PlanningException { |
| |
| return algStageAllocator.computeStageAllocation(plan, planLoads, |
| planModifications, rr, stageArrivalTime, stageDeadline, period, user, |
| oldId); |
| |
| } |
| |
| // Set the algorithm: algStageExecutionInterval |
| public IterativePlanner setAlgStageExecutionInterval( |
| StageExecutionInterval alg) { |
| |
| this.algStageExecutionInterval = alg; |
| return this; // To allow concatenation of setAlg() functions |
| |
| } |
| |
| // Set the algorithm: algStageAllocator |
| public IterativePlanner setAlgStageAllocator(StageAllocator alg) { |
| |
| this.algStageAllocator = alg; |
| return this; // To allow concatenation of setAlg() functions |
| |
| } |
| |
| /** |
| * Helper class that provide a list of ReservationRequests and iterates |
| * forward or backward depending whether we are allocating left-to-right or |
| * right-to-left. |
| */ |
| public static class StageProvider { |
| |
| private final boolean allocateLeft; |
| |
| private final ListIterator<ReservationRequest> li; |
| |
| public StageProvider(boolean allocateLeft, |
| ReservationDefinition reservation) { |
| |
| this.allocateLeft = allocateLeft; |
| int startingIndex; |
| if (allocateLeft) { |
| startingIndex = 0; |
| } else { |
| startingIndex = |
| reservation.getReservationRequests().getReservationResources() |
| .size(); |
| } |
| // Get a reverse iterator for the set of stages |
| li = |
| reservation.getReservationRequests().getReservationResources() |
| .listIterator(startingIndex); |
| |
| } |
| |
| public boolean hasNext() { |
| if (allocateLeft) { |
| return li.hasNext(); |
| } else { |
| return li.hasPrevious(); |
| } |
| } |
| |
| public ReservationRequest next() { |
| if (allocateLeft) { |
| return li.next(); |
| } else { |
| return li.previous(); |
| } |
| } |
| |
| public int getCurrentIndex() { |
| if (allocateLeft) { |
| return li.nextIndex() - 1; |
| } else { |
| return li.previousIndex() + 1; |
| } |
| } |
| |
| } |
| |
| } |