blob: 73b8a9b27d37734b23404d6b59343bc27b01536a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* This data structure stores a periodic {@link RLESparseResourceAllocation}.
* Default period is 1 day (86400000ms).
*/
public class PeriodicRLESparseResourceAllocation
extends RLESparseResourceAllocation {
// Log
private static final Logger LOG =
LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class);
private long timePeriod;
/**
* Constructor.
*
* @param resourceCalculator {@link ResourceCalculator} the resource
* calculator to use.
* @param timePeriod Time period in milliseconds.
*/
public PeriodicRLESparseResourceAllocation(
ResourceCalculator resourceCalculator, Long timePeriod) {
super(resourceCalculator);
this.timePeriod = timePeriod;
}
/**
* Constructor. Default time period set to 1 day.
*
* @param resourceCalculator {@link ResourceCalculator} the resource
* calculator to use..
*/
public PeriodicRLESparseResourceAllocation(
ResourceCalculator resourceCalculator) {
this(resourceCalculator,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
}
/**
* Constructor.
*
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
* encoded data.
* @param timePeriod Time period in milliseconds.
*/
@VisibleForTesting
public PeriodicRLESparseResourceAllocation(
RLESparseResourceAllocation rleVector, Long timePeriod) {
super(rleVector.getCumulative(), rleVector.getResourceCalculator());
this.timePeriod = timePeriod;
// make sure the PeriodicRLE is zero-based, and handles wrap-around
long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime());
shift(delta);
List<Long> toRemove = new ArrayList<>();
Map<Long, Resource> toAdd = new TreeMap<>();
for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
if (entry.getKey() > timePeriod) {
toRemove.add(entry.getKey());
if (entry.getValue() != null) {
toAdd.put(timePeriod, entry.getValue());
long prev = entry.getKey() % timePeriod;
toAdd.put(prev, this.getCapacityAtTime(prev));
toAdd.put(0L, entry.getValue());
}
}
}
for (Long l : toRemove) {
cumulativeCapacity.remove(l);
}
cumulativeCapacity.putAll(toAdd);
}
/**
* Get capacity at time based on periodic repetition.
*
* @param tick UTC time for which the allocated {@link Resource} is queried.
* @return {@link Resource} allocated at specified time
*/
public Resource getCapacityAtTime(long tick) {
long convertedTime = (tick % timePeriod);
return super.getCapacityAtTime(convertedTime);
}
/**
* Add resource for the specified interval. This function will be used by
* {@link InMemoryPlan} while placing reservations between 0 and timePeriod.
* The interval may include 0, but the end time must be strictly less than
* timePeriod.
*
* @param interval {@link ReservationInterval} to which the specified resource
* is to be added.
* @param resource {@link Resource} to be added to the interval specified.
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval interval, Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.addInterval(interval, resource);
} else {
LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was ("
+ interval.toString() + ")");
return false;
}
}
/**
* Removes a resource for the specified interval.
*
* @param interval the {@link ReservationInterval} for which the resource is
* to be removed.
* @param resource the {@link Resource} to be removed.
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(ReservationInterval interval,
Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
// If the resource to be subtracted is less than the minimum resource in
// the range, abort removal to avoid negative capacity.
// TODO revesit decrementing endTime
if (!Resources.fitsIn(resource, getMinimumCapacityInInterval(
new ReservationInterval(startTime, endTime - 1)))) {
LOG.info("Request to remove more resources than what is available");
return false;
}
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.removeInterval(interval, resource);
} else {
LOG.info("Interval extends beyond the end time " + timePeriod);
return false;
}
}
/**
* Get maximum capacity at periodic offsets from the specified time.
*
* @param tick UTC time base from which offsets are specified for finding the
* maximum capacity.
* @param period periodic offset at which capacities are evaluated.
* @return the maximum {@link Resource} across the specified time instants.
* @return true if removal is successful, false otherwise
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxResource;
if (period < timePeriod) {
maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period);
} else {
// if period is greater than the length of PeriodicRLESparseAllocation,
// only a single value exists in this interval.
maxResource = super.getCapacityAtTime(tick % timePeriod);
}
return maxResource;
}
/**
* Get time period of PeriodicRLESparseResourceAllocation.
*
* @return timePeriod time period represented in ms.
*/
public long getTimePeriod() {
return this.timePeriod;
}
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append("Period: ").append(timePeriod).append("\n")
.append(super.toString());
if (super.isEmpty()) {
ret.append(" no allocations\n");
}
return ret.toString();
}
@Override
public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
NavigableMap<Long, Resource> unrolledMap = new TreeMap<>();
readLock.lock();
try {
long relativeStart = (start >= 0) ? start % timePeriod : 0;
NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
Long previous = cumulativeMap.floorKey(relativeStart);
previous = (previous != null) ? previous : 0;
//make sure to go one past end, to catch end times extending past period
for (long i = 0; i <= 1 + (end - start) / timePeriod; i++) {
for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
long curKey = e.getKey() + (i * timePeriod);
if (curKey >= previous && (start + curKey - relativeStart) <= end) {
unrolledMap.put(curKey, e.getValue());
}
}
}
RLESparseResourceAllocation rle =
new RLESparseResourceAllocation(unrolledMap, getResourceCalculator());
rle.shift(start - relativeStart);
return rle;
} finally {
readLock.unlock();
}
}
}