blob: 8ee6c945cf756bc577668d45577a1f1e63f75425 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestPartitionQueueMetrics {
static final int GB = 1024; // MB
private static final Configuration CONF = new Configuration();
private MetricsSystem ms;
@Before
public void setUp() {
ms = new MetricsSystemImpl();
QueueMetrics.clearQueueMetrics();
PartitionQueueMetrics.clearQueueMetrics();
}
@After
public void tearDown() {
ms.shutdown();
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testSinglePartitionWithSingleLevelQueueMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(ms, "x", "root.q2");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
PartitionQueueMetrics pq1 =
new PartitionQueueMetrics(ms, "root.q1", parentQueue, true, CONF, "x");
Assert.assertTrue("Name of registry should be \""
+ PartitionQueueMetrics.P_RECORD_INFO.name() + "\", but was \""
+ pq1.registry.info().name() + "\".", pq1.registry.info().name()
.compareTo(PartitionQueueMetrics.P_RECORD_INFO.name()) == 0);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
String parentQueueName = "root";
String user = "alice";
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource xPartitionSource = partitionSource(ms, "x");
MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
root.setAvailableResourcesToQueue("y",
Resources.createResource(400 * GB, 400));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q2Source = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
}
/**
* Structure:
* Both queues, q1 has been configured to run in multiple partitions, x & y.
*
* root
* /
* q1
*
* @throws Exception
*/
@Test
public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
root.setAvailableResourcesToQueue("y",
Resources.createResource(300 * GB, 300));
q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
q1.incrPendingResources("x", "test_user1", 4,
Resource.newInstance(1024, 1));
MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
q1.incrPendingResources("y", "test_user1", 6,
Resource.newInstance(1024, 1));
MetricsSource partitionSourceY = partitionSource(ms, "y");
MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
* q1
* / \
* q11 q12
* q2
* / \
* q21 q22
*
* @throws Exception
*/
@Test
public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(root);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
Queue childQueue1 = mock(Queue.class);
when(childQueue1.getQueueName()).thenReturn("root.q1");
when(childQueue1.getMetrics()).thenReturn(q1);
QueueMetrics q11 =
QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
QueueMetrics q12 =
QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
Queue childQueue2 = mock(Queue.class);
when(childQueue2.getQueueName()).thenReturn("root.q2");
when(childQueue2.getMetrics()).thenReturn(q2);
QueueMetrics q21 =
QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
QueueMetrics q22 =
QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q11.setAvailableResourcesToQueue("x",
Resources.createResource(50 * GB, 50));
q11.incrPendingResources("x", "test_user", 2,
Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
q11.incrPendingResources("x", "test_user", 4,
Resource.newInstance(1024, 1));
MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
q11.incrPendingResources("x", "test_user1", 5,
Resource.newInstance(1024, 1));
MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
MetricsSource userSource1 =
userSource(ms, "x", "test_user1", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
q12.incrPendingResources("x", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q12.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q12.incrPendingResources("y", "test_user", 3,
Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q21.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q21.incrPendingResources("y", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
q22.incrPendingResources("y", "test_user", 6,
Resource.newInstance(1024, 1));
MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
@Test
public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String user = "alice";
String partition = "x";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
metrics.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
parentMetrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.incrPendingResources(partition, user, 6,
Resources.createResource(3 * GB, 3));
MetricsSource partitionSource = partitionSource(ms, partition);
MetricsSource parentQueueSource =
queueSource(ms, partition, parentQueueName);
MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
MetricsSource userSource1 =
userSource(ms, partition, user, parentQueueName);
checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0,
0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0,
0);
checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
metrics.reserveResource(partition, user,
Resources.createResource(3 * GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
3, 3 * GB, 3, 1);
checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
15 * GB, 15, 3, 3 * GB, 3, 1);
checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
15, 3, 3 * GB, 3, 1);
checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
0, 3 * GB, 3, 1);
checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
12 * GB, 12, 0, 3 * GB, 3, 1);
metrics.releaseResources(partition, user, 1,
Resources.createResource(2 * GB, 2));
metrics.unreserveResource(partition, user,
Resources.createResource(3 * GB, 3));
checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
0, 0, 0, 0);
checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
12 * GB, 12, 0, 0, 0, 0);
checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
12, 0, 0, 0, 0);
checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics.finishApp(user, RMAppState.FINISHED);
}
@Test
public void testThreeLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String leafQueueName1 = "root.leaf.leaf1";
String user = "alice";
String partitionX = "x";
String partitionY = "y";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF);
Queue leafQueue = mock(Queue.class);
when(leafQueue.getQueueName()).thenReturn(leafQueueName);
when(leafQueue.getMetrics()).thenReturn(metrics);
QueueMetrics metrics1 =
QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics1.submitApp(user);
metrics1.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(200 * GB, 200));
parentMetrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(500 * GB, 500));
metrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(400 * GB, 400));
metrics1.setAvailableResourcesToQueue(partitionX,
Resources.createResource(50 * GB, 50));
metrics1.setAvailableResourcesToQueue(partitionY,
Resources.createResource(300 * GB, 300));
parentMetrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(20 * GB, 20));
parentMetrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(50 * GB, 50));
metrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(40 * GB, 40));
metrics1.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(5 * GB, 5));
metrics1.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(30 * GB, 30));
metrics1.incrPendingResources(partitionX, user, 6,
Resources.createResource(3 * GB, 3));
metrics1.incrPendingResources(partitionY, user, 6,
Resources.createResource(4 * GB, 4));
MetricsSource partitionSourceX =
partitionSource(metrics1.getMetricsSystem(), partitionX);
MetricsSource parentQueueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName);
MetricsSource queueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName);
MetricsSource queueSource1WithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1);
MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, parentQueueName);
MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName);
MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName1);
checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB,
18, 6, 0, 0, 0);
checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6,
0, 0, 0);
checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0,
0, 0);
MetricsSource partitionSourceY =
partitionSource(metrics1.getMetricsSystem(), partitionY);
MetricsSource parentQueueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName);
MetricsSource queueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName);
MetricsSource queueSource1WithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1);
MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, parentQueueName);
MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName);
MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName1);
checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB,
24, 6, 0, 0, 0);
checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6,
0, 0, 0);
checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24,
6, 0, 0, 0);
checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0,
0, 0);
checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0,
0, 0);
metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics1.finishApp(user, RMAppState.FINISHED);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x
* UserMetrics has been disabled, hence trying to access the user source
* throws NPE from sources.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test(expected = NullPointerException.class)
public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
CSQueueMetrics q1 =
CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
CSQueueMetrics q2 =
CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x");
MetricsSource rootQueueSource =
queueSource(q1.getMetricsSystem(), "x", parentQueueName);
MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1");
MetricsSource q1UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2");
MetricsSource q2UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q2");
checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
q1.finishApp(user, RMAppState.FINISHED);
}
public static MetricsSource partitionSource(MetricsSystem ms,
String partition) {
MetricsSource s =
ms.getSource(QueueMetrics.pSourceName(partition).toString());
return s;
}
public static MetricsSource queueSource(MetricsSystem ms, String partition,
String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).toString());
return s;
}
public static MetricsSource userSource(MetricsSystem ms, String partition,
String user, String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).append(",user=")
.append(user).toString());
return s;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
}
private static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
when(app.getApplicationAttemptId()).thenReturn(id);
return app;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
int reservedCores, int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}
}