blob: 7acfe5da0ec682c4010eb9a33c8d05e8f6d1ebd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
.extractCustomResourcesAsStrings;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestQueueMetricsForCustomResources {
/**
 * Metric families whose custom resource values are asserted by this test.
 * Within this class the constant is only used to label assertion failure
 * messages.
 */
public enum MetricsForCustomResource {
  /** Values read through {@code QueueMetrics#getAllocatedResources}. */
  ALLOCATED,
  /** Values read through {@code QueueMetrics#getAvailableResources}. */
  AVAILABLE,
  /** Values read through {@code QueueMetrics#getPendingResources}. */
  PENDING,
  /** Values read through {@code QueueMetrics#getReservedResources}. */
  RESERVED,
  /** Aggregated preempted seconds of memory, vcores and custom resources. */
  AGGREGATE_PREEMPTED_SECONDS
}
/** Multiplier used to express sizes in this test: 1024, i.e. one GB in MB. */
public static final long GB = 1024; // MB
/** Configuration handed to every queue and QueueMetrics created here. */
private static final Configuration CONF = new Configuration();
/** Name of the first custom resource type registered by this test. */
public static final String CUSTOM_RES_1 = "custom_res_1";
/** Name of the second custom resource type registered by this test. */
public static final String CUSTOM_RES_2 = "custom_res_2";
/** User name under which all metrics in this test are recorded. */
public static final String USER = "alice";
/** Resource used by most tests; rebuilt in {@link #setUp()}. */
private Resource defaultResource;
/** Metrics system the queues register with; recreated in {@link #setUp()}. */
private MetricsSystem ms;
/**
 * Prepares each test: creates a new metrics system, clears the queue
 * metrics, registers the resource types and builds the default resource.
 */
@Before
public void setUp() {
  this.ms = new MetricsSystemImpl();
  QueueMetrics.clearQueueMetrics();
  // Resource types are registered first, then the default resource is built.
  this.initializeResourceTypes();
  this.createDefaultResource();
}
/**
 * Builds {@link #defaultResource}: 4 GB of memory, 4 vcores, 15 * GB of
 * {@link #CUSTOM_RES_1} and 20 * GB of {@link #CUSTOM_RES_2}.
 */
private void createDefaultResource() {
  final Map<String, String> customResources =
      ImmutableMap.<String, String>builder()
          .put(CUSTOM_RES_1, String.valueOf(15 * GB))
          .put(CUSTOM_RES_2, String.valueOf(20 * GB))
          .build();
  defaultResource = newResource(4 * GB, 4, customResources);
}
/**
 * Registers the resource types used by this test through
 * {@code ResourceUtils}: memory and vcores with the default scheduler
 * minimum/maximum allocations, plus the two custom resources with a
 * minimum of 0 and a maximum of 2000.
 */
private void initializeResourceTypes() {
  final Map<String, ResourceInformation> resourceTypes = new HashMap<>();

  resourceTypes.put(ResourceInformation.MEMORY_URI,
      ResourceInformation.newInstance(
          ResourceInformation.MEMORY_MB.getName(),
          ResourceInformation.MEMORY_MB.getUnits(),
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
          DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));

  resourceTypes.put(ResourceInformation.VCORES_URI,
      ResourceInformation.newInstance(
          ResourceInformation.VCORES.getName(),
          ResourceInformation.VCORES.getUnits(),
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
          DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES));

  // Both custom resources reuse the units of the vcores resource.
  for (String customName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    resourceTypes.put(customName,
        ResourceInformation.newInstance(customName,
            ResourceInformation.VCORES.getUnits(), 0, 2000));
  }

  ResourceUtils.initializeResourcesFromResourceInformationMap(resourceTypes);
}
/**
 * Extracts a {@link Resource} from the given metrics with {@code func} and
 * asserts that the named resource has the expected value.
 *
 * @param metrics the queue metrics to read from
 * @param metricsType metric family, used in the failure message
 * @param func extracts the resource to inspect from the metrics
 * @param resourceName name of the resource to check
 * @param expectedValue value the resource is expected to have
 */
private static void assertCustomResourceValue(QueueMetrics metrics,
    MetricsForCustomResource metricsType,
    Function<QueueMetrics, Resource> func,
    String resourceName,
    long expectedValue) {
  final Long actual = func.apply(metrics).getResourceValue(resourceName);
  assertCustomResourceValueInternal(metricsType, resourceName,
      expectedValue, actual);
}
/**
 * Asserts that {@code value} is non-null and equal to
 * {@code expectedValue}.
 *
 * @param metricsType metric family, used in the mismatch message
 * @param resourceName resource name, used in both failure messages
 * @param expectedValue the expected value
 * @param value the actual value; must not be null
 */
private static void assertCustomResourceValueInternal(
    MetricsForCustomResource metricsType, String resourceName, long
    expectedValue, Long value) {
  final String missingValueMessage =
      "QueueMetrics should have custom resource metrics value "
          + "for resource: " + resourceName;
  assertNotNull(missingValueMessage, value);

  final String mismatchMessage = String.format(
      "QueueMetrics should have custom resource metrics value %d "
          + "for resource: %s for metrics type %s",
      expectedValue, resourceName, metricsType);
  assertEquals(mismatchMessage, expectedValue, value.longValue());
}
/**
 * Builds an immutable map that assigns the same value, rendered as a
 * string, to both custom resources.
 *
 * @param value the value to assign to each custom resource
 * @return map from custom resource name to the string form of the value
 */
private static Map<String, String> getCustomResourcesWithValue(long value) {
  final String amount = String.valueOf(value);
  final ImmutableMap.Builder<String, String> resources =
      ImmutableMap.builder();
  resources.put(CUSTOM_RES_1, amount);
  resources.put(CUSTOM_RES_2, amount);
  return resources.build();
}
/**
 * Creates a chain of four queues, each the parent of the next, and
 * returns the deepest one.
 *
 * NOTE(review): the third queue is named "root.subQ2" although its parent
 * is "root.subQ"; the names are kept as they are.
 *
 * @return the leaf queue "root.subQ2.leafQ"
 */
private QueueInfo createFourLevelQueueHierarchy() {
  final QueueInfo level1 = new QueueInfo(null, "root", ms, CONF, USER);
  final QueueInfo level2 =
      new QueueInfo(level1, "root.subQ", ms, CONF, USER);
  final QueueInfo level3 =
      new QueueInfo(level2, "root.subQ2", ms, CONF, USER);
  final QueueInfo level4 =
      new QueueInfo(level3, "root.subQ2.leafQ", ms, CONF, USER);
  return level4;
}
/**
 * Creates a two-level hierarchy: a "root" queue and one child queue.
 *
 * @return the child queue "root.leaf"
 */
private QueueInfo createBasicQueueHierarchy() {
  final QueueInfo parent = new QueueInfo(null, "root", ms, CONF, USER);
  final QueueInfo leaf = new QueueInfo(parent, "root.leaf", ms, CONF, USER);
  return leaf;
}
/**
 * Creates the default test data builder and sets its container count.
 *
 * @param containers number of containers to configure on the builder
 * @return the configured builder
 */
private QueueMetricsTestData.Builder
    createQueueMetricsTestDataWithContainers(int containers) {
  final QueueMetricsTestData.Builder defaults =
      createDefaultQueueMetricsTestData();
  return defaults.withContainers(containers);
}
/**
 * Creates a test data builder preset with {@link #USER} and the
 * {@code RMNodeLabelsManager.NO_LABEL} partition.
 *
 * @return the preset builder
 */
private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
  final QueueMetricsTestData.Builder fresh =
      QueueMetricsTestData.Builder.create();
  return fresh
      .withUser(USER)
      .withPartition(RMNodeLabelsManager.NO_LABEL);
}
/**
 * Increases pending resources using the container count stored in the
 * test data, then verifies the resulting metrics.
 *
 * @param testData queue, user, partition, resource and container count
 */
