blob: 785641cd60e1c456973c86291a3c8fe5e810b950 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.resourceestimator.service;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.common.serialization.RLESparseResourceAllocationSerDe;
import org.apache.hadoop.resourceestimator.common.serialization.ResourceSerDe;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.test.framework.JerseyTest;
/**
 * Test ResourceEstimatorService.
 *
 * <p>Exercises the service's REST endpoints in a single scenario: a log file
 * is posted to the translator, the resulting history is read back and
 * compared against hand-built skylines, an estimation is requested and
 * re-read from the skyline store, and finally one history entry is deleted.
 */
public class TestResourceEstimatorService extends JerseyTest {
  /** Path the log file name is POSTed to so that the log gets parsed. */
  private static final String PARSE_LOG_COMMAND =
      "resourceestimator/translator/"
          + "src/test/resources/resourceEstimatorService.txt";
  /** Path returning the history of every run of pipeline tpch_q12. */
  private static final String GET_HISTORY_SKYLINE_COMMAND =
      "resourceestimator/skylinestore/history/tpch_q12/*";
  /** Path returning the estimation stored for pipeline tpch_q12. */
  private static final String GET_ESTIMATED_SKYLINE_COMMAND =
      "resourceestimator/skylinestore/estimation/tpch_q12";
  /** Path asking the estimator for a prediction for pipeline tpch_q12. */
  private static final String MAKE_ESTIMATION_COMMAND =
      "resourceestimator/estimator/tpch_q12";
  /** Path used to DELETE the history of run tpch_q12_1. */
  private static final String DELETE_HISTORY_COMMAND =
      "resourceestimator/skylinestore/history/tpch_q12/tpch_q12_1";
  /** Memory of a single container, as passed to Resource.newInstance. */
  private static final long CONTAINER_MEM_ALLOC = 1024;
  /** Virtual cores of a single container. */
  private static final int CONTAINER_CPU_ALLOC = 1;

  private Resource containerSpec;
  private Gson gson;

  public TestResourceEstimatorService() {
    super("org.apache.hadoop.resourceestimator.service");
  }

  /**
   * Creates the container specification and the Gson instance (with the
   * Resource and RLESparseResourceAllocation adapters) used by the test.
   *
   * @throws Exception if the parent JerseyTest set-up fails.
   */
  @Before @Override public void setUp() throws Exception {
    super.setUp();
    containerSpec =
        Resource.newInstance(CONTAINER_MEM_ALLOC, CONTAINER_CPU_ALLOC);
    gson = new GsonBuilder()
        .registerTypeAdapter(Resource.class, new ResourceSerDe())
        .registerTypeAdapter(RLESparseResourceAllocation.class,
            new RLESparseResourceAllocationSerDe())
        .enableComplexMapKeySerialization().create();
  }

  /**
   * Asserts that two skylines carry the same job metadata, the same
   * container specification and the same memory / vcores over time.
   *
   * <p>The per-tick comparison walks the time range reported by
   * {@code actual}.
   *
   * @param expected the reference skyline.
   * @param actual   the skyline under test.
   */
  private void compareResourceSkyline(final ResourceSkyline expected,
      final ResourceSkyline actual) {
    Assert.assertEquals(expected.getJobId(), actual.getJobId());
    Assert.assertEquals(expected.getJobInputDataSize(),
        actual.getJobInputDataSize(), 0);
    Assert.assertEquals(expected.getJobSubmissionTime(),
        actual.getJobSubmissionTime());
    Assert.assertEquals(expected.getJobFinishTime(),
        actual.getJobFinishTime());

    final Resource expectedSpec = expected.getContainerSpec();
    final Resource actualSpec = actual.getContainerSpec();
    Assert.assertEquals(expectedSpec.getMemorySize(),
        actualSpec.getMemorySize());
    Assert.assertEquals(expectedSpec.getVirtualCores(),
        actualSpec.getVirtualCores());

    final RLESparseResourceAllocation expectedList =
        expected.getSkylineList();
    final RLESparseResourceAllocation actualList = actual.getSkylineList();
    // NOTE(review): the bound is exclusive; if getLatestNonNullTime() is the
    // start of the last non-null segment, that segment is never compared
    // here - confirm against RLESparseResourceAllocation.
    final long end = actualList.getLatestNonNullTime();
    // A long index avoids narrowing the start time to int.
    for (long t = actualList.getEarliestStartTime(); t < end; t++) {
      final Resource expectedAtT = expectedList.getCapacityAtTime(t);
      final Resource actualAtT = actualList.getCapacityAtTime(t);
      Assert.assertEquals(expectedAtT.getMemorySize(),
          actualAtT.getMemorySize());
      Assert.assertEquals(expectedAtT.getVirtualCores(),
          actualAtT.getVirtualCores());
    }
  }

