blob: 83a354de5a24445abfc080e532691c310bab7eb0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assume.assumeTrue;
/**
 * Tests for {@link SchedulerHealth}: the per-update and aggregate operation
 * counters, the "last operation" details, the scheduler-run resource
 * snapshots, and the values recorded by the {@link CapacityScheduler} while
 * it allocates and reserves containers.
 */
public class TestSchedulerHealth {

  /**
   * ResourceManager used by the Capacity Scheduler tests. It is created by
   * {@link #setup()} and released by {@link #tearDown()}. It stays null for
   * the tests that only exercise a standalone {@link SchedulerHealth}.
   */
  private ResourceManager resourceManager;

  /**
   * Creates and initializes a ResourceManager configured with the
   * {@link CapacityScheduler} and a {@link NullRMNodeLabelsManager}. It then
   * rolls the container and NM token master keys and starts the RM
   * dispatcher. The ResourceManager itself is only initialized, not started.
   * Callers must invoke {@link #tearDown()} when they are done.
   */
  public void setup() {
    resourceManager = new ResourceManager() {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
        mgr.init(getConfig());
        return mgr;
      }
    };
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    resourceManager.init(conf);
    resourceManager.getRMContext().getContainerTokenSecretManager()
        .rollMasterKey();
    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
    // The RM is never started here, so its dispatcher has to be started
    // explicitly for scheduler events to be processed.
    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
  }

  /**
   * Stops the ResourceManager created by {@link #setup()}, if there is one.
   * Without this, the dispatcher that setup() starts is never stopped.
   * NOTE(review): this relies on ResourceManager.stop() also stopping the
   * dispatcher it registered as a child service — confirm.
   */
  private void tearDown() {
    if (resourceManager != null) {
      resourceManager.stop();
      resourceManager = null;
    }
  }

  /**
   * Verifies that the per-update counters hold only the value from the most
   * recent update, while the aggregate counters accumulate across updates.
   */
  @Test
  public void testCounts() {
    SchedulerHealth sh = new SchedulerHealth();
    int value = 1;
    for (int i = 0; i < 2; ++i) {
      sh.updateSchedulerPreemptionCounts(value);
      sh.updateSchedulerAllocationCounts(value);
      sh.updateSchedulerReservationCounts(value);
      sh.updateSchedulerReleaseCounts(value);
      Assert.assertEquals(value, sh.getAllocationCount().longValue());
      Assert.assertEquals(value, sh.getReleaseCount().longValue());
      Assert.assertEquals(value, sh.getReservationCount().longValue());
      Assert.assertEquals(value, sh.getPreemptionCount().longValue());
      Assert.assertEquals(value * (i + 1), sh.getAggregateAllocationCount()
          .longValue());
      Assert.assertEquals(value * (i + 1), sh.getAggregateReleaseCount()
          .longValue());
      Assert.assertEquals(value * (i + 1), sh.getAggregateReservationCount()
          .longValue());
      Assert.assertEquals(value * (i + 1), sh.getAggregatePreemptionCount()
          .longValue());
    }
  }

  /**
   * Verifies that each update*() call records the container, node, queue and
   * timestamp of the last operation of that kind. It also checks that none of
   * these calls change the last scheduler run time, which stays 0.
   */
  @Test
  public void testOperationDetails() {
    SchedulerHealth sh = new SchedulerHealth();
    long now = Time.now();
    sh.updateRelease(now, NodeId.newInstance("testhost", 1234),
        ContainerId.fromString("container_1427562107907_0002_01_000001"),
        "testqueue");
    Assert.assertEquals("container_1427562107907_0002_01_000001", sh
        .getLastReleaseDetails().getContainerId().toString());
    Assert.assertEquals("testhost:1234", sh.getLastReleaseDetails().getNodeId()
        .toString());
    Assert.assertEquals("testqueue", sh.getLastReleaseDetails().getQueue());
    Assert.assertEquals(now, sh.getLastReleaseDetails().getTimestamp());
    Assert.assertEquals(0, sh.getLastSchedulerRunTime());

    now = Time.now();
    sh.updateReservation(now, NodeId.newInstance("testhost1", 1234),
        ContainerId.fromString("container_1427562107907_0003_01_000001"),
        "testqueue1");
    Assert.assertEquals("container_1427562107907_0003_01_000001", sh
        .getLastReservationDetails().getContainerId().toString());
    Assert.assertEquals("testhost1:1234", sh.getLastReservationDetails()
        .getNodeId().toString());
    Assert
        .assertEquals("testqueue1", sh.getLastReservationDetails().getQueue());
    Assert.assertEquals(now, sh.getLastReservationDetails().getTimestamp());
    Assert.assertEquals(0, sh.getLastSchedulerRunTime());

    now = Time.now();
    sh.updateAllocation(now, NodeId.newInstance("testhost2", 1234),
        ContainerId.fromString("container_1427562107907_0004_01_000001"),
        "testqueue2");
    Assert.assertEquals("container_1427562107907_0004_01_000001", sh
        .getLastAllocationDetails().getContainerId().toString());
    Assert.assertEquals("testhost2:1234", sh.getLastAllocationDetails()
        .getNodeId().toString());
    Assert.assertEquals("testqueue2", sh.getLastAllocationDetails().getQueue());
    Assert.assertEquals(now, sh.getLastAllocationDetails().getTimestamp());
    Assert.assertEquals(0, sh.getLastSchedulerRunTime());

    now = Time.now();
    sh.updatePreemption(now, NodeId.newInstance("testhost3", 1234),
        ContainerId.fromString("container_1427562107907_0005_01_000001"),
        "testqueue3");
    Assert.assertEquals("container_1427562107907_0005_01_000001", sh
        .getLastPreemptionDetails().getContainerId().toString());
    Assert.assertEquals("testhost3:1234", sh.getLastPreemptionDetails()
        .getNodeId().toString());
    Assert.assertEquals("testqueue3", sh.getLastPreemptionDetails().getQueue());
    Assert.assertEquals(now, sh.getLastPreemptionDetails().getTimestamp());
    Assert.assertEquals(0, sh.getLastSchedulerRunTime());
  }

  /**
   * Verifies that scheduler-run and release updates set the last scheduler run
   * time. It also checks the allocated, reserved and released resource
   * snapshots.
   */
  @Test
  public void testResourceUpdate() {
    SchedulerHealth sh = new SchedulerHealth();
    long now = Time.now();
    sh.updateSchedulerRunDetails(now, Resource.newInstance(1024, 1),
        Resource.newInstance(2048, 1));
    Assert.assertEquals(now, sh.getLastSchedulerRunTime());
    Assert.assertEquals(Resource.newInstance(1024, 1),
        sh.getResourcesAllocated());
    Assert.assertEquals(Resource.newInstance(2048, 1),
        sh.getResourcesReserved());
    now = Time.now();
    sh.updateSchedulerReleaseDetails(now, Resource.newInstance(3072, 1));
    Assert.assertEquals(now, sh.getLastSchedulerRunTime());
    Assert.assertEquals(Resource.newInstance(3072, 1),
        sh.getResourcesReleased());
  }

  /**
   * Creates a test NodeManager and delivers a NodeAddedSchedulerEvent for it
   * directly to the resource scheduler.
   *
   * @param hostName host name of the node
   * @param containerManagerPort container manager port (part of the NodeId)
   * @param httpPort HTTP port of the node
   * @param rackName rack the node belongs to
   * @param capability total resources of the node
   * @return the created NodeManager
   */
  private NodeManager registerNode(String hostName, int containerManagerPort,
      int httpPort, String rackName, Resource capability) throws IOException,
      YarnException {
    NodeManager nm =
        new NodeManager(hostName, containerManagerPort, httpPort, rackName,
            capability, resourceManager);
    NodeAddedSchedulerEvent nodeAddEvent =
        new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
            .get(nm.getNodeId()));
    resourceManager.getResourceScheduler().handle(nodeAddEvent);
    return nm;
  }

  /**
   * Delivers a NodeUpdateSchedulerEvent (a heartbeat) for the given node
   * directly to the resource scheduler.
   */
  private void nodeUpdate(NodeManager nm) {
    RMNode node =
        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
    // Send a heartbeat to kick the tires on the Scheduler
    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
    resourceManager.getResourceScheduler().handle(nodeUpdate);
  }

  /**
   * Verifies the allocation count, allocated resources, aggregate count and
   * last-allocation details that the Capacity Scheduler records in its
   * SchedulerHealth as containers are allocated on a single node.
   */
  @Test
  public void testCapacitySchedulerAllocation() throws Exception {
    try {
      setup();
      boolean isCapacityScheduler =
          resourceManager.getResourceScheduler() instanceof CapacityScheduler;
      assumeTrue("This test is only supported on Capacity Scheduler",
          isCapacityScheduler);

      // Register node host_0 with 5 GB
      String host_0 = "host_0";
      NodeManager nm_0 =
          registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
              Resources.createResource(5 * 1024, 1));

      // ResourceRequest priorities
      Priority priority_0 = Priority.newInstance(0);
      Priority priority_1 = Priority.newInstance(1);

      // Submit an application
      Application application_0 =
          new Application("user_0", "default", resourceManager);
      application_0.submit();
      application_0.addNodeManager(host_0, 1234, nm_0);
      Resource capability_0_0 = Resources.createResource(1024, 1);
      application_0.addResourceRequestSpec(priority_1, capability_0_0);
      Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
      application_0.addResourceRequestSpec(priority_0, capability_0_1);
      Task task_0_0 =
          new Task(application_0, priority_1, new String[] { host_0 });
      application_0.addTask(task_0_0);
      Task task_0_1 =
          new Task(application_0, priority_0, new String[] { host_0 });
      application_0.addTask(task_0_1);

      // Send resource requests to the scheduler
      application_0.schedule();

      // Send a heartbeat to kick the tires on the Scheduler
      nodeUpdate(nm_0);
      SchedulerHealth sh =
          ((CapacityScheduler) resourceManager.getResourceScheduler())
              .getSchedulerHealth();
      // The allocation count and allocated resources describe only the most
      // recently recorded allocation update, while the aggregate allocation
      // count accumulates across every update.
      Assert.assertEquals(1, sh.getAllocationCount().longValue());
      Assert.assertEquals(Resource.newInstance(1 * 1024, 1),
          sh.getResourcesAllocated());
      Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
      Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
          .getNodeId().toString());
      Assert.assertEquals("root.default", sh.getLastAllocationDetails()
          .getQueue());

      Task task_0_2 =
          new Task(application_0, priority_0, new String[] { host_0 });
      application_0.addTask(task_0_2);
      application_0.schedule();
      nodeUpdate(nm_0);
      Assert.assertEquals(1, sh.getAllocationCount().longValue());
      Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
          sh.getResourcesAllocated());
      Assert.assertEquals(3, sh.getAggregateAllocationCount().longValue());
      Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
          .getNodeId().toString());
      Assert.assertEquals("root.default", sh.getLastAllocationDetails()
          .getQueue());
    } finally {
      tearDown();
    }
  }

  /**
   * Verifies that the Capacity Scheduler records a reservation in its
   * SchedulerHealth when a request does not fit on the node that sends the
   * heartbeat. It also checks that the aggregate allocation count and the
   * last-allocation details are left unchanged.
   */
  @Test
  public void testCapacitySchedulerReservation() throws Exception {
    try {
      setup();
      boolean isCapacityScheduler =
          resourceManager.getResourceScheduler() instanceof CapacityScheduler;
      assumeTrue("This test is only supported on Capacity Scheduler",
          isCapacityScheduler);

      // Register nodes: host_0 with 2 GB, host_1 with 5 GB
      String host_0 = "host_0";
      NodeManager nm_0 =
          registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
              Resources.createResource(2 * 1024, 1));
      String host_1 = "host_1";
      NodeManager nm_1 =
          registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
              Resources.createResource(5 * 1024, 1));
      nodeUpdate(nm_0);
      nodeUpdate(nm_1);

      // ResourceRequest priorities
      Priority priority_0 = Priority.newInstance(0);
      Priority priority_1 = Priority.newInstance(1);

      // Submit an application
      Application application_0 =
          new Application("user_0", "default", resourceManager);
      application_0.submit();
      application_0.addNodeManager(host_0, 1234, nm_0);
      application_0.addNodeManager(host_1, 1234, nm_1);
      Resource capability_0_0 = Resources.createResource(1024, 1);
      application_0.addResourceRequestSpec(priority_1, capability_0_0);
      Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
      application_0.addResourceRequestSpec(priority_0, capability_0_1);
      Task task_0_0 =
          new Task(application_0, priority_1, new String[] { host_0 });
      application_0.addTask(task_0_0);

      // Send resource requests to the scheduler
      application_0.schedule();

      // Send a heartbeat to kick the tires on the Scheduler
      nodeUpdate(nm_0);
      SchedulerHealth sh =
          ((CapacityScheduler) resourceManager.getResourceScheduler())
              .getSchedulerHealth();
      Assert.assertEquals(1, sh.getAllocationCount().longValue());
      Assert.assertEquals(Resource.newInstance(1024, 1),
          sh.getResourcesAllocated());
      Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
      Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
          .getNodeId().toString());
      Assert.assertEquals("root.default", sh.getLastAllocationDetails()
          .getQueue());

      // host_0 (2 GB) already runs the 1 GB container, so the 2 GB request
      // does not fit in its remaining capacity and is reserved instead.
      Task task_0_1 =
          new Task(application_0, priority_0, new String[] { host_0 });
      application_0.addTask(task_0_1);
      application_0.schedule();
      nodeUpdate(nm_0);
      Assert.assertEquals(0, sh.getAllocationCount().longValue());
      Assert.assertEquals(1, sh.getReservationCount().longValue());
      Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
          sh.getResourcesReserved());
      Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
      Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
          .getNodeId().toString());
      Assert.assertEquals("root.default", sh.getLastAllocationDetails()
          .getQueue());
    } finally {
      tearDown();
    }
  }
}