blob: 54b504725107e0a7d36203c858e8dcf5d39c658b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestNodeLabelContainerAllocation {
/** One gigabyte expressed in megabytes (the memory unit used by these tests). */
private static final int GB = 1024;
/** Fresh YARN configuration selecting the CapacityScheduler, rebuilt by setUp(). */
private YarnConfiguration conf;
/** Node-label manager recreated for every test by setUp(). */
RMNodeLabelsManager mgr;
/**
 * Creates, before every test, a clean YARN configuration that selects the
 * CapacityScheduler, and a new in-memory node-label manager initialized with it.
 */
@Before
public void setUp() throws Exception {
  YarnConfiguration freshConf = new YarnConfiguration();
  freshConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  conf = freshConf;
  mgr = new NullRMNodeLabelsManager();
  mgr.init(freshConf);
}
/**
 * Builds a CapacityScheduler configuration from {@code config} with this
 * queue layout:
 * <pre>
 *   root          x=100%, y=100%, z=100%
 *   |- a          capacity 10% (max 15%), accessible labels {x}, x=100%
 *   |  `- a1      capacity 100% (max 100%), x=100%
 *   |- b          capacity 20%, accessible labels {y, z}, y=100%, z=100%
 *   |  `- b1      capacity 100% (max 100%), y=100%, z=100%
 *   `- c          capacity 70% (max 70%), no accessible labels
 *      `- c1      capacity 100% (max 100%)
 * </pre>
 *
 * @param config base configuration the new one is created from
 * @return the resulting {@link CapacitySchedulerConfiguration}
 */
private Configuration getConfigurationWithQueueLabels(Configuration config) {
  // Named csConf so it does not hide the test class's "conf" field.
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration(config);
  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(A, 10);
  csConf.setMaximumCapacity(A, 15);
  csConf.setAccessibleNodeLabels(A, toSet("x"));
  csConf.setCapacityByLabel(A, "x", 100);
  final String B = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(B, 20);
  csConf.setAccessibleNodeLabels(B, toSet("y", "z"));
  csConf.setCapacityByLabel(B, "y", 100);
  csConf.setCapacityByLabel(B, "z", 100);
  final String C = CapacitySchedulerConfiguration.ROOT + ".c";
  csConf.setCapacity(C, 70);
  csConf.setMaximumCapacity(C, 70);
  csConf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
  // Define 2nd-level queues
  final String A1 = A + ".a1";
  csConf.setQueues(A, new String[] {"a1"});
  csConf.setCapacity(A1, 100);
  csConf.setMaximumCapacity(A1, 100);
  csConf.setCapacityByLabel(A1, "x", 100);
  final String B1 = B + ".b1";
  csConf.setQueues(B, new String[] {"b1"});
  csConf.setCapacity(B1, 100);
  csConf.setMaximumCapacity(B1, 100);
  csConf.setCapacityByLabel(B1, "y", 100);
  csConf.setCapacityByLabel(B1, "z", 100);
  final String C1 = C + ".c1";
  csConf.setQueues(C, new String[] {"c1"});
  csConf.setCapacity(C1, 100);
  csConf.setMaximumCapacity(C1, 100);
  return csConf;
}
/**
 * Asserts that {@code containerId} is a live container of {@code attemptId}
 * and that it was allocated on {@code host}.
 *
 * @param attemptId attempt owning the container
 * @param containerId container to look up among the attempt's live containers
 * @param rm resource manager whose scheduler is queried
 * @param host expected host name of the container's allocated node
 */
private void checkTaskContainersHost(ApplicationAttemptId attemptId,
    ContainerId containerId, ResourceManager rm, String host) {
  YarnScheduler scheduler = rm.getRMContext().getScheduler();
  SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
  Assert.assertTrue(appReport.getLiveContainers().size() > 0);
  boolean found = false;
  for (RMContainer c : appReport.getLiveContainers()) {
    if (c.getContainerId().equals(containerId)) {
      Assert.assertEquals(host, c.getAllocatedNode().getHost());
      found = true;
    }
  }
  // Without this the helper would pass silently when the container is absent.
  Assert.assertTrue("Container " + containerId
      + " is not a live container of " + attemptId, found);
}
/**
 * Collects the given elements into a new, mutable hash set.
 *
 * @param elements elements to add
 * @return a freshly allocated set containing {@code elements}
 */
@SuppressWarnings("unchecked")
private <E> Set<E> toSet(E... elements) {
  Set<E> result = Sets.newHashSetWithExpectedSize(elements.length);
  Collections.addAll(result, elements);
  return result;
}
/**
 * Verifies two things for an app in queue a1:
 * it gets only a small share of the default (no-label) partition, and,
 * with the default user limit of 100, a single user can use all of partition "x".
 */
@Test (timeout = 300000)
public void testContainerAllocationWithSingleUserLimits() throws Exception {
  // set node -> label (mgr is the fresh manager created by setUp())
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
      NodeId.newInstance("h2", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
    // launch an app to queue a1; its AM container runs on h1 (label = x)
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    // A has only a small share (10%) of the default (empty) partition, so it
    // can only allocate one container with label=empty
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    // Cannot allocate 2nd label=empty container
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    // A has default user limit = 100, so it can use all resource in label = x.
    // h1 fits floor(8000 / 1024) = 7 containers: the AM container (its 200MB
    // ask is presumably rounded up to the 1GB minimum allocation) plus six
    // more (ids 3..8).
    for (int id = 3; id <= 8; id++) {
      containerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
      Assert.assertTrue(rm1.waitForState(nm1, containerId,
          RMContainerState.ALLOCATED));
    }
  } finally {
    rm1.close();
  }
}
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabels() throws Exception {
  /*
   * Queue structure:
   *                        root (*)
   *                 ______________________
   *                /                      \
   *   a x(100%), y(50%)             b y(50%), z(100%)
   *   ________________              ____________________
   *         /                      /                    \
   *     a1 (x,y)               b1(no)                  b2(y,z)
   *       100%                                   y = 100%, z = 100%
   *
   * Node structure (each node registers 2048MB):
   * h1 : x
   * h2 : y
   * h3 : y
   * h4 : z
   * h5 : NO
   *
   * Total resource:
   * x: 2G
   * y: 4G
   * z: 2G
   * *: 2G
   *
   * Resource of
   * a1: x=2G, y=2G, NO=0.2G
   * b1: NO=0.9G (max=1G)
   * b2: y=2G, z=2G, NO=0.9G (max=1G)
   *
   * Each node can only allocate two (1GB) containers
   */
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
      toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
      NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
      toSet("z"), NodeId.newInstance("h5", 0),
      RMNodeLabelsManager.EMPTY_STRING_SET));
  // inject node label manager
  MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
    ContainerId containerId;
    // launch an app to queue a1 (labels = x,y); its AM runs on h1
    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    // request a container (label = y). can be allocated on nm2
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h2");
    // launch an app to queue b1 (no label); its AM runs on h5
    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
    // The AM container succeeded and now uses b1's queue capacity, so no more
    // containers can be allocated (maximum capacity reached)
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertFalse(rm1.waitForState(nm5, containerId,
        RMContainerState.ALLOCATED));
    // launch an app to queue b2
    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
    // request a container (label = y). try to allocate on nm1 (label = x) and
    // nm3 (label = y). Will successfully allocate on nm3
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");
    // try to allocate container (request label = z) on nm4 (label = z).
    // Will successfully allocate on nm4 only.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
    Assert.assertTrue(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h4");
  } finally {
    rm1.close();
  }
}
/**
 * Verifies that explicit label expressions in resource requests steer task
 * containers to the matching node:
 * a1 with label x goes to h1, b1 with label y goes to h2, and c1 with no label goes to h3.
 */
