// blob: 93d52bdb79050a01947aae7c7f2632d9e239d39b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
public class TestSchedulerApplicationAttempt {
private static final NodeId nodeId = NodeId.newInstance("somehost", 5);
private Configuration conf = new Configuration();
/**
 * Runs after every test: clears the registered queue metrics and shuts the
 * default metrics system down.
 */
@After
public void tearDown() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
}
/**
 * Checks the number of active users reported by each queue's users manager
 * while an application attempt is moved between sibling queues, first with
 * a pending request outstanding and then with none.
 */
@Test
public void testActiveUsersWhenMove() {
  final String submitter = "user1";

  // Three sibling queues below a common parent.
  Queue root = createQueue("parent", null);
  Queue firstQueue = createQueue("queue1", root);
  Queue secondQueue = createQueue("queue2", root);
  Queue thirdQueue = createQueue("queue3", root);

  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getYarnConfiguration()).thenReturn(conf);

  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, submitter, firstQueue,
      firstQueue.getAbstractUsersManager(), context);

  // Submit a request for exactly one container.
  Resource askedResource = Resource.newInstance(1536, 2);
  Priority askedPriority = Priority.newInstance(2);
  ResourceRequest ask = ResourceRequest.newInstance(askedPriority,
      ResourceRequest.ANY, askedResource, 1);
  attempt.updateResourceRequests(Arrays.asList(ask));
  assertEquals(1, firstQueue.getAbstractUsersManager().getNumActiveUsers());

  // First move: queue1 -> queue2 while the request is still pending.
  attempt.move(secondQueue);

  // The user must no longer be active in queue1 ...
  assertEquals(0, firstQueue.getAbstractUsersManager().getNumActiveUsers());
  // ... and must be active in queue2, since the moved app has a pending
  // request.
  assertEquals(1, secondQueue.getAbstractUsersManager().getNumActiveUsers());

  // Allocate a container for the only outstanding request.
  RMContainer allocated = createRMContainer(attemptId, 1, askedResource);
  attempt.liveContainers.put(allocated.getContainerId(), allocated);
  SchedulerNode host = createNode();
  attempt.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, host,
      toSchedulerKey(askedPriority), allocated);

  // With nothing pending any more, queue2 has no active user.
  assertEquals(0, secondQueue.getAbstractUsersManager().getNumActiveUsers());

  // Second move: queue2 -> queue3 with no pending requests.
  attempt.move(thirdQueue);

  // queue3's active user count is unchanged because nothing is pending.
  assertEquals(0, thirdQueue.getAbstractUsersManager().getNumActiveUsers());
}
/**
 * Checks that moving an application attempt with one allocated container,
 * one reserved container and pending requests transfers the corresponding
 * queue metrics from the old queue to the new queue, while the metrics of
 * their common parent stay the same.
 */
