PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1765306 13f79535-47bb-0310-9956-ffa450edef68
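
The fix in brief: every partitioning decision that previously flowed through an
unseeded java.util.Random is now seeded from the task id (MRConfiguration.TASK_ID
on MapReduce, JobContext.TASK_ID on Tez). The task id is stable across
re-executions of the same task, so a retried task routes every record to the same
partition as the original attempt, while distinct tasks still draw distinct
random sequences.
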
diff --git a/CHANGES.txt b/CHANGES.txt
index 0066953..0f00fc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
  
 BUG FIXES
 
+PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)
+
 PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via rohini)
 
 PIG-5039: TestTypeCheckingValidatorNewLP.TestTypeCheckingValidatorNewLP is failing (nkollar via knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
index 96f3794..2cff2dd 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
@@ -26,21 +26,21 @@
     Random rGen;
     float[] probVec;
     float epsilon = 0.0001f;
-        
+
     private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-    
-    public DiscreteProbabilitySampleGenerator(float[] probVec) {
-        rGen = new Random();
+
+    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+        rGen = new Random(seed);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         this.probVec = probVec;
-        if (1-epsilon > sum || sum > 1+epsilon) { 
+        if (1-epsilon > sum || sum > 1+epsilon) {
             LOG.info("Sum of probabilities should be near one: " + sum);
         }
     }
-    
+
     public int getNext(){
         double toss = rGen.nextDouble();
         // if the uniformly random number that I generated
@@ -57,13 +57,13 @@
             toss -= probVec[i];
             if(toss<=0.0)
                 return i;
-        }        
+        }
         return lastIdx;
     }
-    
+
     public static void main(String[] args) {
         float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
-        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
+        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
         CountingMap<Integer> cm = new CountingMap<Integer>();
         for(int i=0;i<100;i++){
             cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@
     public String toString() {
         return Arrays.toString(probVec);
     }
-    
-    
+
+
 }
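
Why the constructor change matters: java.util.Random with a fixed seed emits a
fixed sequence, so two generators built from the same (seed, probVec) pair return
identical getNext() streams. A minimal standalone sketch of that property (plain
JDK, not part of the patch):

    import java.util.Random;

    // Two Randoms with the same seed produce the same toss sequence, which is
    // exactly what makes the partition chosen for a repeated sample key stable
    // across task re-executions.
    public class SeededTossDemo {
        public static void main(String[] args) {
            Random a = new Random(11317);
            Random b = new Random(11317);
            for (int i = 0; i < 5; i++) {
                System.out.println(a.nextDouble() == b.nextDouble()); // always true
            }
        }
    }
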
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
index 37f77f7..94069f6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
@@ -129,11 +130,13 @@
                 DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
+                long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
+                long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
                 for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                     Tuple key = (Tuple)ent.getKey(); // sample item which repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(probVec));
+                            new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
                 }
             }
             // else - the quantiles file is empty - unless we have a bug, the
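
The seed derivation above widens the 32-bit String.hashCode() of the task id into
a 64-bit seed by placing the hash in both words of the long. A standalone
illustration of the bit manipulation (the task id string is just an example
value, borrowed from the tests below):

    // Sketch of the seed-widening idiom used in the patch.
    public class SeedWidening {
        public static void main(String[] args) {
            long h = "task_1473802673416_1808_m_000000".hashCode(); // sign-extended 32-bit hash
            long seed = (h << 32) | (h & 0xffffffffL); // hash in high word, zero-extended hash in low word
            System.out.printf("hash=%08x seed=%016x%n", (int) h, seed);
        }
    }
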
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
index 11be86a..590e46b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -30,6 +31,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -64,11 +66,13 @@
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
+            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
                 float[] probVec = getProbVec((Tuple) ent.getValue());
                 weightedParts.put(getPigNullableWritable(key),
-                        new DiscreteProbabilitySampleGenerator(probVec));
+                        new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
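
On Tez the partitioner is not handed a per-task JobConf directly, so the task id
comes from the UDFContext singleton. The tests at the bottom of this patch
populate it the same way; a hedged sketch of that injection pattern:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.pig.impl.util.UDFContext;

    public class UdfContextTaskIdDemo {
        public static void main(String[] args) {
            Configuration cfg = new Configuration();
            // JobContext.TASK_ID is the conf key the Tez partitioner reads for its seed
            cfg.set(JobContext.TASK_ID, "task_1473802673416_1808_m_000000");
            UDFContext.getUDFContext().addJobConf(cfg);
            String taskId = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID);
            System.out.println("seed source: " + taskId);
        }
    }
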
diff --git a/src/org/apache/pig/impl/builtin/GFCross.java b/src/org/apache/pig/impl/builtin/GFCross.java
index eeb0c9f..e9a44fe 100644
--- a/src/org/apache/pig/impl/builtin/GFCross.java
+++ b/src/org/apache/pig/impl/builtin/GFCross.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -42,7 +43,7 @@
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private int parallelism = 0;
-    private Random r = new Random();
+    private Random r;
     private String crossKey;
 
     static private final int DEFAULT_PARALLELISM = 96;
@@ -69,6 +70,14 @@
                 if (parallelism < 0) {
                     throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
                 }
+                long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
+                long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+                r = new Random(seed);
+            } else {
+                // We don't see a case where cfg can be null,
+                // but the existing testcase TestGFCross.testDefault hits this branch.
+                // Use a constant generated from the task_14738102975522_0001_r_000000 hashcode.
+                r = new Random(-4235927512599300514L);
             }
 
             numInputs = (Integer)input.get(0);
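
The constant in the else branch is, per the in-line comment, the widened hashcode
of task_14738102975522_0001_r_000000. A quick way to verify that claim (this
check is an assumption based on the comment, not part of the patch):

    public class GFCrossSeedCheck {
        public static void main(String[] args) {
            long h = "task_14738102975522_0001_r_000000".hashCode();
            long seed = (h << 32) | (h & 0xffffffffL);
            System.out.println(seed); // should print -4235927512599300514 if the comment is right
        }
    }
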
diff --git a/src/org/apache/pig/pen/LocalMapReduceSimulator.java b/src/org/apache/pig/pen/LocalMapReduceSimulator.java
index 416c78f..ef19a33 100644
--- a/src/org/apache/pig/pen/LocalMapReduceSimulator.java
+++ b/src/org/apache/pig/pen/LocalMapReduceSimulator.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@
  *
  */
 public class LocalMapReduceSimulator {
-    
+
     private MapReduceLauncher launcher = new MapReduceLauncher();
-    
+
     private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
 
     @SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@
                               PigContext pc) throws PigException, IOException, InterruptedException {
         phyToMRMap.clear();
         MROperPlan mrp = launcher.compile(php, pc);
-                
+
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
+
         JobControl jc;
         int numMRJobsCompl = 0;
         DataBag input;
@@ -106,6 +108,8 @@
         boolean needFileInput;
         final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
         pc.getProperties().setProperty("pig.illustrating", "true");
+        String jtIdentifier = "" + System.currentTimeMillis();
+        int jobId = 0;
         while(mrp.size() != 0) {
             jc = jcc.compile(mrp, "Illustrator");
             if(jc == null) {
@@ -113,6 +117,7 @@
             }
             List<Job> jobs = jc.getWaitingJobs();
             for (Job job : jobs) {
+                jobId++;
                 jobConf = job.getJobConf();
                 FileLocalizer.setInitialized(false);
                 ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@
                 PigSplit split = null;
                 List<POStore> stores = null;
                 PhysicalOperator pack = null;
-                // revisit as there are new physical operators from MR compilation 
+                // revisit as there are new physical operators from MR compilation
                 if (!mro.mapPlan.isEmpty())
                     attacher.revisit(mro.mapPlan);
                 if (!mro.reducePlan.isEmpty()) {
                     attacher.revisit(mro.reducePlan);
                     pack = mro.reducePlan.getRoots().get(0);
                 }
-                
+
                 List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
                 if (!mro.mapPlan.isEmpty()) {
                     stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@
                 for (POStore store : stores) {
                     output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                 }
-               
+
                 OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
                 oa.visit();
-                
+
                 if (!mro.reducePlan.isEmpty()) {
                     oa = new OutputAttacher(mro.reducePlan, output);
                     oa.visit();
@@ -168,6 +173,7 @@
                     if (input != null)
                         mro.mapPlan.remove(ld);
                 }
+                int mapTaskId = 0;
                 for (POLoad ld : lds) {
                     // check newly generated data first
                     input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@
                                      break;
                                 }
                             }
-                        } 
+                        }
                     }
                     needFileInput = (input == null);
                     split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@
                             context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                         }
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     } else {
                         if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@
                         Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
                           .getIllustratorContext(jobConf, input, intermediateData, split);
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     }
                 }
-                
+
                 if (!mro.reducePlan.isEmpty())
                 {
                     if (pack instanceof POPackage)
@@ -233,19 +241,20 @@
                     }
 
                     ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
                     reduce.run(context);
                 }
                 for (PhysicalOperator key : mro.phyToMRMap.keySet())
                     for (PhysicalOperator value : mro.phyToMRMap.get(key))
                         phyToMRMap.put(key, value);
             }
-            
-            
+
+
             int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-            
+
             numMRJobsCompl += removedMROp;
         }
-                
+
         jcc.reset();
     }
 
@@ -256,7 +265,7 @@
                     plan));
             this.outputBuffer = output;
         }
-        
+
         @Override
         public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
             if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
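
The illustrator has no real cluster, so it fabricates task ids from the current
time plus running job/task counters; the third constructor argument selects map
(true) or reduce (false). A small sketch of the resulting id format, using the
same deprecated org.apache.hadoop.mapred.TaskID constructor the patch calls:

    import org.apache.hadoop.mapred.TaskID;

    public class TaskIdFormatDemo {
        public static void main(String[] args) {
            String jtIdentifier = "" + System.currentTimeMillis();
            TaskID mapTask = new TaskID(jtIdentifier, 1, true, 0);     // map task 0 of job 1
            TaskID reduceTask = new TaskID(jtIdentifier, 1, false, 0); // reduce task 0 of job 1
            System.out.println(mapTask);    // e.g. task_<millis>_0001_m_000000
            System.out.println(reduceTask); // e.g. task_<millis>_0001_r_000000
        }
    }
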
diff --git a/test/org/apache/pig/test/TestFindQuantiles.java b/test/org/apache/pig/test/TestFindQuantiles.java
index 44fc977..c04e999 100644
--- a/test/org/apache/pig/test/TestFindQuantiles.java
+++ b/test/org/apache/pig/test/TestFindQuantiles.java
@@ -27,7 +27,6 @@
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@
 import org.junit.Test;
 
 public class TestFindQuantiles {
-    
+
     private static TupleFactory tFact = TupleFactory.getInstance();
     private static final float epsilon = 0.0001f;
-    
+
     @Test
     public void testFindQuantiles() throws Exception {
        final int numSamples = 97778;
@@ -50,7 +49,7 @@
        System.out.println("sum: " + sum);
        assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-    
+
     @Test
     public void testFindQuantiles2() throws Exception {
        final int numSamples = 30000;
@@ -86,7 +85,7 @@
     }
 
     private float[] getProbVec(Tuple values) throws Exception {
-        float[] probVec = new float[values.size()];        
+        float[] probVec = new float[values.size()];
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
         }
@@ -95,7 +94,7 @@
 
     private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
         Random rand = new Random(1000);
-        List<Tuple> samples = new ArrayList<Tuple>(); 
+        List<Tuple> samples = new ArrayList<Tuple>();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@
     }
 
     private DataBag generateUniqueSamples(int numSamples) throws Exception {
-        DataBag samples = BagFactory.getInstance().newDefaultBag(); 
+        DataBag samples = BagFactory.getInstance().newDefaultBag();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
@@ -121,9 +120,9 @@
 
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
-        
+
         FindQuantiles fq = new FindQuantiles();
-        
+
         Map<String, Object> res = fq.exec(in);
         return res;
     }
@@ -135,12 +134,11 @@
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());
-        new DiscreteProbabilitySampleGenerator(probVec);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         return sum;
     }
-    
+
 }
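
Note that sumOfWeightedParts no longer constructs a DiscreteProbabilitySampleGenerator:
with the new seeded constructor the generator is irrelevant to the weighted-parts
sum the test actually checks, so the call (and its import) could simply be dropped.
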
diff --git a/test/org/apache/pig/test/TestGFCross.java b/test/org/apache/pig/test/TestGFCross.java
index 95ee24b..84fcf30 100644
--- a/test/org/apache/pig/test/TestGFCross.java
+++ b/test/org/apache/pig/test/TestGFCross.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+        cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -66,6 +68,7 @@
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+        cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);