@Test (timeout = 120000)
public void testContainerAllocateWithLabels() throws Exception {
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
      NodeId.newInstance("h2", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
    ContainerId containerId;
    // launch an app to queue a1 (label = x); the AM runs on h3 and the task
    // container must be allocated in h1
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
    // request a container.
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");
    // launch an app to queue b1 (label = y); the AM runs on h3 and the task
    // container must be allocated in h2
    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
    // request a container.
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
        "h2");
    // launch an app to queue c1 (label = ""), and check all container will
    // be allocated in h3
    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
    // request a container.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");
  } finally {
    rm1.close();
  }
}
@Test (timeout = 120000)
public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
  // This test is pretty much similar to testContainerAllocateWithLabels.
  // Difference is, this test doesn't specify label expression in ResourceRequest,
  // instead, it uses default queue label expression
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
      NodeId.newInstance("h2", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
    ContainerId containerId;
    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    // request a container.
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");
    // launch an app to queue b1 (label = y), and check all container will
    // be allocated in h2
    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
    // request a container.
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
        "h2");
    // launch an app to queue c1 (label = ""), and check all container will
    // be allocated in h3
    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
    // request a container.
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");
  } finally {
    rm1.close();
  }
}
@Test (timeout = 120000)
public void testContainerReservationWithLabels() throws Exception {
  // Verifies that a request which no longer fits on h1 (label = x) is
  // reserved there. Reserved/used resources of partition x must be tracked by
  // the root and leaf queues and released once the ask is cancelled. No label
  // expression is set in the ResourceRequest; queue a1's default label
  // expression is used instead.
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
      "z"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
      toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
      NodeId.newInstance("h3", 0), toSet("x")));
  // inject node label manager
  MockRM rm1 = new MockRM(
      TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
    rm1.registerNode("h2:1234", 8 * GB); // label = y
    rm1.registerNode("h3:1234", 8 * GB); // label = x
    ContainerId containerId;
    // launch an app to queue a1 (label = x), and check all container will
    // be allocated in h1
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    // request two 4GB containers.
    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>());
    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
    // Do node heartbeats 2 times
    // First time will allocate container for app1, second time will reserve
    // container for app1
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h1");
    // Check if a 4G container allocated for app1, and 4G is reserved
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
        .getApplicationAttemptId());
    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
    Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0);
    Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getUsed("x").getMemorySize());
    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getReserved("x").getMemorySize());
    Assert.assertEquals(4 * GB,
        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
    // Cancel asks of app1 and re-kick RM
    am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage()
        .getUsed("x").getMemorySize());
    Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
        .getReserved("x").getMemorySize());
    Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x")
        .getMemorySize());
  } finally {
    rm1.close();
  }
}
/**
 * Verifies that once app1 is killed, no container is left in LeafQueue's
 * ignore-partition-exclusivity bookkeeping. This covers a container that was
 * first reserved on a labeled node and later allocated from that reservation.
 */
@Test (timeout = 120000)
public void testRMContainerLeakInLeafQueue() throws Exception {
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
      NodeId.newInstance("h2", 0), toSet("x")));
  // inject node label manager
  MockRM rm1 =
      new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
        @Override public RMNodeLabelsManager createNodeLabelManager() {
          return mgr;
        }
      };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
    rm1.registerNode("h2:1234", 8 * GB); // label = x
    // launch two apps to queue a1 (label = x); both AMs run on h1
    RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1");
    MockRM.launchAndRegisterAM(app2, rm1, nm1);
    // request two 7GB containers for app1.
    am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
    // Do node heartbeats 1 time
    // scheduler will reserve a container for app1
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    // app1 has only its AM container live, and one 7GB container reserved
    FiCaSchedulerApp schedulerApp1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
    // kill app2 then do node heartbeat 1 time
    // scheduler will allocate a container from the reserved container on nm1
    rm1.killApp(app2.getApplicationId());
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
    // After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should
    // be clean, otherwise resource leak happened
    rm1.killApp(app1.getApplicationId());
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size());
  } finally {
    rm1.close();
  }
}
/**
 * Asserts the total pending memory of the attempt's ANY ("*") ask at the given
 * priority. The total is the per-allocation memory multiplied by the pending count.
 *
 * @param rm resource manager running the CapacityScheduler
 * @param priority priority of the ask to inspect
 * @param attemptId attempt owning the ask
 * @param memory expected total pending memory in MB
 */
private void checkPendingResource(MockRM rm, int priority,
    ApplicationAttemptId attemptId, int memory) {
  FiCaSchedulerApp schedulerApp =
      ((CapacityScheduler) rm.getRMContext().getScheduler())
          .getApplicationAttempt(attemptId);
  AppSchedulingInfo schedulingInfo = schedulerApp.getAppSchedulingInfo();
  PendingAsk pending = schedulingInfo.getPendingAsk(
      TestUtils.toSchedulerKey(priority), "*");
  long perAllocationMemory =
      pending.getPerAllocationResource().getMemorySize();
  long totalPendingMemory = perAllocationMemory * pending.getCount();
  Assert.assertEquals(memory, totalPendingMemory);
}
/**
 * Asserts how many containers the CapacityScheduler currently tracks on a node.
 *
 * @param rm resource manager running the CapacityScheduler
 * @param nodeId node to inspect
 * @param numContainers expected container count on that node
 */
