blob: 33c39290de572aafe17076fc8e059ed7f5183238 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestQueueMetrics {
private static Queue createMockQueue(QueueMetrics metrics) {
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
return queue;
}
private static final int GB = 1024; // MB
private static final String USER = "alice";
private static final String USER_2 = "dodo";
private static final Configuration conf = new Configuration();
private MetricsSystem ms;
@Before
public void setUp() {
ms = new MetricsSystemImpl();
QueueMetrics.clearQueueMetrics();
}
@Test
public void testDefaultSingleQueueMetrics() {
String queueName = "single";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
conf);
MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
USER, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker rmChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
USER, 3, Resources.createResource(2*GB, 2), true);
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(queueSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
USER, 1, Resources.createResource(2*GB, 2));
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(queueSource);
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
assertNull(userSource);
}
@Test
public void testQueueAppMetricsForMultipleFailures() {
String queueName = "single";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
new Configuration());
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
// Suppose say application has failed this time as well.
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
// Suppose say application has failed, and there's no more retries.
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(USER, RMAppState.FAILED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1)
.checkAgainst(queueSource, true);
assertNull(userSource);
}
@Test
public void testSingleQueueWithUserMetrics() {
String queueName = "single2";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
conf);
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(USER_2);
metrics.submitApp(USER_2);
MetricsSource userSource = userSource(ms, queueName, USER_2);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
metrics.submitAppAttempt(USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(userSource, true);
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
USER_2, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
USER_2, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
metrics.runAppAttempt(app.getApplicationId(), USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
USER_2, 3, Resources.createResource(2*GB, 2), true);
resMetricsQueueSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(queueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
USER_2, 1, Resources.createResource(2*GB, 2));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(userSource);
metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
metrics.finishApp(USER_2, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(userSource, true);
}
@Test
public void testNodeTypeMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
MetricsSource queueSource = queueSource(ms, leafQueueName);
//AppSchedulingInfo app = mockApp(user);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, leafQueueName, USER);
MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L);
}
@Test
public void testTwoLevelWithUserMetrics() {
AppSchedulingInfo app = mockApp(USER);
QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
leaf.queueMetrics.submitApp(USER);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(leaf.queueSource, true);
AppMetricsChecker appMetricsParentQueueSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(root.queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(leaf.userSource, true);
AppMetricsChecker appMetricsParentUserSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(root.userSource, true);
leaf.queueMetrics.submitAppAttempt(USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(root.userSource, true);
root.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
leaf.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
root.queueMetrics.setAvailableResourcesToUser(
RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(10*GB, 10));
leaf.queueMetrics.setAvailableResourcesToUser(
RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(10*GB, 10));
leaf.queueMetrics.incrPendingResources(
RMNodeLabelsManager.NO_LABEL,
USER, 5, Resources.createResource(3*GB, 3));
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.userSource);
leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(leaf.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(leaf.userSource, true);
leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
USER, 3, Resources.createResource(2*GB, 2), true);
leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
resMetricsQueueSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(leaf.queueSource);
resMetricsParentQueueSourceChecker =
ResourceMetricsChecker
.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(root.queueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(leaf.userSource);
resMetricsParentUserSourceChecker = ResourceMetricsChecker
.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(root.userSource);
leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
USER, 1, Resources.createResource(2*GB, 2));
leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(3*GB, 3));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(leaf.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(root.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(leaf.userSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(root.userSource);
leaf.queueMetrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(root.userSource, true);
leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(leaf.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(root.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(leaf.userSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(root.userSource, true);
}
@Test
public void testMetricsCache() {
MetricsSystem ms = new MetricsSystemImpl("cache");
ms.start();
try {
String p1 = "root1";
String leafQueueName = "root1.leaf";
QueueMetrics p1Metrics =
QueueMetrics.forQueue(ms, p1, null, true, conf);
Queue parentQueue1 = mock(Queue.class);
when(parentQueue1.getMetrics()).thenReturn(p1Metrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue1, true, conf);
Assert.assertNotNull("QueueMetrics for A shoudn't be null", metrics);
// Re-register to check for cache hit, shouldn't blow up metrics-system...
// also, verify parent-metrics
QueueMetrics alterMetrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue1, true, conf);
Assert.assertNotNull("QueueMetrics for alterMetrics shoudn't be null",
alterMetrics);
} finally {
ms.shutdown();
}
}
@Test
public void testMetricsInitializedOnRMInit() {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
MockRM rm = new MockRM(conf);
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
AppMetricsChecker.create()
.checkAgainst(metrics, true);
MetricsAsserts.assertGauge(RESERVED_CONTAINERS.getValue(), 0, metrics);
}
// This is to test all metrics can consistently show up if specified true to
// collect all metrics, even though they are not modified from last time they
// are collected. If not collecting all metrics, only modified metrics will show up.
@Test
public void testCollectAllMetrics() {
String queueName = "single";
QueueMetrics.forQueue(ms, queueName, null, false, conf);
MetricsSource queueSource = queueSource(ms, queueName);
AppMetricsChecker.create()
.checkAgainst(queueSource, true);
try {
// do not collect all metrics
AppMetricsChecker.create()
.checkAgainst(queueSource, false);
Assert.fail();
} catch (AssertionError e) {
Assert.assertTrue(
e.getMessage().contains("Expected exactly one metric for name "));
}
// collect all metrics
AppMetricsChecker.create()
.checkAgainst(queueSource, true);
}
private static void checkAggregatedNodeTypes(MetricsSource source,
long nodeLocal, long rackLocal, long offSwitch) {
MetricsRecordBuilder rb = getMetrics(source);
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
assertCounter("AggregateRackLocalContainersAllocated", rackLocal, rb);
assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
}
static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
when(app.getApplicationAttemptId()).thenReturn(id);
return app;
}
public static MetricsSource queueSource(MetricsSystem ms, String queue) {
return ms.getSource(QueueMetrics.sourceName(queue).toString());
}
public static MetricsSource userSource(MetricsSystem ms, String queue,
String user) {
return ms.getSource(QueueMetrics.sourceName(queue).
append(",user=").append(user).toString());
}
}