blob: 74968746bc1366fa9ff6c9e2a2e98029badd47d0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.assertj.core.api.Assertions.assertThat;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNMLeveldbStateStoreService {
// Scratch directory used as the NM recovery dir: a folder named after this
// test class, placed under the "test.build.data" system property when it is
// set and under "java.io.tmpdir" otherwise.
private static final File TMP_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestNMLeveldbStateStoreService.class.getName());
// Configuration created in setup() with NM recovery enabled and the recovery
// dir pointed at TMP_DIR.
YarnConfiguration conf;
// Store under test; replaced with a new instance by restartStateStore().
NMLeveldbStateStoreService stateStore;
/**
 * Prepares each test: wipes the scratch directory, builds a configuration
 * with NM recovery enabled and rooted at that directory, then starts the
 * state store.
 */
@Before
public void setup() throws IOException {
  FileUtil.fullyDelete(TMP_DIR);
  final YarnConfiguration recoveryConf = new YarnConfiguration();
  recoveryConf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
  recoveryConf.set(YarnConfiguration.NM_RECOVERY_DIR, TMP_DIR.toString());
  conf = recoveryConf;
  restartStateStore();
}
/**
 * Tears down after each test: closes the state store if one was created and
 * removes the scratch directory.
 *
 * @throws IOException if closing the state store fails
 */
@After
public void cleanup() throws IOException {
  try {
    if (stateStore != null) {
      stateStore.close();
    }
  } finally {
    // Always remove the on-disk state, even when close() throws, so a failed
    // shutdown cannot leave files behind for the next test.
    FileUtil.fullyDelete(TMP_DIR);
  }
}
/**
 * Drains a container-state recovery iterator into a list.
 *
 * @param it iterator to exhaust
 * @return every recovered container state, in iteration order
 * @throws IOException if the iterator fails while reading
 */
private List<RecoveredContainerState> loadContainersState(
    RecoveryIterator<RecoveredContainerState> it) throws IOException {
  final List<RecoveredContainerState> recovered = new ArrayList<>();
  for (; it.hasNext();) {
    recovered.add(it.next());
  }
  return recovered;
}
/**
 * Drains an application recovery iterator into a list.
 *
 * @param it iterator to exhaust
 * @return every recovered application proto, in iteration order
 * @throws IOException if the iterator fails while reading
 */
private List<ContainerManagerApplicationProto> loadApplicationProtos(
    RecoveryIterator<ContainerManagerApplicationProto> it)
    throws IOException {
  final List<ContainerManagerApplicationProto> recovered = new ArrayList<>();
  while (it.hasNext()) {
    final ContainerManagerApplicationProto app = it.next();
    recovered.add(app);
  }
  return recovered;
}
/**
 * Drains a deletion-task recovery iterator into a list.
 *
 * @param it iterator to exhaust
 * @return every recovered deletion task proto, in iteration order
 * @throws IOException if the iterator fails while reading
 */
private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
    RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
  final List<DeletionServiceDeleteTaskProto> recovered = new ArrayList<>();
  while (it.hasNext()) {
    final DeletionServiceDeleteTaskProto task = it.next();
    recovered.add(task);
  }
  return recovered;
}
/**
 * Drains a per-user resource recovery iterator into a map.
 *
 * @param it iterator of (user, resources) entries to exhaust
 * @return the recovered resources keyed by user; a later entry for the same
 *         key replaces an earlier one
 * @throws IOException if the iterator fails while reading
 */
