blob: 0bc5cb5d4afea9dd6bd0693d40053a381641a159 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.event.Event;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ProportionalCapacityPreemptionPolicyMockFramework {
/** Logger of this mock framework (named after this class, not after a test). */
static final Log LOG =
    LogFactory.getLog(ProportionalCapacityPreemptionPolicyMockFramework.class);
/** Name (and path) used for the root queue. */
final String ROOT = CapacitySchedulerConfiguration.ROOT;
/** Queue name to mocked queue; filled while the queue hierarchy is mocked. */
Map<String, CSQueue> nameToCSQueues = null;
/** Partition name to its total resource; filled from the labels config. */
Map<String, Resource> partitionToResource = null;
/** Node id to mocked scheduler node; filled from the nodes config. */
Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
/** Mocked node labels manager, returned by the mocked RM context. */
RMNodeLabelsManager nlm = null;
/** Mocked RM context, returned by the mocked scheduler. */
RMContext rmContext = null;
/**
 * Calculator returned by the mocked scheduler unless buildEnv is asked for
 * the dominant one; also used for the capacity ratios computed here.
 */
ResourceCalculator rc = new DefaultResourceCalculator();
/** Mocked clock handed to the policy under test. */
Clock mClock = null;
/** Scheduler configuration created in {@link #setup()}. */
CapacitySchedulerConfiguration conf = null;
/** Mocked capacity scheduler the policy runs against. */
CapacityScheduler cs = null;
/** Mocked event handler returned by the mocked dispatcher. */
@SuppressWarnings("rawtypes")
EventHandler<Event> mDisp = null;
/** Policy under test; created at the end of buildEnv. */
ProportionalCapacityPreemptionPolicy policy = null;
/** Sum of the resources of all configured partitions. */
Resource clusterResource = null;
/**
 * Creates a fresh configuration, the mocked scheduler / RM context /
 * dispatcher wiring and empty lookup maps before every test.
 */
@SuppressWarnings("unchecked")
@Before
public void setup() {
  org.apache.log4j.Logger.getRootLogger()
      .setLevel(org.apache.log4j.Level.DEBUG);

  // Lookup tables filled later by buildEnv.
  nameToCSQueues = new HashMap<>();
  nodeIdToSchedulerNodes = new HashMap<>();
  partitionToResource = new HashMap<>();

  // Preemption related configuration.
  conf = new CapacitySchedulerConfiguration(new Configuration(false));
  conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
      3000);
  conf.setLong(
      CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
  // report "ideal" preempt
  conf.setFloat(
      CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
      1.0f);
  conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
      1.0f);

  // Mocks.
  mClock = mock(Clock.class);
  nlm = mock(RMNodeLabelsManager.class);
  mDisp = mock(EventHandler.class);
  Dispatcher dispatcher = mock(Dispatcher.class);
  rmContext = mock(RMContext.class);
  cs = mock(CapacityScheduler.class);

  // RM context -> labels manager and dispatcher -> event handler.
  when(dispatcher.getEventHandler()).thenReturn(mDisp);
  when(rmContext.getDispatcher()).thenReturn(dispatcher);
  when(rmContext.getNodeLabelManager()).thenReturn(nlm);

  // Scheduler facade.
  when(cs.getRMContext()).thenReturn(rmContext);
  when(cs.getConfiguration()).thenReturn(conf);
  when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
  when(cs.getResourceCalculator()).thenReturn(rc);
}
/**
 * Builds the mocked environment without switching the scheduler to the
 * dominant resource calculator.
 *
 * @param labelsConfig partitions specification
 * @param nodesConfig scheduler nodes specification
 * @param queuesConfig queue hierarchy specification
 * @param appsConfig applications specification
 * @throws IOException declared by the underlying mock builders
 */
public void buildEnv(String labelsConfig, String nodesConfig,
    String queuesConfig, String appsConfig) throws IOException {
  final boolean useDominantResourceCalculator = false;
  buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig,
      useDominantResourceCalculator);
}
/**
 * Builds the whole mocked environment (partitions, nodes, queues and
 * applications, in that order) and creates the policy under test.
 *
 * @param labelsConfig partitions specification
 * @param nodesConfig scheduler nodes specification
 * @param queuesConfig queue hierarchy specification
 * @param appsConfig applications specification
 * @param useDominantResourceCalculator when true the mocked scheduler
 *          reports a {@link DominantResourceCalculator}
 * @throws IOException declared by the underlying mock builders
 */