private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
    int numContainers) {
  SchedulerNode schedulerNode =
      ((CapacityScheduler) rm.getRMContext().getScheduler())
          .getSchedulerNode(nodeId);
  Assert.assertEquals(numContainers, schedulerNode.getNumContainers());
}
/**
 * JIRA YARN-4140: in a resource request set, the node label is set only on the
 * ANY request. The label expression of RACK/NODE local and default requests
 * needs to be updated to match. This testcase verifies that the label
 * expression is changed based on the ANY requests.
 *
 * @throws Exception
 */
@Test
public void testResourceRequestUpdateNodePartitions() throws Exception {
  // set node -> label
  mgr.addToCluserNodeLabels(ImmutableSet.of(NodeLabel.newInstance("x"),
      NodeLabel.newInstance("y", false), NodeLabel.newInstance("z", false)));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // The original test never stopped rm1; always stop it, even on failure.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = <empty>
    // launch an app to queue b1 (label = y), AM container should be launched in
    // nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    // Creating request set when request before ANY is not having label and any
    // is having label
    List<ResourceRequest> resourceRequest = new ArrayList<ResourceRequest>();
    resourceRequest.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest.add(am1.createResourceReq("*", 1024, 3, 5, "y"));
    resourceRequest.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest.add(am1.createResourceReq("*", 1024, 2, 3, "y"));
    resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
    resourceRequest.add(am1.createResourceReq("*", 1024, 4, 3, null));
    resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 4, 4, null));
    am1.allocate(resourceRequest, new ArrayList<ContainerId>());
    CapacityScheduler cs =
        (CapacityScheduler) rm1.getRMContext().getScheduler();
    FiCaSchedulerApp app =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "y");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4,
        RMNodeLabelsManager.NO_LABEL);
    // Previous any request was Y trying to update with z and the
    // request before ANY label is null
    List<ResourceRequest> newReq = new ArrayList<ResourceRequest>();
    newReq.add(am1.createResourceReq("h2:1234", 1024, 3, 4, null));
    newReq.add(am1.createResourceReq("*", 1024, 3, 5, "z"));
    newReq.add(am1.createResourceReq("h1:1234", 1024, 3, 4, null));
    newReq.add(am1.createResourceReq("*", 1024, 4, 5, "z"));
    am1.allocate(newReq, new ArrayList<ContainerId>());
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "z");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4, "z");
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
    // Request before ANY and ANY request is set as NULL. Request should be set
    // with Empty Label
    List<ResourceRequest> resourceRequest1 = new ArrayList<ResourceRequest>();
    resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 3, 1,
        null));
    resourceRequest1.add(am1.createResourceReq("*", 1024, 3, 5, null));
    resourceRequest1.add(am1.createResourceReq("h1:1234", 1024, 3, 2,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest1.add(am1.createResourceReq("/default-rack", 1024, 2, 1,
        null));
    resourceRequest1.add(am1.createResourceReq("*", 1024, 2, 3,
        RMNodeLabelsManager.NO_LABEL));
    resourceRequest1.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
    am1.allocate(resourceRequest1, new ArrayList<ContainerId>());
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3,
        RMNodeLabelsManager.NO_LABEL);
    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2,
        RMNodeLabelsManager.NO_LABEL);
  } finally {
    rm1.close();
  }
}
/**
 * Asserts that at least one scheduler key with the given priority exists.
 * Also asserts that every such key's placement set has
 * {@code expectedPartition} as its primary requested node partition.
 *
 * @param info scheduling info of the application under test
 * @param priority priority of the scheduler keys to inspect
 * @param expectedPartition expected primary requested node partition
 */
private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info,
    int priority, String expectedPartition) {
  boolean found = false;
  for (SchedulerRequestKey key : info.getSchedulerKeys()) {
    if (key.getPriority().getPriority() == priority) {
      found = true;
      Assert.assertEquals("Expected partition is " + expectedPartition,
          expectedPartition,
          info.getSchedulingPlacementSet(key)
              .getPrimaryRequestedNodePartition());
    }
  }
  // Without this the check would pass vacuously when no request exists at
  // the given priority.
  Assert.assertTrue("No scheduler key found with priority " + priority,
      found);
}
/**
 * Test case: Submit two applications to a queue (app1 first, then app2).
 * app1 asks for no-label containers and app2 asks for label=y. When node1
 * (label=y) heartbeats, app2 gets allocation there first, even though app2 was
 * submitted after app1.
 */
@Test
public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
  // set node -> label (y is non-exclusive)
  mgr.addToCluserNodeLabels(ImmutableSet.of(
      NodeLabel.newInstance("x"), NodeLabel.newInstance("y", false)));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
    // launch an app to queue b1 (label = y), AM container should be launched in nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    // launch another app to queue b1 (label = y), AM container should be launched in nm2
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
    // request containers and let nm1 (label=y) heartbeat. app1 requests
    // non-labeled containers and app2 requests labeled containers, so app2
    // gets allocated on nm1 first even though app1 was submitted first.
    am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
    am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    // Do node heartbeats many times
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }
    // App2 will get preference to be allocated on node1, and node1 will be all
    // used by App2.
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
    // app1 get nothing in nm1 (partition=y)
    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
    checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
    // app2 get all resource in nm1 (partition=y)
    checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
    checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
  } finally {
    rm1.close();
  }
}
/**
 * Asserts how many of the application's live containers are allocated on the
 * given node.
 *
 * @param expectedNum expected number of live containers on {@code nodeId}
 * @param nodeId node to count containers on
 * @param app application whose live containers are inspected
 */
private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
    NodeId nodeId, FiCaSchedulerApp app) {
  int onNode = 0;
  for (RMContainer live : app.getLiveContainers()) {
    if (!live.getAllocatedNode().equals(nodeId)) {
      continue;
    }
    onNode++;
  }
  Assert.assertEquals(expectedNum, onNode);
}
/**
 * Test case: Submit one application that asks label="" in priority=1 and
 * label="y" in priority=2. When a node with label=y heartbeats, priority=2
 * gets allocation first, even though there is pending resource in priority=1.
 */
