blob: 0a1b15275cfdd09e6e9e323824ce9ac5ba4b7e70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
public class TestZKRMStateStore extends RMStateStoreTestBase {
// Logger for this test class.
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
// ZooKeeper timeout (ms) written into HA RM configurations by createHARMConf();
// also used to bound the number of polls in testFencing().
private static final int ZK_TIMEOUT_MS = 1000;
// Curator TestingServer created in setupCurator() and stopped in
// cleanupCuratorServer().
private TestingServer curatorTestingServer;
// Curator client connected to curatorTestingServer; the tests use it to
// inspect znodes directly.
private CuratorFramework curatorFramework;
/**
 * Creates a Curator {@link TestingServer} and starts it.
 *
 * @return the started testing server
 * @throws Exception if the server cannot be created or started
 */
public static TestingServer setupCuratorServer() throws Exception {
  final TestingServer server = new TestingServer();
  server.start();
  return server;
}
/**
 * Builds and starts a Curator client that talks to the given testing server.
 * The client uses a {@code RetryNTimes(100, 100)} retry policy.
 *
 * @param curatorTestingServer server whose connect string the client uses
 * @return the started Curator client
 * @throws Exception if the client cannot be built or started
 */
public static CuratorFramework setupCuratorFramework(
    TestingServer curatorTestingServer) throws Exception {
  final String connectString = curatorTestingServer.getConnectString();
  final RetryNTimes retryPolicy = new RetryNTimes(100, 100);
  final CuratorFramework client = CuratorFrameworkFactory.builder()
      .connectString(connectString)
      .retryPolicy(retryPolicy)
      .build();
  client.start();
  return client;
}
/**
 * Starts a fresh testing server and a Curator client connected to it before
 * each test.
 *
 * @throws Exception if either the server or the client fails to start
 */
@Before
public void setupCurator() throws Exception {
  final TestingServer server = setupCuratorServer();
  this.curatorTestingServer = server;
  this.curatorFramework = setupCuratorFramework(server);
}
/**
 * Closes the Curator client and stops the testing server after each test.
 *
 * <p>The server is stopped in a {@code finally} block so that it is released
 * even if closing the client throws. Both fields are null-checked because
 * this method also runs when {@link #setupCurator()} failed part-way.
 *
 * @throws IOException if stopping the testing server fails
 */
@After
public void cleanupCuratorServer() throws IOException {
  try {
    if (curatorFramework != null) {
      curatorFramework.close();
    }
  } finally {
    if (curatorTestingServer != null) {
      curatorTestingServer.stop();
    }
  }
}
/**
 * RMStateStoreHelper implementation backed by a ZKRMStateStore subclass.
 * It creates the store against the Curator testing server and answers
 * existence/validity queries by reading znodes through curatorFramework.
 */
class TestZKRMStateStoreTester implements RMStateStoreHelper {
// Most recently created store; (re)assigned by createStore().
TestZKRMStateStoreInternal store;
// Parent znode path configured for the store; set in createStore().
String workingZnode;
/**
 * ZKRMStateStore subclass that initializes and starts itself in its
 * constructor and exposes helpers computing the znode paths used by the
 * store.
 */
class TestZKRMStateStoreInternal extends ZKRMStateStore {
TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
setResourceManager(new ResourceManager());
init(conf);
dispatcher.disableExitOnDispatchException();
start();
// The store must have picked up the configured parent path.
assertTrue(znodeWorkingPath.equals(workingZnode));
}
// Path of the version znode under the store's root znode.
private String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
// Path of an application znode. With splitIdx == 0 the app ID sits directly
// under RM_APP_ROOT; otherwise the ID is cut splitIdx characters from its
// end into parent/child and placed under
// RM_APP_ROOT_HIERARCHIES/<splitIdx>/.
private String getAppNode(String appId, int splitIdx) {
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
RM_APP_ROOT;
String appPath = appId;
if (splitIdx != 0) {
int idx = appId.length() - splitIdx;
appPath = appId.substring(0, idx) + "/" + appId.substring(idx);
return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" +
Integer.toString(splitIdx) + "/" + appPath;
}
return rootPath + "/" + appPath;
}
// Path of an application znode without any split (split index 0).
private String getAppNode(String appId) {
return getAppNode(appId, 0);
}
// Path of an attempt znode: a child of the unsplit application znode.
private String getAttemptNode(String appId, String attemptId) {
return getAppNode(appId) + "/" + attemptId;
}
/**
* Emulates a retried root-directory creation by calling create() again on
* the store's working znode; per the test's intent this must not surface a
* NodeExists error.
* @throws Exception if the create call fails
*/
private void testRetryingCreateRootDir() throws Exception {
create(znodeWorkingPath);
}
// Path of a delegation-token znode. With splitIdx == 0 the node name is the
// prefix followed by the raw sequence number; otherwise the sequence number
// is zero-padded to at least 4 digits, the name is cut splitIdx characters
// from its end, and the pieces are placed under <splitIdx>/.
private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
RM_DT_SECRET_MANAGER_ROOT + "/" +
RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
String nodeName = DELEGATION_TOKEN_PREFIX;
if (splitIdx == 0) {
nodeName += rmDTSequenceNumber;
} else {
nodeName += String.format("%04d", rmDTSequenceNumber);
}
String path = nodeName;
if (splitIdx != 0) {
int idx = nodeName.length() - splitIdx;
path = splitIdx + "/" + nodeName.substring(0, idx) + "/"
+ nodeName.substring(idx);
}
return rootPath + "/" + path;
}
}
// Points the configuration at the testing server, sets the parent path and
// epoch, then creates (and thereby starts) a new store that replaces
// this.store.
private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
// Creates a new store using the caller-supplied configuration (which is
// modified in place by createStore()).
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
return createStore(conf);
}
// Creates a new store using a default YarnConfiguration.
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
return createStore(conf);
}
// The final state is considered valid when the working znode has exactly
// one child.
@Override
public boolean isFinalStateValid() throws Exception {
return 1 ==
curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
}
// Overwrites the version znode's data with the serialized version,
// regardless of the znode's current data version (-1).
@Override
public void writeVersion(Version version) throws Exception {
curatorFramework.setData().withVersion(-1)
.forPath(store.getVersionNode(),
((VersionPBImpl) version).getProto().toByteArray());
}
@Override
public Version getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
// Checks for the application's znode at the path implied by the split index
// in the store's configuration.
@Override
public boolean appExists(RMApp app) throws Exception {
String appIdPath = app.getApplicationId().toString();
int split =
store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
return null != curatorFramework.checkExists()
.forPath(store.getAppNode(appIdPath, split));
}
// Checks for the attempt's znode under the unsplit application path.
@Override
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
ApplicationAttemptId attemptId = attempt.getAppAttemptId();
return null != curatorFramework.checkExists()
.forPath(store.getAttemptNode(
attemptId.getApplicationId().toString(), attemptId.toString()));
}
// Checks for the token's znode at the path implied by the given split index.
public boolean delegationTokenExists(RMDelegationTokenIdentifier token,
int index) throws Exception {
int rmDTSequenceNumber = token.getSequenceNumber();
return curatorFramework.checkExists().forPath(
store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null;
}
// Exposes the store's delegation-token split index.
public int getDelegationTokenNodeSplitIndex() {
return store.delegationTokenNodeSplitIndex;
}
}
/**
 * Runs the shared state-store test suite against a ZooKeeper-backed store,
 * then checks that re-creating the root directory on a fresh store succeeds.
 */
@Test (timeout = 60000)
public void testZKRMStateStoreRealZK() throws Exception {
  final TestZKRMStateStoreTester tester = new TestZKRMStateStoreTester();
  testRMAppStateStore(tester);
  testRMDTSecretManagerStateStore(tester);
  testCheckVersion(tester);
  testEpoch(tester);
  testAppDeletion(tester);
  testDeleteStore(tester);
  testRemoveApplication(tester);
  testRemoveAttempt(tester);
  testAMRMTokenSecretManagerStateStore(tester);
  testReservationStateStore(tester);

  // A brand-new store is created here; the retry is exercised on it.
  final RMStateStore freshStore = tester.getRMStateStore();
  final TestZKRMStateStoreTester.TestZKRMStateStoreInternal zkStore =
      (TestZKRMStateStoreTester.TestZKRMStateStoreInternal) freshStore;
  zkStore.testRetryingCreateRootDir();
}
/**
 * Configures a 1-byte znode size limit, stores an application, and waits
 * (up to 5 s, polling every 100 ms) for the dispatcher to observe an
 * APP_SAVE_FAILED event.
 */
