blob: 7635f51ba12bbba1b69f8fe2f72b7f7149163148 [file] [log] [blame]
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test to restart the AM on failure.
*
*/
public class TestAMRestart {
// private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
// ApplicationsManagerImpl appImpl;
// RMContext asmContext = new RMContextImpl(new MemStore());
// ApplicationTokenSecretManager appTokenSecretManager =
// new ApplicationTokenSecretManager();
// DummyResourceScheduler scheduler;
// private ClientRMService clientRMService;
// int count = 0;
// ApplicationId appID;
// final int maxFailures = 3;
// AtomicInteger launchNotify = new AtomicInteger();
// AtomicInteger schedulerNotify = new AtomicInteger();
// volatile boolean stop = false;
// int schedulerAddApplication = 0;
// int schedulerRemoveApplication = 0;
// int launcherLaunchCalled = 0;
// int launcherCleanupCalled = 0;
// private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
//
// private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
// public ExtApplicationsManagerImpl(
// ApplicationTokenSecretManager applicationTokenSecretManager,
// YarnScheduler scheduler, RMContext asmContext) {
// super(applicationTokenSecretManager, scheduler, asmContext);
// }
//
// @Override
// public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
// ApplicationTokenSecretManager tokenSecretManager) {
// return new DummyAMLauncher();
// }
// }
//
// private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
//
// public DummyAMLauncher() {
// asmContext.getDispatcher().register(AMLauncherEventType.class, this);
// new Thread() {
// public void run() {
// while (!stop) {
// LOG.info("DEBUG -- waiting for launch");
// synchronized(launchNotify) {
// while (launchNotify.get() == 0) {
// try {
// launchNotify.wait();
// } catch (InterruptedException e) {
// }
// }
// asmContext.getDispatcher().getEventHandler().handle(
// new ApplicationEvent(
// ApplicationEventType.LAUNCHED, appID));
// launchNotify.addAndGet(-1);
// }
// }
// }
// }.start();
// }
//
// @Override
// public void handle(ASMEvent<AMLauncherEventType> event) {
// switch (event.getType()) {
// case CLEANUP:
// launcherCleanupCalled++;
// break;
// case LAUNCH:
// LOG.info("DEBUG -- launching");
// launcherLaunchCalled++;
// synchronized (launchNotify) {
// launchNotify.addAndGet(1);
// launchNotify.notify();
// }
// break;
// default:
// break;
// }
// }
// }
//
// private class DummyResourceScheduler implements ResourceScheduler {
//
// @Override
// public void removeNode(RMNode node) {
// }
//
// @Override
// public Allocation allocate(ApplicationId applicationId,
// List<ResourceRequest> ask, List<Container> release) throws IOException {
// Container container = recordFactory.newRecordInstance(Container.class);
// container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
// container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
// container.setContainerManagerAddress("localhost");
// container.setNodeHttpAddress("localhost:9999");
// container.setId(recordFactory.newRecordInstance(ContainerId.class));
// container.getId().setAppId(appID);
// container.getId().setId(count);
// count++;
// return new Allocation(Arrays.asList(container), Resources.none());
// }
//
// @Override
// public void handle(ASMEvent<ApplicationTrackerEventType> event) {
// switch (event.getType()) {
// case ADD:
// schedulerAddApplication++;
// break;
// case EXPIRE:
// schedulerRemoveApplication++;
// LOG.info("REMOVING app : " + schedulerRemoveApplication);
// if (schedulerRemoveApplication == maxFailures) {
// synchronized (schedulerNotify) {
// schedulerNotify.addAndGet(1);
// schedulerNotify.notify();
// }
// }
// break;
// default:
// break;
// }
// }
//
// @Override
// public QueueInfo getQueueInfo(String queueName,
// boolean includeChildQueues,
// boolean recursive) throws IOException {
// return null;
// }
// @Override
// public List<QueueUserACLInfo> getQueueUserAclInfo() {
// return null;
// }
// @Override
// public void addApplication(ApplicationId applicationId,
// ApplicationMaster master, String user, String queue, Priority priority,
// ApplicationStore store)
// throws IOException {
// }
// @Override
// public void addNode(RMNode nodeInfo) {
// }
// @Override
// public void recover(RMState state) throws Exception {
// }
// @Override
// public void reinitialize(Configuration conf,
// ContainerTokenSecretManager secretManager, RMContext rmContext)
// throws IOException {
// }
//
// @Override
// public void nodeUpdate(RMNode nodeInfo,
// Map<String, List<Container>> containers) {
// }
//
// @Override
// public Resource getMaximumResourceCapability() {
// // TODO Auto-generated method stub
// return null;
// }
//
// @Override
// public Resource getMinimumResourceCapability() {
// // TODO Auto-generated method stub
// return null;
// }
// }
//
// @Before
// public void setUp() {
//
// asmContext.getDispatcher().register(ApplicationEventType.class,
// new ResourceManager.ApplicationEventDispatcher(asmContext));
//
// appID = recordFactory.newRecordInstance(ApplicationId.class);
// appID.setClusterTimestamp(System.currentTimeMillis());
// appID.setId(1);
// Configuration conf = new Configuration();
// scheduler = new DummyResourceScheduler();
// asmContext.getDispatcher().init(conf);
// asmContext.getDispatcher().start();
// asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
// appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
//
// conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
// conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
// appImpl.init(conf);
// appImpl.start();
//
// this.clientRMService = new ClientRMService(asmContext, appImpl
// .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
// scheduler);
// this.clientRMService.init(conf);
// }
//
// @After
// public void tearDown() {
// }
//
// private void waitForFailed(AppAttempt application, ApplicationState
// finalState) throws Exception {
// int count = 0;
// while(application.getState() != finalState && count < 10) {
// Thread.sleep(500);
// count++;
// }
// Assert.assertEquals(finalState, application.getState());
// }
//
// @Test
// public void testAMRestart() throws Exception {
// ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
// subContext.setApplicationId(appID);
// subContext.setApplicationName("dummyApp");
//// subContext.command = new ArrayList<String>();
//// subContext.environment = new HashMap<String, String>();
//// subContext.fsTokens = new ArrayList<String>();
// subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
// SubmitApplicationRequest request = recordFactory
// .newRecordInstance(SubmitApplicationRequest.class);
// request.setApplicationSubmissionContext(subContext);
// clientRMService.submitApplication(request);
// AppAttempt application = asmContext.getApplications().get(appID);
// synchronized (schedulerNotify) {
// while(schedulerNotify.get() == 0) {
// schedulerNotify.wait();
// }
// }
// Assert.assertEquals(maxFailures, launcherCleanupCalled);
// Assert.assertEquals(maxFailures, launcherLaunchCalled);
// Assert.assertEquals(maxFailures, schedulerAddApplication);
// Assert.assertEquals(maxFailures, schedulerRemoveApplication);
// Assert.assertEquals(maxFailures, application.getFailedCount());
// waitForFailed(application, ApplicationState.FAILED);
// stop = true;
// }
}