blob: 05497cc0c7c3bfff57c8138287b3e788c273550f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.util;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestMRApps {
private static File testWorkDir = null;
@BeforeClass
public static void setupTestDirs() throws IOException {
testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
delete(testWorkDir);
testWorkDir.mkdirs();
testWorkDir = testWorkDir.getAbsoluteFile();
}
@AfterClass
public static void cleanupTestDirs() throws IOException {
if (testWorkDir != null) {
delete(testWorkDir);
}
}
private static void delete(File dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
fs.delete(p, true);
}
@Test (timeout = 120000)
public void testJobIDtoString() {
JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
assertEquals("job_0_0000", MRApps.toString(jid));
}
@Test (timeout = 120000)
public void testToJobID() {
JobId jid = MRApps.toJobID("job_1_1");
assertEquals(1, jid.getAppId().getClusterTimestamp());
assertEquals(1, jid.getAppId().getId());
assertEquals(1, jid.getId()); // tests against some proto.id and not a job.id field
}
@Test (timeout = 120000, expected=IllegalArgumentException.class)
public void testJobIDShort() {
MRApps.toJobID("job_0_0_0");
}
//TODO_get.set
@Test (timeout = 120000)
public void testTaskIDtoString() {
TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
tid.setTaskType(TaskType.MAP);
TaskType type = tid.getTaskType();
System.err.println(type);
type = TaskType.REDUCE;
System.err.println(type);
System.err.println(tid.getTaskType());
assertEquals("task_0_0000_m_000000", MRApps.toString(tid));
tid.setTaskType(TaskType.REDUCE);
assertEquals("task_0_0000_r_000000", MRApps.toString(tid));
}
@Test (timeout = 120000)
public void testToTaskID() {
TaskId tid = MRApps.toTaskID("task_1_2_r_3");
assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
assertEquals(2, tid.getJobId().getAppId().getId());
assertEquals(2, tid.getJobId().getId());
assertEquals(TaskType.REDUCE, tid.getTaskType());
assertEquals(3, tid.getId());
tid = MRApps.toTaskID("task_1_2_m_3");
assertEquals(TaskType.MAP, tid.getTaskType());
}
@Test(timeout = 120000, expected=IllegalArgumentException.class)
public void testTaskIDShort() {
MRApps.toTaskID("task_0_0000_m");
}
@Test(timeout = 120000, expected=IllegalArgumentException.class)
public void testTaskIDBadType() {
MRApps.toTaskID("task_0_0000_x_000000");
}
//TODO_get.set
@Test (timeout = 120000)
public void testTaskAttemptIDtoString() {
TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
taid.getTaskId().setTaskType(TaskType.MAP);
taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
taid.getTaskId().getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
assertEquals("attempt_0_0000_m_000000_0", MRApps.toString(taid));
}
@Test (timeout = 120000)
public void testToTaskAttemptID() {
TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_m_2_3");
assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
assertEquals(1, taid.getTaskId().getJobId().getId());
assertEquals(2, taid.getTaskId().getId());
assertEquals(3, taid.getId());
}
@Test(timeout = 120000, expected=IllegalArgumentException.class)
public void testTaskAttemptIDShort() {
MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
}
@Test (timeout = 120000)
public void testGetJobFileWithUser() {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
String jobFile = MRApps.getJobFile(conf, "dummy-user",
new JobID("dummy-job", 12345));
assertNotNull("getJobFile results in null.", jobFile);
assertEquals("jobFile with specified user is not as expected.",
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
}
@Test (timeout = 120000)
public void testSetClasspath() throws IOException {
Job job = Job.getInstance();
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertTrue(environment.get("CLASSPATH").startsWith(
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
String yarnAppClasspath =
job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH);
if (yarnAppClasspath != null) {
yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
.trim();
}
assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
String mrAppClasspath =
job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
if (mrAppClasspath != null) {
mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", File.pathSeparator)
.trim();
}
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
}
@Test (timeout = 120000)
public void testSetClasspathWithArchives () throws IOException {
File testTGZ = new File(testWorkDir, "test.tgz");
FileOutputStream out = new FileOutputStream(testTGZ);
out.write(0);
out.close();
Job job = Job.getInstance();
Configuration conf = job.getConfiguration();
String testTGZQualifiedPath = FileSystem.getLocal(conf).makeQualified(new Path(
testTGZ.getAbsolutePath())).toString();
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, testTGZQualifiedPath);
conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);
assertTrue(environment.get("CLASSPATH").startsWith(
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
if (confClasspath != null) {
confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
.trim();
}
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
}
@Test (timeout = 120000)
public void testSetClasspathWithUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
env_str.startsWith(expectedClasspath));
}
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+ " the classpath!", env_str.contains(expectedClasspath));
assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
env_str.startsWith(expectedClasspath));
}
@Test (timeout = 120000)
public void testSetClasspathWithJobClassloader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
String cp = env.get("CLASSPATH");
String appCp = env.get("APP_CLASSPATH");
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+ " classpath!", cp.contains("jar" + File.pathSeparator + "job"));
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
cp.contains("PWD"));
String expectedAppClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
+ " classpath!", expectedAppClasspath, appCp);
}
@Test (timeout = 30000)
public void testSetupDistributedCacheEmpty() throws IOException {
Configuration conf = new Configuration();
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertTrue("Empty Config did not produce an empty list of resources",
localResources.isEmpty());
}
@SuppressWarnings("deprecation")
@Test(timeout = 120000, expected = InvalidJobConfException.class)
public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip#something");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
}
@SuppressWarnings("deprecation")
@Test(timeout = 120000, expected = InvalidJobConfException.class)
public void testSetupDistributedCacheConflictsFiles() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI file = new URI("mockfs://mock/tmp/something.zip#something");
Path filePath = new Path(file);
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
Path file2Path = new Path(file);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file2, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
}
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(2, localResources.size());
LocalResource lr = localResources.get("something.zip");
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
lr = localResources.get("something");
assertNotNull(lr);
assertEquals(11l, lr.getSize());
assertEquals(11l, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
}
public void initialize(URI name, Configuration conf) throws IOException {}
}
}