PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779325 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d150d62..1dd912c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -73,7 +73,9 @@
BUG FIXES
-PIG-5087 e2e Native3 failing after PIG-4923 (knoguchi)
+PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
+
+PIG-5087: e2e Native3 failing after PIG-4923 (knoguchi)
PIG-5073: Skip e2e Limit_5 test for Tez (knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
index a9b9fd4..2df1157 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
@@ -49,7 +49,7 @@
private Map<Integer, Integer> keyLookup;
private int numBags;
-
+
private transient boolean initialized;
private transient boolean useDefaultBag;
@@ -77,6 +77,15 @@
}
}
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
/**
* @param keyInfo the keyInfo to set
*/
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
index bc4aadd..fe97992 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
@@ -17,7 +17,7 @@
*/
/**
- *
+ *
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
@@ -28,6 +28,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@
private PigNullableWritable keyWritable;
@Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
+ @Override
public boolean[] getInner() {
return null;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
index 114b689..2914128 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
@@ -34,12 +34,16 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -64,6 +68,7 @@
private transient WritableComparator groupingComparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
+ private transient boolean readOnceOneBag;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -123,6 +128,11 @@
for (int i = 0; i < numTezInputs; i++) {
finished[i] = !readers.get(i).next();
}
+
+ this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+ if (readOnceOneBag) {
+ readOnce[0] = true;
+ }
} catch (Exception e) {
throw new ExecException(e);
}
@@ -193,43 +203,47 @@
} else {
- for (int i = 0; i < numInputs; i++) {
- bags[i] = new InternalCachedBag(numInputs);
- }
-
- if (numTezInputs == 1) {
- do {
- Iterable<Object> vals = readers.get(0).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[0] = !readers.get(0).next();
- if (finished[0]) {
- break;
- }
- cur = readers.get(0).getCurrentKey();
- } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ if (readOnceOneBag) {
+ bags[0] = new TezReadOnceBag(pkgr, min);
} else {
- for (int i = 0; i < numTezInputs; i++) {
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (groupingComparator.compare(min, cur) == 0) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
+ for (int i = 0; i < numInputs; i++) {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
+
+ if (numTezInputs == 1) {
+ do {
+ Iterable<Object> vals = readers.get(0).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ break;
+ }
+ cur = readers.get(0).getCurrentKey();
+ } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (groupingComparator.compare(min, cur) == 0) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
}
}
}
@@ -389,4 +403,74 @@
}
+ private class TezReadOnceBag extends ReadOnceBag {
+
+ private static final long serialVersionUID = 1L;
+ private Iterator<Object> iter;
+
+ public TezReadOnceBag(Packager pkgr,
+ PigNullableWritable currentKey) throws IOException {
+ this.pkgr = pkgr;
+ this.keyWritable = currentKey;
+ this.iter = readers.get(0).getCurrentValues().iterator();
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new TezReadOnceBagIterator();
+ }
+
+ private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+ @Override
+ public boolean hasNext() {
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ try {
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ return false;
+ }
+ // Currently the combiner is not applied when a secondary key (grouping comparator) is used,
+ // but that might change in the future. So check whether the next key is the same and return its values
+ Object cur = readers.get(0).getCurrentKey();
+ if (groupingComparator.compare(keyWritable, cur) == 0) {
+ iter = readers.get(0).getCurrentValues().iterator();
+ // A key should have at least one value, but check anyway just for safety
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ }
+ }
+
+ @Override
+ public Tuple next() {
+ NullableTuple ntup = (NullableTuple) iter.next();
+ int index = ntup.getIndex();
+ Tuple ret = null;
+ try {
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
+ } catch (ExecException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+ }
+ }
+
+ }
+
+
}
diff --git a/src/org/apache/pig/data/ReadOnceBag.java b/src/org/apache/pig/data/ReadOnceBag.java
index 1d8d637..0bdbb8c 100644
--- a/src/org/apache/pig/data/ReadOnceBag.java
+++ b/src/org/apache/pig/data/ReadOnceBag.java
@@ -50,6 +50,9 @@
*/
private static final long serialVersionUID = 2L;
+ public ReadOnceBag() {
+ }
+
/**
* This constructor creates a bag out of an existing iterator
* of tuples by taking ownership of the iterator and NOT
diff --git a/src/org/apache/pig/impl/io/NullableTuple.java b/src/org/apache/pig/impl/io/NullableTuple.java
index 85f1e9f..a6e49ae 100644
--- a/src/org/apache/pig/impl/io/NullableTuple.java
+++ b/src/org/apache/pig/impl/io/NullableTuple.java
@@ -57,6 +57,8 @@
public void readFields(DataInput in) throws IOException {
boolean nullness = in.readBoolean();
setNull(nullness);
+ // Free up the previous value for GC
+ mValue = null;
if (!nullness) {
mValue = bis.readTuple(in);
}