blob: b4ebd15ccde9064562dca0e84bcc0e72fae89e24 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
 * Tests that the {@link CapacityScheduler} keeps queue, user, application and
 * AM resource accounting consistent when the labels (partitions) of nodes
 * that already run containers are changed. Also covers label-aware
 * application moves, AM resource limits and AM blacklisting.
 */
public class TestCapacitySchedulerNodeLabelUpdate {
  private static final int GB = 1024;

  private YarnConfiguration conf;

  RMNodeLabelsManager mgr;

  /**
   * Creates, for every test, a fresh configuration selecting the
   * CapacityScheduler and a fresh {@link NullRMNodeLabelsManager}.
   */
  @Before
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
  }

  /**
   * Builds a scheduler configuration with one leaf queue {@code root.a}.
   * That queue has 100% capacity and is given 100% of partitions x, y and z.
   *
   * @param config base configuration to wrap
   * @return the resulting capacity scheduler configuration
   */
  private Configuration getConfigurationWithQueueLabels(Configuration config) {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(config);

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);

    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
    csConf.setCapacity(A, 100);
    csConf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
    csConf.setCapacityByLabel(A, "x", 100);
    csConf.setCapacityByLabel(A, "y", 100);
    csConf.setCapacityByLabel(A, "z", 100);
    return csConf;
  }

  /**
   * Builds a scheduler configuration with queues {@code root.a} (children
   * a1..a4) and {@code root.b}.
   *
   * Accessible labels differ per child queue. In particular a2 can only
   * access label y, which the move test relies on.
   *
   * @param config base configuration to wrap
   * @return the resulting capacity scheduler configuration
   */
  private Configuration getConfigurationWithSubQueueLabels(
      Configuration config) {
    CapacitySchedulerConfiguration conf2 =
        new CapacitySchedulerConfiguration(config);

    // Define top-level queues
    conf2.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] {"a", "b"});
    conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
    conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);

    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
    final String aa1 = a + ".a1";
    final String aa2 = a + ".a2";
    final String aa3 = a + ".a3";
    final String aa4 = a + ".a4";
    conf2.setQueues(a, new String[] {"a1", "a2", "a3", "a4"});
    conf2.setCapacity(a, 50);
    conf2.setCapacity(b, 50);
    conf2.setCapacity(aa1, 40);
    conf2.setCapacity(aa2, 20);
    conf2.setCapacity(aa3, 20);
    conf2.setCapacity(aa4, 20);
    conf2.setAccessibleNodeLabels(a, ImmutableSet.of("x", "y", "z"));
    conf2.setAccessibleNodeLabels(aa1, ImmutableSet.of("x", "y"));
    conf2.setAccessibleNodeLabels(aa2, ImmutableSet.of("y"));
    conf2.setAccessibleNodeLabels(aa3, ImmutableSet.of("x", "y", "z"));
    conf2.setAccessibleNodeLabels(aa4, ImmutableSet.of("x", "y"));
    conf2.setCapacityByLabel(a, "x", 50);
    conf2.setCapacityByLabel(a, "y", 50);
    conf2.setCapacityByLabel(a, "z", 50);
    conf2.setCapacityByLabel(b, "x", 50);
    conf2.setCapacityByLabel(b, "y", 50);
    conf2.setCapacityByLabel(b, "z", 50);
    conf2.setCapacityByLabel(aa1, "x", 50);
    conf2.setCapacityByLabel(aa3, "x", 25);
    conf2.setCapacityByLabel(aa4, "x", 25);
    conf2.setCapacityByLabel(aa1, "y", 25);
    conf2.setCapacityByLabel(aa2, "y", 25);
    conf2.setCapacityByLabel(aa4, "y", 50);
    conf2.setCapacityByLabel(aa3, "z", 50);
    conf2.setCapacityByLabel(aa4, "z", 50);
    return conf2;
  }

  /** Returns a new mutable set holding the given labels. */
  private Set<String> toSet(String... elements) {
    return Sets.newHashSet(elements);
  }

  /** Asserts the used memory of a queue on the default (no-label) partition. */
  private void checkUsedResource(MockRM rm, String queueName, int memory) {
    checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
  }

  /** Asserts the AM-used memory of a queue on the default partition. */
  private void checkAMUsedResource(MockRM rm, String queueName, int memory) {
    checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
  }

  /**
   * Asserts that the used capacity of a queue on the default partition
   * equals {@code capacity / total}.
   */
  private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
      int total) {
    checkUsedCapacity(rm, queueName, capacity, total,
        RMNodeLabelsManager.NO_LABEL);
  }

  /** Asserts the used memory of a queue on the given partition. */
  private void checkUsedResource(MockRM rm, String queueName, int memory,
      String label) {
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = scheduler.getQueue(queueName);
    Assert.assertEquals(memory, queue.getQueueResourceUsage().getUsed(label)
        .getMemorySize());
  }

  /**
   * Asserts, within a small tolerance, that the used capacity of a queue on
   * the given partition equals {@code capacity / total}.
   */
  private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
      int total, String label) {
    float epsilon = 0.0001f;
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = scheduler.getQueue(queueName);
    Assert.assertEquals((float) capacity / total,
        queue.getQueueCapacities().getUsedCapacity(label), epsilon);
  }

  /** Asserts the AM-used memory of a queue on the given partition. */
  private void checkAMUsedResource(MockRM rm, String queueName, int memory,
      String label) {
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = scheduler.getQueue(queueName);
    Assert.assertEquals(memory, queue.getQueueResourceUsage().getAMUsed(label)
        .getMemorySize());
  }

  /**
   * Asserts the memory used by one user of a leaf queue on a partition.
   * Assumes {@code queueName} names a {@link LeafQueue}.
   */
  private void checkUserUsedResource(MockRM rm, String queueName,
      String userName, String partition, int memory) {
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
    LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
    UsersManager.User user = queue.getUser(userName);
    Assert.assertEquals(memory,
        user.getResourceUsage().getUsed(partition).getMemorySize());
  }

  /**
   * A labeled container request is placed on the labeled node. The app
   * report and the user info must then reflect AM plus container usage.
   */
  @Test(timeout = 60000)
  public void testRequestContainerAfterNodePartitionUpdated()
      throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
        "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 2048);
    rm.registerNode("h2:1234", 2048);
    MockNM nm3 = rm.registerNode("h3:1234", 2048);

    // launch an app to queue a with its AM on h3 (default partition); the
    // container requested later with label x must be allocated on h1
    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
    ApplicationResourceUsageReport appResourceUsageReport =
        rm.getResourceScheduler().getAppResourceUsageReport(
            am1.getApplicationAttemptId());
    Assert.assertEquals(1024, appResourceUsageReport.getUsedResources()
        .getMemorySize());
    Assert.assertEquals(1, appResourceUsageReport.getUsedResources()
        .getVirtualCores());

    // request a container.
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
    appResourceUsageReport =
        rm.getResourceScheduler().getAppResourceUsageReport(
            am1.getApplicationAttemptId());
    Assert.assertEquals(2048, appResourceUsageReport.getUsedResources()
        .getMemorySize());
    Assert.assertEquals(2, appResourceUsageReport.getUsedResources()
        .getVirtualCores());

    LeafQueue queue =
        (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
            .getQueue("a");
    // Require that "user" is listed; otherwise the per-user checks below
    // would be silently skipped.
    boolean userFound = false;
    for (UserInfo userInfo : queue.getUsersManager().getUsersInfo()) {
      if (userInfo.getUsername().equals("user")) {
        userFound = true;
        ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
        Assert.assertEquals(2048, resourcesUsed.getMemorySize());
        Assert.assertEquals(2, resourcesUsed.getvCores());
      }
    }
    Assert.assertTrue("User 'user' should be listed in queue a", userFound);
    rm.stop();
  }

  /**
   * Relabels a node running one container (x -> z -> y -> no label).
   *
   * After each change, queue, root, user and app-attempt usage must move to
   * the new partition. After both containers complete, all usage must drop
   * to zero.
   */
  @Test
  public void testResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 8000);
    rm.registerNode("h2:1234", 8000);
    MockNM nm3 = rm.registerNode("h3:1234", 8000);

    // launch an app to queue a with its AM (container 1) on h3; container 2
    // is requested with label x and must be allocated on h1
    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);

    // request a container.
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId containerId1 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    ContainerId containerId2 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertTrue(rm.waitForState(nm1, containerId2,
        RMContainerState.ALLOCATED));

    // check used resource:
    // queue-a used x=1G, ""=1G
    checkUsedResource(rm, "a", 1024, "x");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 1024, 8000, "x");
    checkUsedCapacity(rm, "a", 1024, 8000);

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());

    // change h1's label to z
    mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("z")));
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("z"))));
    Thread.sleep(100);
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 1024, "z");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 1024, 8000, "z");
    checkUsedCapacity(rm, "a", 1024, 8000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 1024, "z");
    checkUsedResource(rm, "root", 1024);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "z", 1024);
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    Assert.assertEquals(1024,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());

    // change h1's label to y; partition y now spans h1 and h2 (16000 MB)
    mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("y")));
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("y"))));
    Thread.sleep(100);
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 1024, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 1024, 16000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    checkUsedCapacity(rm, "a", 1024, 8000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 1024, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 1024);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 1024);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    Assert.assertEquals(1024,
        app.getAppAttemptResourceUsage().getUsed("y").getMemorySize());
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());

    // change h1's label to no label; the default partition now spans h1 and
    // h3 (16000 MB)
    Set<String> emptyLabels = new HashSet<>();
    Map<NodeId, Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
        emptyLabels);
    mgr.replaceLabelsOnNode(map);
    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
    Thread.sleep(100);
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 2048);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 0, 8000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    checkUsedCapacity(rm, "a", 2048, 16000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 0, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 2048);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 0);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    checkUserUsedResource(rm, "a", "user", "", 2048);
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("y").getMemorySize());
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    Assert.assertEquals(2048,
        app.getAppAttemptResourceUsage().getUsed("").getMemorySize());

    // Finish the two containers, we should see used resource becomes 0
    cs.completedContainer(cs.getRMContainer(containerId2),
        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL);
    cs.completedContainer(cs.getRMContainer(containerId1),
        ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL);

    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 0);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 0, 8000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    checkUsedCapacity(rm, "a", 0, 16000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 0, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 0);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 0);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    checkUserUsedResource(rm, "a", "user", "", 0);

    rm.close();
  }

  /**
   * Moving an app that holds containers on partitions x and y.
   *
   * The move must be rejected for a target queue lacking label x (a2). It
   * must be accepted for queues that can access the required labels.
   */
  @Test(timeout = 300000)
  public void testMoveApplicationWithLabel() throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));
    // set mapping:
    // h1 -> x
    // h2 -> y
    // h4 -> z
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("z")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithSubQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 4096 * 2);
    MockNM nm2 = rm.registerNode("h2:1234", 4096 * 2);
    MockNM nm3 = rm.registerNode("h3:1234", 4096 * 2);
    rm.registerNode("h4:1234", 4096 * 2);

    // launch an app to queue a1 with its AM on h3; a label-x container must
    // land on h1 and a label-y container on h2
    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId container1 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    rm.waitForState(nm1, container1, RMContainerState.ALLOCATED, 10 * 1000);
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
    ContainerId container2 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    rm.waitForState(nm2, container2, RMContainerState.ALLOCATED, 10 * 1000);

    CapacityScheduler scheduler =
        ((CapacityScheduler) rm.getResourceScheduler());
    try {
      scheduler.preValidateMoveApplication(app1.getApplicationId(), "a2");
      scheduler.moveApplication(app1.getApplicationId(), "a2");
      fail("Should throw exception since target queue doesnt have "
          + "required labels");
    } catch (Exception e) {
      Assert.assertTrue("Yarn Exception should be thrown",
          e instanceof YarnException);
      Assert.assertEquals("Specified queue=a2 can't satisfy "
          + "following apps label expressions =[x] accessible "
          + "node labels =[y]", e.getMessage());
    }
    try {
      scheduler.moveApplication(app1.getApplicationId(), "a3");
      scheduler.moveApplication(app1.getApplicationId(), "a4");
      // Check move to queue with accessible label ANY
      scheduler.moveApplication(app1.getApplicationId(), "b");
    } catch (Exception e) {
      // include the unexpected exception so the failure is diagnosable
      fail("Should not throw exception since target queue has "
          + "required labels: " + e);
    }
    rm.stop();
  }

  /**
   * Same idea as {@link #testResourceUsageWhenNodeUpdatesPartition()}, with
   * two apps from two users. Each container is 1G.
   *
   * Initial placement:
   * <pre>
   * Node 1 (label x)      Node 2 (no label)
   * ----------------      -----------------
   * App1-container2       App1-container1 (AM)
   * App2-container2       App1-container3
   * App2-container3       App2-container1 (AM)
   * </pre>
   *
   * Node 1 is then relabeled to z. Per-user and per-app usage must move only
   * for the containers on node 1.
   */
  @Test (timeout = 60000)
  public void testComplexResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 80000);
    MockNM nm2 = rm.registerNode("h2:1234", 80000);

    // app1
    RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);

    // c2 on n1, c3 on n2
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    Assert.assertTrue(rm.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));

    // app2
    RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

    // c2/c3 on n1
    am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
    containerId =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
    Assert.assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));

    // check used resource:
    // queue-a used x=3G (app1-c2, app2-c2, app2-c3),
    // ""=3G (both AMs and app1-c3)
    checkUsedResource(rm, "a", 3 * GB, "x");
    checkUsedResource(rm, "a", 3 * GB);

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp application1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp application2 =
        cs.getApplicationAttempt(am2.getApplicationAttemptId());

    // change h1's label to z
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("z"))));
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 3 * GB, "z");
    checkUsedResource(rm, "a", 3 * GB);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 3 * GB, "z");
    checkUsedResource(rm, "root", 3 * GB);
    checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
    checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
    checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
    checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
    checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
    checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
    Assert.assertEquals(0,
        application1.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    Assert.assertEquals(1 * GB,
        application1.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    Assert.assertEquals(2 * GB,
        application1.getAppAttemptResourceUsage().getUsed("").getMemorySize());
    Assert.assertEquals(0,
        application2.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    Assert.assertEquals(2 * GB,
        application2.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    Assert.assertEquals(1 * GB,
        application2.getAppAttemptResourceUsage().getUsed("").getMemorySize());

    rm.close();
  }

  /**
   * Moves the only node between partitions (default -> x -> y). The queue's
   * AM resource limit must follow the partition that owns the node.
   */
  @Test
  public void testAMResourceLimitNodeUpdatePartition() throws Exception {
    conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    rm.registerNode("h1:1234", 6400);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));

    // 10% of 6400 MB is expected to be available for AMs (presumably the
    // scheduler's default maximum AM resource percent; not set here)
    checkAMResourceLimit(rm, "a", 640, "");
    checkAMResourceLimit(rm, "a", 0, "x");
    checkAMResourceLimit(rm, "a", 0, "y");
    checkAMResourceLimit(rm, "a", 0, "z");

    mgr.replaceLabelsOnNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    rm.drainEvents();

    checkAMResourceLimit(rm, "a", 640, "x");
    checkAMResourceLimit(rm, "a", 0, "y");
    checkAMResourceLimit(rm, "a", 0, "z");
    checkAMResourceLimit(rm, "a", 0, "");

    // Switch
    mgr.replaceLabelsOnNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
    rm.drainEvents();

    checkAMResourceLimit(rm, "a", 0, "x");
    checkAMResourceLimit(rm, "a", 640, "y");
    checkAMResourceLimit(rm, "a", 0, "z");
    checkAMResourceLimit(rm, "a", 0, "");

    // release the RM started by this test
    rm.close();
  }

  /**
   * Relabels the node hosting an app's AM and one container (x -> z -> no
   * label). AM-used and total used resources must follow each relabel.
   */
  @Test(timeout = 60000)
  public void testAMResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    // set node -> label
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 8000);
    rm.registerNode("h2:1234", 8000);
    rm.registerNode("h3:1234", 8000);

    // launch an app to queue a with AM label x, so the AM and the label-x
    // container are both allocated on h1
    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    // request a container.
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId containerId2 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertTrue(rm.waitForState(nm1, containerId2,
        RMContainerState.ALLOCATED));

    // check used resource:
    // queue-a used x=2G
    checkUsedResource(rm, "a", 2048, "x");
    checkAMUsedResource(rm, "a", 1024, "x");

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());

    // change h1's label to z
    cs.handle(new NodeLabelsUpdateSchedulerEvent(
        ImmutableMap.of(nm1.getNodeId(), toSet("z"))));

    // Now the resources also should change from x to z. Verify AM and normal
    // used resource are successfully changed.
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 2048, "z");
    checkAMUsedResource(rm, "a", 0, "x");
    checkAMUsedResource(rm, "a", 1024, "z");
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "z", 2048);
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getAMUsed("x").getMemorySize());
    Assert.assertEquals(1024,
        app.getAppAttemptResourceUsage().getAMUsed("z").getMemorySize());

    // change h1's label to no label
    Set<String> emptyLabels = new HashSet<>();
    Map<NodeId, Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
        emptyLabels);
    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 2048);
    checkAMUsedResource(rm, "a", 0, "x");
    checkAMUsedResource(rm, "a", 0, "z");
    checkAMUsedResource(rm, "a", 1024);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    checkUserUsedResource(rm, "a", "user", "", 2048);
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getAMUsed("x").getMemorySize());
    Assert.assertEquals(0,
        app.getAppAttemptResourceUsage().getAMUsed("z").getMemorySize());
    Assert.assertEquals(1024,
        app.getAppAttemptResourceUsage().getAMUsed("").getMemorySize());

    rm.close();
  }

  /**
   * Checks the AM blacklisting disable threshold per partition (0.5 here).
   *
   * Blacklisting a second node of a three-node partition must turn
   * blacklisting off for that attempt: no additions, all entries reported as
   * removals. This is checked for both the default partition and label x.
   */
  @Test(timeout = 30000)
  public void testBlacklistAMDisableLabel() throws Exception {
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        0.5f);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"),
        NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0),
        toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"),
        NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0),
        toSet("y")));

    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    // Nodes in label default h1,h8,h9
    // Nodes in label x h2,h3,h6
    // Nodes in label y h4,h5,h7
    MockNM nm1 = rm.registerNode("h1:1234", 2048);
    MockNM nm2 = rm.registerNode("h2:1234", 2048);
    rm.registerNode("h3:1234", 2048);
    rm.registerNode("h4:1234", 2048);
    rm.registerNode("h5:1234", 2048);
    rm.registerNode("h6:1234", 2048);
    rm.registerNode("h7:1234", 2048);
    rm.registerNode("h8:1234", 2048);
    rm.registerNode("h9:1234", 2048);

    // Submit app with AM container launched on default partition i.e. h1.
    RMApp app = rm.submitApp(GB, "app", "user", null, "a");
    MockRM.launchAndRegisterAM(app, rm, nm1);
    RMAppAttempt appAttempt = app.getCurrentAppAttempt();
    // Add default node blacklist from default
    appAttempt.getAMBlacklistManager().addNode("h1");
    ResourceBlacklistRequest blacklistUpdates =
        appAttempt.getAMBlacklistManager().getBlacklistUpdates();
    Assert.assertEquals(1, blacklistUpdates.getBlacklistAdditions().size());
    Assert.assertEquals(0, blacklistUpdates.getBlacklistRemovals().size());
    // Adding second node from default partition
    appAttempt.getAMBlacklistManager().addNode("h8");
    blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates();
    Assert.assertEquals(0, blacklistUpdates.getBlacklistAdditions().size());
    Assert.assertEquals(2, blacklistUpdates.getBlacklistRemovals().size());

    // Submission in label x
    RMApp applabel = rm.submitApp(GB, "app", "user", null, "a", "x");
    MockRM.launchAndRegisterAM(applabel, rm, nm2);
    RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt();
    appAttemptlabelx.getAMBlacklistManager().addNode("h2");
    ResourceBlacklistRequest blacklistUpdatesOnx =
        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
    Assert.assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size());
    Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size());
    // Adding second node from partition x; the updates must be read from the
    // label-x attempt, not the default-partition one
    appAttemptlabelx.getAMBlacklistManager().addNode("h3");
    blacklistUpdatesOnx =
        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
    Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size());
    Assert.assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size());
    rm.close();
  }

  /**
   * Asserts that the AM limit of a queue on a partition reaches
   * {@code memory}. It polls for up to 3 seconds to let asynchronous updates
   * land.
   */
  private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
      String label) throws InterruptedException {
    Assert.assertEquals(memory,
        waitForResourceUpdate(rm, queuename, memory, label, 3000L));
  }

  /**
   * Polls a queue's AM limit on a partition every 100 ms until it equals
   * {@code memory} or {@code timeout} ms elapse.
   *
   * @return the last observed AM limit in MB (equal to {@code memory} on
   *         success)
   */
  private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
      String label, long timeout) throws InterruptedException {
    long start = System.currentTimeMillis();
    long memorySize = 0;
    while (System.currentTimeMillis() - start < timeout) {
      CapacityScheduler scheduler =
          (CapacityScheduler) rm.getResourceScheduler();
      CSQueue queue = scheduler.getQueue(queuename);
      memorySize =
          queue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
      if (memory == memorySize) {
        return memorySize;
      }
      Thread.sleep(100);
    }
    return memorySize;
  }
}