blob: d4b4b9e010ee28dc648e369c45b2082a89950337 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* An abstract class that follows the general behavior of planning algorithms.
*/
public abstract class PlanningAlgorithm implements ReservationAgent {
/**
* Performs the actual allocation for a ReservationDefinition within a Plan.
*
* @param reservationId the identifier of the reservation
* @param user the user who owns the reservation
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources required by the user for his
* session
* @param oldReservation the existing reservation (null if none)
* @return whether the allocateUser function was successful or not
*
* @throws PlanningException if the session cannot be fitted into the plan
* @throws ContractValidationException if validation fails
*/
protected boolean allocateUser(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
// Adjust the ResourceDefinition to account for system "imperfections"
// (e.g., scheduling delays for large containers).
ReservationDefinition adjustedContract = adjustContract(plan, contract);
// Compute the job allocation
RLESparseResourceAllocation allocation =
computeJobAllocation(plan, reservationId, adjustedContract, user);
long period = Long.parseLong(contract.getRecurrenceExpression());
// Make allocation periodic if request is periodic
if (contract.getRecurrenceExpression() != null) {
if (period > 0) {
allocation =
new PeriodicRLESparseResourceAllocation(allocation, period);
}
}
// If no job allocation was found, fail
if (allocation == null) {
throw new PlanningException(
"The planning algorithm could not find a valid allocation"
+ " for your request");
}
// Translate the allocation to a map (with zero paddings)
long step = plan.getStep();
long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
Map<ReservationInterval, Resource> mapAllocations =
allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);
// Create the reservation
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, // ID
adjustedContract, // Contract
user, // User name
plan.getQueueName(), // Queue name
adjustedContract.getArrival(), adjustedContract.getDeadline(),
mapAllocations, // Allocations
plan.getResourceCalculator(), // Resource calculator
plan.getMinimumAllocation()); // Minimum allocation
// Add (or update) the reservation allocation
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation, false);
}
}
private Map<ReservationInterval, Resource> allocationsToPaddedMap(
RLESparseResourceAllocation allocation, long jobArrival, long jobDeadline,
long period) {
// Zero allocation
Resource zeroResource = Resource.newInstance(0, 0);
if (period > 0) {
if ((jobDeadline - jobArrival) >= period) {
allocation.addInterval(new ReservationInterval(0L, period),
zeroResource);
}
jobArrival = jobArrival % period;
jobDeadline = jobDeadline % period;
if (jobArrival <= jobDeadline) {
allocation.addInterval(new ReservationInterval(0, jobArrival),
zeroResource);
allocation.addInterval(new ReservationInterval(jobDeadline, period),
zeroResource);
} else {
allocation.addInterval(new ReservationInterval(jobDeadline, jobArrival),
zeroResource);
}
} else {
// Pad at the beginning
long earliestStart = findEarliestTime(allocation.toIntervalMap());
if (jobArrival < earliestStart) {
allocation.addInterval(
new ReservationInterval(jobArrival, earliestStart), zeroResource);
}
// Pad at the beginning
long latestEnd = findLatestTime(allocation.toIntervalMap());
if (latestEnd < jobDeadline) {
allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
zeroResource);
}
}
return allocation.toIntervalMap();
}
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
ReservationId reservationId, ReservationDefinition reservation,
String user) throws PlanningException, ContractValidationException;
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Allocate
return allocateUser(reservationId, user, plan, contract, null);
}
@Override
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Get the old allocation
ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
// Allocate (ignores the old allocation)
return allocateUser(reservationId, user, plan, contract, oldAlloc);
}
@Override
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
// Delete the existing reservation
return plan.deleteReservation(reservationId);
}
protected static long findEarliestTime(
Map<ReservationInterval, Resource> sesInt) {
long ret = Long.MAX_VALUE;
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
if (s.getKey().getStartTime() < ret && s.getValue() != null) {
ret = s.getKey().getStartTime();
}
}
return ret;
}
protected static long findLatestTime(Map<ReservationInterval,
Resource> sesInt) {
long ret = Long.MIN_VALUE;
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
if (s.getKey().getEndTime() > ret && s.getValue() != null) {
ret = s.getKey().getEndTime();
}
}
return ret;
}
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
}
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;
}
private ReservationDefinition adjustContract(Plan plan,
ReservationDefinition originalContract) {
// Place here adjustment. For example using QueueMetrics we can track
// large container delays per YARN-YARN-1990
return originalContract;
}
@Override
public void init(Configuration conf) {
}
}