blob: b3e66bd567dbdd495c10410ea413bbb81f1c196e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
import javax.security.auth.login.LoginException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.log.Log;
public class TestTrackerDistributedCacheManager extends TestCase {
/**
 * Root directory for the files created by this test: a directory named after
 * this class under the "test.build.data" system property (default "/tmp").
 */
protected String TEST_ROOT_DIR =
    new File(System.getProperty("test.build.data", "/tmp"),
        TestTrackerDistributedCacheManager.class.getSimpleName())
        .getAbsolutePath();
/** The mapred-local directory used by the tests; assigned in setUp(). */
protected File ROOT_MAPRED_LOCAL_DIR;
/** Cache sub-directory name handed to getLocalCache() by the tests. */
private static final String TEST_CACHE_BASE_DIR = "cachebasedir";
// Not read anywhere in this class; presumably consumed by subclasses.
protected int numLocalDirs = 6;
/** Number of random bytes written to each temporary cache file. */
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
/** Value configured for TTConfig.TT_LOCAL_CACHE_SIZE in setUp(). */
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
/** Base configuration shared by the tests; created in setUp(). */
protected Configuration conf;
/** First temporary cache file, created in setUp(). */
protected Path firstCacheFile;
/** Second temporary cache file, created in setUp(). */
protected Path secondCacheFile;
/** File system obtained from {@link #conf} in setUp(). */
private FileSystem fs;
/** Allocator keyed on the mapred-local-dir property, passed to setup() calls. */
protected LocalDirAllocator localDirAllocator =
    new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
/**
 * Creates the test directories, builds the shared configuration and writes
 * the two temporary cache files.
 *
 * @throws IOException if the file system cannot be obtained or a cache file
 *           cannot be written
 */
@Override
protected void setUp() throws IOException {
  // Root directory for everything this test writes.
  File rootDir = new File(TEST_ROOT_DIR);
  if (!rootDir.exists()) {
    rootDir.mkdirs();
  }

  // The mapred-local directory lives below the test root.
  ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
  ROOT_MAPRED_LOCAL_DIR.mkdirs();

  // Shared configuration: cache size limit, local dir and file:/// default.
  conf = new Configuration();
  conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
  conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
      ROOT_MAPRED_LOCAL_DIR.toString());
  conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
  fs = FileSystem.get(conf);

  // Temporary cache files that the individual tests localize.
  firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
  secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
  createTempFile(firstCacheFile);
  createTempFile(secondCacheFile);
}
/**
 * Tells each test whether it should execute on this machine. Every test
 * method returns immediately when this yields {@code false}.
 *
 * @return {@code true} always in this class
 */
protected boolean canRun() {
  return true;
}
/**
 * This is the typical flow for using the DistributedCache classes: a job
 * configuration with one regular cache file and one classpath file is
 * prepared, localized through a task-level cache manager, checked, and then
 * released and purged.
 *
 * @throws IOException if writing job.xml or localizing the cache fails
 * @throws LoginException if the job owner name cannot be determined
 */
public void testManagerFlow() throws IOException, LoginException {
  if (!canRun()) {
    return;
  }
  // ****** Imitate JobClient code
  // Configures a task/job with both a regular file and a "classpath" file.
  Configuration subConf = new Configuration(conf);
  DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
  DistributedCache.addFileToClassPath(secondCacheFile, subConf);
  TrackerDistributedCacheManager.determineTimestamps(subConf);
  // ****** End of imitating JobClient code
  Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
  FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
  try {
    subConf.writeXml(os);
  } finally {
    // Close even when writeXml fails so the file handle is not leaked.
    os.close();
  }
  String userName = getJobOwnerName();
  // ****** Imitate TaskRunner code.
  TrackerDistributedCacheManager manager =
      new TrackerDistributedCacheManager(conf);
  TaskDistributedCacheManager handle =
      manager.newTaskDistributedCacheManager(subConf);
  // Nothing has been localized yet.
  assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
  File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
  handle.setup(localDirAllocator, workDir, TaskTracker
      .getDistributedCacheDir(userName));
  InitializationContext context = new InitializationContext();
  context.user = userName;
  context.workDir = workDir;
  getTaskController().initializeDistributedCache(context);
  // ****** End of imitating TaskRunner code
  Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
  assertNotNull(null, localCacheFiles);
  assertEquals(2, localCacheFiles.length);
  Path cachedFirstFile = localCacheFiles[0];
  Path cachedSecondFile = localCacheFiles[1];
  assertFileLengthEquals(firstCacheFile, cachedFirstFile);
  assertFalse("Paths should be different.",
      firstCacheFile.equals(cachedFirstFile));
  // Only the file added through addFileToClassPath shows up as a class path.
  assertEquals(1, handle.getClassPaths().size());
  assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
  checkFilePermissions(localCacheFiles);
  // Cleanup
  handle.release();
  manager.purgeCache();
  assertFalse(pathToFile(cachedFirstFile).exists());
}
/**
 * A cache manager whose localization always fails for firstCacheFile and
 * behaves like the parent class for every other URI.
 */