@Test
public void testMove() {
  final String owner = "user1";

  Queue root = createQueue("parent", null);
  Queue source = createQueue("old", root);
  Queue destination = createQueue("new", root);
  QueueMetrics rootMetrics = root.getMetrics();
  QueueMetrics sourceMetrics = source.getMetrics();
  QueueMetrics destinationMetrics = destination.getMetrics();

  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getYarnConfiguration()).thenReturn(conf);

  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, owner, source, source.getAbstractUsersManager(), context);
  sourceMetrics.submitApp(owner);

  // The mocked epoch (3) must show up in the generated container id.
  assertEquals(0x30000000001L, attempt.getNewContainerId());

  // Request three containers of 1536 MB / 2 vcores each.
  Resource askedResource = Resource.newInstance(1536, 2);
  Priority askedPriority = Priority.newInstance(2);
  ResourceRequest ask = ResourceRequest.newInstance(askedPriority,
      ResourceRequest.ANY, askedResource, 3);
  attempt.updateResourceRequests(Arrays.asList(ask));

  // Allocate one of them.
  RMContainer allocated = createRMContainer(attemptId, 1, askedResource);
  attempt.liveContainers.put(allocated.getContainerId(), allocated);
  SchedulerNode host = createNode();
  attempt.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, host,
      toSchedulerKey(askedPriority), allocated);

  // Register a reservation of 2048 MB / 3 vcores on the same node.
  Priority reservationPriority = Priority.newInstance(1);
  Resource reservedResource = Resource.newInstance(2048, 3);
  RMContainer reserved = createReservedRMContainer(attemptId, 1,
      reservedResource, host.getNodeID(), reservationPriority);
  Map<NodeId, RMContainer> reservationsByNode = new HashMap<>();
  reservationsByNode.put(host.getNodeID(), reserved);
  attempt.reservedContainers.put(toSchedulerKey(reservationPriority),
      reservationsByNode);
  sourceMetrics.reserveResource(reserved.getNodeLabelExpression(),
      owner, reservedResource);

  // Before the move everything is accounted to the old queue and its parent.
  checkQueueMetrics(sourceMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
  checkQueueMetrics(destinationMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
  checkQueueMetrics(rootMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);

  attempt.move(destination);

  // After the move the numbers have switched queues; the parent is unchanged.
  checkQueueMetrics(sourceMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
  checkQueueMetrics(destinationMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
  checkQueueMetrics(rootMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
}
/**
 * Asserts that the given queue metrics report the expected application
 * counts and the expected allocated, reserved and pending resources.
 *
 * <p>Every assertion carries a message naming the metric, so a failure
 * identifies which of the eight values mismatched instead of showing only
 * two bare numbers.
 *
 * @param metrics the queue metrics to inspect
 * @param activeApps expected value of {@code getActiveApps()}
 * @param runningApps expected value of {@code getAppsRunning()}
 * @param allocMb expected value of {@code getAllocatedMB()}
 * @param allocVcores expected value of {@code getAllocatedVirtualCores()}
 * @param reservedMb expected value of {@code getReservedMB()}
 * @param reservedVcores expected value of {@code getReservedVirtualCores()}
 * @param pendingMb expected value of {@code getPendingMB()}
 * @param pendingVcores expected value of {@code getPendingVirtualCores()}
 */
private void checkQueueMetrics(QueueMetrics metrics, int activeApps,
    int runningApps, int allocMb, int allocVcores, int reservedMb,
    int reservedVcores, int pendingMb, int pendingVcores) {
  assertEquals("active apps", activeApps, metrics.getActiveApps());
  assertEquals("running apps", runningApps, metrics.getAppsRunning());
  assertEquals("allocated MB", allocMb, metrics.getAllocatedMB());
  assertEquals("allocated vcores", allocVcores,
      metrics.getAllocatedVirtualCores());
  assertEquals("reserved MB", reservedMb, metrics.getReservedMB());
  assertEquals("reserved vcores", reservedVcores,
      metrics.getReservedVirtualCores());
  assertEquals("pending MB", pendingMb, metrics.getPendingMB());
  assertEquals("pending vcores", pendingVcores,
      metrics.getPendingVirtualCores());
}
/**
 * Builds a mocked scheduler node named "somehost" on rack "somerack" whose
 * node id is the shared {@code nodeId} constant of this test class.
 *
 * @return the stubbed node
 */
private SchedulerNode createNode() {
  SchedulerNode stubbedNode = mock(SchedulerNode.class);
  when(stubbedNode.getNodeID()).thenReturn(nodeId);
  when(stubbedNode.getNodeName()).thenReturn("somehost");
  when(stubbedNode.getRackName()).thenReturn("somerack");
  return stubbedNode;
}
/**
 * Builds a mocked container via {@link #createRMContainer} and additionally
 * stubs its reservation accessors.
 *
 * @param appAttId attempt the container id is derived from
 * @param id numeric container id
 * @param resource returned both as the container's resource and as its
 *     reserved resource
 * @param nodeId node returned by {@code getReservedNode()}
 * @param reservedPriority priority converted into the reserved scheduler key
 * @return the stubbed container
 */
private RMContainer createReservedRMContainer(ApplicationAttemptId appAttId,
    int id, Resource resource, NodeId nodeId, Priority reservedPriority) {
  SchedulerRequestKey reservedKey = toSchedulerKey(reservedPriority);
  RMContainer reserved = createRMContainer(appAttId, id, resource);
  when(reserved.getReservedResource()).thenReturn(resource);
  when(reserved.getReservedSchedulerKey()).thenReturn(reservedKey);
  when(reserved.getReservedNode()).thenReturn(nodeId);
  return reserved;
}
/**
 * Builds a mocked {@code RMContainerImpl} wrapping a mocked
 * {@code Container}. The inner container reports the given resource and the
 * shared {@code nodeId} constant of this test class.
 *
 * @param appAttId attempt the container id is derived from
 * @param id numeric container id
 * @param resource resource reported by the inner container
 * @return the stubbed container
 */
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
    Resource resource) {
  // Inner API-level container.
  Container apiContainer = mock(Container.class);
  when(apiContainer.getResource()).thenReturn(resource);
  when(apiContainer.getNodeId()).thenReturn(nodeId);

  // Outer RM-side wrapper.
  RMContainer wrapper = mock(RMContainerImpl.class);
  when(wrapper.getContainer()).thenReturn(apiContainer);
  when(wrapper.getContainerId())
      .thenReturn(ContainerId.newContainerId(appAttId, id));
  return wrapper;
}
/**
 * Creates a mocked queue, passing 1.0 as the capacity.
 *
 * @param name queue name
 * @param parent parent queue, or {@code null}
 * @return the stubbed queue
 */
private Queue createQueue(String name, Queue parent) {
  final float fullCapacity = 1.0f;
  return createQueue(name, parent, fullCapacity);
}

/**
 * Creates a mocked queue backed by real {@code QueueMetrics},
 * {@code ActiveUsersManager} and {@code QueueInfo} instances.
 *
 * @param name queue name, used for both the metrics and the queue info
 * @param parent parent queue handed to {@code QueueMetrics.forQueue}, or
 *     {@code null}
 * @param capacity capacity value handed to {@code QueueInfo.newInstance}
 * @return the stubbed queue
 */
private Queue createQueue(String name, Queue parent, float capacity) {
  QueueMetrics queueMetrics = QueueMetrics.forQueue(name, parent, false, conf);
  ActiveUsersManager usersManager = new ActiveUsersManager(queueMetrics);
  QueueInfo info = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
      null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);

  Queue stubbedQueue = mock(Queue.class);
  when(stubbedQueue.getQueueInfo(false, false)).thenReturn(info);
  when(stubbedQueue.getAbstractUsersManager()).thenReturn(usersManager);
  when(stubbedQueue.getMetrics()).thenReturn(queueMetrics);
  return stubbedQueue;
}
/**
 * Builds an application attempt id for an application whose id is created
 * with a first argument of 0 and the given application number.
 *
 * @param appId application number
 * @param attemptId attempt number
 * @return the new attempt id
 */
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
  return ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(0, appId), attemptId);
}
/**
 * Checks the queue and cluster usage percentages in the resource usage
 * report against a mocked 10240 MB / 10 vcore cluster, for queues created
 * with capacity 1.0, 0.5 and {@code Float.MIN_VALUE}.
 */
