blob: e7d666ad8ad3ec484afef5056e7c1fbb025bdd0c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Validate system behavior when the AM-scheduling logic 'blacklists' a node
 * for an application because of AM failures.
 *
 * <p>Every test starts its own {@link MockRM} and stops it in a
 * {@code finally} block, so that a failing assertion does not leave a running
 * RM behind for the tests that follow.
 */
public class TestNodeBlacklistingOnAMFailures {

  @Before
  public void setup() {
    QueueMetrics.clearQueueMetrics();
    DefaultMetricsSystem.setMiniClusterMode(true);
  }

  @Test(timeout = 100000)
  public void testNodeBlacklistingOnAMFailure() throws Exception {
    MockRM rm = startRM(createCapacitySchedulerConf());
    try {
      CapacityScheduler scheduler =
          (CapacityScheduler) rm.getResourceScheduler();

      // Register 5 nodes so that at least one can be blacklisted if the AM
      // container fails. As per the calculation: 5 nodes * 0.2 (default) = 1.
      // First register 2 nodes, then register 3 more after the AM is launched.
      MockNM nm1 = registerNode(rm, "127.0.0.1:1234");
      MockNM nm2 = registerNode(rm, "127.0.0.2:2345");

      RMApp app = rm.submitApp(200);
      MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
      ContainerId amContainerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
      NodeId nodeWhereAMRan = getAllocatedNode(scheduler, amContainerId);

      // Remember which of the two nodes actually hosted the first AM.
      MockNM currentNode;
      MockNM otherNode;
      if (nodeWhereAMRan.equals(nm1.getNodeId())) {
        currentNode = nm1;
        otherNode = nm2;
      } else {
        currentNode = nm2;
        otherNode = nm1;
      }

      // Register the remaining 3 nodes now.
      registerNode(rm, "127.0.0.3:2345");
      registerNode(rm, "127.0.0.4:2345");
      registerNode(rm, "127.0.0.5:2345");

      // Set the exit status to INVALID so that we can verify that the system
      // automatically blacklists the node.
      makeAMContainerExit(rm, amContainerId, currentNode,
          ContainerExitStatus.INVALID);

      // Wait for the AM to be restarted (a new attempt scheduled).
      RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
      System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());

      // Try the blacklisted node a few times; the attempt must stay SCHEDULED.
      for (int i = 0; i <= 2; i++) {
        currentNode.nodeHeartbeat(true);
        rm.drainEvents();
        Assert.assertEquals(
            "AppAttemptState should still be SCHEDULED if currentNode is "
                + "blacklisted correctly", RMAppAttemptState.SCHEDULED,
            attempt.getAppAttemptState());
      }

      // Now try the other node.
      otherNode.nodeHeartbeat(true);
      rm.drainEvents();

      // Now the AM container should be allocated.
      MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
      MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
      amContainerId =
          ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
      nodeWhereAMRan = getAllocatedNode(scheduler, amContainerId);

      // The other node should now receive the assignment.
      Assert.assertEquals(
          "After blacklisting, AM should have run on the other node",
          otherNode.getNodeId(), nodeWhereAMRan);

      am2.registerAppAttempt();
      rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);

