blob: a9d36b09803ae2695ccf37091c9b365eba19562d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {

  public TestAbstractYarnScheduler(SchedulerType type) throws IOException {
    super(type);
  }

  /**
   * Verifies how the scheduler's maximum allocation memory reacts to nodes
   * being added and removed. With a very long work-preserving recovery
   * scheduling wait configured, the expected maximum is the configured value
   * at every step. With the wait set to 0, the expected maximum follows the
   * largest registered node, capped at the configured value, and falls back
   * to the configured value when no nodes are registered.
   */
  @Test
  public void testMaximimumAllocationMemory() throws Exception {
    final int node1MaxMemory = 15 * 1024;
    final int node2MaxMemory = 5 * 1024;
    final int node3MaxMemory = 6 * 1024;
    final int configuredMaxMemory = 10 * 1024;
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        configuredMaxMemory);

    // Long recovery wait: the configured maximum is expected throughout.
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        1000 * 1000);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationMemoryHelper(
          rm.getResourceScheduler(),
          node1MaxMemory, node2MaxMemory, node3MaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
    } finally {
      rm.stop();
    }

    // No recovery wait: the maximum is expected to track registered nodes.
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationMemoryHelper(
          rm.getResourceScheduler(),
          node1MaxMemory, node2MaxMemory, node3MaxMemory,
          configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
          node2MaxMemory, node3MaxMemory, node2MaxMemory);
    } finally {
      rm.stop();
    }
  }

  /**
   * Adds and removes three mock nodes on {@code scheduler} and checks the
   * maximum allocation memory after every step.
   *
   * @param scheduler scheduler under test; must start with no nodes
   * @param node1MaxMemory memory of the first node
   * @param node2MaxMemory memory of the second node
   * @param node3MaxMemory memory of the third node
   * @param expectedMaxMemory exactly six expected values, checked in order:
   *     initially, after adding node1, after removing node1, after adding
   *     node2, after adding node3, and after removing node3
   */
  private void testMaximumAllocationMemoryHelper(
      YarnScheduler scheduler,
      final int node1MaxMemory, final int node2MaxMemory,
      final int node3MaxMemory, final int... expectedMaxMemory)
      throws Exception {
    Assert.assertEquals(6, expectedMaxMemory.length);

    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    long maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[0], maxMemory);

    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
    scheduler.handle(new NodeAddedSchedulerEvent(node1));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[1], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[2], maxMemory);

    RMNode node2 = MockNodes.newNodeInfo(
        0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
    scheduler.handle(new NodeAddedSchedulerEvent(node2));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[3], maxMemory);

    RMNode node3 = MockNodes.newNodeInfo(
        0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
    scheduler.handle(new NodeAddedSchedulerEvent(node3));
    Assert.assertEquals(2, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[4], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxMemory = scheduler.getMaximumResourceCapability().getMemorySize();
    Assert.assertEquals(expectedMaxMemory[5], maxMemory);

    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
  }

  /**
   * Same scenario as {@link #testMaximimumAllocationMemory()}, but for the
   * maximum allocation virtual cores.
   */
  @Test
  public void testMaximimumAllocationVCores() throws Exception {
    final int node1MaxVCores = 15;
    final int node2MaxVCores = 5;
    final int node3MaxVCores = 6;
    final int configuredMaxVCores = 10;
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
        configuredMaxVCores);

    // Long recovery wait: the configured maximum is expected throughout.
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        1000 * 1000);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationVCoresHelper(
          rm.getResourceScheduler(),
          node1MaxVCores, node2MaxVCores, node3MaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
    } finally {
      rm.stop();
    }

    // No recovery wait: the maximum is expected to track registered nodes.
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    rm = new MockRM(conf);
    try {
      rm.start();
      testMaximumAllocationVCoresHelper(
          rm.getResourceScheduler(),
          node1MaxVCores, node2MaxVCores, node3MaxVCores,
          configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
          node2MaxVCores, node3MaxVCores, node2MaxVCores);
    } finally {
      rm.stop();
    }
  }

  /**
   * Adds and removes three mock nodes (each with 1024 memory) on
   * {@code scheduler} and checks the maximum allocation vcores after every
   * step.
   *
   * @param scheduler scheduler under test; must start with no nodes
   * @param node1MaxVCores vcores of the first node
   * @param node2MaxVCores vcores of the second node
   * @param node3MaxVCores vcores of the third node
   * @param expectedMaxVCores exactly six expected values, checked in order:
   *     initially, after adding node1, after removing node1, after adding
   *     node2, after adding node3, and after removing node3
   */
  private void testMaximumAllocationVCoresHelper(
      YarnScheduler scheduler,
      final int node1MaxVCores, final int node2MaxVCores,
      final int node3MaxVCores, final int... expectedMaxVCores)
      throws Exception {
    Assert.assertEquals(6, expectedMaxVCores.length);

    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[0], maxVCores);

    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
    scheduler.handle(new NodeAddedSchedulerEvent(node1));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[1], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node1));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[2], maxVCores);

    RMNode node2 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
    scheduler.handle(new NodeAddedSchedulerEvent(node2));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[3], maxVCores);

    RMNode node3 = MockNodes.newNodeInfo(
        0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
    scheduler.handle(new NodeAddedSchedulerEvent(node3));
    Assert.assertEquals(2, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[4], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node3));
    Assert.assertEquals(1, scheduler.getNumClusterNodes());
    maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
    Assert.assertEquals(expectedMaxVCores[5], maxVCores);

    scheduler.handle(new NodeRemovedSchedulerEvent(node2));
    Assert.assertEquals(0, scheduler.getNumClusterNodes());
  }

  /**
   * Verifies that the maximum allocation is derived from each node's total
   * resource rather than its unallocated resource: both mocked nodes report
   * an empty unallocated resource but non-empty totals.
   */
  @Test
  public void testUpdateMaxAllocationUsesTotal() throws IOException {
    final int configuredMaxVCores = 20;
    final int configuredMaxMemory = 10 * 1024;
    Resource configuredMaximumResource =
        Resource.newInstance(configuredMaxMemory, configuredMaxVCores);
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
        configuredMaxVCores);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        configuredMaxMemory);
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
          .getResourceScheduler();

      Resource emptyResource = Resource.newInstance(0, 0);
      Resource fullResource1 = Resource.newInstance(1024, 5);
      Resource fullResource2 = Resource.newInstance(2048, 10);

      SchedulerNode mockNode1 = mock(SchedulerNode.class);
      when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
      when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource);
      when(mockNode1.getTotalResource()).thenReturn(fullResource1);

      SchedulerNode mockNode2 = mock(SchedulerNode.class);
      // Each mock needs its own id so that the two nodes are tracked and
      // removed independently.
      when(mockNode2.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
      when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource);
      when(mockNode2.getTotalResource()).thenReturn(fullResource2);

      verifyMaximumResourceCapability(configuredMaximumResource, scheduler);

      scheduler.nodeTracker.addNode(mockNode1);
      verifyMaximumResourceCapability(fullResource1, scheduler);

      scheduler.nodeTracker.addNode(mockNode2);
      verifyMaximumResourceCapability(fullResource2, scheduler);

      scheduler.nodeTracker.removeNode(mockNode2.getNodeID());
      verifyMaximumResourceCapability(fullResource1, scheduler);

      scheduler.nodeTracker.removeNode(mockNode1.getNodeID());
      verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
    } finally {
      rm.stop();
    }
  }

  /**
   * Verifies that the maximum allocation tracks the largest node as node
   * resources are increased and decreased via {@code updateNodeResource}.
   */
  @Test
  public void testMaxAllocationAfterUpdateNodeResource() throws IOException {
    final int configuredMaxVCores = 20;
    final int configuredMaxMemory = 10 * 1024;
    Resource configuredMaximumResource =
        Resource.newInstance(configuredMaxMemory, configuredMaxVCores);
    YarnConfiguration conf = getConf();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
        configuredMaxVCores);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        configuredMaxMemory);
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        0);
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
          .getResourceScheduler();
      verifyMaximumResourceCapability(configuredMaximumResource, scheduler);

      Resource resource1 = Resource.newInstance(2048, 5);
      Resource resource2 = Resource.newInstance(4096, 10);
      Resource resource3 = Resource.newInstance(512, 1);
      Resource resource4 = Resource.newInstance(1024, 2);

      RMNode node1 = MockNodes.newNodeInfo(
          0, resource1, 1, "127.0.0.2");
      scheduler.handle(new NodeAddedSchedulerEvent(node1));
      RMNode node2 = MockNodes.newNodeInfo(
          0, resource3, 2, "127.0.0.3");
      scheduler.handle(new NodeAddedSchedulerEvent(node2));
      verifyMaximumResourceCapability(resource1, scheduler);

      // increase node1 resource
      scheduler.updateNodeResource(node1, ResourceOption.newInstance(
          resource2, 0));
      verifyMaximumResourceCapability(resource2, scheduler);

      // decrease node1 resource
      scheduler.updateNodeResource(node1, ResourceOption.newInstance(
          resource1, 0));
      verifyMaximumResourceCapability(resource1, scheduler);

      // increase node2 resource
      scheduler.updateNodeResource(node2, ResourceOption.newInstance(
          resource4, 0));
      verifyMaximumResourceCapability(resource1, scheduler);

      // decrease node2 resource
      scheduler.updateNodeResource(node2, ResourceOption.newInstance(
          resource3, 0));
      verifyMaximumResourceCapability(resource1, scheduler);
    } finally {
      rm.stop();
    }
  }

  /**
   * Tests that pending containers are cleared from an attempt even if one of
   * the applications in the scheduler's application map has a null current
   * attempt (no attempt at all).
   */
  @SuppressWarnings({ "rawtypes" })
  @Test(timeout = 10000)
  public void testReleasedContainerIfAppAttemptisNull() throws Exception {
    YarnConfiguration conf = getConf();
    MockRM rm1 = new MockRM(conf);
    try {
      rm1.start();
      MockNM nm1 =
          new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
      nm1.registerNode();

      AbstractYarnScheduler scheduler =
          (AbstractYarnScheduler) rm1.getResourceScheduler();
      // Mock app without any attempt.
      RMApp mockApp =
          new MockRMApp(125, System.currentTimeMillis(), RMAppState.NEW);
      SchedulerApplication<FiCaSchedulerApp> application =
          new SchedulerApplication<FiCaSchedulerApp>(null, mockApp.getUser());

      // Second app with one app attempt.
      RMApp app = rm1.submitApp(200);
      MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1);
      final ContainerId runningContainer =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      am1.allocate(null, Arrays.asList(runningContainer));

      Map schedulerApplications = scheduler.getSchedulerApplications();
      SchedulerApplication schedulerApp =
          (SchedulerApplication) scheduler.getSchedulerApplications().get(
              app.getApplicationId());
      // Inject the attempt-less application next to the real one.
      schedulerApplications.put(mockApp.getApplicationId(), application);

      scheduler.clearPendingContainerCache();

      Assert.assertEquals("Pending containers are not released "
          + "when one of the application attempt is null !", 0,
          schedulerApp.getCurrentAppAttempt().getPendingRelease().size());
    } finally {
      rm1.stop();
    }
  }

  /**
   * Verifies that a container released by the AM before it was launched
   * frees node capacity immediately, whereas a container that was running
   * only frees capacity once the node reports it COMPLETE.
   */
  @Test(timeout = 60000)
  public void testContainerReleasedByNode() throws Exception {
    System.out.println("Starting testContainerReleasedByNode");
    YarnConfiguration conf = getConf();
    MockRM rm1 = new MockRM(conf);
    try {
      rm1.start();
      RMApp app1 =
          rm1.submitApp(200, "name", "user",
              new HashMap<ApplicationAccessType, String>(), false, "default",
              -1, null, "Test", false, true);
      MockNM nm1 =
          new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
      nm1.registerNode();

      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // allocate a container that fills more than half the node
      am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
      nm1.nodeHeartbeat(true);

      // wait for containers to be allocated.
      List<Container> containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      while (containers.isEmpty()) {
        Thread.sleep(10);
        nm1.nodeHeartbeat(true);
        containers = am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
      }

      // release the container from the AM
      ContainerId cid = containers.get(0).getId();
      List<ContainerId> releasedContainers = new ArrayList<>(1);
      releasedContainers.add(cid);
      List<ContainerStatus> completedContainers = am1.allocate(
          new ArrayList<ResourceRequest>(), releasedContainers)
          .getCompletedContainersStatuses();
      while (completedContainers.isEmpty()) {
        Thread.sleep(10);
        completedContainers = am1.allocate(
            new ArrayList<ResourceRequest>(), releasedContainers)
            .getCompletedContainersStatuses();
      }

      // verify new container can be allocated immediately because container
      // never launched on the node
      containers = am1.allocate("127.0.0.1", 8192, 1,
          new ArrayList<ContainerId>()).getAllocatedContainers();
      nm1.nodeHeartbeat(true);
      while (containers.isEmpty()) {
        Thread.sleep(10);
        nm1.nodeHeartbeat(true);
        containers = am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
      }

      // launch the container on the node
      cid = containers.get(0).getId();
      nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
          ContainerState.RUNNING);
      rm1.waitForState(nm1, cid, RMContainerState.RUNNING);

      // release the container from the AM
      releasedContainers.clear();
      releasedContainers.add(cid);
      completedContainers = am1.allocate(
          new ArrayList<ResourceRequest>(), releasedContainers)
          .getCompletedContainersStatuses();
      while (completedContainers.isEmpty()) {
        Thread.sleep(10);
        completedContainers = am1.allocate(
            new ArrayList<ResourceRequest>(), releasedContainers)
            .getCompletedContainersStatuses();
      }

      // verify new container cannot be allocated immediately because container
      // has not been released by the node
      containers = am1.allocate("127.0.0.1", 8192, 1,
          new ArrayList<ContainerId>()).getAllocatedContainers();
      nm1.nodeHeartbeat(true);
      Assert.assertTrue("new container allocated before node freed old",
          containers.isEmpty());
      for (int i = 0; i < 10; ++i) {
        Thread.sleep(10);
        containers = am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
        nm1.nodeHeartbeat(true);
        Assert.assertTrue("new container allocated before node freed old",
            containers.isEmpty());
      }

      // free the old container from the node
      nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
          ContainerState.COMPLETE);

      // verify new container is now allocated
      containers = am1.allocate(new ArrayList<ResourceRequest>(),
          new ArrayList<ContainerId>()).getAllocatedContainers();
      while (containers.isEmpty()) {
        Thread.sleep(10);
        nm1.nodeHeartbeat(true);
        containers = am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
      }
    } finally {
      rm1.stop();
      System.out.println("Stopping testContainerReleasedByNode");
    }
  }

  /**
   * Verifies that when a NodeManager re-registers (simulating an NM restart)
   * while one of its containers is ALLOCATED, that container is killed and
   * its resource request is restored so the AM obtains a replacement
   * container on later heartbeats.
   */
  @Test(timeout = 60000)
  public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
      throws Exception {
    YarnConfiguration conf = getConf();
    MockRM rm1 = new MockRM(conf);
    try {
      rm1.start();
      RMApp app1 =
          rm1.submitApp(200, "name", "user",
              new HashMap<ApplicationAccessType, String>(), false, "default",
              -1, null, "Test", false, true);
      MockNM nm1 =
          new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
      nm1.registerNode();

      MockNM nm2 =
          new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
      nm2.registerNode();

      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      int NUM_CONTAINERS = 1;
      // allocate NUM_CONTAINERS containers
      am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
          new ArrayList<ContainerId>());
      nm1.nodeHeartbeat(true);

      // wait for containers to be allocated.
      List<Container> containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      while (containers.size() != NUM_CONTAINERS) {
        nm1.nodeHeartbeat(true);
        containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers());
        Thread.sleep(200);
      }

      // launch the 2nd container, for testing running container transferred.
      nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
          ContainerState.RUNNING);
      ContainerId containerId2 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);

      // 3rd container is in Allocated state.
      am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
          new ArrayList<ContainerId>());
      nm2.nodeHeartbeat(true);
      ContainerId containerId3 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
      rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);

      // NodeManager restart
      nm2.registerNode();

      // NM restart kills all allocated and running containers.
      rm1.waitForState(nm2, containerId3, RMContainerState.KILLED);

      // The killed RMContainer request should be restored. In successive
      // nodeHeartBeats AM should be able to get container allocated.
      containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      while (containers.size() != NUM_CONTAINERS) {
        nm2.nodeHeartbeat(true);
        containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers());
        Thread.sleep(200);
      }

      nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4,
          ContainerState.RUNNING);
      ContainerId containerId4 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
      rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING);
    } finally {
      rm1.stop();
    }
  }

  /**
   * Test to verify that ResourceRequests are recovered back to the right
   * app-attempt after a container gets killed at ACQUIRED state: YARN-4502.
   *
   * @throws Exception
   */
  @Test
  public void testResourceRequestRecoveryToTheRightAppAttempt()
      throws Exception {
    YarnConfiguration conf = getConf();
    MockRM rm = new MockRM(conf);
    try {
      rm.start();
      RMApp rmApp =
          rm.submitApp(200, "name", "user",
              new HashMap<ApplicationAccessType, String>(), false, "default", -1,
              null, "Test", false, true);
      MockNM node =
          new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService());
      node.registerNode();

      MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node);
      ApplicationAttemptId applicationAttemptOneID =
          am1.getApplicationAttemptId();

      // allocate one container
      am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
      node.nodeHeartbeat(true);

      // wait for containers to be allocated.
      List<Container> containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      while (containers.size() != 1) {
        node.nodeHeartbeat(true);
        containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers());
        Thread.sleep(200);
      }

      // launch a 2nd container, for testing running-containers transfer.
      node.nodeHeartbeat(applicationAttemptOneID, 2, ContainerState.RUNNING);
      ContainerId runningContainerID =
          ContainerId.newContainerId(applicationAttemptOneID, 2);
      rm.waitForState(node, runningContainerID, RMContainerState.RUNNING);

      // 3rd container is in Allocated state.
      int ALLOCATED_CONTAINER_PRIORITY = 1047;
      am1.allocate("127.0.0.1", 1024, 1, ALLOCATED_CONTAINER_PRIORITY,
          new ArrayList<ContainerId>(), null);
      node.nodeHeartbeat(true);
      ContainerId allocatedContainerID =
          ContainerId.newContainerId(applicationAttemptOneID, 3);
      rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED);
      RMContainer allocatedContainer =
          rm.getResourceScheduler().getRMContainer(allocatedContainerID);

      // Capture scheduler app-attempt before AM crash.
      SchedulerApplicationAttempt firstSchedulerAppAttempt =
          ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
              .getResourceScheduler())
              .getApplicationAttempt(applicationAttemptOneID);

      // AM crashes, and a new app-attempt gets created
      node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE);
      rm.drainEvents();
      RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
      ApplicationAttemptId applicationAttemptTwoID =
          rmAppAttempt2.getAppAttemptId();
      Assert.assertEquals(2, applicationAttemptTwoID.getAttemptId());

      // All outstanding allocated containers will be killed (irrespective of
      // keep-alive of container across app-attempts)
      Assert.assertEquals(RMContainerState.KILLED,
          allocatedContainer.getState());

      // The core part of this test
      // The killed containers' ResourceRequests are recovered back to the
      // original app-attempt, not the new one
      for (SchedulerRequestKey key : firstSchedulerAppAttempt.getSchedulerKeys()) {
        if (key.getPriority().getPriority() == 0) {
          Assert.assertEquals(0,
              firstSchedulerAppAttempt.getOutstandingAsksCount(key));
        } else if (key.getPriority().getPriority() ==
            ALLOCATED_CONTAINER_PRIORITY) {
          Assert.assertEquals(1,
              firstSchedulerAppAttempt.getOutstandingAsksCount(key));
        }
      }

      // Also, only one running container should be transferred after AM
      // launches
      MockRM.launchAM(rmApp, rm, node);
      List<Container> transferredContainers =
          rm.getResourceScheduler().getTransferredContainers(
              applicationAttemptTwoID);
      Assert.assertEquals(1, transferredContainers.size());
      Assert.assertEquals(runningContainerID, transferredContainers.get(0)
          .getId());
    } finally {
      rm.stop();
    }
  }

  /**
   * Asserts that the scheduler's maximum resource capability matches
   * {@code expectedMaximumResource} in both memory and virtual cores.
   *
   * @param expectedMaximumResource expected maximum allocation
   * @param scheduler scheduler whose capability is checked
   */
  private void verifyMaximumResourceCapability(
      Resource expectedMaximumResource, YarnScheduler scheduler) {
    final Resource schedulerMaximumResourceCapability = scheduler
        .getMaximumResourceCapability();
    Assert.assertEquals(expectedMaximumResource.getMemorySize(),
        schedulerMaximumResourceCapability.getMemorySize());
    Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
        schedulerMaximumResourceCapability.getVirtualCores());
  }

  /**
   * Scheduler event handler that does nothing except, while
   * {@link #sleepFlag} is set, sleep for {@link #sleepTime} ms per event.
   * It is registered on the private dispatcher before the real scheduler,
   * to hold up event processing.
   */
  private static class SleepHandler implements EventHandler<SchedulerEvent> {
    // Set by the test thread and read by the dispatcher thread.
    volatile boolean sleepFlag = false;
    int sleepTime = 20;

    @Override
    public void handle(SchedulerEvent event) {
      try {
        if (sleepFlag) {
          Thread.sleep(sleepTime);
        }
      } catch (InterruptedException ie) {
        // Restore the interrupt status so the dispatcher thread can observe
        // it (for example while the dispatcher is being stopped).
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Builds and starts a {@link ResourceTrackerService} driven by
   * {@code privateDispatcher} instead of the RM's own dispatcher.
   * Scheduler events go to {@code sleepHandler} and to the RM's scheduler;
   * node events go to a {@link ResourceManager.NodeEventDispatcher}.
   * As a side effect, the RM's scheduler is switched to the private
   * {@link RMContext}.
   *
   * @param privateDispatcher dispatcher to init, start and route events through
   * @param rm resource manager whose scheduler receives scheduler events
   * @param sleepHandler handler registered ahead of the scheduler
   * @return the started private resource tracker service
   */
  private ResourceTrackerService getPrivateResourceTrackerService(
      Dispatcher privateDispatcher, ResourceManager rm,
      SleepHandler sleepHandler) {
    Configuration conf = getConf();

    RMContext privateContext =
        new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
            null, null, null);
    privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));

    privateDispatcher.register(SchedulerEventType.class, sleepHandler);
    privateDispatcher.register(SchedulerEventType.class,
        rm.getResourceScheduler());
    privateDispatcher.register(RMNodeEventType.class,
        new ResourceManager.NodeEventDispatcher(privateContext));
    ((Service) privateDispatcher).init(conf);
    ((Service) privateDispatcher).start();

    NMLivelinessMonitor nmLivelinessMonitor =
        new NMLivelinessMonitor(privateDispatcher);
    nmLivelinessMonitor.init(conf);
    nmLivelinessMonitor.start();

    NodesListManager nodesListManager = new NodesListManager(privateContext);
    nodesListManager.init(conf);

    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.start();
    NMTokenSecretManagerInRM nmTokenSecretManager =
        new NMTokenSecretManagerInRM(conf);
    nmTokenSecretManager.start();

    ResourceTrackerService privateResourceTrackerService =
        new ResourceTrackerService(privateContext, nodesListManager,
            nmLivelinessMonitor, containerTokenSecretManager,
            nmTokenSecretManager);
    privateResourceTrackerService.init(conf);
    privateResourceTrackerService.start();
    rm.getResourceScheduler().setRMContext(privateContext);
    return privateResourceTrackerService;
  }

  /**
   * Test the behavior of the scheduler when a node reconnects
   * with changed capabilities. This test is to catch any race conditions
   * that might occur due to the use of the RMNode object.
   * @throws Exception
   */
  @Test(timeout = 60000)
  public void testNodemanagerReconnect() throws Exception {
    Configuration conf = getConf();
    MockRM rm = new MockRM(conf);
    DrainDispatcher privateDispatcher = null;
    ResourceTrackerService privateResourceTrackerService = null;
    try {
      rm.start();

      conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
      privateDispatcher = new DrainDispatcher();
      SleepHandler sleepHandler = new SleepHandler();
      privateResourceTrackerService =
          getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);

      // Register node1
      String hostname1 = "localhost1";
      Resource capability = BuilderUtils.newResource(4096, 4);
      RecordFactory recordFactory =
          RecordFactoryProvider.getRecordFactory(null);

      RegisterNodeManagerRequest request1 =
          recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
      NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
      request1.setNodeId(nodeId1);
      request1.setHttpPort(0);
      request1.setResource(capability);
      privateResourceTrackerService.registerNodeManager(request1);
      privateDispatcher.await();
      Resource clusterResource =
          rm.getResourceScheduler().getClusterResource();
      Assert.assertEquals("Initial cluster resources don't match", capability,
          clusterResource);

      Resource newCapability = BuilderUtils.newResource(1024, 1);
      RegisterNodeManagerRequest request2 =
          recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
      request2.setNodeId(nodeId1);
      request2.setHttpPort(0);
      request2.setResource(newCapability);
      // hold up the dispatcher and register the same node with lower capability
      sleepHandler.sleepFlag = true;
      privateResourceTrackerService.registerNodeManager(request2);
      privateDispatcher.await();
      Assert.assertEquals("Cluster resources don't match", newCapability,
          rm.getResourceScheduler().getClusterResource());
    } finally {
      // Stop the private services even if an assertion failed, so their
      // threads do not outlive the test.
      if (privateResourceTrackerService != null) {
        privateResourceTrackerService.stop();
      }
      if (privateDispatcher != null) {
        privateDispatcher.stop();
      }
      rm.stop();
    }
  }

  /**
   * Verifies that, with the FairScheduler, the scheduler's update thread is
   * alive after start and terminates after the scheduler is stopped, and
   * that the CapacityScheduler never creates an update thread.
   */
  @Test(timeout = 10000)
  public void testUpdateThreadLifeCycle() throws Exception {
    MockRM rm = new MockRM(getConf());
    try {
      rm.start();
      AbstractYarnScheduler scheduler =
          (AbstractYarnScheduler) rm.getResourceScheduler();

      if (getSchedulerType().equals(SchedulerType.FAIR)) {
        Thread updateThread = scheduler.updateThread;
        Assert.assertTrue(updateThread.isAlive());
        scheduler.stop();

        // Wait up to 5 seconds for the thread to exit, then check its
        // liveness directly rather than inferring it from a retry counter.
        updateThread.join(100 * 50);
        Assert.assertFalse("The Update thread is still alive",
            updateThread.isAlive());
      } else if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
        Assert.assertNull("updateThread shouldn't have been created",
            scheduler.updateThread);
      } else {
        Assert.fail("Unhandled SchedulerType, " + getSchedulerType() +
            ", please update this unit test.");
      }
    } finally {
      rm.stop();
    }
  }

  /**
   * Verifies that a container recovered from a node's container report (with
   * RM recovery and work-preserving recovery enabled) carries the
   * application's queue name.
   */
  @Test(timeout = 60000)
  public void testContainerRecoveredByNode() throws Exception {
    System.out.println("Starting testContainerRecoveredByNode");
    final int maxMemory = 10 * 1024;
    YarnConfiguration conf = getConf();
    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
    conf.setBoolean(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
    conf.set(
        YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
    MockRM rm1 = new MockRM(conf);
    try {
      rm1.start();
      RMApp app1 =
          rm1.submitApp(200, "name", "user",
              new HashMap<ApplicationAccessType, String>(), false, "default",
              -1, null, "Test", false, true);
      MockNM nm1 =
          new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
      nm1.registerNode();

      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
      am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());

      YarnScheduler scheduler = rm1.getResourceScheduler();

      RMNode node1 = MockNodes.newNodeInfo(
          0, Resources.createResource(maxMemory), 1, "127.0.0.2");
      ContainerId containerId = ContainerId.newContainerId(
          app1.getCurrentAppAttempt().getAppAttemptId(), 2);

      NMContainerStatus containerReport =
          NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
              Resource.newInstance(1024, 1), "recover container", 0,
              Priority.newInstance(0), 0);

      List<NMContainerStatus> containerReports = new ArrayList<>();
      containerReports.add(containerReport);
      scheduler.handle(new NodeAddedSchedulerEvent(node1, containerReports));
      RMContainer rmContainer = scheduler.getRMContainer(containerId);

      // Fail with a clear message rather than an NPE if recovery did not
      // produce the container.
      Assert.assertNotNull("Container " + containerId + " was not recovered",
          rmContainer);
      //verify queue name when rmContainer is recovered
      Assert.assertEquals(app1.getQueue(), rmContainer.getQueueName());
    } finally {
      rm1.stop();
      System.out.println("Stopping testContainerRecoveredByNode");
    }
  }
}