package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Test to restart the AM on failure.
 *
 */
public class TestAMRestart {
//  private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
//  ApplicationsManagerImpl appImpl;
//  RMContext asmContext = new RMContextImpl(new MemStore());
//  ApplicationTokenSecretManager appTokenSecretManager = 
//    new ApplicationTokenSecretManager();
//  DummyResourceScheduler scheduler;
//  private ClientRMService clientRMService;
//  int count = 0;
//  ApplicationId appID;
//  final int maxFailures = 3;
//  AtomicInteger launchNotify = new AtomicInteger();
//  AtomicInteger schedulerNotify = new AtomicInteger();
//  volatile boolean stop = false;
//  int schedulerAddApplication = 0;
//  int schedulerRemoveApplication = 0;
//  int launcherLaunchCalled = 0;
//  int launcherCleanupCalled = 0;
//  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
//  
//  private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
//    public ExtApplicationsManagerImpl(
//        ApplicationTokenSecretManager applicationTokenSecretManager,
//        YarnScheduler scheduler, RMContext asmContext) {
//      super(applicationTokenSecretManager, scheduler, asmContext);
//    }
//
//    @Override
//    public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
//        ApplicationTokenSecretManager tokenSecretManager) {
//      return new DummyAMLauncher();
//    }
//  }
//
//  private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
//
//    public DummyAMLauncher() {
//      asmContext.getDispatcher().register(AMLauncherEventType.class, this);
//      new Thread() {
//        public void run() {
//          while (!stop) {
//            LOG.info("DEBUG -- waiting for launch");
//            synchronized(launchNotify) {
//              while (launchNotify.get() == 0) {
//                try { 
//                  launchNotify.wait();
//                } catch (InterruptedException e) {
//                }
//              }
//              asmContext.getDispatcher().getEventHandler().handle(
//                  new ApplicationEvent(
//                      ApplicationEventType.LAUNCHED, appID));
//              launchNotify.addAndGet(-1);
//            }
//          }
//        }
//      }.start();
//    }
//
//    @Override
//    public void handle(ASMEvent<AMLauncherEventType> event) {
//      switch (event.getType()) {
//      case CLEANUP:
//        launcherCleanupCalled++;
//        break;
//      case LAUNCH:
//        LOG.info("DEBUG -- launching");
//        launcherLaunchCalled++;
//        synchronized (launchNotify) {
//          launchNotify.addAndGet(1);
//          launchNotify.notify();
//        }
//        break;
//      default:
//        break;
//      }
//    }
//  }
//
//  private class DummyResourceScheduler implements ResourceScheduler {
//   
//    @Override
//    public void removeNode(RMNode node) {
//    }
//    
//    @Override
//    public Allocation allocate(ApplicationId applicationId,
//        List<ResourceRequest> ask, List<Container> release) throws IOException {
//      Container container = recordFactory.newRecordInstance(Container.class);
//      container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
//      container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
//      container.setContainerManagerAddress("localhost");
//      container.setNodeHttpAddress("localhost:9999");
//      container.setId(recordFactory.newRecordInstance(ContainerId.class));
//      container.getId().setAppId(appID);
//      container.getId().setId(count);
//      count++;
//      return new Allocation(Arrays.asList(container), Resources.none());
//    }
//
//    @Override
//    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
//      switch (event.getType()) {
//      case ADD:
//        schedulerAddApplication++;
//        break;
//      case EXPIRE:
//        schedulerRemoveApplication++;
//        LOG.info("REMOVING app : " + schedulerRemoveApplication);
//        if (schedulerRemoveApplication == maxFailures) {
//          synchronized (schedulerNotify) {
//            schedulerNotify.addAndGet(1);
//            schedulerNotify.notify();
//          }
//        }
//        break;
//      default:
//        break;
//      }
//    }
//
//    @Override
//    public QueueInfo getQueueInfo(String queueName,
//        boolean includeChildQueues,
//        boolean recursive) throws IOException {
//      return null;
//    }
//    @Override
//    public List<QueueUserACLInfo> getQueueUserAclInfo() {
//      return null;
//    }
//    @Override
//    public void addApplication(ApplicationId applicationId,
//        ApplicationMaster master, String user, String queue, Priority priority,
//        ApplicationStore store)
//        throws IOException {
//    }
//    @Override
//    public void addNode(RMNode nodeInfo) {
//    }
//    @Override
//    public void recover(RMState state) throws Exception {
//    }
//    @Override
//    public void reinitialize(Configuration conf,
//        ContainerTokenSecretManager secretManager, RMContext rmContext)
//        throws IOException {
//    }
//
//    @Override
//    public void nodeUpdate(RMNode nodeInfo,
//        Map<String, List<Container>> containers) {      
//    }
//
//    @Override
//    public Resource getMaximumResourceCapability() {
//      // TODO Auto-generated method stub
//      return null;
//    }
//
//    @Override
//    public Resource getMinimumResourceCapability() {
//      // TODO Auto-generated method stub
//      return null;
//    }
//  }
//
//  @Before
//  public void setUp() {
//
//    asmContext.getDispatcher().register(ApplicationEventType.class,
//        new ResourceManager.ApplicationEventDispatcher(asmContext));
//
//    appID = recordFactory.newRecordInstance(ApplicationId.class);
//    appID.setClusterTimestamp(System.currentTimeMillis());
//    appID.setId(1);
//    Configuration conf = new Configuration();
//    scheduler = new DummyResourceScheduler();
//    asmContext.getDispatcher().init(conf);
//    asmContext.getDispatcher().start();
//    asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
//    appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
//    
//    conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
//    conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
//    appImpl.init(conf);
//    appImpl.start();
//
//    this.clientRMService = new ClientRMService(asmContext, appImpl
//        .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
//        scheduler); 
//    this.clientRMService.init(conf);
//  }
//
//  @After
//  public void tearDown() {
//  }
//
//  private void waitForFailed(AppAttempt application, ApplicationState 
//      finalState) throws Exception {
//    int count = 0;
//    while(application.getState() != finalState && count < 10) {
//      Thread.sleep(500);
//      count++;
//    }
//    Assert.assertEquals(finalState, application.getState());
//  }
//
//  @Test
//  public void testAMRestart() throws Exception {
//    ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
//    subContext.setApplicationId(appID);
//    subContext.setApplicationName("dummyApp");
////    subContext.command = new ArrayList<String>();
////    subContext.environment = new HashMap<String, String>();
////    subContext.fsTokens = new ArrayList<String>();
//    subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
//    SubmitApplicationRequest request = recordFactory
//        .newRecordInstance(SubmitApplicationRequest.class);
//    request.setApplicationSubmissionContext(subContext);
//    clientRMService.submitApplication(request);
//    AppAttempt application = asmContext.getApplications().get(appID); 
//    synchronized (schedulerNotify) {
//      while(schedulerNotify.get() == 0) {
//        schedulerNotify.wait();
//      }
//    }
//    Assert.assertEquals(maxFailures, launcherCleanupCalled);
//    Assert.assertEquals(maxFailures, launcherLaunchCalled);
//    Assert.assertEquals(maxFailures, schedulerAddApplication);
//    Assert.assertEquals(maxFailures, schedulerRemoveApplication);
//    Assert.assertEquals(maxFailures, application.getFailedCount());
//    waitForFailed(application, ApplicationState.FAILED);
//    stop = true;
//  }
}