private void testIncreasePendingResources(QueueMetricsTestData testData) {
  final int containerCount = testData.containers;
  testIncreasePendingResourcesInternal(containerCount, testData);
}
/**
 * Increases pending resources for a single container, ignoring the
 * container count stored in the test data, then verifies the metrics.
 *
 * @param testData queue, user, partition and resource to use
 */
private void testIncreasePendingResourcesWithoutContainer(
    QueueMetricsTestData testData) {
  final int singleContainer = 1;
  testIncreasePendingResourcesInternal(singleContainer, testData);
}
/**
 * Calls {@code incrPendingResources} on the leaf queue metrics and asserts
 * that the pending containers, memory, vcores and custom resources all
 * equal the per-container amount multiplied by {@code containers}.
 *
 * @param containers number of containers to add to the pending metrics
 * @param testData queue, user, partition and resource to use
 */
private void testIncreasePendingResourcesInternal(int containers,
    QueueMetricsTestData testData) {
  final Resource resource = testData.resource;
  final Map<String, Long> customValues = testData.customResourceValues;

  testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
      testData.user, containers, resource);

  final long pendingMemory = containers * resource.getMemorySize();
  final int pendingVCores = containers * resource.getVirtualCores();
  final long pendingRes1 = containers * customValues.get(CUSTOM_RES_1);
  final long pendingRes2 = containers * customValues.get(CUSTOM_RES_2);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(PENDING_CONTAINERS, containers)
      .gaugeLong(PENDING_MB, pendingMemory)
      .gaugeInt(PENDING_V_CORES, pendingVCores)
      .gaugeLong(PENDING_CUSTOM_RES1, pendingRes1)
      .gaugeLong(PENDING_CUSTOM_RES2, pendingRes2);

  final Map<String, Long> expectedCustomValues =
      computeExpectedCustomResourceValues(customValues,
          (name, perContainer) -> perContainer * containers);
  assertAllMetrics(testData.leafQueue, checker,
      QueueMetrics::getPendingResources,
      MetricsForCustomResource.PENDING, expectedCustomValues);
}
/**
 * Calls {@code allocateResources} on the leaf queue metrics and verifies
 * the allocated metrics against the leaf queue source. The pending
 * metrics are expected to be zero.
 *
 * When {@code decreasePending} is set, the pending custom resources are
 * also checked to be zero on all queues. When the test data has custom
 * resource values, the allocated custom resources are checked on all
 * queues as well.
 *
 * @param decreasePending passed through to {@code allocateResources}
 * @param testData queue, user, partition, resource and container count
 */
