// blob: ceba12ad2e473dd50411ba32bf1f7bcbc343c615
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests the state transitions of an {@link RMApp} as implemented by
 * {@link RMAppImpl}.
 *
 * <p>Every test creates a fresh application, feeds it a sequence of
 * {@link RMAppEvent}s and asserts on the resulting {@link RMAppState},
 * {@link FinalApplicationStatus}, start/finish timestamps and diagnostics.
 * The {@code testCreateApp*} helpers build on each other so that a test can
 * start from any intermediate state (SUBMITTED, ACCEPTED, RUNNING, FINISHED).
 */
public class TestRMAppTransitions {

  static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);

  /** Value written to RM_AM_MAX_RETRIES for every application under test. */
  private static final int MAX_RETRIES = 4;

  /** Counter that gives each created application a distinct id number. */
  private static int appId = 1;

  /** Context shared by the application under test; rebuilt in {@link #setUp()}. */
  private RMContext rmContext;

  /**
   * Forwards each {@link RMAppAttemptEvent} to the attempt it addresses.
   *
   * <p>Events whose application is not registered in the context are dropped.
   * Anything thrown while the attempt handles the event is logged and not
   * rethrown.
   */
  private static final class TestApplicationAttemptEventDispatcher implements
      EventHandler<RMAppAttemptEvent> {

    private final RMContext rmContext;

    public TestApplicationAttemptEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }

    @Override
    public void handle(RMAppAttemptEvent event) {
      ApplicationId applicationId =
          event.getApplicationAttemptId().getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(applicationId);
      if (rmApp == null) {
        // Unknown application: nothing to deliver the event to.
        return;
      }
      try {
        rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + applicationId, t);
      }
    }
  }

  /**
   * Forwards each {@link RMAppEvent} to the application it addresses.
   *
   * <p>Per the original author's note this mirrors the dispatcher in
   * ResourceManager.java. Events for unregistered applications are dropped,
   * and anything thrown by the application's handler is logged and not
   * rethrown.
   */
  private static final class TestApplicationEventDispatcher implements
      EventHandler<RMAppEvent> {

    private final RMContext rmContext;

    public TestApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }

    @Override
    public void handle(RMAppEvent event) {
      ApplicationId applicationId = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(applicationId);
      if (rmApp == null) {
        // Unknown application: nothing to deliver the event to.
        return;
      }
      try {
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + applicationId, t);
      }
    }
  }

  /**
   * Builds a fresh {@link RMContext} backed by an {@link InlineDispatcher}
   * and mocked expirer/liveliness monitor, and registers the two test
   * dispatchers for application and attempt events.
   *
   * @throws Exception declared for JUnit; nothing in the body is known to
   *         throw a checked exception
   */
  @Before
  public void setUp() throws Exception {
    Configuration conf = new Configuration();
    // Only one dispatcher is needed. InlineDispatcher is presumably chosen so
    // that events are handled on the calling thread -- TODO confirm.
    AsyncDispatcher rmDispatcher = new InlineDispatcher();

    ContainerAllocationExpirer containerAllocationExpirer =
        mock(ContainerAllocationExpirer.class);
    AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
    this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
        containerAllocationExpirer, amLivelinessMonitor, null);

    rmDispatcher.register(RMAppAttemptEventType.class,
        new TestApplicationAttemptEventDispatcher(this.rmContext));
    rmDispatcher.register(RMAppEventType.class,
        new TestApplicationEventDispatcher(this.rmContext));

    // NOTE(review): the dispatcher is started for every test but never
    // stopped; consider keeping it in a field and stopping it in an @After
    // method if start() acquires resources -- confirm against AsyncDispatcher.
    rmDispatcher.init(conf);
    rmDispatcher.start();
  }

  /**
   * Creates a new {@link RMAppImpl} with mock user/name/queue values, a null
   * submission context and {@code RM_AM_MAX_RETRIES} pinned to
   * {@link #MAX_RETRIES}, then verifies its freshly-created state.
   *
   * @return the new application, not yet sent any event
   */
  protected RMApp createNewTestApp() {
    ApplicationId applicationId = MockApps.newAppID(appId++);
    String user = MockApps.newUserName();
    String name = MockApps.newAppName();
    String queue = MockApps.newQueue();

    Configuration conf = new YarnConfiguration();
    // ensure max retries set to known value
    conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, MAX_RETRIES);

    ApplicationSubmissionContext submissionContext = null;
    String clientTokenStr = "bogusstring";
    ApplicationStore appStore = mock(ApplicationStore.class);
    YarnScheduler scheduler = mock(YarnScheduler.class);
    ApplicationMasterService masterService =
        new ApplicationMasterService(rmContext,
            new ApplicationTokenSecretManager(), scheduler);

    RMApp application = new RMAppImpl(applicationId, rmContext,
        conf, name, user,
        queue, submissionContext, clientTokenStr,
        appStore, scheduler,
        masterService, System.currentTimeMillis());

    testAppStartState(applicationId, user, name, queue, application);
    return application;
  }

  /**
   * Asserts that a newly created application reports the values it was
   * constructed with, zero progress, no finish time, no tracking URL and
   * empty diagnostics.
   */
  private static void testAppStartState(ApplicationId applicationId,
      String user, String name, String queue, RMApp application) {
    assertStartTimeSet(application);
    Assert.assertEquals("application user is not correct",
        user, application.getUser());
    Assert.assertEquals("application id is not correct",
        applicationId, application.getApplicationId());
    Assert.assertEquals("application progress is not correct",
        0.0f, application.getProgress());
    Assert.assertEquals("application queue is not correct",
        queue, application.getQueue());
    Assert.assertEquals("application name is not correct",
        name, application.getName());
    Assert.assertEquals("application finish time is not 0 and should be",
        0L, application.getFinishTime());
    Assert.assertNull("application tracking url is not correct",
        application.getTrackingUrl());
    StringBuilder diag = application.getDiagnostics();
    Assert.assertEquals("application diagnostics is not correct",
        0, diag.length());
  }

  /** Asserts the start time is positive and not later than the current time. */
  private static void assertStartTimeSet(RMApp application) {
    Assert.assertTrue("application start time is not greater then 0",
        application.getStartTime() > 0);
    // The message describes the failure: this trips when start time > now.
    Assert.assertTrue("application start time is after currentTime",
        application.getStartTime() <= System.currentTimeMillis());
  }

  /** Asserts the application is currently in {@code state}. */
  private static void assertAppState(RMAppState state, RMApp application) {
    Assert.assertEquals("application state should have been " + state,
        state, application.getState());
  }

  /** Asserts the application reports {@code status} as its final status. */
  private static void assertFinalAppStatus(FinalApplicationStatus status,
      RMApp application) {
    Assert.assertEquals("Final application status should have been " + status,
        status, application.getFinalApplicationStatus());
  }

  /**
   * Asserts that both timestamps are set once the app has finished: the start
   * time checks of {@link #assertStartTimeSet(RMApp)} plus a positive finish
   * time that is not earlier than the start time.
   */
  private static void assertTimesAtFinish(RMApp application) {
    assertStartTimeSet(application);
    Assert.assertTrue("application finish time is not greater then 0",
        (application.getFinishTime() > 0));
    Assert.assertTrue("application finish time is not >= then start time",
        (application.getFinishTime() >= application.getStartTime()));
  }

  /**
   * Asserts the application ended in KILLED with the KILLED final status,
   * finish timestamps set and the "killed by user" diagnostics text.
   */
  private static void assertKilled(RMApp application) {
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);
    assertFinalAppStatus(FinalApplicationStatus.KILLED, application);
    StringBuilder diag = application.getDiagnostics();
    Assert.assertEquals("application diagnostics is not correct",
        "Application killed by user.", diag.toString());
  }

  /** Same as {@link #assertKilled(RMApp)}, and the current attempt is KILLED too. */
  private static void assertAppAndAttemptKilled(RMApp application) {
    assertKilled(application);
    Assert.assertEquals("application attempt state should have been KILLED",
        RMAppAttemptState.KILLED,
        application.getCurrentAppAttempt().getAppAttemptState());
  }

  /**
   * Asserts the application ended in FAILED with the FAILED final status and
   * finish timestamps set.
   *
   * @param regex regular expression that the complete diagnostics text must
   *        match (as in {@link String#matches(String)})
   */
  private static void assertFailed(RMApp application, String regex) {
    assertTimesAtFinish(application);
    assertAppState(RMAppState.FAILED, application);
    assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
    StringBuilder diag = application.getDiagnostics();
    Assert.assertTrue("application diagnostics is not correct",
        diag.toString().matches(regex));
  }

  /** Hands the application a plain {@link RMAppEvent} of the given type. */
  private static void sendEvent(RMApp application, RMAppEventType type) {
    application.handle(new RMAppEvent(application.getApplicationId(), type));
  }

  /** Hands the application an ATTEMPT_FAILED event carrying {@code message}. */
  private static void sendAttemptFailed(RMApp application, String message) {
    application.handle(new RMAppFailedAttemptEvent(
        application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED,
        message));
  }

  /** Hands the application an {@link RMAppRejectedEvent} carrying {@code message}. */
  private static void sendRejected(RMApp application, String message) {
    application.handle(
        new RMAppRejectedEvent(application.getApplicationId(), message));
  }

  /**
   * Creates an application and moves it NEW => SUBMITTED.
   *
   * @return the application, asserted to be in SUBMITTED
   */
  protected RMApp testCreateAppSubmitted() throws IOException {
    RMApp application = createNewTestApp();
    // NEW => SUBMITTED event RMAppEventType.START
    sendEvent(application, RMAppEventType.START);
    assertStartTimeSet(application);
    assertAppState(RMAppState.SUBMITTED, application);
    return application;
  }

  /**
   * Creates an application and moves it NEW => SUBMITTED => ACCEPTED.
   *
   * @return the application, asserted to be in ACCEPTED
   */
  protected RMApp testCreateAppAccepted() throws IOException {
    RMApp application = testCreateAppSubmitted();
    // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
    sendEvent(application, RMAppEventType.APP_ACCEPTED);
    assertStartTimeSet(application);
    assertAppState(RMAppState.ACCEPTED, application);
    return application;
  }

  /**
   * Creates an application and moves it through to RUNNING.
   *
   * @return the application, asserted to be RUNNING with an UNDEFINED final
   *         status
   */
  protected RMApp testCreateAppRunning() throws IOException {
    RMApp application = testCreateAppAccepted();
    // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
    sendEvent(application, RMAppEventType.ATTEMPT_REGISTERED);
    assertStartTimeSet(application);
    assertAppState(RMAppState.RUNNING, application);
    assertFinalAppStatus(FinalApplicationStatus.UNDEFINED, application);
    return application;
  }

  /**
   * Creates an application and moves it through to FINISHED.
   *
   * @return the application, asserted to be FINISHED with a FAILED final
   *         status
   */
  protected RMApp testCreateAppFinished() throws IOException {
    RMApp application = testCreateAppRunning();
    // RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
    sendEvent(application, RMAppEventType.ATTEMPT_FINISHED);
    assertAppState(RMAppState.FINISHED, application);
    assertTimesAtFinish(application);
    // finished without a proper unregister implies failed
    assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
    return application;
  }

  /** NEW => SUBMITTED => ACCEPTED => RUNNING => FINISHED. */
  @Test
  public void testAppSuccessPath() throws IOException {
    LOG.info("--- START: testAppSuccessPath ---");
    testCreateAppFinished();
  }

  /** A KILL delivered to a NEW application moves it to KILLED. */
  @Test
  public void testAppNewKill() throws IOException {
    LOG.info("--- START: testAppNewKill ---");
    RMApp application = createNewTestApp();
    // NEW => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertKilled(application);
  }

  /** A rejection delivered to a NEW application moves it to FAILED. */
  @Test
  public void testAppNewReject() throws IOException {
    LOG.info("--- START: testAppNewReject ---");
    RMApp application = createNewTestApp();
    // NEW => FAILED event RMAppEventType.APP_REJECTED
    String rejectedText = "Test Application Rejected";
    sendRejected(application, rejectedText);
    // The text has no regex metacharacters, so this is an exact match.
    assertFailed(application, rejectedText);
  }

  /** A rejection delivered to a SUBMITTED application moves it to FAILED. */
  @Test
  public void testAppSubmittedRejected() throws IOException {
    LOG.info("--- START: testAppSubmittedRejected ---");
    RMApp application = testCreateAppSubmitted();
    // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
    String rejectedText = "app rejected";
    sendRejected(application, rejectedText);
    // The text has no regex metacharacters, so this is an exact match.
    assertFailed(application, rejectedText);
  }

  /**
   * A KILL kills both the application and its current attempt.
   *
   * <p>NOTE(review): despite the test name, the application is driven to
   * ACCEPTED (not SUBMITTED) before the KILL is sent -- confirm which state
   * was intended.
   */
  @Test
  public void testAppSubmittedKill() throws IOException {
    LOG.info("--- START: testAppSubmittedKill---");
    RMApp application = testCreateAppAccepted();
    // Register the app in the context so the test dispatchers, which look
    // applications up via rmContext.getRMApps(), can resolve it.
    this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
        application);
    // ACCEPTED => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertAppAndAttemptKilled(application);
  }

  /**
   * Attempt failures in ACCEPTED send the application back to SUBMITTED until
   * the retry limit is reached, after which it moves to FAILED.
   */
  @Test
  public void testAppAcceptedFailed() throws IOException {
    LOG.info("--- START: testAppAcceptedFailed ---");
    RMApp application = testCreateAppAccepted();
    // MAX_RETRIES - 1 failures are retried; the one after the loop is final.
    for (int i = 1; i < MAX_RETRIES; i++) {
      // ACCEPTED => SUBMITTED event RMAppEventType.ATTEMPT_FAILED
      sendAttemptFailed(application, "");
      assertAppState(RMAppState.SUBMITTED, application);
      // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
      sendEvent(application, RMAppEventType.APP_ACCEPTED);
      assertAppState(RMAppState.ACCEPTED, application);
    }
    // ACCEPTED => FAILED event RMAppEventType.ATTEMPT_FAILED
    // after max retries
    String message = "Test fail";
    sendAttemptFailed(application, message);
    assertFailed(application, ".*" + message + ".*Failing the application.*");
  }

  /** A KILL delivered to an ACCEPTED application moves it to KILLED. */
  @Test
  public void testAppAcceptedKill() throws IOException {
    LOG.info("--- START: testAppAcceptedKill ---");
    RMApp application = testCreateAppAccepted();
    // ACCEPTED => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertKilled(application);
  }

  /** A KILL delivered to a RUNNING application moves it to KILLED. */
  @Test
  public void testAppRunningKill() throws IOException {
    LOG.info("--- START: testAppRunningKill ---");
    RMApp application = testCreateAppRunning();
    // RUNNING => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertKilled(application);
  }

  /**
   * Attempt failures in RUNNING restart the application with a new attempt id
   * until the retry limit is reached, after which it moves to FAILED and
   * stays there even when a KILL follows.
   */
  @Test
  public void testAppRunningFailed() throws IOException {
    LOG.info("--- START: testAppRunningFailed ---");
    RMApp application = testCreateAppRunning();
    RMAppAttempt appAttempt = application.getCurrentAppAttempt();
    int expectedAttemptId = 1;
    Assert.assertEquals(expectedAttemptId,
        appAttempt.getAppAttemptId().getAttemptId());

    // MAX_RETRIES - 1 failures are retried; the one after the loop is final.
    for (int i = 1; i < MAX_RETRIES; i++) {
      // RUNNING => SUBMITTED event RMAppEventType.ATTEMPT_FAILED
      sendAttemptFailed(application, "");
      assertAppState(RMAppState.SUBMITTED, application);
      // Each retry must be running under the next attempt id.
      appAttempt = application.getCurrentAppAttempt();
      Assert.assertEquals(++expectedAttemptId,
          appAttempt.getAppAttemptId().getAttemptId());
      // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
      sendEvent(application, RMAppEventType.APP_ACCEPTED);
      assertAppState(RMAppState.ACCEPTED, application);
      // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
      sendEvent(application, RMAppEventType.ATTEMPT_REGISTERED);
      assertAppState(RMAppState.RUNNING, application);
    }

    // RUNNING => FAILED event RMAppEventType.ATTEMPT_FAILED
    // after max retries
    sendAttemptFailed(application, "");
    assertFailed(application, ".*Failing the application.*");

    // FAILED => FAILED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertFailed(application, ".*Failing the application.*");
  }

  /** A KILL delivered to a FINISHED application leaves it FINISHED. */
  @Test
  public void testAppFinishedFinished() throws IOException {
    LOG.info("--- START: testAppFinishedFinished ---");
    RMApp application = testCreateAppFinished();
    // FINISHED => FINISHED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertTimesAtFinish(application);
    assertAppState(RMAppState.FINISHED, application);
    StringBuilder diag = application.getDiagnostics();
    Assert.assertEquals("application diagnostics is not correct",
        "", diag.toString());
  }

  /**
   * Once KILLED, the application stays KILLED for ATTEMPT_FINISHED,
   * ATTEMPT_FAILED, ATTEMPT_KILLED and KILL events.
   */
  @Test
  public void testAppKilledKilled() throws IOException {
    LOG.info("--- START: testAppKilledKilled ---");
    RMApp application = testCreateAppRunning();

    // RUNNING => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);

    // KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED
    sendEvent(application, RMAppEventType.ATTEMPT_FINISHED);
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);

    // KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
    sendAttemptFailed(application, "");
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);

    // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
    sendEvent(application, RMAppEventType.ATTEMPT_KILLED);
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);

    // KILLED => KILLED event RMAppEventType.KILL
    sendEvent(application, RMAppEventType.KILL);
    assertTimesAtFinish(application);
    assertAppState(RMAppState.KILLED, application);
  }
}