// blob: ec3850181b6bcc617a0902e934f1f2f05c231db8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import static org.mockito.Mockito.spy;
public abstract class BaseContainerManagerTest {
/** Record factory obtained from RecordFactoryProvider with a null argument. */
protected static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
/** Local file context; assigned in the constructor and used by setup() to clear the test directories. */
protected static FileContext localFS;
// The four directories below are static, but the constructor reassigns them on
// every instantiation using the simple name of the concrete test class.
/** Directory written to YarnConfiguration.NM_LOCAL_DIRS by setup(). */
protected static File localDir;
/** Directory written to YarnConfiguration.NM_LOG_DIRS by setup(). */
protected static File localLogDir;
/** Directory written to YarnConfiguration.NM_REMOTE_APP_LOG_DIR by setup(). */
protected static File remoteLogDir;
/** Scratch directory recreated by setup(); this class does not put it into the configuration. */
protected static File tmpDir;
/** Metrics instance passed to both the node status updater and the container manager. */
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
public BaseContainerManagerTest() throws UnsupportedFileSystemException {
localFS = FileContext.getLocalFSFileContext();
localDir =
new File("target", this.getClass().getSimpleName() + "-localDir")
.getAbsoluteFile();
localLogDir =
new File("target", this.getClass().getSimpleName() + "-localLogDir")
.getAbsoluteFile();
remoteLogDir =
new File("target", this.getClass().getSimpleName() + "-remoteLogDir")
.getAbsoluteFile();
tmpDir = new File("target", this.getClass().getSimpleName() + "-tmpDir");
}
/** Logger shared by this base class and its static helpers. */
protected static Log LOG = LogFactory
.getLog(BaseContainerManagerTest.class);
/** Port returned by the getHttpPort() override of the test context below. */
protected static final int HTTP_PORT = 5412;
/** Configuration filled in by setup() and handed to every service created here. */
protected Configuration conf = new YarnConfiguration();
// NM context built from fresh secret managers, an ACL manager and a null state
// store; its only customisation is that getHttpPort() returns HTTP_PORT.
// Field initializers run in declaration order, so conf must stay declared above.
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf) {
public int getHttpPort() {
return HTTP_PORT;
};
};
/** Container executor; assigned in setup() from createContainerExecutor(). */
protected ContainerExecutor exec;
/** Deletion service; assigned in setup() from createDeletionService(). */
protected DeletionService delSrvc;
/** User name passed to deleteAsUser() in tearDown(). */
protected String user = "nobody";
/** Health checker created and initialised in setup(). */
protected NodeHealthCheckerService nodeHealthChecker;
/** Local-dirs handler created in setup() and shared with the health checker and container manager. */
protected LocalDirsHandlerService dirsHandler;
/** Value returned by the test node status updater's getRMIdentifier(). */
protected final long DUMMY_RM_IDENTIFIER = 1234;
// Node status updater stub: it is wired to the test context and metrics but
// overrides the RM-facing hooks so that no real ResourceManager is contacted.
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics) {
@Override
protected ResourceTracker getRMClient() {
// Use the in-process LocalRMInterface instead of an RPC proxy.
return new LocalRMInterface();
};
@Override
protected void stopRMProxy() {
// No proxy was created, so there is nothing to stop.
return;
}
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}
@Override
public long getRMIdentifier() {
// There is no real RM registration, simulate and set RMIdentifier
return DUMMY_RM_IDENTIFIER;
}
};
/** Container manager under test; created in setup() and stopped in tearDown(). */
protected ContainerManagerImpl containerManager = null;
/**
 * Creates the container executor used by the tests.
 *
 * @return a Mockito spy around a {@link DefaultContainerExecutor} that has
 *         been given this test's configuration
 */
protected ContainerExecutor createContainerExecutor() {
  final DefaultContainerExecutor delegate = new DefaultContainerExecutor();
  delegate.setConf(conf);
  return spy(delegate);
}
/**
 * Prepares a fresh NodeManager test fixture: wipes and recreates the working
 * directories, fills in the configuration, and creates and initialises the
 * deletion service, executor, health checker, container manager and node
 * status updater.
 *
 * @throws IOException if a working directory cannot be deleted or created
 */
