TEZ-4084. Tez local mode fails when distributed cache creates link with parent
Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
index 45e5540..9bcbb15 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -96,11 +96,13 @@
Path linkPath = new Path(cwd, entry.getKey());
if (resourceInfo.containsKey(resource)) {
- // We've already downloaded this resource and just need to add another link.
- resourceInfo.get(resource).linkPaths.add(linkPath);
+ // We've already downloaded this resource and just need to add another link.
+ resourceInfo.get(resource).getLinkPaths().add(linkPath);
} else {
// submit task to download the object
- java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+ java.nio.file.Path fp = Paths.get(resourceName).getFileName();
+ String prefix = fp == null ? "" : fp.toString(); // The null case is unexpected, but FindBugs complains
+ java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, prefix);
Path dest = new Path(downloadDir.toAbsolutePath().toString());
FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
Future<Path> downloadedPath = threadPool.submit(downloader);
@@ -113,12 +115,12 @@
LocalResource resource = entry.getKey();
ResourceInfo resourceMeta = entry.getValue();
- for (Path linkPath : resourceMeta.linkPaths) {
+ for (Path linkPath : resourceMeta.getLinkPaths()) {
Path targetPath;
try {
// this blocks on the download completing
- targetPath = resourceMeta.downloadPath.get();
+ targetPath = resourceMeta.getDownloadPath().get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
@@ -144,7 +146,7 @@
*/
public void cleanup() throws IOException {
for (ResourceInfo info : resourceInfo.values()) {
- for (Path linkPath : info.linkPaths) {
+ for (Path linkPath : info.getLinkPaths()) {
if (fileContext.util().exists(linkPath)) {
fileContext.delete(linkPath, true);
}
@@ -172,23 +174,31 @@
try {
Files.createSymbolicLink(Paths.get(linkPath), Paths.get(targetPath));
return true;
- } catch (UnsupportedOperationException e) {
- LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
+ } catch (UnsupportedOperationException | IOException e) {
+ LOG.warn("Unable to create symlink {} <- {}: {}", target, link, e);
return false;
}
}
}
/**
- * Wrapper to keep track of download path and link path
+ * Wrapper to keep track of download path and link path.
*/
private static class ResourceInfo {
- final Future<Path> downloadPath;
- final Set<Path> linkPaths = new HashSet<>();
+ private final Future<Path> downloadPath;
+ private final Set<Path> linkPaths = new HashSet<>();
- public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
+ ResourceInfo(Future<Path> downloadPath, Path linkPath) {
this.downloadPath = downloadPath;
- this.linkPaths.add(linkPath);
+ this.getLinkPaths().add(linkPath);
+ }
+
+ Future<Path> getDownloadPath() {
+ return downloadPath;
+ }
+
+ Set<Path> getLinkPaths() {
+ return linkPaths;
}
}
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
index fb23a1d..beca047 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
@@ -39,69 +39,75 @@
import java.util.HashMap;
import java.util.Map;
+/**
+ * Test local cache manager.
+ */
public class TestTezLocalCacheManager {
- @Test
- public void testManager() throws URISyntaxException, IOException {
- Map<String, LocalResource> resources = new HashMap<>();
+ @Test
+ public void testManager() throws URISyntaxException, IOException {
+ Map<String, LocalResource> resources = new HashMap<>();
- // Test that localization works for regular files and verify that if multiple symlinks are created,
- // they all work
- LocalResource resourceOne = createFile("content-one");
- LocalResource resourceTwo = createFile("content-two");
+ // Test that localization works for regular files and verify that if multiple symlinks are created,
+ // they all work
+ LocalResource resourceOne = createFile("content-one");
+ LocalResource resourceTwo = createFile("content-two");
- resources.put("file-one", resourceOne);
- resources.put("file-two", resourceTwo);
- resources.put("file-three", resourceTwo);
+ resources.put("file-one", resourceOne);
+ resources.put("file-two", resourceTwo);
+ resources.put("file-three", resourceTwo);
- TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
+ // Not currently supported, but shouldn't throw an exception...
+ resources.put("some-subdir/file-three", resourceTwo);
- try {
- manager.localize();
+ TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
- Assert.assertEquals(
- "content-one",
- new String(Files.readAllBytes(Paths.get("./file-one")))
- );
+ try {
+ manager.localize();
- Assert.assertEquals(
- "content-two",
- new String(Files.readAllBytes(Paths.get("./file-two")))
- );
+ Assert.assertEquals(
+ "content-one",
+ new String(Files.readAllBytes(Paths.get("./file-one")))
+ );
- Assert.assertEquals(
- "content-two",
- new String(Files.readAllBytes(Paths.get("./file-three")))
- );
- } finally {
- manager.cleanup();
- }
+ Assert.assertEquals(
+ "content-two",
+ new String(Files.readAllBytes(Paths.get("./file-two")))
+ );
- // verify that symlinks were removed
- Assert.assertFalse(Files.exists(Paths.get("./file-one")));
- Assert.assertFalse(Files.exists(Paths.get("./file-two")));
- Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+ Assert.assertEquals(
+ "content-two",
+ new String(Files.readAllBytes(Paths.get("./file-three")))
+ );
+ } finally {
+ manager.cleanup();
}
- // create a temporary file with the given content and return a LocalResource
- private static LocalResource createFile(String content) throws IOException {
- FileContext fs = FileContext.getLocalFSFileContext();
+ // verify that symlinks were removed
+ Assert.assertFalse(Files.exists(Paths.get("./file-one")));
+ Assert.assertFalse(Files.exists(Paths.get("./file-two")));
+ Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+ }
- java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
- File temp = tempFile.toFile();
- temp.deleteOnExit();
- Path p = new Path("file:///" + tempFile.toAbsolutePath().toString());
+ // create a temporary file with the given content and return a LocalResource
+ private static LocalResource createFile(String content) throws IOException {
+ FileContext fs = FileContext.getLocalFSFileContext();
- Files.write(tempFile, content.getBytes());
+ java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
+ File temp = tempFile.toFile();
+ temp.deleteOnExit();
+ Path p = new Path("file:///" + tempFile.toAbsolutePath().toString());
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
- URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
- ret.setResource(yarnUrlFromPath);
- ret.setSize(content.getBytes().length);
- ret.setType(LocalResourceType.FILE);
- ret.setVisibility(LocalResourceVisibility.PRIVATE);
- ret.setTimestamp(fs.getFileStatus(p).getModificationTime());
- return ret;
- }
+ Files.write(tempFile, content.getBytes());
+
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
+ ret.setResource(yarnUrlFromPath);
+ ret.setSize(content.getBytes().length);
+ ret.setType(LocalResourceType.FILE);
+ ret.setVisibility(LocalResourceVisibility.PRIVATE);
+ ret.setTimestamp(fs.getFileStatus(p).getModificationTime());
+ return ret;
+ }
}