@Test
public void testZKNodeLimit() throws Exception {
  final TestZKRMStateStoreTester tester = new TestZKRMStateStoreTester();
  final long submitTime = System.currentTimeMillis();
  final long startTime = System.currentTimeMillis() + 1234;

  final Configuration limitedConf = new YarnConfiguration();
  limitedConf.setInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, 1);
  final RMStateStore store = tester.getRMStateStore(limitedConf);

  final TestAppRejDispatcher dispatcher = new TestAppRejDispatcher();
  store.setRMDispatcher(dispatcher);

  final ApplicationId appId1 =
      ApplicationId.fromString("application_1352994193343_0001");
  storeApp(store, appId1, submitTime, startTime);

  final Supplier<Boolean> saveFailureSeen = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return dispatcher.appsavefailedEvnt;
    }
  };
  GenericTestUtils.waitFor(saveFailureSeen, 100, 5000);
}
/**
 * Dispatcher that records whether an {@code APP_SAVE_FAILED} application
 * event has been handled.
 */
static class TestAppRejDispatcher extends TestDispatcher {
  // volatile: the flag is polled by the test thread (see testZKNodeLimit)
  // while handle() is presumably invoked from a dispatcher thread; without
  // it the write is not guaranteed to become visible to the poller.
  private volatile boolean appsavefailedEvnt;

  /**
   * Sets the flag when the event is an {@link RMAppEvent} of type
   * {@code APP_SAVE_FAILED}; all other events are ignored.
   *
   * @param event the dispatched event
   */
  @Override
  public void handle(Event event) {
    if (event instanceof RMAppEvent
        && event.getType().equals(RMAppEventType.APP_SAVE_FAILED)) {
      appsavefailedEvnt = true;
    }
  }
}
/**
 * Checks that checkVersion() succeeds and the store reports the expected
 * version when both the helper and the store advertise a version whose major
 * number is Integer.MAX_VALUE and version persistence is kept in memory.
 */
@Test (timeout = 60000)
public void testCheckMajorVersionChange() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {
// Version reported by both the helper and the store created below.
Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
@Override
public Version getCurrentVersion() throws Exception {
return VERSION_INFO;
}
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
// In-memory stand-in for the persisted version. NOTE(review): this
// initializer runs only after the superclass constructor (which calls
// init() and start()) has returned, so any value assigned during
// start-up would be reset to null here.
Version storedVersion = null;
@Override
public Version getCurrentVersion() {
return VERSION_INFO;
}
// Returns the in-memory version instead of reading it from ZooKeeper.
@Override
protected synchronized Version loadVersion() throws Exception {
return storedVersion;
}
// Records the current version in memory instead of writing to ZooKeeper.
@Override
protected synchronized void storeVersion() throws Exception {
storedVersion = VERSION_INFO;
}
};
return this.store;
}
};
// default version
RMStateStore store = zkTester.getRMStateStore();
Version defaultVersion = zkTester.getCurrentVersion();
store.checkVersion();
assertEquals("Store had wrong version",
defaultVersion, store.loadVersion());
}
/**
 * Builds a YARN configuration for an HA-enabled ResourceManager that uses
 * {@link ZKRMStateStore} with recovery enabled.
 *
 * @param rmIds comma-separated list of RM IDs in the HA group
 * @param rmId ID of the RM this configuration is for
 * @param adminPort port used for this RM's admin address on localhost
 * @param autoFailoverEnabled value for the automatic-failover setting
 * @param curatorTestServer testing server providing the ZooKeeper address
 * @return the populated configuration
 */
public static Configuration createHARMConf(String rmIds, String rmId,
    int adminPort, boolean autoFailoverEnabled,
    TestingServer curatorTestServer) {
  final Configuration haConf = new YarnConfiguration();

  // HA, recovery and state-store settings.
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());

  // ZooKeeper connection settings.
  final String zkAddress = curatorTestServer.getConnectString();
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, zkAddress);
  haConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);

  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
  haConf.setBoolean(
      YarnConfiguration.AUTO_FAILOVER_ENABLED, autoFailoverEnabled);

  // Every service address of every RM in the group is set to localhost:0.
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      final String suffixedKey = HAUtil.addSuffix(addressKey, haId);
      haConf.set(suffixedKey, "localhost:0");
    }
  }

  // This RM's admin address then gets the explicit port.
  final String adminKey =
      HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminKey, "localhost:" + adminPort);
  return haConf;
}
/**
 * Reports whether the list contains an ACL entry with the given scheme, an
 * ID that starts with {@code id}, and exactly the given permission bits.
 *
 * @param id required prefix of the ACL entry's ID
 * @param scheme required scheme of the ACL entry
 * @param perm required permission bit mask (exact match)
 * @param acls ACL entries to search
 * @return {@code true} if at least one entry matches all three criteria
 */
private static boolean verifyZKACL(String id, String scheme, int perm,
    List<ACL> acls) {
  for (ACL candidate : acls) {
    if (!candidate.getId().getScheme().equals(scheme)) {
      continue;
    }
    if (!candidate.getId().getId().startsWith(id)) {
      continue;
    }
    if (candidate.getPerms() != perm) {
      continue;
    }
    return true;
  }
  return false;
}
/**
 * Test if RM can successfully start in HA disabled mode if it was previously
 * running in HA enabled mode. And then start it in HA mode after running it
 * with HA disabled. NoAuth Exception should not be sent by zookeeper and RM
 * should start successfully.
 *
 * <p>After each start the ACLs on the state-store root znode are read back
 * and asserted: with HA enabled a digest entry (ID starting with
 * "localhost") holding CREATE|DELETE plus a world:anyone entry holding the
 * remaining permissions; with HA disabled a single world:anyone entry with
 * all permissions.
 */
@Test
public void testZKRootPathAcls() throws Exception {
  StateChangeRequestInfo req = new StateChangeRequestInfo(
      HAServiceProtocol.RequestSource.REQUEST_BY_USER);
  String rootPath =
      YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/" +
          ZKRMStateStore.ROOT_ZNODE_NAME;
  final int createDelete = Perms.CREATE | Perms.DELETE;
  final int allButCreateDelete = Perms.ALL ^ createDelete;

  // Start RM with HA enabled
  Configuration conf =
      createHARMConf("rm1,rm2", "rm1", 1234, false, curatorTestingServer);
  ResourceManager rm = new MockRM(conf);
  rm.start();
  rm.getRMContext().getRMAdminService().transitionToActive(req);
  List<ACL> acls =
      ((ZKRMStateStore) rm.getRMContext().getStateStore()).getACL(rootPath);
  assertEquals(2, acls.size());
  // CREATE and DELETE permissions for root node based on RM ID.
  // verifyZKACL takes (id prefix, scheme, perms, acls); its result must be
  // asserted, otherwise the check is a no-op.
  assertTrue("Missing digest ACL with CREATE|DELETE on " + rootPath,
      verifyZKACL("localhost", "digest", createDelete, acls));
  assertTrue("Missing world:anyone ACL without CREATE|DELETE on " + rootPath,
      verifyZKACL("anyone", "world", allButCreateDelete, acls));
  rm.close();

  // Now start RM with HA disabled. NoAuth Exception should not be thrown.
  conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
  rm = new MockRM(conf);
  rm.start();
  rm.getRMContext().getRMAdminService().transitionToActive(req);
  acls = ((ZKRMStateStore) rm.getRMContext().getStateStore()).getACL(rootPath);
  assertEquals(1, acls.size());
  assertTrue("Missing world:anyone ACL with all permissions on " + rootPath,
      verifyZKACL("anyone", "world", Perms.ALL, acls));
  rm.close();

  // Start RM with HA enabled.
  conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  rm = new MockRM(conf);
  rm.start();
  rm.getRMContext().getRMAdminService().transitionToActive(req);
  acls = ((ZKRMStateStore) rm.getRMContext().getStateStore()).getACL(rootPath);
  assertEquals(2, acls.size());
  assertTrue("Missing digest ACL with CREATE|DELETE on " + rootPath,
      verifyZKACL("localhost", "digest", createDelete, acls));
  assertTrue("Missing world:anyone ACL without CREATE|DELETE on " + rootPath,
      verifyZKACL("anyone", "world", allButCreateDelete, acls));
  rm.close();
}
/**
 * Starts two HA ResourceManagers against the same ZooKeeper, makes each
 * active in turn, and checks that the first one ends up in standby (fenced)
 * while the second one stays active.
 */
