blob: 7598141f98286e0bba3574b73cc98ddadcd3cebc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
 * Tests the JobResourceUploader class with the shared cache.
 *
 * <p>A single-datanode {@link MiniDFSCluster} serves as the submission file
 * system. The {@link SharedCacheClient} used by the uploader is replaced with a
 * Mockito mock (see {@link MyFileUploader}). Tests can then count calls to
 * {@code use(...)` and make selected jars look as if they are already present
 * in the shared cache.
 */
public class TestJobResourceUploaderWithSharedCache {
  protected static final Log LOG = LogFactory
      .getLog(TestJobResourceUploaderWithSharedCache.class);

  /** Text written as the single entry of every archive built by makeArchive. */
  private static final String ARCHIVE_ENTRY_CONTENTS =
      "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";

  private static final Configuration conf = new Configuration();
  private static final Path remoteStagingDir =
      new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);

  private static MiniDFSCluster dfs;
  private static FileSystem localFs;
  private static FileSystem remoteFs;
  private static Path testRootDir;

  /** Removes the remote staging directory so each test starts clean. */
  @Before
  public void cleanup() throws Exception {
    remoteFs.delete(remoteStagingDir, true);
  }

  /**
   * Sets up the shared fixtures once for the class. These are the local file
   * system, a qualified local scratch directory under {@code target}, and a
   * one-datanode MiniDFSCluster whose file system acts as the remote FS.
   */
  @BeforeClass
  public static void setup() throws IOException {
    localFs = FileSystem.getLocal(conf);
    testRootDir =
        new Path("target",
            TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
            .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
    remoteFs = dfs.getFileSystem();
  }

  /**
   * Releases the class-level fixtures. Each file system is closed
   * independently, so a failure closing one does not prevent closing the
   * other. The cluster is always shut down.
   */
  @AfterClass
  public static void tearDown() {
    closeFileSystem(localFs);
    closeFileSystem(remoteFs);
    if (dfs != null) {
      dfs.shutdown();
    }
  }

  /**
   * Closes {@code fs} if it is non-null. An IOException is logged rather than
   * propagated, so cleanup can continue.
   */
  private static void closeFileSystem(FileSystem fs) {
    if (fs == null) {
      return;
    }
    try {
      fs.close();
    } catch (IOException ioe) {
      LOG.warn("IO exception in closing file system", ioe);
    }
  }

  /**
   * JobResourceUploader whose SharedCacheClient is a Mockito mock.
   *
   * <p>Checksum requests on the mock are answered by a real, initialized
   * (but never started) SharedCacheClient. Calls to {@code use(...)} return
   * null unless they are primed through {@link #mockFileInSharedCache}.
   */
  private static class MyFileUploader extends JobResourceUploader {
    // The mocked SharedCacheClient that will be fed into the FileUploader.
    private final SharedCacheClient mockscClient =
        mock(SharedCacheClient.class);
    // A real client, used only for checksum calculation.
    private final SharedCacheClient scClient = SharedCacheClient
        .createSharedCacheClient();

    MyFileUploader(FileSystem submitFs, Configuration conf)
        throws IOException {
      super(submitFs, false);
      // Initialize the real client, but don't start it. We don't need or want
      // to create an actual proxy because we only use this for mocking out the
      // getFileChecksum method.
      scClient.init(conf);
      when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
          new Answer<String>() {
            @Override
            public String answer(InvocationOnMock invocation)
                throws Throwable {
              Path file = (Path) invocation.getArguments()[0];
              // Use the real scClient to generate the checksum. We use an
              // answer/mock combination to avoid having to spy on a real
              // SharedCacheClient object.
              return scClient.getFileChecksum(file);
            }
          });
    }

    /**
     * Primes the mock client so that {@code localFile} looks present in the
     * shared cache. A {@code use} call whose checksum matches that file
     * returns {@code remoteFile}.
     */
    public void mockFileInSharedCache(Path localFile, URL remoteFile)
        throws YarnException, IOException {
      when(mockscClient.use(any(ApplicationId.class),
          eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
    }

    @Override
    protected SharedCacheClient createSharedCacheClient(Configuration c) {
      // Feed the mocked SharedCacheClient into the FileUploader logic.
      return mockscClient;
    }
  }

  @Test
  public void testSharedCacheDisabled() throws Exception {
    JobConf jobConf = createJobConf();
    Job job = new Job(jobConf);
    job.setJobID(new JobID("567789", 1));

    // shared cache is disabled by default
    uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
  }

  @Test
  public void testSharedCacheEnabled() throws Exception {
    JobConf jobConf = createJobConf();
    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
    Job job = new Job(jobConf);
    job.setJobID(new JobID("567789", 1));

    // shared cache is enabled for every file type
    // the # of times SharedCacheClient.use is called should ==
    // total # of files/libjars/archive/jobjar
    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
  }

  @Test
  public void testSharedCacheEnabledWithJobJarInSharedCache()
      throws Exception {
    JobConf jobConf = createJobConf();
    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
    Job job = new Job(jobConf);
    job.setJobID(new JobID("567789", 1));

    // shared cache is enabled for every file type
    // the # of times SharedCacheClient.use is called should ==
    // total # of files/libjars/archive/jobjar
    uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
  }

  @Test
  public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
    JobConf jobConf = createJobConf();
    jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
    Job job = new Job(jobConf);
    job.setJobID(new JobID("567789", 1));

    // shared cache is enabled for archives and libjars type
    // the # of times SharedCacheClient.use is called should ==
    // total # of libjars and archives
    uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
  }

  /**
   * Builds a JobConf with the following settings:
   * <ul>
   *   <li>the YARN framework is selected;</li>
   *   <li>{@code YarnConfiguration.SHARED_CACHE_ENABLED} is true;</li>
   *   <li>the default FS points at the MiniDFSCluster.</li>
   * </ul>
   * The MapReduce shared cache mode is left for each test to set.
   */
  private JobConf createJobConf() {
    JobConf jobConf = new JobConf();
    jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
    jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
    jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
        .toString());
    return jobConf;
  }

  /** Copies a local file to {@code /tmp/<name>} on the remote FS. */
  private Path copyToRemote(Path jar) throws IOException {
    Path remoteFile = new Path("/tmp", jar.getName());
    remoteFs.copyFromLocalFile(jar, remoteFile);
    return remoteFile;
  }

  /**
   * Copies {@code jar} to the remote FS and primes {@code fileUploader}'s mock
   * so the jar appears to already be in the shared cache.
   */
  private void makeJarAvailableInSharedCache(Path jar,
      MyFileUploader fileUploader) throws YarnException, IOException {
    // copy file to remote file system
    Path remoteFile = copyToRemote(jar);
    // prime mocking so that it looks like this file is in the shared cache
    fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
  }

  /**
   * Registers a fixed set of resources on {@code job}/{@code jobConf}, runs
   * the uploader, and checks how it interacted with the shared cache.
   *
   * <p>Resources registered:
   * <ul>
   *   <li>two files: one through the shared-cache API or distributed cache,
   *       one via {@code tmpfiles};</li>
   *   <li>three libjars: one through the API, two via {@code tmpjars} with
   *       identical content;</li>
   *   <li>two archives: one through the API, one via
   *       {@code tmparchives};</li>
   *   <li>the job jar.</li>
   * </ul>
   *
   * @param useCallCountExpected expected number of
   *     {@code SharedCacheClient.use} calls
   * @param numOfFilesShouldBeUploadedToSharedCacheExpected expected number of
   *     file upload policies set to true
   * @param numOfArchivesShouldBeUploadedToSharedCacheExpected expected number
   *     of archive upload policies set to true
   * @param jobJarInSharedCacheBeforeUpload whether to prime the mock so the
   *     job jar already appears in the shared cache
   */
  private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
      int useCallCountExpected,
      int numOfFilesShouldBeUploadedToSharedCacheExpected,
      int numOfArchivesShouldBeUploadedToSharedCacheExpected,
      boolean jobJarInSharedCacheBeforeUpload) throws Exception {
    MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
    SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
    sharedCacheConfig.init(jobConf);

    Path firstFile = createTempFile("first-input-file", "x");
    Path secondFile = createTempFile("second-input-file", "xx");

    // Add files to job conf via distributed cache API as well as command line
    boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
    assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
    if (!fileAdded) {
      Path remoteFile = copyToRemote(firstFile);
      job.addCacheFile(remoteFile.toUri());
    }
    jobConf.set("tmpfiles", secondFile.toString());

    // Create jars with a single file inside them.
    Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
    Path secondJar =
        makeJar(new Path(testRootDir, "distributed.second.jar"), 2);

    // Verify duplicated contents can be handled properly.
    Path thirdJar = new Path(testRootDir, "distributed.third.jar");
    localFs.copyFromLocalFile(secondJar, thirdJar);

    // make secondJar cache available
    makeJarAvailableInSharedCache(secondJar, fileUploader);

    // Add libjars to job conf via distributed cache API as well as command
    // line
    boolean libjarAdded =
        Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
    assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
    if (!libjarAdded) {
      Path remoteJar = copyToRemote(firstJar);
      job.addFileToClassPath(remoteJar);
    }
    jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());

    Path firstArchive = makeArchive("first-archive.zip", "first-file");
    Path secondArchive = makeArchive("second-archive.zip", "second-file");

    // Add archives to job conf via distributed cache API as well as command
    // line
    boolean archiveAdded =
        Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
    assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
        archiveAdded);
    if (!archiveAdded) {
      Path remoteArchive = copyToRemote(firstArchive);
      job.addCacheArchive(remoteArchive.toUri());
    }
    jobConf.set("tmparchives", secondArchive.toString());

    // Add job jar to job conf
    Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
    if (jobJarInSharedCacheBeforeUpload) {
      makeJarAvailableInSharedCache(jobJar, fileUploader);
    }
    jobConf.setJar(jobJar.toString());

    fileUploader.uploadResources(job, remoteStagingDir);

    verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
        any(ApplicationId.class), anyString());

    assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
        countEnabledPolicies(Job.getFileSharedCacheUploadPolicies(jobConf)));
    assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
        countEnabledPolicies(
            Job.getArchiveSharedCacheUploadPolicies(jobConf)));
  }

  /**
   * Counts the entries in {@code policies} whose value is {@code true}. A
   * null value counts as not enabled.
   */
  private static int countEnabledPolicies(Map<String, Boolean> policies) {
    int enabled = 0;
    for (Boolean policy : policies.values()) {
      if (Boolean.TRUE.equals(policy)) {
        enabled++;
      }
    }
    return enabled;
  }

  /**
   * Writes {@code contents} to {@code testRootDir/filename} on the local FS.
   * The permission is set to 700. The stream is closed even if the write
   * fails.
   */
  private Path createTempFile(String filename, String contents)
      throws IOException {
    Path path = new Path(testRootDir, filename);
    FSDataOutputStream os = localFs.create(path);
    try {
      os.writeBytes(contents);
    } finally {
      os.close();
    }
    localFs.setPermission(path, new FsPermission("700"));
    return path;
  }

  /**
   * Creates a jar at {@code p} containing one entry,
   * {@code distributed.jar.inside<index>}, whose body is
   * {@code "inside the jar!<index>"}. The jar's permission is set to 700.
   * Streams are closed even on failure.
   */
  private Path makeJar(Path p, int index) throws FileNotFoundException,
      IOException {
    FileOutputStream fos =
        new FileOutputStream(new File(p.toUri().getPath()));
    try {
      JarOutputStream jos = new JarOutputStream(fos);
      try {
        jos.putNextEntry(new ZipEntry("distributed.jar.inside" + index));
        jos.write(("inside the jar!" + index).getBytes("UTF-8"));
        jos.closeEntry();
      } finally {
        jos.close();
      }
    } finally {
      // Harmless if jos.close() already closed it; guards a failed wrap.
      fos.close();
    }
    localFs.setPermission(p, new FsPermission("700"));
    return p;
  }

  /**
   * Creates a zip archive {@code testRootDir/archiveFile} with one entry. The
   * entry is named after the path of {@code testRootDir/filename} and
   * contains the fixed archive text. The stream is closed even on failure.
   */
  private Path makeArchive(String archiveFile, String filename)
      throws Exception {
    Path archive = new Path(testRootDir, archiveFile);
    Path file = new Path(testRootDir, filename);
    DataOutputStream out = localFs.create(archive);
    try {
      ZipOutputStream zos = new ZipOutputStream(out);
      try {
        zos.putNextEntry(new ZipEntry(file.toString()));
        zos.write(ARCHIVE_ENTRY_CONTENTS.getBytes("UTF-8"));
        zos.closeEntry();
      } finally {
        zos.close();
      }
    } finally {
      // Harmless if zos.close() already closed it; guards a failed wrap.
      out.close();
    }
    return archive;
  }
}