@Before
public void setup() throws IOException {
  final File[] workDirs = {localDir, tmpDir, localLogDir, remoteLogDir};
  // Start from empty directories so that state from earlier tests cannot leak in.
  for (File dir : workDirs) {
    localFS.delete(new Path(dir.getAbsolutePath()), true);
  }
  for (File dir : workDirs) {
    // mkdirs() also creates a missing "target" parent; fail loudly instead of
    // letting the test continue without its working directory.
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IOException("Could not create test directory "
          + dir.getAbsolutePath());
    }
  }
  LOG.info("Created localDir in " + localDir.getAbsolutePath());
  LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
  String bindAddress = "0.0.0.0:" + ServerSocketUtil.getPort(49162, 10);
  conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
  conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
  conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
  conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      remoteLogDir.getAbsolutePath());
  conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:"
      + ServerSocketUtil.getPort(8040, 10));
  conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
  // The executor has to exist before the deletion service is created:
  // createDeletionService() passes the exec field to the DeletionService
  // constructor, and that field is still null at this point otherwise.
  exec = createContainerExecutor();
  // Default delSrvc
  delSrvc = createDeletionService();
  delSrvc.init(conf);
  dirsHandler = new LocalDirsHandlerService();
  nodeHealthChecker = new NodeHealthCheckerService(
      NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
  nodeHealthChecker.init(conf);
  containerManager = createContainerManager(delSrvc);
  ((NMContext) context).setContainerManager(containerManager);
  nodeStatusUpdater.init(conf);
  containerManager.init(conf);
  nodeStatusUpdater.start();
  ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
}
/**
 * Builds the container manager under test. The returned instance overrides
 * the authorization and NM-token hooks as no-ops and serves a fixed set of
 * auxiliary-service metadata.
 *
 * @param delSrvc deletion service handed to the container manager
 * @return the customised {@link ContainerManagerImpl}
 */
protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) {
  return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
      metrics, dirsHandler) {

    @Override
    public void setBlockNewContainerRequests(
        boolean blockNewContainerRequests) {
      // Intentionally empty.
    }

    @Override
    protected void authorizeGetAndStopContainerRequest(
        ContainerId containerId, Container container, boolean stopRequest,
        NMTokenIdentifier identifier) throws YarnException {
      // Intentionally empty.
    }

    @Override
    protected void authorizeUser(UserGroupInformation remoteUgi,
        NMTokenIdentifier nmTokenIdentifier) {
      // Intentionally empty.
    }

    @Override
    protected void authorizeStartAndResourceIncreaseRequest(
        NMTokenIdentifier nmTokenIdentifier,
        ContainerTokenIdentifier containerTokenIdentifier,
        boolean startRequest) throws YarnException {
      // Intentionally empty.
    }

    @Override
    protected void updateNMTokenIdentifier(
        NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
      // Intentionally empty.
    }

    @Override
    public Map<String, ByteBuffer> getAuxServiceMetaData() {
      // Two fixed entries, rebuilt on every call.
      final ByteBuffer first =
          ByteBuffer.wrap("AuxServiceMetaData1".getBytes());
      final ByteBuffer second =
          ByteBuffer.wrap("AuxServiceMetaData2".getBytes());
      final Map<String, ByteBuffer> metaData =
          new HashMap<String, ByteBuffer>();
      metaData.put("AuxService1", first);
      metaData.put("AuxService2", second);
      return metaData;
    }

    @Override
    protected NMTokenIdentifier selectNMTokenIdentifier(
        UserGroupInformation remoteUgi) {
      // Always hand back a blank identifier, whatever the caller's UGI is.
      return new NMTokenIdentifier();
    }
  };
}
/**
 * Creates a deletion service whose {@code delete} only logs the request
 * and never removes anything.
 *
 * @return the logging-only {@link DeletionService}, built around the
 *         current {@code exec} field
 */
