blob: b9c5500a7d20b1603a9222c13ed4ec71d51a1ee1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestResourceManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestResourceManager.class);
private ResourceManager resourceManager = null;
@Rule
public ExpectedException thrown = ExpectedException.none();
private FSConfigConverterTestCommons converterTestCommons;
@Before
public void setUp() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
DefaultMetricsSystem.setMiniClusterMode(true);
resourceManager = new ResourceManager();
resourceManager.init(conf);
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
converterTestCommons = new FSConfigConverterTestCommons();
converterTestCommons.setUp();
}
@After
public void tearDown() throws Exception {
resourceManager.stop();
converterTestCommons.tearDown();
}
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
hostName, containerManagerPort, httpPort, rackName, capability,
resourceManager, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
@Test
public void testResourceAllocation()
throws IOException, YarnException, InterruptedException,
TimeoutException {
LOG.info("--- START: testResourceAllocation ---");
final int memory = 4 * 1024;
final int vcores = 4;
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node1
String host1 = "host1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, vcores), mockNodeStatus);
// Register node2
String host2 = "host2";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 =
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, vcores/2), mockNodeStatus);
// nodes should be in RUNNING state
RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm1.getNodeId());
RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm2.getNodeId());
node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null,
mockNodeStatus));
node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null,
mockNodeStatus));
// Submit an application
Application application = new Application("user1", resourceManager);
application.submit();
application.addNodeManager(host1, 1234, nm1);
application.addNodeManager(host2, 1234, nm2);
// Application resource requirements
final int memory1 = 1024;
Resource capability1 = Resources.createResource(memory1, 1);
Priority priority1 = Priority.newInstance(1);
application.addResourceRequestSpec(priority1, capability1);
Task t1 = new Task(application, priority1, new String[] {host1, host2});
application.addTask(t1);
final int memory2 = 2048;
Resource capability2 = Resources.createResource(memory2, 1);
Priority priority0 = Priority.newInstance(0); // higher
application.addResourceRequestSpec(priority0, capability2);
// Send resource requests to the scheduler
application.schedule();
// Send a heartbeat to kick the tires on the Scheduler
nodeUpdate(nm1);
((AbstractYarnScheduler)resourceManager.getResourceScheduler()).update();
// Get allocations from the scheduler
application.schedule();
checkResourceUsage(nm1, nm2);
LOG.info("Adding new tasks...");
Task t2 = new Task(application, priority1, new String[] {host1, host2});
application.addTask(t2);
Task t3 = new Task(application, priority0, new String[] {ResourceRequest.ANY});
application.addTask(t3);
// Send resource requests to the scheduler
application.schedule();
checkResourceUsage(nm1, nm2);
// Send heartbeats to kick the tires on the Scheduler
nodeUpdate(nm2);
nodeUpdate(nm2);
nodeUpdate(nm1);
nodeUpdate(nm1);
// Get allocations from the scheduler
LOG.info("Trying to allocate...");
application.schedule();
checkResourceUsage(nm1, nm2);
// Complete tasks
LOG.info("Finishing up tasks...");
application.finishTask(t1);
application.finishTask(t2);
application.finishTask(t3);
// Notify scheduler application is finished.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2);
LOG.info("--- END: testResourceAllocation ---");
}
private void nodeUpdate(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
@Test
public void testNodeHealthReportIsNotNull() throws Exception{
String host1 = "host1";
final int memory = 4 * 1024;
NodeStatus mockNodeStatus = createMockNodeStatus();
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, 1), mockNodeStatus);
nm1.heartbeat();
nm1.heartbeat();
Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values();
for (RMNode ni : values) {
assertNotNull(ni.getHealthReport());
}
}
private void checkResourceUsage(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) {
for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) {
nodeManager.checkResourceUsage();
}
}
@Test (timeout = 30000)
public void testResourceManagerInitConfigValidation() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, -1);
try {
resourceManager = new MockRM(conf);
fail("Exception is expected because the global max attempts" +
" is negative.");
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith(
"Invalid global max attempts configuration")) throw e;
}
Configuration yarnConf = new YarnConfiguration();
yarnConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
try {
resourceManager = new MockRM(yarnConf);
fail("Exception is expected because AM max attempts" +
" is negative.");
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith(
"Invalid rm am max attempts configuration")) throw e;
}
}
@Test
public void testNMExpiryAndHeartbeatIntervalsValidation() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1001);
try {
resourceManager = new MockRM(conf);
} catch (YarnRuntimeException e) {
// Exception is expected.
if (!e.getMessage().startsWith("Nodemanager expiry interval should be no"
+ " less than heartbeat interval")) {
throw e;
}
}
}
@Test(timeout = 50000)
public void testFilterOverrides() throws Exception {
String filterInitializerConfKey = "hadoop.http.filter.initializers";
String[] filterInitializers =
{
AuthenticationFilterInitializer.class.getName(),
RMAuthenticationFilterInitializer.class.getName(),
AuthenticationFilterInitializer.class.getName() + ","
+ RMAuthenticationFilterInitializer.class.getName(),
AuthenticationFilterInitializer.class.getName() + ", "
+ RMAuthenticationFilterInitializer.class.getName(),
AuthenticationFilterInitializer.class.getName() + ", "
+ this.getClass().getName() };
for (String filterInitializer : filterInitializers) {
resourceManager = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Skip the login.
}
};
Configuration conf = new YarnConfiguration();
conf.set(filterInitializerConfKey, filterInitializer);
conf.set("hadoop.security.authentication", "kerberos");
conf.set("hadoop.http.authentication.type", "kerberos");
try {
try {
UserGroupInformation.setConfiguration(conf);
} catch (Exception e) {
// ignore we just care about getting true for
// isSecurityEnabled()
LOG.info("Got expected exception");
}
resourceManager.init(conf);
resourceManager.startWepApp();
} catch (RuntimeException e) {
// Exceptions are expected because we didn't setup everything
// just want to test filter settings
String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
if (filterInitializer.contains(this.getClass().getName())) {
Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+ "," + this.getClass().getName(), tmp);
} else {
Assert.assertEquals(
RMAuthenticationFilterInitializer.class.getName(), tmp);
}
resourceManager.stop();
}
}
// simple mode overrides
String[] simpleFilterInitializers =
{ "", StaticUserWebFilter.class.getName() };
for (String filterInitializer : simpleFilterInitializers) {
resourceManager = new ResourceManager();
Configuration conf = new YarnConfiguration();
conf.set(filterInitializerConfKey, filterInitializer);
try {
UserGroupInformation.setConfiguration(conf);
resourceManager.init(conf);
resourceManager.startWepApp();
} catch (RuntimeException e) {
// Exceptions are expected because we didn't setup everything
// just want to test filter settings
String tmp = resourceManager.getConfig().get(filterInitializerConfKey);
if (filterInitializer.equals(StaticUserWebFilter.class.getName())) {
Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName()
+ "," + StaticUserWebFilter.class.getName(), tmp);
} else {
Assert.assertEquals(
RMAuthenticationFilterInitializer.class.getName(), tmp);
}
resourceManager.stop();
}
}
}
/**
* Test whether ResourceManager passes user-provided conf to
* UserGroupInformation class. If it reads this (incorrect)
* AuthenticationMethod enum an exception is thrown.
*/
@Test
public void testUserProvidedUGIConf() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid attribute value for "
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
+ " of DUMMYAUTH");
Configuration dummyConf = new YarnConfiguration();
dummyConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"DUMMYAUTH");
ResourceManager dummyResourceManager = new ResourceManager();
try {
dummyResourceManager.init(dummyConf);
} finally {
dummyResourceManager.stop();
}
}
}