private Map<String, RecoveredUserResources> loadUserResources(
    RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
    throws IOException {
  final Map<String, RecoveredUserResources> byUser = new HashMap<>();
  for (; it.hasNext();) {
    final Map.Entry<String, RecoveredUserResources> recovered = it.next();
    byUser.put(recovered.getKey(), recovered.getValue());
  }
  return byUser;
}
/**
 * Drains an NM-token recovery iterator into a map.
 *
 * @param it iterator of (application attempt, master key) entries to exhaust
 * @return the recovered master keys keyed by application attempt
 * @throws IOException if the iterator fails while reading
 */
private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
    RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
    throws IOException {
  final Map<ApplicationAttemptId, MasterKey> keysByAttempt = new HashMap<>();
  for (; it.hasNext();) {
    final Map.Entry<ApplicationAttemptId, MasterKey> recovered = it.next();
    keysByAttempt.put(recovered.getKey(), recovered.getValue());
  }
  return keysByAttempt;
}
/**
 * Drains a container-token recovery iterator into a map.
 *
 * @param it iterator of (container id, Long) entries to exhaust
 * @return the recovered Long values keyed by container id
 * @throws IOException if the iterator fails while reading
 */
private Map<ContainerId, Long> loadContainerTokens(
    RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
  final Map<ContainerId, Long> valuesByContainer = new HashMap<>();
  for (; it.hasNext();) {
    final Map.Entry<ContainerId, Long> recovered = it.next();
    valuesByContainer.put(recovered.getKey(), recovered.getValue());
  }
  return valuesByContainer;
}
/**
 * Drains a completed-resource recovery iterator into a list.
 *
 * @param it iterator to exhaust; {@code null} is treated as empty
 * @return every recovered localized resource, in iteration order
 * @throws IOException if the iterator fails while reading
 */
private List<LocalizedResourceProto> loadCompletedResources(
    RecoveryIterator<LocalizedResourceProto> it) throws IOException {
  final List<LocalizedResourceProto> completed = new ArrayList<>();
  if (it == null) {
    return completed;
  }
  while (it.hasNext()) {
    final LocalizedResourceProto resource = it.next();
    completed.add(resource);
  }
  return completed;
}
/**
 * Drains a started-resource recovery iterator into a map.
 *
 * @param it iterator of (resource, path) entries to exhaust; {@code null} is
 *           treated as empty
 * @return the recovered paths keyed by resource proto
 * @throws IOException if the iterator fails while reading
 */
private Map<LocalResourceProto, Path> loadStartedResources(
    RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
    throws IOException {
  final Map<LocalResourceProto, Path> started = new HashMap<>();
  if (it == null) {
    return started;
  }
  while (it.hasNext()) {
    final Map.Entry<LocalResourceProto, Path> recovered = it.next();
    started.put(recovered.getKey(), recovered.getValue());
  }
  return started;
}
/**
 * Replaces the state store with a new instance backed by the same
 * configuration, closing the previous instance first.
 *
 * @throws IOException if closing the previous store fails
 */
private void restartStateStore() throws IOException {
  final NMLeveldbStateStoreService previous = stateStore;
  if (previous != null) {
    // need to close so leveldb releases database lock
    previous.close();
  }
  final NMLeveldbStateStoreService replacement =
      new NMLeveldbStateStoreService();
  // publish the new instance before init/start, exactly as callers expect
  // when init fails part-way
  stateStore = replacement;
  replacement.init(conf);
  replacement.start();
}
/**
 * Asserts that the localization state is present but empty: the public
 * tracker has no completed or started resources and no user resources exist.
 *
 * @throws IOException if loading the state fails
 */
private void verifyEmptyState() throws IOException {
  final RecoveredLocalizationState localization =
      stateStore.loadLocalizationState();
  assertNotNull(localization);
  final LocalResourceTrackerState publicTracker =
      localization.getPublicTrackerState();
  assertNotNull(publicTracker);
  final List<LocalizedResourceProto> completed = loadCompletedResources(
      publicTracker.getCompletedResourcesIterator());
  assertTrue(completed.isEmpty());
  final Map<LocalResourceProto, Path> started = loadStartedResources(
      publicTracker.getStartedResourcesIterator());
  assertTrue(started.isEmpty());
  final Map<String, RecoveredUserResources> perUser =
      loadUserResources(localization.getIterator());
  assertTrue(perUser.isEmpty());
}
/**
 * The store reports itself as newly created on first start and no longer
 * does so after a restart.
 */
@Test
public void testIsNewlyCreated() throws IOException {
  final boolean newOnFirstStart = stateStore.isNewlyCreated();
  assertTrue(newOnFirstStart);
  restartStateStore();
  final boolean newAfterRestart = stateStore.isNewlyCreated();
  assertFalse(newAfterRestart);
}
/**
 * A freshly started store can recover and holds no localization state.
 */
@Test
public void testEmptyState() throws IOException {
  final boolean recoverable = stateStore.canRecover();
  assertTrue(recoverable);
  verifyEmptyState();
}
/**
 * Exercises version handling: the stored version starts at the current one,
 * a newer minor version survives a restart (after which the current version
 * is loaded again), and a newer major version makes the restart fail.
 */
@Test
public void testCheckVersion() throws IOException {
  // default version
  final Version current = stateStore.getCurrentVersion();
  assertEquals(current, stateStore.loadVersion());
  final int major = current.getMajorVersion();
  final int minor = current.getMinorVersion();

  // compatible version: same major, higher minor
  final Version newerMinor = Version.newInstance(major, minor + 2);
  stateStore.storeVersion(newerMinor);
  assertEquals(newerMinor, stateStore.loadVersion());
  restartStateStore();
  // after the restart the loaded version is the current one again
  assertEquals(current, stateStore.loadVersion());

  // incompatible version: higher major
  final Version newerMajor = Version.newInstance(major + 1, minor);
  stateStore.storeVersion(newerMajor);
  try {
    restartStateStore();
    Assert.fail("Incompatible version, should expect fail here.");
  } catch (ServiceStateException e) {
    final String message = e.getMessage();
    assertTrue("Exception message mismatch",
        message.contains("Incompatible version for NM state:"));
  }
}
/**
 * Stores, recovers and removes applications, restarting the store between
 * steps to confirm the recovered set matches what was persisted.
 */
@Test
public void testApplicationStorage() throws IOException {
  // test empty when no state
  List<ContainerManagerApplicationProto> recoveredApps = loadApplicationProtos(
      stateStore.loadApplicationsState().getIterator());
  assertTrue(recoveredApps.isEmpty());

  // store an application and verify recovered
  final ApplicationId firstId = ApplicationId.newInstance(1234, 1);
  final ContainerManagerApplicationProto firstProto =
      ContainerManagerApplicationProto.newBuilder()
          .setId(((ApplicationIdPBImpl) firstId).getProto())
          .setUser("user1")
          .build();
  stateStore.storeApplication(firstId, firstProto);
  restartStateStore();
  recoveredApps = loadApplicationProtos(
      stateStore.loadApplicationsState().getIterator());
  assertEquals(1, recoveredApps.size());
  assertEquals(firstProto, recoveredApps.get(0));

  // add a new app
  final ApplicationId secondId = ApplicationId.newInstance(1234, 2);
  final ContainerManagerApplicationProto secondProto =
      ContainerManagerApplicationProto.newBuilder()
          .setId(((ApplicationIdPBImpl) secondId).getProto())
          .setUser("user2")
          .build();
  stateStore.storeApplication(secondId, secondProto);
  restartStateStore();
  recoveredApps = loadApplicationProtos(
      stateStore.loadApplicationsState().getIterator());
  assertEquals(2, recoveredApps.size());
  assertTrue(recoveredApps.contains(firstProto));
  assertTrue(recoveredApps.contains(secondProto));

  // test removing an application
  stateStore.removeApplication(secondId);
  restartStateStore();
  recoveredApps = loadApplicationProtos(
      stateStore.loadApplicationsState().getIterator());
  assertEquals(1, recoveredApps.size());
  assertEquals(firstProto, recoveredApps.get(0));
}
/**
 * A container that was only stored (never queued or launched) is recovered
 * after a restart as REQUESTED, with its original start time, request and
 * resource, no diagnostics, an INVALID exit code and not marked killed.
 */
@Test
public void testContainerStorageWhenContainerIsRequested()
    throws IOException {
  final ContainerStateConstructParams containerParams =
      storeContainerInStateStore();
  restartStateStore();
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  final RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(0, rcs.getVersion());
  assertEquals(containerParams.getContainerStartTime().longValue(),
      rcs.getStartTime());
  assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  // assertFalse states the intent directly and matches the assertTrue
  // checks on getKilled() used by the other tests in this class
  assertFalse(rcs.getKilled());
  assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest());
  assertTrue(rcs.getDiagnostics().isEmpty());
  assertEquals(containerParams.getContainerResource(), rcs.getCapability());
}
/**
 * A stored container that was subsequently queued is recovered after a
 * restart as QUEUED, with its request and resource intact, no diagnostics,
 * an INVALID exit code and not marked killed.
 */
@Test
public void testContainerStorageWhenContainerIsQueued()
    throws IOException {
  ContainerStateConstructParams containerParams =
      storeContainerInStateStore();
  ContainerId containerId = containerParams.getContainerId();
  StartContainerRequest containerReq = containerParams.getContainerRequest();
  Resource containerResource = containerParams.getContainerResource();
  ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
  storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
  stateStore.storeContainerQueued(containerId);
  restartStateStore();
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  // assertFalse states the intent directly and matches the assertTrue
  // checks on getKilled() used by the other tests in this class
  assertFalse(rcs.getKilled());
  assertEquals(containerReq, rcs.getStartRequest());
  assertTrue(rcs.getDiagnostics().isEmpty());
  assertEquals(containerResource, rcs.getCapability());
}
/**
 * A stored, queued and launched container with diagnostics is recovered
 * after a restart as LAUNCHED, carrying the stored diagnostics, its request
 * and resource, an INVALID exit code and not marked killed.
 */
@Test
public void testContainerStorageWhenContainerIsLaunched()
    throws IOException {
  ContainerStateConstructParams containerParams =
      storeContainerInStateStore();
  ContainerId containerId = containerParams.getContainerId();
  StartContainerRequest containerReq = containerParams.getContainerRequest();
  Resource containerResource = containerParams.getContainerResource();
  ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
  storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
  stateStore.storeContainerQueued(containerId);
  StringBuilder diags = launchContainerWithDiagnostics(containerId);
  restartStateStore();
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  // assertFalse states the intent directly and matches the assertTrue
  // checks on getKilled() used by the other tests in this class
  assertFalse(rcs.getKilled());
  assertEquals(containerReq, rcs.getStartRequest());
  assertEquals(diags.toString(), rcs.getDiagnostics());
  assertEquals(containerResource, rcs.getCapability());
}
/**
 * A stored, queued and paused container is recovered after a restart as
 * PAUSED; once the paused marker is removed and the store restarted again,
 * the container is still recovered.
 */
@Test
public void testContainerStorageWhenContainerIsPaused()
    throws IOException {
  ContainerStateConstructParams containerParams =
      storeContainerInStateStore();
  ContainerId containerId = containerParams.getContainerId();
  StartContainerRequest containerReq = containerParams.getContainerRequest();
  ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
  storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
  stateStore.storeContainerQueued(containerId);
  stateStore.storeContainerPaused(containerId);
  restartStateStore();
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  // assertFalse states the intent directly and matches the assertTrue
  // checks on getKilled() used by the other tests in this class
  assertFalse(rcs.getKilled());
  assertEquals(containerReq, rcs.getStartRequest());
  // Resume the container; only the recovered count is checked afterwards
  stateStore.removeContainerPaused(containerId);
  restartStateStore();
  recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
}
/**
 * After an update token with a larger resource is stored for a launched
 * container, the recovered state reports that larger resource (2468, 4) as
 * its capability while remaining LAUNCHED and not killed.
 */
@Test
public void testContainerStorageWhenContainerSizeIncreased()
    throws IOException {
  ContainerStateConstructParams containerParams =
      storeContainerInStateStore();
  ContainerId containerId = containerParams.getContainerId();
  ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
  storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
  stateStore.storeContainerQueued(containerId);
  launchContainerWithDiagnostics(containerId);
  increaseContainerSize(containerId);
  restartStateStore();
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(0, rcs.getVersion());
  assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  // assertFalse states the intent directly and matches the assertTrue
  // checks on getKilled() used by the other tests in this class
  assertFalse(rcs.getKilled());
  assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
}
/**
 * A launched, resized container that is then marked killed is recovered as
 * LAUNCHED with the killed flag set, the accumulated diagnostics, and a start
 * request whose container token decodes to the stored update token.
 */
@Test
public void testContainerStorageWhenContainerMarkedAsKilled()
    throws IOException {
  final ContainerStateConstructParams params = storeContainerInStateStore();
  final ContainerId cid = params.getContainerId();
  storeNewContainerRecordWithoutStartContainerRequest(
      params.getAppAttemptId());
  stateStore.storeContainerQueued(cid);
  final StringBuilder diagnostics = launchContainerWithDiagnostics(cid);
  final ContainerTokenIdentifier expectedToken = increaseContainerSize(cid);
  markContainerAsKilled(cid, diagnostics);
  restartStateStore();

  final List<RecoveredContainerState> recovered =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recovered.size());
  final RecoveredContainerState state = recovered.get(0);
  assertEquals(RecoveredContainerStatus.LAUNCHED, state.getStatus());
  assertEquals(ContainerExitStatus.INVALID, state.getExitCode());
  assertTrue(state.getKilled());
  final Token recoveredToken = state.getStartRequest().getContainerToken();
  final ContainerTokenIdentifier actualToken =
      BuilderUtils.newContainerTokenIdentifier(recoveredToken);
  assertEquals(expectedToken, actualToken);
  assertEquals(diagnostics.toString(), state.getDiagnostics());
}
/**
 * A container that was launched, killed and then completed with exit code 21
 * is recovered as COMPLETED with that exit code, the killed flag set and the
 * final diagnostics text.
 */
@Test
public void testContainerStorageWhenContainerCompleted()
    throws IOException {
  final ContainerStateConstructParams params = storeContainerInStateStore();
  final ContainerId cid = params.getContainerId();
  storeNewContainerRecordWithoutStartContainerRequest(
      params.getAppAttemptId());
  stateStore.storeContainerQueued(cid);
  final StringBuilder diagnostics = launchContainerWithDiagnostics(cid);
  markContainerAsKilled(cid, diagnostics);

  // add yet more diags, mark container completed
  diagnostics.append("some final diags");
  stateStore.storeContainerDiagnostics(cid, diagnostics);
  stateStore.storeContainerCompleted(cid, 21);
  restartStateStore();

  final List<RecoveredContainerState> recovered =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recovered.size());
  final RecoveredContainerState state = recovered.get(0);
  assertEquals(RecoveredContainerStatus.COMPLETED, state.getStatus());
  assertEquals(21, state.getExitCode());
  assertTrue(state.getKilled());
  assertEquals(diagnostics.toString(), state.getDiagnostics());
}
/**
 * Remaining retry attempts, work dir and log dir stored for a container are
 * all returned after a restart; restart times are then checked via
 * validateRetryAttempts.
 */
