This change includes the following fixes and improvements:
1. Adds a rule to push a subplan into a group-by.
2. Adds a rule to eliminate a subplan whose input cardinality is one.
3. Fixes the nested running aggregate runtime.
4. Adds a wrapper around FrameTupleAppender that internally flushes full frames.
   A TODO item is to clean up existing usages of FrameTupleAppender to use the wrapper, which simplifies the code (see the sketch below).
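
   An illustrative usage sketch of the wrapper (the construction mirrors the updated
   PreclusteredGroupWriter; ctx, outFrame, writer, accessor, and tIndex are assumed to
   come from the surrounding operator context):

       FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
       appender.reset(outFrame, true);
       FrameTupleAppenderWrapper appenderWrapper =
               new FrameTupleAppenderWrapper(appender, outFrame, writer);
       appenderWrapper.append(accessor, tIndex); // flushes the frame downstream when it is full
       appenderWrapper.flush();                  // pushes any remaining buffered tuples
       appenderWrapper.close();                  // closes the downstream writer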

Change-Id: I647f9bce2f40700b18bdcad1fa64fb8f0a26838b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/149
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Preston Carman <ecarm002@ucr.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 08f985b..f3cf0a4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -12,77 +12,86 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class VariableUtilities {
-
-    public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
-            throws AlgebricksException {
-        ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
-        op.accept(visitor, null);
-    }
-
-    public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
-            throws AlgebricksException {
-        ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
-        op.accept(visitor, null);
-    }
-
-    public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
-            throws AlgebricksException {
-        ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
-        op.accept(visitor, null);
-    }
-
-    public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
-            throws AlgebricksException {
-        // DFS traversal
-        VariableUtilities.getUsedVariables(op, vars);
-        for (Mutable<ILogicalOperator> c : op.getInputs()) {
-            getUsedVariablesInDescendantsAndSelf(c.getValue(), vars);
-        }
-    }
-
-    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
-            ITypingContext ctx) throws AlgebricksException {
-        substituteVariables(op, v1, v2, true, ctx);
-    }
-    
-    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
-            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
-        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
-            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
-        }
-        substituteVariables(op, v1, v2, true, ctx);
-    }
-    
-    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
-            boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
-        ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
-                goThroughNts, ctx);
-        op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
-    }
-    
-    public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
-        Set<T> varSet = new HashSet<T>();
-        Set<T> varArgSet = new HashSet<T>();
-        varSet.addAll(var);
-        varArgSet.addAll(varArg);
-        return varSet.equals(varArgSet);
-    }
-
-}
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class VariableUtilities {
+
+    public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        // DFS traversal
+        VariableUtilities.getUsedVariables(op, vars);
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            getUsedVariablesInDescendantsAndSelf(c.getValue(), vars);
+        }
+    }
+
+    public static void getProducedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        // DFS traversal
+        VariableUtilities.getProducedVariables(op, vars);
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            getProducedVariablesInDescendantsAndSelf(c.getValue(), vars);
+        }
+    }
+
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            ITypingContext ctx) throws AlgebricksException {
+        substituteVariables(op, v1, v2, true, ctx);
+    }
+
+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
+            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
+        }
+        substituteVariables(op, v1, v2, true, ctx);
+    }
+
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
+                goThroughNts, ctx);
+        op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
+    }
+
+    public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
+        Set<T> varSet = new HashSet<T>();
+        Set<T> varArgSet = new HashSet<T>();
+        varSet.addAll(var);
+        varArgSet.addAll(varArg);
+        return varSet.equals(varArgSet);
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 0838291..6648252 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -99,6 +100,55 @@
         }
     }
 
+    /**
+     * Adds the free variables of the operator path from
+     * op to dest, where dest is a direct/indirect input operator of op in the query plan.
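+     * A variable is considered free on the path if it is live at op or used by an operator
+     * on the path, but not produced by any operator on the path (dest itself is excluded).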
+     * 
+     * @param op
+     *            , the start operator.
+     * @param dest
+     *            , the destination operator (a direct/indirect input operator).
+     * @param freeVars
+     *            - The collection to which the free variables will be added.
+     */
+    public static void getFreeVariablesInPath(ILogicalOperator op, ILogicalOperator dest, Set<LogicalVariable> freeVars)
+            throws AlgebricksException {
+        Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(op, freeVars);
+        collectUsedAndProducedVariablesInPath(op, dest, freeVars, producedVars);
+        freeVars.removeAll(producedVars);
+    }
+
+    /**
+     * @param op
+     *            , the start operator.
+     * @param dest
+     *            , the destination operator (a direct/indirect input operator).
+     * @param usedVars
+     *            , the collection of used variables.
+     * @param producedVars
+     *            , the collection of produced variables.
+     * @return whether the current operator is on the path from the original start operator to the destination operator.
+     * @throws AlgebricksException
+     */
+    private static boolean collectUsedAndProducedVariablesInPath(ILogicalOperator op, ILogicalOperator dest,
+            Set<LogicalVariable> usedVars, Set<LogicalVariable> producedVars) throws AlgebricksException {
+        if (op == dest) {
+            return true;
+        }
+        boolean onPath = false;
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            if (collectUsedAndProducedVariablesInPath(childRef.getValue(), dest, usedVars, producedVars)) {
+                onPath = true;
+            }
+        }
+        if (onPath) {
+            VariableUtilities.getUsedVariables(op, usedVars);
+            VariableUtilities.getProducedVariables(op, producedVars);
+        }
+        return onPath;
+    }
+
     public static void getFreeVariablesInSubplans(AbstractOperatorWithNestedPlans op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
         for (ILogicalPlan p : op.getNestedPlans()) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
new file mode 100644
index 0000000..1397956
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule eliminates a subplan with the following pattern:
+ * -- SUBPLAN
+ * -- OP (where OP produces exactly one tuple)
+ * The live variables at OP must not be used above (after) the SUBPLAN.
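+ * For example (an illustrative sketch, not taken from a real plan):
+ * -- SUBPLAN { nested plan rooted at R, reading from a NESTEDTUPLESOURCE }
+ * -- AGGREGATE (produces exactly one tuple)
+ * is rewritten by replacing the SUBPLAN with R and replacing the NESTEDTUPLESOURCE
+ * inside R with the single-tuple AGGREGATE input.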
+ * Note: This rule must be applied after
+ * the RemoveRedundantVariablesRule (to avoid the lineage analysis of variable cardinality).
+ * 
+ * @author yingyib
+ */
+public class EliminateSubplanWithInputCardinalityOneRule implements IAlgebraicRewriteRule {
+    /** The pointer to the topmost operator */
+    private Mutable<ILogicalOperator> rootRef;
+    /** Whether the rule has ever been invoked */
+    private boolean invoked = false;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (!invoked) {
+            rootRef = opRef;
+            invoked = true;
+        }
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getInputs().size() <= 0) {
+            return false;
+        }
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> subplanRef : op.getInputs()) {
+            AbstractLogicalOperator op1 = (AbstractLogicalOperator) subplanRef.getValue();
+            if (op1.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                continue;
+            }
+
+            SubplanOperator subplan = (SubplanOperator) op1;
+            Set<LogicalVariable> usedVarsUp = new ListSet<LogicalVariable>();
+            OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, usedVarsUp);
+            // TODO(buyingyi): figure out the rewriting for subplan operators with multiple subplans.
+            if (subplan.getNestedPlans().size() != 1) {
+                continue;
+            }
+
+            ILogicalOperator subplanInputOperator = subplan.getInputs().get(0).getValue();
+            Set<LogicalVariable> subplanInputVars = new ListSet<LogicalVariable>();
+            VariableUtilities.getLiveVariables(subplanInputOperator, subplanInputVars);
+            int subplanInputVarSize = subplanInputVars.size();
+            subplanInputVars.removeAll(usedVarsUp);
+            // Makes sure the free variables are only used in the subplan.
+            if (subplanInputVars.size() < subplanInputVarSize) {
+                continue;
+            }
+            Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
+            OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars);
+            boolean cardinalityOne = isCardinalityOne(subplan.getInputs().get(0), freeVars);
+            if (cardinalityOne) {
+                /** If the cardinality of freeVars in the subplan is one, the subplan can be removed. */
+                ILogicalPlan plan = subplan.getNestedPlans().get(0);
+
+                List<Mutable<ILogicalOperator>> rootRefs = plan.getRoots();
+                // TODO(buyingyi): investigate the case of multi-root plans.
+                if (rootRefs.size() != 1) {
+                    continue;
+                }
+                Set<Mutable<ILogicalOperator>> ntsSet = new ListSet<Mutable<ILogicalOperator>>();
+                findNts(rootRefs.get(0), ntsSet);
+
+                /** Replaces nts with the input operator of the subplan. */
+                for (Mutable<ILogicalOperator> nts : ntsSet) {
+                    nts.setValue(subplanInputOperator);
+                }
+                subplanRef.setValue(rootRefs.get(0).getValue());
+                changed = true;
+            } else {
+                continue;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Whether the cardinality of each input free variable is one.
+     * 
+     * @param opRef
+     *            the operator to be checked (including its input operators)
+     * @param freeVars
+     *            the free variables whose cardinalities need to be checked
+     * @return true if every input variable has cardinality one; false otherwise.
+     * @throws AlgebricksException
+     */
+    private boolean isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars)
+            throws AlgebricksException {
+        Set<LogicalVariable> varsWithCardinalityOne = new ListSet<LogicalVariable>();
+        Set<LogicalVariable> varsLiveAtUnnestAndJoin = new ListSet<LogicalVariable>();
+        isCardinalityOne(opRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
+        varsWithCardinalityOne.removeAll(varsLiveAtUnnestAndJoin);
+        return varsWithCardinalityOne.equals(freeVars);
+    }
+
+    /**
+     * Recursively adds variables that have cardinality one and are in the input free variable set.
+     * 
+     * @param opRef
+     *            , the current operator reference.
+     * @param freeVars
+     *            , a set of variables.
+     * @param varsWithCardinalityOne
+     *            , variables in the free variable set with cardinality one at the time they are created.
+     * @param varsLiveAtUnnestAndJoin
+     *            , live variables at Unnest and Join. The cardinalities of those variables can become more than one
+     *            even if their cardinalities were one at the time those variables were created.
+     * @throws AlgebricksException
+     */
+    private void isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars,
+            Set<LogicalVariable> varsWithCardinalityOne, Set<LogicalVariable> varsLiveAtUnnestAndJoin)
+            throws AlgebricksException {
+        AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue();
+        List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(operator, producedVars);
+        if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST
+                || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin);
+        }
+        if (operator.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+            for (LogicalVariable producedVar : producedVars) {
+                if (freeVars.contains(producedVar)) {
+                    varsWithCardinalityOne.add(producedVar);
+                }
+            }
+        }
+        if (varsWithCardinalityOne.size() == freeVars.size()) {
+            return;
+        }
+        for (Mutable<ILogicalOperator> childRef : operator.getInputs()) {
+            isCardinalityOne(childRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
+        }
+    }
+
+    /**
+     * Finds the NestedTupleSource operators among the direct/indirect input operators of opRef.
+     * 
+     * @param opRef
+     *            , the current operator reference.
+     * @param ntsSet
+     *            , the set of NestedTupleSource operator references.
+     */
+    private void findNts(Mutable<ILogicalOperator> opRef, Set<Mutable<ILogicalOperator>> ntsSet) {
+        int childSize = opRef.getValue().getInputs().size();
+        if (childSize == 0) {
+            AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+            if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                ntsSet.add(opRef);
+            }
+            return;
+        }
+        for (Mutable<ILogicalOperator> childRef : opRef.getValue().getInputs()) {
+            findNts(childRef, ntsSet);
+        }
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
index 1c66317..ab7835d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
@@ -27,6 +27,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -160,6 +161,16 @@
                 testForNull = innerUnnest.getVariable();
                 break;
             }
+            case RUNNINGAGGREGATE: {
+                ILogicalOperator inputToRunningAggregate = right.getInputs().get(0).getValue();
+                Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+                VariableUtilities.getProducedVariables(inputToRunningAggregate, producedVars);
+                if (!producedVars.isEmpty()) {
+                    // Select [ $y != null ]
+                    testForNull = producedVars.iterator().next();
+                }
+                break;
+            }
             case DATASOURCESCAN: {
                 DataSourceScanOperator innerScan = (DataSourceScanOperator) right;
                 // Select [ $y != null ]
@@ -184,7 +195,8 @@
         IFunctionInfo finfoNot = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NOT);
         ScalarFunctionCallExpression nonNullTest = new ScalarFunctionCallExpression(finfoNot,
                 new MutableObject<ILogicalExpression>(isNullTest));
-        SelectOperator selectNonNull = new SelectOperator(new MutableObject<ILogicalExpression>(nonNullTest), false, null);
+        SelectOperator selectNonNull = new SelectOperator(new MutableObject<ILogicalExpression>(nonNullTest), false,
+                null);
         GroupByOperator g = new GroupByOperator();
         Mutable<ILogicalOperator> newSubplanRef = new MutableObject<ILogicalOperator>(subplan);
         NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(g));
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
new file mode 100644
index 0000000..4dfede0
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule pushes a subplan on top of a group-by into the
+ * nested plan of the group-by.
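+ * For example (an illustrative sketch, not taken from a real plan):
+ * -- SUBPLAN { plan P rooted over a NESTEDTUPLESOURCE }
+ * -- GROUP BY { nested plan N }
+ * becomes
+ * -- GROUP BY { P, with its NESTEDTUPLESOURCE replaced by the root of N },
+ * provided that all free variables of P are live at the root of N.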
+ * 
+ * @author yingyib
+ */
+public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator parentOperator = opRef.getValue();
+        if (parentOperator.getInputs().size() <= 0) {
+            return false;
+        }
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
+            AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue();
+            /** Only processes subplan operator. */
+            if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+                /** Only processes the case where a group-by operator is the input of the subplan operator. */
+                if (child.getOperatorTag() == LogicalOperatorTag.GROUP) {
+                    SubplanOperator subplan = (SubplanOperator) op;
+                    GroupByOperator gby = (GroupByOperator) child;
+                    List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
+                    List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
+                    List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
+                    for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
+                        List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
+                        List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
+                        for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
+                            /** Gets free variables in the root operator of a nested plan and its descendants. */
+                            Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
+                            VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
+                            Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+                            VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+                                    producedVars);
+                            freeVars.removeAll(producedVars);
+
+                            /**
+                             * Checks whether the above freeVars are all contained in live variables
+                             * of one nested plan inside the group-by operator.
+                             * If yes, then the subplan can be pushed into the nested plan of the group-by.
+                             */
+                            for (ILogicalPlan gbyNestedPlan : gbyNestedPlans) {
+                                List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+                                for (Mutable<ILogicalOperator> gbyRootOpRef : gbyRootOpRefs) {
+                                    Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
+                                    VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+                                    if (liveVars.containsAll(freeVars)) {
+                                        /** Does the actual push. */
+                                        Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+                                        ntsRef.setValue(gbyRootOpRef.getValue());
+                                        gbyRootOpRef.setValue(rootOpRef.getValue());
+                                        rootOpRefsToRemove.add(rootOpRef);
+                                        changed = true;
+                                    }
+                                }
+                            }
+                        }
+                        rootOpRefs.removeAll(rootOpRefsToRemove);
+                        if (rootOpRefs.size() == 0) {
+                            subplanNestedPlansToRemove.add(subplanNestedPlan);
+                        }
+                    }
+                    subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
+                    if (subplanNestedPlans.size() == 0) {
+                        ref.setValue(gby);
+                    }
+                }
+            }
+        }
+        return changed;
+    }
+
+    private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {
+        Mutable<ILogicalOperator> currentOpRef = opRef;
+        while (currentOpRef.getValue().getInputs().size() > 0) {
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        }
+        return currentOpRef;
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 918be11..c8dc852 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -37,9 +37,9 @@
 public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
-    private AlgebricksPipeline[] subplans;
-    private int[] keyFieldIdx;
-    private int[] decorFieldIdx;
+    private final AlgebricksPipeline[] subplans;
+    private final int[] keyFieldIdx;
+    private final int[] decorFieldIdx;
 
     public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
         this.subplans = subplans;
@@ -93,7 +93,6 @@
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
-                    pipelines[i].forceFlush();
                 }
             }
 
@@ -103,13 +102,16 @@
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
-                    pipelines[i].forceFlush();
                 }
             }
 
             @Override
             public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; ++i) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].close();
+                }
                 return false;
             }
 
@@ -131,14 +133,7 @@
 
             @Override
             public void close() {
-                for (int i = 0; i < pipelines.length; ++i) {
-                    try {
-                        outputWriter.setInputIdx(i);
-                        pipelines[i].close();
-                    } catch (HyracksDataException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
+
             }
         };
     }
@@ -165,15 +160,15 @@
 
     private static class RunningAggregatorOutput implements IFrameWriter {
 
-        private FrameTupleAccessor[] tAccess;
-        private RecordDescriptor[] inputRecDesc;
+        private final FrameTupleAccessor[] tAccess;
+        private final RecordDescriptor[] inputRecDesc;
         private int inputIdx;
-        private ArrayTupleBuilder tb;
-        private ArrayTupleBuilder gbyTb;
-        private AlgebricksPipeline[] subplans;
-        private IFrameWriter outputWriter;
-        private ByteBuffer outputFrame;
-        private FrameTupleAppender outputAppender;
+        private final ArrayTupleBuilder tb;
+        private final ArrayTupleBuilder gbyTb;
+        private final AlgebricksPipeline[] subplans;
+        private final IFrameWriter outputWriter;
+        private final ByteBuffer outputFrame;
+        private final FrameTupleAppender outputAppender;
 
         public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
                 int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 7ecb288..2e1171c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
@@ -76,9 +75,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
-                            outRecordDesc, groupFields, groupFields, writer);
-                    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+                    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
                             outRecordDesc, writer);
                     pgw.open();
                 }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 600d641..154f2d1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,15 +28,14 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private int[] outColumns;
-    private IRunningAggregateEvaluatorFactory[] runningAggregates;
+    private final int[] outColumns;
+    private final IRunningAggregateEvaluatorFactory[] runningAggregates;
 
     /**
      * @param outColumns
@@ -83,17 +82,13 @@
         }
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
-            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private final IPointable p = VoidPointable.FACTORY.createPointable();
+            private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
+            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
 
             @Override
             public void open() throws HyracksDataException {
-                if (!first) {
-                    FrameUtils.flushFrame(frame, writer);
-                    appender.reset(frame, true);
-                }
                 initAccessAppendRef(ctx);
                 if (first) {
                     first = false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
new file mode 100644
index 0000000..2de4256
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * This class wraps the calls of FrameTupleAppender and
+ * allows the user to not worry about flushing full frames.
+ * TODO(yingyib): clean up existing usages of FrameTupleAppender.
+ * 
+ * @author yingyib
+ */
+public class FrameTupleAppenderWrapper {
+    private final FrameTupleAppender frameTupleAppender;
+    private final ByteBuffer outputFrame;
+    private final IFrameWriter outputWriter;
+
+    public FrameTupleAppenderWrapper(FrameTupleAppender frameTupleAppender, ByteBuffer outputFrame,
+            IFrameWriter outputWriter) {
+        this.frameTupleAppender = frameTupleAppender;
+        this.outputFrame = outputFrame;
+        this.outputWriter = outputWriter;
+    }
+
+    public void open() throws HyracksDataException {
+        outputWriter.open();
+    }
+
+    public void flush() throws HyracksDataException {
+        if (frameTupleAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+        }
+    }
+
+    public void close() throws HyracksDataException {
+        outputWriter.close();
+    }
+
+    public void fail() throws HyracksDataException {
+        outputWriter.fail();
+    }
+
+    public void reset(ByteBuffer buffer, boolean clear) {
+        frameTupleAppender.reset(buffer, clear);
+    }
+
+    public void appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
+            throws HyracksDataException {
+        if (!frameTupleAppender.append(fieldSlots, bytes, offset, length)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (!frameTupleAppender.append(bytes, offset, length)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException {
+        if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) throws HyracksDataException {
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException {
+        if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 71af928..9ce70c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -24,11 +24,9 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-class PreclusteredGroupOperatorNodePushable extends
-        AbstractUnaryInputUnaryOutputOperatorNodePushable {
+class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     private final IHyracksTaskContext ctx;
     private final int[] groupFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
@@ -54,15 +52,13 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, groupFields, groupFields, writer);
         final ByteBuffer copyFrame = ctx.allocateFrame();
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
         copyFrameAccessor.reset(copyFrame);
         ByteBuffer outFrame = ctx.allocateFrame();
         final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(outFrame, true);
-        pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDescriptor,
+        pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
                 outRecordDescriptor, writer);
         pgw.open();
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 114463f..b67e236 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -24,22 +24,22 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
 public class PreclusteredGroupWriter implements IFrameWriter {
     private final int[] groupFields;
     private final IBinaryComparator[] comparators;
     private final IAggregatorDescriptor aggregator;
     private final AggregateState aggregateState;
-    private final IFrameWriter writer;
     private final ByteBuffer copyFrame;
     private final FrameTupleAccessor inFrameAccessor;
     private final FrameTupleAccessor copyFrameAccessor;
 
-    private final ByteBuffer outFrame;
-    private final FrameTupleAppender appender;
+    private final FrameTupleAppenderWrapper appenderWrapper;
     private final ArrayTupleBuilder tupleBuilder;
     private boolean outputPartial = false;
 
@@ -48,35 +48,36 @@
     private boolean isFailed = false;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
-            IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
-        this(ctx, groupFields, comparators, aggregator, inRecordDesc, outRecordDesc, writer);
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
+        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer);
         this.outputPartial = outputPartial;
     }
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
-            IFrameWriter writer) throws HyracksDataException {
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregator;
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
+                groupFields, writer);
         this.aggregateState = aggregator.createAggregateStates();
-        this.writer = writer;
         copyFrame = ctx.allocateFrame();
         inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
         copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
         copyFrameAccessor.reset(copyFrame);
 
-        outFrame = ctx.allocateFrame();
-        appender = new FrameTupleAppender(ctx.getFrameSize());
+        ByteBuffer outFrame = ctx.allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(outFrame, true);
+        appenderWrapper = new FrameTupleAppenderWrapper(appender, outFrame, writer);
 
         tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
     }
 
     @Override
     public void open() throws HyracksDataException {
-        writer.open();
+        appenderWrapper.open();
         first = true;
     }
 
@@ -133,15 +134,9 @@
                 lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
                 lastTupleIndex, aggregateState);
 
-        if (hasOutput
-                && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                        tupleBuilder.getSize())) {
-            FrameUtils.flushFrame(outFrame, writer);
-            appender.reset(outFrame, true);
-            if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                    tupleBuilder.getSize())) {
-                throw new HyracksDataException("The output cannot be fit into a frame.");
-            }
+        if (hasOutput) {
+            appenderWrapper.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize());
         }
 
     }
@@ -163,19 +158,17 @@
     @Override
     public void fail() throws HyracksDataException {
         isFailed = true;
-        writer.fail();
+        appenderWrapper.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
         if (!isFailed && !first) {
             writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outFrame, writer);
-            }
+            appenderWrapper.flush();
         }
         aggregator.close();
         aggregateState.close();
-        writer.close();
+        appenderWrapper.close();
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index 2a28dea..e695828 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
@@ -111,9 +110,7 @@
         for (int i = 0; i < comparators.length; i++) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc,
-                groupFields, groupFields, writer);
-        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator,
+        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
                 this.inRecordDesc, this.outRecordDesc, writer, true);
         pgw.open();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 1f9b358..2a580d3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -31,7 +31,6 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
@@ -55,8 +54,8 @@
     private ByteBuffer outFrame;
     private FrameTupleAppender outFrameAppender;
 
-    private IFrameSorter frameSorter; // Used in External sort, no replacement
-                                      // selection
+    private final IFrameSorter frameSorter; // Used in External sort, no replacement
+    // selection
 
     private final int[] groupFields;
     private final INormalizedKeyComputer firstKeyNkc;
@@ -67,7 +66,7 @@
 
     private final int[] mergeSortFields;
     private final int[] mergeGroupFields;
-    private IBinaryComparator[] groupByComparators;
+    private final IBinaryComparator[] groupByComparators;
 
     // Constructor for external sort, no replacement selection
     public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
@@ -115,10 +114,8 @@
 
     public void process() throws HyracksDataException {
         IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
-        IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
-                groupFields, groupFields, writer);
-        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregator,
-                inputRecordDesc, outRecordDesc, writer, false);
+        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators,
+                aggregatorFactory, inputRecordDesc, outRecordDesc, writer, false);
         try {
             if (runs.size() <= 0) {
                 pgw.open();
@@ -149,9 +146,7 @@
                         IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
 
                         aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
-                        aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc,
-                                partialAggRecordDesc, mergeGroupFields, mergeGroupFields, mergeResultWriter);
-                        pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+                        pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
                                 partialAggRecordDesc, partialAggRecordDesc, mergeResultWriter, true);
                         pgw.open();
 
@@ -166,10 +161,8 @@
                     }
                 }
                 if (!runs.isEmpty()) {
-                    aggregator = mergeAggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
-                            mergeGroupFields, mergeGroupFields, writer);
-                    pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
-                            partialAggRecordDesc, outRecordDesc, writer, false);
+                    pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators,
+                            mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, writer, false);
                     pgw.open();
                     IFrameReader[] runCursors = new RunFileReader[runs.size()];
                     for (int i = 0; i < runCursors.length; i++) {