@Test
public void
    testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
    throws Exception {
  // set node -> label; makes y a non-exclusive node label
  mgr.addToCluserNodeLabels(ImmutableSet.of(
      NodeLabel.newInstance("x"), NodeLabel.newInstance("y", false)));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
  // inject node label manager
  MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  // Stop the RM even when an assertion fails so it cannot leak into later tests.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
    // launch an app to queue b1 (label = y), AM container should be launched in nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
    // request containers from am1, priority=1 asks for "" and priority=2 asks
    // for "y", "y" container should be allocated first
    am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
    am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
    // Do a node heartbeat once
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    cs.handle(new NodeUpdateSchedulerEvent(
        rm1.getRMContext().getRMNodes().get(nm1.getNodeId())));
    // Check pending resource for am1: priority=1 isn't allocated before
    // priority=2 is
    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
    checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
  } finally {
    rm1.close();
  }
}
@Test
public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
throws Exception {
/**
* Test case: Submit one application, it asks 6 label="" containers, NM1
* has label=x and NM2 has no label, NM1/NM2 doing heartbeat together. Even
* if NM1 has idle resource, containers are all allocated to NM2 since
* non-labeled request should get allocation on non-labeled nodes first.
*/
// set node -> label; "x" is created as a non-exclusive label and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
ContainerId nextContainerId;
// launch an app to queue b1 (label = y), AM container is launched in nm2
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// request containers from am1, priority=1 asks for "" * 6 (container ids
// 2 to 7; id 1 is the AM container). Both nm1 and nm2 are passed to
// waitForState for each container, then we check all of them landed on nm2.
// This is to verify when there's resource available in non-labeled
// partition, non-labeled resource should allocate to non-labeled partition
// first.
am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
for (int i = 2; i < 2 + 6; i++) {
nextContainerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
nextContainerId, RMContainerState.ALLOCATED));
}
// no container allocated on nm1
checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
// all 7 (1 AM container + 6 task container) containers allocated on nm2
checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);
rm1.close();
}
@Test
public void testPreferenceOfQueuesTowardsNodePartitions()
throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / | \
* a b c
* / \ / \ / \
* a1 a2 b1 b2 c1 c2
* (x) (x) (x)
* </pre>
*
* Only a1, b1, c1 have capacity on label=x (a2, b2, c2 have 0), and their
* default label expression is x. Each of them has one application, asking
* for 5 containers. NM1 has label=x.
*
* NM1 heartbeats 15 times; it should allocate all 15 containers with
* label=x.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 33);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 33);
csConf.setQueues(A, new String[] {"a1", "a2"});
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 33);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 33);
csConf.setQueues(B, new String[] {"b1", "b2"});
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
csConf.setCapacity(C, 34);
csConf.setAccessibleNodeLabels(C, toSet("x"));
csConf.setCapacityByLabel(C, "x", 34);
csConf.setQueues(C, new String[] {"c1", "c2"});
// Define 2nd-level queues; *1 queues own all of their parent's x capacity
// and default to label x, *2 queues have 0 capacity on x
final String A1 = A + ".a1";
csConf.setCapacity(A1, 50);
csConf.setCapacityByLabel(A1, "x", 100);
csConf.setDefaultNodeLabelExpression(A1, "x");
final String A2 = A + ".a2";
csConf.setCapacity(A2, 50);
csConf.setCapacityByLabel(A2, "x", 0);
final String B1 = B + ".b1";
csConf.setCapacity(B1, 50);
csConf.setCapacityByLabel(B1, "x", 100);
csConf.setDefaultNodeLabelExpression(B1, "x");
final String B2 = B + ".b2";
csConf.setCapacity(B2, 50);
csConf.setCapacityByLabel(B2, "x", 0);
final String C1 = C + ".c1";
csConf.setCapacity(C1, 50);
csConf.setCapacityByLabel(C1, "x", 100);
csConf.setDefaultNodeLabelExpression(C1, "x");
final String C2 = C + ".c2";
csConf.setCapacity(C2, 50);
csConf.setCapacityByLabel(C2, "x", 0);
// set node -> label; "x" is non-exclusive and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
// app1 -> a1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// app2 -> a2
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// app3 -> b1
RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
// app4 -> b2
RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
// app5 -> c1
RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
// app6 -> c2
RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
// Each application requests 5 * 1GB containers
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
// NM1 does 15 heartbeats (NM2 does not heartbeat here)
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
for (int i = 0; i < 15; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// NM1 get 15 new containers (total is 18, 15 task containers and 3 AM
// containers)
checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
// Check pending resource each application
// APP1/APP3/APP5 get satisfied, and APP2/APP4/APP6 get nothing.
checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
rm1.close();
}
@Test
public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / \
* a b
* (x)
* </pre>
*
* Only a can access label=x, two nodes in the cluster, n1 has x and n2 has
* no-label.
*
* With user-limit-factor=5, one application submitted to queue b that asks
* for many (more than the cluster can hold) containers should be able to use
* up all cluster resources.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 50);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
// b has no accessible labels
csConf.setAccessibleNodeLabels(B, new HashSet<String>());
csConf.setUserLimitFactor(B, 5);
// set node -> label; "x" is non-exclusive and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
// app1 -> b
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// The application requests 50 * 1GB containers
am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
// NM1 and NM2 each do 50 heartbeats, interleaved
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
// How many cycles passed while the partitioned node (nm1) still had no
// container allocated on it
int cycleWaited = 0;
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
if (schedulerNode1.getNumContainers() == 0) {
cycleWaited++;
}
}
// We wait 10 cycles before getting allocated on the partitioned node.
// NM2 can allocate 10 containers totally; excluding the already allocated
// AM container, it takes 9 cycles to fill the non-partitioned node, and
// one more cycle before allocating on the partitioned node.
Assert.assertEquals(10, cycleWaited);
// Both NM1/NM2 launched 10 containers, cluster resource is exhausted
checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
rm1.close();
}
@Test
public void testAMContainerAllocationWillAlwaysBeExclusive()
throws Exception {
/**
* Test case: Submit one application without partition, trying to allocate a
* node has partition=x, it should fail to allocate since AM container will
* always respect exclusivity for partitions
*/
// set node -> label; "x" is non-exclusive and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false), NodeLabel.newInstance("y")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
String nodeIdStr = "h1:1234";
// nm1 is the only node in the cluster
MockNM nm1 = rm1.registerNode(nodeIdStr, 8 * GB); // label = x
// launch an app to queue b1; its AM container must NOT be allocated on nm1
// (partition = x) even though nm1 has free resource
RMApp app = rm1.submitApp(1 * GB, "app", "user", null, "b1");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// Heartbeat many times, app should get nothing
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// The app's diagnostics should explain why the AM was not allocated and
// name the last node the scheduler processed (nm1, partition x)
Assert.assertTrue(
"Scheduler diagnostics should have reason for not assigning the node",
app.getDiagnostics().toString().contains(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE));
Assert.assertTrue(
"Scheduler diagnostics should have last processed node information",
app.getDiagnostics().toString().contains(
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG
+ nodeIdStr + " ( Partition : [x]"));
// nothing (not even the AM) was placed on nm1
Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
.getNumContainers());
rm1.close();
}
@Test(timeout=60000)
public void
testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / \
* a b
* (x) (x)
* </pre>
*
* a/b can access x, both of them have max-capacity-on-x = 50
*
* When doing non-exclusive allocation, app in a (or b) can use 100% of x
* resource.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 50);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50);
csConf.setUserLimit(A, 200);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50);
csConf.setUserLimit(B, 200);
// set node -> label; "x" is non-exclusive and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// app1 asks for 10 containers in the default (empty) partition
am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
// NM1 does 51 heartbeats (1 + 50)
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// app1 gets all resource in partition=x (10 x 1GB containers on the 10GB
// node), beyond a's max-capacity-on-x of 50
Assert.assertEquals(10, schedulerNode1.getNumContainers());
// check non-exclusive containers of LeafQueue are correctly updated
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
"y"));
Assert.assertEquals(10,
leafQueue.getIgnoreExclusivityRMContainers().get("x").size());
// remove app1's attempt; ignoreExclusivityRMContainers should no longer
// track any partition=x container afterwards.
cs.handle(new AppAttemptRemovedSchedulerEvent(
am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
"x"));
rm1.close();
}
/**
 * Asserts the used and absolute-used capacity of a queue on one partition.
 *
 * @param queueName name of the queue to look up in {@code cs}
 * @param cs the capacity scheduler owning the queue
 * @param nodePartition node partition (label) to check; "" is the default
 *     partition
 * @param usedCapacity expected used capacity, as a fraction
 * @param absoluteUsedCapacity expected absolute used capacity, as a fraction
 */