@Test
public void testContainerStorage() throws IOException {
  final ContainerId cid = storeContainerInStateStore().getContainerId();

  // remaining retry attempts, work dir and log dir are stored
  stateStore.storeContainerRemainingRetryAttempts(cid, 6);
  stateStore.storeContainerWorkDir(cid, "/test/workdir");
  stateStore.storeContainerLogDir(cid, "/test/logdir");
  restartStateStore();

  final List<RecoveredContainerState> recovered =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recovered.size());
  final RecoveredContainerState state = recovered.get(0);
  assertEquals(6, state.getRemainingRetryAttempts());
  assertEquals("/test/workdir", state.getWorkDir());
  assertEquals("/test/logdir", state.getLogDir());
  validateRetryAttempts(cid);
}
/**
 * A removed container is not recovered after a restart, and loading the
 * container state once more through a spy triggers no removeContainer call.
 */
@Test
public void testContainerStorageWhenContainerRemoved()
    throws IOException {
  final ContainerId cid = storeContainerInStateStore().getContainerId();

  // remove the container and verify not recovered
  stateStore.removeContainer(cid);
  restartStateStore();
  final List<RecoveredContainerState> recovered =
      loadContainersState(stateStore.getContainerStateIterator());
  assertTrue(recovered.isEmpty());

  // recover again to check remove clears all containers
  restartStateStore();
  final NMStateStoreService spiedStore = spy(stateStore);
  loadContainersState(spiedStore.getContainerStateIterator());
  verify(spiedStore, times(0)).removeContainer(any(ContainerId.class));
}
/**
 * Verifies the store holds no containers, then stores one container
 * (app 1234/3, attempt 4, container 5, resource 1024/2) at version 0 and
 * checks that no version key was written for it.
 *
 * @return the ids, request, resource and start time used for the container
 * @throws IOException if the state store access fails
 */
private ContainerStateConstructParams storeContainerInStateStore()
    throws IOException {
  // test empty when no state
  assertTrue(
      loadContainersState(stateStore.getContainerStateIterator()).isEmpty());

  // create a container request
  final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(1234, 3), 4);
  final ContainerId cid = ContainerId.newContainerId(attemptId, 5);
  final Resource resource = Resource.newInstance(1024, 2);
  final StartContainerRequest request = createContainerRequest(cid, resource);
  final long startTime = 1573155078494L;
  stateStore.storeContainer(cid, 0, startTime, request);

  // verify the container version key is not stored for new containers
  final String versionKey =
      stateStore.getContainerVersionKey(cid.toString());
  final DB db = stateStore.getDB();
  assertNull("version key present for new container",
      db.get(bytes(versionKey)));

  final ContainerStateConstructParams params =
      new ContainerStateConstructParams();
  params.setContainerRequest(request);
  params.setContainerResource(resource);
  params.setContainerStartTime(startTime);
  params.setAppAttemptId(attemptId);
  params.setContainerId(cid);
  return params;
}
/**
 * Mutable holder, with chainable setters, for the values that describe the
 * container created by storeContainerInStateStore().
 */
private static class ContainerStateConstructParams {
  private ContainerId containerId;
  private ApplicationAttemptId appAttemptId;
  private StartContainerRequest containerRequest;
  private Resource containerResource;
  private Long containerStartTime;

  // ---- chainable setters: each stores its argument and returns this ----

  public ContainerStateConstructParams setAppAttemptId(
      ApplicationAttemptId value) {
    appAttemptId = value;
    return this;
  }

  public ContainerStateConstructParams setContainerId(ContainerId value) {
    containerId = value;
    return this;
  }

  public ContainerStateConstructParams setContainerRequest(
      StartContainerRequest value) {
    containerRequest = value;
    return this;
  }

  public ContainerStateConstructParams setContainerResource(Resource value) {
    containerResource = value;
    return this;
  }

  public ContainerStateConstructParams setContainerStartTime(Long value) {
    containerStartTime = value;
    return this;
  }

  // ---- getters ----

  public ApplicationAttemptId getAppAttemptId() {
    return this.appAttemptId;
  }

  public ContainerId getContainerId() {
    return this.containerId;
  }

  public StartContainerRequest getContainerRequest() {
    return this.containerRequest;
  }

  public Resource getContainerResource() {
    return this.containerResource;
  }

  public Long getContainerStartTime() {
    return this.containerStartTime;
  }
}
/**
 * Appends more diagnostics text to {@code diags}, stores the resulting
 * diagnostics for the container and then stores its killed marker.
 *
 * @param containerId container to mark
 * @param diags diagnostics buffer; modified in place
 * @throws IOException if the state store write fails
 */
private void markContainerAsKilled(ContainerId containerId,
    StringBuilder diags) throws IOException {
  // mark the container killed, add some more diags
  final StringBuilder extended = diags.append("some more diags for container");
  stateStore.storeContainerDiagnostics(containerId, extended);
  stateStore.storeContainerKilled(containerId);
}
/**
 * Stores an update token for the container that carries a (2468, 4)
 * resource.
 *
 * @param containerId container whose update token is stored
 * @return the token identifier that was stored
 * @throws IOException if the state store write fails
 */
private ContainerTokenIdentifier increaseContainerSize(
    ContainerId containerId) throws IOException {
  final Resource largerResource = Resource.newInstance(2468, 4);
  final Priority priority = Priority.newInstance(7);
  final ContainerTokenIdentifier storedToken = new ContainerTokenIdentifier(
      containerId, "host", "user", largerResource, 9876543210L, 42, 2468,
      priority, 13579);
  stateStore.storeContainerUpdateToken(containerId, storedToken);
  return storedToken;
}
/**
 * Stores the launched marker for the container followed by an initial
 * diagnostics message.
 *
 * @param containerId container to mark as launched
 * @return the diagnostics buffer that was stored, for callers to extend
 * @throws IOException if the state store write fails
 */
private StringBuilder launchContainerWithDiagnostics(ContainerId containerId)
    throws IOException {
  stateStore.storeContainerLaunched(containerId);
  final StringBuilder diagnostics =
      new StringBuilder("some diags for container");
  stateStore.storeContainerDiagnostics(containerId, diagnostics);
  return diagnostics;
}
/**
 * Stores a launched marker for container 6 of the given attempt, for which
 * no start request was ever stored, and asserts that loading the container
 * state still yields exactly one container.
 *
 * @param appAttemptId attempt the extra container id is derived from
 * @throws IOException if the state store access fails
 */
private void storeNewContainerRecordWithoutStartContainerRequest(
    ApplicationAttemptId appAttemptId) throws IOException {
  // store a new container record without StartContainerRequest
  final ContainerId requestlessId =
      ContainerId.newContainerId(appAttemptId, 6);
  stateStore.storeContainerLaunched(requestlessId);

  // check whether the new container record is discarded
  final int recoveredCount =
      loadContainersState(stateStore.getContainerStateIterator()).size();
  assertEquals(1, recoveredCount);
}
/**
 * Stores three restart times for the container, restarts the store and
 * checks that the first recovered container returns the same three values
 * in the same positions.
 *
 * @param containerId container the restart times are stored for
 * @throws IOException if the state store access fails
 */
private void validateRetryAttempts(ContainerId containerId)
    throws IOException {
  // store finishTimeForRetryAttempts
  final List<Long> storedTimes = Arrays.asList(1462700529039L,
      1462700529050L, 1462700529120L);
  stateStore.storeContainerRestartTimes(containerId, storedTimes);
  restartStateStore();

  final List<Long> recoveredTimes =
      loadContainersState(stateStore.getContainerStateIterator())
          .get(0)
          .getRestartTimes();
  for (int i = 0; i < storedTimes.size(); i++) {
    assertEquals(storedTimes.get(i).longValue(),
        recoveredTimes.get(i).longValue());
  }
}
/**
 * Builds a start request for the container; delegates to
 * createContainerRequestInternal.
 *
 * @param containerId id placed in the container token
 * @param res resource placed in the container token
 * @return the start request
 */
private StartContainerRequest createContainerRequest(
    ContainerId containerId, Resource res) {
  final StartContainerRequest request =
      createContainerRequestInternal(containerId, res);
  return request;
}
/**
 * Builds a start request with a fixed launch context (one local resource,
 * one environment variable, two commands, one service-data entry, token
 * bytes and two ACLs) and a container token for the given id.
 *
 * @param containerId id placed in the container token
 * @param res resource placed in the container token; when {@code null} a
 *            (1357, 3) resource is used instead
 * @return the start request
 */
private StartContainerRequest createContainerRequestInternal(ContainerId
    containerId, Resource res) {
  LocalResource lrsrc = LocalResource.newInstance(
      URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
      LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
      1234567890L);
  Map<String, LocalResource> localResources = new HashMap<>();
  localResources.put("rsrc", lrsrc);
  Map<String, String> env = new HashMap<>();
  env.put("somevar", "someval");
  List<String> containerCmds = new ArrayList<>();
  containerCmds.add("somecmd");
  containerCmds.add("somearg");
  Map<String, ByteBuffer> serviceData = new HashMap<>();
  serviceData.put("someservice",
      ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
  ByteBuffer containerTokens =
      ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
  Map<ApplicationAccessType, String> acls = new HashMap<>();
  acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
  acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
  ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
      localResources, env, containerCmds, serviceData, containerTokens,
      acls);
  // only build the fallback resource when the caller supplied none
  Resource containerRsrc =
      (res != null) ? res : Resource.newInstance(1357, 3);
  ContainerTokenIdentifier containerTokenId =
      new ContainerTokenIdentifier(containerId, "host", "user",
          containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
          13579);
  // explicit charset so the password bytes do not depend on the platform
  // default encoding
  Token containerToken = Token.newInstance(containerTokenId.getBytes(),
      ContainerTokenIdentifier.KIND.toString(),
      "password".getBytes(java.nio.charset.StandardCharsets.UTF_8),
      "tokenservice");
  return StartContainerRequest.newInstance(clc, containerToken);
}
/**
 * Starts and finishes localization of one application resource for each of
 * three applications (two owned by user1, one by user2), restarts the store
 * and verifies that two users are recovered and that both of user1's
 * application trackers hold exactly their completed resource and no started
 * resources.
 */
