blob: 1a34a7f3f9ee8a5325d8702dfe9b4fffb0323db8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestLeafQueue {
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
RMContext rmContext;
CapacityScheduler cs;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
CSQueue root;
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
@Before
public void setUp() throws Exception {
CapacityScheduler spyCs = new CapacityScheduler();
cs = spy(spyCs);
rmContext = TestUtils.getMockRMContext();
csConf =
new CapacitySchedulerConfiguration();
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
final String newRoot = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRoot);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getClusterResources()).
thenReturn(Resources.createResource(100 * 16 * GB));
root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
cs.reinitialize(csConf, rmContext);
}
private static final String A = "a";
private static final String B = "b";
private static final String C = "c";
private static final String C1 = "c1";
private static final String D = "d";
private static final String E = "e";
private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf,
final String newRoot) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
final String Q_A = Q_newRoot + "." + A;
conf.setCapacity(Q_A, 8.5f);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
final String Q_B = Q_newRoot + "." + B;
conf.setCapacity(Q_B, 80);
conf.setMaximumCapacity(Q_B, 99);
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
final String Q_C = Q_newRoot + "." + C;
conf.setCapacity(Q_C, 1.5f);
conf.setMaximumCapacity(Q_C, 10);
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
conf.setQueues(Q_C, new String[] {C1});
final String Q_C1 = Q_C + "." + C1;
conf.setCapacity(Q_C1, 100);
final String Q_D = Q_newRoot + "." + D;
conf.setCapacity(Q_D, 9);
conf.setMaximumCapacity(Q_D, 11);
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
final String Q_E = Q_newRoot + "." + E;
conf.setCapacity(Q_E, 1);
conf.setMaximumCapacity(Q_E, 1);
conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
}
static LeafQueue stubLeafQueue(LeafQueue queue) {
// Mock some methods for ease in these unit tests
// 1. LeafQueue.createContainer to return dummy containers
doAnswer(
new Answer<Container>() {
@Override
public Container answer(InvocationOnMock invocation)
throws Throwable {
final SchedulerApp application =
(SchedulerApp)(invocation.getArguments()[0]);
final ContainerId containerId =
TestUtils.getMockContainerId(application);
Container container = TestUtils.getMockContainer(
containerId,
((SchedulerNode)(invocation.getArguments()[1])).getNodeID(),
(Resource)(invocation.getArguments()[2]),
((Priority)invocation.getArguments()[3]));
return container;
}
}
).
when(queue).createContainer(
any(SchedulerApp.class),
any(SchedulerNode.class),
any(Resource.class),
any(Priority.class)
);
// 2. Stub out LeafQueue.parent.completedContainer
CSQueue parent = queue.getParent();
doNothing().when(parent).completedContainer(
any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class));
return queue;
}
@Test
public void testInitializeQueue() throws Exception {
final float epsilon = 1e-5f;
//can add more sturdy test with 3-layer queues
//once MAPREDUCE:3410 is resolved
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
assertEquals(0.085, a.getCapacity(), epsilon);
assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
assertEquals(0.2, a.getMaximumCapacity(), epsilon);
assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
assertEquals(0.80, b.getCapacity(), epsilon);
assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
assertEquals(0.99, b.getMaximumCapacity(), epsilon);
assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
ParentQueue c = (ParentQueue)queues.get(C);
assertEquals(0.015, c.getCapacity(), epsilon);
assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
assertEquals(0.1, c.getMaximumCapacity(), epsilon);
assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
}
@Test
public void testSingleQueueOneUserMetrics() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
// Users
final String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getMetrics().getAvailableMB());
}
@Test
public void testUserQueueAcl() throws Exception {
// Manipulate queue 'a'
LeafQueue d = stubLeafQueue((LeafQueue) queues.get(D));
// Users
final String user_d = "user_d";
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null,
rmContext, null);
d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null,
rmContext, null);
d.submitApplication(app_1, user_d, D); // same user
}
@Test
public void testAppAttemptMetrics() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
// Users
final String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null,
rmContext, null);
a.submitApplication(app_0, user_0, B);
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
AppRemovedSchedulerEvent event = new AppRemovedSchedulerEvent(
appAttemptId_0, RMAppAttemptState.FAILED);
cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending());
assertEquals(1, a.getMetrics().getAppsFailed());
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null,
rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending());
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
event = new AppRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.FINISHED);
cs.handle(event);
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(0, a.getMetrics().getAppsPending());
assertEquals(0, a.getMetrics().getAppsFailed());
assertEquals(1, a.getMetrics().getAppsCompleted());
QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
assertEquals(1, userMetrics.getAppsSubmitted());
}
@Test
public void testSingleQueueWithOneUser() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
null, RMContainerEventType.KILL);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(0*GB, a.getMetrics().getAllocatedMB());
assertEquals(1*GB, a.getMetrics().getAvailableMB());
}
@Test
public void testUserLimits() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
/**
* Start testing...
*/
// Set user-limit
a.setUserLimit(50);
a.setUserLimitFactor(2);
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
// This commented code is key to test 'activeUsers'.
// It should fail the test if uncommented since
// it would increase 'activeUsers' to 2 and stop user_2
// Pre MAPREDUCE-3732 this test should fail without this block too
// app_2.updateResourceRequests(Collections.singletonList(
// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
}
@Test
public void testHeadroomWithMaxCap() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
/**
* Start testing...
*/
// Set user-limit
a.setUserLimit(50);
a.setUserLimitFactor(2);
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory());
assertEquals(0*GB, app_1.getHeadroom().getMemory());
// Check headroom for app_2
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@Test
public void testSingleQueueWithMultipleUsers() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
final String user_2 = "user_2";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
SchedulerApp app_3 =
new SchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_3, user_2, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
recordFactory)));
/**
* Start testing...
*/
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Submit resource requests for other apps now to 'activate' them
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
recordFactory)));
app_3.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
// Revert max-capacity and user-limit-factor
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0);
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
// 8. Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
null, RMContainerEventType.KILL);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(3*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
// 9. Release each container from app_2
for (RMContainer rmContainer : app_2.getLiveContainers()) {
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
null, RMContainerEventType.KILL);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_3.getCurrentConsumption().getMemory());
// 10. Release each container from app_3
for (RMContainer rmContainer : app_3.getLiveContainers()) {
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
}
@Test
public void testReservation() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_0.getUsedResource().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
}
@Test
public void testStolenReservedContainer() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
ArrayList<ResourceRequest> appRequests_1 =
new ArrayList<ResourceRequest>(4);
appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 2,
priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
// Start testing...
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
a.assignContainers(clusterResource, node_1);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_1.getUsedResource().getMemory());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(6*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 and try to assign to node_0
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_0.getUsedResource().getMemory());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(8*GB, a.getMetrics().getAllocatedMB());
}
@Test
public void testReservationExchange() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(10);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(4*GB));
when(a.getMaximumAllocation()).thenReturn(Resources.createResource(4*GB));
when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_1.getUsedResource().getMemory());
// Doesn't change yet... only when reservation is cancelled or a different
// container is reserved
assertEquals(2, app_1.getReReservations(priority));
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(0*GB, node_0.getUsedResource().getMemory());
assertEquals(4*GB,
assignment.getExcessReservation().getContainer().getResource().getMemory());
}
@Test
public void testLocalityScheduling() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// User
String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, // one extra
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// Add 1 more request to check for RACK_LOCAL
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, // one extra
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority));
String host_3 = "host_3"; // on rack_1
SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
// Rack-delay
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
}
@Test
public void testApplicationPriorityScheduling() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// User
String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests and submit
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
// P1
Priority priority_1 = TestUtils.createMockPriority(1);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
priority_1, recordFactory));
// P2
Priority priority_2 = TestUtils.createMockPriority(2);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_2, 2*GB, 1,
priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_2, 2*GB, 1,
priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1,
priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
assertEquals(1, app_0.getTotalRequiredResources(priority_1));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0),
eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1),
eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_2));
assertEquals(0, app_0.getTotalRequiredResources(priority_2));
}
@Test
public void testSchedulingConstraints() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// User
String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0_0 = "host_0_0";
String rack_0 = "rack_0";
SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
String host_0_1 = "host_0_1";
SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
String host_1_0 = "host_1_0";
String rack_1 = "rack_1";
SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
// Add one request
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
// since #req=0
assertEquals(0, app_0.getTotalRequiredResources(priority));
// Add one request
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
for (QueueUserACLInfo aclInfo : aclInfos) {
if (aclInfo.getUserAcls().contains(acl)) {
return true;
}
}
return false;
}
@Test
public void testInheritedQueueAcls() throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
ParentQueue c = (ParentQueue)queues.get(C);
LeafQueue c1 = stubLeafQueue((LeafQueue)queues.get(C1));
assertFalse(root.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(a.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(b.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertFalse(c.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertFalse(c1.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(hasQueueACL(
a.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
assertTrue(hasQueueACL(
b.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
assertFalse(hasQueueACL(
c.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
assertFalse(hasQueueACL(
c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
}
@Test (timeout = 30000)
public void testNodeLocalityAfterQueueRefresh() throws Exception {
// Manipulate queue 'e'
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// before reinitialization
assertEquals(0, e.getNodeLocalityDelay());
csConf.setInt(CapacitySchedulerConfiguration
.NODE_LOCALITY_DELAY, 60);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResources());
// after reinitialization
assertEquals(60, e.getNodeLocalityDelay());
}
@After
public void tearDown() throws Exception {
}
}