blob: 95f1d4bc1de8f03ee7093a8ff2096b90cb775e0d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider;
/**
* An implementation of {@link StageExecutionInterval}, which sets the execution
* interval of the stage. For ANY and ALL jobs, the interval is
* [jobArrival,jobDeadline]. For ORDER jobs, the the maximal possible time
* interval is divided as follows: First, each stage is guaranteed at least its
* requested duration. Then, the stage receives a fraction of the remaining
* time. The fraction is calculated as the ratio between the weight (total
* requested resources) of the stage and the total weight of all remaining
* stages.
*/
public class StageExecutionIntervalByDemand implements StageExecutionInterval {
private long step;
@Override
public ReservationInterval computeExecutionInterval(Plan plan,
ReservationDefinition reservation,
ReservationRequest currentReservationStage, boolean allocateLeft,
RLESparseResourceAllocation allocations) {
// Use StageExecutionIntervalUnconstrained to get the maximal interval
ReservationInterval maxInterval =
(new StageExecutionIntervalUnconstrained()).computeExecutionInterval(
plan, reservation, currentReservationStage, allocateLeft,
allocations);
ReservationRequestInterpreter jobType =
reservation.getReservationRequests().getInterpreter();
// For unconstrained jobs, such as ALL & ANY, we can use the unconstrained
// version
if ((jobType != ReservationRequestInterpreter.R_ORDER)
&& (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
return maxInterval;
}
// For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval
step = plan.getStep();
double totalWeight = 0.0;
long totalDuration = 0;
// Iterate over the stages that haven't been allocated.
// For allocateLeft == True, we iterate in reverse order, starting from the
// last
// stage, until we reach the current stage.
// For allocateLeft == False, we do the opposite.
StageProvider stageProvider = new StageProvider(!allocateLeft, reservation);
while (stageProvider.hasNext()) {
ReservationRequest rr = stageProvider.next();
totalWeight += calcWeight(rr);
totalDuration += getRoundedDuration(rr, step);
// Stop once we reach current
if (rr == currentReservationStage) {
break;
}
}
// Compute the weight of the current stage as compared to remaining ones
double ratio = calcWeight(currentReservationStage) / totalWeight;
// Estimate an early start time, such that:
// 1. Every stage is guaranteed to receive at least its duration
// 2. The remainder of the window is divided between stages
// proportionally to its workload (total memory consumption)
long maxIntervalArrival = maxInterval.getStartTime();
long maxIntervalDeadline = maxInterval.getEndTime();
long window = maxIntervalDeadline - maxIntervalArrival;
long windowRemainder = window - totalDuration;
if (allocateLeft) {
long latestEnd =
(long) (maxIntervalArrival
+ getRoundedDuration(currentReservationStage, step)
+ (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
latestEnd = stepRoundDown(latestEnd, step);
// Return new interval
return new ReservationInterval(maxIntervalArrival, latestEnd);
} else {
long earlyStart =
(long) (maxIntervalDeadline
- getRoundedDuration(currentReservationStage, step)
- (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
earlyStart = stepRoundUp(earlyStart, step);
// Return new interval
return new ReservationInterval(earlyStart, maxIntervalDeadline);
}
}
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemorySize())
* (stage.getNumContainers());
}
protected long getRoundedDuration(ReservationRequest stage, Long s) {
return stepRoundUp(stage.getDuration(), s);
}
protected static long stepRoundDown(long t, long s) {
return (t / s) * s;
}
protected static long stepRoundUp(long t, long s) {
return ((t + s - 1) / s) * s;
}
}