@Test
public void testLocalTrackerStateIterator() throws IOException {
  String user1 = "somebody";
  ApplicationId appId1 = ApplicationId.newInstance(1, 1);
  ApplicationId appId2 = ApplicationId.newInstance(2, 2);
  String user2 = "someone";
  ApplicationId appId3 = ApplicationId.newInstance(3, 3);

  // start and finish local resource for applications
  Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
  LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
      LocalResource.newInstance(
          URL.fromPath(appRsrcPath1),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L);
  LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
  Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");

  Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
  LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
      LocalResource.newInstance(
          URL.fromPath(appRsrcPath2),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L);
  LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
  Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");

  Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
  LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
      LocalResource.newInstance(
          URL.fromPath(appRsrcPath3),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L);
  LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
  // each resource gets its own local path (this one previously reused the
  // "apprsrc2" path of the second resource)
  Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc3");

  stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
      appRsrcLocalPath1);
  stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
      appRsrcLocalPath2);
  stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
      appRsrcLocalPath3);

  LocalizedResourceProto appLocalizedProto1 =
      LocalizedResourceProto.newBuilder()
          .setResource(appRsrcProto1)
          .setLocalPath(appRsrcLocalPath1.toString())
          .setSize(1234567L)
          .build();
  LocalizedResourceProto appLocalizedProto2 =
      LocalizedResourceProto.newBuilder()
          .setResource(appRsrcProto2)
          .setLocalPath(appRsrcLocalPath2.toString())
          .setSize(1234567L)
          .build();
  LocalizedResourceProto appLocalizedProto3 =
      LocalizedResourceProto.newBuilder()
          .setResource(appRsrcProto3)
          .setLocalPath(appRsrcLocalPath3.toString())
          .setSize(1234567L)
          .build();

  stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
  stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
  stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);

  // restart and verify two users exist and two apps completed for user1.
  restartStateStore();
  RecoveredLocalizationState state = stateStore.loadLocalizationState();
  Map<String, RecoveredUserResources> userResources =
      loadUserResources(state.getIterator());
  assertEquals(2, userResources.size());

  RecoveredUserResources uResource = userResources.get(user1);
  assertEquals(2, uResource.getAppTrackerStates().size());

  LocalResourceTrackerState app1ts =
      uResource.getAppTrackerStates().get(appId1);
  assertNotNull(app1ts);
  // the result lists are declared where they are first filled; no
  // placeholder collections are allocated up front
  List<LocalizedResourceProto> completedResources = loadCompletedResources(
      app1ts.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> startedResources = loadStartedResources(
      app1ts.getStartedResourcesIterator());
  assertTrue(startedResources.isEmpty());
  assertEquals(1, completedResources.size());
  assertEquals(appLocalizedProto1,
      completedResources.iterator().next());

  LocalResourceTrackerState app2ts =
      uResource.getAppTrackerStates().get(appId2);
  assertNotNull(app2ts);
  completedResources = loadCompletedResources(
      app2ts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      app2ts.getStartedResourcesIterator());
  assertTrue(startedResources.isEmpty());
  assertEquals(1, completedResources.size());
  assertEquals(appLocalizedProto2,
      completedResources.iterator().next());
}
/**
 * Verifies recovery of an application resource whose localization was started
 * but not finished: after a restart it must be listed as started under the
 * user's application tracker, while the public tracker and the user's private
 * tracker stay empty.
 */
@Test
public void testStartResourceLocalizationForApplicationResource()
    throws IOException {
  String user = "somebody";
  ApplicationId appId = ApplicationId.newInstance(1, 1);

  // start a local resource for an application
  Path appRsrcPath = new Path("hdfs://some/app/resource");
  LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
      LocalResource.newInstance(
          URL.fromPath(appRsrcPath),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L);
  LocalResourceProto appRsrcProto = rsrcPb.getProto();
  Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
  stateStore.startResourceLocalization(user, appId, appRsrcProto,
      appRsrcLocalPath);

  // restart and verify only app resource is marked in-progress
  restartStateStore();
  RecoveredLocalizationState state = stateStore.loadLocalizationState();

  // nothing was stored for the public tracker
  LocalResourceTrackerState pubts = state.getPublicTrackerState();
  List<LocalizedResourceProto> completedResources = loadCompletedResources(
      pubts.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> startedResources = loadStartedResources(
      pubts.getStartedResourcesIterator());
  assertTrue(completedResources.isEmpty());
  assertTrue(startedResources.isEmpty());

  // exactly one user is recovered and its private tracker is empty
  Map<String, RecoveredUserResources> userResources =
      loadUserResources(state.getIterator());
  assertEquals(1, userResources.size());
  RecoveredUserResources rur = userResources.get(user);
  LocalResourceTrackerState privts = rur.getPrivateTrackerState();
  assertNotNull(privts);
  completedResources = loadCompletedResources(
      privts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      privts.getStartedResourcesIterator());
  assertTrue(completedResources.isEmpty());
  assertTrue(startedResources.isEmpty());

  // the application tracker holds the single in-progress resource
  assertEquals(1, rur.getAppTrackerStates().size());
  LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
  assertNotNull(appts);
  completedResources = loadCompletedResources(
      appts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      appts.getStartedResourcesIterator());
  assertTrue(completedResources.isEmpty());
  assertEquals(1, startedResources.size());
  assertEquals(appRsrcLocalPath,
      startedResources.get(appRsrcProto));
}
/**
 * Starts localization of two public resources without finishing either and
 * checks that, after a restart, both are recovered as started in the public
 * tracker and that no per-user state is recovered.
 */
@Test
public void testStartResourceLocalizationForPublicResources()
    throws IOException {
  // first public resource: localization started only
  final Path remote1 = new Path("hdfs://some/public/resource1");
  final LocalResourceProto proto1 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote1),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local1 = new Path("/some/local/dir/for/pubrsrc1");
  stateStore.startResourceLocalization(null, null, proto1, local1);

  // second public resource: localization started only
  final Path remote2 = new Path("hdfs://some/public/resource2");
  final LocalResourceProto proto2 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote2),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local2 = new Path("/some/local/dir/for/pubrsrc2");
  stateStore.startResourceLocalization(null, null, proto2, local2);

  // restart and verify resources are marked in-progress
  restartStateStore();
  RecoveredLocalizationState recovered = stateStore.loadLocalizationState();
  LocalResourceTrackerState publicTracker = recovered.getPublicTrackerState();
  List<LocalizedResourceProto> finished = loadCompletedResources(
      publicTracker.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> inProgress = loadStartedResources(
      publicTracker.getStartedResourcesIterator());
  assertTrue(finished.isEmpty());
  assertEquals(2, inProgress.size());
  assertEquals(local1, inProgress.get(proto1));
  assertEquals(local2, inProgress.get(proto2));

  // public resources must not create any user entries
  Map<String, RecoveredUserResources> perUser =
      loadUserResources(recovered.getIterator());
  assertEquals(0, perUser.size());
}
/**
 * Starts localization of a single private resource for a user (with no
 * application) and checks that, after a restart, it is recovered as started
 * in that user's private tracker and that the user has no application
 * trackers.
 */
@Test
public void testStartResourceLocalizationForPrivateResource()
    throws IOException {
  final String owner = "somebody";
  final Path remotePath = new Path("hdfs://some/private/resource");
  final LocalResourceProto resourceProto = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remotePath),
          LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
          789L, 680L, "*pattern*")).getProto();
  final Path localPath = new Path("/some/local/dir/for/privrsrc");
  stateStore.startResourceLocalization(owner, null, resourceProto,
      localPath);

  // restart and verify resources are marked in-progress
  restartStateStore();
  RecoveredLocalizationState recovered = stateStore.loadLocalizationState();
  Map<String, RecoveredUserResources> perUser =
      loadUserResources(recovered.getIterator());
  assertEquals(1, perUser.size());

  RecoveredUserResources ownerResources = perUser.get(owner);
  LocalResourceTrackerState privateTracker =
      ownerResources.getPrivateTrackerState();
  assertNotNull(privateTracker);
  List<LocalizedResourceProto> finished = loadCompletedResources(
      privateTracker.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> inProgress = loadStartedResources(
      privateTracker.getStartedResourcesIterator());
  assertTrue(finished.isEmpty());
  assertEquals(1, inProgress.size());
  assertEquals(localPath, inProgress.get(resourceProto));
  assertEquals(0, ownerResources.getAppTrackerStates().size());
}
/**
 * Verifies recovery of an application resource whose localization was started
 * and finished: after a restart it must be listed as completed under the
 * user's application tracker, while the public tracker and the user's private
 * tracker stay empty.
 */
