blob: ea2cb2eed700360f66cd18c6b725ca7c58aab701 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNMLeveldbStateStoreService {
private static final File TMP_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestNMLeveldbStateStoreService.class.getName());
YarnConfiguration conf;
NMLeveldbStateStoreService stateStore;
@Before
public void setup() throws IOException {
FileUtil.fullyDelete(TMP_DIR);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.NM_RECOVERY_DIR, TMP_DIR.toString());
restartStateStore();
}
@After
public void cleanup() throws IOException {
if (stateStore != null) {
stateStore.close();
}
FileUtil.fullyDelete(TMP_DIR);
}
private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock
if (stateStore != null) {
stateStore.close();
}
stateStore = new NMLeveldbStateStoreService();
stateStore.init(conf);
stateStore.start();
}
private void verifyEmptyState() throws IOException {
RecoveredLocalizationState state = stateStore.loadLocalizationState();
assertNotNull(state);
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertNotNull(pubts);
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
assertTrue(state.getUserResources().isEmpty());
}
@Test
public void testIsNewlyCreated() throws IOException {
assertTrue(stateStore.isNewlyCreated());
restartStateStore();
assertFalse(stateStore.isNewlyCreated());
}
@Test
public void testEmptyState() throws IOException {
assertTrue(stateStore.canRecover());
verifyEmptyState();
}
@Test
public void testCheckVersion() throws IOException {
// default version
Version defaultVersion = stateStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
stateStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
restartStateStore();
// overwrite the compatible version
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
stateStore.storeVersion(incompatibleVersion);
try {
restartStateStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for NM state:"));
}
}
@Test
public void testApplicationStorage() throws IOException {
// test empty when no state
RecoveredApplicationsState state = stateStore.loadApplicationsState();
assertTrue(state.getApplications().isEmpty());
// store an application and verify recovered
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId1).getProto());
builder.setUser("user1");
ContainerManagerApplicationProto appProto1 = builder.build();
stateStore.storeApplication(appId1, appProto1);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
// add a new app
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
builder = ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId2).getProto());
builder.setUser("user2");
ContainerManagerApplicationProto appProto2 = builder.build();
stateStore.storeApplication(appId2, appProto2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(2, state.getApplications().size());
assertTrue(state.getApplications().contains(appProto1));
assertTrue(state.getApplications().contains(appProto2));
// test removing an application
stateStore.removeApplication(appId2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
}
@Test
public void testContainerStorage() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers =
stateStore.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
// create a container request
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
Resource containerResource = Resource.newInstance(1024, 2);
StartContainerRequest containerReq =
createContainerRequest(containerId, containerResource);
// store a container and verify recovered
long containerStartTime = System.currentTimeMillis();
stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
// verify the container version key is not stored for new containers
DB db = stateStore.getDB();
assertNull("version key present for new container", db.get(bytes(
stateStore.getContainerVersionKey(containerId.toString()))));
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(containerStartTime, rcs.getStartTime());
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
// store a new container record without StartContainerRequest
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
stateStore.storeContainerLaunched(containerId1);
recoveredContainers = stateStore.loadContainersState();
// check whether the new container record is discarded
assertEquals(1, recoveredContainers.size());
// queue the container, and verify recovered
stateStore.storeContainerQueued(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
// launch the container, add some diagnostics, and verify recovered
StringBuilder diags = new StringBuilder();
stateStore.storeContainerLaunched(containerId);
diags.append("some diags for container");
stateStore.storeContainerDiagnostics(containerId, diags);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
assertEquals(containerResource, rcs.getCapability());
// pause the container, and verify recovered
stateStore.storeContainerPaused(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
// Resume the container
stateStore.removeContainerPaused(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
// increase the container size, and verify recovered
ContainerTokenIdentifier updateTokenIdentifier =
new ContainerTokenIdentifier(containerId, "host", "user",
Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
Priority.newInstance(7), 13579);
stateStore
.storeContainerUpdateToken(containerId, updateTokenIdentifier);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
// mark the container killed, add some more diags, and verify recovered
diags.append("some more diags for container");
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerKilled(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertTrue(rcs.getKilled());
ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
.newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
assertEquals(updateTokenIdentifier, tokenReadFromRequest);
assertEquals(diags.toString(), rcs.getDiagnostics());
// add yet more diags, mark container completed, and verify recovered
diags.append("some final diags");
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerCompleted(containerId, 21);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
assertEquals(21, rcs.getExitCode());
assertTrue(rcs.getKilled());
assertEquals(diags.toString(), rcs.getDiagnostics());
// store remainingRetryAttempts, workDir and logDir
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
}
private StartContainerRequest createContainerRequest(
ContainerId containerId, Resource res) {
return createContainerRequestInternal(containerId, res);
}
private StartContainerRequest createContainerRequest(
ContainerId containerId) {
return createContainerRequestInternal(containerId, null);
}
private StartContainerRequest createContainerRequestInternal(ContainerId
containerId, Resource res) {
LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
1234567890L);
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put("rsrc", lrsrc);
Map<String, String> env = new HashMap<String, String>();
env.put("somevar", "someval");
List<String> containerCmds = new ArrayList<String>();
containerCmds.add("somecmd");
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
ByteBuffer containerTokens =
ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, env, containerCmds, serviceData, containerTokens,
acls);
Resource containerRsrc = Resource.newInstance(1357, 3);
if (res != null) {
containerRsrc = res;
}
ContainerTokenIdentifier containerTokenId =
new ContainerTokenIdentifier(containerId, "host", "user",
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
13579);
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
"tokenservice");
return StartContainerRequest.newInstance(clc, containerToken);
}
@Test
public void testStartResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// start a local resource for an application
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
// restart and verify only app resource is marked in-progress
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty());
assertEquals(1, appts.getInProgressResources().size());
assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto));
// start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// restart and verify resources are marked in-progress
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertEquals(2, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath1,
pubts.getInProgressResources().get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertEquals(1, privts.getInProgressResources().size());
assertEquals(privRsrcLocalPath,
privts.getInProgressResources().get(privRsrcProto));
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getLocalizedResources().isEmpty());
assertEquals(1, appts.getInProgressResources().size());
assertEquals(appRsrcLocalPath,
appts.getInProgressResources().get(appRsrcProto));
}
@Test
public void testFinishResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// start and finish a local resource for an application
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
LocalizedResourceProto appLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto)
.setLocalPath(appRsrcLocalPath.toString())
.setSize(1234567L)
.build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
// restart and verify only app resource is completed
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertTrue(privts.getLocalizedResources().isEmpty());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty());
assertEquals(1, appts.getLocalizedResources().size());
assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next());
// start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
// finish some of the resources
LocalizedResourceProto pubLocalizedProto1 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto1)
.setLocalPath(pubRsrcLocalPath1.toString())
.setSize(pubRsrcProto1.getSize())
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
LocalizedResourceProto privLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(privRsrcProto)
.setLocalPath(privRsrcLocalPath.toString())
.setSize(privRsrcProto.getSize())
.build();
stateStore.finishResourceLocalization(user, null, privLocalizedProto);
// restart and verify state
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
assertEquals(1, pubts.getLocalizedResources().size());
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
assertEquals(1, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
assertEquals(1, privts.getLocalizedResources().size());
assertEquals(privLocalizedProto,
privts.getLocalizedResources().iterator().next());
assertTrue(privts.getInProgressResources().isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
assertTrue(appts.getInProgressResources().isEmpty());
assertEquals(1, appts.getLocalizedResources().size());
assertEquals(appLocalizedProto,
appts.getLocalizedResources().iterator().next());
}
@Test
public void testRemoveLocalizedResource() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
// go through the complete lifecycle for an application local resource
Path appRsrcPath = new Path("hdfs://some/app/resource");
LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
LocalResource.newInstance(
URL.fromPath(appRsrcPath),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
123L, 456L);
LocalResourceProto appRsrcProto = rsrcPb.getProto();
Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
LocalizedResourceProto appLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(appRsrcProto)
.setLocalPath(appRsrcLocalPath.toString())
.setSize(1234567L)
.build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath);
restartStateStore();
verifyEmptyState();
// remove an app resource that didn't finish
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
stateStore.removeLocalizedResource(user, appId, appRsrcLocalPath);
restartStateStore();
verifyEmptyState();
// add public and private resources and remove some
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto1 = rsrcPb.getProto();
Path pubRsrcLocalPath1 = new Path("/some/local/dir/for/pubrsrc1");
stateStore.startResourceLocalization(null, null, pubRsrcProto1,
pubRsrcLocalPath1);
LocalizedResourceProto pubLocalizedProto1 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto1)
.setLocalPath(pubRsrcLocalPath1.toString())
.setSize(789L)
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
Path pubRsrcPath2 = new Path("hdfs://some/public/resource2");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(pubRsrcPath2),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
LocalResourceProto pubRsrcProto2 = rsrcPb.getProto();
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
LocalizedResourceProto pubLocalizedProto2 =
LocalizedResourceProto.newBuilder()
.setResource(pubRsrcProto2)
.setLocalPath(pubRsrcLocalPath2.toString())
.setSize(7654321L)
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto2);
stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2);
Path privRsrcPath = new Path("hdfs://some/private/resource");
rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
LocalResourceProto privRsrcProto = rsrcPb.getProto();
Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
stateStore.startResourceLocalization(user, null, privRsrcProto,
privRsrcLocalPath);
stateStore.removeLocalizedResource(user, null, privRsrcLocalPath);
// restart and verify state
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertTrue(pubts.getInProgressResources().isEmpty());
assertEquals(1, pubts.getLocalizedResources().size());
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
assertTrue(userResources.isEmpty());
}
@Test
public void testDeletionTaskStorage() throws IOException {
// test empty when no state
RecoveredDeletionServiceState state =
stateStore.loadDeletionServiceState();
assertTrue(state.getTasks().isEmpty());
// store a deletion task and verify recovered
DeletionServiceDeleteTaskProto proto =
DeletionServiceDeleteTaskProto.newBuilder()
.setId(7)
.setUser("someuser")
.setSubdir("some/subdir")
.addBasedirs("some/dir/path")
.addBasedirs("some/other/dir/path")
.setDeletionTime(123456L)
.addSuccessorIds(8)
.addSuccessorIds(9)
.build();
stateStore.storeDeletionTask(proto.getId(), proto);
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(1, state.getTasks().size());
assertEquals(proto, state.getTasks().get(0));
// store another deletion task
DeletionServiceDeleteTaskProto proto2 =
DeletionServiceDeleteTaskProto.newBuilder()
.setId(8)
.setUser("user2")
.setSubdir("subdir2")
.setDeletionTime(789L)
.build();
stateStore.storeDeletionTask(proto2.getId(), proto2);
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(2, state.getTasks().size());
assertTrue(state.getTasks().contains(proto));
assertTrue(state.getTasks().contains(proto2));
// delete a task and verify gone after recovery
stateStore.removeDeletionTask(proto2.getId());
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(1, state.getTasks().size());
assertEquals(proto, state.getTasks().get(0));
// delete the last task and verify none left
stateStore.removeDeletionTask(proto.getId());
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertTrue(state.getTasks().isEmpty());
}
@Test
public void testNMTokenStorage() throws IOException {
// test empty when no state
RecoveredNMTokensState state = stateStore.loadNMTokensState();
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
// store a master key and verify recovered
NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
MasterKey currentKey = secretMgr.generateKey();
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = secretMgr.generateKey();
stateStore.storeNMTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
// store a few application keys and verify recovered
ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
MasterKey attemptKey1 = secretMgr.generateKey();
stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(2, 3), 4);
MasterKey attemptKey2 = secretMgr.generateKey();
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
restartStateStore();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
// add/update/remove keys and verify recovered
ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(5, 6), 7);
MasterKey attemptKey3 = secretMgr.generateKey();
stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
stateStore.removeNMTokenApplicationMasterKey(attempt1);
attemptKey2 = prevKey;
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
prevKey = currentKey;
stateStore.storeNMTokenPreviousMasterKey(prevKey);
currentKey = secretMgr.generateKey();
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedAppKeys = state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertNull(loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
}
@Test
public void testContainerTokenStorage() throws IOException {
// test empty when no state
RecoveredContainerTokensState state =
stateStore.loadContainerTokensState();
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a master key and verify recovered
ContainerTokenKeyGeneratorForTest keygen =
new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
MasterKey currentKey = keygen.generateKey();
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = keygen.generateKey();
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
// store a few container tokens and verify recovered
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
Long expTime1 = 1234567890L;
ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
Long expTime2 = 9876543210L;
stateStore.storeContainerToken(cid1, expTime1);
stateStore.storeContainerToken(cid2, expTime2);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ContainerId, Long> loadedActiveTokens =
state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertEquals(expTime1, loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
// add/update/remove tokens and verify recovered
ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
Long expTime3 = 135798642L;
stateStore.storeContainerToken(cid3, expTime3);
stateStore.removeContainerToken(cid1);
expTime2 += 246897531L;
stateStore.storeContainerToken(cid2, expTime2);
prevKey = currentKey;
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
currentKey = keygen.generateKey();
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedActiveTokens = state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertNull(loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
assertEquals(expTime3, loadedActiveTokens.get(cid3));
}
@Test
public void testLogDeleterStorage() throws IOException {
// test empty when no state
RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
assertTrue(state.getLogDeleterMap().isEmpty());
// store log deleter state
final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
LogDeleterProto proto1 = LogDeleterProto.newBuilder()
.setUser("user1")
.setDeletionTime(1234)
.build();
stateStore.storeLogDeleter(appId1, proto1);
// restart state store and verify recovered
restartStateStore();
state = stateStore.loadLogDeleterState();
assertEquals(1, state.getLogDeleterMap().size());
assertEquals(proto1, state.getLogDeleterMap().get(appId1));
// store another log deleter
final ApplicationId appId2 = ApplicationId.newInstance(2, 2);
LogDeleterProto proto2 = LogDeleterProto.newBuilder()
.setUser("user2")
.setDeletionTime(5678)
.build();
stateStore.storeLogDeleter(appId2, proto2);
// restart state store and verify recovered
restartStateStore();
state = stateStore.loadLogDeleterState();
assertEquals(2, state.getLogDeleterMap().size());
assertEquals(proto1, state.getLogDeleterMap().get(appId1));
assertEquals(proto2, state.getLogDeleterMap().get(appId2));
// remove a deleter and verify removed after restart and recovery
stateStore.removeLogDeleter(appId1);
restartStateStore();
state = stateStore.loadLogDeleterState();
assertEquals(1, state.getLogDeleterMap().size());
assertEquals(proto2, state.getLogDeleterMap().get(appId2));
// remove last deleter and verify empty after restart and recovery
stateStore.removeLogDeleter(appId2);
restartStateStore();
state = stateStore.loadLogDeleterState();
assertTrue(state.getLogDeleterMap().isEmpty());
}
@Test
public void testCompactionCycle() throws IOException {
final DB mockdb = mock(DB.class);
conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
@Override
protected void checkVersion() {}
@Override
protected DB openDatabase(Configuration conf) {
return mockdb;
}
};
store.init(conf);
store.start();
verify(mockdb, timeout(10000).atLeastOnce()).compactRange(
(byte[]) isNull(), (byte[]) isNull());
store.close();
}
@Test
public void testUnexpectedKeyDoesntThrowException() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers = stateStore
.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
StartContainerRequest startContainerRequest = storeMockContainer(
containerId);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"
+ containerId.toString() + "/invalidKey1234").getBytes();
stateStore.getDB().put(invalidKey, new byte[1]);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(startContainerRequest, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
// assert unknown keys are cleaned up finally
assertNotNull(stateStore.getDB().get(invalidKey));
stateStore.removeContainer(containerId);
assertNull(stateStore.getDB().get(invalidKey));
}
@Test
public void testAMRMProxyStorage() throws IOException {
RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(), null);
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 0);
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
ApplicationId appId2 = ApplicationId.newInstance(1, 2);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId2, 2);
String key1 = "key1";
String key2 = "key2";
byte[] data1 = "data1".getBytes();
byte[] data2 = "data2".getBytes();
AMRMProxyTokenSecretManager secretManager =
new AMRMProxyTokenSecretManager(stateStore);
secretManager.init(conf);
// Generate currentMasterKey
secretManager.start();
try {
// Add two applications, each with two data entries
stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1);
stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1);
stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2);
stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 2);
// app1
Map<String, byte[]> map = state.getAppContexts().get(attemptId1);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// app2
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// Generate next master key and remove one entry of app2
secretManager.rollMasterKey();
stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(),
secretManager.getNextMasterKeyData().getMasterKey());
assertEquals(state.getAppContexts().size(), 2);
// app1
map = state.getAppContexts().get(attemptId1);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// app2
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 1);
assertTrue(Arrays.equals(map.get(key2), data2));
// Activate next master key and remove all entries of app1
secretManager.activateNextMasterKey();
stateStore.removeAMRMProxyAppContext(attemptId1);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 1);
// app2 only
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 1);
assertTrue(Arrays.equals(map.get(key2), data2));
} finally {
secretManager.stop();
}
}
@Test
public void testStateStoreForResourceMapping() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers = stateStore
.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
storeMockContainer(containerId);
Container container = mock(Container.class);
when(container.getContainerId()).thenReturn(containerId);
ResourceMappings resourceMappings = new ResourceMappings();
when(container.getResourceMappings()).thenReturn(resourceMappings);
// Store ResourceMapping
stateStore.storeAssignedResources(container, "gpu",
Arrays.<Serializable>asList("1", "2", "3"));
// This will overwrite above
List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4");
stateStore.storeAssignedResources(container, "gpu", gpuRes1);
List<Serializable> fpgaRes =
Arrays.<Serializable>asList("3", "4", "5", "6");
stateStore.storeAssignedResources(container, "fpga", fpgaRes);
List<Serializable> numaRes = Arrays.<Serializable>asList("numa1");
stateStore.storeAssignedResources(container, "numa", numaRes);
// add a invalid key
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> res = rcs.getResourceMappings()
.getAssignedResources("gpu");
Assert.assertTrue(res.equals(gpuRes1));
Assert.assertTrue(
resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
res = rcs.getResourceMappings().getAssignedResources("fpga");
Assert.assertTrue(res.equals(fpgaRes));
Assert.assertTrue(
resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
res = rcs.getResourceMappings().getAssignedResources("numa");
Assert.assertTrue(res.equals(numaRes));
Assert.assertTrue(
resourceMappings.getAssignedResources("numa").equals(numaRes));
}
@Test
public void testStateStoreNodeHealth() throws IOException {
// keep the working DB clean, break a temp DB
DB keepDB = stateStore.getDB();
DB myMocked = mock(DB.class);
stateStore.setDB(myMocked);
ApplicationId appId = ApplicationId.newInstance(1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
DBException toThrow = new DBException();
Mockito.doThrow(toThrow).when(myMocked).
put(any(byte[].class), any(byte[].class));
// write some data
try {
// chosen a simple method could be any of the "void" methods
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
stateStore.storeContainerKilled(containerId);
} catch (IOException ioErr) {
// Cause should be wrapped DBException
assertTrue(ioErr.getCause() instanceof DBException);
// check the store is marked unhealthy
assertFalse("Statestore should have been unhealthy",
stateStore.isHealthy());
return;
} finally {
// restore the working DB
stateStore.setDB(keepDB);
}
Assert.fail("Expected exception not thrown");
}
private StartContainerRequest storeMockContainer(ContainerId containerId)
throws IOException {
// create a container request
LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
1234567890L);
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put("rsrc", lrsrc);
Map<String, String> env = new HashMap<String, String>();
env.put("somevar", "someval");
List<String> containerCmds = new ArrayList<String>();
containerCmds.add("somecmd");
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
ByteBuffer containerTokens = ByteBuffer
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, env, containerCmds,
serviceData, containerTokens, acls);
Resource containerRsrc = Resource.newInstance(1357, 3);
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
Priority.newInstance(7), 13579);
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
"tokenservice");
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
stateStore.storeContainer(containerId, 0, 0, containerReq);
return containerReq;
}
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
private static class ContainerTokenKeyGeneratorForTest extends
BaseContainerTokenSecretManager {
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
super(conf);
}
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
}