blob: 49e07aac84a347a0dbbb0960153e6382f1772acd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class represents an in memory representation of the state of our
* reservation system, and provides accelerated access to both individual
* reservations and aggregate utilization of resources over time.
*/
public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private final RMStateStore rmStateStore;
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
private RLESparseResourceAllocation rleSparseVector;
private PeriodicRLESparseResourceAllocation periodicRle;
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
new HashMap<String, RLESparseResourceAllocation>();
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
new HashMap<ReservationId, InMemoryReservationAllocation>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final SharingPolicy policy;
private final ReservationAgent agent;
private final long step;
private final ResourceCalculator resCalc;
private final Resource minAlloc, maxAlloc;
private final String queueName;
private final QueueMetrics queueMetrics;
private final Planner replanner;
private final boolean getMoveOnExpiry;
private final Clock clock;
private final long maxPeriodicity;
private Resource totalCapacity;
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry,
RMContext rmContext) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
maxAlloc, queueName, replanner, getMoveOnExpiry,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
rmContext);
}
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry,
long maxPeriodicity, RMContext rmContext) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicity,
rmContext, new UTCClock());
}
@SuppressWarnings("checkstyle:parameternumber")
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry,
long maxPeriodicty, RMContext rmContext, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
this.step = step;
this.totalCapacity = totalCapacity;
this.resCalc = resCalc;
this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
this.maxPeriodicity = maxPeriodicty;
this.periodicRle =
new PeriodicRLESparseResourceAllocation(resCalc, this.maxPeriodicity);
this.queueName = queueName;
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
this.clock = clock;
this.rmStateStore = rmContext.getStateStore();
}
@Override
public QueueMetrics getQueueMetrics() {
return queueMetrics;
}
private RLESparseResourceAllocation getUserRLEResourceAllocation(String user,
long period) {
RLESparseResourceAllocation resAlloc = null;
if (period > 0) {
if (userPeriodicResourceAlloc.containsKey(user)) {
resAlloc = userPeriodicResourceAlloc.get(user);
} else {
resAlloc = new PeriodicRLESparseResourceAllocation(resCalc,
periodicRle.getTimePeriod());
userPeriodicResourceAlloc.put(user, resAlloc);
}
} else {
if (userResourceAlloc.containsKey(user)) {
resAlloc = userResourceAlloc.get(user);
} else {
resAlloc = new RLESparseResourceAllocation(resCalc);
userResourceAlloc.put(user, resAlloc);
}
}
return resAlloc;
}
private void gcUserRLEResourceAllocation(String user, long period) {
if (period > 0) {
if (userPeriodicResourceAlloc.get(user).isEmpty()) {
userPeriodicResourceAlloc.remove(user);
}
} else {
if (userResourceAlloc.get(user).isEmpty()) {
userResourceAlloc.remove(user);
}
}
}
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
long period = reservation.getPeriodicity();
RLESparseResourceAllocation resAlloc =
getUserRLEResourceAllocation(user, period);
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
if (resCount == null) {
resCount = new RLESparseResourceAllocation(resCalc);
userActiveReservationCount.put(user, resCount);
}
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
if (period > 0L) {
for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
long rStart = r.getKey().getStartTime() + i * period;
long rEnd = r.getKey().getEndTime() + i * period;
// handle wrap-around
if (rEnd > periodicRle.getTimePeriod()) {
long diff = rEnd - periodicRle.getTimePeriod();
rEnd = periodicRle.getTimePeriod();
ReservationInterval newInterval = new ReservationInterval(0, diff);
periodicRle.addInterval(newInterval, r.getValue());
resAlloc.addInterval(newInterval, r.getValue());
}
ReservationInterval newInterval =
new ReservationInterval(rStart, rEnd);
periodicRle.addInterval(newInterval, r.getValue());
resAlloc.addInterval(newInterval, r.getValue());
}
} else {
rleSparseVector.addInterval(r.getKey(), r.getValue());
resAlloc.addInterval(r.getKey(), r.getValue());
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
ZERO_RESOURCE)) {
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
}
// periodic reservations are active from start time and good till cancelled
if (period > 0L) {
earliestActive = reservation.getStartTime();
latestActive = Long.MAX_VALUE;
}
resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
Resource.newInstance(1, 1));
}
private void decrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, Resource> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
long period = reservation.getPeriodicity();
RLESparseResourceAllocation resAlloc =
getUserRLEResourceAllocation(user, period);
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
if (period > 0L) {
for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) {
long rStart = r.getKey().getStartTime() + i * period;
long rEnd = r.getKey().getEndTime() + i * period;
// handle wrap-around
if (rEnd > periodicRle.getTimePeriod()) {
long diff = rEnd - periodicRle.getTimePeriod();
rEnd = periodicRle.getTimePeriod();
ReservationInterval newInterval = new ReservationInterval(0, diff);
periodicRle.removeInterval(newInterval, r.getValue());
resAlloc.removeInterval(newInterval, r.getValue());
}
ReservationInterval newInterval =
new ReservationInterval(rStart, rEnd);
periodicRle.removeInterval(newInterval, r.getValue());
resAlloc.removeInterval(newInterval, r.getValue());
}
} else {
rleSparseVector.removeInterval(r.getKey(), r.getValue());
resAlloc.removeInterval(r.getKey(), r.getValue());
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
ZERO_RESOURCE)) {
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
}
gcUserRLEResourceAllocation(user, period);
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
// periodic reservations are active from start time and good till cancelled
if (period > 0L) {
earliestActive = reservation.getStartTime();
latestActive = Long.MAX_VALUE;
}
resCount.removeInterval(
new ReservationInterval(earliestActive, latestActive),
Resource.newInstance(1, 1));
if (resCount.isEmpty()) {
userActiveReservationCount.remove(user);
}
}
public Set<ReservationAllocation> getAllReservations() {
readLock.lock();
try {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new TreeSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> res : currentReservations
.values()) {
flattenedReservations.addAll(res);
}
return flattenedReservations;
} else {
return null;
}
} finally {
readLock.unlock();
}
}
@Override
public boolean addReservation(ReservationAllocation reservation,
boolean isRecovering) throws PlanningException {
// Verify the allocation is memory based otherwise it is not supported
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
if (inMemReservation.getUser() == null) {
String errMsg = "The specified Reservation with ID "
+ inMemReservation.getReservationId() + " is not mapped to any user";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
writeLock.lock();
try {
if (reservationTable.containsKey(inMemReservation.getReservationId())) {
String errMsg = "The specified Reservation with ID "
+ inMemReservation.getReservationId() + " already exists";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
// Validate if we can accept this reservation, throws exception if
// validation fails
if (!isRecovering) {
policy.validate(this, inMemReservation);
// we record here the time in which the allocation has been accepted
reservation.setAcceptanceTimestamp(clock.getTime());
if (rmStateStore != null) {
rmStateStore.storeNewReservation(
ReservationSystemUtil.buildStateProto(inMemReservation),
getQueueName(), inMemReservation.getReservationId().toString());
}
}
ReservationInterval searchInterval = new ReservationInterval(
inMemReservation.getStartTime(), inMemReservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations == null) {
reservations = new HashSet<InMemoryReservationAllocation>();
}
if (!reservations.add(inMemReservation)) {
LOG.error("Unable to add reservation: {} to plan.",
inMemReservation.getReservationId());
return false;
}
currentReservations.put(searchInterval, reservations);
reservationTable.put(inMemReservation.getReservationId(),
inMemReservation);
incrementAllocation(inMemReservation);
LOG.info("Successfully added reservation: {} to plan.",
inMemReservation.getReservationId());
return true;
} finally {
writeLock.unlock();
}
}
@Override
public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException {
writeLock.lock();
boolean result = false;
try {
ReservationId resId = reservation.getReservationId();
ReservationAllocation currReservation = getReservationById(resId);
if (currReservation == null) {
String errMsg = "The specified Reservation with ID " + resId
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
// validate if we can accept this reservation, throws exception if
// validation fails
policy.validate(this, reservation);
if (!removeReservation(currReservation)) {
LOG.error("Unable to replace reservation: {} from plan.",
reservation.getReservationId());
return result;
}
try {
result = addReservation(reservation, false);
} catch (PlanningException e) {
LOG.error("Unable to update reservation: {} from plan due to {}.",
reservation.getReservationId(), e.getMessage());
}
if (result) {
LOG.info("Successfully updated reservation: {} in plan.",
reservation.getReservationId());
return result;
} else {
// rollback delete
addReservation(currReservation, false);
LOG.info("Rollbacked update reservation: {} from plan.",
reservation.getReservationId());
return result;
}
} finally {
writeLock.unlock();
}
}
private boolean removeReservation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
ReservationInterval searchInterval = new ReservationInterval(
reservation.getStartTime(), reservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
if (rmStateStore != null) {
rmStateStore.removeReservation(getQueueName(),
reservation.getReservationId().toString());
}
if (!reservations.remove(reservation)) {
LOG.error("Unable to remove reservation: {} from plan.",
reservation.getReservationId());
return false;
}
if (reservations.isEmpty()) {
currentReservations.remove(searchInterval);
}
} else {
String errMsg = "The specified Reservation with ID "
+ reservation.getReservationId() + " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());
return true;
}
@Override
public boolean deleteReservation(ReservationId reservationID) {
writeLock.lock();
try {
ReservationAllocation reservation = getReservationById(reservationID);
if (reservation == null) {
String errMsg = "The specified Reservation with ID " + reservationID
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
return removeReservation(reservation);
} finally {
writeLock.unlock();
}
}
@Override
public void archiveCompletedReservations(long tick) {
// Since we are looking for old reservations, read lock is optimal
LOG.debug("Running archival at time: {}", tick);
List<InMemoryReservationAllocation> expiredReservations =
new ArrayList<InMemoryReservationAllocation>();
readLock.lock();
// archive reservations and delete the ones which are beyond
// the reservation policy "window"
try {
long archivalTime = tick - policy.getValidWindow();
ReservationInterval searchInterval =
new ReservationInterval(archivalTime, archivalTime);
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() <= archivalTime) {
expiredReservations.add(reservation);
}
}
}
}
} finally {
readLock.unlock();
}
if (expiredReservations.isEmpty()) {
return;
}
// Need write lock only if there are any reservations to be deleted
writeLock.lock();
try {
for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
removeReservation(expiredReservation);
}
} finally {
writeLock.unlock();
}
}
@Override
public Set<ReservationAllocation> getReservationsAtTime(long tick) {
return getReservations(null, new ReservationInterval(tick, tick), "");
}
@Override
public long getStep() {
return step;
}
@Override
public SharingPolicy getSharingPolicy() {
return policy;
}
@Override
public ReservationAgent getReservationAgent() {
return agent;
}
@Override
public RLESparseResourceAllocation getReservationCountForUserOverTime(
String user, long start, long end) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc =
userActiveReservationCount.get(user);
if (userResAlloc != null) {
return userResAlloc.getRangeOverlapping(start, end);
} else {
return new RLESparseResourceAllocation(resCalc);
}
} finally {
readLock.unlock();
}
}
@Override
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end) {
readLock.lock();
try {
// merge periodic and non-periodic allocations
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
RLESparseResourceAllocation userPeriodicResAlloc =
userPeriodicResourceAlloc.get(user);
if (userResAlloc != null && userPeriodicResAlloc != null) {
return RLESparseResourceAllocation.merge(resCalc, totalCapacity,
userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end);
}
if (userResAlloc != null) {
return userResAlloc.getRangeOverlapping(start, end);
}
if (userPeriodicResAlloc != null) {
return userPeriodicResAlloc.getRangeOverlapping(start, end);
}
} catch (PlanningException e) {
LOG.warn("Exception while trying to merge periodic"
+ " and non-periodic user allocations: {}", e.getMessage(), e);
} finally {
readLock.unlock();
}
return new RLESparseResourceAllocation(resCalc);
}
@Override
public Resource getTotalCommittedResources(long t) {
readLock.lock();
try {
return Resources.add(rleSparseVector.getCapacityAtTime(t),
periodicRle.getCapacityAtTime(t));
} finally {
readLock.unlock();
}
}
@Override
public Set<ReservationAllocation> getReservations(ReservationId reservationID,
ReservationInterval interval) {
return getReservations(reservationID, interval, null);
}
@Override
public Set<ReservationAllocation> getReservations(ReservationId reservationID,
ReservationInterval interval, String user) {
if (reservationID != null) {
ReservationAllocation allocation = getReservationById(reservationID);
if (allocation == null) {
return Collections.emptySet();
}
return Collections.singleton(allocation);
}
long startTime = interval == null ? 0 : interval.getStartTime();
long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime();
ReservationInterval searchInterval =
new ReservationInterval(endTime, Long.MAX_VALUE);
readLock.lock();
try {
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res =
currentReservations.headMap(searchInterval, true);
if (!res.isEmpty()) {
Set<ReservationAllocation> flattenedReservations = new HashSet<>();
for (Set<InMemoryReservationAllocation> resEntries : res.values()) {
for (InMemoryReservationAllocation reservation : resEntries) {
// validate user
if (user != null && !user.isEmpty()
&& !reservation.getUser().equals(user)) {
continue;
}
// handle periodic reservations
long period = reservation.getPeriodicity();
if (period > 0) {
// The shift is used to remove the wrap around for the
// reservation interval. The wrap around will still
// exist for the search interval.
long shift = reservation.getStartTime() % period;
// This is the duration of the reservation since
// duration < period.
long periodicReservationEnd =
(reservation.getEndTime() -shift) % period;
long periodicSearchStart = (startTime - shift) % period;
long periodicSearchEnd = (endTime - shift) % period;
long searchDuration = endTime - startTime;
// 1. If the searchDuration is greater than the period, then
// the reservation is within the interval. This will allow
// us to ignore cases where search end > search start >
// reservation end.
// 2/3. If the search end is less than the reservation end, or if
// the search start is less than the reservation end, then the
// reservation will be in the reservation since
// periodic reservation start is always zero. Note that neither
// of those values will ever be negative.
// 4. If the search end is less than the search start, then
// there is a wrap around, and both values are implicitly
// greater than the reservation end because of condition 2/3,
// so the reservation is within the search interval.
if (searchDuration > period
|| periodicSearchEnd < periodicReservationEnd
|| periodicSearchStart < periodicReservationEnd
|| periodicSearchStart > periodicSearchEnd) {
flattenedReservations.add(reservation);
}
} else {
// check for non-periodic reservations
if (reservation.getEndTime() > startTime) {
flattenedReservations.add(reservation);
}
}
}
}
return Collections.unmodifiableSet(flattenedReservations);
} else {
return Collections.emptySet();
}
} finally {
readLock.unlock();
}
}
@Override
public ReservationAllocation getReservationById(ReservationId reservationID) {
if (reservationID == null) {
return null;
}
readLock.lock();
try {
return reservationTable.get(reservationID);
} finally {
readLock.unlock();
}
}
@Override
public Resource getTotalCapacity() {
readLock.lock();
try {
return Resources.clone(totalCapacity);
} finally {
readLock.unlock();
}
}
@Override
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
ReservationId oldId, long start, long end, long period)
throws PlanningException {
readLock.lock();
try {
// for non-periodic return simple available resources
if (period == 0) {
// create RLE of totCapacity
TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
totAvailable.put(start, Resources.clone(totalCapacity));
RLESparseResourceAllocation totRLEAvail =
new RLESparseResourceAllocation(totAvailable, resCalc);
// subtract used from available
RLESparseResourceAllocation netAvailable;
netAvailable = RLESparseResourceAllocation.merge(resCalc,
Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
RLEOperator.subtractTestNonNegative, start, end);
// remove periodic component
netAvailable = RLESparseResourceAllocation.merge(resCalc,
Resources.clone(totalCapacity), netAvailable, periodicRle,
RLEOperator.subtractTestNonNegative, start, end);
// add back in old reservation used resources if any
ReservationAllocation old = reservationTable.get(oldId);
if (old != null) {
RLESparseResourceAllocation addBackPrevious =
old.getResourcesOverTime(start, end);
netAvailable = RLESparseResourceAllocation.merge(resCalc,
Resources.clone(totalCapacity), netAvailable, addBackPrevious,
RLEOperator.add, start, end);
}
// lower it if this is needed by the sharing policy
netAvailable = getSharingPolicy().availableResources(netAvailable, this,
user, oldId, start, end);
return netAvailable;
} else {
if (periodicRle.getTimePeriod() % period != 0) {
throw new PlanningException("The reservation periodicity (" + period
+ ") must be" + " an exact divider of the system maxPeriod ("
+ periodicRle.getTimePeriod() + ")");
}
if (period < (end - start)) {
throw new PlanningException(
"Invalid input: (end - start) = (" + end + " - " + start + ") = "
+ (end - start) + " > period = " + period);
}
// find the minimum resources available among all the instances that fit
// in the LCM
long numInstInLCM = periodicRle.getTimePeriod() / period;
RLESparseResourceAllocation minOverLCM =
getAvailableResourceOverTime(user, oldId, start, end, 0);
for (int i = 1; i < numInstInLCM; i++) {
long rStart = start + i * period;
long rEnd = end + i * period;
// recursive invocation of non-periodic range (to pick raw-info)
RLESparseResourceAllocation snapShot =
getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0);
// time-align on start
snapShot.shift(-(i * period));
// pick the minimum amount of resources in each time interval
minOverLCM =
RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(),
minOverLCM, snapShot, RLEOperator.min, start, end);
}
return minOverLCM;
}
} finally {
readLock.unlock();
}
}
@Override
public Resource getMinimumAllocation() {
return Resources.clone(minAlloc);
}
@Override
public void setTotalCapacity(Resource cap) {
writeLock.lock();
try {
totalCapacity = Resources.clone(cap);
} finally {
writeLock.unlock();
}
}
public long getEarliestStartTime() {
readLock.lock();
try {
return rleSparseVector.getEarliestStartTime();
} finally {
readLock.unlock();
}
}
@Override
public long getLastEndTime() {
readLock.lock();
try {
return rleSparseVector.getLatestNonNullTime();
} finally {
readLock.unlock();
}
}
@Override
public ResourceCalculator getResourceCalculator() {
return resCalc;
}
@Override
public String getQueueName() {
return queueName;
}
@Override
public Resource getMaximumAllocation() {
return Resources.clone(maxAlloc);
}
@Override
public long getMaximumPeriodicity() {
return this.maxPeriodicity;
}
public String toCumulativeString() {
readLock.lock();
try {
return rleSparseVector.toString() + "\n" + periodicRle.toString();
} finally {
readLock.unlock();
}
}
@Override
public Planner getReplanner() {
return replanner;
}
@Override
public boolean getMoveOnExpiry() {
return getMoveOnExpiry;
}
@Override
public String toString() {
readLock.lock();
try {
StringBuffer planStr = new StringBuffer("In-memory Plan: ");
planStr.append("Parent Queue: ").append(queueName)
.append("Total Capacity: ").append(totalCapacity).append("Step: ")
.append(step);
for (ReservationAllocation reservation : getAllReservations()) {
planStr.append(reservation);
}
return planStr.toString();
} finally {
readLock.unlock();
}
}
@Override
public Set<ReservationAllocation> getReservationByUserAtTime(String user,
long t) {
readLock.lock();
try {
Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
for (ReservationAllocation ra : getReservationsAtTime(t)) {
String resUser = ra.getUser();
if (resUser != null && resUser.equals(user)) {
resSet.add(ra);
}
}
return resSet;
} finally {
readLock.unlock();
}
}
@Override
public RLESparseResourceAllocation getCumulativeLoadOverTime(long start,
long end) throws PlanningException {
readLock.lock();
try {
RLESparseResourceAllocation ret =
rleSparseVector.getRangeOverlapping(start, end);
ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret,
periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start,
end);
return ret;
} finally {
readLock.unlock();
}
}
}