@Test
public void testFencing() throws Exception {
  final StateChangeRequestInfo userRequest = new StateChangeRequestInfo(
      HAServiceProtocol.RequestSource.REQUEST_BY_USER);

  // First RM: start and make active.
  final Configuration conf1 =
      createHARMConf("rm1,rm2", "rm1", 1234, false, curatorTestingServer);
  final ResourceManager rm1 = new MockRM(conf1);
  rm1.start();
  rm1.getRMContext().getRMAdminService().transitionToActive(userRequest);
  assertEquals("RM with ZKStore didn't start",
      Service.STATE.STARTED, rm1.getServiceState());
  assertEquals("RM should be Active",
      HAServiceProtocol.HAServiceState.ACTIVE,
      rm1.getRMContext().getRMAdminService().getServiceStatus().getState());

  // Second RM: start and make active as well.
  final Configuration conf2 =
      createHARMConf("rm1,rm2", "rm2", 5678, false, curatorTestingServer);
  final ResourceManager rm2 = new MockRM(conf2);
  rm2.start();
  rm2.getRMContext().getRMAdminService().transitionToActive(userRequest);
  assertEquals("RM with ZKStore didn't start",
      Service.STATE.STARTED, rm2.getServiceState());
  assertEquals("RM should be Active",
      HAServiceProtocol.HAServiceState.ACTIVE,
      rm2.getRMContext().getRMAdminService().getServiceStatus().getState());

  // Give rm1 time to notice it has been fenced: sleep only on polls where
  // it still reports ACTIVE.
  final int maxPolls = ZK_TIMEOUT_MS / 50;
  for (int poll = 0; poll < maxPolls; poll++) {
    final HAServiceProtocol.HAServiceState rm1State =
        rm1.getRMContext().getRMAdminService().getServiceStatus().getState();
    if (rm1State == HAServiceProtocol.HAServiceState.ACTIVE) {
      Thread.sleep(100);
    }
  }

  assertEquals("RM should have been fenced",
      HAServiceProtocol.HAServiceState.STANDBY,
      rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
  assertEquals("RM should be Active",
      HAServiceProtocol.HAServiceState.ACTIVE,
      rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
  rm1.close();
  rm2.close();
}
/**
 * Moves the store into the fenced state and then issues each kind of
 * store/update/remove operation, asserting after every call that the store
 * is still fenced. For the delegation token it additionally asserts that the
 * token znode (unsplit path, index 0) is absent from ZooKeeper before and
 * after the store and update calls.
 */
@Test
public void testFencedState() throws Exception {
  final String fencedMsg = "RMStateStore should have been in fenced state";
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  RMStateStore store = zkTester.getRMStateStore();

  // Move state to FENCED from ACTIVE
  store.updateFencedState();
  assertTrue(fencedMsg, store.isFencedState());

  long submitTime = System.currentTimeMillis();
  long startTime = submitTime + 1000;

  // Add a new app
  RMApp mockApp = mock(RMApp.class);
  ApplicationSubmissionContext context =
      new ApplicationSubmissionContextPBImpl();
  when(mockApp.getSubmitTime()).thenReturn(submitTime);
  when(mockApp.getStartTime()).thenReturn(startTime);
  when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
  when(mockApp.getUser()).thenReturn("test");
  store.storeNewApplication(mockApp);
  assertTrue(fencedMsg, store.isFencedState());

  // Add a new attempt
  ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
      new ClientToAMTokenSecretManagerInRM();
  ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
      "appattempt_1234567894321_0001_000001");
  SecretKey clientTokenMasterKey =
      clientToAMTokenMgr.createMasterKey(attemptId);
  RMAppAttemptMetrics mockRmAppAttemptMetrics =
      mock(RMAppAttemptMetrics.class);
  Container container = new ContainerPBImpl();
  container.setId(
      ContainerId.fromString("container_1234567891234_0001_01_000001"));
  RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
  when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
  when(mockAttempt.getMasterContainer()).thenReturn(container);
  when(mockAttempt.getClientTokenMasterKey())
      .thenReturn(clientTokenMasterKey);
  when(mockAttempt.getRMAppAttemptMetrics())
      .thenReturn(mockRmAppAttemptMetrics);
  when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
      .thenReturn(new AggregateAppResourceUsage(new HashMap<>()));
  store.storeNewApplicationAttempt(mockAttempt);
  assertTrue(fencedMsg, store.isFencedState());

  long finishTime = submitTime + 1000;

  // Update attempt
  ApplicationAttemptStateData newAttemptState =
      ApplicationAttemptStateData.newInstance(attemptId, container,
          store.getCredentialsFromAppAttempt(mockAttempt),
          startTime, RMAppAttemptState.FINISHED, "testUrl",
          "test", FinalApplicationStatus.SUCCEEDED, 100,
          finishTime, new HashMap<>(), new HashMap<>());
  store.updateApplicationAttemptState(newAttemptState);
  assertTrue(fencedMsg, store.isFencedState());

  // Update app
  ApplicationStateData appState = ApplicationStateData.newInstance(submitTime,
      startTime, context, "test");
  store.updateApplicationState(appState);
  assertTrue(fencedMsg, store.isFencedState());

  // Remove app
  store.removeApplication(mockApp);
  assertTrue(fencedMsg, store.isFencedState());

  // store RM delegation token;
  RMDelegationTokenIdentifier dtId1 =
      new RMDelegationTokenIdentifier(new Text("owner1"),
          new Text("renewer1"), new Text("realuser1"));
  // Long.valueOf replaces the deprecated Long(long) constructor.
  Long renewDate1 = Long.valueOf(System.currentTimeMillis());
  dtId1.setSequenceNumber(1111);
  assertFalse("Token " + dtId1
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(dtId1, 0));
  store.storeRMDelegationToken(dtId1, renewDate1);
  assertFalse("Token " + dtId1
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(dtId1, 0));
  assertTrue(fencedMsg, store.isFencedState());

  store.updateRMDelegationToken(dtId1, renewDate1);
  assertFalse("Token " + dtId1
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(dtId1, 0));
  assertTrue(fencedMsg, store.isFencedState());

  // remove delegation token;
  store.removeRMDelegationToken(dtId1);
  assertTrue(fencedMsg, store.isFencedState());

  // store delegation master key;
  DelegationKey key = new DelegationKey(1234, 4321, "keyBytes".getBytes());
  store.storeRMDTMasterKey(key);
  assertTrue(fencedMsg, store.isFencedState());

  // remove delegation master key;
  store.removeRMDTMasterKey(key);
  assertTrue(fencedMsg, store.isFencedState());

  // store or update AMRMToken;
  store.storeOrUpdateAMRMTokenSecretManager(null, false);
  assertTrue(fencedMsg, store.isFencedState());

  store.close();
}
/**
 * Stores an application with one attempt, removes its state twice, and
 * fails if the second removal raises a NoNodeException.
 */
@Test
public void testDuplicateRMAppDeletion() throws Exception {
  final TestZKRMStateStoreTester tester = new TestZKRMStateStoreTester();
  final long submitTime = System.currentTimeMillis();
  final long startTime = System.currentTimeMillis() + 1234;
  final RMStateStore store = tester.getRMStateStore();
  final TestDispatcher dispatcher = new TestDispatcher();
  store.setRMDispatcher(dispatcher);

  // Persist the application and a single attempt.
  final ApplicationAttemptId removedAttemptId =
      ApplicationAttemptId.fromString("appattempt_1352994193343_0002_000001");
  final ApplicationId removedAppId = removedAttemptId.getApplicationId();
  storeApp(store, removedAppId, submitTime, startTime);
  storeAttempt(store, removedAttemptId,
      "container_1352994193343_0002_01_000001", null, null, dispatcher);

  // Build the state record describing what is to be removed.
  final ApplicationSubmissionContext submissionContext =
      new ApplicationSubmissionContextPBImpl();
  submissionContext.setApplicationId(removedAppId);
  final ApplicationStateData removedState = ApplicationStateData.newInstance(
      submitTime, startTime, submissionContext, "user1");
  removedState.attempts.put(removedAttemptId, null);

  // First removal, then a duplicate one that must be tolerated.
  store.removeApplicationStateInternal(removedState);
  try {
    store.removeApplicationStateInternal(removedState);
  } catch (KeeperException.NoNodeException nne) {
    fail("NoNodeException should not happen.");
  }
  store.close();
}
/**
 * Joins the given parts with "/" between them.
 *
 * @param parts path components, in order
 * @return the joined string
 */
private static String createPath(String... parts) {
  final Joiner slashJoiner = Joiner.on("/");
  return slashJoiner.join(parts);
}
/**
 * Creates a YARN configuration with the app-ID znode split index set.
 *
 * @param splitIndex value for the app-ID node split index setting
 * @return the new configuration
 */
private static Configuration createConfForAppNodeSplit(int splitIndex) {
  final YarnConfiguration splitConf = new YarnConfiguration();
  splitConf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
  return splitConf;
}
/**
 * Creates a mock {@link RMApp} suitable for a remove call: it reports a
 * submission context carrying the app ID, the user "test" and, when attempt
 * IDs are given, a map of mock attempts keyed by those IDs.
 *
 * @param appId application ID placed in the submission context
 * @param attemptIds attempt IDs to expose through getAppAttempts(); when
 *        empty, getAppAttempts() is left unstubbed
 * @return the configured mock application
 */
private static RMApp createMockAppForRemove(ApplicationId appId,
    ApplicationAttemptId... attemptIds) {
  final ApplicationSubmissionContextPBImpl submissionContext =
      new ApplicationSubmissionContextPBImpl();
  submissionContext.setApplicationId(appId);

  final RMApp mockedApp = mock(RMApp.class);
  when(mockedApp.getApplicationSubmissionContext())
      .thenReturn(submissionContext);
  when(mockedApp.getUser()).thenReturn("test");

  if (attemptIds.length == 0) {
    return mockedApp;
  }

  final HashMap<ApplicationAttemptId, RMAppAttempt> mockedAttempts =
      new HashMap<>();
  for (ApplicationAttemptId id : attemptIds) {
    final RMAppAttempt mockedAttempt = mock(RMAppAttempt.class);
    when(mockedAttempt.getAppAttemptId()).thenReturn(id);
    mockedAttempts.put(id, mockedAttempt);
  }
  when(mockedApp.getAppAttempts()).thenReturn(mockedAttempts);
  return mockedApp;
}
/**
 * Asserts that a loaded application state record is non-null and carries the
 * expected submit time, start time, app ID, state, finish time, user and
 * diagnostics (checked in that order).
 */
private static void verifyLoadedApp(ApplicationStateData appState,
    ApplicationId appId, String user, long submitTime, long startTime,
    RMAppState state, long finishTime, String diagnostics) {
  // The record must exist before any field can be compared.
  assertNotNull("App " + appId + " should have been loaded.", appState);

  final long loadedSubmitTime = appState.getSubmitTime();
  assertEquals("App submit time in app state", submitTime, loadedSubmitTime);

  final long loadedStartTime = appState.getStartTime();
  assertEquals("App start time in app state", startTime, loadedStartTime);

  final ApplicationId loadedAppId =
      appState.getApplicationSubmissionContext().getApplicationId();
  assertEquals("App ID in app state", appId, loadedAppId);

  final RMAppState loadedState = appState.getState();
  assertEquals("App state", state, loadedState);

  final long loadedFinishTime = appState.getFinishTime();
  assertEquals("Finish time in app state", finishTime, loadedFinishTime);

  final String loadedUser = appState.getUser();
  assertEquals("User in app state", user, loadedUser);

  final String loadedDiagnostics = appState.getDiagnostics();
  assertEquals("Diagnostics in app state", diagnostics, loadedDiagnostics);
}
/**
 * Convenience overload that verifies a loaded app without checking
 * per-attempt exit statuses or final statuses (both passed as null).
 */
private static void verifyLoadedApp(RMState rmState,
    ApplicationId appId, long submitTime, long startTime, long finishTime,
    boolean isFinished, List<ApplicationAttemptId> attempts) {
  final List<Integer> noExitStatuses = null;
  final List<FinalApplicationStatus> noFinalStatuses = null;
  verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime,
      isFinished, attempts, noExitStatuses, noFinalStatuses);
}
/**
 * Verifies an application loaded into the given RM state, and optionally its
 * attempts.
 *
 * <p>The app is expected to belong to user "test"; a finished app is
 * expected in state FINISHED with diagnostics "appDiagnostics", an
 * unfinished one with a null state and empty diagnostics. When
 * {@code attempts} is null the app must have no attempts; otherwise the
 * attempt count must match and, if both status lists are supplied, each
 * attempt is verified as finished exactly when its final status is non-null.
 */