private void testAllocateResources(boolean decreasePending,
    QueueMetricsTestData testData) {
  final int containers = testData.containers;
  final Resource resource = testData.resource;
  final Map<String, Long> customValues = testData.customResourceValues;
  final QueueInfo leaf = testData.leafQueue;

  leaf.queueMetrics.allocateResources(testData.partition,
      testData.user, containers, resource, decreasePending);

  final long allocatedMemory = containers * resource.getMemorySize();
  final int allocatedVCores = containers * resource.getVirtualCores();
  final long allocatedRes1 = containers * customValues.get(CUSTOM_RES_1);
  final long allocatedRes2 = containers * customValues.get(CUSTOM_RES_2);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(ALLOCATED_CONTAINERS, containers)
      .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
      .gaugeLong(ALLOCATED_MB, allocatedMemory)
      .gaugeInt(ALLOCATED_V_CORES, allocatedVCores)
      .gaugeInt(PENDING_CONTAINERS, 0)
      .gaugeLong(PENDING_MB, 0)
      .gaugeInt(PENDING_V_CORES, 0)
      .gaugeLong(ALLOCATED_CUSTOM_RES1, allocatedRes1)
      .gaugeLong(ALLOCATED_CUSTOM_RES2, allocatedRes2)
      .checkAgainst(leaf.queueSource);

  if (decreasePending) {
    final Map<String, Long> noPending =
        computeExpectedCustomResourceValues(customValues,
            (name, perContainer) -> 0L);
    assertAllMetrics(leaf, checker,
        QueueMetrics::getPendingResources,
        MetricsForCustomResource.PENDING, noPending);
  }

  if (!customValues.isEmpty()) {
    final Map<String, Long> expectedAllocated =
        computeExpectedCustomResourceValues(customValues,
            (name, perContainer) -> perContainer * containers);
    assertAllMetrics(leaf, checker,
        QueueMetrics::getAllocatedResources,
        MetricsForCustomResource.ALLOCATED, expectedAllocated);
  }
}
/**
 * Updates the preempted memory, vcore and custom resource seconds on the
 * leaf queue metrics, then verifies the aggregate preemption metrics on
 * the queues (user sources are not checked).
 *
 * @param testData queue and resource to use
 * @param seconds number of seconds the resource was preempted for
 */