public class FakeTrackerDistributedCacheManager extends
    TrackerDistributedCacheManager {

  public FakeTrackerDistributedCacheManager(Configuration conf)
      throws IOException {
    super(conf);
  }

  /**
   * Delegates to the parent implementation unless the requested URI is that
   * of firstCacheFile, in which case an IOException is thrown.
   */
  @Override
  Path localizeCache(Configuration conf, URI cache, long confFileStamp,
      CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
      throws IOException {
    boolean mustFail = cache.equals(firstCacheFile.toUri());
    if (!mustFail) {
      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
          fileStatus, isArchive);
    }
    throw new IOException("fake fail");
  }
}
/**
 * Checks that reference counts are back at zero after a task releases its
 * cache files, both for a job that localizes successfully and for a job
 * whose localization fails part-way through.
 */
public void testReferenceCount() throws IOException, LoginException,
    URISyntaxException {
  if (!canRun()) {
    return;
  }
  Configuration localConf = new Configuration();
  localConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
  TrackerDistributedCacheManager cacheManager =
      new FakeTrackerDistributedCacheManager(localConf);
  Cluster cluster = new Cluster(localConf);
  String owner = getJobOwnerName();
  File taskWorkDir =
      new File(new Path(TEST_ROOT_DIR, "workdir").toString());

  // First job: a single regular cache file.
  Job firstJob = Job.getInstance(cluster, localConf);
  firstJob.addCacheFile(secondCacheFile.toUri());
  Configuration firstJobConf = firstJob.getConfiguration();
  TrackerDistributedCacheManager.determineTimestamps(firstJobConf);

  // Localize for a task of the first job, then release again.
  TaskDistributedCacheManager taskCache =
      cacheManager.newTaskDistributedCacheManager(firstJobConf);
  taskCache.setup(localDirAllocator, taskWorkDir,
      TaskTracker.getDistributedCacheDir(owner));
  taskCache.release();
  for (TaskDistributedCacheManager.CacheFile cached
      : taskCache.getCacheFiles()) {
    assertEquals(0, cacheManager.getReferenceCount(cached.uri, firstJobConf,
        cached.timestamp));
  }

  Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
  createTempFile(thirdCacheFile);

  // Second job: three regular cache files.
  Job secondJob = Job.getInstance(cluster, localConf);
  // this one fails to localize (see FakeTrackerDistributedCacheManager)
  secondJob.addCacheFile(firstCacheFile.toUri());
  // this one was already localized by the first job
  secondJob.addCacheFile(secondCacheFile.toUri());
  // this one is never localized
  secondJob.addCacheFile(thirdCacheFile.toUri());
  Configuration secondJobConf = secondJob.getConfiguration();
  TrackerDistributedCacheManager.determineTimestamps(secondJobConf);

  // Localizing for the second job must fail on "firstCacheFile".
  taskCache = cacheManager.newTaskDistributedCacheManager(secondJobConf);
  Throwable setupFailure = null;
  try {
    taskCache.setup(localDirAllocator, taskWorkDir,
        TaskTracker.getDistributedCacheDir(owner));
  } catch (IOException e) {
    setupFailure = e;
    Log.info("Exception during setup", e);
  }
  assertNotNull(setupFailure);
  assertTrue(setupFailure.getMessage().contains("fake fail"));
  taskCache.release();

  // Every file that has a reference count must be at zero; the last lookup
  // failure is remembered and is expected to name thirdCacheFile.
  Throwable lookupFailure = null;
  for (TaskDistributedCacheManager.CacheFile cached
      : taskCache.getCacheFiles()) {
    try {
      assertEquals(0, cacheManager.getReferenceCount(cached.uri,
          secondJobConf, cached.timestamp));
    } catch (IOException ie) {
      lookupFailure = ie;
      Log.info("Exception getting reference count for " + cached.uri, ie);
    }
  }
  assertNotNull(lookupFailure);
  assertTrue(lookupFailure.getMessage().contains(thirdCacheFile.getName()));
  fs.delete(thirdCacheFile, false);
}
/**
 * Asserts that the first two localized cache files are executable.
 *
 * @param localCacheFiles the localized cache files; the entries at index 0
 *          and 1 are checked
 * @throws IOException declared for overriding implementations; not thrown
 *           by this one
 */