public void buildEnv(String labelsConfig, String nodesConfig,
    String queuesConfig, String appsConfig,
    boolean useDominantResourceCalculator) throws IOException {
  if (useDominantResourceCalculator) {
    ResourceCalculator dominant = new DominantResourceCalculator();
    when(cs.getResourceCalculator()).thenReturn(dominant);
  }

  // Partitions first: they define clusterResource and partitionToResource.
  mockNodeLabelsManager(labelsConfig);

  // Nodes, exposed both individually and as a snapshot list.
  mockSchedulerNodes(nodesConfig);
  for (Map.Entry<NodeId, FiCaSchedulerNode> entry
      : nodeIdToSchedulerNodes.entrySet()) {
    when(cs.getSchedulerNode(entry.getKey())).thenReturn(entry.getValue());
  }
  List<FiCaSchedulerNode> nodeSnapshot = new ArrayList<>(
      nodeIdToSchedulerNodes.values());
  when(cs.getAllNodes()).thenReturn(nodeSnapshot);

  // Queues, then the applications that live in them.
  ParentQueue rootQueue = mockQueueHierarchy(queuesConfig);
  when(cs.getRootQueue()).thenReturn(rootQueue);
  when(cs.getClusterResource()).thenReturn(clusterResource);
  mockApplications(appsConfig);

  policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
/**
 * Parses the container specifications of one application and mocks the
 * corresponding containers.
 *
 * Every specification is enclosed in parentheses and has 6 to 8
 * comma-separated values:
 * <pre>
 * (priority,resource,host,expression,repeat,reserved[,pending[,user]])
 * </pre>
 * Parsing starts after the first "=" of the string, or at its beginning when
 * there is none. Each specification creates {@code repeat} mocked containers
 * that are added to {@code reservedContainers} or {@code liveContainers} and
 * to the scheduler node of {@code host}. The container with id 1 is marked
 * as the AM container. Aggregated values (priority, user, consumption,
 * pending requests, resource usage) are stubbed on {@code app} after every
 * specification.
 *
 * @param containersConfig container specifications of one application
 * @param app mocked application to stub
 * @param attemptId attempt id the containers belong to
 * @param queueName name of the queue the application runs in
 * @param reservedContainers output list for reserved containers
 * @param liveContainers output list for non-reserved containers
 * @throws IllegalArgumentException if a specification is malformed
 */
private void mockContainers(String containersConfig, FiCaSchedulerApp app,
ApplicationAttemptId attemptId, String queueName,
List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
// Container ids keep counting across all specifications of this app.
int containerId = 1;
int start = containersConfig.indexOf("=") + 1;
int end = -1;
// Totals accumulated over all specifications of this app.
Resource used = Resource.newInstance(0, 0);
Resource pending = Resource.newInstance(0, 0);
// NOTE(review): a single Priority instance is shared by all containers of
// the app and mutated for each specification, so containers mocked from an
// earlier specification return the latest priority - confirm this is
// intended.
Priority pri = Priority.newInstance(0);
while (start < containersConfig.length()) {
// Find the opening parenthesis of the next specification.
while (start < containersConfig.length()
&& containersConfig.charAt(start) != '(') {
start++;
}
if (start >= containersConfig.length()) {
throw new IllegalArgumentException(
"Error containers specification, line=" + containersConfig);
}
// Find the matching closing parenthesis.
end = start + 1;
while (end < containersConfig.length()
&& containersConfig.charAt(end) != ')') {
end++;
}
if (end >= containersConfig.length()) {
throw new IllegalArgumentException(
"Error containers specification, line=" + containersConfig);
}
// now we found start/end, get container values
String[] values = containersConfig.substring(start + 1, end).split(",");
if (values.length < 6 || values.length > 8) {
throw new IllegalArgumentException("Format to define container is:"
+ "(priority,resource,host,expression,repeat,reserved, pending)");
}
pri.setPriority(Integer.valueOf(values[0]));
Resource res = parseResourceFromString(values[1]);
NodeId host = NodeId.newInstance(values[2], 1);
String label = values[3];
String userName = "user";
int repeat = Integer.valueOf(values[4]);
boolean reserved = Boolean.valueOf(values[5]);
// Optional 7th value: pending resource, added to the app's total.
if (values.length >= 7) {
Resources.addTo(pending, parseResourceFromString(values[6]));
}
// Optional 8th value: user name (defaults to "user").
if (values.length == 8) {
userName = values[7];
}
for (int i = 0; i < repeat; i++) {
Container c = mock(Container.class);
Resources.addTo(used, res);
when(c.getResource()).thenReturn(res);
when(c.getPriority()).thenReturn(pri);
SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
RMContainerImpl rmc = mock(RMContainerImpl.class);
when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
when(rmc.getAllocatedNode()).thenReturn(host);
when(rmc.getNodeLabelExpression()).thenReturn(label);
when(rmc.getAllocatedResource()).thenReturn(res);
when(rmc.getContainer()).thenReturn(c);
when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
when(rmc.getQueueName()).thenReturn(queueName);
final ContainerId cId = ContainerId.newContainerId(attemptId,
containerId);
when(rmc.getContainerId()).thenReturn(cId);
// Mocked containers compare by container id, which lets them be stored in
// the TreeSet used below.
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
return cId.compareTo(
((RMContainer) invocation.getArguments()[0]).getContainerId());
}
}).when(rmc).compareTo(any(RMContainer.class));
// The first container of the app is treated as its AM container.
if (containerId == 1) {
when(rmc.isAMContainer()).thenReturn(true);
when(app.getAMResource(label)).thenReturn(res);
when(app.getAppAMNodePartitionName()).thenReturn(label);
}
if (reserved) {
reservedContainers.add(rmc);
when(rmc.getReservedResource()).thenReturn(res);
} else {
liveContainers.add(rmc);
}
// Add container to scheduler-node
addContainerToSchedulerNode(host, rmc, reserved);
// If this is a non-exclusive allocation (empty label expression on a node
// that belongs to a non-empty partition), register the container in the
// queue's ignore-exclusivity map.
String partition = null;
if (label.isEmpty()
&& !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
.isEmpty()) {
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
.getIgnoreExclusivityRMContainers();
if (!ignoreExclusivityContainers.containsKey(partition)) {
ignoreExclusivityContainers.put(partition,
new TreeSet<RMContainer>());
}
ignoreExclusivityContainers.get(partition).add(rmc);
}
LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
+ host + " nodeLabelExpression=" + label + " partition="
+ partition);
containerId++;
}
// If app has 0 container, and it has only pending, still make sure to
// update label.
if (repeat == 0) {
when(app.getAppAMNodePartitionName()).thenReturn(label);
}
// Some more app specific aggregated data can be better filled here.
when(app.getPriority()).thenReturn(pri);
when(app.getUser()).thenReturn(userName);
when(app.getCurrentConsumption()).thenReturn(used);
when(app.getCurrentReservation())
.thenReturn(Resources.createResource(0, 0));
Map<String, Resource> pendingForDefaultPartition =
new HashMap<String, Resource>();
// The accumulated pending resource is reported under the label of the
// specification parsed last.
pendingForDefaultPartition.put(label, pending);
when(app.getTotalPendingRequestsPerPartition())
.thenReturn(pendingForDefaultPartition);
// Record the accumulated used resource in the attempt's resource usage;
// pending is NOT recorded in this ResourceUsage.
ResourceUsage ru = new ResourceUsage();
ru.setUsed(label, used);
when(app.getAppAttemptResourceUsage()).thenReturn(ru);
// Continue scanning after the closing parenthesis.
start = end + 1;
}
}
/**
 * Mocks the applications described by {@code appsConfig}, adds them to their
 * (already mocked) leaf queues and stubs per-user usage and user limits on
 * those queues.
 *
 * Applications are separated by ";" and the fields of one application by a
 * tab. Format is:
 * <pre>
 * queueName\t // app1
 * (priority,resource,host,expression,#repeat,reserved[,pending[,user]])
 * (priority,resource,host,expression,#repeat,reserved[,pending[,user]])[\tN];
 * queueName\t // app2
 * </pre>
 * The optional third field {@code N} is an integer (presumably a user-limit
 * percentage - confirm against the tests). The first one found sets the
 * divisor {@code 100 / N}, which then replaces the number of users when the
 * user limit of every queue is computed.
 *
 * Application ids start at 1 and follow the order of the specification.
 *
 * @param appsConfig applications specification
 * @throws IllegalArgumentException if an application has no container
 *           specification or names a queue that has not been mocked
 */
