blob: a79aacc196e8013c2eff31083f9ac5eb1bf22d16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFairSchedulerFairShare.class.getName() + ".xml").getAbsolutePath();
@Before
public void setup() throws IOException {
conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
}
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
private void createClusterWithQueuesAndOneNode(int mem, String policy)
throws IOException {
createClusterWithQueuesAndOneNode(mem, 0, policy);
}
private void createClusterWithQueuesAndOneNode(int mem, int vCores,
String policy) throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\" >");
out.println(" <queue name=\"parentA\" >");
out.println(" <weight>8</weight>");
out.println(" <queue name=\"childA1\" />");
out.println(" <queue name=\"childA2\" />");
out.println(" <queue name=\"childA3\" />");
out.println(" <queue name=\"childA4\" />");
out.println(" </queue>");
out.println(" <queue name=\"parentB\" >");
out.println(" <weight>1</weight>");
out.println(" <queue name=\"childB1\" />");
out.println(" <queue name=\"childB2\" />");
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>" + policy
+ "</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
RMNode node1 = MockNodes.newNodeInfo(1,
Resources.createResource(mem, vCores), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
}
@Test
public void testFairShareNoAppsRunning() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
scheduler.update();
// No apps are running in the cluster,verify if fair share is zero
// for all queues under parentA and parentB.
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues();
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0, (double) leaf.getFairShare().getMemorySize() / nodeCapacity,
0);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0, (double) leaf.getFairShare().getMemorySize() / nodeCapacity,
0);
}
}
verifySteadyFairShareMemory(leafQueues, nodeCapacity);
}
@Test
public void testFairShareOneAppRunning() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run a app in a childA1. Verify whether fair share is 100% in childA1,
// since it is the only active queue.
// Also verify if fair share is 0 for childA2. since no app is
// running in it.
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
scheduler.update();
assertEquals(
100,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA1", false).getFairShare()
.getMemorySize() / nodeCapacity * 100, 0.1);
assertEquals(
0,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemorySize() / nodeCapacity, 0.1);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
}
@Test
public void testFairShareMultipleActiveQueuesUnderSameParent()
throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2,childA3
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(2 * 1024, "root.parentA.childA2", "user2");
createSchedulingRequest(2 * 1024, "root.parentA.childA3", "user3");
scheduler.update();
// Verify if fair share is 100 / 3 = 33%
for (int i = 1; i <= 3; i++) {
assertEquals(
33,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, .9);
}
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
}
@Test
public void testFairShareMultipleActiveQueuesUnderDifferentParent()
throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2 which are under parentA
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
// Run app in childB1 which is under parentB
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
// Run app in root.default queue
createSchedulingRequest(1 * 1024, "root.default", "user4");
scheduler.update();
// The two active child queues under parentA would
// get fair share of 80/2=40%
for (int i = 1; i <= 2; i++) {
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, .9);
}
// The child queue under parentB would get a fair share of 10%,
// basically all of parentB's fair share
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, .9);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
}
@Test
public void testFairShareResetsToZeroWhenAppsComplete() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2 which are under parentA
ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024,
"root.parentA.childA1", "user1");
ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024,
"root.parentA.childA2", "user2");
scheduler.update();
// Verify if both the active queues under parentA get 50% fair
// share
for (int i = 1; i <= 2; i++) {
assertEquals(
50,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, .9);
}
// Let app under childA1 complete. This should cause the fair share
// of queue childA1 to be reset to zero,since the queue has no apps running.
// Queue childA2's fair share would increase to 100% since its the only
// active queue.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
app1, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
assertEquals(
0,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA1", false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, 0);
assertEquals(
100,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemorySize()
/ nodeCapacity * 100, 0.1);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
}
@Test
public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
throws IOException {
int nodeMem = 16 * 1024;
int nodeVCores = 10;
createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf");
// Run apps in childA1,childA2 which are under parentA
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
// Run app in childB1 which is under parentB
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
// Run app in root.default queue
createSchedulingRequest(1 * 1024, "root.default", "user4");
scheduler.update();
// The two active child queues under parentA would
// get 80/2=40% memory and vcores
for (int i = 1; i <= 2; i++) {
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemorySize()
/ nodeMem * 100, .9);
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getVirtualCores()
/ nodeVCores * 100, .9);
}
// The only active child queue under parentB would get 10% memory and vcores
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getMemorySize()
/ nodeMem * 100, .9);
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getVirtualCores()
/ nodeVCores * 100, .9);
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues();
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getMemorySize() / nodeMem, 0.001);
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
0.001);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0.05,
(double) leaf.getSteadyFairShare().getMemorySize() / nodeMem, 0.001);
assertEquals(0.1,
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
0.001);
}
}
}
/**
* Verify whether steady fair shares for all leaf queues still follow
* their weight, not related to active/inactive status.
*
* @param leafQueues
* @param nodeCapacity
*/
private void verifySteadyFairShareMemory(Collection<FSLeafQueue> leafQueues,
int nodeCapacity) {
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getMemorySize() / nodeCapacity,
0.001);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0.05,
(double) leaf.getSteadyFairShare().getMemorySize() / nodeCapacity,
0.001);
}
}
}
}