private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs,
    String nodePartition, float usedCapacity, float absoluteUsedCapacity) {
  final float tolerance = 1e-6f;
  CSQueue csQueue = cs.getQueue(queueName);
  Assert.assertNotNull("Failed to get queue=" + queueName, csQueue);

  float actualUsed =
      csQueue.getQueueCapacities().getUsedCapacity(nodePartition);
  Assert.assertEquals(usedCapacity, actualUsed, tolerance);

  float actualAbsoluteUsed =
      csQueue.getQueueCapacities().getAbsoluteUsedCapacity(nodePartition);
  Assert.assertEquals(absoluteUsedCapacity, actualAbsoluteUsed, tolerance);
}
/**
 * Sends {@code nHeartbeat} node-update events for {@code nodeId} by calling
 * the capacity scheduler's {@code handle} method directly.
 *
 * @param rm the mock RM whose scheduler and node registry are used
 * @param nodeId node that "heartbeats"
 * @param nHeartbeat number of node-update events to deliver; non-positive
 *     values deliver none
 */
private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
  CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
  RMNode heartbeatingNode = rm.getRMContext().getRMNodes().get(nodeId);
  int remaining = nHeartbeat;
  while (remaining-- > 0) {
    scheduler.handle(new NodeUpdateSchedulerEvent(heartbeatingNode));
  }
}
/**
 * Polls every 100 ms, at most 100 times (about 10 seconds), until the
 * scheduler reports at least {@code expectedNodeNum} cluster nodes. Returns
 * silently on timeout; the caller's later assertions surface any failure.
 *
 * @param rm the mock RM whose scheduler is polled
 * @param expectedNodeNum minimum number of nodes to wait for
 * @throws InterruptedException if interrupted while sleeping
 */
private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
    throws InterruptedException {
  for (int ticksLeft = 100;
      rm.getResourceScheduler().getNumClusterNodes() < expectedNodeNum
          && ticksLeft > 0;
      ticksLeft--) {
    Thread.sleep(100);
  }
}
/**
 * Polls every 100 ms, at most 20 times (about 2 seconds), until the
 * scheduler's node for {@code nm} reflects {@code partition}. An empty (or
 * null) {@code partition} denotes the default, unlabeled partition; in that
 * case the wait ends once the node's label set is empty. Returns silently on
 * timeout; the caller's later assertions surface any failure.
 *
 * @param cs the capacity scheduler holding the scheduler node
 * @param nm node whose labels are awaited; must already have joined
 * @param partition expected label, or "" for the default partition
 * @throws InterruptedException if interrupted while sleeping
 */
