blob: 5212b9f0434f3a5a9b3f80f57429a6acba85ed5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
public class TestClientDistributedCacheManager {
private static final Log LOG = LogFactory.getLog(
TestClientDistributedCacheManager.class);
private static final Path TEST_ROOT_DIR = new Path(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestClientDistributedCacheManager.class.getSimpleName());
private static final Path TEST_VISIBILITY_PARENT_DIR =
new Path(TEST_ROOT_DIR, "TestCacheVisibility_Parent");
private static final Path TEST_VISIBILITY_CHILD_DIR =
new Path(TEST_VISIBILITY_PARENT_DIR, "TestCacheVisibility_Child");
private static final String FIRST_CACHE_FILE = "firstcachefile";
private static final String SECOND_CACHE_FILE = "secondcachefile";
private FileSystem fs;
private Path firstCacheFile;
private Path secondCacheFile;
private Configuration conf;
@Before
public void setup() throws IOException {
conf = new Configuration();
fs = FileSystem.get(conf);
firstCacheFile = new Path(TEST_VISIBILITY_PARENT_DIR, FIRST_CACHE_FILE);
secondCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR, SECOND_CACHE_FILE);
createTempFile(firstCacheFile, conf);
createTempFile(secondCacheFile, conf);
}
@After
public void tearDown() throws IOException {
if (fs.delete(TEST_ROOT_DIR, true)) {
LOG.warn("Failed to delete test root dir and its content under "
+ TEST_ROOT_DIR);
}
}
@Test
public void testDetermineTimestamps() throws IOException {
Job job = Job.getInstance(conf);
job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(secondCacheFile.toUri());
Configuration jobConf = job.getConfiguration();
Map<URI, FileStatus> statCache = new HashMap<>();
ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
FileStatus firstStatus = statCache.get(firstCacheFile.toUri());
FileStatus secondStatus = statCache.get(secondCacheFile.toUri());
Assert.assertNotNull(firstCacheFile + " was not found in the stats cache",
firstStatus);
Assert.assertNotNull(secondCacheFile + " was not found in the stats cache",
secondStatus);
Assert.assertEquals("Missing/extra entries found in the stas cache",
2, statCache.size());
String expected = firstStatus.getModificationTime() + ","
+ secondStatus.getModificationTime();
Assert.assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
job = Job.getInstance(conf);
job.addCacheFile(new Path(TEST_VISIBILITY_CHILD_DIR, "*").toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
FileStatus thirdStatus = statCache.get(TEST_VISIBILITY_CHILD_DIR.toUri());
Assert.assertEquals("Missing/extra entries found in the stas cache",
1, statCache.size());
Assert.assertNotNull(TEST_VISIBILITY_CHILD_DIR
+ " was not found in the stats cache", thirdStatus);
expected = Long.toString(thirdStatus.getModificationTime());
Assert.assertEquals("Incorrect timestamp for " + TEST_VISIBILITY_CHILD_DIR,
expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
}
@Test
public void testDetermineCacheVisibilities() throws IOException {
fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
new FsPermission((short)00777));
fs.setPermission(TEST_VISIBILITY_CHILD_DIR,
new FsPermission((short)00777));
fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
Job job = Job.getInstance(conf);
Path relativePath = new Path(SECOND_CACHE_FILE);
Path wildcardPath = new Path("*");
Map<URI, FileStatus> statCache = new HashMap<>();
Configuration jobConf;
job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(relativePath.toUri());
jobConf = job.getConfiguration();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file paths were not found to be publicly visible "
+ "even though the full path is publicly accessible",
"true,true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, null, firstCacheFile, relativePath);
job = Job.getInstance(conf);
job.addCacheFile(wildcardPath.toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file path was not found to be publicly visible "
+ "even though the full path is publicly accessible",
"true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, null, wildcardPath.getParent());
Path qualifiedParent = fs.makeQualified(TEST_VISIBILITY_PARENT_DIR);
fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
new FsPermission((short)00700));
job = Job.getInstance(conf);
job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(relativePath.toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file paths were found to be publicly visible "
+ "even though the parent directory is not publicly accessible",
"false,false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, qualifiedParent,
firstCacheFile, relativePath);
job = Job.getInstance(conf);
job.addCacheFile(wildcardPath.toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file path was found to be publicly visible "
+ "even though the parent directory is not publicly accessible",
"false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, qualifiedParent, wildcardPath.getParent());
}
/**
* Validate that the file status cache contains all and only entries for a
* given set of paths up to a common parent.
*
* @param statCache the cache
* @param top the common parent at which to stop digging
* @param paths the paths to compare against the cache
*/
private void checkCacheEntries(Map<URI, FileStatus> statCache, Path top,
Path... paths) {
Set<URI> expected = new HashSet<>();
for (Path path : paths) {
Path p = fs.makeQualified(path);
while (!p.isRoot() && !p.equals(top)) {
expected.add(p.toUri());
p = p.getParent();
}
expected.add(p.toUri());
}
Set<URI> uris = statCache.keySet();
Set<URI> missing = new HashSet<>(uris);
Set<URI> extra = new HashSet<>(expected);
missing.removeAll(expected);
extra.removeAll(uris);
assertTrue("File status cache does not contain an entries for " + missing,
missing.isEmpty());
assertTrue("File status cache contains extra extries: " + extra,
extra.isEmpty());
}
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, conf, p,
Text.class, Text.class,
CompressionType.NONE);
writer.append(new Text("text"), new Text("moretext"));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
if (writer != null) {
writer.close();
}
writer = null;
}
LOG.info("created: " + p);
}
}