private void mockApplications(String appsConfig) {
  int id = 1;
  HashMap<String, HashSet<String>> userMap =
      new HashMap<String, HashSet<String>>();
  HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>>
      userResourceUsagePerLabel = new HashMap<>();
  LeafQueue queue = null;
  int mulp = -1;
  for (String a : appsConfig.split(";")) {
    String[] strs = a.split("\t");
    if (strs.length < 2) {
      throw new IllegalArgumentException(
          "Error application specification, expected"
              + " queueName<TAB>containers, line=" + a);
    }
    String queueName = strs[0];
    if (null == nameToCSQueues.get(queueName)) {
      throw new IllegalArgumentException(
          "Unknown queue in application specification, queue=" + queueName);
    }
    // Only the first application carrying the optional field defines mulp.
    if (mulp <= 0 && strs.length > 2) {
      mulp = 100 / Integer.parseInt(strs[2]);
    }

    // get containers
    List<RMContainer> liveContainers = new ArrayList<RMContainer>();
    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
    ApplicationId appId = ApplicationId.newInstance(0L, id);
    ApplicationAttemptId appAttemptId = ApplicationAttemptId
        .newInstance(appId, 1);

    FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
    // Generic default; mockContainers overrides it for the AM's label.
    when(app.getAMResource(anyString()))
        .thenReturn(Resources.createResource(0, 0));
    mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
        liveContainers);
    LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);

    when(app.getLiveContainers()).thenReturn(liveContainers);
    when(app.getReservedContainers()).thenReturn(reservedContainers);
    when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
    when(app.getApplicationId()).thenReturn(appId);
    when(app.getQueueName()).thenReturn(queueName);

    // add to LeafQueue
    queue = (LeafQueue) nameToCSQueues.get(queueName);
    queue.getApplications().add(app);
    queue.getAllApplications().add(app);
    when(queue.getMinimumAllocation())
        .thenReturn(Resource.newInstance(1, 1));
    when(app.getCSLeafQueue()).thenReturn(queue);

    // Remember which users run in which queue.
    HashSet<String> users = userMap.get(queueName);
    if (null == users) {
      users = new HashSet<String>();
      userMap.put(queueName, users);
    }
    users.add(app.getUser());

    String label = app.getAppAMNodePartitionName();

    // Get label to queue
    HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue =
        userResourceUsagePerLabel.get(label);
    if (null == userResourceUsagePerQueue) {
      userResourceUsagePerQueue = new HashMap<>();
      userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
    }

    // Get queue to user based resource map
    HashMap<String, ResourceUsage> userResourceUsage =
        userResourceUsagePerQueue.get(queueName);
    if (null == userResourceUsage) {
      userResourceUsage = new HashMap<>();
      userResourceUsagePerQueue.put(queueName, userResourceUsage);
    }

    // Get user to its resource usage.
    ResourceUsage usage = userResourceUsage.get(app.getUser());
    if (null == usage) {
      usage = new ResourceUsage();
      userResourceUsage.put(app.getUser(), usage);
    }

    usage.incAMUsed(app.getAMResource(label));
    usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
    id++;
  }

  // Currently we have user-limit test support only for default label.
  Resource totResoucePerPartition = partitionToResource.get("");
  for (String label : userResourceUsagePerLabel.keySet()) {
    for (String queueName : userMap.keySet()) {
      queue = (LeafQueue) nameToCSQueues.get(queueName);
      Resource capacity = Resources.multiply(totResoucePerPartition,
          queue.getQueueCapacities().getAbsoluteCapacity());
      HashSet<String> users = userMap.get(queue.getQueueName());
      when(queue.getAllUsers()).thenReturn(users);

      // Divide the queue capacity by mulp when it was configured, otherwise
      // by the number of users of the queue.
      Resource userLimit;
      if (mulp > 0) {
        userLimit = Resources.divideAndCeil(rc, capacity, mulp);
      } else {
        userLimit = Resources.divideAndCeil(rc, capacity,
            users.size());
      }
      LOG.debug("Updating user-limit from mock: totResoucePerPartition="
          + totResoucePerPartition + ", capacity=" + capacity
          + ", users.size()=" + users.size() + ", userlimit= " + userLimit
          + ",label= " + label + ",queueName= " + queueName);

      HashMap<String, ResourceUsage> userResourceUsage =
          userResourceUsagePerLabel.get(label).get(queueName);
      for (String userName : users) {
        User user = new User(userName);
        // A queue may have no usage recorded under this label.
        if (userResourceUsage != null) {
          user.setResourceUsage(userResourceUsage.get(userName));
        }
        when(queue.getUser(eq(userName))).thenReturn(user);
        when(queue.getResourceLimitForAllUsers(eq(userName),
            any(Resource.class), anyString(), any(SchedulingMode.class)))
            .thenReturn(userLimit);
      }
    }
  }
}
/**
 * Registers a mocked container on the mocked scheduler node it was placed
 * on: a reserved container becomes the node's reserved container, any other
 * container is appended to the node's running containers and its allocated
 * resource is subtracted from the node's unallocated resource.
 *
 * @param nodeId id of a node known to nodeIdToSchedulerNodes
 * @param container mocked container
 * @param isReserved whether the container is a reservation
 */
