/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Computes the stage allocation according to the greedy allocation rule. The
* greedy rule repeatedly allocates requested containers at the rightmost
* (latest) free interval.
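 * <p>
 * For example, a stage that needs six gangs by deadline t is first matched
 * against the window [t - duration, t); if only four gangs fit throughout
 * that window, they are allocated there, and the search for the remaining
 * two restarts at the most constrained instant found, moving leftward until
 * all gangs are placed or the stage's earliest start time is reached.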
*/
public class StageAllocatorGreedy implements StageAllocator {

@Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException {
Resource totalCapacity = plan.getTotalCapacity();
Map<ReservationInterval, Resource> allocationRequests =
new HashMap<ReservationInterval, Resource>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
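    // e.g., a capability of <4096 MB, 2 vcores> with concurrency 2 yields a
    // gang of <8192 MB, 4 vcores>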
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
}
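    // e.g., with step = 1000 and a requested duration of 2500, the duration
    // is rounded up to 3000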
    // we know for sure that this division has no remainder (it is part of
    // the contract with the user, validated beforehand)
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
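    // e.g., 10 containers with concurrency 2 yield 5 gangs of 2 containers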
int maxGang = 0;
RLESparseResourceAllocation netAvailable =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
stageDeadline, period);
netAvailable =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
plan.getTotalCapacity(), netAvailable, planModifications,
RLEOperator.subtract, stageEarliestStart, stageDeadline);
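    // netAvailable now accounts for both the plan load and the tentative
    // allocations of stages placed earlier in this run, so resources already
    // claimed within this ReservationDefinition are not counted twice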
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
      // as we scan we track how many gangs we can fit, and the most
      // constraining moment in time (the next batch will be placed to the
      // left of that point)
maxGang = gangsToPlace;
long minPoint = stageDeadline;
int curMaxGang = maxGang;
      // start placing at the deadline (excluded due to [,) interval
      // semantics) and move backward
for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
Resource netAvailableRes = netAvailable.getCapacityAtTime(t);
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
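        // e.g., with 24576 MB free at t and a gang of 8192 MB, at most
        // floor(24576 / 8192) = 3 gangs fit at this instant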
        // pick the minimum between what fits at this instant and how many
        // gangs we still have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
        // compare with the running minimum (kept in maxGang) and update it;
        // also remember *where* we found the minimum (useful for the next
        // attempt)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
}
}
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(stageDeadline - dur, stageDeadline);
Resource reservationRes =
Resources.multiply(rr.getCapability(), rr.getConcurrency()
* maxGang);
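        // e.g., 3 gangs of 2 containers, each <4096 MB, 2 vcores>, reserve
        // <24576 MB, 12 vcores> across the whole interval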
        // remember occupied space (the plan is read-only until we find a
        // plausible allocation for the entire request). This is needed since
        // we might be placing other ReservationRequests within the same
        // ReservationDefinition, and we must avoid double-counting the
        // available resources
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
}
      // reset the starting point of our backward scan (stageDeadline) to the
      // most constraining point so far; we will look "left" of that to find
      // more places to schedule gangs (for sure nothing to the "right" of
      // this point can fit a full gang)
stageDeadline = minPoint;
}
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
      // If we are here, it is because we did not manage to satisfy this
      // request, so we need to undo the side-effects recorded in
      // planModifications (needed for ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation
: allocationRequests.entrySet()) {
planModifications.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// and return null to signal failure in this allocation
return null;
}
}
}