blob: b9ce54e6a5cbe5181a093f6f6461f285b33a61e1 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import net.jcip.annotations.NotThreadSafe;
/**
 * This class is a base test for {@code SharingPolicy} implementors.
 *
 * <p>Concrete subclasses supply the policy under test through
 * {@link #getInitializedPolicy()} and the parameter rows consumed by the
 * {@link Parameterized} runner. Each row describes one reservation (its
 * duration, height and optional recurrence), how many times it is submitted
 * to the plan, and which exception class, if any, the submissions are
 * expected to raise.
 */
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public abstract class BaseSharingPolicyTest {

  /** Number of containers used to size the cluster and scale the height. */
  private static final int TOT_CONT = 1000;

  /**
   * Length of the reservation. It is added to a value derived from
   * {@link System#currentTimeMillis()}, so it shares that time unit.
   */
  @Parameterized.Parameter(value = 0)
  public long duration;

  /**
   * Multiplier applied (together with the container count) to the minimum
   * allocation to obtain the resource reserved at each instant.
   */
  @Parameterized.Parameter(value = 1)
  public double height;

  /** How many times the same reservation is added to the plan. */
  @Parameterized.Parameter(value = 2)
  public int numSubmissions;

  /**
   * Recurrence period, parsed with {@link Long#parseLong(String)}; when
   * {@code null} a period of {@code 1} is used.
   */
  @Parameterized.Parameter(value = 3)
  public String recurrenceExpression;

  /**
   * Exact exception class the submissions must raise, or {@code null} when
   * every submission is expected to succeed.
   */
  @Parameterized.Parameter(value = 4)
  public Class<?> expectedError;

  private long step;
  private long initTime;
  private InMemoryPlan plan;
  private ReservationAgent mAgent;
  private Resource minAlloc;
  private ResourceCalculator res;
  private Resource maxAlloc;

  /** Configuration handle for subclasses; it is not assigned in this class. */
  protected ReservationSchedulerConfiguration conf;

  /**
   * Builds a fresh {@link InMemoryPlan} around the policy returned by
   * {@link #getInitializedPolicy()} before every test.
   */
  @Before
  public void setup() {
    // 1 sec step
    step = 1000L;
    initTime = System.currentTimeMillis();

    minAlloc = Resource.newInstance(1024, 1);
    res = new DefaultResourceCalculator();
    maxAlloc = Resource.newInstance(1024 * 8, 8);

    mAgent = mock(ReservationAgent.class);

    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
    Resource clusterResource =
        ReservationSystemTestUtil.calculateClusterResource(TOT_CONT);

    // invoke implementors initialization of policy
    SharingPolicy policy = getInitializedPolicy();

    RMContext context = ReservationSystemTestUtil.createMockRMContext();

    plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
        step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
  }

  /**
   * Submits the parameterized reservation {@link #numSubmissions} times and
   * checks the outcome against {@link #expectedError}.
   *
   * <p>The test fails when an error was expected but every submission
   * succeeded. An exception whose class is not exactly
   * {@link #expectedError} is rethrown unchanged.
   *
   * @throws IOException if raised while building or submitting the
   *           reservation and not expected by the current parameters
   * @throws PlanningException if raised while building or submitting the
   *           reservation and not expected by the current parameters
   */
  public void runTest() throws IOException, PlanningException {

    long period = 1;
    if (recurrenceExpression != null) {
      period = Long.parseLong(recurrenceExpression);
    }

    try {
      RLESparseResourceAllocation rle = generateRLEAlloc(period);

      // Generate the intervalMap (trimming out-of-period entries)
      Map<ReservationInterval, Resource> reservationIntervalResourceMap;
      if (period > 1) {
        rle = new PeriodicRLESparseResourceAllocation(rle, period);
        reservationIntervalResourceMap =
            ReservationSystemTestUtil.toAllocation(rle, 0, period);
      } else {
        reservationIntervalResourceMap = ReservationSystemTestUtil
            .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
      }

      long arrival = initTime % period;
      ReservationDefinition rDef =
          ReservationSystemTestUtil.createSimpleReservationDefinition(
              arrival, arrival + duration + 1, duration, 1,
              recurrenceExpression);

      // rle is not modified by the submissions, so its bounds are computed
      // once instead of on every iteration
      long rstart = rle.getEarliestStartTime();
      long rend = rle.getLatestNonNullTime();

      // perform multiple submissions where required
      for (int i = 0; i < numSubmissions; i++) {
        InMemoryReservationAllocation resAlloc =
            new InMemoryReservationAllocation(
                ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
                "dedicated", rstart, rend, reservationIntervalResourceMap, res,
                minAlloc);

        assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
      }

      // fail if error was expected
      if (expectedError != null) {
        fail("Expected " + expectedError.getName()
            + " but all submissions were accepted. Plan: " + plan);
      }
    } catch (Exception e) {
      // Compare the classes themselves: getCanonicalName() returns null for
      // anonymous/local classes, which would turn the comparison into an NPE
      // and hide the real exception.
      if (expectedError == null || !expectedError.equals(e.getClass())) {
        // fail on unexpected errors
        throw e;
      }
    }
  }

  /**
   * Builds the run-length-encoded allocation for the parameterized
   * reservation.
   *
   * <p>The reservation starts at {@code initTime % period} and lasts
   * {@link #duration}. When {@code period > 1} and the end falls beyond the
   * period, the end is clamped to the period and the overflow is added back
   * starting from time {@code 0}.
   *
   * @param period the recurrence period ({@code 1} when not recurrent)
   * @return the allocation covering the reservation
   */
  private RLESparseResourceAllocation generateRLEAlloc(long period) {
    RLESparseResourceAllocation rle =
        new RLESparseResourceAllocation(new DefaultResourceCalculator());

    Resource alloc = Resources.multiply(minAlloc, height * TOT_CONT);

    // start and end of the reservation expressed as offsets within the period
    long rStart = initTime % period;
    long rEnd = rStart + duration;

    // handle wrap-around
    if (period > 1 && rEnd > period) {
      long diff = rEnd - period;
      rEnd = period;

      // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
      if (duration > period) {
        // (duration / period - 1) stacked copies over the whole period ...
        rle.addInterval(new ReservationInterval(0, period),
            Resources.multiply(alloc, duration / period - 1));
        // ... plus one more copy over the leftover part of the overflow
        rle.addInterval(new ReservationInterval(0, diff % period), alloc);
      } else {
        rle.addInterval(new ReservationInterval(0, diff), alloc);
      }
    }

    // NOTE(review): for a non-negative duration and a positive period rStart
    // never exceeds rEnd at this point, so the first branch looks
    // unreachable -- confirm before removing it.
    if (rStart > rEnd) {
      rle.addInterval(new ReservationInterval(rStart, period), alloc);
      rle.addInterval(new ReservationInterval(0, rEnd), alloc);
    } else {
      rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
    }
    return rle;
  }

  /**
   * Creates and initializes the {@code SharingPolicy} under test; invoked
   * from {@link #setup()} before the plan is constructed.
   *
   * @return the policy handed to the {@link InMemoryPlan}
   */
  public abstract SharingPolicy getInitializedPolicy();
}