@Test
public void testFinishResourceLocalizationForApplicationResource()
    throws IOException {
  String user = "somebody";
  ApplicationId appId = ApplicationId.newInstance(1, 1);

  // start and finish a local resource for an application
  Path appRsrcPath = new Path("hdfs://some/app/resource");
  LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl)
      LocalResource.newInstance(
          URL.fromPath(appRsrcPath),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L);
  LocalResourceProto appRsrcProto = rsrcPb.getProto();
  Path appRsrcLocalPath = new Path("/some/local/dir/for/apprsrc");
  stateStore.startResourceLocalization(user, appId, appRsrcProto,
      appRsrcLocalPath);
  LocalizedResourceProto appLocalizedProto =
      LocalizedResourceProto.newBuilder()
          .setResource(appRsrcProto)
          .setLocalPath(appRsrcLocalPath.toString())
          .setSize(1234567L)
          .build();
  stateStore.finishResourceLocalization(user, appId, appLocalizedProto);

  // restart and verify only app resource is completed
  restartStateStore();
  RecoveredLocalizationState state = stateStore.loadLocalizationState();

  // nothing was stored for the public tracker
  LocalResourceTrackerState pubts = state.getPublicTrackerState();
  List<LocalizedResourceProto> completedResources = loadCompletedResources(
      pubts.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> startedResources = loadStartedResources(
      pubts.getStartedResourcesIterator());
  assertTrue(completedResources.isEmpty());
  assertTrue(startedResources.isEmpty());

  // exactly one user is recovered and its private tracker is empty
  Map<String, RecoveredUserResources> userResources =
      loadUserResources(state.getIterator());
  assertEquals(1, userResources.size());
  RecoveredUserResources rur = userResources.get(user);
  LocalResourceTrackerState privts = rur.getPrivateTrackerState();
  assertNotNull(privts);
  completedResources = loadCompletedResources(
      privts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      privts.getStartedResourcesIterator());
  assertTrue(completedResources.isEmpty());
  assertTrue(startedResources.isEmpty());

  // the application tracker holds the single completed resource
  assertEquals(1, rur.getAppTrackerStates().size());
  LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
  assertNotNull(appts);
  completedResources = loadCompletedResources(
      appts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      appts.getStartedResourcesIterator());
  assertTrue(startedResources.isEmpty());
  assertEquals(1, completedResources.size());
  assertEquals(appLocalizedProto,
      completedResources.iterator().next());
}
/**
 * Starts localization of two public resources, finishes only the first, and
 * checks that after a restart the public tracker reports the first as
 * completed and the second as still started, with no per-user state.
 */
@Test
public void testFinishResourceLocalizationForPublicResources()
    throws IOException {
  // begin localizing the first public resource
  final Path remote1 = new Path("hdfs://some/public/resource1");
  final LocalResourceProto proto1 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote1),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local1 = new Path("/some/local/dir/for/pubrsrc1");
  stateStore.startResourceLocalization(null, null, proto1, local1);

  // begin localizing the second public resource
  final Path remote2 = new Path("hdfs://some/public/resource2");
  final LocalResourceProto proto2 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote2),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local2 = new Path("/some/local/dir/for/pubrsrc2");
  stateStore.startResourceLocalization(null, null, proto2, local2);

  // finish some of the resources
  final LocalizedResourceProto localized1 =
      LocalizedResourceProto.newBuilder()
          .setResource(proto1)
          .setLocalPath(local1.toString())
          .setSize(proto1.getSize())
          .build();
  stateStore.finishResourceLocalization(null, null, localized1);

  // restart and verify state
  restartStateStore();
  RecoveredLocalizationState recovered = stateStore.loadLocalizationState();
  LocalResourceTrackerState publicTracker = recovered.getPublicTrackerState();
  List<LocalizedResourceProto> finished = loadCompletedResources(
      publicTracker.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> inProgress = loadStartedResources(
      publicTracker.getStartedResourcesIterator());
  assertEquals(1, finished.size());
  assertEquals(localized1, finished.get(0));
  assertEquals(1, inProgress.size());
  assertEquals(local2, inProgress.get(proto2));

  // public resources must not create any user entries
  Map<String, RecoveredUserResources> perUser =
      loadUserResources(recovered.getIterator());
  assertEquals(0, perUser.size());
}
/**
 * Starts and finishes localization of a private resource for a user (with no
 * application) and checks that, after a restart, the public tracker is empty,
 * the user's private tracker reports the resource as completed, and the user
 * has no application tracker.
 */
@Test
public void testFinishResourceLocalizationForPrivateResource()
    throws IOException {
  String user = "somebody";
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  Path privRsrcPath = new Path("hdfs://some/private/resource");
  LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
      .newInstance(
          URL.fromPath(privRsrcPath),
          LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
          789L, 680L, "*pattern*");
  LocalResourceProto privRsrcProto = rsrcPb.getProto();
  Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
  stateStore.startResourceLocalization(user, null, privRsrcProto,
      privRsrcLocalPath);
  LocalizedResourceProto privLocalizedProto =
      LocalizedResourceProto.newBuilder()
          .setResource(privRsrcProto)
          .setLocalPath(privRsrcLocalPath.toString())
          .setSize(privRsrcProto.getSize())
          .build();
  stateStore.finishResourceLocalization(user, null, privLocalizedProto);

  // restart and verify state
  restartStateStore();
  RecoveredLocalizationState state = stateStore.loadLocalizationState();

  // the public tracker must not contain anything
  LocalResourceTrackerState pubts = state.getPublicTrackerState();
  List<LocalizedResourceProto> completedResources = loadCompletedResources(
      pubts.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> startedResources = loadStartedResources(
      pubts.getStartedResourcesIterator());
  assertEquals(0, completedResources.size());
  assertEquals(0, startedResources.size());

  // the user's private tracker holds the single completed resource
  Map<String, RecoveredUserResources> userResources =
      loadUserResources(state.getIterator());
  assertEquals(1, userResources.size());
  RecoveredUserResources rur = userResources.get(user);
  LocalResourceTrackerState privts = rur.getPrivateTrackerState();
  assertNotNull(privts);
  completedResources = loadCompletedResources(
      privts.getCompletedResourcesIterator());
  startedResources = loadStartedResources(
      privts.getStartedResourcesIterator());
  assertEquals(1, completedResources.size());
  assertEquals(privLocalizedProto,
      completedResources.iterator().next());
  assertTrue(startedResources.isEmpty());

  // no application tracker exists for the user; appId was never used when
  // storing, so looking it up must yield nothing
  assertEquals(0, rur.getAppTrackerStates().size());
  LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
  assertNull(appts);
}
/**
 * Removes an application resource in two situations, after it finished
 * localizing and while it was only started, and checks via
 * {@code verifyEmptyState()} after each restart that nothing is recovered.
 */
@Test
public void testRemoveLocalizedResourceForApplicationResource()
    throws IOException {
  final String owner = "somebody";
  final ApplicationId app = ApplicationId.newInstance(1, 1);

  // go through the complete lifecycle for an application local resource
  final Path remotePath = new Path("hdfs://some/app/resource");
  final LocalResourceProto resourceProto = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remotePath),
          LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
          123L, 456L)).getProto();
  final Path localPath = new Path("/some/local/dir/for/apprsrc");
  stateStore.startResourceLocalization(owner, app, resourceProto,
      localPath);
  final LocalizedResourceProto localized =
      LocalizedResourceProto.newBuilder()
          .setResource(resourceProto)
          .setLocalPath(localPath.toString())
          .setSize(1234567L)
          .build();
  stateStore.finishResourceLocalization(owner, app, localized);
  stateStore.removeLocalizedResource(owner, app, localPath);
  restartStateStore();
  verifyEmptyState();

  // remove an app resource that didn't finish
  stateStore.startResourceLocalization(owner, app, resourceProto,
      localPath);
  stateStore.removeLocalizedResource(owner, app, localPath);
  restartStateStore();
  verifyEmptyState();
}
/**
 * Localizes two public resources to completion, removes the second one, and
 * checks that after a restart only the first is recovered as completed in
 * the public tracker, with nothing started and no per-user state.
 */
@Test
public void testRemoveLocalizedResourceForPublicResources()
    throws IOException {
  // add public resources and remove some
  final Path remote1 = new Path("hdfs://some/public/resource1");
  final LocalResourceProto proto1 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote1),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local1 = new Path("/some/local/dir/for/pubrsrc1");
  stateStore.startResourceLocalization(null, null, proto1, local1);
  final LocalizedResourceProto localized1 =
      LocalizedResourceProto.newBuilder()
          .setResource(proto1)
          .setLocalPath(local1.toString())
          .setSize(789L)
          .build();
  stateStore.finishResourceLocalization(null, null, localized1);

  final Path remote2 = new Path("hdfs://some/public/resource2");
  final LocalResourceProto proto2 = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remote2),
          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
          789L, 135L)).getProto();
  final Path local2 = new Path("/some/local/dir/for/pubrsrc2");
  stateStore.startResourceLocalization(null, null, proto2, local2);
  final LocalizedResourceProto localized2 =
      LocalizedResourceProto.newBuilder()
          .setResource(proto2)
          .setLocalPath(local2.toString())
          .setSize(7654321L)
          .build();
  stateStore.finishResourceLocalization(null, null, localized2);

  // only the second resource is removed again
  stateStore.removeLocalizedResource(null, null, local2);

  // restart and verify state
  restartStateStore();
  RecoveredLocalizationState recovered = stateStore.loadLocalizationState();
  LocalResourceTrackerState publicTracker = recovered.getPublicTrackerState();
  List<LocalizedResourceProto> finished = loadCompletedResources(
      publicTracker.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> inProgress = loadStartedResources(
      publicTracker.getStartedResourcesIterator());
  assertTrue(inProgress.isEmpty());
  assertEquals(1, finished.size());
  assertEquals(localized1, finished.get(0));
  Map<String, RecoveredUserResources> perUser =
      loadUserResources(recovered.getIterator());
  assertTrue(perUser.isEmpty());
}
/**
 * Starts localization of a private resource for a user, removes it again
 * before it finishes, and checks that after a restart neither the public
 * tracker nor any per-user state contains an entry.
 */
