blob: 8300d8fee34aef773ab3dbd413393d22e6c9e88c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import java.io.File;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.After;
import org.junit.Before;
public abstract class BaseContainerManagerTest {
protected static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
protected static FileContext localFS;
protected static File localDir;
protected static File localLogDir;
protected static File remoteLogDir;
protected static File tmpDir;
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
public BaseContainerManagerTest() throws UnsupportedFileSystemException {
localFS = FileContext.getLocalFSFileContext();
localDir =
new File("target", this.getClass().getName() + "-localDir")
.getAbsoluteFile();
localLogDir =
new File("target", this.getClass().getName() + "-localLogDir")
.getAbsoluteFile();
remoteLogDir =
new File("target", this.getClass().getName() + "-remoteLogDir")
.getAbsoluteFile();
tmpDir = new File("target", this.getClass().getName() + "-tmpDir");
}
protected static Log LOG = LogFactory
.getLog(BaseContainerManagerTest.class);
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf));
protected ContainerExecutor exec;
protected DeletionService delSrvc;
protected String user = "nobody";
protected NodeHealthCheckerService nodeHealthChecker;
protected LocalDirsHandlerService dirsHandler;
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics) {
@Override
protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}
};
protected ContainerManagerImpl containerManager = null;
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
return exec;
}
@Before
public void setup() throws IOException {
localFS.delete(new Path(localDir.getAbsolutePath()), true);
localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
localDir.mkdir();
tmpDir.mkdir();
localLogDir.mkdir();
remoteLogDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath());
LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
String bindAddress = "0.0.0.0:5555";
conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
// Default delSrvc
delSrvc = new DeletionService(exec) {
@Override
public void delete(String user, Path subDir, Path[] baseDirs) {
// Don't do any deletions.
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
+ ", baseDirs - " + baseDirs);
};
};
delSrvc.init(conf);
exec = createContainerExecutor();
nodeHealthChecker = new NodeHealthCheckerService();
nodeHealthChecker.init(conf);
dirsHandler = nodeHealthChecker.getDiskHandler();
containerManager =
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, new ApplicationACLsManager(conf), dirsHandler);
containerManager.init(conf);
}
@After
public void tearDown() throws IOException, InterruptedException {
if (containerManager != null
&& containerManager.getServiceState() == STATE.STARTED) {
containerManager.stop();
}
createContainerExecutor().deleteAsUser(user,
new Path(localDir.getAbsolutePath()), new Path[] {});
}
public static void waitForContainerState(ContainerManager containerManager,
ContainerId containerID, ContainerState finalState)
throws InterruptedException, YarnRemoteException {
waitForContainerState(containerManager, containerID, finalState, 20);
}
public static void waitForContainerState(ContainerManager containerManager,
ContainerId containerID, ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnRemoteException {
GetContainerStatusRequest request =
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(containerID);
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
int timeoutSecs = 0;
while (!containerStatus.getState().equals(finalState)
&& timeoutSecs++ < timeOutMax) {
Thread.sleep(1000);
LOG.info("Waiting for container to get into state " + finalState
+ ". Current state is " + containerStatus.getState());
containerStatus = containerManager.getContainerStatus(request).getStatus();
}
LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, containerStatus.getState());
}
static void waitForApplicationState(ContainerManagerImpl containerManager,
ApplicationId appID, ApplicationState finalState)
throws InterruptedException {
// Wait for app-finish
Application app =
containerManager.context.getApplications().get(appID);
int timeout = 0;
while (!(app.getApplicationState().equals(finalState))
&& timeout++ < 15) {
LOG.info("Waiting for app to reach " + finalState
+ ".. Current state is "
+ app.getApplicationState());
Thread.sleep(1000);
}
Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
app.getApplicationState().equals(finalState));
}
}