private static void verifyLoadedApp(RMState rmState,
    ApplicationId appId, long submitTime, long startTime, long finishTime,
    boolean isFinished, List<ApplicationAttemptId> attempts,
    List<Integer> amExitStatuses,
    List<FinalApplicationStatus> finalStatuses) {
  final Map<ApplicationId, ApplicationStateData> loadedApps =
      rmState.getApplicationState();
  final ApplicationStateData loadedApp = loadedApps.get(appId);
  assertNotNull(appId + " is not there in loaded apps", loadedApp);
  verifyLoadedApp(loadedApp, appId, "test", submitTime, startTime,
      isFinished ? RMAppState.FINISHED : null, finishTime,
      isFinished ? "appDiagnostics" : "");

  // Check attempt state.
  if (attempts == null) {
    assertEquals(
        "Attempts loaded for app " + appId, 0, loadedApp.attempts.size());
    return;
  }

  assertEquals("Attempts loaded for app " + appId, attempts.size(),
      loadedApp.attempts.size());
  if (finalStatuses == null || amExitStatuses == null) {
    return;
  }
  for (int idx = 0; idx < attempts.size(); idx++) {
    final boolean attemptFinished = finalStatuses.get(idx) != null;
    verifyLoadedAttempt(loadedApp, attempts.get(idx),
        amExitStatuses.get(idx), attemptFinished);
  }
}
/**
 * Verifies a loaded attempt against the defaults used by these tests: master
 * container number 1 of the attempt, no client token key, finish time 0, and
 * finished-specific values (tracking URL "myTrackingUrl", state FINISHED,
 * diagnostics "attemptDiagnostics", final status SUCCEEDED) or their
 * unfinished counterparts ("N/A", null, "", null).
 */
private static void verifyLoadedAttempt(ApplicationStateData appState,
    ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) {
  final String expectedUrl = isFinished ? "myTrackingUrl" : "N/A";
  final ContainerId expectedMaster = ContainerId.newContainerId(attemptId, 1);
  final RMAppAttemptState expectedState =
      isFinished ? RMAppAttemptState.FINISHED : null;
  final String expectedDiagnostics = isFinished ? "attemptDiagnostics" : "";
  final FinalApplicationStatus expectedFinalStatus =
      isFinished ? FinalApplicationStatus.SUCCEEDED : null;
  verifyLoadedAttempt(appState, attemptId, expectedUrl, expectedMaster, null,
      expectedState, expectedDiagnostics, 0, amExitStatus,
      expectedFinalStatus);
}
/**
 * Asserts that the attempt with the given ID was loaded into the app state
 * and that its ID, master container ID, (optionally) client token key,
 * state, finish time, diagnostics, AM container exit status, final status
 * and tracking URL match the expected values, in that order.
 *
 * @param clientTokenKey expected client token master key; the key check is
 *        skipped when this is null
 */
private static void verifyLoadedAttempt(ApplicationStateData appState,
    ApplicationAttemptId attemptId, String trackingURL,
    ContainerId masterContainerId, SecretKey clientTokenKey,
    RMAppAttemptState state, String diagnostics, long finishTime,
    int amExitStatus, FinalApplicationStatus finalStatus) {
  final ApplicationAttemptStateData loaded = appState.getAttempt(attemptId);
  // Check if attempt is loaded correctly
  assertNotNull(
      "Attempt " + attemptId + " should have been loaded.", loaded);
  assertEquals("Attempt Id in attempt state",
      attemptId, loaded.getAttemptId());
  assertEquals("Master Container Id in attempt state",
      masterContainerId, loaded.getMasterContainer().getId());

  if (clientTokenKey != null) {
    final byte[] expectedKey = clientTokenKey.getEncoded();
    final byte[] loadedKey = loaded.getAppAttemptTokens()
        .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
    assertArrayEquals("Client token key in attempt state",
        expectedKey, loadedKey);
  }

  assertEquals("Attempt state", state, loaded.getState());
  assertEquals("Finish time in attempt state", finishTime,
      loaded.getFinishTime());
  assertEquals("Diagnostics in attempt state", diagnostics,
      loaded.getDiagnostics());
  assertEquals("AM Container exit status in attempt state", amExitStatus,
      loaded.getAMContainerExitStatus());
  assertEquals("Final app status in attempt state", finalStatus,
      loaded.getFinalApplicationStatus());
  assertEquals("Tracking URL in attempt state", trackingURL,
      loaded.getFinalTrackingUrl());
}
/**
 * Creates an application state record for user "test". A finished record
 * gets state FINISHED, diagnostics "appDiagnostics" and the given finish
 * time; an unfinished one gets a null state, empty diagnostics and finish
 * time 0.
 */