private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
    boolean isReserved) {
  SchedulerNode schedulerNode = nodeIdToSchedulerNodes.get(nodeId);
  assert schedulerNode != null;

  if (!isReserved) {
    schedulerNode.getCopiedListOfRunningContainers().add(container);
    Resources.subtractFrom(schedulerNode.getUnallocatedResource(),
        container.getAllocatedResource());
    return;
  }
  when(schedulerNode.getReservedContainer()).thenReturn(container);
}
/**
 * Mocks one scheduler node per ";"-separated entry and stores it in
 * nodeIdToSchedulerNodes. Every node id uses port 1.
 *
 * Format is:
 * <pre>
 * host1=partition[ res=resource];
 * host2=partition[ res=resource];
 * </pre>
 * Without a "res=" part the node's total resource is created from 0.
 */
private void mockSchedulerNodes(String schedulerNodesConfigStr)
    throws IOException {
  for (String nodeSpec : schedulerNodesConfigStr.split(";")) {
    String[] parts = nodeSpec.split(" ");
    String hostAndPartition = parts[0];
    int separator = hostAndPartition.indexOf("=");
    NodeId nodeId =
        NodeId.newInstance(hostAndPartition.substring(0, separator), 1);
    String partition = hostAndPartition.substring(separator + 1,
        hostAndPartition.length());

    // Optional explicit total resource of the node.
    Resource totalRes = Resources.createResource(0);
    if (parts.length > 1 && parts[1].contains("res=")) {
      String resourceStr = parts[1].substring(
          parts[1].indexOf("res=") + "res=".length());
      totalRes = parseResourceFromString(resourceStr);
    }

    FiCaSchedulerNode schedulerNode = mock(FiCaSchedulerNode.class);
    when(schedulerNode.getNodeID()).thenReturn(nodeId);
    when(schedulerNode.getPartition()).thenReturn(partition);
    when(schedulerNode.getTotalResource()).thenReturn(totalRes);
    // A clone, because it is decremented as containers are added.
    when(schedulerNode.getUnallocatedResource())
        .thenReturn(Resources.clone(totalRes));
    // TODO, add settings of killable resources when necessary
    when(schedulerNode.getTotalKillableResources())
        .thenReturn(Resources.none());
    List<RMContainer> runningContainers = new ArrayList<>();
    when(schedulerNode.getCopiedListOfRunningContainers())
        .thenReturn(runningContainers);

    nodeIdToSchedulerNodes.put(nodeId, schedulerNode);
    LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
  }
}
/**
 * Stubs the mocked node labels manager from the partitions specification,
 * fills partitionToResource and sums all partition resources into
 * clusterResource.
 *
 * Format is:
 * <pre>
 * partition0=total_resource,exclusivity;
 * partition1=total_resource,exclusivity;
 * ...
 * </pre>
 */
