PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1807845 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 2dfdb96..bd1a7d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
  
 BUG FIXES
 
+PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)
+
 PIG-5293: Suspicious code as missing `this' for a member (lifove via daijy)
 
 PIG-5294: Spark unit tests are always run in spark1 mode (szita)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 1b6c2b5..4d3ab50 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -37,6 +37,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -172,6 +173,9 @@
     public static final String PIG_MAP_SEPARATOR = "_";
     public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
 
+    private static final Random RAND = new Random();
+    private static final String CACHE_TMP_FILE_TEMPLATE = "tmp%d.tmp";
+
     public static final String SMALL_JOB_LOG_MSG = "This job was detected as a small job, will run in-process instead";
     public static final String BIG_JOB_LOG_MSG = "This job cannot be converted run in-process";
 
@@ -1734,7 +1738,6 @@
             URL url) throws IOException {
         InputStream is1 = null;
         InputStream is2 = null;
-        OutputStream os = null;
 
         try {
             Path stagingDir = getCacheStagingDir(conf);
@@ -1757,10 +1760,22 @@
             is2 = url.openStream();
             short replication = (short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION,
                     conf.getInt("mapred.submit.replication", 10));
-            os = fs.create(cacheFile, replication);
-            fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
-            IOUtils.copyBytes(is2, os, 4096, true);
+            Path tempCacheFile = new Path(cacheDir,
+                String.format(CACHE_TMP_FILE_TEMPLATE, RAND.nextInt()));
 
+            try {
+                try (OutputStream os = fs.create(tempCacheFile, replication)) {
+                    fs.setPermission(tempCacheFile, FileLocalizer.OWNER_ONLY_PERMS);
+                    IOUtils.copyBytes(is2, os, 4096, true);
+                }
+                fs.rename(tempCacheFile, cacheFile);
+            } catch (IOException ioe) {
+                // Attempt some cleanup to avoid leaving tmp files around
+                if (fs.exists(tempCacheFile)) {
+                    fs.delete(tempCacheFile, false);
+                }
+                throw ioe;
+            }
             return cacheFile;
 
         } catch (IOException ioe) {
@@ -1769,10 +1784,6 @@
         } finally {
             org.apache.commons.io.IOUtils.closeQuietly(is1);
             org.apache.commons.io.IOUtils.closeQuietly(is2);
-            // IOUtils should not close stream to HDFS quietly
-            if (os != null) {
-                os.close();
-            }
         }
     }
 
diff --git a/test/e2e/pig/tests/nightly.conf b/test/e2e/pig/tests/nightly.conf
index 8c91b4e..4eb3ae1 100644
--- a/test/e2e/pig/tests/nightly.conf
+++ b/test/e2e/pig/tests/nightly.conf
@@ -4962,7 +4962,7 @@
             'tests' => [
                 {
                     'num' => 1,
-                    'java_params' => ['-Dopt.fetch=false'],
+                    'java_params' => ['-Dopt.fetch=false', '-Dpig.user.cache.enabled=true'],
                     'execonly' => 'mapred,tez,spark', # since distributed cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
@@ -4979,6 +4979,7 @@
                 'tests' => [
                     {
                         'num' => 1,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define gm org.apache.pig.test.udf.evalfunc.GoodMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -4989,6 +4990,7 @@
                                                 store b into ':OUTPATH:';?,
                     },{
                         'num' => 2,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define bad org.apache.pig.test.udf.evalfunc.BadMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -5001,6 +5003,7 @@
                                                 store b into ':OUTPATH:';?,
                     },{
                         'num' => 3,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define bad org.apache.pig.test.udf.evalfunc.BadMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);