blob: 5a61b9455473f3b60f04a66540d489e4d9dbcc63 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This Agent employs a simple greedy placement strategy, placing the various
* stages of a {@link ReservationRequest} from the deadline moving backward
* towards the arrival. This allows jobs with earlier deadline to be scheduled
* greedily as well. Combined with an opportunistic anticipation of work if the
* cluster is not fully utilized also seems to provide good latency for
* best-effort jobs (i.e., jobs running without a reservation).
*
* This agent does not account for locality and only consider container
* granularity for validation purposes (i.e., you can't exceed max-container
* size).
*/
public class GreedyReservationAgent implements ReservationAgent {
private static final Logger LOG = LoggerFactory
.getLogger(GreedyReservationAgent.class);
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract, null);
}
@Override
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract,
plan.getReservationById(reservationId));
}
@Override
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
return plan.deleteReservation(reservationId);
}
private boolean computeAllocation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
LOG.info("placing the following ReservationRequest: " + contract);
Resource totalCapacity = plan.getTotalCapacity();
// Here we can addd logic to adjust the ResourceDefinition to account for
// system "imperfections" (e.g., scheduling delays for large containers).
// Align with plan step conservatively (i.e., ceil arrival, and floor
// deadline)
long earliestStart = contract.getArrival();
long step = plan.getStep();
if (earliestStart % step != 0) {
earliestStart = earliestStart + (step - (earliestStart % step));
}
long deadline =
contract.getDeadline() - contract.getDeadline() % plan.getStep();
// setup temporary variables to handle time-relations between stages and
// intermediate answers
long curDeadline = deadline;
long oldDeadline = -1;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
plan.getMinimumAllocation());
List<ReservationRequest> stages = contract.getReservationRequests()
.getReservationResources();
ReservationRequestInterpreter type = contract.getReservationRequests()
.getInterpreter();
// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
ReservationRequest currentReservationStage = li.previous();
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);
// run allocation for a single stage
Map<ReservationInterval, ReservationRequest> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);
if (curAlloc == null) {
// if we did not find an allocation for the currentReservationStage
// return null, unless the ReservationDefinition we are placing is of
// type ANY
if (type != ReservationRequestInterpreter.R_ANY) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
} else {
continue;
}
} else {
// if we did find an allocation add it to the set of allocations
allocations.putAll(curAlloc);
// if this request is of type ANY we are done searching (greedy)
// and can return the current allocation (break-out of the search)
if (type == ReservationRequestInterpreter.R_ANY) {
break;
}
// if the request is of ORDER or ORDER_NO_GAP we constraint the next
// round of allocation to precede the current allocation, by setting
// curDeadline
if (type == ReservationRequestInterpreter.R_ORDER
|| type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
curDeadline = findEarliestTime(curAlloc.keySet());
// for ORDER_NO_GAP verify that the allocation found so far has no
// gap, return null otherwise (the greedy procedure failed to find a
// no-gap
// allocation)
if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& oldDeadline > 0) {
if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
.getStep()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
}
// keep the variable oldDeadline pointing to the last deadline we
// found
oldDeadline = curDeadline;
}
}
}
// / If we got here is because we failed to find an allocation for the
// ReservationDefinition give-up and report failure to the user
if (allocations.isEmpty()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
// create reservation with above allocations if not null/empty
ReservationRequest ZERO_RES =
ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
long firstStartTime = findEarliestTime(allocations.keySet());
// add zero-padding from arrival up to the first non-null allocation
// to guarantee that the reservation exists starting at arrival
if (firstStartTime > earliestStart) {
allocations.put(new ReservationInterval(earliestStart,
firstStartTime), ZERO_RES);
firstStartTime = earliestStart;
// consider to add trailing zeros at the end for simmetry
}
// Actually add/update the reservation in the plan.
// This is subject to validation as other agents might be placing
// in parallel and there might be sharing policies the agent is not
// aware off.
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
plan.getResourceCalculator(), plan.getMinimumAllocation());
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
}
}
private void validateInput(Plan plan, ReservationRequest rr,
Resource totalCapacity) throws ContractValidationException {
if (rr.getConcurrency() < 1) {
throw new ContractValidationException("Gang Size should be >= 1");
}
if (rr.getNumContainers() <= 0) {
throw new ContractValidationException("Num containers should be >= 0");
}
// check that gangSize and numContainers are compatible
if (rr.getNumContainers() % rr.getConcurrency() != 0) {
throw new ContractValidationException(
"Parallelism must be an exact multiple of gang size");
}
// check that the largest container request does not exceed
// the cluster-wide limit for container sizes
if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException("Individual"
+ " capability requests should not exceed cluster's maxAlloc");
}
}
/**
* This method actually perform the placement of an atomic stage of the
* reservation. The key idea is to traverse the plan backward for a
* "lease-duration" worth of time, and compute what is the maximum multiple of
* our concurrency (gang) parameter we can fit. We do this and move towards
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
*/
private Map<ReservationInterval, ReservationRequest> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {
Map<ReservationInterval, ReservationRequest> allocationRequests =
new HashMap<ReservationInterval, ReservationRequest>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
}
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
int maxGang = 0;
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
maxGang = gangsToPlace;
long minPoint = curDeadline;
int curMaxGang = maxGang;
// start placing at deadline (excluded due to [,) interval semantics and
// move backward
for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// As we run along we will logically remove the previous allocation for
// this reservation
// if one existed
Resource oldResCap = Resource.newInstance(0, 0);
if (oldResAllocation != null) {
oldResCap = oldResAllocation.getResourcesAtTime(t);
}
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
Resources.addTo(netAvailableRes, oldResCap);
Resources.subtractFrom(netAvailableRes,
plan.getTotalCommittedResources(t));
Resources.subtractFrom(netAvailableRes,
tempAssigned.getCapacityAtTime(t));
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
// pick the minimum between available resources in this instant, and how
// many gangs we have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
}
}
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
ReservationRequest reservationRes =
ReservationRequest.newInstance(rr.getCapability(),
rr.getConcurrency() * maxGang, rr.getConcurrency(),
rr.getDuration());
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
}
// reset our new starting point (curDeadline) to the most constraining
// point so far, we will look "left" of that to find more places where
// to schedule gangs (for sure nothing on the "right" of this point can
// fit a full gang.
curDeadline = minPoint;
}
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
allocationRequests.entrySet()) {
tempAssigned.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// and return null to signal failure in this allocation
return null;
}
}
// finds the leftmost point of this set of ReservationInterval
private long findEarliestTime(Set<ReservationInterval> resInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : resInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
}
}
return ret;
}
// finds the rightmost point of this set of ReservationIntervals
private long findLatestTime(Set<ReservationInterval> resInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : resInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
}
}
return ret;
}
}