blob: 829eba4e1c0a1eeae9e3aaf77b28ad383c9a23a1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestContainerLocalizer {
static final Path basedir =
new Path("target", TestContainerLocalizer.class.getName());
static final String appUser = "yak";
static final String appId = "app_RM_0";
static final String containerId = "container_0";
static final InetSocketAddress nmAddr =
new InetSocketAddress("foobar", 8040);
private AbstractFileSystem spylfs;
private Random random;
private List<Path> localDirs;
private Path tokenPath;
private LocalizationProtocol nmProxy;
@Test
public void testContainerLocalizerMain() throws Exception {
ContainerLocalizer localizer = setupContainerLocalizerForTest();
// mock heartbeat responses from NM
LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
LocalResource rsrcC = getMockRsrc(random,
LocalResourceVisibility.APPLICATION);
LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcA)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcB)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcC)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcD)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.<LocalResource>emptyList()))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
null));
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
isA(UserGroupInformation.class));
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
// verify created cache
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
// $x/usercache/$user/filecache
verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
Path appDir =
new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
// $x/usercache/$user/appcache/$appId/filecache
Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
}
// verify tokens read at expected location
verify(spylfs).open(tokenPath);
// verify downloaded resources reported to NM
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
// verify all HB use localizerID provided
verify(nmProxy, never()).heartbeat(argThat(
new ArgumentMatcher<LocalizerStatus>() {
@Override
public boolean matches(Object o) {
LocalizerStatus status = (LocalizerStatus) o;
return !containerId.equals(status.getLocalizerId());
}
}));
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
// verify filesystems are closed when localizer doesn't fail
ContainerLocalizer localizer = setupContainerLocalizerForTest();
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
any(CompletionService.class), any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
// verify filesystems are closed when localizer fails
localizer = setupContainerLocalizerForTest();
doThrow(new YarnException("Forced Failure")).when(localizer).localizeFiles(
any(LocalizationProtocol.class), any(CompletionService.class),
any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
@SuppressWarnings("unchecked") // mocked generics
private ContainerLocalizer setupContainerLocalizerForTest()
throws Exception {
spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
// don't actually create dirs
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
Configuration conf = new Configuration();
FileContext lfs = FileContext.getFileContext(spylfs, conf);
localDirs = new ArrayList<Path>();
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
}
RecordFactory mockRF = getMockLocalizerRecordFactory();
ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
appId, containerId, localDirs, mockRF);
ContainerLocalizer localizer = spy(concreteLoc);
// return credential stream instead of opening local file
random = new Random();
long seed = random.nextLong();
System.out.println("SEED: " + seed);
random.setSeed(seed);
DataInputBuffer appTokens = createFakeCredentials(random, 10);
tokenPath =
lfs.makeQualified(new Path(
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerId)));
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
).when(spylfs).open(tokenPath);
nmProxy = mock(LocalizationProtocol.class);
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
doNothing().when(localizer).sleep(anyInt());
// return result instantly for deterministic test
ExecutorService syncExec = mock(ExecutorService.class);
CompletionService<Path> cs = mock(CompletionService.class);
when(cs.submit(isA(Callable.class)))
.thenAnswer(new Answer<Future<Path>>() {
@Override
public Future<Path> answer(InvocationOnMock invoc)
throws Throwable {
Future<Path> done = mock(Future.class);
when(done.isDone()).thenReturn(true);
FakeDownload d = (FakeDownload) invoc.getArguments()[0];
when(done.get()).thenReturn(d.call());
return done;
}
});
doReturn(syncExec).when(localizer).createDownloadThreadPool();
doReturn(cs).when(localizer).createCompletionService(syncExec);
return localizer;
}
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
final LocalResource rsrc;
HBMatches(LocalResource rsrc) {
this.rsrc = rsrc;
}
@Override
public boolean matches(Object o) {
LocalizerStatus status = (LocalizerStatus) o;
for (LocalResourceStatus localized : status.getResources()) {
switch (localized.getStatus()) {
case FETCH_SUCCESS:
if (localized.getLocalPath().getFile().contains(
rsrc.getResource().getFile())) {
return true;
}
break;
default:
fail("Unexpected: " + localized.getStatus());
break;
}
}
return false;
}
}
static class FakeDownload implements Callable<Path> {
private final Path localPath;
private final boolean succeed;
FakeDownload(String absPath, boolean succeed) {
this.localPath = new Path("file:///localcache" + absPath);
this.succeed = succeed;
}
@Override
public Path call() throws IOException {
if (!succeed) {
throw new IOException("FAIL " + localPath);
}
return localPath;
}
}
static RecordFactory getMockLocalizerRecordFactory() {
RecordFactory mockRF = mock(RecordFactory.class);
when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))
.thenAnswer(new Answer<LocalResourceStatus>() {
@Override
public LocalResourceStatus answer(InvocationOnMock invoc)
throws Throwable {
return new MockLocalResourceStatus();
}
});
when(mockRF.newRecordInstance(same(LocalizerStatus.class)))
.thenAnswer(new Answer<LocalizerStatus>() {
@Override
public LocalizerStatus answer(InvocationOnMock invoc)
throws Throwable {
return new MockLocalizerStatus();
}
});
return mockRF;
}
static LocalResource getMockRsrc(Random r,
LocalResourceVisibility vis) {
LocalResource rsrc = mock(LocalResource.class);
String name = Long.toHexString(r.nextLong());
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
when(uri.getHost()).thenReturn(null);
when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
when(rsrc.getResource()).thenReturn(uri);
when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(vis);
return rsrc;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
static DataInputBuffer createFakeCredentials(Random r, int nTok)
throws IOException {
Credentials creds = new Credentials();
byte[] password = new byte[20];
Text kind = new Text();
Text service = new Text();
Text alias = new Text();
for (int i = 0; i < nTok; ++i) {
byte[] identifier = ("idef" + i).getBytes();
r.nextBytes(password);
kind.set("kind" + i);
service.set("service" + i);
alias.set("token" + i);
Token token = new Token(identifier, password, kind, service);
creds.addToken(alias, token);
}
DataOutputBuffer buf = new DataOutputBuffer();
creds.writeTokenStorageToStream(buf);
DataInputBuffer ret = new DataInputBuffer();
ret.reset(buf.getData(), 0, buf.getLength());
return ret;
}
}