blob: 149ddf57371f9251881767bb08bb31fd703da3fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Assert;
import org.junit.Test;
public class TestKillApplicationWithRMHA extends RMHATestBase{
public static final Log LOG = LogFactory
.getLog(TestKillApplicationWithRMHA.class);
@Test (timeout = 20000)
public void testKillAppWhenFailoverHappensAtNewState()
throws Exception {
// create a customized RMAppManager
// During the process of Application submission,
// the RMAppState will always be NEW.
// The ApplicationState will not be saved in RMStateStore.
startRMsWithCustomizedRMAppManager();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// Submit the application
RMApp app0 =
rm1.submitApp(200, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
false, false);
// failover and kill application
// When FailOver happens, the state of this application is NEW,
// and ApplicationState is not saved in RMStateStore. The active RM
// can not load the ApplicationState of this application.
// Expected to get ApplicationNotFoundException
// when receives the KillApplicationRequest
try {
failOverAndKillApp(app0.getApplicationId(), RMAppState.NEW);
fail("Should get an exception here");
} catch (ApplicationNotFoundException ex) {
Assert.assertTrue(ex.getMessage().contains(
"Trying to kill an absent application " + app0.getApplicationId()));
}
}
@Test (timeout = 20000)
public void testKillAppWhenFailoverHappensAtRunningState()
throws Exception {
startRMs();
MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
// failover and kill application
// The application is at RUNNING State when failOver happens.
// Since RMStateStore has already saved ApplicationState, the active RM
// will load the ApplicationState. After that, the application will be at
// ACCEPTED State. Because the application is not at Final State,
// KillApplicationResponse.getIsKillCompleted is expected to return false.
failOverAndKillApp(app0.getApplicationId(),
am0.getApplicationAttemptId(), RMAppState.RUNNING,
RMAppAttemptState.RUNNING, RMAppState.ACCEPTED);
}
@Test (timeout = 20000)
public void testKillAppWhenFailoverHappensAtFinalState()
throws Exception {
startRMs();
MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
// kill the app.
rm1.killApp(app0.getApplicationId());
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED);
// failover and kill application
// The application is at Killed State and RMStateStore has already
// saved this applicationState. After failover happens, the current
// active RM will load the ApplicationState whose RMAppState is killed.
// Because this application is at Final State,
// KillApplicationResponse.getIsKillCompleted is expected to return true.
failOverAndKillApp(app0.getApplicationId(),
am0.getApplicationAttemptId(), RMAppState.KILLED,
RMAppAttemptState.KILLED, RMAppState.KILLED);
}
@Test (timeout = 20000)
public void testKillAppWhenFailOverHappensDuringApplicationKill()
throws Exception {
// create a customized ClientRMService
// When receives the killApplicationRequest, simply return the response
// and make sure the application will not be KILLED State
startRMsWithCustomizedClientRMService();
MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
// ensure that the app is in running state
Assert.assertEquals(app0.getState(), RMAppState.RUNNING);
// kill the app.
rm1.killApp(app0.getApplicationId());
// failover happens before this application goes to final state.
// The RMAppState that will be loaded by the active rm
// should be ACCEPTED.
failOverAndKillApp(app0.getApplicationId(),
am0.getApplicationAttemptId(), RMAppState.RUNNING,
RMAppAttemptState.RUNNING, RMAppState.ACCEPTED);
}
private void failOverAndKillApp(ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState initialRMAppState,
RMAppAttemptState initialRMAppAttemptState,
RMAppState expectedAppStateBeforeKillApp) throws Exception {
Assert.assertEquals(initialRMAppState,
rm1.getRMContext().getRMApps().get(appId).getState());
Assert.assertEquals(initialRMAppAttemptState, rm1.getRMContext()
.getRMApps().get(appId).getAppAttempts().get(appAttemptId).getState());
explicitFailover();
Assert.assertEquals(expectedAppStateBeforeKillApp,
rm2.getRMContext().getRMApps().get(appId).getState());
killApplication(rm2, appId, appAttemptId, initialRMAppState);
}
private void failOverAndKillApp(ApplicationId appId,
RMAppState initialRMAppState) throws Exception {
Assert.assertEquals(initialRMAppState,
rm1.getRMContext().getRMApps().get(appId).getState());
explicitFailover();
Assert.assertTrue(rm2.getRMContext().getRMApps().get(appId) == null);
killApplication(rm2, appId, null, initialRMAppState);
}
private void startRMsWithCustomizedClientRMService() throws IOException {
final Configuration conf1 = new Configuration(confForRM1);
rm1 = new MockRM(conf1) {
@Override
protected ClientRMService createClientRMService() {
return new MyClientRMService(this.rmContext, this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, getRMContext().getRMDelegationTokenSecretManager());
}
};
rm2 = new MockRM(confForRM2);
startRMs(rm1, conf1, rm2, confForRM2);
}
private static class MyClientRMService extends ClientRMService {
private RMContext rmContext;
public MyClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager,
ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager) {
super(rmContext, scheduler, rmAppManager, applicationACLsManager,
queueACLsManager, rmDTSecretManager);
this.rmContext = rmContext;
}
@Override
protected void serviceStart() {
// override to not start rpc handler
}
@Override
protected void serviceStop() {
// don't do anything
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException {
ApplicationId applicationId = request.getApplicationId();
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application.isAppFinalStateStored()) {
return KillApplicationResponse.newInstance(true);
} else {
return KillApplicationResponse.newInstance(false);
}
}
}
private void killApplication(MockRM rm, ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState rmAppState)
throws Exception {
KillApplicationResponse response = rm.killApp(appId);
Assert
.assertTrue(response.getIsKillCompleted() == isFinalState(rmAppState));
RMApp loadedApp0 =
rm.getRMContext().getRMApps().get(appId);
rm.waitForState(appId, RMAppState.KILLED);
if (appAttemptId != null) {
rm.waitForState(appAttemptId, RMAppAttemptState.KILLED);
}
// no new attempt is created.
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
}
}