@Test
public void testRemoveLocalizedResourceForPrivateResource()
    throws IOException {
  final String owner = "somebody";
  final Path remotePath = new Path("hdfs://some/private/resource");
  final LocalResourceProto resourceProto = ((LocalResourcePBImpl)
      LocalResource.newInstance(URL.fromPath(remotePath),
          LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
          789L, 680L, "*pattern*")).getProto();
  final Path localPath = new Path("/some/local/dir/for/privrsrc");
  stateStore.startResourceLocalization(owner, null, resourceProto,
      localPath);
  stateStore.removeLocalizedResource(owner, null, localPath);

  // restart and verify state
  restartStateStore();
  RecoveredLocalizationState recovered = stateStore.loadLocalizationState();
  LocalResourceTrackerState publicTracker = recovered.getPublicTrackerState();
  List<LocalizedResourceProto> finished = loadCompletedResources(
      publicTracker.getCompletedResourcesIterator());
  Map<LocalResourceProto, Path> inProgress = loadStartedResources(
      publicTracker.getStartedResourcesIterator());
  assertTrue(inProgress.isEmpty());
  assertEquals(0, finished.size());
  Map<String, RecoveredUserResources> perUser =
      loadUserResources(recovered.getIterator());
  assertTrue(perUser.isEmpty());
}
/**
 * Walks deletion-task persistence through its lifecycle: empty store, one
 * stored task, two stored tasks, one removed, all removed. The recovered
 * tasks are checked after every restart.
 */
@Test
public void testDeletionTaskStorage() throws IOException {
  // test empty when no state
  List<DeletionServiceDeleteTaskProto> recoveredTasks =
      loadDeletionTaskProtos(
          stateStore.loadDeletionServiceState().getIterator());
  assertTrue(recoveredTasks.isEmpty());

  // store a deletion task and verify recovered
  final DeletionServiceDeleteTaskProto firstTask =
      DeletionServiceDeleteTaskProto.newBuilder()
          .setId(7)
          .setUser("someuser")
          .setSubdir("some/subdir")
          .addBasedirs("some/dir/path")
          .addBasedirs("some/other/dir/path")
          .setDeletionTime(123456L)
          .addSuccessorIds(8)
          .addSuccessorIds(9)
          .build();
  stateStore.storeDeletionTask(firstTask.getId(), firstTask);
  restartStateStore();
  recoveredTasks = loadDeletionTaskProtos(
      stateStore.loadDeletionServiceState().getIterator());
  assertEquals(1, recoveredTasks.size());
  assertEquals(firstTask, recoveredTasks.get(0));

  // store another deletion task
  final DeletionServiceDeleteTaskProto secondTask =
      DeletionServiceDeleteTaskProto.newBuilder()
          .setId(8)
          .setUser("user2")
          .setSubdir("subdir2")
          .setDeletionTime(789L)
          .build();
  stateStore.storeDeletionTask(secondTask.getId(), secondTask);
  restartStateStore();
  recoveredTasks = loadDeletionTaskProtos(
      stateStore.loadDeletionServiceState().getIterator());
  assertEquals(2, recoveredTasks.size());
  assertTrue(recoveredTasks.contains(firstTask));
  assertTrue(recoveredTasks.contains(secondTask));

  // delete a task and verify gone after recovery
  stateStore.removeDeletionTask(secondTask.getId());
  restartStateStore();
  recoveredTasks = loadDeletionTaskProtos(
      stateStore.loadDeletionServiceState().getIterator());
  assertEquals(1, recoveredTasks.size());
  assertEquals(firstTask, recoveredTasks.get(0));

  // delete the last task and verify none left
  stateStore.removeDeletionTask(firstTask.getId());
  restartStateStore();
  recoveredTasks = loadDeletionTaskProtos(
      stateStore.loadDeletionServiceState().getIterator());
  assertTrue(recoveredTasks.isEmpty());
}
/**
 * Exercises persistence of NM token state: the current and previous master
 * keys and the per-application-attempt keys are stored, updated and removed,
 * and the recovered state is checked after each restart.
 */
@Test
public void testNMTokenStorage() throws IOException {
  // test empty when no state
  RecoveredNMTokensState recovered = stateStore.loadNMTokensState();
  Map<ApplicationAttemptId, MasterKey> appKeys =
      loadNMTokens(recovered.getIterator());
  assertNull(recovered.getCurrentMasterKey());
  assertNull(recovered.getPreviousMasterKey());
  assertTrue(appKeys.isEmpty());

  // store a master key and verify recovered
  final NMTokenSecretManagerForTest keySource =
      new NMTokenSecretManagerForTest();
  final MasterKey firstCurrent = keySource.generateKey();
  stateStore.storeNMTokenCurrentMasterKey(firstCurrent);
  restartStateStore();
  recovered = stateStore.loadNMTokensState();
  appKeys = loadNMTokens(recovered.getIterator());
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertNull(recovered.getPreviousMasterKey());
  assertTrue(appKeys.isEmpty());

  // store a previous key and verify recovered
  final MasterKey firstPrevious = keySource.generateKey();
  stateStore.storeNMTokenPreviousMasterKey(firstPrevious);
  restartStateStore();
  recovered = stateStore.loadNMTokensState();
  appKeys = loadNMTokens(recovered.getIterator());
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstPrevious, recovered.getPreviousMasterKey());
  assertTrue(appKeys.isEmpty());

  // store a few application keys and verify recovered
  final ApplicationAttemptId attemptA = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(1, 1), 1);
  final MasterKey keyA = keySource.generateKey();
  stateStore.storeNMTokenApplicationMasterKey(attemptA, keyA);
  final ApplicationAttemptId attemptB = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(2, 3), 4);
  final MasterKey keyB = keySource.generateKey();
  stateStore.storeNMTokenApplicationMasterKey(attemptB, keyB);
  restartStateStore();
  recovered = stateStore.loadNMTokensState();
  appKeys = loadNMTokens(recovered.getIterator());
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstPrevious, recovered.getPreviousMasterKey());
  assertEquals(2, appKeys.size());
  assertEquals(keyA, appKeys.get(attemptA));
  assertEquals(keyB, appKeys.get(attemptB));

  // add/update/remove keys and verify recovered
  final ApplicationAttemptId attemptC = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(5, 6), 7);
  final MasterKey keyC = keySource.generateKey();
  stateStore.storeNMTokenApplicationMasterKey(attemptC, keyC);
  stateStore.removeNMTokenApplicationMasterKey(attemptA);
  // attemptB is overwritten with the key that was the previous master key
  stateStore.storeNMTokenApplicationMasterKey(attemptB, firstPrevious);
  // the old current key is stored as previous, then a new current is stored
  stateStore.storeNMTokenPreviousMasterKey(firstCurrent);
  final MasterKey secondCurrent = keySource.generateKey();
  stateStore.storeNMTokenCurrentMasterKey(secondCurrent);
  restartStateStore();
  recovered = stateStore.loadNMTokensState();
  appKeys = loadNMTokens(recovered.getIterator());
  assertEquals(secondCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstCurrent, recovered.getPreviousMasterKey());
  assertEquals(2, appKeys.size());
  assertNull(appKeys.get(attemptA));
  assertEquals(firstPrevious, appKeys.get(attemptB));
  assertEquals(keyC, appKeys.get(attemptC));
}
/**
 * Exercises persistence of container token state: the current and previous
 * master keys and the per-container expiration times are stored, updated and
 * removed, and the recovered state is checked after each restart.
 */
