PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.17@1852184 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 10db1e8..a84534b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,7 @@
 OPTIMIZATIONS
  
 BUG FIXES
+PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)
 
 PIG-5262: Fix jdiff related issues: fail build upon error, correct xml character escaping (szita)
 
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
index 5917820..1209989 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
@@ -112,8 +112,6 @@
     @Override
     public void setConf(Configuration job) {
         conf = job;
-        PigMapReduce.sJobConfInternal.set(conf);
-        PigMapReduce.sJobConf = conf;
     }
 
     @Override
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
index b485c56..ec931f6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
@@ -93,10 +93,10 @@
             conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
         }
 
-        if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null)
-            conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl"));
-        if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null)
-            conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl"));
+        if (mapConf.get("fs.file.impl")!=null)
+            conf.set("fs.file.impl", mapConf.get("fs.file.impl"));
+        if (mapConf.get("fs.hdfs.impl")!=null)
+            conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl"));
 
         copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf);
 
diff --git a/test/org/apache/pig/test/TestSkewedJoin.java b/test/org/apache/pig/test/TestSkewedJoin.java
index 947a31b..e1ad73c 100644
--- a/test/org/apache/pig/test/TestSkewedJoin.java
+++ b/test/org/apache/pig/test/TestSkewedJoin.java
@@ -207,7 +207,6 @@
         assertEquals(0, count);
     }
 
-
     @Test
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
@@ -354,7 +353,7 @@
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             {
-                pigServer.registerQuery("C = join A by id, B by id using 'skewed';");
+                pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 2;");
                 Iterator<Tuple> iter = pigServer.openIterator("C");
 
                 while(iter.hasNext()) {
@@ -375,7 +374,7 @@
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
         {
-            pigServer.registerQuery("C = join A by id left, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id left, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -383,7 +382,7 @@
             }
         }
         {
-            pigServer.registerQuery("C = join A by id right, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id right, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -391,7 +390,7 @@
             }
         }
         {
-            pigServer.registerQuery("C = join A by id full, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id full, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -413,7 +412,7 @@
 
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
         {
-            pigServer.registerQuery("E = join C by id, D by id using 'skewed';");
+            pigServer.registerQuery("E = join C by id, D by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
             while(iter.hasNext()) {
@@ -487,7 +486,7 @@
         pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
         pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);");
         pigServer.registerQuery("c = filter a by nums == '7';");
-        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';");
+        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed' parallel 2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("d");
 
@@ -515,7 +514,7 @@
 
         pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
         pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
-        pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';");
+        pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed' parallel 2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("d");
         int count = 0;
@@ -569,7 +568,7 @@
           "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
           "missing = LOAD '/non/existing/directory' AS (a:long);" +
           "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
-          "joined = JOIN exists BY a, missing BY a USING 'skewed';";
+          "joined = JOIN exists BY a, missing BY a USING 'skewed' parallel 2;";
 
         String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
         Logger logger = Logger.getLogger("org.apache.pig");
@@ -619,4 +618,34 @@
         }
     }
 
+    // PIG-5372: a RANDOM() UDF upstream of a skewed join used to fail with an NPE
+    @Test
+    public void testSkewedJoinWithRANDOMudf() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+        pigServer.registerQuery("A2 = FOREACH A GENERATE id, RANDOM() as randnum;");
+
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A2 by id, B by id using 'skewed' parallel 2;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A2 by id, B by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        assertTrue(dbfrj.size()>0);
+        assertTrue(dbshj.size()>0);
+        assertEquals(dbfrj.size(), dbshj.size());
+    }
+
+
 }