protected DeletionService createDeletionService() {
  return new DeletionService(exec) {
    @Override
    public void delete(String user, Path subDir, Path... baseDirs) {
      // Record what would have been deleted, but leave the files alone.
      final String message = "Psuedo delete: user - " + user
          + ", subDir - " + subDir
          + ", baseDirs - " + Arrays.asList(baseDirs);
      LOG.info(message);
    }
  };
}
/**
 * Stops the container manager (when one was created) and then removes the
 * local directory through a freshly created container executor.
 *
 * @throws IOException if the executor fails to delete the directory
 * @throws InterruptedException if the deletion is interrupted
 */
@After
public void tearDown() throws IOException, InterruptedException {
  if (null != containerManager) {
    containerManager.stop();
  }
  final ContainerExecutor cleaner = createContainerExecutor();
  final Path localDirPath = new Path(localDir.getAbsolutePath());
  final DeletionAsUserContext deletion = new DeletionAsUserContext.Builder()
      .setUser(user)
      .setSubDir(localDirPath)
      .setBasedirs(new Path[] {})
      .build();
  cleaner.deleteAsUser(deletion);
}
/**
 * Waits for a container to reach {@code finalState}, using a
 * {@code timeOutMax} of 20 for the four-argument overload.
 *
 * @param containerManager protocol endpoint queried for the container status
 * @param containerID container to watch
 * @param finalState state the container is expected to reach
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws YarnException if the status query fails
 * @throws IOException if the status query fails
 */
public static void waitForContainerState(
    ContainerManagementProtocol containerManager,
    ContainerId containerID, ContainerState finalState)
    throws InterruptedException, YarnException, IOException {
  final int defaultTimeOutMax = 20;
  waitForContainerState(containerManager, containerID, finalState,
      defaultTimeOutMax);
}
/**
 * Polls the status of a container every two seconds until it reports
 * {@code finalState} or the accumulated wait reaches {@code timeOutMax},
 * then asserts that the last observed state equals {@code finalState}.
 *
 * @param containerManager protocol endpoint queried for the container status
 * @param containerID container to watch
 * @param finalState state the container is expected to reach
 * @param timeOutMax upper bound, in seconds of accumulated sleep, on the wait
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws YarnException if the status query fails
 * @throws IOException if the status query fails
 */
public static void waitForContainerState(
    ContainerManagementProtocol containerManager,
    ContainerId containerID, ContainerState finalState, int timeOutMax)
    throws InterruptedException, YarnException, IOException {
  final List<ContainerId> ids = new ArrayList<ContainerId>();
  ids.add(containerID);
  final GetContainerStatusesRequest statusRequest =
      GetContainerStatusesRequest.newInstance(ids);

  ContainerStatus latest = null;
  int waitedSecs = 0;
  while (true) {
    // Always sleep first, so at least one poll interval passes before a check.
    Thread.sleep(2000);
    latest = containerManager.getContainerStatuses(statusRequest)
        .getContainerStatuses().get(0);
    LOG.info("Waiting for container to get into state " + finalState
        + ". Current state is " + latest.getState());
    waitedSecs += 2;
    if (latest.getState().equals(finalState) || waitedSecs >= timeOutMax) {
      break;
    }
  }

  LOG.info("Container state is " + latest.getState());
  Assert.assertEquals("ContainerState is not correct (timedout)",
      finalState, latest.getState());
}
/**
 * Waits for an application known to the NM context to reach
 * {@code finalState}. The state is checked once per second for at most 15
 * retries, after which the method asserts that the state was reached.
 *
 * @param containerManager container manager whose context holds the application
 * @param appID application to watch
 * @param finalState state the application is expected to reach
 * @throws InterruptedException if the polling sleep is interrupted
 */