  /**
   * Adds one interval to a skyline whose allocation is {@code containers}
   * times the single-container memory and vcores.
   *
   * @param skylineList the allocation to extend.
   * @param start       start of the interval.
   * @param end         end of the interval.
   * @param containers  number of containers allocated over the interval.
   */
  private static void addSegment(
      final RLESparseResourceAllocation skylineList, final long start,
      final long end, final int containers) {
    final Resource allocation = Resource.newInstance(
        CONTAINER_MEM_ALLOC * containers, CONTAINER_CPU_ALLOC * containers);
    skylineList.addInterval(new ReservationInterval(start, end), allocation);
  }

  /**
   * Builds a skyline over [0, 25) with one container during [0, 10) and the
   * given container counts during [10, 15), [15, 20) and [20, 25).
   *
   * @param jobId           id of the job the skyline belongs to.
   * @param containersAt10  containers allocated during [10, 15).
   * @param containersAt15  containers allocated during [15, 20).
   * @param containersAt20  containers allocated during [20, 25).
   * @return the assembled skyline.
   */
  private ResourceSkyline buildSkyline(final String jobId,
      final int containersAt10, final int containersAt15,
      final int containersAt20) {
    final TreeMap<Long, Resource> resourceOverTime = new TreeMap<>();
    final RLESparseResourceAllocation skylineList =
        new RLESparseResourceAllocation(resourceOverTime,
            new DefaultResourceCalculator());
    addSegment(skylineList, 0, 10, 1);
    addSegment(skylineList, 10, 15, containersAt10);
    addSegment(skylineList, 15, 20, containersAt15);
    addSegment(skylineList, 20, 25, containersAt20);
    return new ResourceSkyline(jobId, 0, 0, 25, containerSpec, skylineList);
  }

  /**
   * @return the skyline the test expects for run tpch_q12_0.
   */
  private ResourceSkyline getSkyline1() {
    return buildSkyline("tpch_q12_0", 1074, 2538, 2468);
  }

  /**
   * @return the skyline the test expects for run tpch_q12_1.
   */
  private ResourceSkyline getSkyline2() {
    return buildSkyline("tpch_q12_1", 794, 2517, 2484);
  }

  /**
   * Asserts that the history holds exactly one skyline for the given run of
   * pipeline tpch_q12 and that it matches the expected skyline.
   *
   * @param jobId      run id, either "tpch_q12_0" or "tpch_q12_1"; any other
   *                   value fails the test instead of passing silently.
   * @param jobHistory history returned by the skyline store.
   */
  private void checkResult(final String jobId,
      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory) {
    final ResourceSkyline expected;
    switch (jobId) {
    case "tpch_q12_0":
      expected = getSkyline1();
      break;
    case "tpch_q12_1":
      expected = getSkyline2();
      break;
    default:
      throw new AssertionError("Unexpected job id: " + jobId);
    }
    final RecurrenceId recurrenceId = new RecurrenceId("tpch_q12", jobId);
    final List<ResourceSkyline> skylines = jobHistory.get(recurrenceId);
    // Fail with a readable message rather than a NullPointerException.
    Assert.assertNotNull("No history found for run " + jobId, skylines);
    Assert.assertEquals(1, skylines.size());
    compareResourceSkyline(expected, skylines.get(0));
  }

