blob: c1264e9ba914795907ae884725e5cd79af5a12a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.extractQueuePath;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.getPlacementContext;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.isStaticQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetAutoCreatedQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetQueueMapping;
public class AppNameMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory
.getLog(AppNameMappingPlacementRule.class);
public static final String CURRENT_APP_MAPPING = "%application";
private static final String QUEUE_MAPPING_NAME = "app-name";
private boolean overrideWithQueueMappings = false;
private List<QueueMappingEntity> mappings = null;
public AppNameMappingPlacementRule() {
this(false, null);
}
public AppNameMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMappingEntity> newMappings) {
this.overrideWithQueueMappings = overrideWithQueueMappings;
this.mappings = newMappings;
}
@Override
public boolean initialize(CapacitySchedulerContext schedulerContext)
throws IOException {
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
List<QueueMappingEntity> queueMappings =
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
// Get new user mappings
List<QueueMappingEntity> newMappings = new ArrayList<>();
CapacitySchedulerQueueManager queueManager =
schedulerContext.getCapacitySchedulerQueueManager();
// check if mappings refer to valid queues
for (QueueMappingEntity mapping : queueMappings) {
QueuePath queuePath = extractQueuePath(mapping.getQueue());
if (isStaticQueueMapping(mapping)) {
//Try getting queue by its leaf queue name
// without splitting into parent/leaf queues
CSQueue queue = queueManager.getQueue(mapping.getQueue());
if (ifQueueDoesNotExist(queue)) {
//Try getting the queue by extracting leaf and parent queue names
//Assuming its a potential auto created leaf queue
queue = queueManager.getQueue(queuePath.getLeafQueue());
if (ifQueueDoesNotExist(queue)) {
//if leaf queue does not exist,
// this could be a potential auto created leaf queue
//validate if parent queue is specified,
// then it should exist and
// be an instance of AutoCreateEnabledParentQueue
QueueMappingEntity newMapping =
validateAndGetAutoCreatedQueueMapping(queueManager, mapping,
queuePath);
if (newMapping == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping
.getQueue());
}
newMappings.add(newMapping);
} else{
QueueMappingEntity newMapping = validateAndGetQueueMapping(
queueManager, queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
// if queue exists, validate
// if its an instance of leaf queue
// if its an instance of auto created leaf queue,
// then extract parent queue name and update queue mapping
QueueMappingEntity newMapping = validateAndGetQueueMapping(
queueManager, queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
//If it is a dynamic queue mapping,
// we can safely assume leaf queue name does not have '.' in it
// validate
// if parent queue is specified, then
// parent queue exists and an instance of AutoCreateEnabledParentQueue
//
QueueMappingEntity newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping != null) {
newMappings.add(newMapping);
} else{
newMappings.add(mapping);
}
}
}
if (newMappings.size() > 0) {
this.mappings = newMappings;
this.overrideWithQueueMappings = overrideWithQueueMappings;
return true;
}
return false;
}
private static boolean ifQueueDoesNotExist(CSQueue queue) {
return queue == null;
}
private ApplicationPlacementContext getAppPlacementContext(String user,
ApplicationId applicationId) throws IOException {
for (QueueMappingEntity mapping : mappings) {
if (mapping.getSource().equals(CURRENT_APP_MAPPING)) {
if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) {
return getPlacementContext(mapping, String.valueOf(applicationId));
} else {
return getPlacementContext(mapping);
}
}
if (mapping.getSource().equals(applicationId.toString())) {
return getPlacementContext(mapping);
}
}
return null;
}
@Override
public ApplicationPlacementContext getPlacementForApp(
ApplicationSubmissionContext asc, String user) throws YarnException {
String queueName = asc.getQueue();
ApplicationId applicationId = asc.getApplicationId();
if (mappings != null && mappings.size() > 0) {
try {
ApplicationPlacementContext mappedQueue = getAppPlacementContext(user,
applicationId);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
//queueName will be same as mapped queue name in case of recovery
|| queueName.equals(mappedQueue.getQueue())
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
return mappedQueue;
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" reason: " + ioex.getMessage();
throw new YarnException(message);
}
}
return null;
}
}