| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.reservation; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.UTCClock; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class represents an in memory representation of the state of our |
| * reservation system, and provides accelerated access to both individual |
| * reservations and aggregate utilization of resources over time. |
| */ |
| public class InMemoryPlan implements Plan { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); |
| |
| private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); |
| private final RMStateStore rmStateStore; |
| |
| private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations = |
| new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>(); |
| |
| private RLESparseResourceAllocation rleSparseVector; |
| |
| private PeriodicRLESparseResourceAllocation periodicRle; |
| |
| private Map<String, RLESparseResourceAllocation> userResourceAlloc = |
| new HashMap<String, RLESparseResourceAllocation>(); |
| |
| private Map<String, RLESparseResourceAllocation> userPeriodicResourceAlloc = |
| new HashMap<String, RLESparseResourceAllocation>(); |
| |
| private Map<String, RLESparseResourceAllocation> userActiveReservationCount = |
| new HashMap<String, RLESparseResourceAllocation>(); |
| |
| private Map<ReservationId, InMemoryReservationAllocation> reservationTable = |
| new HashMap<ReservationId, InMemoryReservationAllocation>(); |
| |
| private final ReentrantReadWriteLock readWriteLock = |
| new ReentrantReadWriteLock(); |
| private final Lock readLock = readWriteLock.readLock(); |
| private final Lock writeLock = readWriteLock.writeLock(); |
| private final SharingPolicy policy; |
| private final ReservationAgent agent; |
| private final long step; |
| private final ResourceCalculator resCalc; |
| private final Resource minAlloc, maxAlloc; |
| private final String queueName; |
| private final QueueMetrics queueMetrics; |
| private final Planner replanner; |
| private final boolean getMoveOnExpiry; |
| private final Clock clock; |
| private final long maxPeriodicity; |
| |
| private Resource totalCapacity; |
| |
| public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, |
| ReservationAgent agent, Resource totalCapacity, long step, |
| ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, |
| String queueName, Planner replanner, boolean getMoveOnExpiry, |
| RMContext rmContext) { |
| this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, |
| maxAlloc, queueName, replanner, getMoveOnExpiry, |
| YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY, |
| rmContext); |
| } |
| |
| public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, |
| ReservationAgent agent, Resource totalCapacity, long step, |
| ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, |
| String queueName, Planner replanner, boolean getMoveOnExpiry, |
| long maxPeriodicity, RMContext rmContext) { |
| this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, |
| maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicity, |
| rmContext, new UTCClock()); |
| } |
| |
| @SuppressWarnings("checkstyle:parameternumber") |
| public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, |
| ReservationAgent agent, Resource totalCapacity, long step, |
| ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, |
| String queueName, Planner replanner, boolean getMoveOnExpiry, |
| long maxPeriodicty, RMContext rmContext, Clock clock) { |
| this.queueMetrics = queueMetrics; |
| this.policy = policy; |
| this.agent = agent; |
| this.step = step; |
| this.totalCapacity = totalCapacity; |
| this.resCalc = resCalc; |
| this.minAlloc = minAlloc; |
| this.maxAlloc = maxAlloc; |
| this.rleSparseVector = new RLESparseResourceAllocation(resCalc); |
| this.maxPeriodicity = maxPeriodicty; |
| this.periodicRle = |
| new PeriodicRLESparseResourceAllocation(resCalc, this.maxPeriodicity); |
| this.queueName = queueName; |
| this.replanner = replanner; |
| this.getMoveOnExpiry = getMoveOnExpiry; |
| this.clock = clock; |
| this.rmStateStore = rmContext.getStateStore(); |
| } |
| |
| @Override |
| public QueueMetrics getQueueMetrics() { |
| return queueMetrics; |
| } |
| |
| private RLESparseResourceAllocation getUserRLEResourceAllocation(String user, |
| long period) { |
| RLESparseResourceAllocation resAlloc = null; |
| if (period > 0) { |
| if (userPeriodicResourceAlloc.containsKey(user)) { |
| resAlloc = userPeriodicResourceAlloc.get(user); |
| } else { |
| resAlloc = new PeriodicRLESparseResourceAllocation(resCalc, |
| periodicRle.getTimePeriod()); |
| userPeriodicResourceAlloc.put(user, resAlloc); |
| } |
| } else { |
| if (userResourceAlloc.containsKey(user)) { |
| resAlloc = userResourceAlloc.get(user); |
| } else { |
| resAlloc = new RLESparseResourceAllocation(resCalc); |
| userResourceAlloc.put(user, resAlloc); |
| } |
| } |
| return resAlloc; |
| } |
| |
| private void gcUserRLEResourceAllocation(String user, long period) { |
| if (period > 0) { |
| if (userPeriodicResourceAlloc.get(user).isEmpty()) { |
| userPeriodicResourceAlloc.remove(user); |
| } |
| } else { |
| if (userResourceAlloc.get(user).isEmpty()) { |
| userResourceAlloc.remove(user); |
| } |
| } |
| } |
| |
| private void incrementAllocation(ReservationAllocation reservation) { |
| assert (readWriteLock.isWriteLockedByCurrentThread()); |
| Map<ReservationInterval, Resource> allocationRequests = |
| reservation.getAllocationRequests(); |
| // check if we have encountered the user earlier and if not add an entry |
| String user = reservation.getUser(); |
| long period = reservation.getPeriodicity(); |
| RLESparseResourceAllocation resAlloc = |
| getUserRLEResourceAllocation(user, period); |
| |
| RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); |
| if (resCount == null) { |
| resCount = new RLESparseResourceAllocation(resCalc); |
| userActiveReservationCount.put(user, resCount); |
| } |
| |
| long earliestActive = Long.MAX_VALUE; |
| long latestActive = Long.MIN_VALUE; |
| |
| for (Map.Entry<ReservationInterval, Resource> r : allocationRequests |
| .entrySet()) { |
| |
| if (period > 0L) { |
| for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { |
| |
| long rStart = r.getKey().getStartTime() + i * period; |
| long rEnd = r.getKey().getEndTime() + i * period; |
| |
| // handle wrap-around |
| if (rEnd > periodicRle.getTimePeriod()) { |
| long diff = rEnd - periodicRle.getTimePeriod(); |
| rEnd = periodicRle.getTimePeriod(); |
| ReservationInterval newInterval = new ReservationInterval(0, diff); |
| periodicRle.addInterval(newInterval, r.getValue()); |
| resAlloc.addInterval(newInterval, r.getValue()); |
| } |
| |
| ReservationInterval newInterval = |
| new ReservationInterval(rStart, rEnd); |
| periodicRle.addInterval(newInterval, r.getValue()); |
| resAlloc.addInterval(newInterval, r.getValue()); |
| } |
| |
| } else { |
| rleSparseVector.addInterval(r.getKey(), r.getValue()); |
| resAlloc.addInterval(r.getKey(), r.getValue()); |
| if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), |
| ZERO_RESOURCE)) { |
| earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); |
| latestActive = Math.max(latestActive, r.getKey().getEndTime()); |
| } |
| } |
| } |
| // periodic reservations are active from start time and good till cancelled |
| if (period > 0L) { |
| earliestActive = reservation.getStartTime(); |
| latestActive = Long.MAX_VALUE; |
| } |
| resCount.addInterval(new ReservationInterval(earliestActive, latestActive), |
| Resource.newInstance(1, 1)); |
| } |
| |
| private void decrementAllocation(ReservationAllocation reservation) { |
| assert (readWriteLock.isWriteLockedByCurrentThread()); |
| Map<ReservationInterval, Resource> allocationRequests = |
| reservation.getAllocationRequests(); |
| String user = reservation.getUser(); |
| long period = reservation.getPeriodicity(); |
| RLESparseResourceAllocation resAlloc = |
| getUserRLEResourceAllocation(user, period); |
| |
| long earliestActive = Long.MAX_VALUE; |
| long latestActive = Long.MIN_VALUE; |
| for (Map.Entry<ReservationInterval, Resource> r : allocationRequests |
| .entrySet()) { |
| if (period > 0L) { |
| for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { |
| |
| long rStart = r.getKey().getStartTime() + i * period; |
| long rEnd = r.getKey().getEndTime() + i * period; |
| |
| // handle wrap-around |
| if (rEnd > periodicRle.getTimePeriod()) { |
| long diff = rEnd - periodicRle.getTimePeriod(); |
| rEnd = periodicRle.getTimePeriod(); |
| ReservationInterval newInterval = new ReservationInterval(0, diff); |
| periodicRle.removeInterval(newInterval, r.getValue()); |
| resAlloc.removeInterval(newInterval, r.getValue()); |
| } |
| |
| ReservationInterval newInterval = |
| new ReservationInterval(rStart, rEnd); |
| periodicRle.removeInterval(newInterval, r.getValue()); |
| resAlloc.removeInterval(newInterval, r.getValue()); |
| } |
| } else { |
| rleSparseVector.removeInterval(r.getKey(), r.getValue()); |
| resAlloc.removeInterval(r.getKey(), r.getValue()); |
| if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), |
| ZERO_RESOURCE)) { |
| earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); |
| latestActive = Math.max(latestActive, r.getKey().getEndTime()); |
| } |
| } |
| } |
| gcUserRLEResourceAllocation(user, period); |
| |
| RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); |
| // periodic reservations are active from start time and good till cancelled |
| if (period > 0L) { |
| earliestActive = reservation.getStartTime(); |
| latestActive = Long.MAX_VALUE; |
| } |
| resCount.removeInterval( |
| new ReservationInterval(earliestActive, latestActive), |
| Resource.newInstance(1, 1)); |
| if (resCount.isEmpty()) { |
| userActiveReservationCount.remove(user); |
| } |
| } |
| |
| public Set<ReservationAllocation> getAllReservations() { |
| readLock.lock(); |
| try { |
| if (currentReservations != null) { |
| Set<ReservationAllocation> flattenedReservations = |
| new TreeSet<ReservationAllocation>(); |
| for (Set<InMemoryReservationAllocation> res : currentReservations |
| .values()) { |
| flattenedReservations.addAll(res); |
| } |
| return flattenedReservations; |
| } else { |
| return null; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean addReservation(ReservationAllocation reservation, |
| boolean isRecovering) throws PlanningException { |
| // Verify the allocation is memory based otherwise it is not supported |
| InMemoryReservationAllocation inMemReservation = |
| (InMemoryReservationAllocation) reservation; |
| if (inMemReservation.getUser() == null) { |
| String errMsg = "The specified Reservation with ID " |
| + inMemReservation.getReservationId() + " is not mapped to any user"; |
| LOG.error(errMsg); |
| throw new IllegalArgumentException(errMsg); |
| } |
| writeLock.lock(); |
| try { |
| if (reservationTable.containsKey(inMemReservation.getReservationId())) { |
| String errMsg = "The specified Reservation with ID " |
| + inMemReservation.getReservationId() + " already exists"; |
| LOG.error(errMsg); |
| throw new IllegalArgumentException(errMsg); |
| } |
| // Validate if we can accept this reservation, throws exception if |
| // validation fails |
| if (!isRecovering) { |
| policy.validate(this, inMemReservation); |
| // we record here the time in which the allocation has been accepted |
| reservation.setAcceptanceTimestamp(clock.getTime()); |
| if (rmStateStore != null) { |
| rmStateStore.storeNewReservation( |
| ReservationSystemUtil.buildStateProto(inMemReservation), |
| getQueueName(), inMemReservation.getReservationId().toString()); |
| } |
| } |
| ReservationInterval searchInterval = new ReservationInterval( |
| inMemReservation.getStartTime(), inMemReservation.getEndTime()); |
| Set<InMemoryReservationAllocation> reservations = |
| currentReservations.get(searchInterval); |
| if (reservations == null) { |
| reservations = new HashSet<InMemoryReservationAllocation>(); |
| } |
| if (!reservations.add(inMemReservation)) { |
| LOG.error("Unable to add reservation: {} to plan.", |
| inMemReservation.getReservationId()); |
| return false; |
| } |
| currentReservations.put(searchInterval, reservations); |
| reservationTable.put(inMemReservation.getReservationId(), |
| inMemReservation); |
| incrementAllocation(inMemReservation); |
| LOG.info("Successfully added reservation: {} to plan.", |
| inMemReservation.getReservationId()); |
| return true; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean updateReservation(ReservationAllocation reservation) |
| throws PlanningException { |
| writeLock.lock(); |
| boolean result = false; |
| try { |
| ReservationId resId = reservation.getReservationId(); |
| ReservationAllocation currReservation = getReservationById(resId); |
| if (currReservation == null) { |
| String errMsg = "The specified Reservation with ID " + resId |
| + " does not exist in the plan"; |
| LOG.error(errMsg); |
| throw new IllegalArgumentException(errMsg); |
| } |
| // validate if we can accept this reservation, throws exception if |
| // validation fails |
| policy.validate(this, reservation); |
| if (!removeReservation(currReservation)) { |
| LOG.error("Unable to replace reservation: {} from plan.", |
| reservation.getReservationId()); |
| return result; |
| } |
| try { |
| result = addReservation(reservation, false); |
| } catch (PlanningException e) { |
| LOG.error("Unable to update reservation: {} from plan due to {}.", |
| reservation.getReservationId(), e.getMessage()); |
| } |
| if (result) { |
| LOG.info("Successfully updated reservation: {} in plan.", |
| reservation.getReservationId()); |
| return result; |
| } else { |
| // rollback delete |
| addReservation(currReservation, false); |
| LOG.info("Rollbacked update reservation: {} from plan.", |
| reservation.getReservationId()); |
| return result; |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private boolean removeReservation(ReservationAllocation reservation) { |
| assert (readWriteLock.isWriteLockedByCurrentThread()); |
| ReservationInterval searchInterval = new ReservationInterval( |
| reservation.getStartTime(), reservation.getEndTime()); |
| Set<InMemoryReservationAllocation> reservations = |
| currentReservations.get(searchInterval); |
| if (reservations != null) { |
| if (rmStateStore != null) { |
| rmStateStore.removeReservation(getQueueName(), |
| reservation.getReservationId().toString()); |
| } |
| if (!reservations.remove(reservation)) { |
| LOG.error("Unable to remove reservation: {} from plan.", |
| reservation.getReservationId()); |
| return false; |
| } |
| if (reservations.isEmpty()) { |
| currentReservations.remove(searchInterval); |
| } |
| } else { |
| String errMsg = "The specified Reservation with ID " |
| + reservation.getReservationId() + " does not exist in the plan"; |
| LOG.error(errMsg); |
| throw new IllegalArgumentException(errMsg); |
| } |
| reservationTable.remove(reservation.getReservationId()); |
| decrementAllocation(reservation); |
| LOG.info("Sucessfully deleted reservation: {} in plan.", |
| reservation.getReservationId()); |
| return true; |
| } |
| |
| @Override |
| public boolean deleteReservation(ReservationId reservationID) { |
| writeLock.lock(); |
| try { |
| ReservationAllocation reservation = getReservationById(reservationID); |
| if (reservation == null) { |
| String errMsg = "The specified Reservation with ID " + reservationID |
| + " does not exist in the plan"; |
| LOG.error(errMsg); |
| throw new IllegalArgumentException(errMsg); |
| } |
| return removeReservation(reservation); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void archiveCompletedReservations(long tick) { |
| // Since we are looking for old reservations, read lock is optimal |
| LOG.debug("Running archival at time: {}", tick); |
| List<InMemoryReservationAllocation> expiredReservations = |
| new ArrayList<InMemoryReservationAllocation>(); |
| readLock.lock(); |
| // archive reservations and delete the ones which are beyond |
| // the reservation policy "window" |
| try { |
| long archivalTime = tick - policy.getValidWindow(); |
| ReservationInterval searchInterval = |
| new ReservationInterval(archivalTime, archivalTime); |
| SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations = |
| currentReservations.headMap(searchInterval, true); |
| if (!reservations.isEmpty()) { |
| for (Set<InMemoryReservationAllocation> reservationEntries : reservations |
| .values()) { |
| for (InMemoryReservationAllocation reservation : reservationEntries) { |
| if (reservation.getEndTime() <= archivalTime) { |
| expiredReservations.add(reservation); |
| } |
| } |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| if (expiredReservations.isEmpty()) { |
| return; |
| } |
| // Need write lock only if there are any reservations to be deleted |
| writeLock.lock(); |
| try { |
| for (InMemoryReservationAllocation expiredReservation : expiredReservations) { |
| removeReservation(expiredReservation); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ReservationAllocation> getReservationsAtTime(long tick) { |
| return getReservations(null, new ReservationInterval(tick, tick), ""); |
| } |
| |
| @Override |
| public long getStep() { |
| return step; |
| } |
| |
| @Override |
| public SharingPolicy getSharingPolicy() { |
| return policy; |
| } |
| |
| @Override |
| public ReservationAgent getReservationAgent() { |
| return agent; |
| } |
| |
| @Override |
| public RLESparseResourceAllocation getReservationCountForUserOverTime( |
| String user, long start, long end) { |
| readLock.lock(); |
| try { |
| RLESparseResourceAllocation userResAlloc = |
| userActiveReservationCount.get(user); |
| |
| if (userResAlloc != null) { |
| return userResAlloc.getRangeOverlapping(start, end); |
| } else { |
| return new RLESparseResourceAllocation(resCalc); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, |
| long start, long end) { |
| readLock.lock(); |
| try { |
| // merge periodic and non-periodic allocations |
| RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); |
| RLESparseResourceAllocation userPeriodicResAlloc = |
| userPeriodicResourceAlloc.get(user); |
| |
| if (userResAlloc != null && userPeriodicResAlloc != null) { |
| return RLESparseResourceAllocation.merge(resCalc, totalCapacity, |
| userResAlloc, userPeriodicResAlloc, RLEOperator.add, start, end); |
| } |
| if (userResAlloc != null) { |
| return userResAlloc.getRangeOverlapping(start, end); |
| } |
| if (userPeriodicResAlloc != null) { |
| return userPeriodicResAlloc.getRangeOverlapping(start, end); |
| } |
| } catch (PlanningException e) { |
| LOG.warn("Exception while trying to merge periodic" |
| + " and non-periodic user allocations: {}", e.getMessage(), e); |
| } finally { |
| readLock.unlock(); |
| } |
| return new RLESparseResourceAllocation(resCalc); |
| } |
| |
| @Override |
| public Resource getTotalCommittedResources(long t) { |
| readLock.lock(); |
| try { |
| return Resources.add(rleSparseVector.getCapacityAtTime(t), |
| periodicRle.getCapacityAtTime(t)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ReservationAllocation> getReservations(ReservationId reservationID, |
| ReservationInterval interval) { |
| return getReservations(reservationID, interval, null); |
| } |
| |
| @Override |
| public Set<ReservationAllocation> getReservations(ReservationId reservationID, |
| ReservationInterval interval, String user) { |
| if (reservationID != null) { |
| ReservationAllocation allocation = getReservationById(reservationID); |
| if (allocation == null) { |
| return Collections.emptySet(); |
| } |
| return Collections.singleton(allocation); |
| } |
| |
| long startTime = interval == null ? 0 : interval.getStartTime(); |
| long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime(); |
| |
| ReservationInterval searchInterval = |
| new ReservationInterval(endTime, Long.MAX_VALUE); |
| readLock.lock(); |
| try { |
| SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> res = |
| currentReservations.headMap(searchInterval, true); |
| if (!res.isEmpty()) { |
| Set<ReservationAllocation> flattenedReservations = new HashSet<>(); |
| for (Set<InMemoryReservationAllocation> resEntries : res.values()) { |
| for (InMemoryReservationAllocation reservation : resEntries) { |
| // validate user |
| if (user != null && !user.isEmpty() |
| && !reservation.getUser().equals(user)) { |
| continue; |
| } |
| // handle periodic reservations |
| long period = reservation.getPeriodicity(); |
| if (period > 0) { |
| // The shift is used to remove the wrap around for the |
| // reservation interval. The wrap around will still |
| // exist for the search interval. |
| long shift = reservation.getStartTime() % period; |
| // This is the duration of the reservation since |
| // duration < period. |
| long periodicReservationEnd = |
| (reservation.getEndTime() -shift) % period; |
| long periodicSearchStart = (startTime - shift) % period; |
| long periodicSearchEnd = (endTime - shift) % period; |
| long searchDuration = endTime - startTime; |
| |
| // 1. If the searchDuration is greater than the period, then |
| // the reservation is within the interval. This will allow |
| // us to ignore cases where search end > search start > |
| // reservation end. |
| // 2/3. If the search end is less than the reservation end, or if |
| // the search start is less than the reservation end, then the |
| // reservation will be in the reservation since |
| // periodic reservation start is always zero. Note that neither |
| // of those values will ever be negative. |
| // 4. If the search end is less than the search start, then |
| // there is a wrap around, and both values are implicitly |
| // greater than the reservation end because of condition 2/3, |
| // so the reservation is within the search interval. |
| if (searchDuration > period |
| || periodicSearchEnd < periodicReservationEnd |
| || periodicSearchStart < periodicReservationEnd |
| || periodicSearchStart > periodicSearchEnd) { |
| flattenedReservations.add(reservation); |
| } |
| } else { |
| // check for non-periodic reservations |
| if (reservation.getEndTime() > startTime) { |
| flattenedReservations.add(reservation); |
| } |
| } |
| } |
| } |
| return Collections.unmodifiableSet(flattenedReservations); |
| } else { |
| return Collections.emptySet(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ReservationAllocation getReservationById(ReservationId reservationID) { |
| if (reservationID == null) { |
| return null; |
| } |
| readLock.lock(); |
| try { |
| return reservationTable.get(reservationID); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Resource getTotalCapacity() { |
| readLock.lock(); |
| try { |
| return Resources.clone(totalCapacity); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RLESparseResourceAllocation getAvailableResourceOverTime(String user, |
| ReservationId oldId, long start, long end, long period) |
| throws PlanningException { |
| readLock.lock(); |
| try { |
| |
| // for non-periodic return simple available resources |
| if (period == 0) { |
| |
| // create RLE of totCapacity |
| TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>(); |
| totAvailable.put(start, Resources.clone(totalCapacity)); |
| RLESparseResourceAllocation totRLEAvail = |
| new RLESparseResourceAllocation(totAvailable, resCalc); |
| |
| // subtract used from available |
| RLESparseResourceAllocation netAvailable; |
| |
| netAvailable = RLESparseResourceAllocation.merge(resCalc, |
| Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, |
| RLEOperator.subtractTestNonNegative, start, end); |
| |
| // remove periodic component |
| netAvailable = RLESparseResourceAllocation.merge(resCalc, |
| Resources.clone(totalCapacity), netAvailable, periodicRle, |
| RLEOperator.subtractTestNonNegative, start, end); |
| |
| // add back in old reservation used resources if any |
| ReservationAllocation old = reservationTable.get(oldId); |
| if (old != null) { |
| |
| RLESparseResourceAllocation addBackPrevious = |
| old.getResourcesOverTime(start, end); |
| netAvailable = RLESparseResourceAllocation.merge(resCalc, |
| Resources.clone(totalCapacity), netAvailable, addBackPrevious, |
| RLEOperator.add, start, end); |
| } |
| // lower it if this is needed by the sharing policy |
| netAvailable = getSharingPolicy().availableResources(netAvailable, this, |
| user, oldId, start, end); |
| return netAvailable; |
| } else { |
| |
| if (periodicRle.getTimePeriod() % period != 0) { |
| throw new PlanningException("The reservation periodicity (" + period |
| + ") must be" + " an exact divider of the system maxPeriod (" |
| + periodicRle.getTimePeriod() + ")"); |
| } |
| |
| if (period < (end - start)) { |
| throw new PlanningException( |
| "Invalid input: (end - start) = (" + end + " - " + start + ") = " |
| + (end - start) + " > period = " + period); |
| } |
| |
| // find the minimum resources available among all the instances that fit |
| // in the LCM |
| long numInstInLCM = periodicRle.getTimePeriod() / period; |
| |
| RLESparseResourceAllocation minOverLCM = |
| getAvailableResourceOverTime(user, oldId, start, end, 0); |
| for (int i = 1; i < numInstInLCM; i++) { |
| |
| long rStart = start + i * period; |
| long rEnd = end + i * period; |
| |
| // recursive invocation of non-periodic range (to pick raw-info) |
| RLESparseResourceAllocation snapShot = |
| getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0); |
| |
| // time-align on start |
| snapShot.shift(-(i * period)); |
| |
| // pick the minimum amount of resources in each time interval |
| minOverLCM = |
| RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(), |
| minOverLCM, snapShot, RLEOperator.min, start, end); |
| |
| } |
| |
| return minOverLCM; |
| |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Resource getMinimumAllocation() { |
| return Resources.clone(minAlloc); |
| } |
| |
| @Override |
| public void setTotalCapacity(Resource cap) { |
| writeLock.lock(); |
| try { |
| totalCapacity = Resources.clone(cap); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public long getEarliestStartTime() { |
| readLock.lock(); |
| try { |
| return rleSparseVector.getEarliestStartTime(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getLastEndTime() { |
| readLock.lock(); |
| try { |
| return rleSparseVector.getLatestNonNullTime(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ResourceCalculator getResourceCalculator() { |
| return resCalc; |
| } |
| |
| @Override |
| public String getQueueName() { |
| return queueName; |
| } |
| |
| @Override |
| public Resource getMaximumAllocation() { |
| return Resources.clone(maxAlloc); |
| } |
| |
| @Override |
| public long getMaximumPeriodicity() { |
| return this.maxPeriodicity; |
| } |
| |
| public String toCumulativeString() { |
| readLock.lock(); |
| try { |
| return rleSparseVector.toString() + "\n" + periodicRle.toString(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Planner getReplanner() { |
| return replanner; |
| } |
| |
| @Override |
| public boolean getMoveOnExpiry() { |
| return getMoveOnExpiry; |
| } |
| |
| @Override |
| public String toString() { |
| readLock.lock(); |
| try { |
| StringBuffer planStr = new StringBuffer("In-memory Plan: "); |
| planStr.append("Parent Queue: ").append(queueName) |
| .append("Total Capacity: ").append(totalCapacity).append("Step: ") |
| .append(step); |
| for (ReservationAllocation reservation : getAllReservations()) { |
| planStr.append(reservation); |
| } |
| return planStr.toString(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ReservationAllocation> getReservationByUserAtTime(String user, |
| long t) { |
| readLock.lock(); |
| try { |
| Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>(); |
| for (ReservationAllocation ra : getReservationsAtTime(t)) { |
| String resUser = ra.getUser(); |
| if (resUser != null && resUser.equals(user)) { |
| resSet.add(ra); |
| } |
| } |
| return resSet; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RLESparseResourceAllocation getCumulativeLoadOverTime(long start, |
| long end) throws PlanningException { |
| readLock.lock(); |
| try { |
| |
| RLESparseResourceAllocation ret = |
| rleSparseVector.getRangeOverlapping(start, end); |
| ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret, |
| periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start, |
| end); |
| |
| return ret; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| } |