private static ApplicationStateData createAppState(
    ApplicationSubmissionContext ctxt, long submitTime, long startTime,
    long finishTime, boolean isFinished) {
  final RMAppState recordedState = isFinished ? RMAppState.FINISHED : null;
  final String recordedDiagnostics = isFinished ? "appDiagnostics" : "";
  final long recordedFinishTime = isFinished ? finishTime : 0;
  return ApplicationStateData.newInstance(submitTime, startTime, "test",
      ctxt, recordedState, recordedDiagnostics, recordedFinishTime, null);
}
/**
 * Creates attempt state data for a FINISHED attempt with tracking URL
 * "myTrackingUrl", diagnostics "attemptDiagnostics", final status SUCCEEDED,
 * finish time 0, and zeroed memory/vcore entries in both the
 * resource-seconds and preempted-resource-seconds maps.
 */
private static ApplicationAttemptStateData createFinishedAttempt(
    ApplicationAttemptId attemptId, Container container, long startTime,
    int amExitStatus) {
  final Map<String, Long> usedSeconds = new HashMap<>();
  final Map<String, Long> preemptedSeconds = new HashMap<>();
  final String[] resourceNames = {
      ResourceInformation.MEMORY_MB.getName(),
      ResourceInformation.VCORES.getName()
  };
  for (String resourceName : resourceNames) {
    usedSeconds.put(resourceName, 0L);
    preemptedSeconds.put(resourceName, 0L);
  }
  return ApplicationAttemptStateData.newInstance(attemptId,
      container, null, startTime, RMAppAttemptState.FINISHED,
      "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
      amExitStatus, 0, usedSeconds, preemptedSeconds);
}
/**
 * Parses the attempt ID, optionally generates an AMRM token and a client
 * token master key for it, and stores the attempt with master container
 * number 1.
 *
 * @param store state store to write to
 * @param dispatcher dispatcher passed through to the underlying store helper
 * @param appAttemptIdStr string form of the attempt ID
 * @param appTokenMgr manager used to generate the AMRM token, or null for
 *        no token
 * @param clientToAMTokenMgr manager used to create the client token master
 *        key, or null for no key
 * @param createContainer must be {@code true}; the underlying helper needs
 *        a container ID string, so {@code false} is rejected
 * @return the parsed attempt ID
 * @throws IllegalArgumentException if {@code createContainer} is false
 * @throws Exception if storing the attempt fails
 */
private ApplicationAttemptId storeAttempt(RMStateStore store,
    TestDispatcher dispatcher, String appAttemptIdStr,
    AMRMTokenSecretManager appTokenMgr,
    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
    boolean createContainer) throws Exception {
  ApplicationAttemptId attemptId =
      ApplicationAttemptId.fromString(appAttemptIdStr);
  Token<AMRMTokenIdentifier> appAttemptToken = null;
  if (appTokenMgr != null) {
    appAttemptToken = generateAMRMToken(attemptId, appTokenMgr);
  }
  SecretKey clientTokenKey = null;
  if (clientToAMTokenMgr != null) {
    clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId);
  }
  // Previously a false flag led to a bare NullPointerException on
  // containerId.toString(); fail with an explicit message instead.
  if (!createContainer) {
    throw new IllegalArgumentException(
        "createContainer must be true: a container id is required to store"
            + " attempt " + attemptId);
  }
  ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
  storeAttempt(store, attemptId, containerId.toString(), appAttemptToken,
      clientTokenKey, dispatcher);
  return attemptId;
}
/**
 * Marks an application as finished in the store and then records its attempt
 * as finished.
 *
 * <p>When {@code createNewApp} is true a fresh submission context is built
 * for the app; otherwise the context and attempts already present in
 * {@code state} are carried over into the updated record.
 */
private void finishAppWithAttempts(RMState state, RMStateStore store,
    TestDispatcher dispatcher, ApplicationAttemptId attemptId,
    long submitTime, long startTime, int amExitStatus, long finishTime,
    boolean createNewApp) throws Exception {
  final ApplicationId appId = attemptId.getApplicationId();
  final ApplicationStateData finishedApp;
  if (createNewApp) {
    final ApplicationSubmissionContext freshContext =
        new ApplicationSubmissionContextPBImpl();
    freshContext.setApplicationId(appId);
    freshContext.setAMContainerSpec(new ContainerLaunchContextPBImpl());
    finishedApp = createAppState(freshContext, submitTime, startTime,
        finishTime, true);
  } else {
    final ApplicationStateData existingApp =
        state.getApplicationState().get(appId);
    finishedApp = createAppState(
        existingApp.getApplicationSubmissionContext(),
        submitTime, startTime, finishTime, true);
    finishedApp.attempts.putAll(existingApp.attempts);
  }
  store.updateApplicationState(finishedApp);
  waitNotify(dispatcher);

  // Now finish the attempt, using master container number 1.
  final Container masterContainer = new ContainerPBImpl();
  masterContainer.setId(ContainerId.newContainerId(attemptId, 1));
  final ApplicationAttemptStateData finishedAttempt =
      createFinishedAttempt(attemptId, masterContainer, startTime,
          amExitStatus);
  updateAttempt(store, dispatcher, finishedAttempt);
}
// Convenience overload: stores the application and the single given attempt
// by delegating to the varargs storeAppWithAttempts, passing null for both
// the AMRM token manager and the client-to-AM token manager.
private void storeAppWithAttempts(RMStateStore store,
TestDispatcher dispatcher, ApplicationAttemptId attemptId,
long submitTime, long startTime) throws Exception {
storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null,
attemptId);
}
// Stores the application through the four-argument storeApp and then calls
// waitNotify on the dispatcher before returning.
private void storeApp(RMStateStore store, TestDispatcher dispatcher,
ApplicationId appId, long submitTime, long startTime) throws Exception {
storeApp(store, appId, submitTime, startTime);
waitNotify(dispatcher);
}
/**
 * Stores the application that owns {@code attemptId}, then stores
 * {@code attemptId} followed by every entry of {@code attemptIds}, in order.
 * Each attempt is stored with the container-creation flag set to true.
 *
 * @param store state store under test
 * @param dispatcher dispatcher handed to the store helpers
 * @param submitTime application submit time
 * @param startTime application start time
 * @param appTokenMgr AMRM token manager forwarded to {@code storeAttempt};
 *     may be null
 * @param clientToAMTokenMgr client-to-AM token manager forwarded to
 *     {@code storeAttempt}; may be null
 * @param attemptId first attempt; also determines the application id
 * @param attemptIds further attempts to store after the first one
 */
private void storeAppWithAttempts(RMStateStore store,
    TestDispatcher dispatcher, long submitTime, long startTime,
    AMRMTokenSecretManager appTokenMgr,
    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
    ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds)
    throws Exception {
  storeApp(store, dispatcher, attemptId.getApplicationId(), submitTime,
      startTime);
  storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr,
      clientToAMTokenMgr, true);
  for (int i = 0; i < attemptIds.length; i++) {
    storeAttempt(store, dispatcher, attemptIds[i].toString(), appTokenMgr,
        clientToAMTokenMgr, true);
  }
}
/**
 * Removes every application in the map from the store. For each entry a
 * mock application is built from the application id and its attempt ids
 * and handed to {@code removeApplication}.
 *
 * @param store state store to remove the applications from
 * @param appWithAttempts application ids mapped to their attempt ids
 */
