// blob: bcadf76e4bd4df3fbdd247ec257c4e3e4a949f13 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestResourcePluginManager extends NodeManagerTestBase {
private NodeManager nm;
/**
 * Builds a Mockito stub of {@link ResourcePluginManager} whose
 * {@code getNameToPlugins()} returns two mocked plugins:
 * <ul>
 *   <li>"resource1" - its {@code getNodeResourceHandlerInstance()} returns a
 *       mocked {@link NodeResourceUpdaterPlugin};</li>
 *   <li>"resource2" - its {@code createResourceHandler(...)} returns a new
 *       {@link CustomizedResourceHandler} for any arguments.</li>
 * </ul>
 *
 * @return the stubbed plugin manager
 */
ResourcePluginManager stubResourcePluginmanager() {
  // Plugin that only exposes a node resource updater.
  final NodeResourceUpdaterPlugin updaterPlugin =
      mock(NodeResourceUpdaterPlugin.class);
  final ResourcePlugin updaterBackedPlugin = mock(ResourcePlugin.class);
  when(updaterBackedPlugin.getNodeResourceHandlerInstance())
      .thenReturn(updaterPlugin);

  // Plugin that only contributes a resource handler.
  final ResourcePlugin handlerBackedPlugin = mock(ResourcePlugin.class);
  when(handlerBackedPlugin.createResourceHandler(
      any(Context.class),
      any(CGroupsHandler.class),
      any(PrivilegedOperationExecutor.class)))
      .thenReturn(new CustomizedResourceHandler());

  final Map<String, ResourcePlugin> nameToPlugin = new HashMap<>();
  nameToPlugin.put("resource1", updaterBackedPlugin);
  nameToPlugin.put("resource2", handlerBackedPlugin);

  // The manager itself is a mock that just serves the map built above.
  final ResourcePluginManager manager = mock(ResourcePluginManager.class);
  when(manager.getNameToPlugins()).thenReturn(nameToPlugin);
  return manager;
}
/**
 * Stops the NodeManager created by the test that just ran, if there is one.
 *
 * Shutdown stays best-effort so that a failing stop does not mask the
 * outcome of the test itself, but the failure is no longer discarded:
 * {@code ServiceOperations.stopQuietly} catches an exception thrown while
 * stopping and logs it at warn level instead of silently dropping it.
 */
@After
public void tearDown() {
  if (nm != null) {
    // Return value (the caught exception, if any) is intentionally unused;
    // stopQuietly has already reported it.
    ServiceOperations.stopQuietly(nm);
  }
}
private class CustomizedResourceHandler implements ResourceHandler {
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
return null;
}
}
/**
 * NodeManager variant for these tests: it uses the
 * {@link ResourcePluginManager} supplied at construction time and swaps in
 * the test implementations of the node status updater and container manager.
 */
private class MyMockNM extends NodeManager {
  private final ResourcePluginManager pluginManager;

  /**
   * @param rpm plugin manager returned by
   *            {@link #createResourcePluginManager()} and installed on the
   *            NM context when the node status updater is created
   */
  public MyMockNM(ResourcePluginManager rpm) {
    this.pluginManager = rpm;
  }

  /** Returns the plugin manager handed to the constructor. */
  @Override
  protected ResourcePluginManager createResourcePluginManager() {
    return pluginManager;
  }

  /**
   * Installs the supplied plugin manager on the NM context, then returns a
   * {@code BaseNodeStatusUpdaterForTest} backed by a
   * {@code BaseResourceTrackerForTest}.
   */
  @Override
  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
    final NodeManager.NMContext nmContext = (NodeManager.NMContext) context;
    nmContext.setResourcePluginManager(pluginManager);
    return new BaseNodeStatusUpdaterForTest(
        context, dispatcher, healthChecker, metrics,
        new BaseResourceTrackerForTest());
  }

  /**
   * Returns a {@code MyContainerManager}; the ACLs manager argument is not
   * forwarded to it.
   */
  @Override
  protected ContainerManagerImpl createContainerManager(Context context,
      ContainerExecutor exec, DeletionService del,
      NodeStatusUpdater nodeStatusUpdater,
      ApplicationACLsManager aclsManager,
      LocalDirsHandlerService diskhandler) {
    return new MyContainerManager(
        context, exec, del, nodeStatusUpdater, metrics, diskhandler);
  }
}
/**
 * {@link LinuxContainerExecutor} whose privileged-operation executor is a
 * Mockito mock rather than the one the parent class would supply.
 */
