/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
int appAlloc = 0;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
CapacityScheduler mCS = null;
EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 0), 0);
final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 1), 0);
final ApplicationAttemptId appC = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 2), 0);
final ApplicationAttemptId appD = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 3), 0);
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@Rule public TestName name = new TestName();
@Before
@SuppressWarnings("unchecked")
public void setup() {
conf = new Configuration(false);
conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
conf.setLong(MONITORING_INTERVAL, 3000);
// report "ideal" preempt
conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
mDisp = mock(EventHandler.class);
rand = new Random();
long seed = rand.nextLong();
System.out.println(name.getMethodName() + " SEED: " + seed);
rand.setSeed(seed);
appAlloc = 0;
}
@Test
public void testIgnore() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
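// How to read qData: each column is one queue, the root first and then its
// descendants in preorder (matching the header comment); each row is one
// metric, as labeled. The "subqueues" row holds each parent's child count,
// which buildMockRootQueue uses to reconstruct the hierarchy.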
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// don't correct imbalances without demand
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testProportionalPreemption() {
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
{ 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1, 0 }, // apps
{ -1, 1, 1, 1, 1 }, // req granularity
{ 4, 0, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
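// One way to arrive at the 16 (an interpretation of the expected count, not
// the policy's code): D demands nothing, so its unused guarantee of 30 is
// shared among A, B, C in proportion to their guarantees (10:40:20); A's
// ideal becomes 10 + 30 * (10/70) ~= 14.3, and preempting 16 of its 30 used
// containers brings it down to that share.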
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testMaxCap() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 45, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// despite the imbalance, since B is at maxCap, do not correct
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ensure all pending resources from A get preempted from other queues
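// a plausible reading of why C pays (an interpretation, not the policy's
// code): A's unused guarantee of 30 is shared between B and C in proportion
// to their guarantees (40:20), putting B's ideal exactly at its current 60
// and C's at 30, so all 10 containers come from appC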
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testExpireKill() {
final long killTime = 10000L;
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
// ensure all pending resources from A get preempted from other queues
when(mClock.getTime()).thenReturn(0L);
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
// requests reiterated
when(mClock.getTime()).thenReturn(killTime / 2);
policy.editSchedule();
verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appC)));
// kill requests sent once the wait time has expired
when(mClock.getTime()).thenReturn(killTime + 1);
policy.editSchedule();
verify(mDisp, times(30)).handle(evtCaptor.capture());
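// evtCaptor has accumulated every event across all three rounds; the last
// ten (indices 20-29) are the ones from this third pass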
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 30)) {
assertEquals(appC, e.getAppId());
assertEquals(KILL_CONTAINER, e.getType());
}
}
@Test
public void testDeadzone() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% overcapacity to avoid jitter
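// B is 7.5% over its guarantee (43 vs. 40) and C is 5% over (21 vs. 20),
// both within the 0.1 dead zone configured above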
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// correct imbalance between over-capacity queues
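// one reading of the expected 5 (an interpretation of the policy's math):
// C's unused guarantee of 20 splits evenly between the equally guaranteed
// A and B, giving each an ideal of 50; A is 5 over that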
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testNaturalTermination() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% imbalance between over-capacity queues
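// an interpretation of the expected no-op: the factor scales the
// 5-container imbalance (see the previous test) down to 0.5, which rounds
// away to nothing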
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testObserveOnly() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setBoolean(OBSERVE_ONLY, true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify that even a severe imbalance goes uncorrected in observe-only mode
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testHierarchical() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity is taken from queueB (appA), not from queueE (appC),
// despite queueE being far over its absolute guaranteed capacity:
// preemption is decided hierarchically, and queueE's parent (queueD) is
// still under its guarantee
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuar() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 0, 99, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 80, 60, 20, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify that nothing is preempted from appA (zero-guarantee queueB):
// the cluster is not fully used, so the pending demand can be met without
// preemption
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuarOverCap() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 0, 99, 0, 100, 100 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 170, 60, 20, 90, 0, 0 }, // used
{ 85, 50, 30, 10, 10, 20, 20 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 3, 1, 1, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1 }, // req granularity
{ 2, 3, 0, 0, 0, 1, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// we verify both that C has priority over B and D (as it has a >0
// guarantee) and that B and D are forced to share their over-capacity
// fairly (as they are both zero-guarantee); hence D sees some of its
// containers preempted
verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
{ 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 70, 140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity is taken mostly from queueB (appA), not from queueH
// (appE), despite queueH being far over its absolute guaranteed capacity
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
public void testContainerOrdering() {
List<RMContainer> containers = new ArrayList<RMContainer>();
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 10), 0);
// create a set of containers
RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3);
RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3);
RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2);
RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2);
RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1);
// insert them in non-sorted order
containers.add(rm3);
containers.add(rm2);
containers.add(rm1);
containers.add(rm5);
containers.add(rm4);
// sort them
ProportionalCapacityPreemptionPolicy.sortContainers(containers);
// verify the "priority"-first, "reverse container-id"-second
// ordering is enforced correctly
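// a sketch of an equivalent comparator, inferred from the expected order
// (an assumption about sortContainers' contract, not its code):
// (a, b) -> higher priority first, then higher (newer) container id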
// use assertEquals so the checks run even when the -ea JVM flag is off
assertEquals(rm1, containers.get(0));
assertEquals(rm2, containers.get(1));
assertEquals(rm3, containers.get(2));
assertEquals(rm4, containers.get(3));
assertEquals(rm5, containers.get(4));
}
@Test
public void testPolicyInitializeAfterSchedulerInitialized() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.init(conf);
// ProportionalCapacityPreemptionPolicy should be initialized after the
// CapacityScheduler is. To verify this we
// 1) find the SchedulingMonitor in RMActiveService's service list, and
// 2) check whether the policy's ResourceCalculator is non-null;
// a non-null calculator means the policy was initialized after the
// scheduler.
for (Service service : rm.getRMActiveService().getServices()) {
if (service instanceof SchedulingMonitor) {
ProportionalCapacityPreemptionPolicy policy =
(ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
.getSchedulingEditPolicy();
assertNotNull(policy.getResourceCalculator());
return;
}
}
fail("Failed to find SchedulingMonitor service, please check what happened");
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
private final ContainerPreemptEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, PREEMPT_CONTAINER);
}
IsPreemptionRequestFor(ApplicationAttemptId appAttId,
ContainerPreemptEventType type) {
this.appAttId = appAttId;
this.type = type;
}
@Override
public boolean matches(Object o) {
return appAttId.equals(((ContainerPreemptEvent)o).getAppId())
&& type.equals(((ContainerPreemptEvent)o).getType());
}
@Override
public String toString() {
return appAttId.toString();
}
}
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy =
new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
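// cluster resource is memory-only: the sum of the leaf queues' absolute
// capacities (parent columns aggregate their children, so including them
// would double-count)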
Resource clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
int[] maxCap = queueData[1];
int[] used = queueData[2];
int[] pending = queueData[3];
int[] reserved = queueData[4];
int[] apps = queueData[5];
int[] gran = queueData[6];
int[] queues = queueData[7];
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
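// removeLast pairs this queue with the most recently queued parent slot,
// so the columns attach in preorder (depth-first) order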
final String queueName = "queue" + ((char)('A' + i - 1));
if (queues[i] > 0) {
q = mockParentQueue(p, queues[i], pqs);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
}
when(q.getParent()).thenReturn(p);
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
}
assertEquals(0, pqs.size());
return root;
}
ParentQueue mockParentQueue(ParentQueue p, int subqueues,
Deque<ParentQueue> pqs) {
ParentQueue pq = mock(ParentQueue.class);
List<CSQueue> cqs = new ArrayList<CSQueue>();
when(pq.getChildQueues()).thenReturn(cqs);
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
if (p != null) {
p.getChildQueues().add(pq);
}
return pq;
}
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
when(lq.getTotalResourcePending()).thenReturn(
Resource.newInstance(pending[i], 0));
// consider moving this to wherever CapacityScheduler's comparator becomes accessible
NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationAttemptId()
.compareTo(a2.getApplicationAttemptId());
}
});
// applications are added to queues in global left-to-right order
if (apps[i] != 0) {
int aUsed = used[i] / apps[i];
int aPending = pending[i] / apps[i];
int aReserve = reserved[i] / apps[i];
for (int a = 0; a < apps[i]; ++a) {
qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
++appAlloc;
}
}
when(lq.getApplications()).thenReturn(qApps);
p.getChildQueues().add(lq);
return lq;
}
FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
int gran) {
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
ApplicationId appId = ApplicationId.newInstance(TS, id);
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
when(app.getApplicationId()).thenReturn(appId);
when(app.getApplicationAttemptId()).thenReturn(appAttId);
int cAlloc = 0;
Resource unit = Resource.newInstance(gran, 0);
List<RMContainer> cReserved = new ArrayList<RMContainer>();
for (int i = 0; i < reserved; i += gran) {
cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
++cAlloc;
}
when(app.getLiveContainers()).thenReturn(cLive);
return app;
}
RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
Resource r, int priority) {
ContainerId cId = ContainerId.newInstance(appAttId, id);
Container c = mock(Container.class);
when(c.getResource()).thenReturn(r);
when(c.getPriority()).thenReturn(Priority.create(priority));
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
return mC;
}
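/**
* Sums the absolute capacities of leaf queues only; parent entries already
* aggregate their children and would double-count if included.
*/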
static int leafAbsCapacities(int[] abs, int[] subqueues) {
int ret = 0;
for (int i = 0; i < abs.length; ++i) {
if (0 == subqueues[i]) {
ret += abs[i];
}
}
return ret;
}
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (CSQueue q : ((ParentQueue)nq).getChildQueues()) {
printString(q, indent + " ");
}
} else {
System.out.println(indent + nq.getQueueName()
+ " pen:" + ((LeafQueue) nq).getTotalResourcePending()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) {
System.out.println(indent + " " + a.getApplicationId());
}
}
}
}