private static void removeApps(RMStateStore store,
    Map<ApplicationId, ApplicationAttemptId[]> appWithAttempts) {
  for (Map.Entry<ApplicationId, ApplicationAttemptId[]> appEntry
      : appWithAttempts.entrySet()) {
    ApplicationId appId = appEntry.getKey();
    ApplicationAttemptId[] attempts = appEntry.getValue();
    store.removeApplication(createMockAppForRemove(appId, attempts));
  }
}
/**
 * Asserts that the application's znode exists under the hierarchical path
 * for the given split index. The application id string is cut
 * {@code splitIndex} characters from its end; the head becomes the parent
 * node name and the tail the leaf node name.
 *
 * @param store state store, which must be a {@code ZKRMStateStore}
 * @param appId application expected to exist
 * @param splitIndex number of trailing characters forming the leaf node
 */
private static void verifyAppPathPath(RMStateStore store, ApplicationId appId,
    int splitIndex) throws Exception {
  final String appIdStr = appId.toString();
  final int cut = appIdStr.length() - splitIndex;
  final String parentNode = appIdStr.substring(0, cut);
  final String leafNode = appIdStr.substring(cut);
  final ZKRMStateStore zkStore = (ZKRMStateStore) store;
  final String path = createPath(zkStore.znodeWorkingPath,
      ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT,
      ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex),
      parentNode, leafNode);
  assertTrue("Application with id " + appIdStr + " does not exist as per " +
      "split in state store.", zkStore.exists(path));
}
/**
 * Asserts that an application znode exists at the location implied by the
 * split index. With a split index of 0 the application is expected directly
 * under the application root; otherwise under the hierarchies node for that
 * index, with the id split into parent and leaf names.
 *
 * @param store state store, which must be a {@code ZKRMStateStore}
 * @param appId application id in string form
 * @param splitIdx split index the application was stored with
 */
private static void verifyAppInHierarchicalPath(RMStateStore store,
    String appId, int splitIdx) throws Exception {
  final ZKRMStateStore zkStore = (ZKRMStateStore) store;
  final String appRoot = createPath(zkStore.znodeWorkingPath,
      ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
  final String path;
  if (splitIdx == 0) {
    path = createPath(appRoot, appId);
  } else {
    final int cut = appId.length() - splitIdx;
    path = createPath(appRoot, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
        String.valueOf(splitIdx), appId.substring(0, cut),
        appId.substring(cut));
  }
  assertTrue(appId + " should exist in path " + path,
      zkStore.exists(createPath(path)));
}
/**
 * For each entry, asserts the number of children of a znode. A key of 0
 * selects the application root itself; any other key selects the
 * hierarchies node for that split index beneath the application root.
 *
 * @param store state store, which must be a {@code ZKRMStateStore}
 * @param pathToApps split index mapped to the expected child count
 */
private static void assertHierarchicalPaths(RMStateStore store,
    Map<Integer, Integer> pathToApps) throws Exception {
  for (Map.Entry<Integer, Integer> expectation : pathToApps.entrySet()) {
    String nodePath = createPath(((ZKRMStateStore) store).znodeWorkingPath,
        ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
    final int splitIndex = expectation.getKey();
    if (splitIndex != 0) {
      nodePath = createPath(nodePath, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
          String.valueOf(expectation.getKey()));
    }
    assertEquals("Number of childrens for path " + nodePath,
        (int) expectation.getValue(),
        ((ZKRMStateStore) store).getChildren(nodePath).size());
  }
}
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index configured more than 0.
// The store is opened three times with split index 1: once to store and
// remove apps, once to verify the loaded state and update it, and once to
// verify the updates.
@Test
public void testAppNodeSplit() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Get store with app node split config set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// Create RM Context and app token manager.
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
// Stub getMasterKey() on the spy so it returns the key created here.
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1 together with two attempts.
ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 with app id application_1352994193343_120213.
// No attempts are stored for this app.
ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213);
storeApp(store, appId21, submitTime, startTime);
waitNotify(dispatcher);
// Store another app which will be removed.
ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2);
ApplicationAttemptId attemptIdRemoved =
ApplicationAttemptId.newInstance(appIdRemoved, 1);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
null, null, attemptIdRemoved);
// Remove the app.
RMApp mockRemovedApp =
createMockAppForRemove(appIdRemoved, attemptIdRemoved);
store.removeApplication(mockRemovedApp);
// Close state store
store.close();
// Load state store
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
// Check if application_1352994193343_120213 (i.e. app2) exists in state
// store as per split index.
verifyAppPathPath(store, appId21, 1);
// Verify loaded apps and attempts based on the operations we did before
// reloading the state store.
// NOTE(review): -1000 is presumably the exit status reported for attempts
// that have not finished yet -- confirm against verifyLoadedApp.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Update app state for app1.
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
store.close();
// Check updated application state.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
store.setRMDispatcher(dispatcher);
RMState newRMState = store.loadState();
// The app that only ever received updates is expected with its single
// attempt and the exit status (111) passed to finishAppWithAttempts above.
verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
// For app1 only attempt 2 was finished (exit status 100); attempt 1 keeps
// the values it was stored with.
verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2),
Lists.newArrayList(-1000, 100), Lists.newArrayList(null,
FinalApplicationStatus.SUCCEEDED));
// assert store is in expected state after everything is cleaned
assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
store.close();
}
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index config changing across restarts.
// The store is reopened with split indices 1, 2, 0, 3 and 3 again; apps
// stored under an earlier index are expected to stay loadable, updatable and
// removable after each change.
@Test
public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Create store with app node split set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
// Stub getMasterKey() on the spy so it returns the key created here.
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1 with 2 attempts.
ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 and associated attempt.
ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2);
ApplicationAttemptId attemptId11 =
ApplicationAttemptId.newInstance(appId11, 1);
storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime);
// Close state store
store.close();
// Load state store with app node split config of 2.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
// Store a new app while the split index is 2. It is stored after the
// loadState() call above, so it is not part of "state".
ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213);
storeApp(store, dispatcher, appId21, submitTime, startTime);
// Check if app is loaded correctly despite change in split index.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Finish app/attempt state
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
store.close();
// Load state store this time with split index of 0.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
store.setRMDispatcher(dispatcher);
state = store.loadState();
// Apps written so far: appId1, appId11, appId21 and dummyAppId.
assertEquals("Number of Apps loaded should be 4.", 4,
state.getApplicationState().size());
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED));
// Remove attempt1
store.removeApplicationAttempt(attemptId1);
// Store a new app while the split index is 0.
ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45);
storeApp(store, dispatcher, appId31, submitTime, startTime);
// Close state store.
store.close();
// Load state store with split index of 3.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
store.setRMDispatcher(dispatcher);
state = store.loadState();
// The four earlier apps plus appId31.
assertEquals("Number of apps loaded should be 5.", 5,
state.getApplicationState().size());
verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId11, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId11), Lists.newArrayList(-1000),
Lists.newArrayList((FinalApplicationStatus) null));
// attemptId1 was removed above, so only attemptId2 is expected for app1.
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId2), Lists.newArrayList(100),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
// Store another app.
ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1);
storeApp(store, dispatcher, appId41, submitTime, startTime);
// Check how many apps exist in each of the hierarchy based paths. 0 paths
// should exist in "HIERARCHIES/4" path as app split index was never set
// as 4 in tests above.
// NOTE(review): the counts are child znodes of each path, not apps;
// presumably apps sharing a parent node under HIERARCHIES/<n> count once
// -- confirm against ZKRMStateStore.
assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2,
3, 1, 4, 0));
verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3);
ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7);
// Store the app and wait for the dispatcher, i.e. the same two steps the
// five-argument storeApp(store, dispatcher, ...) helper performs.
storeApp(store, appId71, submitTime, startTime);
waitNotify(dispatcher);
ApplicationAttemptId attemptId71 =
ApplicationAttemptId.newInstance(appId71, 1);
// Store an attempt for appId71 without AMRM token or client token key (both
// passed as null).
storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1),
ContainerId.newContainerId(attemptId71, 1).toString(), null, null,
dispatcher);
// Remove applications.
removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[]
{attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71},
appId41, new ApplicationAttemptId[0], appId31,
new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0]));
removeApps(store, ImmutableMap.of(dummyAppId,
new ApplicationAttemptId[] {dummyAttemptId}, appId1,
new ApplicationAttemptId[] {attemptId1, attemptId2}));
store.close();
// Load state store with split index of 3 again. As all apps have been
// removed nothing should be loaded back.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
store.setRMDispatcher(dispatcher);
state = store.loadState();
assertEquals("Number of apps loaded should be 0.", 0,
state.getApplicationState().size());
// Close the state store.
store.close();
}
/**
 * Builds a fresh {@code YarnConfiguration} whose
 * {@code ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX} is set to the given value.
 *
 * @param splitIndex delegation token node split index to configure
 * @return the new configuration
 */