@Test
public void testContainerTokenStorage() throws IOException {
  // test empty when no state
  RecoveredContainerTokensState recovered =
      stateStore.loadContainerTokensState();
  Map<ContainerId, Long> activeTokens = loadContainerTokens(recovered.it);
  assertNull(recovered.getCurrentMasterKey());
  assertNull(recovered.getPreviousMasterKey());
  assertTrue(activeTokens.isEmpty());

  // store a master key and verify recovered
  final ContainerTokenKeyGeneratorForTest keySource =
      new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
  final MasterKey firstCurrent = keySource.generateKey();
  stateStore.storeContainerTokenCurrentMasterKey(firstCurrent);
  restartStateStore();
  recovered = stateStore.loadContainerTokensState();
  activeTokens = loadContainerTokens(recovered.it);
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertNull(recovered.getPreviousMasterKey());
  assertTrue(activeTokens.isEmpty());

  // store a previous key and verify recovered
  final MasterKey firstPrevious = keySource.generateKey();
  stateStore.storeContainerTokenPreviousMasterKey(firstPrevious);
  restartStateStore();
  recovered = stateStore.loadContainerTokensState();
  activeTokens = loadContainerTokens(recovered.it);
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstPrevious, recovered.getPreviousMasterKey());
  assertTrue(activeTokens.isEmpty());

  // store a few container tokens and verify recovered
  final ContainerId containerA = BuilderUtils.newContainerId(1, 1, 1, 1);
  final Long expiryA = 1234567890L;
  final ContainerId containerB = BuilderUtils.newContainerId(2, 2, 2, 2);
  final Long expiryB = 9876543210L;
  stateStore.storeContainerToken(containerA, expiryA);
  stateStore.storeContainerToken(containerB, expiryB);
  restartStateStore();
  recovered = stateStore.loadContainerTokensState();
  activeTokens = loadContainerTokens(recovered.it);
  assertEquals(firstCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstPrevious, recovered.getPreviousMasterKey());
  assertEquals(2, activeTokens.size());
  assertEquals(expiryA, activeTokens.get(containerA));
  assertEquals(expiryB, activeTokens.get(containerB));

  // add/update/remove tokens and verify recovered
  final ContainerId containerC = BuilderUtils.newContainerId(3, 3, 3, 3);
  final Long expiryC = 135798642L;
  stateStore.storeContainerToken(containerC, expiryC);
  stateStore.removeContainerToken(containerA);
  // containerB is stored again with a later expiration time
  final Long extendedExpiryB = expiryB + 246897531L;
  stateStore.storeContainerToken(containerB, extendedExpiryB);
  // the old current key is stored as previous, then a new current is stored
  stateStore.storeContainerTokenPreviousMasterKey(firstCurrent);
  final MasterKey secondCurrent = keySource.generateKey();
  stateStore.storeContainerTokenCurrentMasterKey(secondCurrent);
  restartStateStore();
  recovered = stateStore.loadContainerTokensState();
  activeTokens = loadContainerTokens(recovered.it);
  assertEquals(secondCurrent, recovered.getCurrentMasterKey());
  assertEquals(firstCurrent, recovered.getPreviousMasterKey());
  assertEquals(2, activeTokens.size());
  assertNull(activeTokens.get(containerA));
  assertEquals(extendedExpiryB, activeTokens.get(containerB));
  assertEquals(expiryC, activeTokens.get(containerC));
}
/**
 * Walks log-deleter persistence through its lifecycle: empty store, one
 * entry, two entries, one removed, all removed. The recovered map is checked
 * after every restart.
 */
@Test
public void testLogDeleterStorage() throws IOException {
  // test empty when no state
  RecoveredLogDeleterState recovered = stateStore.loadLogDeleterState();
  assertTrue(recovered.getLogDeleterMap().isEmpty());

  // store log deleter state
  final ApplicationId firstApp = ApplicationId.newInstance(1, 1);
  final LogDeleterProto firstDeleter = LogDeleterProto.newBuilder()
      .setUser("user1")
      .setDeletionTime(1234)
      .build();
  stateStore.storeLogDeleter(firstApp, firstDeleter);

  // restart state store and verify recovered
  restartStateStore();
  recovered = stateStore.loadLogDeleterState();
  assertEquals(1, recovered.getLogDeleterMap().size());
  assertEquals(firstDeleter, recovered.getLogDeleterMap().get(firstApp));

  // store another log deleter
  final ApplicationId secondApp = ApplicationId.newInstance(2, 2);
  final LogDeleterProto secondDeleter = LogDeleterProto.newBuilder()
      .setUser("user2")
      .setDeletionTime(5678)
      .build();
  stateStore.storeLogDeleter(secondApp, secondDeleter);

  // restart state store and verify recovered
  restartStateStore();
  recovered = stateStore.loadLogDeleterState();
  assertEquals(2, recovered.getLogDeleterMap().size());
  assertEquals(firstDeleter, recovered.getLogDeleterMap().get(firstApp));
  assertEquals(secondDeleter, recovered.getLogDeleterMap().get(secondApp));

  // remove a deleter and verify removed after restart and recovery
  stateStore.removeLogDeleter(firstApp);
  restartStateStore();
  recovered = stateStore.loadLogDeleterState();
  assertEquals(1, recovered.getLogDeleterMap().size());
  assertEquals(secondDeleter, recovered.getLogDeleterMap().get(secondApp));

  // remove last deleter and verify empty after restart and recovery
  stateStore.removeLogDeleter(secondApp);
  restartStateStore();
  recovered = stateStore.loadLogDeleterState();
  assertTrue(recovered.getLogDeleterMap().isEmpty());
}
/**
 * Checks that a started store requests a compaction on its database. The
 * compaction interval is set to one second and the store is backed by a mock
 * {@code DB}; the test waits up to ten seconds for at least one
 * {@code compactRange(null, null)} call.
 */
@Test
public void testCompactionCycle() throws IOException {
  final DB mockdb = mock(DB.class);
  conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
  // Skip the version check and return the mock instead of opening a
  // database, so the test only observes calls made on the mock.
  NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
    @Override
    protected void checkVersion() {}
    @Override
    protected DB openDatabase(Configuration conf) {
      return mockdb;
    }
  };
  store.init(conf);
  store.start();
  try {
    verify(mockdb, timeout(10000).atLeastOnce()).compactRange(
        (byte[]) isNull(), (byte[]) isNull());
  } finally {
    // Close the store even when the verification times out, so a failing
    // run does not leave a started store behind.
    store.close();
  }
}
/**
 * Stores a container, writes an extra unrecognized key under that container's
 * key prefix directly into the database, and checks that recovery still
 * yields the container with its expected state. Finally checks that removing
 * the container also deletes the unrecognized key.
 */
@Test
public void testUnexpectedKeyDoesntThrowException() throws IOException {
  // test empty when no state
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertTrue(recoveredContainers.isEmpty());

  ApplicationId appId = ApplicationId.newInstance(1234, 3);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
      4);
  ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
  StartContainerRequest startContainerRequest = storeMockContainer(
      containerId);

  // add an invalid key; encode it with an explicit charset so the bytes do
  // not depend on the platform default
  byte[] invalidKey = ("ContainerManager/containers/"
      + containerId.toString() + "/invalidKey1234")
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  stateStore.getDB().put(invalidKey, new byte[1]);
  restartStateStore();
  recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);
  assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
  assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
  assertEquals(false, rcs.getKilled());
  assertEquals(startContainerRequest, rcs.getStartRequest());
  assertTrue(rcs.getDiagnostics().isEmpty());
  assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());

  // assert unknown keys are cleaned up finally
  assertNotNull(stateStore.getDB().get(invalidKey));
  stateStore.removeContainer(containerId);
  assertNull(stateStore.getDB().get(invalidKey));
}
/**
 * Exercises persistence of AMRMProxy state: master keys managed through an
 * {@code AMRMProxyTokenSecretManager} bound to the state store, plus
 * per-application-attempt context entries. Entries are added and removed and
 * keys are rolled and activated; the recovered state is checked after each
 * restart.
 */
@Test
public void testAMRMProxyStorage() throws IOException {
  // test empty when no state
  RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState();
  assertThat(state.getCurrentMasterKey()).isNull();
  assertThat(state.getNextMasterKey()).isNull();
  assertThat(state.getAppContexts()).isEmpty();

  ApplicationId appId1 = ApplicationId.newInstance(1, 1);
  ApplicationId appId2 = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId attemptId1 =
      ApplicationAttemptId.newInstance(appId1, 1);
  ApplicationAttemptId attemptId2 =
      ApplicationAttemptId.newInstance(appId2, 2);
  String key1 = "key1";
  String key2 = "key2";
  byte[] data1 = "data1".getBytes();
  byte[] data2 = "data2".getBytes();

  AMRMProxyTokenSecretManager secretManager =
      new AMRMProxyTokenSecretManager(stateStore);
  secretManager.init(conf);
  // Generate currentMasterKey
  secretManager.start();

  try {
    // Add two applications, each with two data entries
    stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1);
    stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1);
    stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2);
    stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2);

    // restart state store and verify recovered
    restartStateStore();
    secretManager.setNMStateStoreService(stateStore);
    state = stateStore.loadAMRMProxyState();
    // the secret manager's key is the expected value, the recovered one the
    // actual value
    assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
        state.getCurrentMasterKey());
    assertThat(state.getNextMasterKey()).isNull();
    assertThat(state.getAppContexts()).hasSize(2);
    // app1
    Map<String, byte[]> map = state.getAppContexts().get(attemptId1);
    assertThat(map).isNotNull();
    assertThat(map).hasSize(2);
    assertTrue(Arrays.equals(map.get(key1), data1));
    assertTrue(Arrays.equals(map.get(key2), data2));
    // app2
    map = state.getAppContexts().get(attemptId2);
    assertThat(map).isNotNull();
    assertThat(map).hasSize(2);
    assertTrue(Arrays.equals(map.get(key1), data1));
    assertTrue(Arrays.equals(map.get(key2), data2));

    // Generate next master key and remove one entry of app2
    secretManager.rollMasterKey();
    stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1);

    // restart state store and verify recovered
    restartStateStore();
    secretManager.setNMStateStoreService(stateStore);
    state = stateStore.loadAMRMProxyState();
    assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
        state.getCurrentMasterKey());
    assertEquals(secretManager.getNextMasterKeyData().getMasterKey(),
        state.getNextMasterKey());
    assertThat(state.getAppContexts()).hasSize(2);
    // app1
    map = state.getAppContexts().get(attemptId1);
    assertThat(map).isNotNull();
    assertThat(map).hasSize(2);
    assertTrue(Arrays.equals(map.get(key1), data1));
    assertTrue(Arrays.equals(map.get(key2), data2));
    // app2
    map = state.getAppContexts().get(attemptId2);
    assertThat(map).isNotNull();
    assertThat(map).hasSize(1);
    assertTrue(Arrays.equals(map.get(key2), data2));

    // Activate next master key and remove all entries of app1
    secretManager.activateNextMasterKey();
    stateStore.removeAMRMProxyAppContext(attemptId1);

    // restart state store and verify recovered
    restartStateStore();
    secretManager.setNMStateStoreService(stateStore);
    state = stateStore.loadAMRMProxyState();
    assertEquals(secretManager.getCurrentMasterKeyData().getMasterKey(),
        state.getCurrentMasterKey());
    assertThat(state.getNextMasterKey()).isNull();
    assertThat(state.getAppContexts()).hasSize(1);
    // app2 only
    map = state.getAppContexts().get(attemptId2);
    assertThat(map).isNotNull();
    assertThat(map).hasSize(1);
    assertTrue(Arrays.equals(map.get(key2), data2));
  } finally {
    // always stop the secret manager that was started above
    secretManager.stop();
  }
}
/**
 * Stores gpu, fpga and numa resource assignments for a single container,
 * restarts the state store and checks that the recovered container reports
 * the same assignments. Also checks that the {@link ResourceMappings}
 * instance returned by the mocked container holds the same lists, that a
 * second store for the same resource type replaces the first, and that
 * releasing an assignment (including one that is already gone) works.
 *
 * @throws IOException if a state store operation fails
 */