private void waitSchedulerNodeHasUpdatedLabels(CapacityScheduler cs,
    MockNM nm, String partition) throws InterruptedException {
  FiCaSchedulerNode node = cs.getNode(nm.getNodeId());
  // An unlabeled node carries an empty label set rather than a set holding
  // "", so checking contains("") would never succeed and would always burn
  // the whole timeout. Wait for emptiness instead in that case.
  boolean waitForNoLabel = partition == null || partition.isEmpty();
  int totalWaitTick = 20; // wait 2 sec at most.
  while (totalWaitTick > 0
      && !(waitForNoLabel
          ? node.getLabels().isEmpty()
          : node.getLabels().contains(partition))) {
    Thread.sleep(100);
    totalWaitTick--;
  }
}
@Test
public void testQueueUsedCapacitiesUpdate()
throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
* root
* / \
* a b
* / \ (x)
* a1 a2
* (x) (x)
* </pre>
*
* Both a/b can access x, we need to verify when
* <pre>
* 1) container allocated/released in both partitioned/non-partitioned node,
* 2) clusterResource updates
* 3) queue guaranteed resource changed
* </pre>
*
* used capacity / absolute used capacity of queues are correctly updated.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
/**
* Initially, we set A/B's resource 50:50
*/
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 50);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setQueues(A, new String[] { "a1", "a2" });
final String A1 = A + ".a1";
csConf.setCapacity(A1, 50);
csConf.setAccessibleNodeLabels(A1, toSet("x"));
csConf.setCapacityByLabel(A1, "x", 50);
final String A2 = A + ".a2";
csConf.setCapacity(A2, 50);
csConf.setAccessibleNodeLabels(A2, toSet("x"));
csConf.setCapacityByLabel(A2, "x", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
// set node -> label; "x" is non-exclusive and put on h1
mgr.addToCluserNodeLabels(ImmutableSet.of(
NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
/*
* Before we adding any node to the cluster, used-capacity/abs-used-capacity
* should be 0
*/
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0f, 0f);
MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
/*
* After we adding nodes to the cluster, and before starting to use them,
* used-capacity/abs-used-capacity should be 0
*/
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0f, 0f);
// app1 -> a1
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
// app1 asks for 1 container in the default (empty) partition
am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
doNMHeartbeat(rm, nm2.getNodeId(), 10);
// Now check usage, app1 uses (AM + 1 container, 2GB of 10GB no-label):
// a1: used(no-label) = 80%
// abs-used(no-label) = 20%
// a: used(no-label) = 40%
// abs-used(no-label) = 20%
// root: used(no-label) = 20%
// abs-used(no-label) = 20%
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
// app1 asks for 2 partition=x containers
am1.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
// Now check usage, app1 uses (2GB of 10GB x):
// a1: used(x) = 80%
// abs-used(x) = 20%
// a: used(x) = 40%
// abs-used(x) = 20%
// root: used(x) = 20%
// abs-used(x) = 20%
checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f);
checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f);
checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
// submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED
// container
// app2 -> a2
RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// app2 asks for 1 partition=x container
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
// Now check usage; app2's own share (1GB on each partition) is:
// a2: used(x) = 40%
// abs-used(x) = 10%
// a: used(x) = 20%
// abs-used(x) = 10%
// root: used(x) = 10%
// abs-used(x) = 10%
// The assertions below check totals, i.e. app1's usage plus app2's share.
checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f);
checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f);
checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f);
checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f);
checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f);
// Add nm3/nm4, double resource for both partitioned/non-partitioned
// resource, used capacity should be 1/2 of before
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x")));
MockNM nm3 = rm.registerNode("h3:1234", 10 * GB); // label = x
MockNM nm4 = rm.registerNode("h4:1234", 10 * GB); // label = <empty>
waitSchedulerNodeJoined(rm, 4);
waitSchedulerNodeHasUpdatedLabels(cs, nm3, "x");
waitSchedulerNodeHasUpdatedLabels(cs, nm4, "");
checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f);
checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f);
checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f);
checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f);
checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
// Reinitialize queue, makes A's capacity double, and B's capacity to be 0
csConf.setCapacity(A, 100); // was 50
csConf.setCapacityByLabel(A, "x", 100); // was 50
csConf.setCapacity(B, 0); // was 50
csConf.setCapacityByLabel(B, "x", 0); // was 50
cs.reinitialize(csConf, rm.getRMContext());
checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f);
checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f);
checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f);
checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
// Release all task containers from a1 (container ids 2-4 of app1; only
// its AM container remains), check usage
am1.allocate(null, Arrays.asList(
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2),
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3),
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4)));
checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f);
checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f);
checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f);
checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f);
checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f);
rm.close();
}
@Test
public void testOrderOfAllocationOnPartitions()
    throws Exception {
  /**
   * Test case: have a following queue structure:
   *
   * <pre>
   *                root
   *          ________________
   *         /     |     \    \
   *       a (x)  b (x)   c    d
   * </pre>
   *
   * Both a/b can access x; c/d have no accessible labels. We need to verify
   * that
   * <pre>
   * When doing allocation on partitioned nodes,
   * - Queue has accessibility to the node will go first
   * - When accessibility is same
   *   - Queue has less used_capacity on given partition will go first
   *   - When used_capacity is same
   *     - Queue has more abs_capacity will go first
   * </pre>
   */
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration(this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
      "b", "c", "d" });
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(A, 25);
  csConf.setAccessibleNodeLabels(A, toSet("x"));
  csConf.setCapacityByLabel(A, "x", 30);

  final String B = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(B, 25);
  csConf.setAccessibleNodeLabels(B, toSet("x"));
  csConf.setCapacityByLabel(B, "x", 70);

  final String C = CapacitySchedulerConfiguration.ROOT + ".c";
  csConf.setAccessibleNodeLabels(C, Collections.<String> emptySet());
  csConf.setCapacity(C, 25);

  final String D = CapacitySchedulerConfiguration.ROOT + ".d";
  csConf.setAccessibleNodeLabels(D, Collections.<String> emptySet());
  csConf.setCapacity(D, 25);

  // set node -> label; "x" is non-exclusive and put on h1
  mgr.addToCluserNodeLabels(ImmutableSet.of(
      NodeLabel.newInstance("x", false)));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm.getRMContext().setNodeLabelManager(mgr);
  rm.start();

  CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

  MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
  MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>

  // app1 -> a
  RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
  MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);

  // app2 -> b
  RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b");
  MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

  // app3 -> c
  RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c");
  MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2);

  // app4 -> d
  RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d");
  MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2);

  // Test case 1
  // Both a/b have used_capacity(x) = 0, when doing exclusive allocation, b
  // will go first since b has more capacity(x)
  am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
  am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
  doNMHeartbeat(rm, nm1.getNodeId(), 1);
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));

  // Test case 2
  // Do another allocation, a will go first since it has 0 used_capacity(x)
  // and b has 1/7 used_capacity(x)
  am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
  doNMHeartbeat(rm, nm1.getNodeId(), 1);
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));

  // Test case 3
  // Just like above, when doing non-exclusive allocation, b will go first as
  // well.
  am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
  am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
  doNMHeartbeat(rm, nm1.getNodeId(), 2);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));

  // Test case 4
  // After b allocated, we should be able to allocate non-exclusive container
  // in a
  doNMHeartbeat(rm, nm1.getNodeId(), 2);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));

  // Test case 5
  // b/c/d ask for non-exclusive containers together, b will go first
  // regardless of used_capacity(x)
  am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
  am3.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
  am4.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
  doNMHeartbeat(rm, nm1.getNodeId(), 2);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
      cs.getApplicationAttempt(am3.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
      cs.getApplicationAttempt(am4.getApplicationAttemptId()));

  // Test case 6
  // After b allocated, c will go first by lexicographic order
  doNMHeartbeat(rm, nm1.getNodeId(), 1);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am3.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
      cs.getApplicationAttempt(am4.getApplicationAttemptId()));

  // Test case 7
  // After c allocated, d will go first because it has less used_capacity(x)
  // than c
  doNMHeartbeat(rm, nm1.getNodeId(), 1);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am3.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am4.getApplicationAttemptId()));

  // Test case 8
  // After d allocated, c will go first: c/d have the same used_capacity(x),
  // so c/d are compared by lexicographic order
  doNMHeartbeat(rm, nm1.getNodeId(), 1);
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
      cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
      cs.getApplicationAttempt(am3.getApplicationAttemptId()));
  checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
      cs.getApplicationAttempt(am4.getApplicationAttemptId()));

  // Stop the MockRM (as every other test here does) so its services are not
  // leaked into subsequent tests.
  rm.close();
}
@Test
public void testOrderOfAllocationOnPartitionsWhenAccessibilityIsAll()
    throws Exception {
  /**
   * Test case: have a following queue structure:
   *
   * <pre>
   *           root
   *        __________
   *       /          \
   *     a (*)       b (x)
   * </pre>
   *
   * Both queues a and b can access x. Verify that accessibility is taken
   * into account when ordering the queues for allocation on partition x.
   */
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration(this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] { "a", "b" });
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(A, 25);
  csConf.setAccessibleNodeLabels(A, toSet("*"));
  csConf.setCapacityByLabel(A, "x", 60);

  final String B = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(B, 75);
  csConf.setAccessibleNodeLabels(B, toSet("x"));
  csConf.setCapacityByLabel(B, "x", 40);

  // set node -> label
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("x", false)));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm.getRMContext().setNodeLabelManager(mgr);
  rm.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x

    // app1 -> a
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    // app2 -> b
    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    // Both a and b have used_capacity(x) = 0. When doing exclusive
    // allocation, a goes first because it has more capacity(x).
    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 1);
    // AM container + the one requested container
    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
  } finally {
    rm.close();
  }
}
@Test
public void testParentQueueMaxCapsAreRespected() throws Exception {
  /*
   * Queue tree:
   *          Root
   *        /     \
   *       A       B
   *      / \
   *     A1  A2
   *
   * A has 50% capacity and 50% max capacity (of label=x)
   * A1/A2 have 50% capacity and 100% max capacity (of label=x)
   * The cluster has one node (label=x) with resource = 24G,
   * so at most 12G can be used under queue A.
   */
  CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration(this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
      "b"});
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(A, 10);
  csConf.setAccessibleNodeLabels(A, toSet("x"));
  csConf.setCapacityByLabel(A, "x", 50);
  csConf.setMaximumCapacityByLabel(A, "x", 50);

  final String B = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(B, 90);
  csConf.setAccessibleNodeLabels(B, toSet("x"));
  csConf.setCapacityByLabel(B, "x", 50);
  csConf.setMaximumCapacityByLabel(B, "x", 50);

  // Define 2nd-level queues
  csConf.setQueues(A, new String[] { "a1",
      "a2"});

  final String A1 = A + ".a1";
  csConf.setCapacity(A1, 50);
  csConf.setAccessibleNodeLabels(A1, toSet("x"));
  csConf.setCapacityByLabel(A1, "x", 50);
  csConf.setMaximumCapacityByLabel(A1, "x", 100);
  csConf.setUserLimitFactor(A1, 100.0f);

  final String A2 = A + ".a2";
  csConf.setCapacity(A2, 50);
  csConf.setAccessibleNodeLabels(A2, toSet("x"));
  csConf.setCapacityByLabel(A2, "x", 50);
  csConf.setMaximumCapacityByLabel(A2, "x", 100);
  csConf.setUserLimitFactor(A2, 100.0f);

  // set node -> label
  mgr.addToCluserNodeLabels(ImmutableSet.of(
      NodeLabel.newInstance("x", false)));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm.getRMContext().setNodeLabelManager(mgr);
  rm.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 =
        new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService());
    nm1.registerNode();

    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
    am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));

    // Launch app2 in a2 with a 2GB AM; this should succeed
    RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    // am2 asks for more resources. This cannot succeed: current used =
    // 9G (app1) + 2G (app2) = 11G, and one more 2G container would exceed
    // queue A's max capacity of 12G.
    am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x");
    doNMHeartbeat(rm, nm1.getNodeId(), 10);
    // Only the AM container is present
    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
  } finally {
    rm.close();
  }
}
@Test
public void testQueueMetricsWithLabels() throws Exception {
  /**
   * Test case: have a following queue structure:
   *
   * <pre>
   *         root
   *        /    \
   *       a      b
   *      (x)    (x)
   * </pre>
   *
   * a/b can access x, and both of them have max-capacity-on-x = 50.
   *
   * The only nodes are h1 (label x) and h2 (label y), so the default
   * partition has no resources. An app in a that asks for partition=x
   * containers is capped by a's max-capacity-on-x. The queue metrics track
   * the default partition only, so they stay at 0.
   */
  CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
      this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] { "a", "b" });
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(queueA, 25);
  csConf.setAccessibleNodeLabels(queueA, toSet("x"));
  csConf.setCapacityByLabel(queueA, "x", 50);
  csConf.setMaximumCapacityByLabel(queueA, "x", 50);

  final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(queueB, 75);
  csConf.setAccessibleNodeLabels(queueB, toSet("x"));
  csConf.setCapacityByLabel(queueB, "x", 50);
  csConf.setMaximumCapacityByLabel(queueB, "x", 50);

  // set node -> label
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("x", false)));
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("y", false)));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

  // inject node label manager
  MockRM rm1 = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm1.getRMContext().setNodeLabelManager(mgr);
  rm1.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
    LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b");
    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());

    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 5 partition=x containers
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // app1 is capped by a's max-capacity-on-x (50% of 10GB). It gets the AM
    // plus 4 of the 5 requested containers, 5GB in total.
    assertEquals(5, schedulerNode1.getNumContainers());
    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(5 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
    assertEquals(10 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 0GB for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());

    // Kill all apps in queue a
    cs.killAllAppsInQueue("a");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());

    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
  } finally {
    rm1.close();
  }
}
@Test
public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
  /**
   * Test case: have a following queue structure:
   *
   * <pre>
   *         root
   *        /    \
   *       a      b
   *      (x)    (x)
   * </pre>
   *
   * a/b can access x, and both of them have max-capacity-on-x = 50.
   *
   * x is non-exclusive, so default-partition requests from an app in a can
   * be served by the x node (h1) as well as by the unlabeled node (h2).
   * The queue metrics asserted below follow the default partition.
   */
  CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
      this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] { "a", "b" });
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(queueA, 25);
  csConf.setAccessibleNodeLabels(queueA, toSet("x"));
  csConf.setCapacityByLabel(queueA, "x", 50);
  csConf.setMaximumCapacityByLabel(queueA, "x", 50);

  final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(queueB, 75);
  csConf.setAccessibleNodeLabels(queueB, toSet("x"));
  csConf.setCapacityByLabel(queueB, "x", 50);
  csConf.setMaximumCapacityByLabel(queueB, "x", 50);

  // set node -> label
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("x", false)));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm1 = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm1.getRMContext().setNodeLabelManager(mgr);
  rm1.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>

    // app1 -> a (default partition); its AM runs on nm2
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);

    // app1 asks for 3 default-partition containers
    am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());

    // NM1 does 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // All 3 requested containers land on the non-exclusive x node
    assertEquals(3, schedulerNode1.getNumContainers());
    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(7 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
    assertEquals(9 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
    // 3GB is used from the label x quota; 2GB remains from label x.
    // 1.5GB remains from the default label.
    assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
    assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());

    // app1 asks for 5 default-partition containers
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());

    // NM2 does a single heartbeat
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));

    // nm2 now holds app1's AM plus one more default-partition container
    assertEquals(2, schedulerNode2.getNumContainers());

    // 3GB is used from the label x quota and 2GB from the default label,
    // so 0.5GB remains from the default label.
    assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB());
    assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 10GB
    // for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());
  } finally {
    rm1.close();
  }
}
@Test
public void testQueueMetricsWithMixedLabels() throws Exception {
  // There is only one queue, which can access both the default label and
  // label x. There are two nodes: 10GB with label x and 12GB with no label.
  // The test makes sure that the queue metrics track only the allocations
  // and availability of the default partition.
  CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
      this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] {"a"});
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(queueA, 100);
  csConf.setAccessibleNodeLabels(queueA, toSet("x"));
  csConf.setCapacityByLabel(queueA, "x", 100);
  csConf.setMaximumCapacityByLabel(queueA, "x", 100);

  // set node -> label
  // label x exclusivity is set to true
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("x", true)));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm1 = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm1.getRMContext().setNodeLabelManager(mgr);
  rm1.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // app1 -> a
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 5 partition=x containers
    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    }

    // app1 gets all 5 requested containers plus its AM in partition=x
    assertEquals(6, schedulerNode1.getNumContainers());
    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(4 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
        .getNodeReport(nm2.getNodeId());
    assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
    assertEquals(12 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    // Usage in partition=x is not reflected in the queue metrics
    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // app2 -> a
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", "");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

    // app2 asks for 5 default-partition containers
    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");

    // NM2 does 50 heartbeats
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
    for (int i = 0; i < 50; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    // app2 gets all 5 requested containers plus its AM in the default
    // partition
    assertEquals(6, schedulerNode2.getNumContainers());
    reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
    assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(4 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
    assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
    assertEquals(6 * GB,
        reportNm2.getAvailableResource().getMemorySize());

    assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
    assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());

    // The total memory tracked by QueueMetrics is 12GB
    // for the default partition
    CSQueue rootQueue = cs.getRootQueue();
    assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
        rootQueue.getMetrics().getAllocatedMB());

    // Kill all apps in queue a. Both app1 and app2 live in a, so wait for
    // both of them to leave the scheduler before checking the AM usage.
    // Otherwise app2's AM may still be accounted for.
    cs.killAllAppsInQueue("a");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
    rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());

    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
  } finally {
    rm1.close();
  }
}
@Test
public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
  /**
   * Test case: have a following queue structure:
   *
   * <pre>
   *
   *            root
   *          /      \
   *         a        b
   *        (x)      (x)
   *       /   \
   *      a1    a2
   *     (x)    (x)
   * </pre>
   *
   * a/b can access x with capacity-on-x = 50 and max-capacity-on-x = 100.
   * a1 has capacity-on-x = 60 and max-capacity-on-x = 30.
   * a2 has capacity-on-x = 40 and max-capacity-on-x = 20.
   *
   * On the single 20GB x node, this limits a1 to 6GB and a2 to 4GB, so the
   * leaf queues cannot grow beyond their share of x (elasticity disabled).
   */
  CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
      this.conf);

  // Define top-level queues
  csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] { "a", "b" });
  csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

  final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
  csConf.setCapacity(queueA, 50);
  csConf.setMaximumCapacity(queueA, 100);
  csConf.setAccessibleNodeLabels(queueA, toSet("x"));
  csConf.setCapacityByLabel(queueA, "x", 50);
  csConf.setMaximumCapacityByLabel(queueA, "x", 100);

  final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
  csConf.setCapacity(queueB, 50);
  csConf.setMaximumCapacity(queueB, 100);
  csConf.setAccessibleNodeLabels(queueB, toSet("x"));
  csConf.setCapacityByLabel(queueB, "x", 50);
  csConf.setMaximumCapacityByLabel(queueB, "x", 100);

  // Define 2nd-level queues
  csConf.setQueues(queueA, new String[] { "a1",
      "a2"});

  final String A1 = queueA + ".a1";
  csConf.setCapacity(A1, 20);
  csConf.setMaximumCapacity(A1, 60);
  csConf.setAccessibleNodeLabels(A1, toSet("x"));
  csConf.setCapacityByLabel(A1, "x", 60);
  csConf.setMaximumCapacityByLabel(A1, "x", 30);

  final String A2 = queueA + ".a2";
  csConf.setCapacity(A2, 80);
  csConf.setMaximumCapacity(A2, 40);
  csConf.setAccessibleNodeLabels(A2, toSet("x"));
  csConf.setCapacityByLabel(A2, "x", 40);
  csConf.setMaximumCapacityByLabel(A2, "x", 20);

  // set node -> label
  mgr.addToCluserNodeLabels(
      ImmutableSet.of(NodeLabel.newInstance("x", false)));
  mgr.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

  // inject node label manager
  MockRM rm1 = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  rm1.getRMContext().setNodeLabelManager(mgr);
  rm1.start();
  // Close the RM even when an assertion fails so its services are released.
  try {
    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x

    // app1 -> a1
    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // app1 asks for 6 partition=x containers
    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);

    // a1 is capped at 6GB on x: the AM plus 5 containers
    checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
        cs.getApplicationAttempt(am1.getApplicationAttemptId()));

    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(14 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // app2 -> a2, launching its 1GB AM should succeed
    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

    // app2 asks for 4 partition=x containers
    am2.allocate("*", 1 * GB, 4, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);

    // a2 is capped at 4GB on x: the AM plus 3 containers
    checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
        cs.getApplicationAttempt(am2.getApplicationAttemptId()));

    reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // Kill all apps in queue a2
    cs.killAllAppsInQueue("a2");
    rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());

    // app3 -> a2, asks for 6 more containers, which would exceed a2's 4GB
    // limit on x
    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
    am3.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);

    // app3 cannot get more resources, restricted by disabled elasticity
    checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
        cs.getApplicationAttempt(am3.getApplicationAttemptId()));

    // Re-fetch the report so the assertions check the current node state
    reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());

    // Kill all apps in queue a1
    cs.killAllAppsInQueue("a1");
    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());

    // app4 -> a1, tries to allocate more than 6GB of resources; the excess
    // should not be granted
    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1);

    // app4 asks for 7 partition=x containers
    am4.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>(), "x");

    // NM1 does 50 heartbeats
    doNMHeartbeat(rm1, nm1.getNodeId(), 50);

    // app4 should get only 6GB of resources in partition=x
    // since elasticity is disabled
    checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
        cs.getApplicationAttempt(am4.getApplicationAttemptId()));

    // Re-fetch the report so the assertions check the current node state
    reportNm1 = rm1.getResourceScheduler()
        .getNodeReport(nm1.getNodeId());
    assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(10 * GB,
        reportNm1.getAvailableResource().getMemorySize());
  } finally {
    rm1.close();
  }
}
}