      // AM blacklisting must not prevent ordinary containers on that node.
      List<Container> allocatedContainers =
          TestAMRestart.allocateContainers(currentNode, am2, 1);
      Assert.assertEquals(
          "Even though AM is blacklisted from the node, application can "
              + "still allocate non-AM containers there",
          currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
    } finally {
      rm.stop();
    }
  }

  @Test(timeout = 100000)
  public void testNodeBlacklistingOnAMFailureStrictNodeLocality()
      throws Exception {
    MockRM rm = startRM(createCapacitySchedulerConf());
    try {
      CapacityScheduler scheduler =
          (CapacityScheduler) rm.getResourceScheduler();

      // Register 5 nodes so that at least one can be blacklisted if the AM
      // container fails. As per the calculation: 5 nodes * 0.2 (default) = 1.
      registerNode(rm, "127.0.0.1:1234");
      MockNM nm2 = registerNode(rm, "127.0.0.2:2345");
      registerNode(rm, "127.0.0.3:2345");
      registerNode(rm, "127.0.0.4:2345");
      registerNode(rm, "127.0.0.5:2345");

      // Specify a strict locality on nm2: the rack and ANY requests do not
      // relax locality.
      List<ResourceRequest> reqs =
          createAMRequests(nm2.getNodeId().getHost(), false);
      RMApp app = rm.submitApp(reqs);
      MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
      ContainerId amContainerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
      Assert.assertEquals(nm2.getNodeId(),
          getAllocatedNode(scheduler, amContainerId));

      // Set the exit status to INVALID so that we can verify that the system
      // automatically blacklists the node.
      makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);

      // Wait for the AM to be restarted (a new attempt scheduled).
      RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
      System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
      nm2.nodeHeartbeat(true);
      rm.drainEvents();

      // Now the AM container should be allocated.
      MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
      MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
      amContainerId =
          ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
      NodeId nodeWhereAMRan = getAllocatedNode(scheduler, amContainerId);

      // The second AM should be on the same node because the strict locality
      // made the eligible nodes only 1, so the blacklisting threshold kicked in.
      System.out.println("AM ran on " + nodeWhereAMRan);
      Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);

      am2.registerAppAttempt();
      rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
    } finally {
      rm.stop();
    }
  }

  @Test(timeout = 100000)
  public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality()
      throws Exception {
    MockRM rm = startRM(createCapacitySchedulerConf());
    try {
      CapacityScheduler scheduler =
          (CapacityScheduler) rm.getResourceScheduler();

      // Register 5 nodes so that at least one can be blacklisted if the AM
      // container fails. As per the calculation: 5 nodes * 0.2 (default) = 1.
      MockNM nm1 = registerNode(rm, "127.0.0.1:1234");
      MockNM nm2 = registerNode(rm, "127.0.0.2:2345");
      MockNM nm3 = registerNode(rm, "127.0.0.3:2345");
      MockNM nm4 = registerNode(rm, "127.0.0.4:2345");
      MockNM nm5 = registerNode(rm, "127.0.0.5:2345");

      // Specify a relaxed locality on nm2: every request relaxes locality.
      List<ResourceRequest> reqs =
          createAMRequests(nm2.getNodeId().getHost(), true);
      RMApp app = rm.submitApp(reqs);
      MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
      ContainerId amContainerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
      Assert.assertEquals(nm2.getNodeId(),
          getAllocatedNode(scheduler, amContainerId));

      // Set the exit status to INVALID so that we can verify that the system
      // automatically blacklists the node.
      makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);

      // Wait for the AM to be restarted (a new attempt scheduled).
      RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
      System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
      // Heartbeat the blacklisted node first, then all the others.
      nm2.nodeHeartbeat(true);
      nm1.nodeHeartbeat(true);
      nm3.nodeHeartbeat(true);
      nm4.nodeHeartbeat(true);
      nm5.nodeHeartbeat(true);
      rm.drainEvents();

      // Now the AM container should be allocated.
      MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
      MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
      amContainerId =
          ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
      NodeId nodeWhereAMRan = getAllocatedNode(scheduler, amContainerId);

      // The second AM should be on a different node because the relaxed
      // locality made the app schedulable on other nodes and nm2 is
      // blacklisted.
      System.out.println("AM ran on " + nodeWhereAMRan);
      Assert.assertNotEquals(nm2.getNodeId(), nodeWhereAMRan);

      am2.registerAppAttempt();
      rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
    } finally {
      rm.stop();
    }
  }

  @Test(timeout = 100000)
  public void testNoBlacklistingForNonSystemErrors() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);
    // Use a disable threshold above 1.0 so that blacklisting is never turned
    // off, i.e. it would be possible to blacklist the entire (one-node)
    // cluster. Any successful re-allocation below therefore proves the node
    // was NOT blacklisted.
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        1.5f);
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100);

    MockRM rm = startRM(conf);
    try {
      MockNM node = registerNode(rm, "127.0.0.1:1234");

      RMApp app = rm.submitApp(200);
      ApplicationId appId = app.getApplicationId();
      int numAppAttempts = 1;

      // Now the AM container should be allocated.
      RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
      node.nodeHeartbeat(true);
      ((AbstractYarnScheduler) rm.getResourceScheduler()).update();
      rm.drainEvents();
      MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
      rm.sendAMLaunched(attempt.getAppAttemptId());
      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);

      ApplicationAttemptId appAttemptId =
          ApplicationAttemptId.newInstance(appId, numAppAttempts);
      ContainerId amContainerId = ContainerId.newContainerId(appAttemptId, 1);

      for (int containerExitStatus : new int[] {
          ContainerExitStatus.PREEMPTED,
          ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
          // ContainerExitStatus.KILLED_BY_APPMASTER,
          ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
          ContainerExitStatus.ABORTED, ContainerExitStatus.DISKS_FAILED,
          ContainerExitStatus.KILLED_EXCEEDED_VMEM,
          ContainerExitStatus.KILLED_EXCEEDED_PMEM }) {
        // Fail the AM with a non-system-error exit status; the node must not
        // be blacklisted, so the next attempt is still allocated on the only
        // node in the cluster.
        makeAMContainerExit(rm, amContainerId, node, containerExitStatus);

        // Wait for the AM to be restarted (a new attempt scheduled).
        attempt = MockRM.waitForAttemptScheduled(app, rm);
        System.out
            .println("New AppAttempt launched " + attempt.getAppAttemptId());
        node.nodeHeartbeat(true);
        ((AbstractYarnScheduler) rm.getResourceScheduler()).update();
        rm.drainEvents();
        MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
        rm.sendAMLaunched(attempt.getAppAttemptId());
        rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);

        numAppAttempts++;
        appAttemptId = ApplicationAttemptId.newInstance(appId, numAppAttempts);
        amContainerId = ContainerId.newContainerId(appAttemptId, 1);
        rm.waitForState(node, amContainerId, RMContainerState.ACQUIRED);
      }
    } finally {
      rm.stop();
    }
  }

  /**
   * Reports the given AM container as COMPLETE with {@code exitStatus} from
   * {@code node}, then waits until the attempt is FAILED and the application
   * is back in ACCEPTED.
   */
  private void makeAMContainerExit(MockRM rm, ContainerId amContainer,
      MockNM node, int exitStatus) throws Exception {
    ContainerStatus containerStatus =
        BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
            "", exitStatus, Resources.createResource(200));
    node.containerStatus(containerStatus);
    ApplicationAttemptId amAttemptID = amContainer.getApplicationAttemptId();
    rm.waitForState(amAttemptID, RMAppAttemptState.FAILED);
    rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED);
  }

  /**
   * Enables recovery backed by {@link MemoryRMStateStore} on {@code conf}
   * (mutating it), then creates and starts a {@link MockRM}. Callers are
   * responsible for stopping the returned RM.
   */
  private MockRM startRM(YarnConfiguration conf) {
    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
    MockRM rm = new MockRM(conf);
    rm.start();
    return rm;
  }

  /**
   * Creates a configuration that uses the {@link CapacityScheduler} and
   * enables AM-scheduling node blacklisting.
   */
  private static YarnConfiguration createCapacitySchedulerConf() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);
    return conf;
  }

  /**
   * Creates a {@link MockNM} for {@code hostPort} with a memory capacity of
   * 8000 and registers it with the RM's resource tracker.
   */
  private static MockNM registerNode(MockRM rm, String hostPort)
      throws Exception {
    MockNM nm = new MockNM(hostPort, 8000, rm.getResourceTrackerService());
    nm.registerNode();
    return nm;
  }

  /**
   * Returns the node the scheduler allocated {@code containerId} on, failing
   * the test with a descriptive message instead of an NPE when the scheduler
   * does not know the container.
   */
  private static NodeId getAllocatedNode(CapacityScheduler scheduler,
      ContainerId containerId) {
    RMContainer rmContainer = scheduler.getRMContainer(containerId);
    Assert.assertNotNull("Scheduler has no RMContainer for " + containerId,
        rmContainer);
    return rmContainer.getAllocatedNode();
  }

  /**
   * Builds the AM resource requests (ANY, "/default-rack", then
   * {@code host}) for one 200/1-vcore container at priority 0. The
   * node-level request always relaxes locality; the rack and ANY requests
   * use {@code relaxRackAndAnyLocality}, so passing {@code false} pins the
   * AM to {@code host}.
   */
  private static List<ResourceRequest> createAMRequests(String host,
      boolean relaxRackAndAnyLocality) {
    List<ResourceRequest> reqs = new ArrayList<>();
    reqs.add(newAMRequest(ResourceRequest.ANY, relaxRackAndAnyLocality));
    reqs.add(newAMRequest("/default-rack", relaxRackAndAnyLocality));
    reqs.add(newAMRequest(host, true));
    return reqs;
  }

  /** Creates a single-container AM request for {@code resourceName}. */
  private static ResourceRequest newAMRequest(String resourceName,
      boolean relaxLocality) {
    return ResourceRequest.newInstance(Priority.newInstance(0), resourceName,
        Resource.newInstance(200, 1), 1, relaxLocality);
  }
}