blob: 0d3610c54a62a21d29dfb2bac25ebc8a297279b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Tests for the CapacityScheduler when asynchronous scheduling is enabled,
 * and for the proposal commit path ({@code tryCommit} /
 * {@code submitResourceCommitRequest}) that asynchronous scheduling uses.
 */
public class TestCapacitySchedulerAsyncScheduling {
  private static final int GB = 1024;

  private YarnConfiguration conf;

  RMNodeLabelsManager mgr;

  /**
   * Builds a configuration that selects the CapacityScheduler with
   * asynchronous scheduling enabled, and initializes a
   * {@link NullRMNodeLabelsManager} with it.
   */
  @Before
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
  }

  @Test(timeout = 300000)
  public void testSingleThreadAsyncContainerAllocation() throws Exception {
    testAsyncContainerAllocation(1);
  }

  @Test(timeout = 300000)
  public void testTwoThreadsAsyncContainerAllocation() throws Exception {
    testAsyncContainerAllocation(2);
  }

  @Test(timeout = 300000)
  public void testThreeThreadsAsyncContainerAllocation() throws Exception {
    testAsyncContainerAllocation(3);
  }

  /**
   * Runs the asynchronous scheduler with the given number of scheduling
   * threads and verifies that exactly the requested amount of memory gets
   * allocated: it must be reached within 15 s and must not grow during the
   * following 2 s.
   *
   * @param numThreads value for
   *     {@code SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD}
   */
  public void testAsyncContainerAllocation(int numThreads) throws Exception {
    conf.setInt(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
        numThreads);
    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
        + ".scheduling-interval-ms", 100);

    // Named differently from the field to avoid shadowing it.
    final RMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
    labelsManager.init(conf);

    // inject node label manager
    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return labelsManager;
      }
    };

    rm.getRMContext().setNodeLabelManager(labelsManager);
    rm.start();
    try {
      // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
      for (int i = 0; i < 10; i++) {
        rm.registerNode("h-" + i + ":1234", 20 * GB);
      }

      List<MockAM> ams = new ArrayList<MockAM>();
      // Add 3 applications to the cluster, one app in each of the queues
      // "a", "b" and "c". The i-th app (0-based) asks for 20 * (i + 1)
      // containers of 1 GB, so together with the three 1 GB AM containers
      // we expect 3 + 20 + 40 + 60 = 123 GB to be allocated in total.
      int totalAsked = 3 * GB; // 3 AMs
      for (int i = 0; i < 3; i++) {
        RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
            Character.toString((char) (i % 34 + 97)), 1, null, null, false);
        MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
        am.registerAppAttempt();
        ams.add(am);
      }

      for (int i = 0; i < 3; i++) {
        ams.get(i).allocate("*", 1024, 20 * (i + 1),
            new ArrayList<ContainerId>());
        totalAsked += 20 * (i + 1) * GB;
      }

      // Wait for at most 15000 ms
      int waitTime = 15000; // ms
      while (waitTime > 0) {
        if (rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB()
            == totalAsked) {
          break;
        }
        Thread.sleep(50);
        waitTime -= 50;
      }

      Assert.assertEquals(totalAsked,
          rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());

      // Wait for another 2 sec to make sure we will not allocate more than
      // required
      waitTime = 2000; // ms
      while (waitTime > 0) {
        Assert.assertEquals(totalAsked,
            rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());
        waitTime -= 50;
        Thread.sleep(50);
      }
    } finally {
      // Always release the RM, even when an assertion above fails.
      rm.close();
    }
  }

  /**
   * Testcase for YARN-6714: a reserve proposal built for an application
   * attempt that has already been removed must not be accepted by
   * {@code tryCommit}, so no reservation may appear on the target node.
   */
  @Test (timeout = 30000)
  public void testCommitProposalForFailedAppAttempt()
      throws Exception {
    // disable async-scheduling so the complex scenario can be driven
    // step by step
    Configuration disableAsyncConf = new Configuration(conf);
    disableAsyncConf.setBoolean(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);

    // init RM & NMs & Nodes
    final MockRM rm = new MockRM(disableAsyncConf);
    rm.start();
    try {
      final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
      final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);

      // init scheduler & nodes
      while (((CapacityScheduler) rm.getRMContext().getScheduler())
          .getNodeTracker().nodeCount() < 2) {
        Thread.sleep(10);
      }
      Assert.assertEquals(2,
          ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
              .getNodeTracker().nodeCount());
      CapacityScheduler scheduler =
          (CapacityScheduler) rm.getRMContext().getScheduler();
      SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
      SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());

      // launch app
      RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
          YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true,
          true);
      MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
      FiCaSchedulerApp schedulerApp =
          scheduler.getApplicationAttempt(am.getApplicationAttemptId());

      // allocate and launch 1 container running on nm2
      allocateAndLaunchContainers(am, nm2, rm, 1,
          Resources.createResource(5 * GB), 0, 2);

      // nm1 runs 1 container(app1-container_01/AM)
      // nm2 runs 1 container(app1-container_02)
      Assert.assertEquals(1, sn1.getNumContainers());
      Assert.assertEquals(1, sn2.getNumContainers());

      // kill app attempt1
      scheduler.handle(
          new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(),
              RMAppAttemptState.KILLED, true));
      // wait until app attempt1 removed on nm1
      while (sn1.getCopiedListOfRunningContainers().size() == 1) {
        Thread.sleep(100);
      }
      // wait until app attempt2 launched on nm1
      while (sn1.getCopiedListOfRunningContainers().size() == 0) {
        nm1.nodeHeartbeat(true);
        Thread.sleep(100);
      }

      // generate reserved proposal of stopped app attempt
      // and it could be committed for async-scheduling
      // this kind of proposal should be skipped
      Resource reservedResource = Resources.createResource(5 * GB);
      Container container = Container.newInstance(
          ContainerId.newContainerId(am.getApplicationAttemptId(), 3),
          sn2.getNodeID(), sn2.getHttpAddress(), reservedResource,
          Priority.newInstance(0), null);
      RMContainer rmContainer = new RMContainerImpl(container,
          SchedulerRequestKey.create(ResourceRequest
              .newInstance(Priority.newInstance(0), "*", reservedResource,
                  1)),
          am.getApplicationAttemptId(), sn2.getNodeID(), "user",
          rm.getRMContext());
      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
          reservedContainer = new SchedulerContainer<>(
              schedulerApp, scheduler.getNode(sn2.getNodeID()), rmContainer,
              "", false);
      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
          reservedForAttempt1Proposal = new ContainerAllocationProposal<>(
              reservedContainer, null, reservedContainer, NodeType.OFF_SWITCH,
              NodeType.OFF_SWITCH, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
              reservedResource);
      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
          reservedProposals = new ArrayList<>();
      reservedProposals.add(reservedForAttempt1Proposal);
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
          new ResourceCommitRequest<>(null, reservedProposals, null);
      scheduler.tryCommit(scheduler.getClusterResource(), request);
      Assert.assertNull("Outdated proposal should not be accepted!",
          sn2.getReservedContainer());
    } finally {
      rm.stop();
    }
  }

  /**
   * Testcase for YARN-6678: while a reserve proposal for nm1 is being
   * committed, the reservation it refers to is replaced (app1's reservation
   * is released and app2 reserves on nm1). Committing the now-outdated
   * proposal must not throw.
   */
  @Test(timeout = 30000)
  public void testCommitOutdatedReservedProposal() throws Exception {
    // disable async-scheduling so the complex scenario can be driven
    // step by step
    Configuration disableAsyncConf = new Configuration(conf);
    disableAsyncConf.setBoolean(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);

    // init RM & NMs & Nodes
    final MockRM rm = new MockRM(disableAsyncConf);
    rm.start();
    try {
      final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
      final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);

      // init scheduler nodes
      int waitTime = 1000;
      while (waitTime > 0 &&
          ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
              .getNodeTracker().nodeCount() < 2) {
        waitTime -= 10;
        Thread.sleep(10);
      }
      Assert.assertEquals(2,
          ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
              .getNodeTracker().nodeCount());

      YarnScheduler scheduler = rm.getRMContext().getScheduler();
      final SchedulerNode sn1 =
          ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId());
      final SchedulerNode sn2 =
          ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId());

      // submit app1, am1 is running on nm1
      RMApp app = rm.submitApp(200, "app", "user", null, "default");
      final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
      // submit app2, am2 is running on nm1
      RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
      final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

      // allocate and launch 2 containers for app1
      allocateAndLaunchContainers(am, nm1, rm, 1,
          Resources.createResource(5 * GB), 0, 2);
      allocateAndLaunchContainers(am, nm2, rm, 1,
          Resources.createResource(5 * GB), 0, 3);

      // nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
      //                       app2-container_01/AM)
      // nm2 runs 1 container(app1-container_03)
      Assert.assertEquals(3, sn1.getNumContainers());
      Assert.assertEquals(1, sn2.getNumContainers());

      // reserve 1 container(app1-container_04) for app1 on nm1
      ResourceRequest rr2 = ResourceRequest
          .newInstance(Priority.newInstance(0), "*",
              Resources.createResource(5 * GB), 1);
      am.allocate(Arrays.asList(rr2), null);
      nm1.nodeHeartbeat(true);
      // wait app1-container_04 reserved on nm1
      waitTime = 1000;
      while (waitTime > 0 && sn1.getReservedContainer() == null) {
        waitTime -= 10;
        Thread.sleep(10);
      }
      Assert.assertNotNull(sn1.getReservedContainer());

      final CapacityScheduler cs = (CapacityScheduler) scheduler;
      final CapacityScheduler spyCs = Mockito.spy(cs);
      final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
      final AtomicBoolean isChecked = new AtomicBoolean(false);
      // handle CapacityScheduler#tryCommit,
      // reproduce the process that could raise IllegalStateException before
      // YARN-6678
      Mockito.doAnswer(new Answer<Object>() {
        public Object answer(InvocationOnMock invocation) throws Exception {
          ResourceCommitRequest request =
              (ResourceCommitRequest) invocation.getArguments()[1];
          if (request.getContainersToReserve().size() > 0 && isFirstReserve
              .compareAndSet(true, false)) {
            // release app1-container_03 on nm2
            RMContainer killableContainer =
                sn2.getCopiedListOfRunningContainers().get(0);
            cs.completedContainer(killableContainer, ContainerStatus
                    .newInstance(killableContainer.getContainerId(),
                        ContainerState.COMPLETE, "",
                        ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
                RMContainerEventType.KILL);
            Assert.assertEquals(0,
                sn2.getCopiedListOfRunningContainers().size());
            // unreserve app1-container_04 on nm1
            // and allocate app1-container_05 on nm2
            cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode()));
            int innerWaitTime = 1000;
            while (innerWaitTime > 0
                && sn2.getCopiedListOfRunningContainers().size() == 0) {
              innerWaitTime -= 10;
              Thread.sleep(10);
            }
            Assert.assertEquals(1,
                sn2.getCopiedListOfRunningContainers().size());
            Assert.assertNull(sn1.getReservedContainer());

            // reserve app2-container_02 on nm1
            ResourceRequest rr3 = ResourceRequest
                .newInstance(Priority.newInstance(0), "*",
                    Resources.createResource(5 * GB), 1);
            am2.allocate(Arrays.asList(rr3), null);
            cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
            innerWaitTime = 1000;
            while (innerWaitTime > 0 && sn1.getReservedContainer() == null) {
              innerWaitTime -= 10;
              Thread.sleep(10);
            }
            Assert.assertNotNull(sn1.getReservedContainer());

            // call real apply; committing the outdated proposal must not
            // throw. Keep the cause so a failure is diagnosable.
            try {
              cs.tryCommit((Resource) invocation.getArguments()[0],
                  (ResourceCommitRequest) invocation.getArguments()[1]);
            } catch (Exception e) {
              throw new AssertionError(
                  "Committing an outdated reserve proposal should not throw",
                  e);
            }
            isChecked.set(true);
          } else {
            cs.tryCommit((Resource) invocation.getArguments()[0],
                (ResourceCommitRequest) invocation.getArguments()[1]);
          }
          return null;
        }
      }).when(spyCs).tryCommit(Mockito.any(Resource.class),
          Mockito.any(ResourceCommitRequest.class));

      spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));

      waitTime = 1000;
      while (waitTime > 0 && !isChecked.get()) {
        waitTime -= 10;
        Thread.sleep(10);
      }
      // Without this the test would pass even if the hooked commit path
      // was never exercised.
      Assert.assertTrue("Outdated reserve proposal was never committed",
          isChecked.get());
    } finally {
      rm.stop();
    }
  }

  /**
   * Checks that submitting a commit request does not throw when a scheduler
   * container cannot be resolved (before YARN-8233 it threw an NPE). Two
   * cases are covered: the node being allocated on (nm1) is decommissioned,
   * and a container to kill lives on a node that has been removed. In both
   * cases app1 must still have its pending resource request afterwards.
   */
  @Test(timeout = 30000)
  public void testReturnNullWhenGetSchedulerContainer() throws Exception {
    // disable async-scheduling for simulating complex scenario
    Configuration disableAsyncConf = new Configuration(conf);
    disableAsyncConf.setBoolean(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);

    // init RM & NMs
    final MockRM rm = new MockRM(disableAsyncConf);
    rm.start();
    try {
      final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
      final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
      rm.drainEvents();
      final CapacityScheduler cs =
          (CapacityScheduler) rm.getRMContext().getScheduler();
      SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
      final RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
      SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());

      // launch app1-am on nm1
      RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false,
          "default", YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null,
          true, true);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // app1 asks 1 * 1G container
      am1.allocate(ImmutableList.of(ResourceRequest
          .newInstance(Priority.newInstance(0), "*",
              Resources.createResource(1 * GB), 1)), null);

      final RMContainer amContainer = cs.getRMContainer(
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));

      // spy CapacityScheduler
      final CapacityScheduler spyCs = Mockito.spy(cs);
      // hook CapacityScheduler#submitResourceCommitRequest
      final List<CSAssignment> assignmentSnapshots = new ArrayList<>();
      Mockito.doAnswer(new Answer<Object>() {
        public Boolean answer(InvocationOnMock invocation) throws Exception {
          CSAssignment assignment =
              (CSAssignment) invocation.getArguments()[1];
          if (cs.getNode(nm1.getNodeId()) != null) {
            // decommission nm1 for first allocation on nm1
            cs.getRMContext().getDispatcher().getEventHandler().handle(
                new RMNodeEvent(nm1.getNodeId(),
                    RMNodeEventType.DECOMMISSION));
            rm.drainEvents();
            Assert.assertEquals(NodeState.DECOMMISSIONED,
                rmNode1.getState());
            Assert.assertNull(cs.getNode(nm1.getNodeId()));
            assignmentSnapshots.add(assignment);
          } else {
            // add am container on nm1 to containersToKill
            // for second allocation on nm2
            assignment.setContainersToKill(ImmutableList.of(amContainer));
          }
          // check no NPE in actual submit, before YARN-8233 will throw NPE
          cs.submitResourceCommitRequest(
              (Resource) invocation.getArguments()[0], assignment);
          return false;
        }
      }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class),
          Mockito.any(CSAssignment.class));

      // allocation on nm1, test return null when get scheduler container
      PlacementSet<FiCaSchedulerNode> candidateNodeSet =
          new SimplePlacementSet(sn1);
      spyCs.allocateContainersToNode(candidateNodeSet, false);
      // make sure unconfirmed resource is decreased correctly
      Assert.assertTrue(
          spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
              .hasPendingResourceRequest(
                  rm.getResourceScheduler().getResourceCalculator(),
                  RMNodeLabelsManager.NO_LABEL,
                  rm.getResourceScheduler().getClusterResource(),
                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));

      // allocation on nm2,
      // test return null when get scheduler container to release
      candidateNodeSet = new SimplePlacementSet(sn2);
      spyCs.allocateContainersToNode(candidateNodeSet, false);
      // make sure unconfirmed resource is decreased correctly
      Assert.assertTrue(
          spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
              .hasPendingResourceRequest(
                  rm.getResourceScheduler().getResourceCalculator(),
                  RMNodeLabelsManager.NO_LABEL,
                  rm.getResourceScheduler().getClusterResource(),
                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
    } finally {
      rm.stop();
    }
  }

  /**
   * Asks for {@code nContainer} containers on behalf of {@code am}, waits
   * for the last one to be ALLOCATED on {@code nm}, acquires them, then
   * sends LAUNCHED to each container and waits for it to reach RUNNING.
   *
   * @param startContainerId id of the first container expected to be
   *     allocated; the containers are assumed to get consecutive ids
   */
  private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
      int nContainer, Resource resource, int priority, int startContainerId)
      throws Exception {
    am.allocate(Arrays.asList(ResourceRequest
        .newInstance(Priority.newInstance(priority), "*", resource,
            nContainer)), null);
    ContainerId lastContainerId = ContainerId
        .newContainerId(am.getApplicationAttemptId(),
            startContainerId + nContainer - 1);
    Assert.assertTrue(
        rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));
    // Acquire them, and NM report RUNNING
    am.allocate(null, null);
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    for (int cId = startContainerId;
         cId < startContainerId + nContainer; cId++) {
      ContainerId containerId =
          ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
      RMContainer rmContainer = cs.getRMContainer(containerId);
      Assert.assertNotNull("Cannot find RMContainer", rmContainer);
      rmContainer.handle(
          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
      rm.waitForState(nm, containerId, RMContainerState.RUNNING);
    }
  }
}