blob: c6cebd96928b1e15234c5cb4ceffbfc80c9ba8b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
public abstract class TestSchedulerPlanFollowerBase {
final static int GB = 1024;
protected Clock mClock = null;
protected ResourceScheduler scheduler = null;
protected ReservationAgent mAgent;
protected Resource minAlloc = Resource.newInstance(GB, 1);
protected Resource maxAlloc = Resource.newInstance(GB * 8, 8);
protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
protected Plan plan;
private ResourceCalculator res = new DefaultResourceCalculator();
private RMContext context = ReservationSystemTestUtil.createMockRMContext();
protected void testPlanFollower(boolean isMove) throws PlanningException,
InterruptedException, AccessControlException {
// Initialize plan based on move flag
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), 1L, res,
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
null, isMove, context);
// add a few reservations to the plan
long ts = System.currentTimeMillis();
ReservationId r1 = ReservationId.newInstance(ts, 1);
int[] f1 = { 10, 10, 10, 10, 10 };
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
0, 0 + f1.length + 1, f1.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
.generateAllocation(0L, 1L, f1), res, minAlloc), false));
ReservationId r2 = ReservationId.newInstance(ts, 2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
.generateAllocation(3L, 1L, f1), res, minAlloc), false));
ReservationId r3 = ReservationId.newInstance(ts, 3);
int[] f2 = { 0, 10, 20, 10, 0 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
.generateAllocation(10L, 1L, f2), res, minAlloc), false));
// default reseration queue should exist before run of PlanFollower AND have
// no apps
checkDefaultQueueBeforePlanFollowerRun();
AbstractSchedulerPlanFollower planFollower = createPlanFollower();
when(mClock.getTime()).thenReturn(0L);
planFollower.run();
Queue q = getReservationQueue(r1.toString());
assertReservationQueueExists(r1);
// submit an app to r1
String user_0 = "test-user";
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId_0 =
ApplicationAttemptId.newInstance(appId, 0);
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
scheduler.handle(appAttemptAddedEvent);
// initial default reservation queue should have no apps after first run
Queue defQ = getDefaultQueue();
Assert.assertEquals(0, getNumberOfApplications(defQ));
assertReservationQueueExists(r1, 0.1, 0.1);
Assert.assertEquals(1, getNumberOfApplications(q));
assertReservationQueueDoesNotExist(r2);
assertReservationQueueDoesNotExist(r3);
when(mClock.getTime()).thenReturn(3L);
planFollower.run();
Assert.assertEquals(0, getNumberOfApplications(defQ));
assertReservationQueueExists(r1, 0.1, 0.1);
Assert.assertEquals(1, getNumberOfApplications(q));
assertReservationQueueExists(r2, 0.1, 0.1);
assertReservationQueueDoesNotExist(r3);
when(mClock.getTime()).thenReturn(10L);
planFollower.run();
q = getReservationQueue(r1.toString());
if (isMove) {
// app should have been moved to default reservation queue
Assert.assertEquals(1, getNumberOfApplications(defQ));
assertNull(q);
} else {
// app should be killed
Assert.assertEquals(0, getNumberOfApplications(defQ));
assertNotNull(q);
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.KILLED, false);
scheduler.handle(appAttemptRemovedEvent);
}
assertReservationQueueDoesNotExist(r2);
assertReservationQueueExists(r3, 0, 1.0);
when(mClock.getTime()).thenReturn(11L);
planFollower.run();
if (isMove) {
// app should have been moved to default reservation queue
Assert.assertEquals(1, getNumberOfApplications(defQ));
} else {
// app should be killed
Assert.assertEquals(0, getNumberOfApplications(defQ));
}
assertReservationQueueDoesNotExist(r1);
assertReservationQueueDoesNotExist(r2);
assertReservationQueueExists(r3, 0.1, 0.1);
when(mClock.getTime()).thenReturn(12L);
planFollower.run();
assertReservationQueueDoesNotExist(r1);
assertReservationQueueDoesNotExist(r2);
assertReservationQueueExists(r3, 0.2, 0.2);
when(mClock.getTime()).thenReturn(16L);
planFollower.run();
assertReservationQueueDoesNotExist(r1);
assertReservationQueueDoesNotExist(r2);
assertReservationQueueDoesNotExist(r3);
verifyCapacity(defQ);
}
protected abstract void checkDefaultQueueBeforePlanFollowerRun();
protected abstract Queue getReservationQueue(String reservationId);
protected abstract void verifyCapacity(Queue defQ);
protected abstract Queue getDefaultQueue();
protected abstract int getNumberOfApplications(Queue queue);
protected abstract AbstractSchedulerPlanFollower createPlanFollower();
protected abstract void assertReservationQueueExists(ReservationId r);
protected abstract void assertReservationQueueExists(ReservationId r2,
double expectedCapacity, double expectedMaxCapacity);
protected abstract void assertReservationQueueDoesNotExist(ReservationId r2);
}