| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Matchers.*; |
| import static org.mockito.Mockito.*; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; |
| import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; |
| import org.apache.hadoop.yarn.server.api.records.MasterKey; |
| import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; |
| import org.apache.hadoop.yarn.server.nodemanager.Context; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; |
| import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.ArgumentMatcher; |
| |
| |
| public class TestApplication { |
| |
| /** |
| * All container start events before application running. |
| */ |
| @Test |
| public void testApplicationInit1() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(1, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| assertEquals(1, wa.app.getContainers().size()); |
| wa.initContainer(0); |
| wa.initContainer(2); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| assertEquals(3, wa.app.getContainers().size()); |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| |
| for (int i = 0; i < wa.containers.size(); i++) { |
| verify(wa.containerBus).handle( |
| argThat(new ContainerInitMatcher(wa.containers.get(i) |
| .getContainerId()))); |
| } |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| /** |
| * Container start events after Application Running |
| */ |
| @Test |
| public void testApplicationInit2() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(2, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(0); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| assertEquals(1, wa.app.getContainers().size()); |
| |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| verify(wa.containerBus).handle( |
| argThat(new ContainerInitMatcher(wa.containers.get(0) |
| .getContainerId()))); |
| |
| wa.initContainer(1); |
| wa.initContainer(2); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(3, wa.app.getContainers().size()); |
| |
| for (int i = 1; i < wa.containers.size(); i++) { |
| verify(wa.containerBus).handle( |
| argThat(new ContainerInitMatcher(wa.containers.get(i) |
| .getContainerId()))); |
| } |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| /** |
| * App state RUNNING after all containers complete, before RM sends |
| * APP_FINISHED |
| */ |
| @Test |
| public void testAppRunningAfterContainersComplete() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(3, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| |
| wa.containerFinished(0); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(2, wa.app.getContainers().size()); |
| |
| wa.containerFinished(1); |
| wa.containerFinished(2); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| /** |
| * Finished containers properly tracked when only container finishes in APP_INITING |
| */ |
| @Test |
| public void testContainersCompleteDuringAppInit1() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(3, 314159265358979L, "yak", 1); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| |
| wa.containerFinished(0); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| /** |
| * Finished containers properly tracked when 1 of several containers finishes in APP_INITING |
| */ |
| @Test |
| public void testContainersCompleteDuringAppInit2() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(3, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| |
| wa.containerFinished(0); |
| |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(2, wa.app.getContainers().size()); |
| |
| wa.containerFinished(1); |
| wa.containerFinished(2); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testAppFinishedOnRunningContainers() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(4, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| |
| wa.containerFinished(0); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(2, wa.app.getContainers().size()); |
| |
| wa.appFinished(); |
| assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, |
| wa.app.getApplicationState()); |
| assertEquals(2, wa.app.getContainers().size()); |
| |
| for (int i = 1; i < wa.containers.size(); i++) { |
| verify(wa.containerBus).handle( |
| argThat(new ContainerKillMatcher(wa.containers.get(i) |
| .getContainerId()))); |
| } |
| |
| wa.containerFinished(1); |
| assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, |
| wa.app.getApplicationState()); |
| assertEquals(1, wa.app.getContainers().size()); |
| |
| reset(wa.localizerBus); |
| wa.containerFinished(2); |
| // All containers finished. Cleanup should be called. |
| assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, |
| wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| |
| verify(wa.localizerBus).handle( |
| refEq(new ApplicationLocalizationEvent( |
| LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); |
| |
| verify(wa.auxBus).handle( |
| refEq(new AuxServicesEvent( |
| AuxServicesEventType.APPLICATION_STOP, wa.appId))); |
| |
| wa.appResourcesCleanedup(); |
| for (Container container : wa.containers) { |
| ContainerTokenIdentifier identifier = |
| wa.getContainerTokenIdentifier(container.getContainerId()); |
| waitForContainerTokenToExpire(identifier); |
| Assert.assertTrue(wa.context.getContainerTokenSecretManager() |
| .isValidStartContainerRequest(identifier)); |
| } |
| assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); |
| |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| protected ContainerTokenIdentifier waitForContainerTokenToExpire( |
| ContainerTokenIdentifier identifier) { |
| int attempts = 5; |
| while (System.currentTimeMillis() < identifier.getExpiryTimeStamp() |
| && attempts-- > 0) { |
| try { |
| Thread.sleep(1000); |
| } catch (Exception e) {} |
| } |
| return identifier; |
| } |
| |
| @Test |
| public void testApplicationOnAppLogHandlingInitedEvtShouldStoreLogInitedTime() |
| throws IOException { |
| WrappedApplication wa = new WrappedApplication(5, 314159265358979L, |
| "yak", 0); |
| wa.initApplication(); |
| |
| ArgumentCaptor<ContainerManagerApplicationProto> applicationProto = |
| ArgumentCaptor.forClass(ContainerManagerApplicationProto.class); |
| |
| final long timestamp = wa.applicationLogInited(); |
| |
| verify(wa.stateStoreService).storeApplication(any(ApplicationId.class), |
| applicationProto.capture()); |
| |
| assertEquals(applicationProto.getValue().getAppLogAggregationInitedTime() |
| , timestamp); |
| } |
| |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testAppFinishedOnCompletedContainers() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(5, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| |
| reset(wa.localizerBus); |
| wa.containerFinished(0); |
| wa.containerFinished(1); |
| wa.containerFinished(2); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| |
| wa.appFinished(); |
| assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, |
| wa.app.getApplicationState()); |
| |
| verify(wa.localizerBus).handle( |
| refEq(new ApplicationLocalizationEvent( |
| LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); |
| |
| wa.appResourcesCleanedup(); |
| for ( Container container : wa.containers) { |
| ContainerTokenIdentifier identifier = |
| wa.getContainerTokenIdentifier(container.getContainerId()); |
| waitForContainerTokenToExpire(identifier); |
| Assert.assertTrue(wa.context.getContainerTokenSecretManager() |
| .isValidStartContainerRequest(identifier)); |
| } |
| assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| //TODO Re-work after Application transitions are changed. |
| // @Test |
| @SuppressWarnings("unchecked") |
| public void testStartContainerAfterAppFinished() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(5, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(-1); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| wa.applicationInited(); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| |
| reset(wa.localizerBus); |
| wa.containerFinished(0); |
| wa.containerFinished(1); |
| wa.containerFinished(2); |
| assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| |
| wa.appFinished(); |
| assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, |
| wa.app.getApplicationState()); |
| verify(wa.localizerBus).handle( |
| refEq(new ApplicationLocalizationEvent( |
| LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); |
| |
| wa.appResourcesCleanedup(); |
| assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| //TODO Re-work after Application transitions are changed. |
| // @Test |
| @SuppressWarnings("unchecked") |
| public void testAppFinishedOnIniting() { |
| // AM may send a startContainer() - AM APP_FINIHSED processed after |
| // APP_FINISHED on another NM |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(1, 314159265358979L, "yak", 3); |
| wa.initApplication(); |
| wa.initContainer(0); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| assertEquals(1, wa.app.getContainers().size()); |
| |
| reset(wa.localizerBus); |
| wa.appFinished(); |
| |
| verify(wa.containerBus).handle( |
| argThat(new ContainerKillMatcher(wa.containers.get(0) |
| .getContainerId()))); |
| assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, |
| wa.app.getApplicationState()); |
| |
| wa.containerFinished(0); |
| assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, |
| wa.app.getApplicationState()); |
| verify(wa.localizerBus).handle( |
| refEq(new ApplicationLocalizationEvent( |
| LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); |
| |
| wa.initContainer(1); |
| assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, |
| wa.app.getApplicationState()); |
| assertEquals(0, wa.app.getContainers().size()); |
| |
| wa.appResourcesCleanedup(); |
| assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); |
| } finally { |
| if (wa != null) |
| wa.finished(); |
| } |
| } |
| |
| @Test |
| public void testNMTokenSecretManagerCleanup() { |
| WrappedApplication wa = null; |
| try { |
| wa = new WrappedApplication(1, 314159265358979L, "yak", 1); |
| wa.initApplication(); |
| wa.initContainer(0); |
| assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); |
| assertEquals(1, wa.app.getContainers().size()); |
| wa.appFinished(); |
| wa.containerFinished(0); |
| wa.appResourcesCleanedup(); |
| assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); |
| verify(wa.nmTokenSecretMgr).appFinished(eq(wa.appId)); |
| } finally { |
| if (wa != null) { |
| wa.finished(); |
| } |
| } |
| } |
| |
| private class ContainerKillMatcher extends ArgumentMatcher<ContainerEvent> { |
| private ContainerId cId; |
| |
| public ContainerKillMatcher(ContainerId cId) { |
| this.cId = cId; |
| } |
| |
| @Override |
| public boolean matches(Object argument) { |
| if (argument instanceof ContainerKillEvent) { |
| ContainerKillEvent event = (ContainerKillEvent) argument; |
| return event.getContainerID().equals(cId); |
| } |
| return false; |
| } |
| } |
| |
| private class ContainerInitMatcher extends ArgumentMatcher<ContainerEvent> { |
| private ContainerId cId; |
| |
| public ContainerInitMatcher(ContainerId cId) { |
| this.cId = cId; |
| } |
| |
| @Override |
| public boolean matches(Object argument) { |
| if (argument instanceof ContainerInitEvent) { |
| ContainerInitEvent event = (ContainerInitEvent) argument; |
| return event.getContainerID().equals(cId); |
| } |
| return false; |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private class WrappedApplication { |
| final DrainDispatcher dispatcher; |
| final EventHandler<LocalizationEvent> localizerBus; |
| final EventHandler<ContainersLauncherEvent> launcherBus; |
| final EventHandler<ContainersMonitorEvent> monitorBus; |
| final EventHandler<AuxServicesEvent> auxBus; |
| final EventHandler<ContainerEvent> containerBus; |
| final EventHandler<LogHandlerEvent> logAggregationBus; |
| final String user; |
| final List<Container> containers; |
| final Context context; |
| final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap; |
| final NMTokenSecretManagerInNM nmTokenSecretMgr; |
| final NMStateStoreService stateStoreService; |
| final ApplicationId appId; |
| final Application app; |
| |
| WrappedApplication(int id, long timestamp, String user, int numContainers) { |
| Configuration conf = new Configuration(); |
| |
| dispatcher = new DrainDispatcher(); |
| containerTokenIdentifierMap = |
| new HashMap<ContainerId, ContainerTokenIdentifier>(); |
| dispatcher.init(conf); |
| |
| localizerBus = mock(EventHandler.class); |
| launcherBus = mock(EventHandler.class); |
| monitorBus = mock(EventHandler.class); |
| auxBus = mock(EventHandler.class); |
| containerBus = mock(EventHandler.class); |
| logAggregationBus = mock(EventHandler.class); |
| |
| dispatcher.register(LocalizationEventType.class, localizerBus); |
| dispatcher.register(ContainersLauncherEventType.class, launcherBus); |
| dispatcher.register(ContainersMonitorEventType.class, monitorBus); |
| dispatcher.register(AuxServicesEventType.class, auxBus); |
| dispatcher.register(ContainerEventType.class, containerBus); |
| dispatcher.register(LogHandlerEventType.class, logAggregationBus); |
| |
| nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); |
| stateStoreService = mock(NMStateStoreService.class); |
| context = mock(Context.class); |
| |
| when(context.getContainerTokenSecretManager()).thenReturn( |
| new NMContainerTokenSecretManager(conf)); |
| when(context.getApplicationACLsManager()).thenReturn( |
| new ApplicationACLsManager(conf)); |
| when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); |
| when(context.getNMStateStore()).thenReturn(stateStoreService); |
| when(context.getConf()).thenReturn(conf); |
| |
| // Setting master key |
| MasterKey masterKey = new MasterKeyPBImpl(); |
| masterKey.setKeyId(123); |
| masterKey.setBytes(ByteBuffer.wrap(new byte[] { (new Integer(123) |
| .byteValue()) })); |
| context.getContainerTokenSecretManager().setMasterKey(masterKey); |
| |
| this.user = user; |
| this.appId = BuilderUtils.newApplicationId(timestamp, id); |
| |
| app = new ApplicationImpl( |
| dispatcher, this.user, appId, null, context); |
| containers = new ArrayList<Container>(); |
| for (int i = 0; i < numContainers; i++) { |
| Container container = createMockedContainer(this.appId, i); |
| containers.add(container); |
| long currentTime = System.currentTimeMillis(); |
| ContainerTokenIdentifier identifier = |
| new ContainerTokenIdentifier(container.getContainerId(), "", "", |
| null, currentTime + 2000, masterKey.getKeyId(), currentTime, |
| Priority.newInstance(0), 0); |
| containerTokenIdentifierMap |
| .put(identifier.getContainerID(), identifier); |
| context.getContainerTokenSecretManager().startContainerSuccessful( |
| identifier); |
| Assert.assertFalse(context.getContainerTokenSecretManager() |
| .isValidStartContainerRequest(identifier)); |
| } |
| |
| dispatcher.start(); |
| } |
| |
| private void drainDispatcherEvents() { |
| dispatcher.await(); |
| } |
| |
| public void finished() { |
| dispatcher.stop(); |
| } |
| |
| public void initApplication() { |
| app.handle(new ApplicationInitEvent(appId, |
| new HashMap<ApplicationAccessType, String>())); |
| } |
| |
| public void initContainer(int containerNum) { |
| if (containerNum == -1) { |
| for (int i = 0; i < containers.size(); i++) { |
| app.handle(new ApplicationContainerInitEvent(containers.get(i))); |
| } |
| } else { |
| app.handle(new ApplicationContainerInitEvent(containers.get(containerNum))); |
| } |
| drainDispatcherEvents(); |
| } |
| |
| public void containerFinished(int containerNum) { |
| app.handle(new ApplicationContainerFinishedEvent(containers.get( |
| containerNum).cloneAndGetContainerStatus())); |
| drainDispatcherEvents(); |
| } |
| |
| public void applicationInited() { |
| app.handle(new ApplicationInitedEvent(appId)); |
| drainDispatcherEvents(); |
| } |
| |
| public long applicationLogInited() { |
| ApplicationEvent appEvt = new ApplicationEvent(app.getAppId(), |
| ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); |
| app.handle(appEvt); |
| return appEvt.getTimestamp(); |
| } |
| |
| public void appFinished() { |
| app.handle(new ApplicationFinishEvent(appId, |
| "Finish Application")); |
| drainDispatcherEvents(); |
| } |
| |
| public void appResourcesCleanedup() { |
| app.handle(new ApplicationEvent(appId, |
| ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); |
| drainDispatcherEvents(); |
| } |
| |
| public ContainerTokenIdentifier getContainerTokenIdentifier( |
| ContainerId containerId) { |
| return this.containerTokenIdentifierMap.get(containerId); |
| } |
| } |
| |
| private Container createMockedContainer(ApplicationId appId, int containerId) { |
| ApplicationAttemptId appAttemptId = |
| BuilderUtils.newApplicationAttemptId(appId, 1); |
| ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId); |
| Container c = mock(Container.class); |
| when(c.getContainerId()).thenReturn(cId); |
| ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class); |
| when(c.getLaunchContext()).thenReturn(launchContext); |
| when(launchContext.getApplicationACLs()).thenReturn( |
| new HashMap<ApplicationAccessType, String>()); |
| when(c.cloneAndGetContainerStatus()).thenReturn( |
| BuilderUtils.newContainerStatus(cId, |
| ContainerState.NEW, "", 0, Resource.newInstance(1024, 1))); |
| return c; |
| } |
| } |