blob: a5b2d868d407346088c19665470a6285745d30f0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
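/**
 * Tests that the FairScheduler correctly identifies starved applications,
 * covering the cases where preemption is disabled, enabled, and gated by the
 * cluster utilization threshold.
 */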
public class TestFSAppStarvation extends FairSchedulerTestBase {

  /** Fair scheduler allocation file written by {@link #setupStarvedCluster()}. */
  private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");

  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
  private static final int NODE_CAPACITY_MULTIPLE = 4;

  // Number of nodes added to the cluster by setupStarvedCluster()
  private static final int NUM_NODES = 2;

  /** Leaf queues (relative to root) that each receive one application. */
  private static final String[] QUEUES =
      {"no-preemption", "minshare", "fairshare.child", "drf.child"};

  /** Mock preemption thread of the scheduler under test; records starved apps. */
  private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;

  /**
   * Prepares the configuration shared by all tests: uses
   * {@link FairSchedulerWithMockPreemption} as the scheduler, points it at
   * {@link #ALLOC_FILE}, enables preemption and sets the preemption
   * threshold to 0. Individual tests override these settings as needed
   * before the cluster is started.
   */
  @Before
  public void setup() {
    createConfiguration();
    conf.set(YarnConfiguration.RM_SCHEDULER,
        FairSchedulerWithMockPreemption.class.getCanonicalName());
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
        ALLOC_FILE.getAbsolutePath());
    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
  }

  /**
   * Removes the allocation file, drops the configuration and stops the
   * resource manager if one was started.
   */
  @After
  public void teardown() {
    // Best-effort cleanup: the file may not exist if a test failed early.
    ALLOC_FILE.delete();
    conf = null;
    if (resourceManager != null) {
      resourceManager.stop();
      resourceManager = null;
    }
  }

  /*
   * Test to verify application starvation is computed only when preemption
   * is enabled.
   */
  @Test
  public void testPreemptionDisabled() throws Exception {
    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);

    setupClusterAndSubmitJobs();

    assertNull("Found starved apps even when preemption is turned off",
        scheduler.getContext().getStarvedApps());
  }

  /*
   * Test to verify application starvation is computed correctly when
   * preemption is turned on.
   */
  @Test
  public void testPreemptionEnabled() throws Exception {
    setupClusterAndSubmitJobs();

    assertNotNull("FSContext does not have an FSStarvedApps instance",
        scheduler.getContext().getStarvedApps());
    assertEquals("Expecting 3 starved applications, one each for the "
            + "minshare and fairshare queues",
        3, preemptionThread.uniqueAppsAdded());

    // Verify the apps get added again on a subsequent update
    scheduler.update();
    // NOTE(review): Thread.yield() is only a scheduling hint; it does not
    // guarantee the preemption thread has processed the newly starved apps.
    Thread.yield();

    verifyLeafQueueStarvation();
    // The total must exceed the unique count, i.e. at least one app was
    // reported as starved a second time after the extra update.
    assertTrue("Starved apps were not added again on a subsequent update",
        preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
  }

  /*
   * Test to verify app starvation is computed only when the cluster
   * utilization threshold is over the preemption threshold.
   */
  @Test
  public void testClusterUtilizationThreshold() throws Exception {
    // Set preemption threshold to 1.1, so the utilization is always lower
    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);

    setupClusterAndSubmitJobs();

    assertNotNull("FSContext does not have an FSStarvedApps instance",
        scheduler.getContext().getStarvedApps());
    assertEquals("Found starved apps when preemption threshold is over 100%", 0,
        preemptionThread.totalAppsAdded());
  }

  /**
   * Asserts that every leaf queue in {@link #QUEUES} except
   * "no-preemption" reports itself as starved.
   */
  private void verifyLeafQueueStarvation() {
    for (String q : QUEUES) {
      if (!q.equals("no-preemption")) {
        boolean isStarved =
            scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
        assertTrue("Queue " + q + " should be starved", isStarved);
      }
    }
  }

  /**
   * Brings up a fully utilized cluster, submits one app to each leaf queue
   * and triggers a scheduler update so that starved apps are identified.
   *
   * @throws Exception if the cluster cannot be set up or the sleep is
   *     interrupted
   */
  private void setupClusterAndSubmitJobs() throws Exception {
    setupStarvedCluster();
    submitAppsToEachLeafQueue();
    sendEnoughNodeUpdatesToAssignFully();

    // Sleep to hit the preemption timeouts
    Thread.sleep(10);

    // Scheduler update to populate starved apps
    scheduler.update();

    // Wait for apps to be processed by MockPreemptionThread
    // NOTE(review): yield() is a hint only and does not guarantee processing.
    Thread.yield();
  }

  /**
   * Setup the cluster for starvation testing:
   * 1. Create FS allocation file
   * 2. Create and start MockRM
   * 3. Add {@link #NUM_NODES} nodes to the cluster
   * 4. Submit an app that uses up all resources on the cluster
   *
   * @throws IOException if the allocation file cannot be written
   */
  private void setupStarvedCluster() throws IOException {
    // try-with-resources guarantees the writer is closed (and the file
    // flushed) even if writing the allocation file fails part-way.
    try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");

      // Default queue
      out.println("<queue name=\"default\">");
      out.println("</queue>");

      // Queue with preemption disabled
      out.println("<queue name=\"no-preemption\">");
      out.println("<fairSharePreemptionThreshold>0" +
          "</fairSharePreemptionThreshold>");
      out.println("</queue>");

      // Queue with minshare preemption enabled
      out.println("<queue name=\"minshare\">");
      out.println("<fairSharePreemptionThreshold>0" +
          "</fairSharePreemptionThreshold>");
      out.println("<minSharePreemptionTimeout>0" +
          "</minSharePreemptionTimeout>");
      out.println("<minResources>2048mb,2vcores</minResources>");
      out.println("</queue>");

      // FAIR queue with fairshare preemption enabled
      out.println("<queue name=\"fairshare\">");
      out.println("<fairSharePreemptionThreshold>1" +
          "</fairSharePreemptionThreshold>");
      out.println("<fairSharePreemptionTimeout>0" +
          "</fairSharePreemptionTimeout>");
      out.println("<schedulingPolicy>fair</schedulingPolicy>");
      addChildQueue(out);
      out.println("</queue>");

      // DRF queue with fairshare preemption enabled
      out.println("<queue name=\"drf\">");
      out.println("<fairSharePreemptionThreshold>1" +
          "</fairSharePreemptionThreshold>");
      out.println("<fairSharePreemptionTimeout>0" +
          "</fairSharePreemptionTimeout>");
      out.println("<schedulingPolicy>drf</schedulingPolicy>");
      addChildQueue(out);
      out.println("</queue>");

      out.println("</allocations>");
    }

    assertTrue("Allocation file does not exist, not running the test",
        ALLOC_FILE.exists());

    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
        scheduler.preemptionThread;

    // Create and add the nodes to the cluster
    for (int i = 0; i < NUM_NODES; i++) {
      addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
    }

    // Create an app that takes up all the resources on the cluster:
    // one 1 GB / 1 vcore container per unit of capacity on every node.
    final int totalContainers = NUM_NODES * NODE_CAPACITY_MULTIPLE;
    ApplicationAttemptId app = createSchedulingRequest(
        1024, 1, "root.default", "default", totalContainers);

    scheduler.update();
    sendEnoughNodeUpdatesToAssignFully();

    assertEquals(totalContainers,
        scheduler.getSchedulerApp(app).getLiveContainers().size());
  }

  /**
   * Writes a queue named "child" with fairshare preemption enabled
   * (threshold 1, timeout 0). Used for both the "fairshare" and "drf"
   * parent queues.
   *
   * @param out writer for the allocation file being generated
   */
  private void addChildQueue(PrintWriter out) {
    out.println("<queue name=\"child\">");
    out.println("<fairSharePreemptionThreshold>1" +
        "</fairSharePreemptionThreshold>");
    out.println("<fairSharePreemptionTimeout>0" +
        "</fairSharePreemptionTimeout>");
    out.println("</queue>");
  }

  /**
   * Submits one scheduling request (1024 MB, 1 vcore, a single container)
   * as user "user" to each queue in {@link #QUEUES}, then updates the
   * scheduler.
   */
  private void submitAppsToEachLeafQueue() {
    for (String queue : QUEUES) {
      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
    }
    scheduler.update();
  }

  /**
   * Sends {@link #NODE_CAPACITY_MULTIPLE} node-update events to the
   * scheduler for every node in {@code rmNodes}.
   */
  private void sendEnoughNodeUpdatesToAssignFully() {
    for (RMNode node : rmNodes) {
      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
          new NodeUpdateSchedulerEvent(node);
      // presumably one update per unit of node capacity lets the scheduler
      // fill the node completely — TODO confirm against FairScheduler
      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
        scheduler.handle(nodeUpdateSchedulerEvent);
      }
    }
  }
}