blob: 6c5aa670d4418c95b880bd30a6398365d905bc58 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test class for IntraQueuePreemption scenarios.
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueue
extends
ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@Test
public void testSimpleIntraQueuePreemption() throws IOException {
/**
* The simplest test preemption, Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Guaranteed resource of a/b/c/d are 11:40:20:29 Total cluster resource =
* 100
* Scenario:
* Queue B has few running apps and two high priority apps have demand.
* Apps which are running at low priority (4) will preempt few of its
* resources to meet the demand.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 79 120 0]);" + // root
"-a(=[11 100 11 50 0]);" + // a
"-b(=[40 100 38 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[29 100 20 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,6,false,25);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,34,false,20);" + // app3 b
"b\t" // app4 in b
+ "(4,1,n1,,2,false,10);" + // app4 b
"b\t" // app4 in b
+ "(5,1,n1,,1,false,10);" + // app5 b
"b\t" // app4 in b
+ "(6,1,n1,,1,false,10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+ "(1,1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNoIntraQueuePreemptionWithPreemptionDisabledOnQueues()
throws IOException {
/**
* This test has the same configuration as testSimpleIntraQueuePreemption
* except that preemption is disabled specifically for each queue. The
* purpose is to test that disabling preemption on a specific queue will
* avoid intra-queue preemption.
*/
conf.setPreemptionDisabled("root.a", true);
conf.setPreemptionDisabled("root.b", true);
conf.setPreemptionDisabled("root.c", true);
conf.setPreemptionDisabled("root.d", true);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 120 0]);" + // root
"-a(=[11 100 11 50 0]);" + // a
"-b(=[40 100 38 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[29 100 20 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,6,false,25);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,34,false,20);" + // app3 b
"b\t" // app4 in b
+ "(4,1,n1,,2,false,10);" + // app4 b
"b\t" // app4 in b
+ "(5,1,n1,,1,false,10);" + // app5 b
"b\t" // app4 in b
+ "(6,1,n1,,1,false,10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+ "(1,1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNoPreemptionForSamePriorityApps() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
* 100
* Scenario: In queue A/B, all apps are running at same priority. However
* there are many demands also from these apps. Since all apps are at same
* priority, preemption should not occur here.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 80 120 0]);" + // root
"-a(=[10 100 10 50 0]);" + // a
"-b(=[40 100 40 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[30 100 20 0 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,6,false,25);" + // app1 a
"a\t" // app2 in a
+ "(1,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(1,1,n1,,34,false,20);" + // app3 b
"b\t" // app4 in b
+ "(1,1,n1,,2,false,10);" + // app4 b
"b\t" // app4 in b
+ "(1,1,n1,,1,false,20);" + // app5 b
"b\t" // app4 in b
+ "(1,1,n1,,1,false,10);" + // app6 in b
"c\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+ "(1,1,n1,,20,false,0)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B, none of the apps should be preempted.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
}
@Test
public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100 BY
* default, this limit is 50%. Test to verify that there wont be any
* preemption since used capacity is under 50% for queue a/b even though
* there are demands from high priority apps in queue.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 35 80 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 25 30 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue A/B, none of the apps should be preempted as used capacity
// is under 50%.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@Test
public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
* that the maximum preemption should occur upto 50%, eventhough demand is
* more.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 45 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queueB, eventhough app4 needs 100 resources, only 30 resources were
// preempted. (max is 50% of guaranteed cap of any queue
// "maxIntraQueuePreemptable")
verify(mDisp, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* totalPreemption allowed is 10%. This test is to verify that only
* 10% is preempted.
*/
// report "ideal" preempt as 10%. Ensure preemption happens only for 10%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.1);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[40 100 10 50 0]);" + // a
"-b(=[60 100 45 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,40,false,20);" + // app3 b
"b\t" // app1 in a
+ "(6,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// For queue B eventhough app4 needs 100 resources, only 10 resources were
// preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
verify(mDisp, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testAlreadySelectedContainerFromInterQueuePreemption()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* QueueB is under utilized and QueueA has to release 9 containers here.
* However within queue A, high priority app has also a demand for 20.
* So additional 11 more containers will be preempted making a tota of 20.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 95 170 0]);" + // root
"-a(=[60 100 70 35 0]);" + // a
"-b(=[40 100 25 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,50,false,15);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,20,false,20);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,20,false,20);" + // app3 b
"b\t" // app1 in a
+ "(4,1,n1,,5,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// As per intra queue preemption algorithm, 20 more containers were needed
// for app2 (in queue a). Inter queue pre-emption had already preselected 9
// containers and hence preempted only 11 more.
verify(mDisp, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testSkipAMContainersInInterQueuePreemption() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 60:40 Total cluster resource = 100
* While preempting containers during intra-queue preemption, AM containers
* will be spared for now. Verify the same.
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 170 0]);" + // root
"-a(=[60 100 60 50 0]);" + // a
"-b(=[40 100 40 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
+ "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
+ "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
+ "(4,1,n1,,20,false,20);" + "b\t" // app5 in a
+ "(4,1,n1,,20,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
verify(mDisp, times(11)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testSkipAMContainersInInterQueuePreemptionSingleApp()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 50:50 Total cluster resource = 100
* Spare Am container from a lower priority app during its preemption
* cycle. Eventhough there are more demand and no other low priority
* apps are present, still AM contaier need to soared.
*/
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 170 0]);" + // root
"-a(=[50 100 50 50 0]);" + // a
"-b(=[50 100 50 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,10,false,10);" + "a\t" // app1 in a
+ "(2,1,n1,,40,false,10);" + "b\t" // app2 in a
+ "(4,1,n1,,20,false,20);" + "b\t" // app3 in b
+ "(4,1,n1,,30,false,100)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Make sure that app1's Am container is spared. Only 9/10 is preempted.
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testNoPreemptionForSingleApp() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 60:40 Total cluster resource = 100
* Only one app is running in queue. And it has more demand but no
* resource are available in queue. Preemption must not occur here.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 20 50 0]);" + // root
"-a(=[60 100 20 50 0]);" + // a
"-b(=[40 100 0 0 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(4,1,n1,,20,false,50)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Ensure there are 0 preemptions since only one app is running in queue.
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@Test
public void testOverutilizedQueueResourceWithInterQueuePreemption()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
* Scenario:
* Guaranteed resource of a/b are 20:80 Total cluster resource = 100
* QueueB is under utilized and 20 resource will be released from queueA.
* 10 containers will also selected for intra-queue too but it will be
* pre-selected.
*/
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 100 70 0]);" + // root
"-a(=[20 100 100 30 0]);" + // a
"-b(=[80 100 0 20 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,50,false,0);" + "a\t" // app1 in a
+ "(3,1,n1,,50,false,30);" + "b\t" // app2 in a
+ "(4,1,n1,,0,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Complete demand request from QueueB for 20 resource must be preempted.
verify(mDisp, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testNodePartitionIntraQueuePreemption() throws IOException {
/**
* The simplest test of node label, Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Both a/b can access x, and guaranteed capacity of them is 50:50. Two
* nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
* app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 50
* NO_LABEL, app3 uses 50 x, app4 uses 50 NO_LABEL. a has 20 pending
* resource for x for app3 of priority 2
*
* After preemption, it should preempt 20 from app1
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
String nodesConfig = "n1=x;" + // n1 has partition=x
"n2="; // n2 is default partition
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100],x=[100 100 100 100]);" + // root
"-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
"-b(=[50 100 50 50],x=[50 100 50 50])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t" // app1 in a
+ "(1,1,n1,x,50,false,10);" + // 50 * x in n1
"a\t" // app2 in a
+ "(2,1,n1,x,0,false,20);" + // 0 * x in n1
"a\t" // app2 in a
+ "(1,1,n2,,50,false);" + // 50 default in n2
"b\t" // app3 in b
+ "(1,1,n1,x,50,false);" + // 50 * x in n1
"b\t" // app4 in b
+ "(1,1,n2,,50,false)"; // 50 default in n2
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 20 preempted from app1
verify(mDisp, times(20))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
verify(mDisp, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@Test
public void testComplexIntraQueuePreemption() throws IOException {
/**
* The complex test preemption, Queue structure is:
*
* <pre>
* root
* / | | \
* a b c d
* </pre>
*
* Scenario:
* Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
* 100
* All queues under its capacity, but within each queue there are many
* under served applications.
*/
// report "ideal" preempt as 50%. Ensure preemption happens only for 50%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.5);
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 75 130 0]);" + // root
"-a(=[10 100 5 50 0]);" + // a
"-b(=[40 100 35 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
"-d(=[30 100 25 10 0])"; // d
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,
// pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t"
+ "(4,1,n1,,0,false,25);" + // app2 a
"a\t"
+ "(5,1,n1,,0,false,2);" + // app3 a
"b\t"
+ "(3,1,n1,,5,false,20);" + // app4 b
"b\t"
+ "(4,1,n1,,15,false,10);" + // app5 b
"b\t"
+ "(4,1,n1,,10,false,10);" + // app6 b
"b\t"
+ "(5,1,n1,,3,false,5);" + // app7 b
"b\t"
+ "(5,1,n1,,0,false,2);" + // app8 b
"b\t"
+ "(6,1,n1,,2,false,10);" + // app9 in b
"c\t"
+ "(1,1,n1,,8,false,10);" + // app10 in c
"c\t"
+ "(1,1,n1,,2,false,5);" + // app11 in c
"c\t"
+ "(2,1,n1,,0,false,3);" + "d\t" // app12 in c
+ "(2,1,n1,,25,false,0);" + "d\t" // app13 in d
+ "(1,1,n1,,0,false,20)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// High priority app in queueA has 30 resource demand. But low priority
// app has only 5 resource. Hence preempt 4 here sparing AM.
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
// Multiple high priority apps has demand of 17. This will be preempted
// from another set of low priority apps.
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
verify(mDisp, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
// Only 3 resources will be freed in this round for queue C as we
// are trying to save AM container.
verify(mDisp, times(2)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(10))));
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(11))));
}
@Test
public void testIntraQueuePreemptionWithTwoUsers()
throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Guaranteed resource of a/b are 40:60 Total cluster resource = 100
* Consider 2 users in a queue, assume minimum user limit factor is 50%.
* Hence in queueB of 40, each use has a quota of 20. app4 of high priority
* has a demand of 30 and its already using 5. Adhering to userlimit only
* 15 more must be preempted. If its same user,20 would have been preempted
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 55 170 0]);" + // root
"-a(=[60 100 10 50 0]);" + // a
"-b(=[40 100 40 120 0])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
"a\t" // app1 in a
+ "(1,1,n1,,5,false,25);" + // app1 a
"a\t" // app2 in a
+ "(2,1,n1,,5,false,25);" + // app2 a
"b\t" // app3 in b
+ "(4,1,n1,,35,false,20,user1);" + // app3 b
"b\t" // app4 in b
+ "(6,1,n1,,5,false,30,user2)";
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Considering user-limit of 50% since only 2 users are there, only preempt
// 14 more (5 is already running) eventhough demand is for 30. Ideally we
// must preempt 15. But 15th container will bring user1's usage to 20 which
// is same as user-limit. Hence skip 15th container.
verify(mDisp, times(14)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testComplexNodePartitionIntraQueuePreemption()
throws IOException {
/**
* The simplest test of node label, Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario:
* Both a/b can access x, and guaranteed capacity of them is 50:50. Two
* nodes, n1 has 100 x, n2 has 100 NO_LABEL 4 applications in the cluster,
* app1-app4 in a, and app5-app9 in b.
*
*/
// Set max preemption limit as 50%.
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
String nodesConfig = "n1=x;" + // n1 has partition=x
"n2="; // n2 is default partition
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100],x=[100 100 100 100]);" + // root
"-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
"-b(=[50 100 35 50],x=[50 100 50 50])"; // b
String appsConfig =
// queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t" // app1 in a
+ "(1,1,n1,x,35,false,10);" + // 20 * x in n1
"a\t" // app2 in a
+ "(1,1,n1,x,5,false,10);" + // 20 * x in n1
"a\t" // app3 in a
+ "(2,1,n1,x,0,false,20);" + // 0 * x in n1
"a\t" // app4 in a
+ "(1,1,n2,,50,false);" + // 50 default in n2
"b\t" // app5 in b
+ "(1,1,n1,x,50,false);" + // 50 * x in n1
"b\t" // app6 in b
+ "(1,1,n2,,25,false);" + // 25 * default in n2
"b\t" // app7 in b
+ "(1,1,n2,,3,false);" + // 3 * default in n2
"b\t" // app8 in b
+ "(1,1,n2,,2,false);" + // 2 * default in n2
"b\t" // app9 in b
+ "(5,1,n2,,5,false,30)"; // 50 default in n2
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Label X: app3 has demand of 20 for label X. Hence app2 will loose
// 4 (sparing AM) and 16 more from app1 till preemption limit is met.
verify(mDisp, times(16))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, times(4))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
// Default Label:For a demand of 30, preempt from all low priority
// apps of default label. 25 will be preempted as preemption limit is
// met.
verify(mDisp, times(1))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
verify(mDisp, times(2))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
verify(mDisp, times(22))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
}
}