@Test
public void testStateStoreForResourceMapping() throws IOException {
  // test that stateStore is initially empty
  List<RecoveredContainerState> recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertTrue(recoveredContainers.isEmpty());

  ApplicationId appId = ApplicationId.newInstance(1234, 3);
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(appId, 4);
  ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
  storeMockContainer(containerId);

  // The mocked container hands out a real ResourceMappings so that the
  // in-memory view can be compared with what is recovered from the store.
  Container container = mock(Container.class);
  when(container.getContainerId()).thenReturn(containerId);
  ResourceMappings resourceMappings = new ResourceMappings();
  when(container.getResourceMappings()).thenReturn(resourceMappings);

  stateStore.storeAssignedResources(container, "gpu",
      Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2),
          new GpuDevice(3, 3)));

  // This will overwrite the above
  List<Serializable> gpuRes1 = Arrays.asList(
      new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
  stateStore.storeAssignedResources(container, "gpu", gpuRes1);

  List<Serializable> fpgaRes = Arrays.asList(
      new FpgaDevice("testType", 3, 3, "testIPID"),
      new FpgaDevice("testType", 4, 4, "testIPID"),
      new FpgaDevice("testType", 5, 5, "testIPID"),
      new FpgaDevice("testType", 6, 6, "testIPID"));
  stateStore.storeAssignedResources(container, "fpga", fpgaRes);

  List<Serializable> numaRes = Arrays.asList(
      new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
  stateStore.storeAssignedResources(container, "numa", numaRes);

  // restart state store and verify recovered
  restartStateStore();
  recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  assertEquals(1, recoveredContainers.size());
  RecoveredContainerState rcs = recoveredContainers.get(0);

  List<Serializable> resources =
      rcs.getResourceMappings().getAssignedResources("gpu");
  assertEquals(gpuRes1, resources);
  assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu"));

  resources = rcs.getResourceMappings().getAssignedResources("fpga");
  assertEquals(fpgaRes, resources);
  assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga"));

  resources = rcs.getResourceMappings().getAssignedResources("numa");
  assertEquals(numaRes, resources);
  assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));

  // test removing numa resources from state store
  stateStore.releaseAssignedResources(containerId, "numa");
  recoveredContainers =
      loadContainersState(stateStore.getContainerStateIterator());
  // guard the get(0) below so a missing container fails as an assertion
  assertEquals(1, recoveredContainers.size());
  ResourceMappings recoveredMappings =
      recoveredContainers.get(0).getResourceMappings();
  assertTrue(recoveredMappings.getAssignedResources("numa").isEmpty());

  // testing calling deletion of non-existing key doesn't break anything
  try {
    stateStore.releaseAssignedResources(containerId, "numa");
  } catch (RuntimeException e) {
    // keep the original exception as the cause so the failure is debuggable
    throw new AssertionError("Should not throw exception while deleting "
        + "non existing key from statestore", e);
  }
}
/**
 * Replaces the store's DB with a mock whose {@code put} throws a
 * {@link DBException}, then expects a write to fail with an
 * {@link IOException} caused by that exception and the store to report
 * itself as not healthy. The original DB is put back before the test ends.
 *
 * @throws IOException declared for the state store calls; not expected to
 *     escape the test
 */
@Test
public void testStateStoreNodeHealth() throws IOException {
  // keep the working DB clean, break a temp DB
  DB workingDb = stateStore.getDB();
  DB brokenDb = mock(DB.class);
  stateStore.setDB(brokenDb);

  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(1234, 1), 1);
  Mockito.doThrow(new DBException()).when(brokenDb)
      .put(any(byte[].class), any(byte[].class));

  boolean failureObserved = false;
  try {
    // write some data; chosen a simple method, could be any of the
    // "void" methods
    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
    stateStore.storeContainerKilled(containerId);
  } catch (IOException ioErr) {
    // Cause should be wrapped DBException
    assertTrue(ioErr.getCause() instanceof DBException);
    // check the store is marked unhealthy
    assertFalse("Statestore should have been unhealthy",
        stateStore.isHealthy());
    failureObserved = true;
  } finally {
    // restore the working DB
    stateStore.setDB(workingDb);
  }

  if (!failureObserved) {
    Assert.fail("Expected exception not thrown");
  }
}
/**
 * Stores an empty list of restart times for a container and checks that the
 * list recovered after a state store restart is empty as well.
 *
 * @throws IOException if a state store operation fails
 */
@Test
public void testEmptyRestartTimes() throws IOException {
  List<Long> noRestarts = new ArrayList<>();
  ContainerId containerId = ContainerId.newContainerId(
      ApplicationAttemptId.newInstance(
          ApplicationId.newInstance(1234, 3), 4),
      5);
  storeMockContainer(containerId);
  stateStore.storeContainerRestartTimes(containerId, noRestarts);

  restartStateStore();

  List<RecoveredContainerState> recovered =
      loadContainersState(stateStore.getContainerStateIterator());
  RecoveredContainerState rcs = recovered.get(0);
  assertTrue(rcs.getRestartTimes().isEmpty());
}
/**
 * Builds a {@link StartContainerRequest} from fixed dummy values (one local
 * resource, one environment variable, two commands, one service data entry,
 * container tokens, two ACLs and a container token for the given id) and
 * stores it through {@code stateStore.storeContainer}.
 *
 * @param containerId id the request is stored under and that is embedded in
 *     the container token identifier
 * @return the request that was stored
 * @throws IOException if the state store fails to store the container
 */
private StartContainerRequest storeMockContainer(ContainerId containerId)
    throws IOException {
  // launch context: local resources
  URL rsrcUrl =
      URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc");
  Map<String, LocalResource> localResources = new HashMap<>();
  localResources.put("rsrc", LocalResource.newInstance(rsrcUrl,
      LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
      1234567890L));

  // launch context: environment and command line
  Map<String, String> env = new HashMap<>();
  env.put("somevar", "someval");
  List<String> containerCmds =
      new ArrayList<>(Arrays.asList("somecmd", "somearg"));

  // launch context: service data, tokens and ACLs
  Map<String, ByteBuffer> serviceData = new HashMap<>();
  serviceData.put("someservice",
      ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
  ByteBuffer containerTokens =
      ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
  Map<ApplicationAccessType, String> acls = new HashMap<>();
  acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
  acls.put(ApplicationAccessType.MODIFY_APP, "moduser");

  ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
      localResources, env, containerCmds,
      serviceData, containerTokens, acls);

  // container token carrying an identifier for the requested container
  ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
      containerId, "host", "user", Resource.newInstance(1357, 3),
      9876543210L, 42, 2468, Priority.newInstance(7), 13579);
  Token containerToken = Token.newInstance(containerTokenId.getBytes(),
      ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
      "tokenservice");

  StartContainerRequest containerReq =
      StartContainerRequest.newInstance(clc, containerToken);
  stateStore.storeContainer(containerId, 0, 0, containerReq);
  return containerReq;
}
/**
 * Test-only subclass of {@link BaseNMTokenSecretManager} that adds a public
 * {@link #generateKey()} method for obtaining master keys.
 */
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
/**
 * Calls {@code createNewMasterKey()} (not defined in this class, so
 * presumably inherited from the base secret manager) and unwraps the result.
 *
 * @return the {@link MasterKey} of the newly created master key
 */
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
/**
 * Test-only subclass of {@link BaseContainerTokenSecretManager} that adds a
 * public {@link #generateKey()} method for obtaining master keys.
 */
private static class ContainerTokenKeyGeneratorForTest extends
BaseContainerTokenSecretManager {
/**
 * Passes the configuration straight through to the superclass constructor.
 *
 * @param conf configuration handed to the base secret manager
 */
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
super(conf);
}
/**
 * Calls {@code createNewMasterKey()} (not defined in this class, so
 * presumably inherited from the base secret manager) and unwraps the result.
 *
 * @return the {@link MasterKey} of the newly created master key
 */
public MasterKey generateKey() {
return createNewMasterKey().getMasterKey();
}
}
}