protected void checkFilePermissions(Path[] localCacheFiles)
    throws IOException {
  Path firstLocal = localCacheFiles[0];
  Path secondLocal = localCacheFiles[1];

  // Both the files should have executable permissions on them.
  File firstOnDisk = new File(firstLocal.toUri().getPath());
  assertTrue("First cache file is not executable!",
      firstOnDisk.canExecute());

  File secondOnDisk = new File(secondLocal.toUri().getPath());
  assertTrue("Second cache file is not executable!",
      secondOnDisk.canExecute());
}
/**
 * Supplies the task controller used by testManagerFlow().
 *
 * @return a new DefaultTaskController
 */
protected TaskController getTaskController() {
  TaskController controller = new DefaultTaskController();
  return controller;
}
/**
 * Returns the user name obtained by logging in with the shared test
 * configuration.
 *
 * @return the user name reported by UserGroupInformation
 * @throws LoginException if the login fails
 */
protected String getJobOwnerName() throws LoginException {
  return UserGroupInformation.login(conf).getUserName();
}
/**
 * Tests that a released cache entry is removed when localizing another file
 * would exceed the configured cache size limit.
 *
 * @throws Exception if localizing or listing the cache directory fails
 */
public void testDeleteCache() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager manager =
      new TrackerDistributedCacheManager(conf);
  FileSystem localfs = FileSystem.getLocal(conf);
  long now = System.currentTimeMillis();
  manager.getLocalCache(firstCacheFile.toUri(), conf,
      TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
      now, new Path(TEST_ROOT_DIR), false);
  manager.releaseCache(firstCacheFile.toUri(), conf, now);
  // The code above localized a 4K file and then released it, so it may be
  // deleted once the 5K cache limit is exceeded.
  // The code below localizes a second 4K file, which is designed to
  // sweep away the first cache entry.
  manager.getLocalCache(secondCacheFile.toUri(), conf,
      TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false,
      System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
  FileStatus[] dirStatuses = localfs.listStatus(
      new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
  // assertEquals reports the actual entry count when the check fails.
  assertEquals("DistributedCache failed deleting old" +
      " cache when the cache store is full.",
      1, dirStatuses.length);
}
/**
 * Localizes a file addressed through a "fakefile" scheme, which is mapped to
 * the same implementation as the "file" scheme, and checks that a local path
 * is returned.
 *
 * @throws Exception if localization fails
 */
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager cacheManager =
      new TrackerDistributedCacheManager(conf);

  // Register the "fakefile" scheme with whatever backs "file".
  String fileImpl = conf.get("fs.file.impl");
  conf.set("fs.fakefile.impl", fileImpl);

  String localPath = firstCacheFile.toUri().getPath();
  Path fileToCache = new Path("fakefile:///" + localPath);

  Path localized = cacheManager.getLocalCache(fileToCache.toUri(), conf,
      TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      localized);
}
/**
 * Writes TEST_FILE_SIZE random bytes to the local file named by {@code p}
 * and logs the created path and size.
 *
 * @param p path of the file to create; its string form is used as a local
 *          file name
 * @throws IOException if the file cannot be opened, written or closed
 */
static void createTempFile(Path p) throws IOException {
  File f = new File(p.toString());
  byte[] toWrite = new byte[TEST_FILE_SIZE];
  new Random().nextBytes(toWrite);
  FileOutputStream os = new FileOutputStream(f);
  try {
    os.write(toWrite);
  } finally {
    // Close even when the write fails so the file handle is not leaked.
    os.close();
  }
  FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
}
/**
 * Deletes the two temporary cache files and then the whole test root
 * directory.
 *
 * @throws IOException if deleting the test root directory fails
 */
@Override
protected void tearDown() throws IOException {
  Path[] cacheFiles = {firstCacheFile, secondCacheFile};
  for (Path cacheFile : cacheFiles) {
    new File(cacheFile.toString()).delete();
  }
  File testRoot = new File(TEST_ROOT_DIR);
  FileUtil.fullyDelete(testRoot);
}
/**
 * Asserts that the local files named by the two paths have the same length.
 *
 * @param a path whose file length is the expected value
 * @param b path whose file length is the actual value
 * @throws FileNotFoundException declared for overriding implementations
 */