@Test
public void testAppPercentages() throws Exception {
  FifoScheduler fifo = mock(FifoScheduler.class);
  when(fifo.getClusterResource())
      .thenReturn(Resource.newInstance(10 * 1024, 10));
  when(fifo.getResourceCalculator())
      .thenReturn(new DefaultResourceCalculator());

  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getScheduler()).thenReturn(fifo);
  when(context.getYarnConfiguration()).thenReturn(conf);

  final String owner = "user1";
  final Resource usedChunk = Resource.newInstance(1536, 2);

  // Queue created with capacity 1.0: one chunk used.
  Queue fullQueue = createQueue("test", null);
  SchedulerApplicationAttempt fullQueueAttempt =
      new SchedulerApplicationAttempt(attemptId, owner, fullQueue,
          fullQueue.getAbstractUsersManager(), context);
  fullQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  assertEquals(15.0f,
      fullQueueAttempt.getResourceUsageReport().getQueueUsagePercentage(),
      0.01f);
  assertEquals(15.0f,
      fullQueueAttempt.getResourceUsageReport().getClusterUsagePercentage(),
      0.01f);

  // Queue created with capacity 0.5: one chunk used ...
  Queue halfQueue = createQueue("test2", null, 0.5f);
  SchedulerApplicationAttempt halfQueueAttempt =
      new SchedulerApplicationAttempt(attemptId, owner, halfQueue,
          halfQueue.getAbstractUsersManager(), context);
  halfQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  assertEquals(30.0f,
      halfQueueAttempt.getResourceUsageReport().getQueueUsagePercentage(),
      0.01f);
  assertEquals(15.0f,
      halfQueueAttempt.getResourceUsageReport().getClusterUsagePercentage(),
      0.01f);

  // ... then three more chunks, four in total.
  halfQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  halfQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  halfQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  assertEquals(120.0f,
      halfQueueAttempt.getResourceUsageReport().getQueueUsagePercentage(),
      0.01f);
  assertEquals(60.0f,
      halfQueueAttempt.getResourceUsageReport().getClusterUsagePercentage(),
      0.01f);

  // Queue created with capacity Float.MIN_VALUE: queue usage is expected
  // to be reported as 0.
  Queue tinyQueue = createQueue("test3", null, Float.MIN_VALUE);
  SchedulerApplicationAttempt tinyQueueAttempt =
      new SchedulerApplicationAttempt(attemptId, owner, tinyQueue,
          tinyQueue.getAbstractUsersManager(), context);
  tinyQueueAttempt.attemptResourceUsage.incUsed(usedChunk);
  assertEquals(0.0f,
      tinyQueueAttempt.getResourceUsageReport().getQueueUsagePercentage(),
      0.01f);
  assertEquals(15.0f,
      tinyQueueAttempt.getResourceUsageReport().getClusterUsagePercentage(),
      0.01f);
}
/**
 * Checks that both usage percentages are reported as exactly 0 when the
 * mocked scheduler reports an empty (0 MB / 0 vcore) cluster resource.
 */
