blob: 7ca2c91e3c7ffb0ceba2e474a23545263106e125 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestResourceLocalizationService {
static final Path basedir =
new Path("target", TestResourceLocalizationService.class.getName());
static Server mockServer;
@BeforeClass
public static void setup() {
mockServer = mock(Server.class);
doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
}
@Test
public void testLocalizationInit() throws Exception {
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(new Configuration());
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = spy(new DeletionService(exec));
delService.init(new Configuration());
delService.start();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
diskhandler.init(conf);
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
dispatcher.start();
// initialize ResourceLocalizationService
locService.init(conf);
final FsPermission defaultPerm = new FsPermission((short)0755);
// verify directory creation
for (Path p : localDirs) {
p = new Path((new URI(p.toString())).getPath());
Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
verify(spylfs)
.mkdir(eq(usercache),
eq(defaultPerm), eq(true));
Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
verify(spylfs)
.mkdir(eq(publicCache),
eq(defaultPerm), eq(true));
Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
verify(spylfs).mkdir(eq(nmPriv),
eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
}
} finally {
dispatcher.stop();
delService.stop();
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
//Ignore actual localization
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
DeletionService delService = new DeletionService(exec);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
isA(Configuration.class));
doReturn(lfs).when(spyService)
.getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
//Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
LocalResourcesTracker appTracker =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user, appId);
LocalResourcesTracker privTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
user, appId);
LocalResourcesTracker pubTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
user, appId);
// init container.
final Container c = getMockContainer(appId, 42);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// Send localization requests for one resource of each type.
final LocalResource privResource = getPrivateMockedResource(r);
final LocalResourceRequest privReq =
new LocalResourceRequest(privResource);
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
final LocalResource pubResource2 = getPublicMockedResource(r);
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);
final LocalResource appResource = getAppMockedResource(r);
final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
req.put(LocalResourceVisibility.APPLICATION,
Collections.singletonList(appReq));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req2.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req2.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq2));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
// Send Request event
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
dispatcher.await();
int privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
int pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
int appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
//Send Cleanup Event
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
verify(mockLocallilzerTracker)
.cleanupPrivLocalizers("container_314159265358979_0003_01_000042");
req2.remove(LocalResourceVisibility.PRIVATE);
spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
dispatcher.await();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
} finally {
dispatcher.stop();
delService.stop();
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(new Configuration());
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);
spyService.start();
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
ArgumentMatcher<ApplicationEvent> matchesAppInit =
new ArgumentMatcher<ApplicationEvent>() {
@Override
public boolean matches(Object o) {
ApplicationEvent evt = (ApplicationEvent) o;
return evt.getType() == ApplicationEventType.APPLICATION_INITED
&& appId == evt.getApplicationID();
}
};
dispatcher.await();
verify(applicationBus).handle(argThat(matchesAppInit));
// init container rsrc, localizer
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
final Container c = getMockContainer(appId, 42);
FSDataOutputStream out =
new FSDataOutputStream(new DataOutputBuffer(), null);
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(1000);
dispatcher.await();
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
verify(exec).startLocalizer(tokenPathCaptor.capture(),
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
isA(List.class), isA(List.class));
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr);
when(rsrcStat.getResource()).thenReturn(resource);
when(rsrcStat.getLocalSize()).thenReturn(4344L);
URL locPath = getPath("/cache/private/blah");
when(rsrcStat.getLocalPath()).thenReturn(locPath);
when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrcStat))
.thenReturn(Collections.<LocalResourceStatus>emptyList());
// get rsrc
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
// empty rsrc
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(0, response.getAllResources().size());
// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
// verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc =
new ArgumentMatcher<ContainerEvent>() {
@Override
public boolean matches(Object o) {
ContainerEvent evt = (ContainerEvent) o;
return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
&& c.getContainerID() == evt.getContainerID();
}
};
dispatcher.await();
verify(containerBus).handle(argThat(matchesContainerLoc));
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {
spyService.stop();
dispatcher.stop();
delService.stop();
}
}
@Test(timeout=20000)
@SuppressWarnings("unchecked") // mocked generics
public void testFailedPublicResource() throws Exception {
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
dispatcher.init(conf);
dispatcher.start();
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
isA(Configuration.class));
spyService.init(conf);
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
// init container.
final Container c = getMockContainer(appId, 42);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// cause chmod to fail after a delay
final CyclicBarrier barrier = new CyclicBarrier(2);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) throws IOException {
try {
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
throw new IOException("forced failure");
}
}).when(spylfs)
.setPermission(isA(Path.class), isA(FsPermission.class));
// Queue up two localization requests for the same public resource
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
dispatcher.await();
// allow the chmod to fail now that both requests have been queued
barrier.await();
verify(containerBus, timeout(5000).times(2))
.handle(isA(ContainerResourceFailedEvent.class));
} finally {
dispatcher.stop();
}
}
private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path);
return url;
}
private static LocalResource getMockedResource(Random r,
LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return rsrc;
}
private static LocalResource getAppMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.APPLICATION);
}
private static LocalResource getPublicMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PUBLIC);
}
private static LocalResource getPrivateMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PRIVATE);
}
private static Container getMockContainer(ApplicationId appId, int id) {
Container c = mock(Container.class);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds);
when(c.toString()).thenReturn(cId.toString());
return c;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
static Token<? extends TokenIdentifier> getToken(int id) {
return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
new Text("kind" + id), new Text("service" + id));
}
}