private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
    int seconds) {
  final QueueInfo leaf = testData.leafQueue;
  final Resource resource = testData.resource;
  final Map<String, Long> customValues = testData.customResourceValues;

  leaf.queueMetrics.updatePreemptedMemoryMBSeconds(
      resource.getMemorySize() * seconds);
  leaf.queueMetrics.updatePreemptedVcoreSeconds(
      resource.getVirtualCores() * seconds);
  leaf.queueMetrics.updatePreemptedSecondsForCustomResources(
      resource, seconds);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
          resource.getMemorySize() * seconds)
      .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
          resource.getVirtualCores() * seconds)
      .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1,
          customValues.get(CUSTOM_RES_1) * seconds)
      .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2,
          customValues.get(CUSTOM_RES_2) * seconds);

  final Map<String, Long> expectedSeconds =
      computeExpectedCustomResourceValues(customValues,
          (name, amount) -> amount * seconds);
  assertQueueMetricsOnly(leaf, checker,
      this::convertPreemptedSecondsToResource,
      MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
      expectedSeconds);
}
/**
 * Packs the aggregated preempted seconds of a queue into a
 * {@link Resource} so they can be checked like other resource metrics.
 * The vcore counter value is narrowed to an int.
 *
 * @param qm the queue metrics to read the preemption counters from
 * @return a resource holding the memory, vcore and custom resource seconds
 */
private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
  final CustomResourceMetricValue preemptedCustom =
      qm.getAggregatedPreemptedSecondsResources();
  final MutableCounterLong preemptedVcores =
      qm.getAggregateVcoreSecondsPreempted();
  final MutableCounterLong preemptedMemory =
      qm.getAggregateMemoryMBSecondsPreempted();

  final long memory = preemptedMemory.value();
  final int vcores = (int) preemptedVcores.value();
  return Resource.newInstance(memory, vcores, preemptedCustom.getValues());
}
/**
 * Reserves the test resource once on the leaf queue metrics and verifies
 * that the reserved metrics show one container with that resource.
 *
 * @param testData queue, user, partition and resource to use
 */
private void testReserveResources(QueueMetricsTestData testData) {
  final QueueInfo leaf = testData.leafQueue;
  final Resource resource = testData.resource;
  final Map<String, Long> customValues = testData.customResourceValues;

  leaf.queueMetrics.reserveResource(testData.partition,
      testData.user, resource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(RESERVED_CONTAINERS, 1)
      .gaugeLong(RESERVED_MB, resource.getMemorySize())
      .gaugeInt(RESERVED_V_CORES, resource.getVirtualCores())
      .gaugeLong(RESERVED_CUSTOM_RES1, customValues.get(CUSTOM_RES_1))
      .gaugeLong(RESERVED_CUSTOM_RES2, customValues.get(CUSTOM_RES_2))
      .checkAgainst(leaf.queueSource);

  final Map<String, Long> expectedReserved =
      computeExpectedCustomResourceValues(customValues,
          (name, amount) -> amount);
  assertAllMetrics(leaf, checker,
      QueueMetrics::getReservedResources,
      MetricsForCustomResource.RESERVED, expectedReserved);
}
/**
 * Allocates resources without decreasing pending ones, then checks the
 * custom resource values returned by {@code getAllocatedResources} on the
 * leaf queue. The check is skipped when the test data has no custom
 * resource values.
 *
 * @param testData queue, user, partition, resource and container count
 */
private void testGetAllocatedResources(QueueMetricsTestData testData) {
  testAllocateResources(false, testData);

  final Resource allocated =
      testData.leafQueue.queueMetrics.getAllocatedResources();
  if (testData.customResourceValues.isEmpty()) {
    return;
  }

  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
        resourceName,
        testData.customResourceValues.get(resourceName)
            * testData.containers,
        allocated.getResourceValue(resourceName));
  }
}
/**
 * Asserts the metrics on all queues of the hierarchy, then applies the
 * same checks to the user source of the given queue and to the user
 * source of the root queue.
 *
 * @param queueInfo the queue whose hierarchy is checked
 * @param checker the expected gauge and counter values
 * @param func extracts the resource to inspect from a QueueMetrics
 * @param metricsType metric family, used in failure messages
 * @param expectedCustomResourceValues expected custom resource values
 */