@Test
public void testAppPercentagesOnswitch() throws Exception {
  // Scheduler stub whose cluster resource is empty.
  FifoScheduler fifo = mock(FifoScheduler.class);
  when(fifo.getClusterResource()).thenReturn(Resource.newInstance(0, 0));
  when(fifo.getResourceCalculator())
      .thenReturn(new DefaultResourceCalculator());

  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getScheduler()).thenReturn(fifo);
  when(context.getYarnConfiguration()).thenReturn(conf);

  final String owner = "user1";
  Queue testQueue = createQueue("test", null);
  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, owner, testQueue, testQueue.getAbstractUsersManager(),
      context);

  // Record some usage; the percentages are still expected to be 0.
  Resource usedChunk = Resource.newInstance(1536, 2);
  attempt.attemptResourceUsage.incUsed(usedChunk);

  assertEquals(0.0f,
      attempt.getResourceUsageReport().getQueueUsagePercentage(), 0.0f);
  assertEquals(0.0f,
      attempt.getResourceUsageReport().getClusterUsagePercentage(), 0.0f);
}
/**
 * Checks that the used and reserved resources in the resource usage report
 * are the sums of what was recorded under the labels "X" and "Y".
 */
@Test
public void testAllResourceUsage() throws Exception {
  FifoScheduler fifo = mock(FifoScheduler.class);
  when(fifo.getClusterResource()).thenReturn(Resource.newInstance(0, 0));
  when(fifo.getResourceCalculator())
      .thenReturn(new DefaultResourceCalculator());

  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getScheduler()).thenReturn(fifo);
  when(context.getYarnConfiguration()).thenReturn(conf);

  final String owner = "user1";
  Queue testQueue = createQueue("test", null);
  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, owner, testQueue, testQueue.getAbstractUsersManager(),
      context);

  // Record 1536 MB / 2 vcores as used under each of the two labels.
  Resource usedPerLabel = Resource.newInstance(1536, 2);
  attempt.attemptResourceUsage.incUsed("X", usedPerLabel);
  attempt.attemptResourceUsage.incUsed("Y", usedPerLabel);

  // Record 1024 MB / 1 vcore as reserved under each of the two labels.
  Resource reservedPerLabel = Resource.newInstance(1024, 1);
  attempt.attemptResourceUsage.incReserved("X", reservedPerLabel);
  attempt.attemptResourceUsage.incReserved("Y", reservedPerLabel);

  Resource expectedUsed = Resource.newInstance(3072, 4);
  assertTrue("getUsedResources expected " + expectedUsed
      + " but was " + attempt.getResourceUsageReport().getUsedResources(),
      Resources.equals(expectedUsed,
          attempt.getResourceUsageReport().getUsedResources()));

  Resource expectedReserved = Resource.newInstance(2048, 2);
  assertTrue("getReservedResources expected " + expectedReserved
      + " but was "
      + attempt.getResourceUsageReport().getReservedResources(),
      Resources.equals(expectedReserved,
          attempt.getResourceUsageReport().getReservedResources()));
}
/**
 * Checks that the scheduling-opportunity counter for a scheduler key starts
 * at 0, increments by one, and stays at {@code Integer.MAX_VALUE} once it
 * has reached that value instead of overflowing.
 */
