blob: e40e978a80022bffce64ae404ef0fa5c0ff7078b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
public final class CapacitySchedulerConfigValidator {
private static final Logger LOG = LoggerFactory.getLogger(
CapacitySchedulerConfigValidator.class);
private CapacitySchedulerConfigValidator() {
throw new IllegalStateException("Utility class");
}
public static boolean validateCSConfiguration(
final Configuration oldConfParam, final Configuration newConf,
final RMContext rmContext) throws IOException {
// ensure that the oldConf is deep copied
Configuration oldConf = new Configuration(oldConfParam);
QueueMetrics.setConfigurationValidation(oldConf, true);
QueueMetrics.setConfigurationValidation(newConf, true);
CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler();
CapacityScheduler newCs = new CapacityScheduler();
try {
//TODO: extract all the validation steps and replace reinitialize with
//the specific validation steps
newCs.setConf(oldConf);
newCs.setRMContext(rmContext);
newCs.init(oldConf);
newCs.addNodes(liveScheduler.getAllNodes());
newCs.reinitialize(newConf, rmContext, true);
return true;
} finally {
newCs.stop();
}
}
public static Set<String> validatePlacementRules(
Collection<String> placementRuleStrs) throws IOException {
Set<String> distinguishRuleSet = new LinkedHashSet<>();
// fail the case if we get duplicate placementRule add in
for (String pls : placementRuleStrs) {
if (!distinguishRuleSet.add(pls)) {
throw new IOException("Invalid PlacementRule inputs which "
+ "contains duplicate rule strings");
}
}
return distinguishRuleSet;
}
public static void validateMemoryAllocation(Configuration conf) {
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
public static void validateVCores(Configuration conf) {
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
/**
* Ensure all existing queues are present. Queues cannot be deleted if it's not
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
* Previous child queue could be converted into parent queue if it is in
* STOPPED state.
*
* @param queues existing queues
* @param newQueues new queues
* @param newConf Capacity Scheduler Configuration.
* @throws IOException an I/O exception has occurred.
*/
public static void validateQueueHierarchy(
CSQueueStore queues,
CSQueueStore newQueues,
CapacitySchedulerConfiguration newConf) throws IOException {
// check that all static queues are included in the newQueues list
for (CSQueue oldQueue : queues.getQueues()) {
if (AbstractAutoCreatedLeafQueue.class.isAssignableFrom(oldQueue.getClass())) {
continue;
}
final String queuePath = oldQueue.getQueuePath();
final String configPrefix = CapacitySchedulerConfiguration.getQueuePrefix(
oldQueue.getQueuePath());
final QueueState newQueueState = createQueueState(newConf.get(configPrefix + "state"),
queuePath);
final CSQueue newQueue = newQueues.get(queuePath);
if (null == newQueue) {
// old queue doesn't exist in the new XML
if (isEitherQueueStopped(oldQueue.getState(), newQueueState)) {
LOG.info("Deleting Queue {}, as it is not present in the modified capacity " +
"configuration xml", queuePath);
} else {
if (!isDynamicQueue(oldQueue)) {
throw new IOException(oldQueue.getQueuePath() + " cannot be"
+ " deleted from the capacity scheduler configuration, as the"
+ " queue is not yet in stopped state. Current State : "
+ oldQueue.getState());
}
}
} else {
validateSameQueuePath(oldQueue, newQueue);
validateParentQueueConversion(oldQueue, newQueue);
validateLeafQueueConversion(oldQueue, newQueue);
}
}
}
private static void validateSameQueuePath(CSQueue oldQueue, CSQueue newQueue) throws IOException {
if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
// Queues cannot be moved from one hierarchy to another
throw new IOException(
oldQueue.getQueuePath() + " is moved from:" + oldQueue.getQueuePath() + " to:"
+ newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
}
}
private static void validateParentQueueConversion(CSQueue oldQueue,
CSQueue newQueue) throws IOException {
if (oldQueue instanceof AbstractParentQueue) {
if (!(oldQueue instanceof ManagedParentQueue) && newQueue instanceof ManagedParentQueue) {
throw new IOException(
"Can not convert parent queue: " + oldQueue.getQueuePath()
+ " to auto create enabled parent queue since "
+ "it could have other pre-configured queues which is not "
+ "supported");
}
if (oldQueue instanceof ManagedParentQueue
&& !(newQueue instanceof ManagedParentQueue)) {
throw new IOException(
"Cannot convert auto create enabled parent queue: "
+ oldQueue.getQueuePath() + " to leaf queue. Please check "
+ " parent queue's configuration "
+ CapacitySchedulerConfiguration.AUTO_CREATE_CHILD_QUEUE_ENABLED
+ " is set to true");
}
if (newQueue instanceof AbstractLeafQueue) {
LOG.info("Converting the parent queue: {} to leaf queue.", oldQueue.getQueuePath());
}
}
}
private static void validateLeafQueueConversion(CSQueue oldQueue,
CSQueue newQueue) throws IOException {
if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof AbstractParentQueue) {
if (isEitherQueueStopped(oldQueue.getState(), newQueue.getState())) {
LOG.info("Converting the leaf queue: {} to parent queue.", oldQueue.getQueuePath());
} else {
throw new IOException(
"Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue since "
+ "it is not yet in stopped state. Current State : "
+ oldQueue.getState());
}
}
}
private static QueueState createQueueState(String state, String queuePath) {
if (state != null) {
try {
return QueueState.valueOf(state);
} catch (Exception ex) {
LOG.warn("Not a valid queue state for queue: {}, state: {}", queuePath, state);
}
}
return null;
}
private static boolean isDynamicQueue(CSQueue csQueue) {
return ((AbstractCSQueue)csQueue).isDynamicQueue();
}
private static boolean isEitherQueueStopped(QueueState a, QueueState b) {
return a == QueueState.STOPPED || b == QueueState.STOPPED;
}
}