PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1852183 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index dc92076..ceba83d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -87,6 +87,7 @@
OPTIMIZATIONS
BUG FIXES
+PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)
PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
index 5917820..1209989 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
@@ -112,8 +112,6 @@
@Override
public void setConf(Configuration job) {
conf = job;
- PigMapReduce.sJobConfInternal.set(conf);
- PigMapReduce.sJobConf = conf;
}
@Override
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
index b485c56..ec931f6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
@@ -93,10 +93,10 @@
conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
}
- if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null)
- conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl"));
- if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null)
- conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl"));
+ if (mapConf.get("fs.file.impl")!=null)
+ conf.set("fs.file.impl", mapConf.get("fs.file.impl"));
+ if (mapConf.get("fs.hdfs.impl")!=null)
+ conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl"));
copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf);
diff --git a/test/org/apache/pig/test/TestSkewedJoin.java b/test/org/apache/pig/test/TestSkewedJoin.java
index 947a31b..e1ad73c 100644
--- a/test/org/apache/pig/test/TestSkewedJoin.java
+++ b/test/org/apache/pig/test/TestSkewedJoin.java
@@ -207,7 +207,6 @@
assertEquals(0, count);
}
-
@Test
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
@@ -354,7 +353,7 @@
try {
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
{
- pigServer.registerQuery("C = join A by id, B by id using 'skewed';");
+ pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
@@ -375,7 +374,7 @@
pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
{
- pigServer.registerQuery("C = join A by id left, B by id using 'skewed';");
+ pigServer.registerQuery("C = join A by id left, B by id using 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
@@ -383,7 +382,7 @@
}
}
{
- pigServer.registerQuery("C = join A by id right, B by id using 'skewed';");
+ pigServer.registerQuery("C = join A by id right, B by id using 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
@@ -391,7 +390,7 @@
}
}
{
- pigServer.registerQuery("C = join A by id full, B by id using 'skewed';");
+ pigServer.registerQuery("C = join A by id full, B by id using 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
@@ -413,7 +412,7 @@
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
{
- pigServer.registerQuery("E = join C by id, D by id using 'skewed';");
+ pigServer.registerQuery("E = join C by id, D by id using 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
while(iter.hasNext()) {
@@ -487,7 +486,7 @@
pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);");
pigServer.registerQuery("c = filter a by nums == '7';");
- pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';");
+ pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("d");
@@ -515,7 +514,7 @@
pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
- pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';");
+ pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed' parallel 2;");
Iterator<Tuple> iter = pigServer.openIterator("d");
int count = 0;
@@ -569,7 +568,7 @@
"exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
"missing = LOAD '/non/existing/directory' AS (a:long);" +
"missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
- "joined = JOIN exists BY a, missing BY a USING 'skewed';";
+ "joined = JOIN exists BY a, missing BY a USING 'skewed' parallel 2;";
String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
Logger logger = Logger.getLogger("org.apache.pig");
@@ -619,4 +618,34 @@
}
}
+ // PIG-5372
+ @Test
+ public void testSkewedJoinWithRANDOMudf() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+ pigServer.registerQuery("A2 = FOREACH A GENERATE id, RANDOM() as randnum;");
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("D = join A2 by id, B by id using 'skewed' parallel 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("D = join A2 by id, B by id;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ assertTrue(dbfrj.size()>0);
+ assertTrue(dbshj.size()>0);
+ assertEquals(dbfrj.size(), dbshj.size());
+ }
+
+
}