protected void assertFileLengthEquals(Path a, Path b)
    throws FileNotFoundException {
  long expectedLength = pathToFile(a).length();
  long actualLength = pathToFile(b).length();
  assertEquals("File sizes mismatch.", expectedLength, actualLength);
}
/**
 * Converts a path to a {@link File} using the path's string form.
 *
 * @param p the path to convert
 * @return a File built from {@code p.toString()}
 */
protected File pathToFile(Path p) {
  String location = p.toString();
  return new File(location);
}
public static class FakeFileSystem extends RawLocalFileSystem {
private long increment = 0;
public FakeFileSystem() {
super();
}
public FileStatus getFileStatus(Path p) throws IOException {
File f = pathToFile(p);
return new FileStatus(f.length(), f.isDirectory(), 1, 128,
f.lastModified() + increment, makeQualified(new Path(f.getPath())));
}
void advanceClock(long millis) {
increment += millis;
}
}
/**
 * Checks that a cache file whose timestamp changes after job submission is
 * rejected for that job, while a newly submitted job localizes it again into
 * a different path.
 *
 * @throws Exception if any localization step fails unexpectedly
 */
public void testFreshness() throws Exception {
  if (!canRun()) {
    return;
  }
  // Route the default file system through FakeFileSystem.
  Configuration refreshConf = new Configuration(conf);
  refreshConf.set("fs.default.name", "refresh:///");
  refreshConf.setClass("fs.refresh.impl", FakeFileSystem.class,
      FileSystem.class);
  TrackerDistributedCacheManager cacheManager =
      new TrackerDistributedCacheManager(refreshConf);

  // ****** Imitate JobClient code
  // Configures a task/job with a single regular cache file.
  Configuration firstJobConf = new Configuration(refreshConf);
  DistributedCache.addCacheFile(firstCacheFile.toUri(), firstJobConf);
  TrackerDistributedCacheManager.determineTimestamps(firstJobConf);
  // ****** End of imitating JobClient code

  String owner = getJobOwnerName();

  // ****** Imitate TaskRunner code.
  TaskDistributedCacheManager taskCache =
      cacheManager.newTaskDistributedCacheManager(firstJobConf);
  assertNull(null, DistributedCache.getLocalCacheFiles(firstJobConf));
  File taskWorkDir =
      new File(new Path(TEST_ROOT_DIR, "workdir").toString());
  taskCache.setup(localDirAllocator, taskWorkDir,
      TaskTracker.getDistributedCacheDir(owner));
  // ****** End of imitating TaskRunner code

  Path[] firstLocalized = DistributedCache.getLocalCacheFiles(firstJobConf);
  assertNotNull(null, firstLocalized);
  assertEquals(1, firstLocalized.length);
  Path firstLocalCopy = firstLocalized[0];
  assertFileLengthEquals(firstCacheFile, firstLocalCopy);
  assertFalse("Paths should be different.",
      firstCacheFile.equals(firstLocalCopy));
  taskCache.release();

  // Make the source file look modified by moving its timestamp forward.
  FileSystem refreshFs = FileSystem.get(refreshConf);
  ((FakeFileSystem) refreshFs).advanceClock(1);

  // A further task of the same job must now be refused.
  Throwable staleFailure = null;
  try {
    taskCache.setup(localDirAllocator, taskWorkDir,
        TaskTracker.getDistributedCacheDir(owner));
  } catch (IOException ie) {
    staleFailure = ie;
  }
  assertNotNull("Throwable is null", staleFailure);
  assertTrue("Exception message does not match",
      staleFailure.getMessage().contains(
          "has changed on HDFS since job started"));
  taskCache.release();

  // A job submitted after the change picks up the new timestamp.
  Configuration secondJobConf = new Configuration(refreshConf);
  DistributedCache.addCacheFile(firstCacheFile.toUri(), secondJobConf);
  TrackerDistributedCacheManager.determineTimestamps(secondJobConf);
  taskCache = cacheManager.newTaskDistributedCacheManager(secondJobConf);
  taskCache.setup(localDirAllocator, taskWorkDir,
      TaskTracker.getDistributedCacheDir(owner));

  Path[] secondLocalized =
      DistributedCache.getLocalCacheFiles(secondJobConf);
  assertNotNull(null, secondLocalized);
  assertEquals(1, secondLocalized.length);
  Path secondLocalCopy = secondLocalized[0];
  assertFileLengthEquals(firstCacheFile, secondLocalCopy);
  assertFalse("Paths should be different.",
      firstCacheFile.equals(secondLocalCopy));

  // The two jobs must not share a localized path.
  assertFalse("two jobs with different timestamps did not localize" +
      " in different paths", firstLocalCopy.equals(secondLocalCopy));
  taskCache.release();
}
}