private void assertAllMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    Function<QueueMetrics, Resource> func,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllQueueMetrics(queueInfo, checker, func, metricsType,
      expectedCustomResourceValues);

  // Check the leaf user source first, then the root user source.
  final ResourceMetricsChecker leafUserChecker =
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(queueInfo.userSource);
  final ResourceMetricsChecker rootUserChecker =
      ResourceMetricsChecker.createFromChecker(leafUserChecker);
  rootUserChecker.checkAgainst(queueInfo.getRoot().userSource);
}
/**
 * Asserts the metrics on all queues of the hierarchy without checking any
 * user sources. Delegates to {@code assertAllQueueMetrics}.
 *
 * @param queueInfo the queue whose hierarchy is checked
 * @param checker the expected gauge and counter values
 * @param func extracts the resource to inspect from a QueueMetrics
 * @param metricsType metric family, used in failure messages
 * @param expectedCustomResourceValues expected custom resource values
 */
private void assertQueueMetricsOnly(
    QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    Function<QueueMetrics, Resource> func,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  this.assertAllQueueMetrics(
      queueInfo, checker, func, metricsType, expectedCustomResourceValues);
}
/**
 * Checks every queue source reached through {@code queueInfo} against a
 * copy of {@code checker}, then checks both custom resource values on
 * every QueueMetrics reached through {@code queueInfo}.
 *
 * @param queueInfo the queue whose hierarchy is checked
 * @param checker the expected gauge and counter values
 * @param func extracts the resource to inspect from a QueueMetrics
 * @param metricsType metric family, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertAllQueueMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    Function<QueueMetrics, Resource> func,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  // Gauges and counters exposed through the metrics sources.
  queueInfo.checkAllQueueSources(source -> ResourceMetricsChecker
      .createFromChecker(checker).checkAgainst(source));

  // Custom resource values exposed through the QueueMetrics objects.
  queueInfo.checkAllQueueMetrics(queueMetrics -> {
    for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
      assertCustomResourceValue(queueMetrics, metricsType, func,
          resourceName, expectedCustomResourceValues.get(resourceName));
    }
  });
}
/**
 * Builds a new map with the same keys as {@code customResourceValues},
 * where each value is the result of applying {@code func} to the original
 * key and value.
 *
 * @param customResourceValues the per-resource input values
 * @param func computes the expected value from a resource name and value
 * @return a new map from resource name to expected value
 */
private Map<String, Long> computeExpectedCustomResourceValues(
    Map<String, Long> customResourceValues,
    BiFunction<String, Long, Long> func) {
  final Map<String, Long> expected = Maps.newHashMap();
  customResourceValues.forEach(
      (name, amount) -> expected.put(name, func.apply(name, amount)));
  return expected;
}
/**
 * Sets the available resources of a single queue through the one-argument
 * {@code setAvailableResourcesToQueue} and verifies the available memory,
 * vcores and custom resources.
 */
