PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779325 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d150d62..1dd912c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -73,7 +73,9 @@
  
 BUG FIXES
 
-PIG-5087 e2e Native3 failing after PIG-4923 (knoguchi)
+PIG-5083: CombinerPackager and LitePackager should not materialize bags (rohini)
+
+PIG-5087: e2e Native3 failing after PIG-4923 (knoguchi)
 
 PIG-5073: Skip e2e Limit_5 test for Tez (knoguchi)
 
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
index a9b9fd4..2df1157 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
@@ -49,7 +49,7 @@
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
-    
+
     private transient boolean initialized;
     private transient boolean useDefaultBag;
 
@@ -77,6 +77,15 @@
         }
     }
 
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        // Store the caller's references untouched: the bags are read directly and not materialized again
+        this.readOnce = readOnce;
+        this.bags = bags;
+        this.key = key;
+    }
+
     /**
      * @param keyInfo the keyInfo to set
      */
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
index bc4aadd..fe97992 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
@@ -17,7 +17,7 @@
  */
 
 /**
- * 
+ *
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
@@ -28,6 +28,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@
     private PigNullableWritable keyWritable;
 
     @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        // Store the caller's references untouched: the bags are read directly and not materialized again
+        this.readOnce = readOnce;
+        this.bags = bags;
+        this.key = key;
+    }
+
+    @Override
     public boolean[] getInner() {
         return null;
     }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
index 114b689..2914128 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
@@ -34,12 +34,16 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -64,6 +68,7 @@
     private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
+    private transient boolean readOnceOneBag;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -123,6 +128,11 @@
             for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
+
+            this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+            if (readOnceOneBag) {
+                readOnce[0] = true;
+            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -193,43 +203,47 @@
 
                 } else {
 
-                    for (int i = 0; i < numInputs; i++) {
-                        bags[i] = new InternalCachedBag(numInputs);
-                    }
-
-                    if (numTezInputs == 1) {
-                        do {
-                            Iterable<Object> vals = readers.get(0).getCurrentValues();
-                            for (Object val : vals) {
-                                NullableTuple nTup = (NullableTuple) val;
-                                int index = nTup.getIndex();
-                                Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                bags[index].add(tup);
-                            }
-                            finished[0] = !readers.get(0).next();
-                            if (finished[0]) {
-                                break;
-                            }
-                            cur = readers.get(0).getCurrentKey();
-                        } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                    if (readOnceOneBag) {
+                        bags[0] = new TezReadOnceBag(pkgr, min);
                     } else {
-                        for (int i = 0; i < numTezInputs; i++) {
-                            if (!finished[i]) {
-                                cur = readers.get(i).getCurrentKey();
-                                // We need to loop in case of Grouping Comparators
-                                while (groupingComparator.compare(min, cur) == 0) {
-                                    Iterable<Object> vals = readers.get(i).getCurrentValues();
-                                    for (Object val : vals) {
-                                        NullableTuple nTup = (NullableTuple) val;
-                                        int index = nTup.getIndex();
-                                        Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                        bags[index].add(tup);
-                                    }
-                                    finished[i] = !readers.get(i).next();
-                                    if (finished[i]) {
-                                        break;
-                                    }
+                        for (int i = 0; i < numInputs; i++) {
+                            bags[i] = new InternalCachedBag(numInputs);
+                        }
+
+                        if (numTezInputs == 1) {
+                            do {
+                                Iterable<Object> vals = readers.get(0).getCurrentValues();
+                                for (Object val : vals) {
+                                    NullableTuple nTup = (NullableTuple) val;
+                                    int index = nTup.getIndex();
+                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                    bags[index].add(tup);
+                                }
+                                finished[0] = !readers.get(0).next();
+                                if (finished[0]) {
+                                    break;
+                                }
+                                cur = readers.get(0).getCurrentKey();
+                            } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                        } else {
+                            for (int i = 0; i < numTezInputs; i++) {
+                                if (!finished[i]) {
                                     cur = readers.get(i).getCurrentKey();
+                                    // We need to loop in case of Grouping Comparators
+                                    while (groupingComparator.compare(min, cur) == 0) {
+                                        Iterable<Object> vals = readers.get(i).getCurrentValues();
+                                        for (Object val : vals) {
+                                            NullableTuple nTup = (NullableTuple) val;
+                                            int index = nTup.getIndex();
+                                            Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                            bags[index].add(tup);
+                                        }
+                                        finished[i] = !readers.get(i).next();
+                                        if (finished[i]) {
+                                            break;
+                                        }
+                                        cur = readers.get(i).getCurrentKey();
+                                    }
                                 }
                             }
                         }
@@ -389,4 +403,74 @@
 
     }
 
+    /** Single-pass bag streaming the current key's values from Tez reader 0; all of its iterators share one cursor. */
+    private class TezReadOnceBag extends ReadOnceBag {
+
+        private static final long serialVersionUID = 1L;
+        private Iterator<Object> iter;
+        // Set once the reader has moved past this key; hasNext() must not advance the shared reader again
+        private boolean exhausted;
+
+        public TezReadOnceBag(Packager pkgr, PigNullableWritable currentKey) throws IOException {
+            this.pkgr = pkgr;
+            this.keyWritable = currentKey;
+            this.iter = readers.get(0).getCurrentValues().iterator();
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new TezReadOnceBagIterator();
+        }
+
+        private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+            @Override
+            public boolean hasNext() {
+                if (exhausted) {
+                    return false;
+                }
+                if (iter.hasNext()) {
+                    return true;
+                }
+                try {
+                    finished[0] = !readers.get(0).next();
+                    if (!finished[0]) {
+                        // Currently combiner is not being applied when secondary key(grouping comparator) is used
+                        // But might change in future. So check if the next key is same and return its values
+                        Object cur = readers.get(0).getCurrentKey();
+                        if (groupingComparator.compare(keyWritable, cur) == 0) {
+                            iter = readers.get(0).getCurrentValues().iterator();
+                            // Key should at least have one value. But doing a check just for safety
+                            if (!iter.hasNext()) {
+                                throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+                            }
+                            return true;
+                        }
+                    }
+                    exhausted = true;
+                    return false;
+                } catch (IOException e) {
+                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                }
+            }
+
+            @Override
+            public Tuple next() {
+                if (!hasNext()) {
+                    throw new java.util.NoSuchElementException("No more values for key " + keyWritable);
+                }
+                NullableTuple ntup = (NullableTuple) iter.next();
+                try {
+                    return pkgr.getValueTuple(keyWritable, ntup, ntup.getIndex());
+                } catch (ExecException e) {
+                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                }
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+            }
+        }
+    }
 }
diff --git a/src/org/apache/pig/data/ReadOnceBag.java b/src/org/apache/pig/data/ReadOnceBag.java
index 1d8d637..0bdbb8c 100644
--- a/src/org/apache/pig/data/ReadOnceBag.java
+++ b/src/org/apache/pig/data/ReadOnceBag.java
@@ -50,6 +50,9 @@
      */
     private static final long serialVersionUID = 2L;
 
+    /** No-arg constructor: assigns nothing, so a subclass can set the bag's state itself. */
+    public ReadOnceBag() {}
+
     /**
      * This constructor creates a bag out of an existing iterator
      * of tuples by taking ownership of the iterator and NOT
diff --git a/src/org/apache/pig/impl/io/NullableTuple.java b/src/org/apache/pig/impl/io/NullableTuple.java
index 85f1e9f..a6e49ae 100644
--- a/src/org/apache/pig/impl/io/NullableTuple.java
+++ b/src/org/apache/pig/impl/io/NullableTuple.java
@@ -57,6 +57,8 @@
     public void readFields(DataInput in) throws IOException {
         boolean nullness = in.readBoolean();
         setNull(nullness);
+        // Free up the previous value for GC
+        mValue = null;
         if (!nullness) {
             mValue = bis.readTuple(in);
         }