blob: 2e2b7d6f94284a8d2cdc5055ea2e593ce466bec2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestASMStateMachine extends TestCase {
private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class);
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
RMContext context = new ResourceManager.RMContextImpl(new MemStore());
EventHandler handler;
private boolean snreceivedCleanUp = false;
private boolean snAllocateReceived = false;
private boolean launchCalled = false;
private boolean addedApplication = false;
private boolean removedApplication = false;
private boolean launchCleanupCalled = false;
private AtomicInteger waitForState = new AtomicInteger();
@Before
public void setUp() {
context.getDispatcher().init(new Configuration());
context.getDispatcher().start();
handler = context.getDispatcher().getEventHandler();
new DummyAMLaunchEventHandler();
new DummySNEventHandler();
new ApplicationTracker();
new MockAppplicationMasterInfo();
}
@After
public void tearDown() {
}
private class DummyAMLaunchEventHandler implements EventHandler<ASMEvent<AMLauncherEventType>> {
AppContext appcontext;
AtomicInteger amsync = new AtomicInteger(0);
public DummyAMLaunchEventHandler() {
context.getDispatcher().register(AMLauncherEventType.class, this);
}
@Override
public void handle(ASMEvent<AMLauncherEventType> event) {
switch(event.getType()) {
case LAUNCH:
launchCalled = true;
appcontext = event.getAppContext();
context.getDispatcher().getEventHandler().handle(
new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
appcontext));
break;
case CLEANUP:
launchCleanupCalled = true;
break;
}
}
}
private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
AppContext appContext;
AtomicInteger snsync = new AtomicInteger(0);
public DummySNEventHandler() {
context.getDispatcher().register(SNEventType.class, this);
}
@Override
public void handle(ASMEvent<SNEventType> event) {
switch(event.getType()) {
case CLEANUP:
snreceivedCleanUp = true;
break;
case SCHEDULE:
snAllocateReceived = true;
appContext = event.getAppContext();
context.getDispatcher().getEventHandler().handle(
new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
appContext));
break;
}
}
}
private static class StatusContext implements AppContext {
@Override
public ApplicationSubmissionContext getSubmissionContext() {
return null;
}
@Override
public Resource getResource() {
return null;
}
@Override
public ApplicationId getApplicationID() {
return null;
}
@Override
public ApplicationStatus getStatus() {
ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
status.setLastSeen(-99);
return status;
}
@Override
public ApplicationMaster getMaster() {
return null;
}
@Override
public Container getMasterContainer() {
return null;
}
@Override
public String getUser() {
return null;
}
@Override
public long getLastSeen() {
return 0;
}
@Override
public String getName() {
return null;
}
@Override
public String getQueue() {
return null;
}
@Override
public int getFailedCount() {
return 0;
}
@Override
public ApplicationStore getStore() {
return StoreFactory.createVoidAppStore();
}
}
private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
public ApplicationTracker() {
context.getDispatcher().register(ApplicationTrackerEventType.class, this);
}
@Override
public void handle(ASMEvent<ApplicationTrackerEventType> event) {
switch (event.getType()) {
case ADD:
addedApplication = true;
break;
case REMOVE:
removedApplication = true;
break;
}
}
}
private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
MockAppplicationMasterInfo() {
context.getDispatcher().register(ApplicationEventType.class, this);
}
@Override
public void handle(ASMEvent<ApplicationEventType> event) {
LOG.info("The event type is " + event.getType());
}
}
private void waitForState( ApplicationState
finalState, ApplicationMasterInfo masterInfo) throws Exception {
int count = 0;
while(masterInfo.getState() != finalState && count < 10) {
Thread.sleep(500);
count++;
}
assertTrue(masterInfo.getState() == finalState);
}
/* Test the state machine.
*
*/
@Test
public void testStateMachine() throws Exception {
ApplicationSubmissionContext submissioncontext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
submissioncontext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
submissioncontext.getApplicationId().setId(1);
submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
ApplicationMasterInfo masterInfo
= new ApplicationMasterInfo(context, "dummyuser", submissioncontext, "dummyToken"
, StoreFactory.createVoidAppStore());
context.getDispatcher().register(ApplicationEventType.class, masterInfo);
handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
ALLOCATE, masterInfo));
waitForState(ApplicationState.ALLOCATED, masterInfo);
handler.handle(new ASMEvent<ApplicationEventType>(
ApplicationEventType.LAUNCH, masterInfo));
waitForState(ApplicationState.LAUNCHED, masterInfo);
Assert.assertTrue(snAllocateReceived);
Assert.assertTrue(launchCalled);
Assert.assertTrue(addedApplication);
handler.handle(new ASMEvent<ApplicationEventType>(
ApplicationEventType.REGISTERED, masterInfo));
waitForState(ApplicationState.RUNNING, masterInfo);
Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
handler.handle(new ASMEvent<ApplicationEventType>(
ApplicationEventType.STATUSUPDATE, new StatusContext()));
/* check if the state is still RUNNING */
Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
handler.handle(new ApplicationFinishEvent(masterInfo,
ApplicationState.COMPLETED));
waitForState(ApplicationState.COMPLETED, masterInfo);
Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
/* check if clean up is called for everyone */
Assert.assertTrue(launchCleanupCalled);
Assert.assertTrue(snreceivedCleanUp);
Assert.assertTrue(removedApplication);
/* check if expiry doesnt make it failed */
handler.handle(
new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
}
}