@Test
public void testSetAvailableResourcesToQueue1() {
  final String queueName = "single";
  final long availableRes1 = 5 * GB;
  final long availableRes2 = 6 * GB;

  final QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
      false, CONF);
  final MetricsSource source = queueSource(ms, queueName);

  final Resource available = newResource(GB, 4,
      ImmutableMap.<String, String>builder()
          .put(CUSTOM_RES_1, String.valueOf(availableRes1))
          .put(CUSTOM_RES_2, String.valueOf(availableRes2))
          .build());
  metrics.setAvailableResourcesToQueue(available);

  ResourceMetricsChecker.create()
      .gaugeLong(AVAILABLE_MB, GB)
      .gaugeInt(AVAILABLE_V_CORES, 4)
      .gaugeLong(AVAILABLE_CUSTOM_RES1, availableRes1)
      .gaugeLong(AVAILABLE_CUSTOM_RES2, availableRes2)
      .checkAgainst(source);

  assertCustomResourceValue(metrics,
      MetricsForCustomResource.AVAILABLE,
      QueueMetrics::getAvailableResources, CUSTOM_RES_1, availableRes1);
  assertCustomResourceValue(metrics,
      MetricsForCustomResource.AVAILABLE,
      QueueMetrics::getAvailableResources, CUSTOM_RES_2, availableRes2);
}
/**
 * Sets the available resources of a single queue through the two-argument
 * {@code setAvailableResourcesToQueue}, passing null as the first
 * argument, and verifies the available memory, vcores and custom
 * resources.
 */
@Test
public void testSetAvailableResourcesToQueue2() {
  final String queueName = "single";
  final long availableRes1 = 15 * GB;
  final long availableRes2 = 20 * GB;

  final QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
      false, CONF);
  final MetricsSource source = queueSource(ms, queueName);

  final Resource available = newResource(GB, 4,
      ImmutableMap.<String, String>builder()
          .put(CUSTOM_RES_1, String.valueOf(availableRes1))
          .put(CUSTOM_RES_2, String.valueOf(availableRes2))
          .build());
  metrics.setAvailableResourcesToQueue(null, available);

  ResourceMetricsChecker.create()
      .gaugeLong(AVAILABLE_MB, GB)
      .gaugeInt(AVAILABLE_V_CORES, 4)
      .gaugeLong(AVAILABLE_CUSTOM_RES1, availableRes1)
      .gaugeLong(AVAILABLE_CUSTOM_RES2, availableRes2)
      .checkAgainst(source);

  assertCustomResourceValue(metrics,
      MetricsForCustomResource.AVAILABLE,
      QueueMetrics::getAvailableResources, CUSTOM_RES_1, availableRes1);
  assertCustomResourceValue(metrics,
      MetricsForCustomResource.AVAILABLE,
      QueueMetrics::getAvailableResources, CUSTOM_RES_2, availableRes2);
}
/**
 * Increases the pending resources of a two-level queue hierarchy by five
 * containers of the default resource and verifies the pending metrics.
 */
@Test
public void testIncreasePendingResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final Resource toDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));

  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResourceToDecrease(toDecrease, 2)
      .withResources(defaultResource)
      .build();

  testIncreasePendingResources(testData);
}
/**
 * Increases the pending resources by five containers of the default
 * resource, decreases them by two containers of a smaller resource, and
 * verifies that the pending metrics show the difference.
 */
