PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1765306 13f79535-47bb-0310-9956-ffa450edef68
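
The core of the fix, repeated below in WeightedRangePartitioner, WeightedRangePartitionerTez, and GFCross: derive the Random seed from the task id instead of using the unseeded constructor, so re-executions of the same task make the same random choices. A minimal sketch of the derivation (the class and helper names here are mine, not part of the patch):

    import java.util.Random;

    public class SeedSketch {
        // Mirror the 32-bit task-id hash into both halves of the long.
        // java.util.Random keeps only the low 48 bits of its seed, so
        // without the mirroring, seed bits 32..47 would be constant sign
        // extension of the int hash instead of varying with it.
        static long seedFor(String taskId) {
            long h = taskId.hashCode();           // int hash, sign-extended
            return (h << 32) | (h & 0xffffffffL);
        }

        public static void main(String[] args) {
            Random r1 = new Random(seedFor("task_1473802673416_1808_m_000000"));
            Random r2 = new Random(seedFor("task_1473802673416_1808_m_000000"));
            System.out.println(r1.nextDouble() == r2.nextDouble()); // true
        }
    }
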
diff --git a/CHANGES.txt b/CHANGES.txt
index 0066953..0f00fc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
BUG FIXES
+PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)
+
PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via rohini)
PIG-5039: TestTypeCheckingValidatorNewLP.TestTypeCheckingValidatorNewLP is failing (nkollar via knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
index 96f3794..2cff2dd 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
@@ -26,21 +26,21 @@
Random rGen;
float[] probVec;
float epsilon = 0.0001f;
-
+
private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-
- public DiscreteProbabilitySampleGenerator(float[] probVec) {
- rGen = new Random();
+
+ public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+ rGen = new Random(seed);
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
this.probVec = probVec;
- if (1-epsilon > sum || sum > 1+epsilon) {
+ if (1-epsilon > sum || sum > 1+epsilon) {
LOG.info("Sum of probabilities should be near one: " + sum);
}
}
-
+
public int getNext(){
double toss = rGen.nextDouble();
// if the uniformly random number that I generated
@@ -57,13 +57,13 @@
toss -= probVec[i];
if(toss<=0.0)
return i;
- }
+ }
return lastIdx;
}
-
+
public static void main(String[] args) {
float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
- DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
+ DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
CountingMap<Integer> cm = new CountingMap<Integer>();
for(int i=0;i<100;i++){
cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@
public String toString() {
return Arrays.toString(probVec);
}
-
-
+
+
}
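
With the seed now a constructor argument, two generators built from the same seed and probability vector emit identical index sequences. A quick sketch (seed and vector borrowed from the main() above; assumes the Pig class is on the classpath):

    float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
    DiscreteProbabilitySampleGenerator a =
            new DiscreteProbabilitySampleGenerator(11317, vec);
    DiscreteProbabilitySampleGenerator b =
            new DiscreteProbabilitySampleGenerator(11317, vec);
    for (int i = 0; i < 100; i++) {
        assert a.getNext() == b.getNext(); // same seed => same sequence
    }
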
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
index 37f77f7..94069f6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
@@ -129,11 +130,13 @@
DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
+ long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
+ long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple)ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple)ent.getValue());
weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(probVec));
+ new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
}
}
// else - the quantiles file is empty - unless we have a bug, the
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
index 11be86a..590e46b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.data.DataBag;
@@ -30,6 +31,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -64,11 +66,13 @@
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
convertToArray(quantilesList);
+ long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+ long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
Tuple key = (Tuple) ent.getKey(); // sample item which repeats
float[] probVec = getProbVec((Tuple) ent.getValue());
weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(probVec));
+ new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/src/org/apache/pig/impl/builtin/GFCross.java b/src/org/apache/pig/impl/builtin/GFCross.java
index eeb0c9f..e9a44fe 100644
--- a/src/org/apache/pig/impl/builtin/GFCross.java
+++ b/src/org/apache/pig/impl/builtin/GFCross.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -42,7 +43,7 @@
private BagFactory mBagFactory = BagFactory.getInstance();
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private int parallelism = 0;
- private Random r = new Random();
+ private Random r;
private String crossKey;
static private final int DEFAULT_PARALLELISM = 96;
@@ -69,6 +70,14 @@
if (parallelism < 0) {
throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey + " was " + parallelism);
}
+ long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
+ long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+ r = new Random(seed);
+ } else {
+ // We don't see a case where cfg can be null, but the existing
+ // testcase TestGFCross.testDefault exercises this path. The constant
+ // below is the seed derived from the task_14738102975522_0001_r_000000 hashcode.
+ r = new Random(-4235927512599300514L);
}
numInputs = (Integer)input.get(0);
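
The comment in the hunk above says the fallback constant -4235927512599300514L was generated from the hashcode of task_14738102975522_0001_r_000000, the same id TestGFCross pins below. One way to sanity-check that claim (String.hashCode is specified by the JLS, so it is stable across JVMs):

    String id = "task_14738102975522_0001_r_000000";
    long h = id.hashCode();
    long seed = (h << 32) | (h & 0xffffffffL);
    System.out.println(seed == -4235927512599300514L); // expected: true
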
diff --git a/src/org/apache/pig/pen/LocalMapReduceSimulator.java b/src/org/apache/pig/pen/LocalMapReduceSimulator.java
index 416c78f..ef19a33 100644
--- a/src/org/apache/pig/pen/LocalMapReduceSimulator.java
+++ b/src/org/apache/pig/pen/LocalMapReduceSimulator.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@
*
*/
public class LocalMapReduceSimulator {
-
+
private MapReduceLauncher launcher = new MapReduceLauncher();
-
+
private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();;
@SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@
PigContext pc) throws PigException, IOException, InterruptedException {
phyToMRMap.clear();
MROperPlan mrp = launcher.compile(php, pc);
-
+
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-
+
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
+
JobControl jc;
int numMRJobsCompl = 0;
DataBag input;
@@ -106,6 +108,8 @@
boolean needFileInput;
final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
pc.getProperties().setProperty("pig.illustrating", "true");
+ String jtIdentifier = "" + System.currentTimeMillis();
+ int jobId = 0;
while(mrp.size() != 0) {
jc = jcc.compile(mrp, "Illustrator");
if(jc == null) {
@@ -113,6 +117,7 @@
}
List<Job> jobs = jc.getWaitingJobs();
for (Job job : jobs) {
+ jobId++;
jobConf = job.getJobConf();
FileLocalizer.setInitialized(false);
ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@
PigSplit split = null;
List<POStore> stores = null;
PhysicalOperator pack = null;
- // revisit as there are new physical operators from MR compilation
+ // revisit as there are new physical operators from MR compilation
if (!mro.mapPlan.isEmpty())
attacher.revisit(mro.mapPlan);
if (!mro.reducePlan.isEmpty()) {
attacher.revisit(mro.reducePlan);
pack = mro.reducePlan.getRoots().get(0);
}
-
+
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
if (!mro.mapPlan.isEmpty()) {
stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@
for (POStore store : stores) {
output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
}
-
+
OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
oa.visit();
-
+
if (!mro.reducePlan.isEmpty()) {
oa = new OutputAttacher(mro.reducePlan, output);
oa.visit();
@@ -168,6 +173,7 @@
if (input != null)
mro.mapPlan.remove(ld);
}
+ int mapTaskId = 0;
for (POLoad ld : lds) {
// check newly generated data first
input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@
break;
}
}
- }
+ }
}
needFileInput = (input == null);
split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@
context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
}
((PigMapBase) map).setMapPlan(mro.mapPlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
} else {
if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@
Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
.getIllustratorContext(jobConf, input, intermediateData, split);
((PigMapBase) map).setMapPlan(mro.mapPlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
}
}
-
+
if (!mro.reducePlan.isEmpty())
{
if (pack instanceof POPackage)
@@ -233,19 +241,20 @@
}
((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
reduce.run(context);
}
for (PhysicalOperator key : mro.phyToMRMap.keySet())
for (PhysicalOperator value : mro.phyToMRMap.get(key))
phyToMRMap.put(key, value);
}
-
-
+
+
int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-
+
numMRJobsCompl += removedMROp;
}
-
+
jcc.reset();
}
@@ -256,7 +265,7 @@
plan));
this.outputBuffer = output;
}
-
+
@Override
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
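
The illustrator never runs inside a real task, so it fabricates task ids from a timestamp-based jtIdentifier, an incrementing job id, and a per-plan task counter. Using the same deprecated org.apache.hadoop.mapred.TaskID constructor as the patch, the strings fed into MRConfiguration.TASK_ID should look like this (format per TaskID.toString, as I read it):

    // TaskID(jtIdentifier, jobId, isMap, taskId).toString()
    new TaskID("1475000000000", 1, true, 0).toString();
    // -> "task_1475000000000_0001_m_000000"
    new TaskID("1475000000000", 1, false, 0).toString();
    // -> "task_1475000000000_0001_r_000000"
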
diff --git a/test/org/apache/pig/test/TestFindQuantiles.java b/test/org/apache/pig/test/TestFindQuantiles.java
index 44fc977..c04e999 100644
--- a/test/org/apache/pig/test/TestFindQuantiles.java
+++ b/test/org/apache/pig/test/TestFindQuantiles.java
@@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Random;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@
import org.junit.Test;
public class TestFindQuantiles {
-
+
private static TupleFactory tFact = TupleFactory.getInstance();
private static final float epsilon = 0.0001f;
-
+
@Test
public void testFindQuantiles() throws Exception {
final int numSamples = 97778;
@@ -50,7 +49,7 @@
System.out.println("sum: " + sum);
assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
}
-
+
@Test
public void testFindQuantiles2() throws Exception {
final int numSamples = 30000;
@@ -86,7 +85,7 @@
}
private float[] getProbVec(Tuple values) throws Exception {
- float[] probVec = new float[values.size()];
+ float[] probVec = new float[values.size()];
for(int i = 0; i < values.size(); i++) {
probVec[i] = (Float)values.get(i);
}
@@ -95,7 +94,7 @@
private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
Random rand = new Random(1000);
- List<Tuple> samples = new ArrayList<Tuple>();
+ List<Tuple> samples = new ArrayList<Tuple>();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@
}
private DataBag generateUniqueSamples(int numSamples) throws Exception {
- DataBag samples = BagFactory.getInstance().newDefaultBag();
+ DataBag samples = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, new Integer(23));
@@ -121,9 +120,9 @@
in.set(0, new Integer(numReduceres));
in.set(1, samples);
-
+
FindQuantiles fq = new FindQuantiles();
-
+
Map<String, Object> res = fq.exec(in);
return res;
}
@@ -135,12 +134,11 @@
InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
Iterator<Object> it = weightedPartsData.values().iterator();
float[] probVec = getProbVec((Tuple)it.next());
- new DiscreteProbabilitySampleGenerator(probVec);
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
return sum;
}
-
+
}
diff --git a/test/org/apache/pig/test/TestGFCross.java b/test/org/apache/pig/test/TestGFCross.java
index 95ee24b..84fcf30 100644
--- a/test/org/apache/pig/test/TestGFCross.java
+++ b/test/org/apache/pig/test/TestGFCross.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@
public void testSerial() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+ cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
@@ -66,6 +68,7 @@
public void testParallelSet() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+ cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
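
One consequence of the GFCross change worth noting (my reading, not stated in the patch): cfg.get(MRConfiguration.TASK_ID).hashCode() throws a NullPointerException when the configuration carries no task id, so any test that builds its own Configuration for GFCross now has to pin one, as both tests here do:

    Configuration cfg = new Configuration();
    cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
    cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
    UDFContext.getUDFContext().addJobConf(cfg);
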