private static Configuration createConfForDelegationTokenNodeSplit(
    int splitIndex) {
  YarnConfiguration splitConf = new YarnConfiguration();
  splitConf.setInt(
      YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX, splitIndex);
  return splitConf;
}
/**
 * Reloads the secret manager state from the tester's store and checks it
 * against the expectations: the token-to-renewal map must match exactly,
 * the sequence number must match, and every expected token must exist in
 * ZooKeeper for the split index recorded for it.
 *
 * @param zkTester tester whose store is loaded and queried
 * @param tokensWithRenewal expected tokens mapped to their renewal dates
 * @param tokensWithIndex expected tokens mapped to their split index
 * @param sequenceNumber expected delegation token sequence number
 */
private void verifyDelegationTokensStateStore(
    TestZKRMStateStoreTester zkTester,
    Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
    Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
    int sequenceNumber) throws Exception {
  RMStateStore.RMDTSecretManagerState loaded =
      zkTester.store.loadState().getRMDTSecretManagerState();
  assertEquals("Unexpected token state",
      tokensWithRenewal, loaded.getTokenState());
  assertEquals("Unexpected sequence number", sequenceNumber,
      loaded.getDTSequenceNumber());
  for (Map.Entry<RMDelegationTokenIdentifier, Integer> expected
      : tokensWithIndex.entrySet()) {
    RMDelegationTokenIdentifier expectedToken = expected.getKey();
    assertTrue("Expected to find token " + expectedToken
        + " in zookeeper but did not",
        zkTester.delegationTokenExists(expectedToken, expected.getValue()));
  }
}
/**
 * Reloads the secret manager state from the tester's store and verifies a
 * single token: it must be present in the loaded token state, be mapped to
 * the expected renewal date, and exist in ZooKeeper for the given split
 * index.
 *
 * @param zkTester tester whose store is loaded and queried
 * @param token token expected to be present
 * @param renewDate renewal date the token is expected to be stored with
 * @param index split index the token was stored under
 */
private void verifyDelegationTokenInStateStore(
    TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token,
    long renewDate, int index) throws Exception {
  RMStateStore.RMDTSecretManagerState secretManagerState =
      zkTester.store.loadState().getRMDTSecretManagerState();
  Map<RMDelegationTokenIdentifier, Long> tokenState =
      secretManagerState.getTokenState();
  assertTrue("token state does not contain " + token,
      tokenState.containsKey(token));
  // Check the renewal date of this token rather than of any token:
  // containsValue() would also pass when a different token happens to carry
  // the same renewal date. Long.valueOf avoids the ambiguous
  // assertEquals(long, Long) overload.
  assertEquals("token state does not contain a token with renewal "
      + renewDate, Long.valueOf(renewDate), tokenState.get(token));
  assertTrue("Token " + token
      + " should exist but was not found in ZooKeeper",
      zkTester.delegationTokenExists(token, index));
}
/**
 * Stores a new delegation token, verifies the store, then updates the
 * token's renewal date and verifies the store again. Both expectation maps
 * are updated in place so callers can keep using them for later checks.
 *
 * @param zkTester tester whose store receives the token
 * @param tokensWithRenewal expected tokens mapped to renewal dates; the new
 *     token is added and its renewal date refreshed after the update
 * @param tokensWithIndex expected tokens mapped to split index; the new
 *     token is added with {@code split}
 * @param sequenceNumber sequence number assigned to the new token and
 *     expected as the store's sequence number afterwards
 * @param split split index the store is currently configured with
 * @return the token that was stored
 */
private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken(
    TestZKRMStateStoreTester zkTester,
    Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
    Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
    int sequenceNumber, int split) throws Exception {
  // Store token
  RMDelegationTokenIdentifier token =
      new RMDelegationTokenIdentifier(new Text("owner"),
          new Text("renewer"), new Text("realuser"));
  // Assign the sequence number before the pre-condition check so the check
  // looks for the token that is about to be stored, not for a token with
  // the identifier's default sequence number.
  token.setSequenceNumber(sequenceNumber);
  assertFalse("Token should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(token, split));
  Long renewDate = System.currentTimeMillis();
  zkTester.store.storeRMDelegationToken(token, renewDate);
  modifyRMDelegationTokenState();
  tokensWithRenewal.put(token, renewDate);
  tokensWithIndex.put(token, split);
  // Verify the token
  verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
      tokensWithIndex, sequenceNumber);
  // Update the token
  renewDate = System.currentTimeMillis();
  zkTester.store.updateRMDelegationToken(token, renewDate);
  // Only the renewal date changes; the split index recorded above stays.
  tokensWithRenewal.put(token, renewDate);
  // Verify updates
  verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
      tokensWithIndex, sequenceNumber);
  return token;
}
/**
 * Opens and closes a store for each configured delegation token split index
 * and checks the index the tester reports afterwards: 0 through 4 are kept
 * as configured, while -1 and 5 are expected to be reported as 0.
 */
@Test
public void testDelegationTokenSplitIndexConfig() throws Exception {
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  // Valid values
  for (int validIndex = 0; validIndex <= 4; validIndex++) {
    zkTester.getRMStateStore(
        createConfForDelegationTokenNodeSplit(validIndex)).close();
    assertEquals("Incorrect split index",
        validIndex, zkTester.getDelegationTokenNodeSplitIndex());
  }
  // Invalid values --> override to 0
  for (int invalidIndex : new int[] {-1, 5}) {
    zkTester.getRMStateStore(
        createConfForDelegationTokenNodeSplit(invalidIndex)).close();
    assertEquals("Incorrect split index",
        0, zkTester.getDelegationTokenNodeSplitIndex());
  }
}
// Runs the testDelegationTokenNode scenario with a split index of 0.
@Test
public void testDelegationTokenNodeNoSplit() throws Exception {
testDelegationTokenNode(0);
}
// Runs the testDelegationTokenNode scenario with a split index of 1.
@Test
public void testDelegationTokenNodeWithSplitOne() throws Exception {
testDelegationTokenNode(1);
}
// Runs the testDelegationTokenNode scenario with a split index of 2.
@Test
public void testDelegationTokenNodeWithSplitTwo() throws Exception {
testDelegationTokenNode(2);
}
// Runs the testDelegationTokenNode scenario with a split index of 3.
@Test
public void testDelegationTokenNodeWithSplitThree() throws Exception {
testDelegationTokenNode(3);
}
// Runs the testDelegationTokenNode scenario with a split index of 4.
@Test
public void testDelegationTokenNodeWithSplitFour() throws Exception {
testDelegationTokenNode(4);
}
/**
 * Shared scenario for the per-split-index tests: stores, updates and
 * verifies one delegation token with sequence number 0, removes it, and
 * checks that the reloaded token state is empty, the sequence number is
 * unchanged and the token no longer exists in ZooKeeper.
 *
 * @param split delegation token node split index to configure the store with
 */
public void testDelegationTokenNode(int split) throws Exception {
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  RMStateStore store =
      zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(split));
  final int sequenceNumber = 0;
  Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
  Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();

  // Store the token and verify
  RMDelegationTokenIdentifier storedToken =
      storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
          tokensWithIndex, sequenceNumber, split);

  // Delete the token and verify
  store.removeRMDelegationToken(storedToken);
  RMStateStore.RMDTSecretManagerState reloaded =
      store.loadState().getRMDTSecretManagerState();
  tokensWithRenewal.clear();
  tokensWithIndex.clear();
  assertEquals("Unexpected token state",
      tokensWithRenewal, reloaded.getTokenState());
  assertEquals("Unexpected sequence number",
      sequenceNumber, reloaded.getDTSequenceNumber());
  assertFalse("Token should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(storedToken, split));
  store.close();
}
/**
 * Stores tokens with sequence numbers 0 to 12 under a split index of 1,
 * verifies them, removes the tokens numbered 0, 3, 6 and 11, stores tokens
 * 13 to 22, and verifies again, including that the removed tokens are gone
 * from ZooKeeper.
 */
