TEZ-4084. Tez local mode fails when distributed cache creates link with parent

Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
index 45e5540..9bcbb15 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java
@@ -96,11 +96,13 @@
         Path linkPath = new Path(cwd, entry.getKey());
 
         if (resourceInfo.containsKey(resource)) {
-            // We've already downloaded this resource and just need to add another link.
-            resourceInfo.get(resource).linkPaths.add(linkPath);
+          // We've already downloaded this resource and just need to add another link.
+          resourceInfo.get(resource).getLinkPaths().add(linkPath);
         } else {
           // submit task to download the object
-          java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+          java.nio.file.Path fp = Paths.get(resourceName).getFileName();
+          String prefix = fp == null ? "" : fp.toString(); // The null case is unexpected, but FindBugs complains
+          java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, prefix);
           Path dest = new Path(downloadDir.toAbsolutePath().toString());
           FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
           Future<Path> downloadedPath = threadPool.submit(downloader);
@@ -113,12 +115,12 @@
         LocalResource resource = entry.getKey();
         ResourceInfo resourceMeta = entry.getValue();
 
-        for (Path linkPath : resourceMeta.linkPaths) {
+        for (Path linkPath : resourceMeta.getLinkPaths()) {
           Path targetPath;
 
           try {
             // this blocks on the download completing
-            targetPath = resourceMeta.downloadPath.get();
+            targetPath = resourceMeta.getDownloadPath().get();
           } catch (InterruptedException | ExecutionException e) {
             throw new IOException(e);
           }
@@ -144,7 +146,7 @@
    */
   public void cleanup() throws IOException {
     for (ResourceInfo info : resourceInfo.values()) {
-      for (Path linkPath : info.linkPaths) {
+      for (Path linkPath : info.getLinkPaths()) {
         if (fileContext.util().exists(linkPath)) {
           fileContext.delete(linkPath, true);
         }
@@ -172,23 +174,31 @@
       try {
         Files.createSymbolicLink(Paths.get(linkPath), Paths.get(targetPath));
         return true;
-      } catch (UnsupportedOperationException e) {
-        LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
+      } catch (UnsupportedOperationException | IOException e) {
+        LOG.warn("Unable to create symlink {} <- {}: {}", target, link, e);
         return false;
       }
     }
   }
 
   /**
-   * Wrapper to keep track of download path and link path
+   * Wrapper to keep track of download path and link path.
    */
   private static class ResourceInfo {
-    final Future<Path> downloadPath;
-    final Set<Path> linkPaths = new HashSet<>();
+    private final Future<Path> downloadPath;
+    private final Set<Path> linkPaths = new HashSet<>();
 
-    public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
+    ResourceInfo(Future<Path> downloadPath, Path linkPath) {
       this.downloadPath = downloadPath;
-      this.linkPaths.add(linkPath);
+      this.getLinkPaths().add(linkPath);
+    }
+
+    Future<Path> getDownloadPath() {
+      return downloadPath;
+    }
+
+    Set<Path> getLinkPaths() {
+      return linkPaths;
     }
   }
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
index fb23a1d..beca047 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java
@@ -39,69 +39,75 @@
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Test local cache manager.
+ */
 public class TestTezLocalCacheManager {
 
-    @Test
-    public void testManager() throws URISyntaxException, IOException {
-        Map<String, LocalResource> resources = new HashMap<>();
+  @Test
+  public void testManager() throws URISyntaxException, IOException {
+    Map<String, LocalResource> resources = new HashMap<>();
 
-        // Test that localization works for regular files and verify that if multiple symlinks are created,
-        // they all work
-        LocalResource resourceOne = createFile("content-one");
-        LocalResource resourceTwo = createFile("content-two");
+    // Test that localization works for regular files and verify that if multiple symlinks are created,
+    // they all work
+    LocalResource resourceOne = createFile("content-one");
+    LocalResource resourceTwo = createFile("content-two");
 
-        resources.put("file-one", resourceOne);
-        resources.put("file-two", resourceTwo);
-        resources.put("file-three", resourceTwo);
+    resources.put("file-one", resourceOne);
+    resources.put("file-two", resourceTwo);
+    resources.put("file-three", resourceTwo);
 
-        TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
+    // Not currently supported, but shouldn't throw an exception...
+    resources.put("some-subdir/file-three", resourceTwo);
 
-        try {
-            manager.localize();
+    TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration());
 
-            Assert.assertEquals(
-                    "content-one",
-                    new String(Files.readAllBytes(Paths.get("./file-one")))
-            );
+    try {
+      manager.localize();
 
-            Assert.assertEquals(
-                    "content-two",
-                    new String(Files.readAllBytes(Paths.get("./file-two")))
-            );
+      Assert.assertEquals(
+          "content-one",
+          new String(Files.readAllBytes(Paths.get("./file-one")))
+      );
 
-            Assert.assertEquals(
-                    "content-two",
-                    new String(Files.readAllBytes(Paths.get("./file-three")))
-            );
-        } finally {
-            manager.cleanup();
-        }
+      Assert.assertEquals(
+          "content-two",
+          new String(Files.readAllBytes(Paths.get("./file-two")))
+      );
 
-        // verify that symlinks were removed
-        Assert.assertFalse(Files.exists(Paths.get("./file-one")));
-        Assert.assertFalse(Files.exists(Paths.get("./file-two")));
-        Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+      Assert.assertEquals(
+          "content-two",
+          new String(Files.readAllBytes(Paths.get("./file-three")))
+      );
+    } finally {
+      manager.cleanup();
     }
 
-    // create a temporary file with the given content and return a LocalResource
-    private static LocalResource createFile(String content) throws IOException {
-        FileContext fs = FileContext.getLocalFSFileContext();
+    // verify that symlinks were removed
+    Assert.assertFalse(Files.exists(Paths.get("./file-one")));
+    Assert.assertFalse(Files.exists(Paths.get("./file-two")));
+    Assert.assertFalse(Files.exists(Paths.get("./file-three")));
+  }
 
-        java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
-        File temp = tempFile.toFile();
-        temp.deleteOnExit();
-        Path p = new Path("file:///" + tempFile.toAbsolutePath().toString());
+  // create a temporary file with the given content and return a LocalResource
+  private static LocalResource createFile(String content) throws IOException {
+    FileContext fs = FileContext.getLocalFSFileContext();
 
-        Files.write(tempFile, content.getBytes());
+    java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt");
+    File temp = tempFile.toFile();
+    temp.deleteOnExit();
+    Path p = new Path("file:///" + tempFile.toAbsolutePath().toString());
 
-        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-        LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
-        URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
-        ret.setResource(yarnUrlFromPath);
-        ret.setSize(content.getBytes().length);
-        ret.setType(LocalResourceType.FILE);
-        ret.setVisibility(LocalResourceVisibility.PRIVATE);
-        ret.setTimestamp(fs.getFileStatus(p).getModificationTime());
-        return ret;
-    }
+    Files.write(tempFile, content.getBytes());
+
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p);
+    ret.setResource(yarnUrlFromPath);
+    ret.setSize(content.getBytes().length);
+    ret.setType(LocalResourceType.FILE);
+    ret.setVisibility(LocalResourceVisibility.PRIVATE);
+    ret.setTimestamp(fs.getFileStatus(p).getModificationTime());
+    return ret;
+  }
 }