blob: f2b0e5aaf6230c0db95cd2be5848f4d46c69e3fa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This represents a dynamic queue managed by the {@link ReservationSystem}.
* From the user perspective this is equivalent to a LeafQueue that respect
* reservations, but functionality wise is a sub-class of ParentQueue
*
*/
public class PlanQueue extends AbstractManagedParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
private int maxAppsForReservation;
private int maxAppsPerUserForReservation;
private int userLimit;
private float userLimitFactor;
protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
updateAbsoluteCapacities();
this.schedulerContext = cs;
// Set the reservation queue attributes for the Plan
CapacitySchedulerConfiguration conf = cs.getConfiguration();
String queuePath = super.getQueuePath();
int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
if (maxAppsForReservation < 0) {
maxAppsForReservation =
(int) (CapacitySchedulerConfiguration.
DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
.getAbsoluteCapacity());
}
int userLimit = conf.getUserLimit(queuePath);
float userLimitFactor = conf.getUserLimitFactor(queuePath);
int maxAppsPerUserForReservation =
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
if (userLimitFactor == -1) {
maxAppsPerUserForReservation = maxAppsForReservation;
}
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
maxAppsPerUserForReservation);
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Plan Queue: ").append(queueName)
.append("\nwith capacity: [").append(super.getCapacity())
.append("]\nwith max capacity: [").append(super.getMaximumCapacity())
.append("\nwith max reservation apps: [").append(maxAppsForReservation)
.append("]\nwith max reservation apps per user: [")
.append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
.append(userLimit).append("]\nwith user limit factor: [")
.append(userLimitFactor).append("].");
LOG.info(queueInfo.toString());
}
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
writeLock.lock();
try {
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
if (newlyParsedParentQueue.getChildQueues().size() != 1) {
throw new IOException(
"Reservable Queue should not have sub-queues in the"
+ "configuration expect the default reservation queue");
}
// Set new configs
setupQueueConfigs(clusterResource, csContext.getConfiguration());
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,
newlyParsedParentQueue.maxAppsForReservation,
newlyParsedParentQueue.maxAppsPerUserForReservation);
// run reinitialize on each existing queue, to trigger absolute cap
// recomputations
for (CSQueue res : this.getChildQueues()) {
res.reinitialize(res, clusterResource);
}
showReservationsAsQueues =
newlyParsedParentQueue.showReservationsAsQueues;
} finally {
writeLock.unlock();
}
}
private void updateQuotas(int userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) {
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
this.maxAppsForReservation = maxAppsForReservation;
this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
}
/**
* Number of maximum applications for each of the reservations in this Plan.
*
* @return maxAppsForreservation
*/
public int getMaxApplicationsForReservations() {
return maxAppsForReservation;
}
/**
* Number of maximum applications per user for each of the reservations in
* this Plan.
*
* @return maxAppsPerUserForreservation
*/
public int getMaxApplicationsPerUserForReservation() {
return maxAppsPerUserForReservation;
}
/**
* User limit value for each of the reservations in this Plan.
*
* @return userLimit
*/
public int getUserLimitForReservation() {
return userLimit;
}
/**
* User limit factor value for each of the reservations in this Plan.
*
* @return userLimitFactor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
/**
* Determine whether to hide/show the ReservationQueues
*/
public boolean showReservationsAsQueues() {
return showReservationsAsQueues;
}
}