@Test
public void testSchedulingOpportunityOverflow() throws Exception {
  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  Queue testQueue = createQueue("test", null);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getYarnConfiguration()).thenReturn(conf);
  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, "user", testQueue, testQueue.getAbstractUsersManager(),
      context);

  SchedulerRequestKey key = toSchedulerKey(Priority.newInstance(1));

  // Fresh counter, then a single increment.
  assertEquals(0, attempt.getSchedulingOpportunities(key));
  attempt.addSchedulingOpportunity(key);
  assertEquals(1, attempt.getSchedulingOpportunities(key));

  // Jump to one below the maximum.
  final int oneBelowMax = Integer.MAX_VALUE - 1;
  attempt.setSchedulingOpportunities(key, oneBelowMax);
  assertEquals(oneBelowMax, attempt.getSchedulingOpportunities(key));

  // The next increment reaches the maximum ...
  attempt.addSchedulingOpportunity(key);
  assertEquals(Integer.MAX_VALUE, attempt.getSchedulingOpportunities(key));

  // ... and a further increment must leave it capped there.
  attempt.addSchedulingOpportunity(key);
  assertEquals(Integer.MAX_VALUE, attempt.getSchedulingOpportunities(key));
}
/**
 * Checks {@code hasPendingResourceRequest} for the empty label, a label with
 * a pending request ("label1") and a label without one ("label2"), under
 * both {@code RESPECT_PARTITION_EXCLUSIVITY} and
 * {@code IGNORE_PARTITION_EXCLUSIVITY}.
 */
@Test
public void testHasPendingResourceRequest() throws Exception {
  ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
  Queue testQueue = createQueue("test", null);
  RMContext context = mock(RMContext.class);
  when(context.getEpoch()).thenReturn(3L);
  when(context.getYarnConfiguration()).thenReturn(conf);
  SchedulerApplicationAttempt attempt = new SchedulerApplicationAttempt(
      attemptId, "user", testQueue, testQueue.getAbstractUsersManager(),
      context);

  Priority askPriority = Priority.newInstance(1);
  Resource oneUnit = Resource.newInstance(1L, 1);

  // One request carrying a node label ...
  ResourceRequest labeledAsk = ResourceRequest.newInstance(askPriority,
      ResourceRequest.ANY, oneUnit, 1, false, "label1");
  // ... and one request without a node label.
  ResourceRequest unlabeledAsk = ResourceRequest.newInstance(askPriority,
      ResourceRequest.ANY, oneUnit, 1, false, "");

  // Distinct allocation request ids keep the two requests from being
  // treated as duplicates.
  labeledAsk.setAllocationRequestId(0L);
  unlabeledAsk.setAllocationRequestId(1L);

  List<ResourceRequest> asks = new ArrayList<>(2);
  asks.add(labeledAsk);
  asks.add(unlabeledAsk);
  attempt.updateResourceRequests(asks);

  // Mode: RESPECT_PARTITION_EXCLUSIVITY.
  assertTrue("Reported no pending resource requests for no label when "
      + "resource requests for no label are pending (exclusive partitions)",
      attempt.hasPendingResourceRequest("",
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
  assertTrue("Reported no pending resource requests for label with pending "
      + "resource requests (exclusive partitions)",
      attempt.hasPendingResourceRequest("label1",
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
  assertFalse("Reported pending resource requests for label with no pending "
      + "resource requests (exclusive partitions)",
      attempt.hasPendingResourceRequest("label2",
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));

  // Mode: IGNORE_PARTITION_EXCLUSIVITY.
  assertTrue("Reported no pending resource requests for no label when "
      + "resource requests for no label are pending (relaxed partitions)",
      attempt.hasPendingResourceRequest("",
          SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
  assertTrue("Reported no pending resource requests for label with pending "
      + "resource requests (relaxed partitions)",
      attempt.hasPendingResourceRequest("label1",
          SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
  assertTrue("Reported no pending resource requests for label with no "
      + "pending resource requests (relaxed partitions)",
      attempt.hasPendingResourceRequest("label2",
          SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
}
}