  /**
   * Asserts that two allocations report equal capacity at every tick of the
   * time range reported by {@code rle1}.
   *
   * @param rle1 the reference allocation; also defines the range compared.
   * @param rle2 the allocation under test.
   */
  private void compareRLESparseResourceAllocation(
      final RLESparseResourceAllocation rle1,
      final RLESparseResourceAllocation rle2) {
    // NOTE(review): exclusive bound, see compareResourceSkyline - confirm
    // whether the last segment is meant to be skipped.
    final long end = rle1.getLatestNonNullTime();
    for (long t = rle1.getEarliestStartTime(); t < end; t++) {
      Assert.assertEquals(rle1.getCapacityAtTime(t),
          rle2.getCapacityAtTime(t));
    }
  }

  /**
   * GETs the history of pipeline tpch_q12 and deserializes it.
   *
   * @return the history keyed by recurrence id.
   */
  private Map<RecurrenceId, List<ResourceSkyline>> fetchJobHistory() {
    final WebResource endpoint =
        resource().path(GET_HISTORY_SKYLINE_COMMAND);
    final String response = endpoint.get(String.class);
    return gson.fromJson(response,
        new TypeToken<Map<RecurrenceId, List<ResourceSkyline>>>() {
        }.getType());
  }

  /**
   * GETs the given path and deserializes the body as an allocation.
   *
   * @param path service path to query.
   * @return the deserialized allocation.
   */
  private RLESparseResourceAllocation fetchAllocation(final String path) {
    final WebResource endpoint = resource().path(path);
    final String response = endpoint.get(String.class);
    return gson.fromJson(response,
        new TypeToken<RLESparseResourceAllocation>() {
        }.getType());
  }

  /**
   * Runs the parse / history / estimate / re-read / delete scenario against
   * the service and checks the result of every step.
   */
  @Test public void testGetPrediction() {
    // first, parse the log
    final String logFile = "resourceEstimatorService.txt";
    final WebResource root = resource();
    root.path(PARSE_LOG_COMMAND).type(MediaType.APPLICATION_XML_TYPE)
        .post(logFile);
    Map<RecurrenceId, List<ResourceSkyline>> jobHistory = fetchJobHistory();
    checkResult("tpch_q12_0", jobHistory);
    checkResult("tpch_q12_1", jobHistory);

    // then, try to get estimated resource allocation from skyline store;
    // nothing has been estimated yet, so the body is the literal "null"
    final String storedBeforeEstimation =
        resource().path(GET_ESTIMATED_SKYLINE_COMMAND).get(String.class);
    Assert.assertEquals("null", storedBeforeEstimation);

    // then, we call estimator module to make the prediction
    final RLESparseResourceAllocation skylineList =
        fetchAllocation(MAKE_ESTIMATION_COMMAND);
    Assert.assertEquals(1,
        skylineList.getCapacityAtTime(0).getMemorySize()
            / CONTAINER_MEM_ALLOC);
    Assert.assertEquals(1058,
        skylineList.getCapacityAtTime(10).getMemorySize()
            / CONTAINER_MEM_ALLOC);
    Assert.assertEquals(2538,
        skylineList.getCapacityAtTime(15).getMemorySize()
            / CONTAINER_MEM_ALLOC);
    Assert.assertEquals(2484,
        skylineList.getCapacityAtTime(20).getMemorySize()
            / CONTAINER_MEM_ALLOC);

    // then, we get estimated resource allocation for tpch_q12
    final RLESparseResourceAllocation skylineList2 =
        fetchAllocation(GET_ESTIMATED_SKYLINE_COMMAND);
    compareRLESparseResourceAllocation(skylineList, skylineList2);

    // then, we call estimator module again to directly get estimated resource
    // allocation from skyline store
    final RLESparseResourceAllocation skylineList3 =
        fetchAllocation(MAKE_ESTIMATION_COMMAND);
    compareRLESparseResourceAllocation(skylineList, skylineList3);

    // finally, test delete
    resource().path(DELETE_HISTORY_COMMAND).delete();
    jobHistory = fetchJobHistory();
    // jobHistory should only have info for tpch_q12_0
    Assert.assertEquals(1, jobHistory.size());
    final String pipelineId =
        jobHistory.keySet().iterator().next().getRunId();
    Assert.assertEquals("tpch_q12_0", pipelineId);
  }
}