@Test
public void testDecreasePendingResources() {
  final Resource resourceToDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
  final int containersToDecrease = 2;
  final int containers = 5;
  QueueMetricsTestData testData =
      createQueueMetricsTestDataWithContainers(containers)
          .withLeafQueue(createBasicQueueHierarchy())
          // Record the number of containers that are decreased below,
          // not the total number of pending containers.
          .withResourceToDecrease(resourceToDecrease, containersToDecrease)
          .withResources(defaultResource)
          .build();

  //compute expected values
  final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
  final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
  final int containersAfterDecrease = containers - containersToDecrease;
  final long customRes1ToDecrease =
      resourceToDecrease.getResourceValue(CUSTOM_RES_1);
  final long customRes2ToDecrease =
      resourceToDecrease.getResourceValue(CUSTOM_RES_2);

  final int vcoresAfterDecrease =
      (defaultResource.getVirtualCores() * containers)
          - (vCoresToDecrease * containersToDecrease);
  final long memoryAfterDecrease =
      (defaultResource.getMemorySize() * containers)
          - (memoryMBToDecrease * containersToDecrease);
  final long customResource1AfterDecrease =
      (testData.customResourceValues.get(CUSTOM_RES_1) * containers)
          - (customRes1ToDecrease * containersToDecrease);
  final long customResource2AfterDecrease =
      (testData.customResourceValues.get(CUSTOM_RES_2) * containers)
          - (customRes2ToDecrease * containersToDecrease);

  //first, increase resources to be able to decrease some
  testIncreasePendingResources(testData);

  //decrease resources
  testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
      testData.user, containersToDecrease,
      ResourceTypesTestHelper.newResource(memoryMBToDecrease,
          vCoresToDecrease,
          extractCustomResourcesAsStrings(resourceToDecrease)));

  //check
  ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease)
      .gaugeLong(PENDING_MB, memoryAfterDecrease)
      .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease)
      .gaugeLong(PENDING_CUSTOM_RES1, customResource1AfterDecrease)
      .gaugeLong(PENDING_CUSTOM_RES2, customResource2AfterDecrease)
      .checkAgainst(testData.leafQueue.queueSource);

  assertAllMetrics(testData.leafQueue, checker,
      QueueMetrics::getPendingResources,
      MetricsForCustomResource.PENDING,
      computeExpectedCustomResourceValues(testData.customResourceValues,
          (k, v) -> v * containers - (resourceToDecrease.getResourceValue(k)
              * containersToDecrease)));
}
/**
 * Allocates five containers of the default resource without decreasing
 * pending resources and verifies the allocated metrics.
 */
@Test
public void testAllocateResourcesWithoutDecreasePending() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();

  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testAllocateResources(false, testData);
}
/**
 * Increases pending resources by five containers, then allocates the same
 * amount with pending decrease enabled, and verifies the allocated and
 * pending metrics.
 */
@Test
public void testAllocateResourcesWithDecreasePending() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final Resource toDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));

  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResourceToDecrease(toDecrease, 2)
      .withResources(defaultResource)
      .build();

  // Pending resources are increased first so the allocation can lower them.
  testIncreasePendingResources(testData);
  // The allocation then decreases the pending resources.
  testAllocateResources(true, testData);
}
/**
 * Increases pending resources for one container, then calls the
 * {@code allocateResources} overload that takes no container count. The
 * pending container gauge is expected to stay at 1 while pending memory,
 * vcores and custom resources drop to zero.
 */
@Test
public void testAllocateResourcesWithoutContainer() {
  final QueueMetricsTestData.Builder builder =
      createDefaultQueueMetricsTestData();
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  // Pending resources are increased before the allocation.
  testIncreasePendingResourcesWithoutContainer(testData);

  final Resource allocated = testData.resource;
  final Map<String, Long> customValues = testData.customResourceValues;
  final QueueInfo leaf = testData.leafQueue;
  leaf.queueMetrics.allocateResources(testData.partition,
      testData.user, allocated);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .gaugeLong(ALLOCATED_MB, allocated.getMemorySize())
      .gaugeInt(ALLOCATED_V_CORES, allocated.getVirtualCores())
      .gaugeInt(PENDING_CONTAINERS, 1)
      .gaugeLong(PENDING_MB, 0)
      .gaugeInt(PENDING_V_CORES, 0)
      .gaugeLong(ALLOCATED_CUSTOM_RES1, customValues.get(CUSTOM_RES_1))
      .gaugeLong(ALLOCATED_CUSTOM_RES2, customValues.get(CUSTOM_RES_2));

  checker.checkAgainst(leaf.queueSource);
  checker.checkAgainst(leaf.getRoot().queueSource);

  final Map<String, Long> noPending =
      computeExpectedCustomResourceValues(customValues,
          (name, amount) -> 0L);
  assertAllMetrics(leaf, checker,
      QueueMetrics::getPendingResources,
      MetricsForCustomResource.PENDING, noPending);

  final Map<String, Long> expectedAllocated =
      computeExpectedCustomResourceValues(customValues,
          (name, amount) -> amount);
  assertAllMetrics(leaf, checker,
      QueueMetrics::getAllocatedResources,
      MetricsForCustomResource.ALLOCATED, expectedAllocated);
}
/**
 * Allocates five containers of the default resource, releases all of
 * them, and verifies the container counters and that no custom resources
 * remain allocated.
 */