public class MyLCE extends LinuxContainerExecutor {
  private PrivilegedOperationExecutor mockedExecutor =
      mock(PrivilegedOperationExecutor.class);

  /** Returns the mocked executor created with this instance. */
  @Override
  protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
    return mockedExecutor;
  }
}
/**
 * Make sure ResourcePluginManager is initialized during NM start up.
 *
 * Initializes a {@code MyMockNM} built around the stubbed plugin manager
 * and checks that {@code initialize} was called on it exactly once.
 */
@Test(timeout = 30000)
public void testResourcePluginManagerInitialization() throws Exception {
  final ResourcePluginManager pluginManager = stubResourcePluginmanager();
  nm = new MyMockNM(pluginManager);
  nm.init(createNMConfig());

  verify(pluginManager, times(1)).initialize(any(Context.class));
}
/**
 * Make sure ResourcePluginManager is invoked during NM update.
 *
 * After the NodeManager has been initialized and started, the mocked
 * updater plugin registered under "resource1" must have been asked exactly
 * once to update the configured resource.
 */
@Test(timeout = 30000)
public void testNodeStatusUpdaterWithResourcePluginsEnabled()
    throws Exception {
  final ResourcePluginManager pluginManager = stubResourcePluginmanager();
  nm = new MyMockNM(pluginManager);
  nm.init(createNMConfig());
  nm.start();

  final ResourcePlugin firstPlugin =
      pluginManager.getNameToPlugins().get("resource1");
  final NodeResourceUpdaterPlugin updater =
      firstPlugin.getNodeResourceHandlerInstance();
  verify(updater, times(1)).updateConfiguredResource(any(Resource.class));
}
/**
 * Make sure ResourcePluginManager is used to initialize ResourceHandlerChain.
 *
 * Starts a NodeManager whose container executor is a {@code MyLCE} and
 * whose context carries the stubbed plugin manager, then checks that the
 * executor's resource handler is a {@link ResourceHandlerChain} containing
 * a {@link CustomizedResourceHandler}.
 */
@Test(timeout = 30000)
public void testLinuxContainerExecutorWithResourcePluginsEnabled()
    throws Exception {
  final ResourcePluginManager pluginManager = stubResourcePluginmanager();
  final LinuxContainerExecutor executor = new MyLCE();

  nm = new NodeManager() {
    /**
     * Installs the stubbed plugin manager on the NM context, gives the
     * shared executor the configuration and returns that executor.
     */
    @Override
    protected ContainerExecutor createContainerExecutor(Configuration conf) {
      final NMContext nmContext = (NMContext) this.getNMContext();
      nmContext.setResourcePluginManager(pluginManager);
      executor.setConf(conf);
      return executor;
    }

    /**
     * Installs the stubbed plugin manager on the context and returns the
     * test node status updater.
     */
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      final NMContext nmContext = (NMContext) context;
      nmContext.setResourcePluginManager(pluginManager);
      return new BaseNodeStatusUpdaterForTest(
          context, dispatcher, healthChecker, metrics,
          new BaseResourceTrackerForTest());
    }

    /** Returns the test container manager; the ACLs manager is unused. */
    @Override
    protected ContainerManagerImpl createContainerManager(Context context,
        ContainerExecutor exec, DeletionService del,
        NodeStatusUpdater nodeStatusUpdater,
        ApplicationACLsManager aclsManager,
        LocalDirsHandlerService diskhandler) {
      return new MyContainerManager(
          context, exec, del, nodeStatusUpdater, metrics, diskhandler);
    }
  };
  nm.init(createNMConfig());
  nm.start();

  final ResourceHandler handler = executor.getResourceHandler();
  Assert.assertNotNull(handler);
  Assert.assertTrue(handler instanceof ResourceHandlerChain);

  // Scan the chain until the handler contributed by the plugin is found.
  final ResourceHandlerChain chain = (ResourceHandlerChain) handler;
  boolean newHandlerAdded = false;
  for (ResourceHandler candidate : chain.getResourceHandlerList()) {
    newHandlerAdded = candidate instanceof CustomizedResourceHandler;
    if (newHandlerAdded) {
      break;
    }
  }
  Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded);
}
}