private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
  clusterResource = Resources.createResource(0);
  for (String partitionSpec : nodeLabelsConfigStr.split(";")) {
    int equalsAt = partitionSpec.indexOf("=");
    String partitionName = partitionSpec.substring(0, equalsAt);
    int commaAt = partitionSpec.indexOf(",");
    Resource partitionRes = parseResourceFromString(
        partitionSpec.substring(equalsAt + 1, commaAt));
    boolean exclusive = Boolean.valueOf(
        partitionSpec.substring(commaAt + 1, partitionSpec.length()));

    when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
        .thenReturn(partitionRes);
    when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusive);

    // add to partition to resource
    partitionToResource.put(partitionName, partitionRes);
    LOG.debug("add partition=" + partitionName + " totalRes=" + partitionRes
        + " exclusivity=" + exclusive);
    Resources.addTo(clusterResource, partitionRes);
  }
  // The key set is a live view of partitionToResource.
  when(nlm.getClusterNodeLabelNames()).thenReturn(
      partitionToResource.keySet());
}
/**
 * Parses a resource written as {@code "a"} or {@code "a:b"} (presumably
 * memory and optional vcores - see Resources.createResource). Values after
 * the second one are ignored.
 *
 * @param p resource string with integer components separated by ":"
 * @return resource created from one or two integer components
 * @throws NumberFormatException if a used component is not an integer
 */