@Test
public void testReleaseResources() {
  final int containers = 5;
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(containers);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResourceToDecrease(defaultResource, containers)
      .withResources(defaultResource)
      .build();

  // Resources are allocated first so that there is something to release.
  testAllocateResources(false, testData);

  final QueueInfo leaf = testData.leafQueue;
  leaf.queueMetrics.releaseResources(testData.partition,
      testData.user, containers, defaultResource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
      .counter(AGGREGATE_CONTAINERS_RELEASED, containers)
      .checkAgainst(leaf.queueSource);

  final Map<String, Long> nothingAllocated =
      computeExpectedCustomResourceValues(testData.customResourceValues,
          (name, amount) -> 0L);
  assertAllMetrics(leaf, checker,
      QueueMetrics::getAllocatedResources,
      MetricsForCustomResource.ALLOCATED, nothingAllocated);
}
/**
 * Updates the preempted seconds for one second on a four-level queue
 * hierarchy and verifies the aggregate preemption metrics.
 */
@Test
public void testUpdatePreemptedSecondsForCustomResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createFourLevelQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testUpdatePreemptedSeconds(testData, 1);
}
/**
 * Updates the preempted seconds for fifteen seconds on a four-level queue
 * hierarchy and verifies the aggregate preemption metrics.
 */
@Test
public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createFourLevelQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testUpdatePreemptedSeconds(testData, 15);
}
/**
 * Reserves the default resource on a two-level queue hierarchy and
 * verifies the reserved metrics.
 */
@Test
public void testReserveResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testReserveResources(testData);
}
/**
 * Reserves the default resource, unreserves it again, and verifies that
 * all reserved metrics are back to zero.
 */
@Test
public void testUnreserveResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testReserveResources(testData);

  final QueueInfo leaf = testData.leafQueue;
  leaf.queueMetrics.unreserveResource(testData.partition,
      testData.user, defaultResource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(RESERVED_CONTAINERS, 0)
      .gaugeLong(RESERVED_MB, 0)
      .gaugeInt(RESERVED_V_CORES, 0)
      .gaugeLong(RESERVED_CUSTOM_RES1, 0)
      .gaugeLong(RESERVED_CUSTOM_RES2, 0)
      .checkAgainst(leaf.queueSource);

  final Map<String, Long> nothingReserved =
      computeExpectedCustomResourceValues(testData.customResourceValues,
          (name, amount) -> 0L);
  assertAllMetrics(leaf, checker,
      QueueMetrics::getReservedResources,
      MetricsForCustomResource.RESERVED, nothingReserved);
}
/**
 * Allocates five containers of the default resource, which carries custom
 * resource values, and verifies what {@code getAllocatedResources}
 * returns.
 */
@Test
public void testGetAllocatedResourcesWithCustomResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final QueueInfo leafQueue = createBasicQueueHierarchy();
  final QueueMetricsTestData testData = builder
      .withLeafQueue(leafQueue)
      .withResources(defaultResource)
      .build();

  testGetAllocatedResources(testData);
}
/**
 * Allocates five containers of a resource built with an empty custom
 * resource map and verifies what {@code getAllocatedResources} returns.
 */
@Test
public void testGetAllocatedResourcesWithoutCustomResources() {
  final QueueMetricsTestData.Builder builder =
      createQueueMetricsTestDataWithContainers(5);
  final Resource plainResource =
      newResource(4 * GB, 4, Collections.emptyMap());
  final QueueInfo leafQueue = createBasicQueueHierarchy();

  final QueueMetricsTestData testData = builder
      .withResources(plainResource)
      .withLeafQueue(leafQueue)
      .build();

  testGetAllocatedResources(testData);
}
}