blob: 6f54d47719b55deb2936ed5829b6e975189bec6d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestContainerAllocation {
private static final Log LOG = LogFactory
.getLog(TestContainerAllocation.class);
private final int GB = 1024;
private YarnConfiguration conf;
RMNodeLabelsManager mgr;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
@Test(timeout = 60000)
public void testExcessReservationThanNodeManagerCapacity() throws Exception {
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4);
MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
// wait..
int waitCount = 20;
int size = rm.getRMContext().getRMNodes().size();
while ((size = rm.getRMContext().getRMNodes().size()) != 2
&& waitCount-- > 0) {
LOG.info("Waiting for node managers to register : " + size);
Thread.sleep(100);
}
Assert.assertEquals(2, rm.getRMContext().getRMNodes().size());
// Submit an application
RMApp app1 = rm.submitApp(128);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
LOG.info("sending container requests ");
am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler
nm1.nodeHeartbeat(true);
int waitCounter = 20;
LOG.info("heartbeating nm1");
while (alloc1Response.getAllocatedContainers().size() < 1
&& waitCounter-- > 0) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(500);
alloc1Response = am1.schedule();
}
LOG.info("received container : "
+ alloc1Response.getAllocatedContainers().size());
// No container should be allocated.
// Internally it should not been reserved.
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0);
LOG.info("heartbeating nm2");
waitCounter = 20;
nm2.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1
&& waitCounter-- > 0) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(500);
alloc1Response = am1.schedule();
}
LOG.info("received container : "
+ alloc1Response.getAllocatedContainers().size());
Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1);
rm.stop();
}
// This is to test container tokens are generated when the containers are
// acquired by the AM, not when the containers are allocated
@Test
public void testContainerTokenGeneratedOnPullRequest() throws Exception {
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
RMContainer container =
rm1.getResourceScheduler().getRMContainer(containerId2);
// no container token is generated.
Assert.assertEquals(containerId2, container.getContainerId());
Assert.assertNull(container.getContainer().getContainerToken());
// acquire the container.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Assert.assertEquals(containerId2, containers.get(0).getId());
// container token is generated.
Assert.assertNotNull(containers.get(0).getContainerToken());
rm1.stop();
}
@Test
public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
// acquire the container.
SecurityUtilTestHelper.setTokenServiceUseIp(true);
List<Container> containers;
try {
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// not able to fetch the container;
Assert.assertEquals(0, containers.size());
} finally {
SecurityUtilTestHelper.setTokenServiceUseIp(false);
}
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// should be able to fetch the container;
Assert.assertEquals(1, containers.size());
}
// This is to test whether LogAggregationContext is passed into
// container tokens correctly
@Test
public void testLogAggregationContextPassedIntoContainerToken()
throws Exception {
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
// LogAggregationContext is set as null
Assert
.assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));
// create a not-null LogAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
"includePattern", "excludePattern",
"rolledLogsIncludePattern",
"rolledLogsExcludePattern",
"policyClass",
"policyParameters");
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals("rolledLogsIncludePattern",
returned.getRolledLogsIncludePattern());
Assert.assertEquals("rolledLogsExcludePattern",
returned.getRolledLogsExcludePattern());
Assert.assertEquals("policyClass",
returned.getLogAggregationPolicyClassName());
Assert.assertEquals("policyParameters",
returned.getLogAggregationPolicyParameters());
rm1.stop();
}
private LogAggregationContext getLogAggregationContextFromContainerToken(
MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext)
throws Exception {
RMApp app2 = rm1.submitApp(200, logAggregationContext);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
nm1.nodeHeartbeat(true);
// request a container.
am2.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>());
ContainerId containerId =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
// acquire the container.
List<Container> containers =
am2.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Assert.assertEquals(containerId, containers.get(0).getId());
// container token is generated.
Assert.assertNotNull(containers.get(0).getContainerToken());
ContainerTokenIdentifier token =
BuilderUtils.newContainerTokenIdentifier(containers.get(0)
.getContainerToken());
return token.getLogAggregationContext();
}
private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService {
public TestRMSecretManagerService(Configuration conf,
RMContextImpl rmContext) {
super(conf, rmContext);
}
@Override
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf) {
@Override
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExp,
ContainerType containerType, ExecutionType executionType,
long allocationRequestId) {
numRetries++;
return super.createContainerToken(containerId, containerVersion,
nodeId, appSubmitter, capability, priority, createTime,
logAggregationContext, nodeLabelExp, containerType,
executionType, allocationRequestId);
}
};
}
}
// This is to test fetching AM container will be retried, if AM container is
// not fetchable since DNS is unavailable causing container token/NMtoken
// creation failure.
@Test(timeout = 30000)
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
MockRM rm1 = new MockRM(conf) {
@Override
protected RMSecretManagerService createRMSecretManagerService() {
return new TestRMSecretManagerService(conf, rmContext);
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1;
try {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
app1 = rm1.submitApp(200);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
// fetching am container will fail, keep retrying 5 times.
while (numRetries <= 5) {
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
Assert.assertEquals(RMAppAttemptState.SCHEDULED,
attempt.getAppAttemptState());
System.out.println("Waiting for am container to be allocated.");
}
} finally {
SecurityUtilTestHelper.setTokenServiceUseIp(false);
}
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
@Test(timeout = 60000)
public void testExcessReservationWillBeUnreserved() throws Exception {
/**
* Test case: Submit two application (app1/app2) to a queue. And there's one
* node with 8G resource in the cluster. App1 allocates a 6G container, Then
* app2 asks for a 4G container. App2's request will be reserved on the
* node.
*
* Before next node heartbeat, app2 cancels the reservation, we should found
* the reserved resource is cancelled as well.
*/
// inject node label manager
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// App2 will get preference to be allocated on node1, and node1 will be all
// used by App2.
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// Check if a 4G contaienr allocated for app1, and nothing allocated for app2
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemorySize());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
// Cancel asks of app2 and re-kick RM
am2.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// App2's reservation will be cancelled
Assert.assertTrue(schedulerApp2.getReservedContainers().size() == 0);
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemorySize());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
rm1.close();
}
@Test(timeout = 60000)
public void testAllocationForReservedContainer() throws Exception {
/**
* Test case: Submit two application (app1/app2) to a queue. And there's one
* node with 8G resource in the cluster. App1 allocates a 6G container, Then
* app2 asks for a 4G container. App2's request will be reserved on the
* node.
*
* Before next node heartbeat, app1 container is completed/killed. So app1
* container which was reserved will be allocated.
*/
// inject node label manager
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// App2 will get preference to be allocated on node1, and node1 will be all
// used by App2.
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// Check if a 4G container allocated for app1, and nothing allocated for app2
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemorySize());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
// Mark one app1 container as killed/completed and re-kick RM
for (RMContainer container : schedulerApp1.getLiveContainers()) {
if (container.isAMContainer()) {
continue;
}
cs.markContainerForKillable(container);
}
// Cancel asks of app1 and re-kick RM
am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check 4G container cancelled for app1, and one container allocated for
// app2
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemorySize());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
rm1.close();
}
@Test(timeout = 60000)
public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
/**
* Test case: Submit two application (app1/app2) to a queue. And there's one
* node with 8G resource in the cluster. App1 allocates a 6G container, Then
* app2 asks for a 4G container. App2's request will be reserved on the
* node.
*
* Before next node heartbeat, app1 container is completed/killed. So app1
* container which was reserved will be allocated.
*/
// inject node label manager
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// App2 will get preference to be allocated on node1, and node1 will be all
// used by App2.
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// Check if a 4G container allocated for app1, and nothing allocated for app2
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemorySize());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
// Remove the node
cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
// Check all container cancelled for app1 and app2
Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
// Usage and Reserved capacity of queue is 0
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
rm1.close();
}
@Test(timeout = 60000)
public void testAssignMultipleOffswitchContainers() throws Exception {
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 80 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// Do node heartbeats once
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
// App1 will get one container allocated (plus AM container
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
// Set assign multiple off-switch containers to 3
CapacitySchedulerConfiguration newCSConf = new CapacitySchedulerConfiguration();
newCSConf.setInt(
CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3);
cs.reinitialize(newCSConf, rm1.getRMContext());
// Do node heartbeats once
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// App1 will get 3 new container allocated (plus 2 previously allocated
// container)
Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
rm1.close();
}
@Test(timeout = 60000)
public void testContinuousReservationLookingWhenUsedEqualsMax() throws Exception {
CapacitySchedulerConfiguration newConf =
(CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(conf);
// Set maximum capacity of A to 10
newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 10);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 90 * GB);
// launch an app to queue A, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch 2nd app to queue B, AM container should be launched in nm1
// Now usage of nm1 is 3G (2G + 1G)
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 2, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Do node heartbeats twice, we expect one container allocated on nm1 and
// one container reserved on nm1.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
// App1 will get 2 container allocated (plus AM container)
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
// Do node heartbeats on nm2, we expect one container allocated on nm2 and
// one unreserved on nm1
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
rm1.close();
}
@Test
public void testPendingResourcesConsideringUserLimit() throws Exception {
// Set maximum capacity of A to 10
CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(
conf);
newConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".default",
0.5f);
newConf.setMaximumAMResourcePercentPerPartition(
CapacitySchedulerConfiguration.ROOT + ".default", "", 1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue default, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "u1", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch 2nd app to queue default, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(4 * GB, "app", "u2", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// am1 asks 1 * 3G container
am1.allocate("*", 3 * GB, 1, null);
// am2 asks 4 * 5G container
am2.allocate("*", 5 * GB, 4, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Do node heartbeats one, we expect one container allocated reserved on nm1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
// App1 will get 1 container reserved
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
/*
* Note that the behavior of appAttemptResourceUsage is different from queue's
* For queue, used = actual-used + reserved
* For app, used = actual-used.
*
* TODO (wangda): Need to make behaviors of queue/app's resource usage
* consistent
*/
Assert.assertEquals(2 * GB,
schedulerApp1.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getPending()
.getMemorySize());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
Assert.assertEquals(4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(0 * GB,
schedulerApp2.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(5 * 4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getPending()
.getMemorySize());
LeafQueue lq = (LeafQueue) cs.getQueue("default");
// UL = 8GB, so head room of u1 = 8GB - 2GB (AM) - 3GB (Reserved) = 3GB
// u2 = 8GB - 4GB = 4GB
// When not deduct reserved, total-pending = 3G (u1) + 4G (u2) = 7G
// deduct reserved, total-pending = 0G (u1) + 4G = 4G
Assert.assertEquals(7 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", false).getMemorySize());
Assert.assertEquals(4 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", true).getMemorySize());
rm1.close();
}
@Test(timeout = 60000)
public void testQueuePriorityOrdering() throws Exception {
CapacitySchedulerConfiguration newConf =
(CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(conf);
// Set ordering policy
newConf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
// Set maximum capacity of A to 20
newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 20);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 3);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 100 * GB);
// launch an app to queue A, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch an app to queue B, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// launch an app to queue C, AM container should be launched in nm1
RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
// Each application asks 10 * 5GB containers
am1.allocate("*", 5 * GB, 10, null);
am2.allocate("*", 5 * GB, 10, null);
am3.allocate("*", 5 * GB, 10, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp3 =
cs.getApplicationAttempt(am3.getApplicationAttemptId());
// container will be allocated to am1
// App1 will get 2 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// container will be allocated to am1 again,
// App1 will get 3 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=2G, c=2G)
// container will be allocated to am2 (since app1 reaches its guaranteed
// capacity)
// App2 will get 2 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// Do this 3 times
// container will be allocated to am2 (since app1 reaches its guaranteed
// capacity)
// App2 will get 2 container allocated (plus AM container)
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=22G (satisfied), c=2G))
// Do this 10 times
for (int i = 0; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=22G (satisfied),
// c=52G (satisfied and no pending))
// Do this 20 times, we can only allocate 3 containers, 1 to A and 3 to B
for (int i = 0; i < 20; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(6, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=17G (satisfied), b=27G (satisfied), c=52G))
rm1.close();
}
@Test(timeout = 60000)
public void testUserLimitAllocationMultipleContainers() throws Exception {
CapacitySchedulerConfiguration newConf =
(CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(conf);
newConf.setUserLimit("root.c", 50);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 1000 * GB);
// launch app from 1st user to queue C, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "user1", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch app from 2nd user to queue C, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(2 * GB, "app", "user2", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// Each application asks 1000 * 5GB containers
am1.allocate("*", 5 * GB, 1000, null);
am1.allocate("h1", 5 * GB, 1000, null);
am1.allocate(NetworkTopology.DEFAULT_RACK, 5 * GB, 1000, null);
// Each application asks 1000 * 5GB containers
am2.allocate("*", 5 * GB, 1000, null);
am2.allocate("h1", 5 * GB, 1000, null);
am2.allocate(NetworkTopology.DEFAULT_RACK, 5 * GB, 1000, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// container will be allocated to am1
// App1 will get 2 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(101, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(100, schedulerApp2.getLiveContainers().size());
rm1.close();
}
}