PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1807845 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 2dfdb96..bd1a7d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
BUG FIXES
+PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)
+
PIG-5293: Suspicious code as missing `this' for a member (lifove via daijy)
PIG-5294: Spark unit tests are always run in spark1 mode (szita)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 1b6c2b5..4d3ab50 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Random;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -172,6 +173,9 @@
public static final String PIG_MAP_SEPARATOR = "_";
public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
+ private static final Random RAND = new Random();
+ private static final String CACHE_TMP_FILE_TEMPLATE = "tmp%d.tmp";
+
public static final String SMALL_JOB_LOG_MSG = "This job was detected as a small job, will run in-process instead";
public static final String BIG_JOB_LOG_MSG = "This job cannot be converted run in-process";
@@ -1734,7 +1738,6 @@
URL url) throws IOException {
InputStream is1 = null;
InputStream is2 = null;
- OutputStream os = null;
try {
Path stagingDir = getCacheStagingDir(conf);
@@ -1757,10 +1760,22 @@
is2 = url.openStream();
short replication = (short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION,
conf.getInt("mapred.submit.replication", 10));
- os = fs.create(cacheFile, replication);
- fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
- IOUtils.copyBytes(is2, os, 4096, true);
+ Path tempCacheFile = new Path(cacheDir,
+ String.format(CACHE_TMP_FILE_TEMPLATE, RAND.nextInt()));
+ try {
+ try (OutputStream os = fs.create(tempCacheFile, replication)) {
+ fs.setPermission(tempCacheFile, FileLocalizer.OWNER_ONLY_PERMS);
+ IOUtils.copyBytes(is2, os, 4096, true);
+ }
+ fs.rename(tempCacheFile, cacheFile);
+ } catch (IOException ioe) {
+ // Attempt some cleanup to avoid leaving tmp files around
+ if (fs.exists(tempCacheFile)) {
+ fs.delete(tempCacheFile, false);
+ }
+ throw ioe;
+ }
return cacheFile;
} catch (IOException ioe) {
@@ -1769,10 +1784,6 @@
} finally {
org.apache.commons.io.IOUtils.closeQuietly(is1);
org.apache.commons.io.IOUtils.closeQuietly(is2);
- // IOUtils should not close stream to HDFS quietly
- if (os != null) {
- os.close();
- }
}
}
diff --git a/test/e2e/pig/tests/nightly.conf b/test/e2e/pig/tests/nightly.conf
index 8c91b4e..4eb3ae1 100644
--- a/test/e2e/pig/tests/nightly.conf
+++ b/test/e2e/pig/tests/nightly.conf
@@ -4962,7 +4962,7 @@
'tests' => [
{
'num' => 1,
- 'java_params' => ['-Dopt.fetch=false'],
+ 'java_params' => ['-Dopt.fetch=false', '-Dpig.user.cache.enabled=true'],
'execonly' => 'mapred,tez,spark', # since distributed cache is not supported in local mode
'pig' => q?
register :FUNCPATH:/testudf.jar;
@@ -4979,6 +4979,7 @@
'tests' => [
{
'num' => 1,
+ 'java_params' => ['-Dpig.user.cache.enabled=true'],
'pig' => q?register :FUNCPATH:/testudf.jar;
define gm org.apache.pig.test.udf.evalfunc.GoodMonitored();
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -4989,6 +4990,7 @@
store b into ':OUTPATH:';?,
},{
'num' => 2,
+ 'java_params' => ['-Dpig.user.cache.enabled=true'],
'pig' => q?register :FUNCPATH:/testudf.jar;
define bad org.apache.pig.test.udf.evalfunc.BadMonitored();
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -5001,6 +5003,7 @@
store b into ':OUTPATH:';?,
},{
'num' => 3,
+ 'java_params' => ['-Dpig.user.cache.enabled=true'],
'pig' => q?register :FUNCPATH:/testudf.jar;
define bad org.apache.pig.test.udf.evalfunc.BadMonitored();
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);