static void waitForApplicationState(ContainerManagerImpl containerManager,
    ApplicationId appID, ApplicationState finalState)
    throws InterruptedException {
  // Wait for app-finish
  Application app =
      containerManager.getContext().getApplications().get(appID);
  // Fail with a readable message rather than a NullPointerException in the
  // loop condition when the application is not registered in the context.
  Assert.assertNotNull("Application " + appID
      + " is not present in the NM context", app);
  int timeout = 0;
  while (!(app.getApplicationState().equals(finalState))
      && timeout++ < 15) {
    LOG.info("Waiting for app to reach " + finalState
        + ".. Current state is "
        + app.getApplicationState());
    Thread.sleep(1000);
  }
  // assertEquals reports the state actually observed when the wait times out.
  Assert.assertEquals("App is not in " + finalState + " yet!! Timedout!!",
      finalState, app.getApplicationState());
}
/**
 * Waits for the NM-side container to reach {@code finalState}, using a
 * {@code timeOutMax} of 20 for the four-argument overload.
 *
 * @param containerManager container manager whose context holds the container
 * @param containerID container to watch
 * @param finalState NM-internal container state expected to be reached
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws YarnException declared for callers; see the four-argument overload
 * @throws IOException declared for callers; see the four-argument overload
 */
public static void waitForNMContainerState(
    ContainerManagerImpl containerManager, ContainerId containerID,
    org.apache.hadoop.yarn.server.nodemanager.containermanager
        .container.ContainerState finalState)
    throws InterruptedException, YarnException, IOException {
  final int defaultTimeOutMax = 20;
  waitForNMContainerState(containerManager, containerID, finalState,
      defaultTimeOutMax);
}
/**
 * Polls the NM context every two seconds until the container reports
 * {@code finalState} or the accumulated wait reaches {@code timeOutMax},
 * then asserts that the last observed state equals {@code finalState}.
 * A container that has not (yet) appeared in the context is treated as
 * "state unknown" and polling simply continues.
 *
 * @param containerManager container manager whose context holds the container
 * @param containerID container to watch
 * @param finalState NM-internal container state expected to be reached
 * @param timeOutMax upper bound, in seconds of accumulated sleep, on the wait
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws YarnException declared for signature compatibility
 * @throws IOException declared for signature compatibility
 */