private Resource parseResourceFromString(String p) {
  String[] components = p.split(":");
  int first = Integer.valueOf(components[0]);
  if (components.length == 1) {
    return Resources.createResource(first);
  }
  int second = Integer.valueOf(components[1]);
  return Resources.createResource(first, second);
}
/**
 * Mocks the whole queue hierarchy and returns its root.
 *
 * Format is:
 * <pre>
 * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
 * -A(...);
 * --A1(...);
 * --A2(...);
 * -B...
 * </pre>
 * ";" separates queues; there must be no empty lines and no extra spaces.
 * The number of leading "-" is the depth of a queue.
 *
 * Each queue specifies its capacities per partition, optionally followed by
 * additional configs:
 * <pre>
 * -<queueName> (<labelName1>=[guaranteed max used pending], \
 * <labelName2>=[guaranteed max used pending])
 * {key1=value1,key2=value2}; // Additional configs
 * </pre>
 *
 * @param queueExprs queue hierarchy specification
 * @return the mocked queue named ROOT, or null if there is none
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private ParentQueue mockQueueHierarchy(String queueExprs) {
  final String[] exprs = queueExprs.split(";");
  ParentQueue root = null;
  int position = 0;
  for (String expr : exprs) {
    // Initialize queue
    CSQueue queue;
    if (isParent(exprs, position)) {
      queue = newMockedParentQueue();
    } else {
      queue = newMockedLeafQueue();
    }

    ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
    when(queue.getReadLock()).thenReturn(queueLock.readLock());

    setupQueue(queue, expr, exprs, position);
    if (queue.getQueueName().equals(ROOT)) {
      root = (ParentQueue) queue;
    }
    position++;
  }
  return root;
}

/** Creates a parent queue mock with an empty, mutable list of children. */
private ParentQueue newMockedParentQueue() {
  ParentQueue parentQueue = mock(ParentQueue.class);
  List<CSQueue> childQueues = new ArrayList<CSQueue>();
  when(parentQueue.getChildQueues()).thenReturn(childQueues);

  QueueOrderingPolicy orderingPolicy = mock(QueueOrderingPolicy.class);
  when(orderingPolicy.getConfigName()).thenReturn(
      CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
  when(parentQueue.getQueueOrderingPolicy()).thenReturn(orderingPolicy);
  return parentQueue;
}

/**
 * Creates a leaf queue mock backed by a sorted application set; the
 * preemption iterator walks that set in descending order.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private LeafQueue newMockedLeafQueue() {
  LeafQueue leafQueue = mock(LeafQueue.class);

  // Applications are ordered by priority first and application id second.
  final TreeSet<FiCaSchedulerApp> sortedApps = new TreeSet<>(
      new Comparator<FiCaSchedulerApp>() {
        @Override
        public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
          boolean samePriority = a1.getPriority() == null
              || a1.getPriority().equals(a2.getPriority());
          if (samePriority) {
            return a1.getApplicationId().compareTo(a2.getApplicationId());
          }
          return a1.getPriority().compareTo(a2.getPriority());
        }
      });
  when(leafQueue.getApplications()).thenReturn(sortedApps);
  when(leafQueue.getAllApplications()).thenReturn(sortedApps);

  OrderingPolicy<FiCaSchedulerApp> appOrdering = mock(OrderingPolicy.class);
  when(appOrdering.getPreemptionIterator()).thenAnswer(new Answer() {
    public Object answer(InvocationOnMock invocation) {
      return sortedApps.descendingIterator();
    }
  });
  when(leafQueue.getOrderingPolicy()).thenReturn(appOrdering);

  Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
      new HashMap<>();
  when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
      ignoreExclusivityContainers);
  return leafQueue;
}
/**
 * Stubs one mocked queue from its specification: name, path, parent/child
 * links, per-partition capacities and resource usage, pending resources,
 * priority and preemption-disabled flag. The queue is then registered in
 * nameToCSQueues and on the mocked scheduler.
 *
 * @param queue mocked queue to stub (a LeafQueue mock for leaf queues)
 * @param q specification of this queue
 * @param queueExprArray specifications of all queues, in declaration order
 * @param idx index of {@code q} within {@code queueExprArray}
 */
private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
    int idx) {
  LOG.debug("*** Setup queue, source=" + q);
  int myLevel = getLevel(q);
  // A queue without leading "-" is the root and its path is ROOT.
  String queuePath = (0 == myLevel) ? ROOT : null;

  String queueName = getQueueName(q);
  when(queue.getQueueName()).thenReturn(queueName);

  // Setup parent queue, and add myself to parentQueue.children-list
  ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
  if (null != parentQueue) {
    when(queue.getParent()).thenReturn(parentQueue);
    parentQueue.getChildQueues().add(queue);

    // Setup my path
    queuePath = parentQueue.getQueuePath() + "." + queueName;
  }
  when(queue.getQueuePath()).thenReturn(queuePath);

  QueueCapacities qc = new QueueCapacities(0 == myLevel);
  ResourceUsage ru = new ResourceUsage();
  when(queue.getQueueCapacities()).thenReturn(qc);
  when(queue.getQueueResourceUsage()).thenReturn(ru);

  LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
      + queue.getQueuePath());
  LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
      .getQueueName()));

  // Add a small epsilon to capacities to avoid truncate when doing
  // Resources.multiply
  final float epsilon = 1e-6f;
  final boolean isLeaf = !isParent(queueExprArray, idx);

  // Setup other fields like used resource, guaranteed resource, etc.
  String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
  for (String s : capacitySettingStr.split(",")) {
    String partitionName = s.substring(0, s.indexOf("="));
    // values = guaranteed max used pending [reserved]
    String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");

    Resource totResoucePerPartition = partitionToResource.get(partitionName);
    Resource guaranteedRes = parseResourceFromString(values[0].trim());
    Resource maxRes = parseResourceFromString(values[1].trim());
    Resource usedRes = parseResourceFromString(values[2].trim());
    Resource pending = parseResourceFromString(values[3].trim());

    // Absolute values are relative to the partition's total resource, the
    // plain used capacity is relative to the queue's guaranteed resource.
    float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
        guaranteedRes, totResoucePerPartition) + epsilon;
    float absMax = Resources.divide(rc, totResoucePerPartition,
        maxRes, totResoucePerPartition) + epsilon;
    float absUsed = Resources.divide(rc, totResoucePerPartition,
        usedRes, totResoucePerPartition) + epsilon;
    float used = Resources.divide(rc, totResoucePerPartition,
        usedRes, guaranteedRes) + epsilon;

    qc.setAbsoluteCapacity(partitionName, absGuaranteed);
    qc.setAbsoluteMaximumCapacity(partitionName, absMax);
    qc.setAbsoluteUsedCapacity(partitionName, absUsed);
    qc.setUsedCapacity(partitionName, used);
    // Not partition aware: the last configured partition wins.
    when(queue.getUsedCapacity()).thenReturn(used);
    ru.setPending(partitionName, pending);

    // Setup reserved resource if it contained by input config
    Resource reserved = Resources.none();
    if (values.length == 5) {
      reserved = parseResourceFromString(values[4].trim());
      ru.setReserved(partitionName, reserved);
    }

    if (isLeaf) {
      LeafQueue lq = (LeafQueue) queue;
      when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
          isA(String.class), eq(false))).thenReturn(pending);
      // With the flag set to true the reserved resource is deducted.
      when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
          isA(String.class), eq(true))).thenReturn(
          Resources.subtract(pending, reserved));
    }
    ru.setUsed(partitionName, usedRes);

    LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
        + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
        + ",abs_used=" + absUsed + ",pending_resource=" + pending
        + ", reserved_resource=" + reserved + "]");
  }

  // Setup preemption disabled
  when(queue.getPreemptionDisabled()).thenReturn(
      conf.getPreemptionDisabled(queuePath, false));

  // Setup other queue configurations
  Map<String, String> otherConfigs = getOtherConfigurations(
      queueExprArray[idx]);
  if (otherConfigs.containsKey("priority")) {
    when(queue.getPriority()).thenReturn(
        Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
  } else {
    // set queue's priority to 0 by default
    when(queue.getPriority()).thenReturn(Priority.newInstance(0));
  }

  // Setup disable preemption of queues; overrides the value taken from conf.
  if (otherConfigs.containsKey("disable_preemption")) {
    when(queue.getPreemptionDisabled()).thenReturn(
        Boolean.valueOf(otherConfigs.get("disable_preemption")));
  }

  nameToCSQueues.put(queueName, queue);
  when(cs.getQueue(eq(queueName))).thenReturn(queue);
}
/**
 * Get additional queue's configurations, i.e. the comma-separated
 * {@code key=value} pairs between the first "{" and the first "}" of the
 * queue expression. Pairs without "=" are ignored.
 *
 * @param queueExpr queue expr
 * @return maps of configs; an empty map when the expression has no
 *         well-formed "{...}" part
 */
