blob: 505614de2be072cc2bd3452ed10fadada6da749b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
int appAlloc = 0;
boolean setAMContainer = false;
boolean setLabeledContainer = false;
float setAMResourcePercent = 0.0f;
Random rand = null;
Clock mClock = null;
CapacitySchedulerConfiguration conf = null;
CapacityScheduler mCS = null;
RMContext rmContext = null;
RMNodeLabelsManager lm = null;
EventHandler<SchedulerEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
Resource clusterResources = null;
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 0), 0);
final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 1), 0);
final ApplicationAttemptId appC = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 2), 0);
final ApplicationAttemptId appD = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 3), 0);
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
public enum priority {
AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
int value;
priority(int value) {
this.value = value;
}
public int getValue() {
return this.value;
}
};
@Rule public TestName name = new TestName();
@Before
@SuppressWarnings("unchecked")
public void setup() {
conf = new CapacitySchedulerConfiguration(new Configuration(false));
conf.setLong(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
3000);
// report "ideal" preempt
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
1.0f);
conf.setFloat(
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
1.0f);
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
// FairScheduler doesn't support this test,
// Set CapacityScheduler as the scheduler for this test.
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
lm = mock(RMNodeLabelsManager.class);
try {
when(lm.isExclusiveNodeLabel(anyString())).thenReturn(true);
} catch (IOException e) {
// do nothing
}
when(mCS.getConfiguration()).thenReturn(conf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
Dispatcher disp = mock(Dispatcher.class);
when(rmContext.getDispatcher()).thenReturn(disp);
when(disp.getEventHandler()).thenReturn(mDisp);
rand = new Random();
long seed = rand.nextLong();
System.out.println(name.getMethodName() + " SEED: " + seed);
rand.setSeed(seed);
appAlloc = 0;
}
private static final int[][] Q_DATA_FOR_IGNORE = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
@Test
public void testIgnore() {
ProportionalCapacityPreemptionPolicy policy =
buildPolicy(Q_DATA_FOR_IGNORE);
policy.editSchedule();
// don't correct imbalances without demand
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testProportionalPreemption() {
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
{ 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1, 0 }, // apps
{ -1, 1, 1, 1, 1 }, // req granularity
{ 4, 0, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// A will preempt guaranteed-allocated.
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testMaxCap() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 45, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// despite the imbalance, since B is at maxCap, do not correct
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ensure all pending rsrc from A get preempted from other queues
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testExpireKill() {
final long killTime = 10000L;
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setLong(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
killTime);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
// ensure all pending rsrc from A get preempted from other queues
when(mClock.getTime()).thenReturn(0L);
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
// requests reiterated
when(mClock.getTime()).thenReturn(killTime / 2);
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
// kill req sent
when(mClock.getTime()).thenReturn(killTime + 1);
policy.editSchedule();
verify(mDisp, times(20)).handle(evtCaptor.capture());
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId());
assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
}
}
@Test
public void testDeadzone() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
(float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% overcapacity to avoid jitter
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testPerQueueDisablePreemption() {
int[][] qData = new int[][]{
// / A B C
{ 100, 55, 25, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 54, 46 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
// appA appB appC
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setPreemptionDisabled("root.queueB", true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// Since queueB is not preemptable, get resources from queueC
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
// Since queueB is preemptable, resources will be preempted
// from both queueB and queueC. Test must be reset so that the mDisp
// event handler will count only events from the following test and not the
// previous one.
setup();
conf.setPreemptionDisabled("root.queueB", false);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testPerQueueDisablePreemptionHierarchical() {
int[][] qData = new int[][] {
// / A D
// B C E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from queueB (appA), not queueE (appC) despite
// queueE being far over its absolute capacity because queueA (queueB's
// parent) is over capacity and queueD (queueE's parent) is not.
ApplicationAttemptId expectedAttemptOnQueueB =
ApplicationAttemptId.newInstance(
appA.getApplicationId(), appA.getAttemptId());
assertTrue("appA should be running on queueB",
mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
// Need to call setup() again to reset mDisp
setup();
// Turn off preemption for queueB and it's children
conf.setPreemptionDisabled("root.queueA.queueB", true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
ApplicationAttemptId expectedAttemptOnQueueC =
ApplicationAttemptId.newInstance(
appB.getApplicationId(), appB.getAttemptId());
ApplicationAttemptId expectedAttemptOnQueueE =
ApplicationAttemptId.newInstance(
appC.getApplicationId(), appC.getAttemptId());
// Now, all of queueB's (appA) over capacity is not preemptable, so neither
// is queueA's. Verify that capacity is taken from queueE (appC).
assertTrue("appB should be running on queueC",
mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
assertTrue("appC should be running on queueE",
mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
// Resources should have come from queueE (appC) and neither of queueA's
// children.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testPerQueueDisablePreemptionBroadHierarchical() {
int[][] qData = new int[][] {
// / A D G
// B C E F H I
{1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs
{1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 400, 200, 200, 400, 250, 150, 200, 150, 50 }, // used
{ 50, 0, 0, 0, 50, 0, 50, 0, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granulrity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// queueF(appD) wants resources, Verify that resources come from queueE(appC)
// because it's a sibling and queueB(appA) because queueA is over capacity.
verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
// Need to call setup() again to reset mDisp
setup();
// Turn off preemption for queueB(appA)
conf.setPreemptionDisabled("root.queueA.queueB", true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
// Now that queueB(appA) is not preemptable, verify that resources come
// from queueE(appC)
verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
setup();
// Turn off preemption for two of the 3 queues with over-capacity.
conf.setPreemptionDisabled("root.queueD.queueE", true);
conf.setPreemptionDisabled("root.queueA.queueB", true);
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
policy3.editSchedule();
// Verify that the request was starved out even though queueH(appE) is
// over capacity. This is because queueG (queueH's parent) is NOT
// overcapacity.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
}
@Test
public void testPerQueueDisablePreemptionInheritParent() {
int[][] qData = new int[][] {
// / A E
// B C D F G H
{1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar)
{1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used
{ 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE
{ 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granulrity
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// With all queues preemptable, resources should be taken from queueC(appA)
// and queueD(appB). Resources taken more from queueD(appB) than
// queueC(appA) because it's over its capacity by a larger percentage.
verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB)));
// Turn off preemption for queueA and it's children. queueF(appC)'s request
// should starve.
setup(); // Call setup() to reset mDisp
conf.setPreemptionDisabled("root.queueA", true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
}
@Test
public void testPerQueuePreemptionNotAllUntouchable() {
int[][] qData = new int[][] {
// / A E
// B C D F G H
{ 2000, 1000, 800, 100, 100, 1000, 500, 300, 200 }, // abs
{ 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 }, // maxCap
{ 2000, 1300, 300, 800, 200, 700, 500, 0, 200 }, // used
{ 300, 0, 0, 0, 0, 300, 0, 300, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 3, 1, 1, 1, 3, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
};
conf.setPreemptionDisabled("root.queueA.queueC", true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// Although queueC(appB) is way over capacity and is untouchable,
// queueD(appC) is preemptable. Request should be filled from queueD(appC).
verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testPerQueueDisablePreemptionRootDisablesAll() {
int[][] qData = new int[][] {
// / A D G
// B C E F H I
{1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs
{1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 20, 0, 20, 490, 240, 250, 490, 240, 250 }, // used
{ 200, 200, 200, 0, 0, 0, 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granulrity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
conf.setPreemptionDisabled("root", true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// All queues should be non-preemptable, so request should starve.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueB
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
}
@Test
public void testPerQueueDisablePreemptionOverAbsMaxCapacity() {
int[][] qData = new int[][] {
// / A D
// B C E F
{1000, 725, 360, 365, 275, 17, 258 }, // absCap
{1000,1000,1000,1000, 550, 109,1000 }, // absMaxCap
{1000, 741, 396, 345, 259, 110, 149 }, // used
{ 40, 20, 0, 20, 20, 20, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granulrity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
// QueueE inherits non-preemption from QueueD
conf.setPreemptionDisabled("root.queueD", true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// appC is running on QueueE. QueueE is over absMaxCap, but is not
// preemptable. Therefore, appC resources should not be preempted.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// Will not preempt for over capacity queues
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testNaturalTermination() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
(float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% imbalance between over-capacity queues
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testObserveOnly() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
true);
when(mCS.getConfiguration()).thenReturn(
new CapacitySchedulerConfiguration(conf));
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify even severe imbalance not affected
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testHierarchical() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testHierarchicalWithReserved() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 40, 25, 15, 10, 15, 15, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuar() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 0, 99, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 80, 60, 20, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuarOverCap() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 0, 100, 0, 100, 100 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 170, 60, 20, 90, 0, 0 }, // used
{ 85, 50, 30, 10, 10, 20, 20 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 3, 1, 1, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1 }, // req granularity
{ 2, 3, 0, 0, 0, 1, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// No preemption should happen because zero guaranteed queues should be
// treated as always satisfied, they should not preempt from each other.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
}
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A D G
// B C E F H I
{ 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 70, 140, 100, 50, 50, 90, 90, 0 }, // used
{ 15, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not H1 despite H1 being far over
// its absolute guaranteed capacity
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
public void testContainerOrdering(){
List<RMContainer> containers = new ArrayList<RMContainer>();
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 10), 0);
// create a set of containers
RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3);
RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3);
RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2);
RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2);
RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1);
// insert them in non-sorted order
containers.add(rm3);
containers.add(rm2);
containers.add(rm1);
containers.add(rm5);
containers.add(rm4);
// sort them
FifoCandidatesSelector.sortContainers(containers);
// verify the "priority"-first, "reverse container-id"-second
// ordering is enforced correctly
assert containers.get(0).equals(rm1);
assert containers.get(1).equals(rm2);
assert containers.get(2).equals(rm3);
assert containers.get(3).equals(rm4);
assert containers.get(4).equals(rm5);
}
@Test
public void testPolicyInitializeAfterSchedulerInitialized() {
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.init(conf);
// ProportionalCapacityPreemptionPolicy should be initialized after
// CapacityScheduler initialized. We will
// 1) find SchedulingMonitor from RMActiveService's service list,
// 2) check if ResourceCalculator in policy is null or not.
// If it's not null, we can come to a conclusion that policy initialized
// after scheduler got initialized
for (Service service : rm.getRMActiveService().getServices()) {
if (service instanceof SchedulingMonitor) {
ProportionalCapacityPreemptionPolicy policy =
(ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
.getSchedulingEditPolicy();
assertNotNull(policy.getResourceCalculator());
return;
}
}
fail("Failed to find SchedulingMonitor service, please check what happened");
}
@Test
public void testSkipAMContainer() {
int[][] qData = new int[][] {
// / A B
{ 100, 50, 50 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 50 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 1, 1 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// By skipping AM Container, all other 24 containers of appD will be
// preempted
verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
// By skipping AM Container, all other 24 containers of appC will be
// preempted
verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
// Since AM containers of appC and appD are saved, 2 containers from appB
// has to be preempted.
verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
setAMContainer = false;
}
@Test
public void testPreemptSkippedAMContainers() {
int[][] qData = new int[][] {
// / A B
{ 100, 10, 90 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 90 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 5, 5 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// All 5 containers of appD will be preempted including AM container.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
// All 5 containers of appC will be preempted including AM container.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
// By skipping AM Container, all other 4 containers of appB will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
// By skipping AM Container, all other 4 containers of appA will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
setAMContainer = false;
}
@Test
public void testAMResourcePercentForSkippedAMContainers() {
int[][] qData = new int[][] {
// / A B
{ 100, 10, 90 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 90 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 5, 5 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
setAMResourcePercent = 0.5f;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb.
// Total used AM container size is 20GB, hence 2 AM container has
// to be preempted as Queue Capacity is 10Gb.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
// Including AM Container, all other 4 containers of appC will be
// preempted
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
// By skipping AM Container, all other 4 containers of appB will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
// By skipping AM Container, all other 4 containers of appA will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
setAMContainer = false;
}
@Test
public void testPreemptionWithVCoreResource() {
int[][] qData = new int[][]{
// / A B
{100, 100, 100}, // maxcap
{5, 1, 1}, // apps
{2, 0, 0}, // subqueues
};
// Resources can be set like memory:vcores
String[][] resData = new String[][]{
// / A B
{"100:100", "50:50", "50:50"}, // abs
{"10:100", "10:100", "0"}, // used
{"70:20", "70:20", "10:100"}, // pending
{"0", "0", "0"}, // reserved
{"-1", "1:10", "1:10"}, // req granularity
};
// Passing last param as TRUE to use DominantResourceCalculator
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
true);
policy.editSchedule();
// 5 containers will be preempted here
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testHierarchicalLarge3Levels() {
int[][] qData = new int[][] {
// / A F I
// B C G H J K
// D E
{ 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF appG
{ 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
//run with Logger.getRootLogger().setLevel(Level.DEBUG);
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra
//check the parent can prempt only the extra from > 2 level child
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get(
"queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
long extraForQueueA =
tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition
.getGuaranteed().getMemorySize();
assertEquals(extraForQueueA,
tempQueueAPartition.preemptableExtra.getMemorySize());
}
@Test
public void testHierarchicalLarge3LevelsWithReserved() {
int[][] qData = new int[][] {
// / A F I
// B C G H J K
// D E
{ 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending
{ 50, 30, 20, 10, 5, 5, 0, 0, 0, 10, 10, 0 }, // reserved
// appA appB appC appD appE appF appG
{ 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE")
.get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra
//check the parent can prempt only the extra from > 2 level child
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get(
"queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
long extraForQueueA =
tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition
.getGuaranteed().getMemorySize();
assertEquals(extraForQueueA,
tempQueueAPartition.preemptableExtra.getMemorySize());
}
@Test
public void testPreemptionNotHappenForSingleReservedQueue() {
/*
* Test case to make sure, when reserved > pending, preemption will not
* happen if there's only one demanding queue.
*/
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 70, 0, 0 }, // used
{ 10, 30, 0, 0 }, // pending
{ 0, 50, 0, 0 }, // reserved
{ 1, 1, 0, 0 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// No preemption happens
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testRefreshPreemptionProperties() throws Exception {
ProportionalCapacityPreemptionPolicy policy =
buildPolicy(Q_DATA_FOR_IGNORE);
assertEquals(
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL,
policy.getMonitoringInterval());
assertEquals(
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY,
policy.isObserveOnly());
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
long newMonitoringInterval = 5000;
boolean newObserveOnly = true;
newConf.setLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
newMonitoringInterval);
newConf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
newObserveOnly);
when(mCS.getConfiguration()).thenReturn(newConf);
policy.editSchedule();
assertEquals(newMonitoringInterval, policy.getMonitoringInterval());
assertEquals(newObserveOnly, policy.isObserveOnly());
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
private final SchedulerEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, MARK_CONTAINER_FOR_PREEMPTION);
}
IsPreemptionRequestFor(ApplicationAttemptId appAttId,
SchedulerEventType type) {
this.appAttId = appAttId;
this.type = type;
}
@Override
public boolean matches(Object o) {
return appAttId.equals(((ContainerPreemptEvent)o).getAppId())
&& type.equals(((ContainerPreemptEvent)o).getType());
}
@Override
public String toString() {
return appAttId.toString();
}
}
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
rmContext, mCS, mClock);
clusterResources = Resource.newInstance(
leafAbsCapacities(qData[0], qData[7]), 0);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
setResourceAndNodeDetails();
return policy;
}
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
String[][] resData, boolean useDominantResourceCalculator) {
if (useDominantResourceCalculator) {
when(mCS.getResourceCalculator()).thenReturn(
new DominantResourceCalculator());
}
ProportionalCapacityPreemptionPolicy policy =
new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock);
clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
qData[2]);
ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
setResourceAndNodeDetails();
return policy;
}
private void setResourceAndNodeDetails() {
when(mCS.getClusterResource()).thenReturn(clusterResources);
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
clusterResources);
FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class);
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
Resource[] abs = generateResourceList(queueData[0]);
Resource[] used = generateResourceList(queueData[2]);
Resource[] pending = generateResourceList(queueData[3]);
Resource[] reserved = generateResourceList(queueData[4]);
Resource[] gran = generateResourceList(queueData[6]);
int[] maxCap = queueData[1];
int[] apps = queueData[5];
int[] queues = queueData[7];
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
ParentQueue buildMockRootQueue(Random r, String[][] resData,
int[]... queueData) {
Resource[] abs = parseResourceDetails(resData[0]);
Resource[] used = parseResourceDetails(resData[1]);
Resource[] pending = parseResourceDetails(resData[2]);
Resource[] reserved = parseResourceDetails(resData[3]);
Resource[] gran = parseResourceDetails(resData[4]);
int[] maxCap = queueData[0];
int[] apps = queueData[1];
int[] queues = queueData[2];
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
Resource[] parseResourceDetails(String[] resData) {
List<Resource> resourceList = new ArrayList<Resource>();
for (int i = 0; i < resData.length; i++) {
String[] resource = resData[i].split(":");
if (resource.length == 1) {
resourceList.add(Resource.newInstance(Integer.parseInt(resource[0]), 0));
} else {
resourceList.add(Resource.newInstance(Integer.parseInt(resource[0]),
Integer.parseInt(resource[1])));
}
}
return resourceList.toArray(new Resource[resourceList.size()]);
}
Resource[] generateResourceList(int[] qData) {
List<Resource> resourceList = new ArrayList<Resource>();
for (int i = 0; i < qData.length; i++) {
resourceList.add(Resource.newInstance(qData[i], 0));
}
return resourceList.toArray(new Resource[resourceList.size()]);
}
ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
int[] queues) {
ResourceCalculator rc = mCS.getResourceCalculator();
Resource tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
ResourceUsage resUsage = new ResourceUsage();
resUsage.setUsed(used[0]);
resUsage.setReserved(reserved[0]);
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
when(root.getAbsoluteUsedCapacity()).thenReturn(
Resources.divide(rc, tot, used[0], tot));
when(root.getAbsoluteCapacity()).thenReturn(
Resources.divide(rc, tot, abs[0], tot));
when(root.getAbsoluteMaximumCapacity()).thenReturn(
maxCap[0] / (float) tot.getMemorySize());
when(root.getQueueResourceUsage()).thenReturn(resUsage);
QueueCapacities rootQc = new QueueCapacities(true);
rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot));
rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot));
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemorySize());
when(root.getQueueCapacities()).thenReturn(rootQc);
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
boolean preemptionDisabled = mockPreemptionStatus("root");
when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
final String queueName = "queue" + ((char) ('A' + i - 1));
if (queues[i] > 0) {
q = mockParentQueue(p, queues[i], pqs);
ResourceUsage resUsagePerQueue = new ResourceUsage();
resUsagePerQueue.setUsed(used[i]);
resUsagePerQueue.setReserved(reserved[i]);
when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
}
when(q.getParent()).thenReturn(p);
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(
Resources.divide(rc, tot, used[i], tot));
when(q.getAbsoluteCapacity()).thenReturn(
Resources.divide(rc, tot, abs[i], tot));
when(q.getAbsoluteMaximumCapacity()).thenReturn(
maxCap[i] / (float) tot.getMemorySize());
// We need to make these fields to QueueCapacities
QueueCapacities qc = new QueueCapacities(false);
qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot));
qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot));
qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemorySize());
when(q.getQueueCapacities()).thenReturn(qc);
String parentPathName = p.getQueuePath();
parentPathName = (parentPathName == null) ? "root" : parentPathName;
String queuePathName = (parentPathName + "." + queueName).replace("/",
"root");
when(q.getQueuePath()).thenReturn(queuePathName);
preemptionDisabled = mockPreemptionStatus(queuePathName);
when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
}
assert 0 == pqs.size();
return root;
}
// Determine if any of the elements in the queupath have preemption disabled.
// Also must handle the case where preemption disabled property is explicitly
// set to something other than the default. Assumes system-wide preemption
// property is true.
private boolean mockPreemptionStatus(String queuePathName) {
boolean preemptionDisabled = false;
StringTokenizer tokenizer = new StringTokenizer(queuePathName, ".");
String qName = "";
while(tokenizer.hasMoreTokens()) {
qName += tokenizer.nextToken();
preemptionDisabled = conf.getPreemptionDisabled(qName, preemptionDisabled);
qName += ".";
}
return preemptionDisabled;
}
ParentQueue mockParentQueue(ParentQueue p, int subqueues,
Deque<ParentQueue> pqs) {
ParentQueue pq = mock(ParentQueue.class);
List<CSQueue> cqs = new ArrayList<CSQueue>();
when(pq.getChildQueues()).thenReturn(cqs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(pq.getReadLock()).thenReturn(lock.readLock());
// Ordering policy
QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
when(policy.getConfigName()).thenReturn(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
when(pq.getQueueOrderingPolicy()).thenReturn(policy);
when(pq.getPriority()).thenReturn(Priority.newInstance(0));
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
if (p != null) {
p.getChildQueues().add(pq);
}
return pq;
}
@SuppressWarnings("rawtypes")
LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
Resource[] gran) {
LeafQueue lq = mock(LeafQueue.class);
ResourceCalculator rc = mCS.getResourceCalculator();
List<ApplicationAttemptId> appAttemptIdList =
new ArrayList<ApplicationAttemptId>();
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(false))).thenReturn(pending[i]);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(Resources.componentwiseMax(
Resources.subtract(pending[i],
reserved[i] == null ? Resources.none() : reserved[i]),
Resources.none()));
// need to set pending resource in resource usage as well
ResourceUsage ru = new ResourceUsage();
ru.setPending(pending[i]);
ru.setUsed(used[i]);
ru.setReserved(reserved[i]);
when(lq.getQueueResourceUsage()).thenReturn(ru);
// consider moving where CapacityScheduler::comparator accessible
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationAttemptId()
.compareTo(a2.getApplicationAttemptId());
}
});
// applications are added in global L->R order in queues
if (apps[i] != 0) {
Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
for (int a = 0; a < apps[i]; ++a) {
FiCaSchedulerApp mockFiCaApp =
mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
qApps.add(mockFiCaApp);
++appAlloc;
appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
}
when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
.thenReturn(appAttemptIdList);
}
when(lq.getApplications()).thenReturn(qApps);
@SuppressWarnings("unchecked")
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
return qApps.descendingIterator();
}
});
when(lq.getOrderingPolicy()).thenReturn(so);
if(setAMResourcePercent != 0.0f){
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(lq.getReadLock()).thenReturn(lock.readLock());
when(lq.getPriority()).thenReturn(Priority.newInstance(0));
p.getChildQueues().add(lq);
return lq;
}
FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
Resource reserved, Resource gran) {
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
ResourceCalculator rc = mCS.getResourceCalculator();
ApplicationId appId = ApplicationId.newInstance(TS, id);
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
when(app.getApplicationId()).thenReturn(appId);
when(app.getApplicationAttemptId()).thenReturn(appAttId);
int cAlloc = 0;
Resource unit = gran;
List<RMContainer> cReserved = new ArrayList<RMContainer>();
Resource resIter = Resource.newInstance(0, 0);
for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources
.addTo(resIter, gran)) {
cReserved.add(mockContainer(appAttId, cAlloc, unit,
priority.CONTAINER.getValue()));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
List<RMContainer> cLive = new ArrayList<RMContainer>();
Resource usedIter = Resource.newInstance(0, 0);
int i = 0;
for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources
.addTo(usedIter, gran)) {
if (setAMContainer && i == 0) {
cLive.add(mockContainer(appAttId, cAlloc, unit,
priority.AMCONTAINER.getValue()));
} else if (setLabeledContainer && i == 1) {
cLive.add(mockContainer(appAttId, cAlloc, unit,
priority.LABELEDCONTAINER.getValue()));
Resources.addTo(used, Resource.newInstance(1, 1));
} else {
cLive.add(mockContainer(appAttId, cAlloc, unit,
priority.CONTAINER.getValue()));
}
++cAlloc;
++i;
}
when(app.getLiveContainers()).thenReturn(cLive);
return app;
}
RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
Resource r, int cpriority) {
ContainerId cId = ContainerId.newContainerId(appAttId, id);
Container c = mock(Container.class);
when(c.getResource()).thenReturn(r);
when(c.getPriority()).thenReturn(Priority.newInstance(cpriority));
SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getAllocatedSchedulerKey()).thenReturn(sk);
when(mC.getContainer()).thenReturn(c);
when(mC.getApplicationAttemptId()).thenReturn(appAttId);
when(mC.getAllocatedResource()).thenReturn(r);
if (priority.AMCONTAINER.getValue() == cpriority) {
when(mC.isAMContainer()).thenReturn(true);
}
if (priority.LABELEDCONTAINER.getValue() == cpriority) {
when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
}
return mC;
}
static int leafAbsCapacities(int[] abs, int[] subqueues) {
int ret = 0;
for (int i = 0; i < abs.length; ++i) {
if (0 == subqueues[i]) {
ret += abs[i];
}
}
return ret;
}
static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
Resource ret = Resource.newInstance(0, 0);
for (int i = 0; i < abs.length; ++i) {
if (0 == subqueues[i]) {
Resources.addTo(ret, abs[i]);
}
}
return ret;
}
}