@Test
public void testDelegationTokenNodeWithSplitMultiple() throws Exception {
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  RMStateStore store =
      zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1));

  // With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9)
  // Try to store more than 10
  Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
  Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
  Set<RMDelegationTokenIdentifier> tokensToDelete = new HashSet<>();
  int sequenceNumber = 0;
  for (int seq = 0; seq <= 12; seq++) {
    RMDelegationTokenIdentifier newToken =
        new RMDelegationTokenIdentifier(new Text("owner" + seq),
            new Text("renewer" + seq), new Text("realuser" + seq));
    sequenceNumber = seq;
    newToken.setSequenceNumber(sequenceNumber);
    assertFalse("Token should not exist but was found in ZooKeeper",
        zkTester.delegationTokenExists(newToken, 1));
    Long renewDate = System.currentTimeMillis();
    store.storeRMDelegationToken(newToken, renewDate);
    modifyRMDelegationTokenState();
    tokensWithRenewal.put(newToken, renewDate);
    tokensWithIndex.put(newToken, 1);
    // Remember a handful of tokens to delete later on.
    if (seq == 0 || seq == 3 || seq == 6 || seq == 11) {
      tokensToDelete.add(newToken);
    }
  }
  // Verify
  verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
      tokensWithIndex, sequenceNumber);

  // Try deleting some tokens and adding some new ones
  for (RMDelegationTokenIdentifier doomed : tokensToDelete) {
    store.removeRMDelegationToken(doomed);
    tokensWithRenewal.remove(doomed);
    tokensWithIndex.remove(doomed);
  }
  for (int seq = 13; seq <= 22; seq++) {
    RMDelegationTokenIdentifier newToken =
        new RMDelegationTokenIdentifier(new Text("owner" + seq),
            new Text("renewer" + seq), new Text("realuser" + seq));
    sequenceNumber = seq;
    newToken.setSequenceNumber(sequenceNumber);
    Long renewDate = System.currentTimeMillis();
    store.storeRMDelegationToken(newToken, renewDate);
    modifyRMDelegationTokenState();
    tokensWithRenewal.put(newToken, renewDate);
    tokensWithIndex.put(newToken, 1);
  }
  // Verify
  verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
      tokensWithIndex, sequenceNumber);
  for (RMDelegationTokenIdentifier removed : tokensToDelete) {
    assertFalse("Token " + removed
        + " should not exist but was found in ZooKeeper",
        zkTester.delegationTokenExists(removed, 1));
  }
  store.close();
}
/**
 * Verifies that delegation tokens remain readable, updatable and removable
 * when the store is reopened with a different delegation token split index.
 * The store is opened with indices 1, 2, 0 and 3 in turn; a token is stored
 * under each of the first three, every earlier token is re-verified after
 * each restart, and finally all tokens are removed and a last token is
 * stored under index 3.
 */
@Test
public void testDelegationTokenNodeWithSplitChangeAcrossRestarts()
    throws Exception {
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
  Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
  int sequenceNumber = 0;
  // Start the store with index 1
  Configuration conf = createConfForDelegationTokenNodeSplit(1);
  RMStateStore store = zkTester.getRMStateStore(conf);
  // Store a token with index 1
  RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken(
      zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1);
  store.close();
  // Start the store with index 2
  conf = createConfForDelegationTokenNodeSplit(2);
  store = zkTester.getRMStateStore(conf);
  // Verify token1 is still there and under the /1/ znode
  verifyDelegationTokenInStateStore(
      zkTester, token1, tokensWithRenewal.get(token1), 1);
  // Store a token with index 2
  sequenceNumber++;
  RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken(
      zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2);
  // Update and verify token1
  long renewDate1 = System.currentTimeMillis();
  zkTester.store.updateRMDelegationToken(token1, renewDate1);
  tokensWithRenewal.put(token1, renewDate1);
  verifyDelegationTokenInStateStore(
      zkTester, token1, tokensWithRenewal.get(token1), 1);
  store.close();
  // Start the store with index 0
  conf = createConfForDelegationTokenNodeSplit(0);
  store = zkTester.getRMStateStore(conf);
  // Verify token1 is still there and under the /1/ znode
  verifyDelegationTokenInStateStore(
      zkTester, token1, tokensWithRenewal.get(token1), 1);
  // Verify token2 is still there and under the /2/ znode
  verifyDelegationTokenInStateStore(
      zkTester, token2, tokensWithRenewal.get(token2), 2);
  // Store a token with no index
  sequenceNumber++;
  RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken(
      zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0);
  store.close();
  // Start the store with index 3
  conf = createConfForDelegationTokenNodeSplit(3);
  store = zkTester.getRMStateStore(conf);
  // Verify token1 is still there and under the /1/ znode
  verifyDelegationTokenInStateStore(
      zkTester, token1, tokensWithRenewal.get(token1), 1);
  // Verify token2 is still there and under the /2/ znode
  verifyDelegationTokenInStateStore(
      zkTester, token2, tokensWithRenewal.get(token2), 2);
  // Verify token0 is still there and under the token root node
  verifyDelegationTokenInStateStore(
      zkTester, token0, tokensWithRenewal.get(token0), 0);
  // Delete all tokens and verify
  for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) {
    store.removeRMDelegationToken(token);
  }
  tokensWithRenewal.clear();
  tokensWithIndex.clear();
  verifyDelegationTokensStateStore(
      zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber);
  // Each failure message names the token its assertion actually checks.
  assertFalse("Token " + token1
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(token1, 1));
  assertFalse("Token " + token2
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(token2, 2));
  assertFalse("Token " + token0
      + " should not exist but was found in ZooKeeper",
      zkTester.delegationTokenExists(token0, 0));
  // Store a token with index 3
  sequenceNumber++;
  storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
      tokensWithIndex, sequenceNumber, 3);
  store.close();
}
/**
 * Stores a RUNNING application whose submission context carries a queue, an
 * AM container spec, a resource and a scheduling properties map. After an
 * update in RUNNING state the scheduling properties must still be present;
 * after an update in FINISHED state they must be empty while the
 * application id, queue, AM container spec and resource memory remain.
 */
@Test
public void testAppSubmissionContextIsPrunedInFinalApplicationState()
    throws Exception {
  TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
  ApplicationId appId = ApplicationId.fromString("application_1234_0010");
  RMStateStore store =
      zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1));

  // Build the submission context that is stored with the application.
  ApplicationSubmissionContext ctx = new ApplicationSubmissionContextPBImpl();
  ctx.setApplicationId(appId);
  ctx.setQueue("a_queue");
  ContainerLaunchContextPBImpl amSpec = new ContainerLaunchContextPBImpl();
  amSpec.setCommands(Collections.singletonList("a_command"));
  ctx.setAMContainerSpec(amSpec);
  Resource amResource = new ResourcePBImpl();
  amResource.setMemorySize(17L);
  ctx.setResource(amResource);
  Map<String, String> schedulingProps =
      Collections.singletonMap("a_key", "a_value");
  ctx.setApplicationSchedulingPropertiesMap(schedulingProps);

  ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl();
  appState.setState(RMAppState.RUNNING);
  appState.setApplicationSubmissionContext(ctx);
  store.storeApplicationStateInternal(appId, appState);

  RMState rmState = store.loadState();
  assertEquals(1, rmState.getApplicationState().size());
  ctx = rmState.getApplicationState().get(appId)
      .getApplicationSubmissionContext();

  // An update while still RUNNING must leave the context untouched.
  appState.setState(RMAppState.RUNNING);
  store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
  rmState = store.loadState();
  ctx = rmState.getApplicationState().get(appId)
      .getApplicationSubmissionContext();
  assertEquals("ApplicationSchedulingPropertiesMap should not have been "
      + "pruned from the application submission context before the "
      + "FINISHED state",
      schedulingProps, ctx.getApplicationSchedulingPropertiesMap());

  // An update in FINISHED state is expected to drop the scheduling
  // properties but keep the remaining fields checked below.
  appState.setState(RMAppState.FINISHED);
  store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
  rmState = store.loadState();
  ctx = rmState.getApplicationState().get(appId)
      .getApplicationSubmissionContext();
  assertEquals(appId, ctx.getApplicationId());
  assertEquals("a_queue", ctx.getQueue());
  assertNotNull(ctx.getAMContainerSpec());
  assertEquals(17L, ctx.getResource().getMemorySize());
  assertEquals("ApplicationSchedulingPropertiesMap should have been pruned"
      + " from the application submission context when in FINISHED STATE",
      Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
  store.close();
}
}