private Map<String, String> getOtherConfigurations(String queueExpr) {
  int left = queueExpr.indexOf('{');
  int right = queueExpr.indexOf('}');
  // No "{", or no "}" behind it: nothing to parse.
  if (left < 0 || right <= left) {
    return Collections.emptyMap();
  }

  Map<String, String> configs = new HashMap<>();
  for (String kv : queueExpr.substring(left + 1, right).split(",")) {
    int separator = kv.indexOf("=");
    if (separator >= 0) {
      configs.put(kv.substring(0, separator), kv.substring(separator + 1));
    }
  }
  return configs;
}
/**
 * Returns the level of a queue, i.e. the number of consecutive "-" at the
 * beginning of its expression. The root queue has level 0.
 */
private int getLevel(String q) {
  int dashes = 0;
  for (char c : q.toCharArray()) {
    if (c != '-') {
      break;
    }
    dashes++;
  }
  return dashes;
}
/**
 * Extracts the queue name from a queue expression: the text after the
 * leading "-" characters and before the first "(".
 *
 * @param q queue expression, e.g. {@code --a1(=[10 20 5 0])}
 * @return the queue name
 * @throws IllegalArgumentException if the expression consists of "-" only,
 *           has no "(", or the name is empty or contains "."
 */
private String getQueueName(String q) {
  int idx = 0;
  // find first != '-' char
  while (idx < q.length() && q.charAt(idx) == '-') {
    idx++;
  }
  if (idx == q.length()) {
    throw new IllegalArgumentException("illegal input:" + q);
  }

  // name = after '-' and before '('
  int open = q.indexOf('(');
  if (open < 0) {
    // Previously surfaced as a StringIndexOutOfBoundsException.
    throw new IllegalArgumentException("illegal input, missing '(':" + q);
  }
  String name = q.substring(idx, open);
  if (name.isEmpty()) {
    throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
  }
  if (name.contains(".")) {
    throw new IllegalArgumentException("queue name shouldn't contain '.':"
        + name);
  }
  return name;
}
/**
 * Finds the parent of the queue at {@code idx}: the closest preceding queue
 * expression with a smaller level.
 *
 * @param queueExprArray specifications of all queues, in declaration order
 * @param idx index of the queue whose parent is wanted
 * @param myLevel level of that queue
 * @return the already registered parent queue, or null if no preceding
 *         expression has a smaller level
 */
private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
  for (int candidate = idx - 1; candidate >= 0; candidate--) {
    String candidateExpr = queueExprArray[candidate];
    if (getLevel(candidateExpr) < myLevel) {
      return (ParentQueue) nameToCSQueues.get(getQueueName(candidateExpr));
    }
  }
  return null;
}
/**
 * Get if a queue is ParentQueue: after skipping the following queues of the
 * same level, the next queue (if any) must not have a smaller level.
 */