public static void waitForNMContainerState(
    ContainerManagerImpl containerManager, ContainerId containerID,
    org.apache.hadoop.yarn.server.nodemanager.containermanager
        .container.ContainerState finalState, int timeOutMax)
    throws InterruptedException, YarnException, IOException {
  org.apache.hadoop.yarn.server.nodemanager
      .containermanager.container.ContainerState currentState = null;
  int timeoutSecs = 0;
  do {
    Thread.sleep(2000);
    Container container =
        containerManager.getContext().getContainers().get(containerID);
    if (container != null) {
      currentState = container.getContainerState();
    }
    if (currentState != null) {
      LOG.info("Waiting for NM container to get into state " + finalState
          + ". Current state is " + currentState);
    }
    // One increment per 2-second sleep keeps timeoutSecs in step with the
    // time actually waited.
    timeoutSecs += 2;
    // currentState stays null until the container shows up in the context,
    // so guard the comparison instead of dereferencing it.
  } while ((currentState == null || !currentState.equals(finalState))
      && timeoutSecs < timeOutMax);
  LOG.info("Container state is " + currentState);
  Assert.assertEquals("ContainerState is not correct (timedout)",
      finalState, currentState);
}
/**
 * Creates a container token without a log aggregation context.
 *
 * @param cId container the token is issued for
 * @param rmIdentifier RM identifier embedded in the token
 * @param nodeId node the token is issued for
 * @param user user embedded in the token
 * @param containerTokenSecretManager secret manager that supplies the password
 * @return the container token
 * @throws IOException if token creation fails
 */
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
    NodeId nodeId, String user,
    NMContainerTokenSecretManager containerTokenSecretManager)
    throws IOException {
  final LogAggregationContext noLogAggregation = null;
  return createContainerToken(cId, rmIdentifier, nodeId, user,
      containerTokenSecretManager, noLogAggregation);
}
/**
 * Creates a container token for a resource built with
 * {@code BuilderUtils.newResource(1024, 1)}.
 *
 * @param cId container the token is issued for
 * @param rmIdentifier RM identifier embedded in the token
 * @param nodeId node the token is issued for
 * @param user user embedded in the token
 * @param containerTokenSecretManager secret manager that supplies the password
 * @param logAggregationContext log aggregation context, may be {@code null}
 * @return the container token
 * @throws IOException if token creation fails
 */
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
    NodeId nodeId, String user,
    NMContainerTokenSecretManager containerTokenSecretManager,
    LogAggregationContext logAggregationContext)
    throws IOException {
  final Resource defaultResource = BuilderUtils.newResource(1024, 1);
  return createContainerToken(cId, rmIdentifier, nodeId, user,
      defaultResource, containerTokenSecretManager, logAggregationContext);
}
/**
 * Creates a container token for the given resource. The token identifier is
 * built with a timestamp 100000 ms after the current time and the password
 * is retrieved from the supplied secret manager.
 *
 * @param cId container the token is issued for
 * @param rmIdentifier RM identifier embedded in the token
 * @param nodeId node the token is issued for
 * @param user user embedded in the token
 * @param resource resource embedded in the token
 * @param containerTokenSecretManager secret manager that supplies the password
 * @param logAggregationContext log aggregation context, may be {@code null}
 * @return the container token
 * @throws IOException if token creation fails
 */
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
    NodeId nodeId, String user, Resource resource,
    NMContainerTokenSecretManager containerTokenSecretManager,
    LogAggregationContext logAggregationContext)
    throws IOException {
  final long hundredSecondsFromNow = System.currentTimeMillis() + 100000L;
  final ContainerTokenIdentifier identifier =
      new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
          hundredSecondsFromNow, 123, rmIdentifier,
          Priority.newInstance(0), 0, logAggregationContext, null);
  final byte[] password =
      containerTokenSecretManager.retrievePassword(identifier);
  return BuilderUtils.newContainerToken(nodeId, password, identifier);
}
/**
 * Creates a container token for the given resource and execution type. The
 * token identifier is built with {@code ContainerType.TASK}, a timestamp
 * 100000 ms after the current time, and the password retrieved from the
 * supplied secret manager.
 *
 * @param cId container the token is issued for
 * @param rmIdentifier RM identifier embedded in the token
 * @param nodeId node the token is issued for
 * @param user user embedded in the token
 * @param resource resource embedded in the token
 * @param containerTokenSecretManager secret manager that supplies the password
 * @param logAggregationContext log aggregation context, may be {@code null}
 * @param executionType execution type embedded in the token
 * @return the container token
 * @throws IOException if token creation fails
 */
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
    NodeId nodeId, String user, Resource resource,
    NMContainerTokenSecretManager containerTokenSecretManager,
    LogAggregationContext logAggregationContext, ExecutionType executionType)
    throws IOException {
  final long hundredSecondsFromNow = System.currentTimeMillis() + 100000L;
  final ContainerTokenIdentifier identifier =
      new ContainerTokenIdentifier(cId, 0, nodeId.toString(), user, resource,
          hundredSecondsFromNow, 123, rmIdentifier,
          Priority.newInstance(0), 0, logAggregationContext, null,
          ContainerType.TASK, executionType);
  final byte[] password =
      containerTokenSecretManager.retrievePassword(identifier);
  return BuilderUtils.newContainerToken(nodeId, password, identifier);
}
/**
 * Builds a container id with the given numeric id, belonging to attempt 1
 * of the application created by {@code ApplicationId.newInstance(0, 0)}.
 *
 * @param id numeric container id
 * @return the container id
 */
public static ContainerId createContainerId(int id) {
  final ApplicationAttemptId attemptId =
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 1);
  return ContainerId.newContainerId(attemptId, id);
}
}