private boolean isParent(String[] queues, int idx) {
  final int myLevel = getLevel(queues[idx]);

  // Skip the following queues declared at the same level.
  int next = idx + 1;
  while (next < queues.length && getLevel(queues[next]) == myLevel) {
    next++;
  }

  // Otherwise it's a LeafQueue.
  return next < queues.length && getLevel(queues[next]) >= myLevel;
}
/**
 * Returns the id of attempt 1 of the application with the given id and
 * cluster timestamp 0.
 */
public ApplicationAttemptId getAppAttemptId(int id) {
  return ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(0L, id), 1);
}
/**
 * Asserts how many containers (live and reserved together) of an
 * application are placed on the node {@code host}:1.
 *
 * @param app application to inspect
 * @param expectedContainersNumber expected number of containers on the node
 * @param host host name of the node
 */
public void checkContainerNodesInApp(FiCaSchedulerApp app,
    int expectedContainersNumber, String host) {
  final NodeId expectedNode = NodeId.newInstance(host, 1);

  List<RMContainer> containers = new ArrayList<RMContainer>();
  containers.addAll(app.getLiveContainers());
  containers.addAll(app.getReservedContainers());

  int onNode = 0;
  for (RMContainer container : containers) {
    if (container.getAllocatedNode().equals(expectedNode)) {
      onNode++;
    }
  }
  Assert.assertEquals(expectedContainersNumber, onNode);
}
/**
 * Looks up an application of a leaf queue by the numeric id of its
 * application id.
 *
 * @return the application, or null if the queue has no such application
 */
public FiCaSchedulerApp getApp(String queueName, int appId) {
  LeafQueue leafQueue = (LeafQueue) cs.getQueue(queueName);
  FiCaSchedulerApp found = null;
  for (FiCaSchedulerApp candidate : leafQueue.getApplications()) {
    if (candidate.getApplicationId().getId() == appId) {
      found = candidate;
      break;
    }
  }
  return found;
}
/**
 * Asserts the absolute guaranteed, maximum and used capacity of a queue for
 * one partition, with a tolerance of 1e-3.
 */
public void checkAbsCapacities(CSQueue queue, String partition,
    float guaranteed, float max, float used) {
  final double tolerance = 1e-3;
  QueueCapacities capacities = queue.getQueueCapacities();
  Assert.assertEquals(guaranteed,
      capacities.getAbsoluteCapacity(partition), tolerance);
  Assert.assertEquals(max,
      capacities.getAbsoluteMaximumCapacity(partition), tolerance);
  Assert.assertEquals(used,
      capacities.getAbsoluteUsedCapacity(partition), tolerance);
}
/** Asserts the memory size of a queue's pending resource in a partition. */
public void checkPendingResource(CSQueue queue, String partition, int pending) {
  long actualPending = queue.getQueueResourceUsage()
      .getPending(partition).getMemorySize();
  Assert.assertEquals(pending, actualPending);
}
/** Asserts the numeric priority of a queue. */
public void checkPriority(CSQueue queue, int expectedPriority) {
  int actualPriority = queue.getPriority().getPriority();
  Assert.assertEquals(expectedPriority, actualPriority);
}
/** Asserts the memory size of a queue's reserved resource in a partition. */
public void checkReservedResource(CSQueue queue, String partition, int reserved) {
  long actualReserved = queue.getQueueResourceUsage()
      .getReserved(partition).getMemorySize();
  Assert.assertEquals(reserved, actualReserved);
}
/**
 * Mockito matcher for a {@link ContainerPreemptEvent} that belongs to the
 * given application attempt and whose container runs in the given queue on
 * the given node.
 */
static class IsPreemptionRequestForQueueAndNode
    extends ArgumentMatcher<ContainerPreemptEvent> {
  private final ApplicationAttemptId appAttId;
  private final String queueName;
  private final NodeId nodeId;

  /**
   * @param appAttId expected application attempt of the event
   * @param queueName expected queue name of the event's container
   * @param nodeId expected allocated node of the event's container
   */
  IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
      String queueName, NodeId nodeId) {
    this.appAttId = appAttId;
    this.queueName = queueName;
    this.nodeId = nodeId;
  }

  /**
   * @return true only for a ContainerPreemptEvent whose attempt, queue and
   *         node all equal the expected ones; any other argument (including
   *         null) does not match
   */
  @Override
  public boolean matches(Object o) {
    // Other argument types must be rejected rather than fail the cast.
    if (!(o instanceof ContainerPreemptEvent)) {
      return false;
    }
    ContainerPreemptEvent cpe = (ContainerPreemptEvent) o;
    return appAttId.equals(cpe.getAppId())
        && queueName.equals(cpe.getContainer().getQueueName())
        && nodeId.equals(cpe.getContainer().getAllocatedNode());
  }

  /** Describes the matcher by the expected application attempt only. */
  @Override
  public String toString() {
    return appAttId.toString();
  }
}
}