Fixed the code formatting according to the Hyracks code format profile.
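
The reformatted aggregator classes below are behavior-neutral. As a hypothetical usage sketch (not part of this change), such field aggregator factories are composed through MultiAggregatorFactory and then handed to a grouping operator such as ExternalHashGroupOperatorDescriptor; the field indexes are illustrative only, and the types are assumed to be imported from edu.uci.ics.hyracks.dataflow.std.aggregators:

    // Hypothetical sketch only; field indexes 1 and 2 are examples.
    IFieldValueResultingAggregatorFactory[] fieldAggregators = new IFieldValueResultingAggregatorFactory[] {
            new SumAggregatorFactory(1),          // integer SUM over field 1
            new FloatSumAggregatorFactory(2),     // float SUM over field 2
            new MinMaxAggregatorFactory(true, 2)  // MAX (type == true) over field 2
    };
    IAccumulatingAggregatorFactory aggregatorFactory = new MultiAggregatorFactory(fieldAggregators);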

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_spilling_groupby@309 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
index e036470..a99cba9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
@@ -50,44 +50,48 @@
         };
     }
 
-	@Override
-	public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-		return new ISpillableFieldValueResultingAggregator() {
-			private int count;
+    @Override
+    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+        return new ISpillableFieldValueResultingAggregator() {
+            private int count;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeInt(count);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeInt(count);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			@Override
-			public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
-					throws HyracksDataException {
-				count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2 + accessor.getFieldStartOffset(tIndex, fIndex));
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                count = IntegerSerializerDeserializer.getInt(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
 
-			}
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				count++;
-			}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                count++;
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-				count = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                count = 0;
+            }
 
-			@Override
-			public void accumulatePartialResult(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2 + accessor.getFieldStartOffset(tIndex, fIndex));
-			}
-		};
-	}
+            @Override
+            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                count += IntegerSerializerDeserializer.getInt(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
index 0b02378..7e6bc8c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
@@ -25,123 +25,111 @@
  * SUM aggregator on float type data.
  * 
  * @author jarodwen
- * 
  */
-public class FloatSumAggregatorFactory implements
-		IFieldValueResultingAggregatorFactory {
+public class FloatSumAggregatorFactory implements IFieldValueResultingAggregatorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
-	private int sumField;
+    private static final long serialVersionUID = 1L;
+    private int sumField;
 
-	public FloatSumAggregatorFactory(int field) {
-		this.sumField = field;
-	}
+    public FloatSumAggregatorFactory(int field) {
+        this.sumField = field;
+    }
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-	 * IFieldValueResultingAggregatorFactory
-	 * #createFieldValueResultingAggregator()
-	 */
-	@Override
-	public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-		return new IFieldValueResultingAggregator() {
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+     * IFieldValueResultingAggregatorFactory
+     * #createFieldValueResultingAggregator()
+     */
+    @Override
+    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+        return new IFieldValueResultingAggregator() {
 
-			private float sum;
+            private float sum;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeFloat(sum);
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeFloat(sum);
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				sum = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                sum = 0;
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-				sum += FloatSerializerDeserializer.getFloat(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+                sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        + fieldStart);
+            }
+        };
+    }
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-	 * IFieldValueResultingAggregatorFactory
-	 * #createSpillableFieldValueResultingAggregator()
-	 */
-	@Override
-	public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-		return new ISpillableFieldValueResultingAggregator() {
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+     * IFieldValueResultingAggregatorFactory
+     * #createSpillableFieldValueResultingAggregator()
+     */
+    @Override
+    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+        return new ISpillableFieldValueResultingAggregator() {
 
-			private float sum;
+            private float sum;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeFloat(sum);
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeFloat(sum);
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
 
-			@Override
-			public void initFromPartial(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				sum = FloatSerializerDeserializer.getFloat(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
-			}
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                sum = FloatSerializerDeserializer.getFloat(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				sum = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                sum = 0;
+            }
 
-			@Override
-			public void accumulatePartialResult(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				sum += FloatSerializerDeserializer.getFloat(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
-			}
+            @Override
+            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                sum += FloatSerializerDeserializer.getFloat(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-				sum += FloatSerializerDeserializer.getFloat(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+                sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        + fieldStart);
+            }
+        };
+    }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
index b3ce8dd..1b7da7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
@@ -18,6 +18,6 @@
 
 public interface IFieldValueResultingAggregatorFactory extends Serializable {
     public IFieldValueResultingAggregator createFieldValueResultingAggregator();
-    
+
     public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator();
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
index 95d4ac9..d52f458 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
@@ -20,32 +20,29 @@
 /**
  * An extended version of the {@link IFieldValueResultingAggregator} supporting
  * external aggregation.
- * 
  */
-public interface ISpillableFieldValueResultingAggregator extends
-		IFieldValueResultingAggregator {
+public interface ISpillableFieldValueResultingAggregator extends IFieldValueResultingAggregator {
 
-	/**
-	 * Called once per aggregator before calling accumulate for the first time.
-	 * 
-	 * @param accessor
-	 *            - Accessor to the data tuple.
-	 * @param tIndex
-	 *            - Index of the tuple in the accessor.
-	 * @throws HyracksDataException
-	 */
-	public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
-			int fIndex) throws HyracksDataException;
+    /**
+     * Called once per aggregator before calling accumulate for the first time.
+     * 
+     * @param accessor
+     *            - Accessor to the data tuple.
+     * @param tIndex
+     *            - Index of the tuple in the accessor.
+     * @throws HyracksDataException
+     */
+    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException;
 
-	/**
-	 * Aggregate another partial result.
-	 * 
-	 * @param accessor
-	 * @param tIndex
-	 * @param fIndex
-	 * @throws HyracksDataException
-	 */
-	public void accumulatePartialResult(IFrameTupleAccessor accessor,
-			int tIndex, int fIndex) throws HyracksDataException;
+    /**
+     * Aggregate another partial result.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param fIndex
+     * @throws HyracksDataException
+     */
+    public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+            throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
index 95740b2..0c3b0d5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
@@ -23,127 +23,115 @@
 
 /**
  * Min/Max aggregator factory
- *
  */
-public class MinMaxAggregatorFactory implements
-		IFieldValueResultingAggregatorFactory {
+public class MinMaxAggregatorFactory implements IFieldValueResultingAggregatorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * indicate the type of the value: true: max false: min
-	 */
-	private boolean type;
+    /**
+     * Indicates the aggregation type: true for max, false for min.
+     */
+    private boolean type;
 
-	/**
-	 * The field to be aggregated.
-	 */
-	private int field;
+    /**
+     * The field to be aggregated.
+     */
+    private int field;
 
-	public MinMaxAggregatorFactory(boolean type, int field) {
-		this.type = type;
-		this.field = field;
-	}
+    public MinMaxAggregatorFactory(boolean type, int field) {
+        this.type = type;
+        this.field = field;
+    }
 
-	@Override
-	public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-		return new IFieldValueResultingAggregator() {
+    @Override
+    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+        return new IFieldValueResultingAggregator() {
 
-			private float minmax;
+            private float minmax;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeFloat(minmax);
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeFloat(minmax);
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				minmax = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                minmax = 0;
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, field);
-				float nval = FloatSerializerDeserializer.getFloat(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-				if ((type ? (nval > minmax) : (nval < minmax))) {
-					minmax = nval;
-				}
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, field);
+                float nval = FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2
+                        * fieldCount + fieldStart);
+                if ((type ? (nval > minmax) : (nval < minmax))) {
+                    minmax = nval;
+                }
+            }
+        };
+    }
 
-	@Override
-	public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-		return new ISpillableFieldValueResultingAggregator() {
+    @Override
+    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+        return new ISpillableFieldValueResultingAggregator() {
 
-			private float minmax;
+            private float minmax;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeFloat(minmax);
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeFloat(minmax);
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
 
-			@Override
-			public void initFromPartial(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				minmax = FloatSerializerDeserializer.getFloat(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                minmax = FloatSerializerDeserializer.getFloat(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
 
-			}
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				minmax = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                minmax = 0;
+            }
 
-			@Override
-			public void accumulatePartialResult(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				minmax = FloatSerializerDeserializer.getFloat(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
+            @Override
+            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                minmax = FloatSerializerDeserializer.getFloat(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
 
-			}
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, field);
-				float nval = FloatSerializerDeserializer.getFloat(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-				if ((type ? (nval > minmax) : (nval < minmax))) {
-					minmax = nval;
-				}
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, field);
+                float nval = FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2
+                        * fieldCount + fieldStart);
+                if ((type ? (nval > minmax) : (nval < minmax))) {
+                    minmax = nval;
+                }
+            }
+        };
+    }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
index b237d63..957c453 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
@@ -27,170 +27,149 @@
 import edu.uci.ics.hyracks.dataflow.std.group.ISpillableAccumulatingAggregator;
 
 public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private IFieldValueResultingAggregatorFactory[] aFactories;
+    private IFieldValueResultingAggregatorFactory[] aFactories;
 
-	public MultiAggregatorFactory(
-			IFieldValueResultingAggregatorFactory[] aFactories) {
-		this.aFactories = aFactories;
-	}
+    public MultiAggregatorFactory(IFieldValueResultingAggregatorFactory[] aFactories) {
+        this.aFactories = aFactories;
+    }
 
-	@Override
-	public ISpillableAccumulatingAggregator createSpillableAggregator(
-			IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
-			final RecordDescriptor outRecordDescriptor) {
-		final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
-		for (int i = 0; i < aFactories.length; ++i) {
-			aggregators[i] = aFactories[i]
-					.createSpillableFieldValueResultingAggregator();
-		}
-		final ArrayTupleBuilder tb = new ArrayTupleBuilder(
-				outRecordDescriptor.getFields().length);
-		return new ISpillableAccumulatingAggregator() {
-			private boolean pending;
+    @Override
+    public ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
+            RecordDescriptor inRecordDesc, final RecordDescriptor outRecordDescriptor) {
+        final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
+        for (int i = 0; i < aFactories.length; ++i) {
+            aggregators[i] = aFactories[i].createSpillableFieldValueResultingAggregator();
+        }
+        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        return new ISpillableAccumulatingAggregator() {
+            private boolean pending;
 
-			@Override
-			public boolean output(FrameTupleAppender appender,
-					IFrameTupleAccessor accessor, int tIndex,
-					int[] keyFieldIndexes) throws HyracksDataException {
-				if (!pending) {
-					tb.reset();
-					for (int i = 0; i < keyFieldIndexes.length; ++i) {
-						tb.addField(accessor, tIndex, keyFieldIndexes[i]);
-					}
-					DataOutput dos = tb.getDataOutput();
-					for (int i = 0; i < aggregators.length; ++i) {
-						aggregators[i].output(dos);
-						tb.addFieldEndOffset();
-					}
-				}
-				if (!appender.append(tb.getFieldEndOffsets(),
-						tb.getByteArray(), 0, tb.getSize())) {
-					pending = true;
-					return false;
-				}
-				return true;
-			}
+            @Override
+            public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
+                    int[] keyFieldIndexes) throws HyracksDataException {
+                if (!pending) {
+                    tb.reset();
+                    for (int i = 0; i < keyFieldIndexes.length; ++i) {
+                        tb.addField(accessor, tIndex, keyFieldIndexes[i]);
+                    }
+                    DataOutput dos = tb.getDataOutput();
+                    for (int i = 0; i < aggregators.length; ++i) {
+                        aggregators[i].output(dos);
+                        tb.addFieldEndOffset();
+                    }
+                }
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				tb.reset();
-				for (int i = 0; i < aggregators.length; ++i) {
-					aggregators[i].init(accessor, tIndex);
-				}
-				pending = false;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                tb.reset();
+                for (int i = 0; i < aggregators.length; ++i) {
+                    aggregators[i].init(accessor, tIndex);
+                }
+                pending = false;
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				for (int i = 0; i < aggregators.length; ++i) {
-					aggregators[i].accumulate(accessor, tIndex);
-				}
-			}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; ++i) {
+                    aggregators[i].accumulate(accessor, tIndex);
+                }
+            }
 
-			@Override
-			public void initFromPartial(IFrameTupleAccessor accessor,
-					int tIndex, int[] keyFieldIndexes)
-					throws HyracksDataException {
-				tb.reset();
-				for (int i = 0; i < aggregators.length; i++) {
-					aggregators[i].initFromPartial(accessor, tIndex,
-							keyFieldIndexes.length + i);
-				}
-				pending = false;
-			}
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+                    throws HyracksDataException {
+                tb.reset();
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].initFromPartial(accessor, tIndex, keyFieldIndexes.length + i);
+                }
+                pending = false;
+            }
 
-			@Override
-			public void accumulatePartialResult(IFrameTupleAccessor accessor,
-					int tIndex, int[] keyFieldIndexes)
-					throws HyracksDataException {
-				for (int i = 0; i < aggregators.length; i++) {
-					aggregators[i].accumulatePartialResult(accessor, tIndex,
-							keyFieldIndexes.length + i);
-				}
+            @Override
+            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+                    throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].accumulatePartialResult(accessor, tIndex, keyFieldIndexes.length + i);
+                }
 
-			}
+            }
 
-			@Override
-			public boolean output(FrameTupleAppender appender,
-					ArrayTupleBuilder tbder) throws HyracksDataException {
-				if (!pending) {
-					// TODO Here to be fixed:
-					DataOutput dos = tbder.getDataOutput();
-					for (int i = 0; i < aggregators.length; ++i) {
-						aggregators[i].output(dos);
-						tbder.addFieldEndOffset();
-					}
-				}
-				if (!appender.append(tbder.getFieldEndOffsets(),
-						tbder.getByteArray(), 0, tbder.getSize())) {
-					pending = true;
-					return false;
-				}
-				return true;
-			}
-		};
-	}
+            @Override
+            public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException {
+                if (!pending) {
+                    // TODO Here to be fixed:
+                    DataOutput dos = tbder.getDataOutput();
+                    for (int i = 0; i < aggregators.length; ++i) {
+                        aggregators[i].output(dos);
+                        tbder.addFieldEndOffset();
+                    }
+                }
+                if (!appender.append(tbder.getFieldEndOffsets(), tbder.getByteArray(), 0, tbder.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+            }
+        };
+    }
 
-	@Override
-	public IAccumulatingAggregator createAggregator(
-			IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
-			RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-		final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
-		for (int i = 0; i < aFactories.length; ++i) {
-			aggregators[i] = aFactories[i]
-					.createFieldValueResultingAggregator();
-		}
-		final ArrayTupleBuilder tb = new ArrayTupleBuilder(
-				outRecordDescriptor.getFields().length);
-		return new IAccumulatingAggregator() {
+    @Override
+    public IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
+        for (int i = 0; i < aFactories.length; ++i) {
+            aggregators[i] = aFactories[i].createFieldValueResultingAggregator();
+        }
+        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        return new IAccumulatingAggregator() {
 
-			private boolean pending;
+            private boolean pending;
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				tb.reset();
-				for (int i = 0; i < aggregators.length; ++i) {
-					aggregators[i].init(accessor, tIndex);
-				}
-				pending = false;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                tb.reset();
+                for (int i = 0; i < aggregators.length; ++i) {
+                    aggregators[i].init(accessor, tIndex);
+                }
+                pending = false;
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				for (int i = 0; i < aggregators.length; ++i) {
-					aggregators[i].accumulate(accessor, tIndex);
-				}
-			}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; ++i) {
+                    aggregators[i].accumulate(accessor, tIndex);
+                }
+            }
 
-			@Override
-			public boolean output(FrameTupleAppender appender,
-					IFrameTupleAccessor accessor, int tIndex,
-					int[] keyFieldIndexes) throws HyracksDataException {
-				if (!pending) {
-					tb.reset();
-					for (int i = 0; i < keyFieldIndexes.length; ++i) {
-						tb.addField(accessor, tIndex, keyFieldIndexes[i]);
-					}
-					DataOutput dos = tb.getDataOutput();
-					for (int i = 0; i < aggregators.length; ++i) {
-						aggregators[i].output(dos);
-						tb.addFieldEndOffset();
-					}
-				}
-				if (!appender.append(tb.getFieldEndOffsets(),
-						tb.getByteArray(), 0, tb.getSize())) {
-					pending = true;
-					return false;
-				}
-				return true;
-			}
+            @Override
+            public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
+                    int[] keyFieldIndexes) throws HyracksDataException {
+                if (!pending) {
+                    tb.reset();
+                    for (int i = 0; i < keyFieldIndexes.length; ++i) {
+                        tb.addField(accessor, tIndex, keyFieldIndexes[i]);
+                    }
+                    DataOutput dos = tb.getDataOutput();
+                    for (int i = 0; i < aggregators.length; ++i) {
+                        aggregators[i].output(dos);
+                        tb.addFieldEndOffset();
+                    }
+                }
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+            }
 
-		};
-	}
+        };
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
index cfb5293..64335f1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
@@ -24,123 +24,111 @@
 /**
  * SUM aggregator factory (for integer only; another SUM aggregator for floats
  * is available at {@link FloatSumAggregatorFactory})
- * 
  */
-public class SumAggregatorFactory implements
-		IFieldValueResultingAggregatorFactory {
+public class SumAggregatorFactory implements IFieldValueResultingAggregatorFactory {
 
-	private int sumField;
+    private int sumField;
 
-	public SumAggregatorFactory(int field) {
-		sumField = field;
-	}
+    public SumAggregatorFactory(int field) {
+        sumField = field;
+    }
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-	 * IFieldValueResultingAggregatorFactory
-	 * #createFieldValueResultingAggregator()
-	 */
-	@Override
-	public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-		return new IFieldValueResultingAggregator() {
-			private int sum;
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+     * IFieldValueResultingAggregatorFactory
+     * #createFieldValueResultingAggregator()
+     */
+    @Override
+    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+        return new IFieldValueResultingAggregator() {
+            private int sum;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeInt(sum);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeInt(sum);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				sum++;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                sum++;
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-				sum += IntegerSerializerDeserializer.getInt(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        + fieldStart);
+            }
+        };
+    }
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
-	 * ISpillableFieldValueResultingAggregatorFactory
-	 * #createFieldValueResultingAggregator()
-	 */
-	@Override
-	public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-		return new ISpillableFieldValueResultingAggregator() {
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
+     * ISpillableFieldValueResultingAggregatorFactory
+     * #createFieldValueResultingAggregator()
+     */
+    @Override
+    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+        return new ISpillableFieldValueResultingAggregator() {
 
-			private int sum;
+            private int sum;
 
-			@Override
-			public void output(DataOutput resultAcceptor)
-					throws HyracksDataException {
-				try {
-					resultAcceptor.writeInt(sum);
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
+            @Override
+            public void output(DataOutput resultAcceptor) throws HyracksDataException {
+                try {
+                    resultAcceptor.writeInt(sum);
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
 
-			@Override
-			public void initFromPartial(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				sum = IntegerSerializerDeserializer.getInt(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
-			}
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                sum = IntegerSerializerDeserializer.getInt(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
+            }
 
-			@Override
-			public void init(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				sum = 0;
-			}
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                sum = 0;
+            }
 
-			@Override
-			public void accumulatePartialResult(IFrameTupleAccessor accessor,
-					int tIndex, int fIndex) throws HyracksDataException {
-				sum += IntegerSerializerDeserializer.getInt(
-						accessor.getBuffer().array(),
-						accessor.getTupleStartOffset(tIndex)
-								+ accessor.getFieldCount() * 2
-								+ accessor.getFieldStartOffset(tIndex, fIndex));
-			}
+            @Override
+            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+                    throws HyracksDataException {
+                sum += IntegerSerializerDeserializer.getInt(
+                        accessor.getBuffer().array(),
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                                + accessor.getFieldStartOffset(tIndex, fIndex));
+            }
 
-			@Override
-			public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldCount = accessor.getFieldCount();
-				int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-				sum += IntegerSerializerDeserializer.getInt(accessor
-						.getBuffer().array(), tupleOffset + 2 * fieldCount
-						+ fieldStart);
-			}
-		};
-	}
+            @Override
+            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        + fieldStart);
+            }
+        };
+    }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 1bad6a7..530207d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -33,111 +33,104 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 /**
- * File writer to output plain text. 
- *
+ * File writer to output plain text.
  */
-public class PlainFileWriterOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-	
-	private IFileSplitProvider fileSplitProvider;
-	
-	private String delim;
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
 
-	/**
-	 * @param spec
-	 * @param inputArity
-	 * @param outputArity
-	 */
-	public PlainFileWriterOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider, String delim) {
-		super(spec, 1, 0);
-		this.fileSplitProvider = fileSplitProvider;
-		this.delim = delim;
-	}
+    private IFileSplitProvider fileSplitProvider;
 
-	/* (non-Javadoc)
-	 * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
-	 */
-	@Override
-	public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx,
-			IOperatorEnvironment env,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		// Output files
-		final FileSplit[] splits = fileSplitProvider.getFileSplits();
-		// Frame accessor
-		final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
-		// Record descriptor
-		final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-		return new AbstractUnaryInputSinkOperatorNodePushable() {
-			private BufferedWriter out;
-			
-			private ByteBufferInputStream bbis;
-			
-			private DataInputStream di;
-			
-			@Override
-			public void open() throws HyracksDataException {
-				try {
+    private String delim;
+
+    /**
+     * @param spec
+     * @param fileSplitProvider
+     * @param delim
+     */
+    public PlainFileWriterOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider, String delim) {
+        super(spec, 1, 0);
+        this.fileSplitProvider = fileSplitProvider;
+        this.delim = delim;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        // Output files
+        final FileSplit[] splits = fileSplitProvider.getFileSplits();
+        // Frame accessor
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+        // Record descriptor
+        final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private BufferedWriter out;
+
+            private ByteBufferInputStream bbis;
+
+            private DataInputStream di;
+
+            @Override
+            public void open() throws HyracksDataException {
+                try {
                     out = new BufferedWriter(new FileWriter(splits[partition].getLocalFile().getFile()));
                     bbis = new ByteBufferInputStream();
                     di = new DataInputStream(bbis);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
-			}
-			
-			@Override
-			public void nextFrame(ByteBuffer buffer)
-					throws HyracksDataException {
-				try {
-					frameTupleAccessor.reset(buffer);
-					for (int tIndex = 0; tIndex < frameTupleAccessor
-							.getTupleCount(); tIndex++) {
-						int start = frameTupleAccessor
-								.getTupleStartOffset(tIndex)
-								+ frameTupleAccessor.getFieldSlotsLength();
-						bbis.setByteBuffer(buffer, start);
-						Object[] record = new Object[recordDescriptor
-								.getFields().length];
-						for (int i = 0; i < record.length; ++i) {
-							Object instance = recordDescriptor.getFields()[i]
-									.deserialize(di);
-							if (i == 0) {
-								out.write(String.valueOf(instance));
-							} else {
-								out.write(delim + String.valueOf(instance));
-							}
-						}
-						out.write("\n");
-					}
-				} catch (IOException ex) {
-					throw new HyracksDataException(ex);
-				}
-			}
-			
-			@Override
-			public void flush() throws HyracksDataException {
-				try {
-					out.flush();
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
-			
-			@Override
-			public void close() throws HyracksDataException {
-				try {
-					out.close();
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
-		};
-	}
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                try {
+                    frameTupleAccessor.reset(buffer);
+                    for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                        int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+                                + frameTupleAccessor.getFieldSlotsLength();
+                        bbis.setByteBuffer(buffer, start);
+                        Object[] record = new Object[recordDescriptor.getFields().length];
+                        for (int i = 0; i < record.length; ++i) {
+                            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                            if (i == 0) {
+                                out.write(String.valueOf(instance));
+                            } else {
+                                out.write(delim + String.valueOf(instance));
+                            }
+                        }
+                        out.write("\n");
+                    }
+                } catch (IOException ex) {
+                    throw new HyracksDataException(ex);
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                try {
+                    out.flush();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
index af78b7d..b13b087 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
@@ -52,717 +52,641 @@
 
 /**
  * This is an implementation of the external hash group operator.
- * 
  * The motivation of this operator is that when tuples are processed in
  * parallel, distinguished aggregating keys partitioned on one node may exceed
  * the main memory, so aggregation results should be output onto the disk to
  * make space for aggregating more input tuples.
- * 
- * 
  */
-public class ExternalHashGroupOperatorDescriptor extends
-		AbstractOperatorDescriptor {
+public class ExternalHashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
 
-	/**
-	 * The input frame identifier (in the job environment)
-	 */
-	private static final String GROUPTABLES = "gtables";
+    /**
+     * The input frame identifier (in the job environment)
+     */
+    private static final String GROUPTABLES = "gtables";
 
-	/**
-	 * The runs files identifier (in the job environment)
-	 */
-	private static final String RUNS = "runs";
+    /**
+     * The runs files identifier (in the job environment)
+     */
+    private static final String RUNS = "runs";
 
-	/**
-	 * The fields used for grouping (grouping keys).
-	 */
-	private final int[] keyFields;
+    /**
+     * The fields used for grouping (grouping keys).
+     */
+    private final int[] keyFields;
 
-	/**
-	 * The comparator for checking the grouping conditions, corresponding to the
-	 * {@link #keyFields}.
-	 */
-	private final IBinaryComparatorFactory[] comparatorFactories;
+    /**
+     * The comparator for checking the grouping conditions, corresponding to the {@link #keyFields}.
+     */
+    private final IBinaryComparatorFactory[] comparatorFactories;
 
-	/**
-	 * The aggregator factory for the aggregating field, corresponding to the
-	 * {@link #aggregateFields}.
-	 */
-	private IAccumulatingAggregatorFactory aggregatorFactory;
+    /**
+     * The aggregator factory for the aggregating field, corresponding to the {@link #aggregateFields}.
+     */
+    private IAccumulatingAggregatorFactory aggregatorFactory;
 
-	/**
-	 * The maximum number of frames in the main memory.
-	 */
-	private final int framesLimit;
+    /**
+     * The maximum number of frames in the main memory.
+     */
+    private final int framesLimit;
 
-	/**
-	 * Indicate whether the final output will be sorted or not.
-	 */
-	private final boolean sortOutput;
+    /**
+     * Indicate whether the final output will be sorted or not.
+     */
+    private final boolean sortOutput;
 
-	/**
-	 * Partition computer factory
-	 */
-	private final ITuplePartitionComputerFactory tpcf;
+    /**
+     * Partition computer factory
+     */
+    private final ITuplePartitionComputerFactory tpcf;
 
-	/**
-	 * The size of the in-memory table, which should be specified now by the
-	 * creator of this operator descriptor.
-	 */
-	private final int tableSize;
+    /**
+     * The size of the in-memory table, which should be specified now by the
+     * creator of this operator descriptor.
+     */
+    private final int tableSize;
 
-	/**
-	 * XXX Logger for debug information
-	 */
-	private static Logger LOGGER = Logger
-			.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
+    /**
+     * XXX Logger for debug information
+     */
+    private static Logger LOGGER = Logger.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
 
-	/**
-	 * Constructor of the external hash group operator descriptor.
-	 * 
-	 * @param spec
-	 * @param keyFields
-	 *            The fields as keys of grouping.
-	 * @param framesLimit
-	 *            The maximum number of frames to be used in memory.
-	 * @param sortOutput
-	 *            Whether the output should be sorted or not. Note that if the
-	 *            input data is large enough for external grouping, the output
-	 *            will be sorted surely. The only case that when the output is
-	 *            not sorted is when the size of the input data can be grouped
-	 *            in memory and this parameter is false.
-	 * @param tpcf
-	 *            The partitioner.
-	 * @param comparatorFactories
-	 *            The comparators.
-	 * @param aggregatorFactory
-	 *            The aggregators.
-	 * @param recordDescriptor
-	 *            The record descriptor for the input data.
-	 * @param tableSize
-	 *            The maximum size of the in memory table usable to this
-	 *            operator.
-	 */
-	public ExternalHashGroupOperatorDescriptor(JobSpecification spec,
-			int[] keyFields, int framesLimit, boolean sortOutput,
-			ITuplePartitionComputerFactory tpcf,
-			IBinaryComparatorFactory[] comparatorFactories,
-			IAccumulatingAggregatorFactory aggregatorFactory,
-			RecordDescriptor recordDescriptor, int tableSize) {
-		super(spec, 1, 1);
-		this.framesLimit = framesLimit;
-		if (framesLimit <= 1) {
-			// Minimum of 2 frames: 1 for input records, and 1 for output
-			// aggregation results.
-			throw new IllegalStateException();
-		}
-		this.aggregatorFactory = aggregatorFactory;
-		this.keyFields = keyFields;
-		this.comparatorFactories = comparatorFactories;
+    /**
+     * Constructor of the external hash group operator descriptor.
+     * 
+     * @param spec
+     *            The job specification this operator belongs to.
+     * @param keyFields
+     *            The fields as keys of grouping.
+     * @param framesLimit
+     *            The maximum number of frames to be used in memory.
+     * @param sortOutput
+     *            Whether the output should be sorted. Note that if the input
+     *            data is large enough to require external grouping, the output
+     *            will always be sorted. The only case in which the output is
+     *            not sorted is when the input data can be grouped entirely in
+     *            memory and this parameter is false.
+     * @param tpcf
+     *            The partitioner.
+     * @param comparatorFactories
+     *            The comparators.
+     * @param aggregatorFactory
+     *            The aggregators.
+     * @param recordDescriptor
+     *            The record descriptor for the input data.
+     * @param tableSize
+     *            The maximum size of the in-memory hash table available to
+     *            this operator.
+     */
+    public ExternalHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            boolean sortOutput, ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+            IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor recordDescriptor, int tableSize) {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        if (framesLimit <= 1) {
+            // Minimum of 2 frames: 1 for input records, and 1 for output
+            // aggregation results.
+            throw new IllegalStateException();
+        }
+        this.aggregatorFactory = aggregatorFactory;
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
 
-		this.sortOutput = sortOutput;
+        this.sortOutput = sortOutput;
 
-		this.tpcf = tpcf;
+        this.tpcf = tpcf;
 
-		this.tableSize = tableSize;
+        this.tableSize = tableSize;
 
-		// Set the record descriptor. Note that since this operator is a unary
-		// operator,
-		// only the first record descritpor is used here.
-		recordDescriptors[0] = recordDescriptor;
-	}
+        // Set the record descriptor. Note that since this operator is a unary
+        // operator, only the first record descriptor is used here.
+        recordDescriptors[0] = recordDescriptor;
+    }
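+
+    /*
+     * Illustrative usage sketch (not part of this change): the descriptor is
+     * typically wired into a JobSpecification along these lines. The variable
+     * names below (hashPartitionComputerFactory, keyComparatorFactory,
+     * aggregatorFactory, recordDescriptor) are placeholders for whatever
+     * concrete factories and descriptors a job provides.
+     *
+     * ExternalHashGroupOperatorDescriptor grouper =
+     *         new ExternalHashGroupOperatorDescriptor(spec,
+     *                 new int[] { 0 },     // group on field 0
+     *                 32,                  // at most 32 frames in memory
+     *                 false,               // sorted output not required
+     *                 hashPartitionComputerFactory,
+     *                 new IBinaryComparatorFactory[] { keyComparatorFactory },
+     *                 aggregatorFactory,
+     *                 recordDescriptor,
+     *                 1024);               // in-memory table size
+     */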
 
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
-	 * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
-	 */
-	@Override
-	public void contributeTaskGraph(IActivityGraphBuilder builder) {
-		PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
-		MergeActivity mergeAct = new MergeActivity();
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
+     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+     */
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
+        MergeActivity mergeAct = new MergeActivity();
 
-		builder.addTask(partialAggAct);
-		builder.addSourceEdge(0, partialAggAct, 0);
+        builder.addTask(partialAggAct);
+        builder.addSourceEdge(0, partialAggAct, 0);
 
-		builder.addTask(mergeAct);
-		builder.addTargetEdge(0, mergeAct, 0);
+        builder.addTask(mergeAct);
+        builder.addTargetEdge(0, mergeAct, 0);
 
-		// FIXME Block or not?
-		builder.addBlockingEdge(partialAggAct, mergeAct);
+        // FIXME Block or not?
+        builder.addBlockingEdge(partialAggAct, mergeAct);
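+        // The blocking edge makes the merge activity wait until the partial
+        // aggregation activity has finished producing all of its run files.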
 
-	}
+    }
 
-	private class PartialAggregateActivity extends AbstractActivityNode {
+    private class PartialAggregateActivity extends AbstractActivityNode {
 
-		/**
-		 * 
-		 */
-		private static final long serialVersionUID = 1L;
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 1L;
 
-		@Override
-		public IOperatorNodePushable createPushRuntime(
-				final IHyracksStageletContext ctx,
-				final IOperatorEnvironment env,
-				final IRecordDescriptorProvider recordDescProvider,
-				int partition, int nPartitions) {
-			// Create the in-memory hash table
-			final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(
-					ctx, keyFields, comparatorFactories, tpcf,
-					aggregatorFactory,
-					recordDescProvider.getInputRecordDescriptor(
-							getOperatorId(), 0), recordDescriptors[0],
-					// Always take one frame for the input records
-					framesLimit - 1, tableSize);
-			// Create the tuple accessor
-			final FrameTupleAccessor accessor = new FrameTupleAccessor(
-					ctx.getFrameSize(),
-					recordDescProvider.getInputRecordDescriptor(
-							getOperatorId(), 0));
-			// Create the partial aggregate activity node
-			IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+            // Create the in-memory hash table
+            final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(ctx, keyFields,
+                    comparatorFactories, tpcf, aggregatorFactory,
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                    // Always take one frame for the input records
+                    framesLimit - 1, tableSize);
+            // Create the tuple accessor
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+            // Create the partial aggregate activity node
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
 
-				/**
-				 * Run files
-				 */
-				private LinkedList<RunFileReader> runs;
+                /**
+                 * Run files
+                 */
+                private LinkedList<RunFileReader> runs;
 
-				@Override
-				public void close() throws HyracksDataException {
-					if (gTable.getFrameCount() >= 0) {
-						if (runs.size() <= 0) {
-							// All in memory
-							env.set(GROUPTABLES, gTable);
-						} else {
-							// flush the memory into the run file.
-							flushFramesToRun();
-						}
-					}
-					env.set(RUNS, runs);
-				}
+                @Override
+                public void close() throws HyracksDataException {
+                    if (gTable.getFrameCount() >= 0) {
+                        if (runs.size() <= 0) {
+                            // All in memory
+                            env.set(GROUPTABLES, gTable);
+                        } else {
+                            // flush the memory into the run file.
+                            flushFramesToRun();
+                        }
+                    }
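+                    // Hand the run readers (and possibly the in-memory group
+                    // table) to the merge activity through the shared operator
+                    // environment.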
+                    env.set(RUNS, runs);
+                }
 
-				@Override
-				public void flush() throws HyracksDataException {
+                @Override
+                public void flush() throws HyracksDataException {
 
-				}
+                }
 
-				/**
-				 * Process the next input buffer.
-				 * 
-				 * The actual insertion is processed in {@link #gTable}. It will
-				 * check whether it is possible to contain the data into the
-				 * main memory or not. If not, it will indicate the operator to
-				 * flush the content of the table into a run file.
-				 */
-				@Override
-				public void nextFrame(ByteBuffer buffer)
-						throws HyracksDataException {
-					accessor.reset(buffer);
-					int tupleCount = accessor.getTupleCount();
-					for (int i = 0; i < tupleCount; i++) {
-						// If the group table is too large, flush the table into
-						// a run file.
-						if (!gTable.insert(accessor, i)) {
-							flushFramesToRun();
-							if (!gTable.insert(accessor, i))
-								throw new HyracksDataException(
-										"Failed to insert a new buffer into the aggregate operator!");
-						}
-					}
+                /**
+                 * Process the next input buffer.
+                 * The actual insertion is handled by {@link #gTable}, which
+                 * checks whether the data still fits in main memory. If not,
+                 * the operator flushes the content of the table into a run
+                 * file and retries the insertion.
+                 */
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        // If the group table is too large, flush the table into
+                        // a run file.
+                        if (!gTable.insert(accessor, i)) {
+                            flushFramesToRun();
+                            if (!gTable.insert(accessor, i))
+                                throw new HyracksDataException(
+                                        "Failed to insert a new buffer into the aggregate operator!");
+                        }
+                    }
 
-				}
+                }
 
-				@Override
-				public void open() throws HyracksDataException {
-					runs = new LinkedList<RunFileReader>();
-					gTable.reset();
-				}
+                @Override
+                public void open() throws HyracksDataException {
+                    runs = new LinkedList<RunFileReader>();
+                    gTable.reset();
+                }
 
-				/**
-				 * Flush the content of the group table into a run file.
-				 * 
-				 * During the flushing, the hash table will be sorted as first.
-				 * After that, a run file handler is initialized and the hash
-				 * table is flushed into the run file.
-				 * 
-				 * @throws HyracksDataException
-				 */
-				private void flushFramesToRun() throws HyracksDataException {
-					// Sort the contents of the hash table.
-					gTable.sortFrames();
-					FileReference runFile;
-					try {
-						runFile = ctx.getJobletContext().createWorkspaceFile(
-								ExternalHashGroupOperatorDescriptor.class
-										.getSimpleName());
-					} catch (IOException e) {
-						throw new HyracksDataException(e);
-					}
-					RunFileWriter writer = new RunFileWriter(runFile,
-							ctx.getIOManager());
-					writer.open();
-					try {
-						gTable.flushFrames(writer, true);
-					} catch (Exception ex) {
-						throw new HyracksDataException(ex);
-					} finally {
-						writer.close();
-					}
-					gTable.reset();
-					runs.add(((RunFileWriter) writer).createReader());
-					LOGGER.warning("Created run file: "
-							+ runFile.getFile().getAbsolutePath());
-				}
+                /**
+                 * Flush the content of the group table into a run file.
+                 * The hash table is sorted first; then a run file writer is
+                 * created and the hash table contents are flushed into the
+                 * run file.
+                 * 
+                 * @throws HyracksDataException
+                 */
+                private void flushFramesToRun() throws HyracksDataException {
+                    // Sort the contents of the hash table.
+                    gTable.sortFrames();
+                    FileReference runFile;
+                    try {
+                        runFile = ctx.getJobletContext().createWorkspaceFile(
+                                ExternalHashGroupOperatorDescriptor.class.getSimpleName());
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+                    writer.open();
+                    try {
+                        gTable.flushFrames(writer, true);
+                    } catch (Exception ex) {
+                        throw new HyracksDataException(ex);
+                    } finally {
+                        writer.close();
+                    }
+                    gTable.reset();
+                    runs.add(((RunFileWriter) writer).createReader());
+                    LOGGER.warning("Created run file: " + runFile.getFile().getAbsolutePath());
+                }
 
-			};
+            };
 
-			return op;
-		}
+            return op;
+        }
 
-		@Override
-		public IOperatorDescriptor getOwner() {
-			return ExternalHashGroupOperatorDescriptor.this;
-		}
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return ExternalHashGroupOperatorDescriptor.this;
+        }
 
-	}
+    }
 
-	private class MergeActivity extends AbstractActivityNode {
+    private class MergeActivity extends AbstractActivityNode {
 
-		/**
-		 * 
-		 */
-		private static final long serialVersionUID = 1L;
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 1L;
 
-		@Override
-		public IOperatorNodePushable createPushRuntime(
-				final IHyracksStageletContext ctx,
-				final IOperatorEnvironment env,
-				IRecordDescriptorProvider recordDescProvider, int partition,
-				int nPartitions) {
-			final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-			for (int i = 0; i < comparatorFactories.length; ++i) {
-				comparators[i] = comparatorFactories[i]
-						.createBinaryComparator();
-			}
-			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-				/**
-				 * Input frames, one for each run file.
-				 */
-				private List<ByteBuffer> inFrames;
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+                /**
+                 * Input frames, one for each run file.
+                 */
+                private List<ByteBuffer> inFrames;
 
-				/**
-				 * Output frame.
-				 */
-				private ByteBuffer outFrame;
+                /**
+                 * Output frame.
+                 */
+                private ByteBuffer outFrame;
 
-				/**
-				 * List of the run files to be merged
-				 */
-				LinkedList<RunFileReader> runs;
+                /**
+                 * List of the run files to be merged
+                 */
+                LinkedList<RunFileReader> runs;
 
-				/**
-				 * Tuple appender for the output frame {@link #outFrame}.
-				 */
-				private FrameTupleAppender outFrameAppender;
+                /**
+                 * Tuple appender for the output frame {@link #outFrame}.
+                 */
+                private FrameTupleAppender outFrameAppender;
 
-				private ISpillableAccumulatingAggregator visitingAggregator;
-				private ArrayTupleBuilder visitingKeyTuple;
+                private ISpillableAccumulatingAggregator visitingAggregator;
+                private ArrayTupleBuilder visitingKeyTuple;
 
-				@SuppressWarnings("unchecked")
-				@Override
-				public void initialize() throws HyracksDataException {
-					runs = (LinkedList<RunFileReader>) env.get(RUNS);
-					writer.open();
+                @SuppressWarnings("unchecked")
+                @Override
+                public void initialize() throws HyracksDataException {
+                    runs = (LinkedList<RunFileReader>) env.get(RUNS);
+                    writer.open();
 
-					try {
-						if (runs.size() <= 0) {
-							// If the aggregate results can be fit into
-							// memory...
-							SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env
-									.get(GROUPTABLES);
-							if (gTable != null) {
-								gTable.flushFrames(writer, sortOutput);
-							}
-							env.set(GROUPTABLES, null);
-						} else {
-							// Otherwise, merge the run files into a single file
-							inFrames = new ArrayList<ByteBuffer>();
-							outFrame = ctx.allocateFrame();
-							outFrameAppender = new FrameTupleAppender(
-									ctx.getFrameSize());
-							outFrameAppender.reset(outFrame, true);
-							for (int i = 0; i < framesLimit - 1; ++i) {
-								inFrames.add(ctx.allocateFrame());
-							}
-							int passCount = 0;
-							while (runs.size() > 0) {
-								passCount++;
-								try {
-									doPass(runs, passCount);
-								} catch (Exception e) {
-									throw new HyracksDataException(e);
-								}
-							}
-						}
+                    try {
+                        if (runs.size() <= 0) {
+                            // If the aggregation results fit entirely in
+                            // memory...
+                            SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env.get(GROUPTABLES);
+                            if (gTable != null) {
+                                gTable.flushFrames(writer, sortOutput);
+                            }
+                            env.set(GROUPTABLES, null);
+                        } else {
+                            // Otherwise, merge the run files into a single file
+                            inFrames = new ArrayList<ByteBuffer>();
+                            outFrame = ctx.allocateFrame();
+                            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                            outFrameAppender.reset(outFrame, true);
+                            for (int i = 0; i < framesLimit - 1; ++i) {
+                                inFrames.add(ctx.allocateFrame());
+                            }
+                            int passCount = 0;
+                            while (runs.size() > 0) {
+                                passCount++;
+                                try {
+                                    doPass(runs, passCount);
+                                } catch (Exception e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+                        }
 
-					} finally {
-						writer.close();
-					}
-					env.set(RUNS, null);
-				}
+                    } finally {
+                        writer.close();
+                    }
+                    env.set(RUNS, null);
+                }
 
-				/**
-				 * Merge the run files once.
-				 * 
-				 * @param runs
-				 * @param passCount
-				 * @throws HyracksDataException
-				 * @throws IOException
-				 */
-				private void doPass(LinkedList<RunFileReader> runs,
-						int passCount) throws HyracksDataException, IOException {
-					FileReference newRun = null;
-					IFrameWriter writer = this.writer;
-					boolean finalPass = false;
+                /**
+                 * Merge the run files once.
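+                 * Each pass merges up to (framesLimit - 1) runs, using one input
+                 * frame per run plus one output frame. If all remaining runs fit,
+                 * the merged result goes to the final writer; otherwise a new run
+                 * file is produced and pushed onto the front of the run list.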
+                 * 
+                 * @param runs
+                 * @param passCount
+                 * @throws HyracksDataException
+                 * @throws IOException
+                 */
+                private void doPass(LinkedList<RunFileReader> runs, int passCount) throws HyracksDataException,
+                        IOException {
+                    FileReference newRun = null;
+                    IFrameWriter writer = this.writer;
+                    boolean finalPass = false;
 
-					int[] storedKeys = new int[keyFields.length];
-					// Get the list of the fields in the stored records.
-					for (int i = 0; i < keyFields.length; ++i) {
-						storedKeys[i] = i;
-					}
+                    int[] storedKeys = new int[keyFields.length];
+                    // Get the list of the fields in the stored records.
+                    for (int i = 0; i < keyFields.length; ++i) {
+                        storedKeys[i] = i;
+                    }
 
-					// Release the space not used
-					if (runs.size() + 1 <= framesLimit) {
-						// If there are run files no more than the available
-						// frame slots...
-						// No run file to be generated, since the result can be
-						// directly
-						// outputted into the output frame for write.
-						finalPass = true;
-						for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-							inFrames.remove(i);
-						}
-					} else {
-						// Otherwise, a new run file will be created
-						newRun = ctx.getJobletContext().createWorkspaceFile(
-								ExternalHashGroupOperatorDescriptor.class
-										.getSimpleName());
-						writer = new RunFileWriter(newRun, ctx.getIOManager());
-						writer.open();
-					}
-					try {
-						// Create run file read handler for each input frame
-						RunFileReader[] runFileReaders = new RunFileReader[inFrames
-								.size()];
-						// Create input frame accessor
-						FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
-								.size()];
-						Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-						ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
-								ctx.getFrameSize(), recordDescriptors[0],
-								inFrames.size(), comparator);
-						// For the index of tuples visited in each frame.
-						int[] tupleIndexes = new int[inFrames.size()];
-						for (int i = 0; i < inFrames.size(); i++) {
-							tupleIndexes[i] = 0;
-							int runIndex = topTuples.peek().getRunid();
-							runFileReaders[runIndex] = runs.get(runIndex);
-							runFileReaders[runIndex].open();
-							// Load the first frame of the file into the main
-							// memory
-							if (runFileReaders[runIndex].nextFrame(inFrames
-									.get(runIndex))) {
-								// initialize the tuple accessor for the frame
-								tupleAccessors[runIndex] = new FrameTupleAccessor(
-										ctx.getFrameSize(),
-										recordDescriptors[0]);
-								tupleAccessors[runIndex].reset(inFrames
-										.get(runIndex));
-								setNextTopTuple(runIndex, tupleIndexes,
-										runFileReaders, tupleAccessors,
-										topTuples);
-							} else {
-								closeRun(runIndex, runFileReaders,
-										tupleAccessors);
-							}
-						}
-						// Merge
-						// Get a key holder for the current working
-						// aggregator keys
-						visitingAggregator = null;
-						visitingKeyTuple = null;
-						// Loop on all run files, and update the key
-						// holder.
-						while (!topTuples.areRunsExhausted()) {
-							// Get the top record
-							ReferenceEntry top = topTuples.peek();
-							int tupleIndex = top.getTupleIndex();
-							int runIndex = topTuples.peek().getRunid();
-							FrameTupleAccessor fta = top.getAccessor();
-							if (visitingAggregator == null) {
-								// Initialize the aggregator
-								visitingAggregator = aggregatorFactory
-										.createSpillableAggregator(ctx,
-												recordDescriptors[0],
-												recordDescriptors[0]);
-								// Initialize the partial aggregation result
-								visitingAggregator.initFromPartial(fta,
-										tupleIndex, keyFields);
-								visitingKeyTuple = new ArrayTupleBuilder(
-										recordDescriptors[0].getFields().length);
-								for (int i = 0; i < keyFields.length; i++) {
-									visitingKeyTuple.addField(fta, tupleIndex,
-											keyFields[i]);
-								}
-							} else {
-								if (compareTupleWithFrame(visitingKeyTuple,
-										fta, tupleIndex, storedKeys, keyFields,
-										comparators) == 0) {
-									// If the two partial results are on the
-									// same key
-									visitingAggregator.accumulatePartialResult(
-											fta, tupleIndex, keyFields);
-								} else {
-									// Otherwise, write the partial result back
-									// to the output frame
-									if (!visitingAggregator.output(
-											outFrameAppender, visitingKeyTuple)) {
-										FrameUtils.flushFrame(outFrame, writer);
-										outFrameAppender.reset(outFrame, true);
-										if (!visitingAggregator.output(
-												outFrameAppender,
-												visitingKeyTuple)) {
-											throw new IllegalStateException();
-										}
-									}
-									// Reset the partial aggregation result
-									visitingAggregator.initFromPartial(fta,
-											tupleIndex, keyFields);
-									visitingKeyTuple.reset();
-									for (int i = 0; i < keyFields.length; i++) {
-										visitingKeyTuple.addField(fta,
-												tupleIndex, keyFields[i]);
-									}
-								}
-							}
-							tupleIndexes[runIndex]++;
-							setNextTopTuple(runIndex, tupleIndexes,
-									runFileReaders, tupleAccessors, topTuples);
-						}
-						// Output the last aggregation result in the frame
-						if (visitingAggregator != null) {
-							if (!visitingAggregator.output(outFrameAppender,
-									visitingKeyTuple)) {
-								FrameUtils.flushFrame(outFrame, writer);
-								outFrameAppender.reset(outFrame, true);
-								if (!visitingAggregator.output(
-										outFrameAppender, visitingKeyTuple)) {
-									throw new IllegalStateException();
-								}
-							}
-						}
-						// Output data into run file writer after all tuples
-						// have been checked
-						if (outFrameAppender.getTupleCount() > 0) {
-							FrameUtils.flushFrame(outFrame, writer);
-							outFrameAppender.reset(outFrame, true);
-						}
-						// empty the input frames
-						runs.subList(0, inFrames.size()).clear();
-						// insert the new run file into the beginning of the run
-						// file list
-						if (!finalPass) {
-							runs.add(0, ((RunFileWriter) writer).createReader());
-						}
-					} catch (Exception ex) {
-						throw new HyracksDataException(ex);
-					} finally {
-						if (!finalPass) {
-							writer.close();
-						}
-					}
-				}
+                    // Release the space not used
+                    if (runs.size() + 1 <= framesLimit) {
+                        // If the remaining run files do not exceed the available
+                        // frame slots, this is the final pass: no new run file is
+                        // generated and the result is written directly to the
+                        // output writer.
+                        finalPass = true;
+                        for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                            inFrames.remove(i);
+                        }
+                    } else {
+                        // Otherwise, a new run file will be created
+                        newRun = ctx.getJobletContext().createWorkspaceFile(
+                                ExternalHashGroupOperatorDescriptor.class.getSimpleName());
+                        writer = new RunFileWriter(newRun, ctx.getIOManager());
+                        writer.open();
+                    }
+                    try {
+                        // Create run file read handler for each input frame
+                        RunFileReader[] runFileReaders = new RunFileReader[inFrames.size()];
+                        // Create input frame accessor
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+                                recordDescriptors[0], inFrames.size(), comparator);
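+                        // The priority queue drives a k-way merge: it always
+                        // exposes the tuple with the smallest key among the
+                        // current heads of all runs, so equal keys from
+                        // different runs arrive consecutively and can be
+                        // aggregated together.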
+                        // Indexes of the tuples visited in each input frame.
+                        int[] tupleIndexes = new int[inFrames.size()];
+                        for (int i = 0; i < inFrames.size(); i++) {
+                            tupleIndexes[i] = 0;
+                            int runIndex = topTuples.peek().getRunid();
+                            runFileReaders[runIndex] = runs.get(runIndex);
+                            runFileReaders[runIndex].open();
+                            // Load the first frame of the file into the main
+                            // memory
+                            if (runFileReaders[runIndex].nextFrame(inFrames.get(runIndex))) {
+                                // initialize the tuple accessor for the frame
+                                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+                                        recordDescriptors[0]);
+                                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                                setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
+                            } else {
+                                closeRun(runIndex, runFileReaders, tupleAccessors);
+                            }
+                        }
+                        // Merge
+                        // Get a key holder for the current working
+                        // aggregator keys
+                        visitingAggregator = null;
+                        visitingKeyTuple = null;
+                        // Loop on all run files, and update the key
+                        // holder.
+                        while (!topTuples.areRunsExhausted()) {
+                            // Get the top record
+                            ReferenceEntry top = topTuples.peek();
+                            int tupleIndex = top.getTupleIndex();
+                            int runIndex = topTuples.peek().getRunid();
+                            FrameTupleAccessor fta = top.getAccessor();
+                            if (visitingAggregator == null) {
+                                // Initialize the aggregator
+                                visitingAggregator = aggregatorFactory.createSpillableAggregator(ctx,
+                                        recordDescriptors[0], recordDescriptors[0]);
+                                // Initialize the partial aggregation result
+                                visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
+                                visitingKeyTuple = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+                                for (int i = 0; i < keyFields.length; i++) {
+                                    visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
+                                }
+                            } else {
+                                if (compareTupleWithFrame(visitingKeyTuple, fta, tupleIndex, storedKeys, keyFields,
+                                        comparators) == 0) {
+                                    // If the two partial results are on the
+                                    // same key
+                                    visitingAggregator.accumulatePartialResult(fta, tupleIndex, keyFields);
+                                } else {
+                                    // Otherwise, write the partial result back
+                                    // to the output frame
+                                    if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+                                        FrameUtils.flushFrame(outFrame, writer);
+                                        outFrameAppender.reset(outFrame, true);
+                                        if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+                                            throw new IllegalStateException();
+                                        }
+                                    }
+                                    // Reset the partial aggregation result
+                                    visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
+                                    visitingKeyTuple.reset();
+                                    for (int i = 0; i < keyFields.length; i++) {
+                                        visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
+                                    }
+                                }
+                            }
+                            tupleIndexes[runIndex]++;
+                            setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
+                        }
+                        // Output the last aggregation result in the frame
+                        if (visitingAggregator != null) {
+                            if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                outFrameAppender.reset(outFrame, true);
+                                if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+                        }
+                        // Flush the remaining data in the output frame after all
+                        // tuples have been processed.
+                        if (outFrameAppender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            outFrameAppender.reset(outFrame, true);
+                        }
+                        // Remove the runs that were merged in this pass.
+                        runs.subList(0, inFrames.size()).clear();
+                        // insert the new run file into the beginning of the run
+                        // file list
+                        if (!finalPass) {
+                            runs.add(0, ((RunFileWriter) writer).createReader());
+                        }
+                    } catch (Exception ex) {
+                        throw new HyracksDataException(ex);
+                    } finally {
+                        if (!finalPass) {
+                            writer.close();
+                        }
+                    }
+                }
 
-				/**
-				 * Insert the tuple into the priority queue.
-				 * 
-				 * @param runIndex
-				 * @param tupleIndexes
-				 * @param runCursors
-				 * @param tupleAccessors
-				 * @param topTuples
-				 * @throws IOException
-				 */
-				private void setNextTopTuple(int runIndex, int[] tupleIndexes,
-						RunFileReader[] runCursors,
-						FrameTupleAccessor[] tupleAccessors,
-						ReferencedPriorityQueue topTuples) throws IOException {
-					boolean exists = hasNextTuple(runIndex, tupleIndexes,
-							runCursors, tupleAccessors);
-					if (exists) {
-						topTuples.popAndReplace(tupleAccessors[runIndex],
-								tupleIndexes[runIndex]);
-					} else {
-						topTuples.pop();
-						closeRun(runIndex, runCursors, tupleAccessors);
-					}
-				}
+                /**
+                 * Advance the given run in the priority queue: replace its top
+                 * tuple with the next available tuple, or pop the run if it is
+                 * exhausted.
+                 * 
+                 * @param runIndex
+                 * @param tupleIndexes
+                 * @param runCursors
+                 * @param tupleAccessors
+                 * @param topTuples
+                 * @throws IOException
+                 */
+                private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
+                    boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+                    if (exists) {
+                        topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+                    } else {
+                        topTuples.pop();
+                        closeRun(runIndex, runCursors, tupleAccessors);
+                    }
+                }
 
-				/**
-				 * Check whether there are any more tuples to be checked for the
-				 * given run file from the corresponding input frame.
-				 * 
-				 * If the input frame for this run file is exhausted, load a new
-				 * frame of the run file into the input frame.
-				 * 
-				 * @param runIndex
-				 * @param tupleIndexes
-				 * @param runCursors
-				 * @param tupleAccessors
-				 * @return
-				 * @throws IOException
-				 */
-				private boolean hasNextTuple(int runIndex, int[] tupleIndexes,
-						RunFileReader[] runCursors,
-						FrameTupleAccessor[] tupleAccessors) throws IOException {
+                /**
+                 * Check whether there are any more tuples to be checked for the
+                 * given run file from the corresponding input frame.
+                 * If the input frame for this run file is exhausted, load a new
+                 * frame of the run file into the input frame.
+                 * 
+                 * @param runIndex
+                 * @param tupleIndexes
+                 * @param runCursors
+                 * @param tupleAccessors
+                 * @return
+                 * @throws IOException
+                 */
+                private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors) throws IOException {
 
-					if (tupleAccessors[runIndex] == null
-							|| runCursors[runIndex] == null) {
-						/*
-						 * Return false if the targeting run file is not
-						 * available, or the frame for the run file is not
-						 * available.
-						 */
-						return false;
-					} else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex]
-							.getTupleCount()) {
-						/*
-						 * If all tuples in the targeting frame have been
-						 * checked.
-						 */
-						ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
-						// Refill the buffer with contents from the run file.
-						if (runCursors[runIndex].nextFrame(buf)) {
-							tupleIndexes[runIndex] = 0;
-							return hasNextTuple(runIndex, tupleIndexes,
-									runCursors, tupleAccessors);
-						} else {
-							return false;
-						}
-					} else {
-						return true;
-					}
-				}
+                    if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+                        /*
+                         * Return false if the target run file or its frame
+                         * accessor is not available.
+                         */
+                        return false;
+                    } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+                        /*
+                         * If all tuples in the target frame have been checked.
+                         */
+                        ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same as inFrames.get(runIndex)
+                        // Refill the buffer with contents from the run file.
+                        if (runCursors[runIndex].nextFrame(buf)) {
+                            tupleIndexes[runIndex] = 0;
+                            return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+                        } else {
+                            return false;
+                        }
+                    } else {
+                        return true;
+                    }
+                }
 
-				/**
-				 * Close the run file, and also the corresponding readers and
-				 * input frame.
-				 * 
-				 * @param index
-				 * @param runCursors
-				 * @param tupleAccessor
-				 * @throws HyracksDataException
-				 */
-				private void closeRun(int index, RunFileReader[] runCursors,
-						IFrameTupleAccessor[] tupleAccessor)
-						throws HyracksDataException {
-					runCursors[index].close();
-					runCursors[index] = null;
-					tupleAccessor[index] = null;
-				}
+                /**
+                 * Close the run file and clear the corresponding reader and
+                 * tuple accessor.
+                 * 
+                 * @param index
+                 * @param runCursors
+                 * @param tupleAccessor
+                 * @throws HyracksDataException
+                 */
+                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+                        throws HyracksDataException {
+                    runCursors[index].close();
+                    runCursors[index] = null;
+                    tupleAccessor[index] = null;
+                }
 
-				/**
-				 * Compare a tuple (in the format of a {@link ArrayTupleBuilder}
-				 * ) with a record in a frame (in the format of a
-				 * {@link FrameTupleAccessor}). Comparing keys and comparators
-				 * are specified for this method as inputs.
-				 * 
-				 * @param tuple0
-				 * @param accessor1
-				 * @param tIndex1
-				 * @param keys0
-				 * @param keys1
-				 * @param comparators
-				 * @return
-				 */
-				private int compareTupleWithFrame(ArrayTupleBuilder tuple0,
-						FrameTupleAccessor accessor1, int tIndex1, int[] keys0,
-						int[] keys1, IBinaryComparator[] comparators) {
-					int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-					int fStartOffset1 = accessor1.getFieldSlotsLength()
-							+ tStart1;
+                /**
+                 * Compare a tuple (in the format of an {@link ArrayTupleBuilder}) with a record in a frame (in the
+                 * format of a {@link FrameTupleAccessor}). The key fields and comparators to be used are given as
+                 * inputs.
+                 * 
+                 * @param tuple0
+                 * @param accessor1
+                 * @param tIndex1
+                 * @param keys0
+                 * @param keys1
+                 * @param comparators
+                 * @return
+                 */
+                private int compareTupleWithFrame(ArrayTupleBuilder tuple0, FrameTupleAccessor accessor1, int tIndex1,
+                        int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+                    int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+                    int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
 
-					for (int i = 0; i < keys0.length; ++i) {
-						int fIdx0 = keys0[i];
-						int fStart0 = (i == 0 ? 0
-								: tuple0.getFieldEndOffsets()[fIdx0 - 1]);
-						int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
-						int fLen0 = fEnd0 - fStart0;
+                    for (int i = 0; i < keys0.length; ++i) {
+                        int fIdx0 = keys0[i];
+                        int fStart0 = (i == 0 ? 0 : tuple0.getFieldEndOffsets()[fIdx0 - 1]);
+                        int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
+                        int fLen0 = fEnd0 - fStart0;
 
-						int fIdx1 = keys1[i];
-						int fStart1 = accessor1.getFieldStartOffset(tIndex1,
-								fIdx1);
-						int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
-						int fLen1 = fEnd1 - fStart1;
+                        int fIdx1 = keys1[i];
+                        int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
+                        int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
+                        int fLen1 = fEnd1 - fStart1;
 
-						int c = comparators[i].compare(tuple0.getByteArray(),
-								fStart0, fLen0, accessor1.getBuffer().array(),
-								fStart1 + fStartOffset1, fLen1);
-						if (c != 0) {
-							return c;
-						}
-					}
-					return 0;
-				}
-			};
-			return op;
-		}
+                        int c = comparators[i].compare(tuple0.getByteArray(), fStart0, fLen0,
+                                accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            };
+            return op;
+        }
 
-		@Override
-		public IOperatorDescriptor getOwner() {
-			return ExternalHashGroupOperatorDescriptor.this;
-		}
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return ExternalHashGroupOperatorDescriptor.this;
+        }
 
-		private Comparator<ReferenceEntry> createEntryComparator(
-				final IBinaryComparator[] comparators) {
-			return new Comparator<ReferenceEntry>() {
-				public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-					FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1
-							.getAccessor();
-					FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2
-							.getAccessor();
-					int j1 = (Integer) tp1.getTupleIndex();
-					int j2 = (Integer) tp2.getTupleIndex();
-					byte[] b1 = fta1.getBuffer().array();
-					byte[] b2 = fta2.getBuffer().array();
-					for (int f = 0; f < keyFields.length; ++f) {
-						int fIdx = keyFields[f];
-						int s1 = fta1.getTupleStartOffset(j1)
-								+ fta1.getFieldSlotsLength()
-								+ fta1.getFieldStartOffset(j1, fIdx);
-						int l1 = fta1.getFieldEndOffset(j1, fIdx)
-								- fta1.getFieldStartOffset(j1, fIdx);
-						int s2 = fta2.getTupleStartOffset(j2)
-								+ fta2.getFieldSlotsLength()
-								+ fta2.getFieldStartOffset(j2, fIdx);
-						int l2 = fta2.getFieldEndOffset(j2, fIdx)
-								- fta2.getFieldStartOffset(j2, fIdx);
-						int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-						if (c != 0) {
-							return c;
-						}
-					}
-					return 0;
-				}
-			};
-		}
+        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+            return new Comparator<ReferenceEntry>() {
+                public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                    int j1 = (Integer) tp1.getTupleIndex();
+                    int j2 = (Integer) tp2.getTupleIndex();
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        int fIdx = keyFields[f];
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            };
+        }
 
-	}
+    }
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
index a2e8281..3fc7d79 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
@@ -21,11 +21,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IAccumulatingAggregatorFactory extends Serializable {
-	IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx,
-			RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor)
-			throws HyracksDataException;
+    IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException;
 
-	ISpillableAccumulatingAggregator createSpillableAggregator(
-			IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
-			RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+    ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
+            RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
index ab7b624..59c69eb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
@@ -22,23 +22,19 @@
 /**
  * An spillable version of the {@link IAccumulatingAggregator} supporting
  * external aggregation.
- * 
  */
-public interface ISpillableAccumulatingAggregator extends
-		IAccumulatingAggregator {
+public interface ISpillableAccumulatingAggregator extends IAccumulatingAggregator {
 
-	public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
-			int[] keyFieldIndexes) throws HyracksDataException;
+    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+            throws HyracksDataException;
 
-	/**
-	 * 
-	 * @param accessor
-	 * @param tIndex
-	 * @throws HyracksDataException
-	 */
-	public void accumulatePartialResult(IFrameTupleAccessor accessor,
-			int tIndex, int[] keyFieldIndexes) throws HyracksDataException;
+    /**
+     * @param accessor
+     * @param tIndex
+     * @throws HyracksDataException
+     */
+    public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+            throws HyracksDataException;
 
-	public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder)
-			throws HyracksDataException;
+    public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException;
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
index 16bd972..b3b9f24 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
@@ -34,551 +34,536 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 /**
- * An in-mem hash table for spillable grouping operations. 
- * 
+ * An in-memory hash table for spillable grouping operations.
  * A table of {@link #Link}s are maintained in this object, and each row
- * of this table represents a hash partition. 
- * 
- * 
+ * of this table represents a hash partition.
  */
 public class SpillableGroupingHashTable {
 
-	/**
-	 * Context.
-	 */
-	private final IHyracksStageletContext ctx;
+    /**
+     * Context.
+     */
+    private final IHyracksStageletContext ctx;
 
-	/**
-	 * Columns for group-by
-	 */
-	private final int[] fields;
+    /**
+     * Columns for group-by
+     */
+    private final int[] fields;
 
-	/**
-	 * Key fields of records in the hash table (starting from 0
-	 * to the number of the key fields). 
-	 * 
-	 * This is different from the key fields in the input records, 
-	 * since these fields are extracted when being inserted into
-	 * the hash table.
-	 */
-	private final int[] storedKeys;
+    /**
+     * Key fields of records in the hash table (numbered from 0 up to the
+     * number of key fields).
+     * This is different from the key fields in the input records,
+     * since these fields are extracted when being inserted into
+     * the hash table.
+     */
+    private final int[] storedKeys;
 
-	/**
-	 * Comparators: one for each column in {@link #groupFields}
-	 */
-	private final IBinaryComparator[] comparators;
+    /**
+     * Comparators: one for each column in {@link #fields}
+     */
+    private final IBinaryComparator[] comparators;
 
-	/**
-	 * Record descriptor for the input tuple.
-	 */
-	private final RecordDescriptor inRecordDescriptor;
+    /**
+     * Record descriptor for the input tuple.
+     */
+    private final RecordDescriptor inRecordDescriptor;
 
-	/**
-	 * Record descriptor for the partial aggregation result.
-	 */
-	private final RecordDescriptor outputRecordDescriptor;
+    /**
+     * Record descriptor for the partial aggregation result.
+     */
+    private final RecordDescriptor outputRecordDescriptor;
 
-	/**
-	 * Accumulators in the main memory.
-	 */
-	private ISpillableAccumulatingAggregator[] accumulators;
+    /**
+     * Accumulators in the main memory.
+     */
+    private ISpillableAccumulatingAggregator[] accumulators;
 
-	/**
-	 * The hashing group table containing pointers to aggregators and also the
-	 * corresponding key tuples. So for each entry, there will be three integer
-	 * fields:
-	 * 
-	 * 1. The frame index containing the key tuple; 2. The tuple index inside of
-	 * the frame for the key tuple; 3. The index of the aggregator.
-	 * 
-	 * Note that each link in the table is a partition for the input records. Multiple
-	 * records in the same partition based on the {@link #tpc} are stored as
-	 * pointers.
-	 */
-	private final Link[] table;
+    /**
+     * The hashing group table containing pointers to the aggregators and the
+     * corresponding key tuples. Each entry consists of three integer fields:
+     * 1. the index of the frame containing the key tuple; 2. the tuple index
+     * inside that frame; 3. the index of the aggregator.
+     * Note that each link in the table is a partition of the input records:
+     * multiple records falling into the same partition according to
+     * {@link #tpc} are stored as pointers in the same link.
+     */
+    private final Link[] table;
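+
+    /*
+     * Illustrative example: a link holding the pointer sequence [2, 5, 0]
+     * represents a group whose key tuple is tuple 5 of frame 2 and whose
+     * partial aggregate is maintained by accumulators[0].
+     */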
 
-	/**
-	 * Number of accumulators.
-	 */
-	private int accumulatorSize = 0;
+    /**
+     * Number of accumulators.
+     */
+    private int accumulatorSize = 0;
 
-	/**
-	 * Factory for the aggregators.
-	 */
-	private final IAccumulatingAggregatorFactory aggregatorFactory;
+    /**
+     * Factory for the aggregators.
+     */
+    private final IAccumulatingAggregatorFactory aggregatorFactory;
 
-	private final List<ByteBuffer> frames;
-	
-	private final ByteBuffer outFrame;
+    private final List<ByteBuffer> frames;
 
-	/**
-	 * Frame appender for output frames in {@link #frames}.
-	 */
-	private final FrameTupleAppender appender;
+    private final ByteBuffer outFrame;
 
-	/**
-	 * The count of used frames in the table.
-	 * 
-	 * Note that this cannot be replaced by {@link #frames} since frames will
-	 * not be removed after being created.
-	 */
-	private int dataFrameCount;
+    /**
+     * Frame appender for output frames in {@link #frames}.
+     */
+    private final FrameTupleAppender appender;
 
-	/**
-	 * Pointers for the sorted aggregators
-	 */
-	private int[] tPointers;
+    /**
+     * The count of frames currently used by the table.
+     * Note that this cannot be derived from {@link #frames} since frames are
+     * not removed after being created.
+     */
+    private int dataFrameCount;
 
-	private static final int INIT_ACCUMULATORS_SIZE = 8;
+    /**
+     * Pointers for the sorted aggregators
+     */
+    private int[] tPointers;
 
-	/**
-	 * The maximum number of frames available for this hashing group table.
-	 */
-	private final int framesLimit;
+    private static final int INIT_ACCUMULATORS_SIZE = 8;
 
-	private final FrameTuplePairComparator ftpc;
+    /**
+     * The maximum number of frames available for this hashing group table.
+     */
+    private final int framesLimit;
 
-	/**
-	 * A partition computer to partition the hashing group table.
-	 */
-	private final ITuplePartitionComputer tpc;
+    private final FrameTuplePairComparator ftpc;
 
-	/**
-	 * Accessors for the tuples. Two accessors are necessary during the sort.
-	 */
-	private final FrameTupleAccessor storedKeysAccessor1;
-	private final FrameTupleAccessor storedKeysAccessor2;
+    /**
+     * A partition computer to partition the hashing group table.
+     */
+    private final ITuplePartitionComputer tpc;
 
-	/**
-	 * Create a spillable grouping hash table. 
-	 * @param ctx					The context of the job.
-	 * @param fields				Fields of keys for grouping.
-	 * @param comparatorFactories	The comparators.
-	 * @param tpcf					The partitioners. These are used to partition the incoming records into proper partition of the hash table.
-	 * @param aggregatorFactory		The aggregators.
-	 * @param inRecordDescriptor	Record descriptor for input data.
-	 * @param outputRecordDescriptor	Record descriptor for output data.
-	 * @param framesLimit			The maximum number of frames usable in the memory for hash table.
-	 * @param tableSize				The size of the table, which specified the number of partitions of the table.
-	 */
-	public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			ITuplePartitionComputerFactory tpcf,
-			IAccumulatingAggregatorFactory aggregatorFactory,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outputRecordDescriptor, int framesLimit,
-			int tableSize) {
-		this.ctx = ctx;
-		this.fields = fields;
+    /**
+     * Accessors for the tuples. Two accessors are necessary during the sort.
+     */
+    private final FrameTupleAccessor storedKeysAccessor1;
+    private final FrameTupleAccessor storedKeysAccessor2;
 
-		storedKeys = new int[fields.length];
-		@SuppressWarnings("rawtypes")
-		ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-		
-		// Note that after storing a record into the hash table, the index for the fields should
-		// be updated. Here we assume that all these key fields are written at the beginning of 
-		// the record, so their index should start from 0 and end at the length of the key fields.
-		for (int i = 0; i < fields.length; ++i) {
-			storedKeys[i] = i;
-			storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-		}
-		RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
-				storedKeySerDeser);
-		storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
-				storedKeysRecordDescriptor);
-		storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
-				storedKeysRecordDescriptor);
+    /**
+     * Create a spillable grouping hash table.
+     * 
+     * @param ctx
+     *            The context of the job.
+     * @param fields
+     *            Fields of keys for grouping.
+     * @param comparatorFactories
+     *            The comparators.
+     * @param tpcf
+     *            The partitioners. These are used to partition the incoming records into the proper partitions of the hash table.
+     * @param aggregatorFactory
+     *            The aggregators.
+     * @param inRecordDescriptor
+     *            Record descriptor for input data.
+     * @param outputRecordDescriptor
+     *            Record descriptor for output data.
+     * @param framesLimit
+     *            The maximum number of frames usable in the memory for hash table.
+     * @param tableSize
+     *            The size of the table, which specifies the number of partitions of the table.
+     */
+    public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
+            IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory tpcf,
+            IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outputRecordDescriptor, int framesLimit, int tableSize) {
+        this.ctx = ctx;
+        this.fields = fields;
 
-		comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
+        storedKeys = new int[fields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
 
-		this.table = new Link[tableSize];
+        // Note that after a record is stored into the hash table, the indexes of its fields change.
+        // Here we assume that all the key fields are written at the beginning of the stored record,
+        // so their indexes start at 0 and end at the number of key fields.
+        for (int i = 0; i < fields.length; ++i) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+        }
+        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
+        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
 
-		this.aggregatorFactory = aggregatorFactory;
-		accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
 
-		this.framesLimit = framesLimit;
+        this.table = new Link[tableSize];
 
-		// Tuple pair comparator
-		ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-		
-		// Partitioner
-		tpc = tpcf.createPartitioner();
+        this.aggregatorFactory = aggregatorFactory;
+        accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
 
-		this.inRecordDescriptor = inRecordDescriptor;
-		this.outputRecordDescriptor = outputRecordDescriptor;
-		frames = new ArrayList<ByteBuffer>();
-		appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.framesLimit = framesLimit;
 
-		dataFrameCount = -1;
-		
-		outFrame = ctx.allocateFrame();
-	}
+        // Tuple pair comparator
+        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
 
-	public void reset() {
-		dataFrameCount = -1;
-		tPointers = null;
-		// Reset the grouping hash table
-		for (int i = 0; i < table.length; i++) {
-			table[i] = new Link();
-		}
-	}
+        // Partitioner
+        tpc = tpcf.createPartitioner();
 
-	public int getFrameCount() {
-		return dataFrameCount;
-	}
+        this.inRecordDescriptor = inRecordDescriptor;
+        this.outputRecordDescriptor = outputRecordDescriptor;
+        frames = new ArrayList<ByteBuffer>();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
 
-	/**
-	 * How to define pointers for the partial aggregation
-	 * 
-	 * @return
-	 */
-	public int[] getTPointers() {
-		return tPointers;
-	}
+        dataFrameCount = -1;
 
-	/**
-	 * Redefine the number of fields in the pointer. 
-	 * 
-	 * Only two pointers are necessary for external grouping: one is to the
-	 * index of the hash table, and the other is to the row index inside of the
-	 * hash table.
-	 * 
-	 * @return
-	 */
-	public int getPtrFields() {
-		return 2;
-	}
+        outFrame = ctx.allocateFrame();
+    }
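+
+    /*
+     * Illustrative construction (a sketch, not part of this change), grouping on a
+     * single UTF-8 string key field. The aggregator factory, record descriptors,
+     * frame limit and table size are assumed to be supplied by the caller:
+     *
+     * SpillableGroupingHashTable table = new SpillableGroupingHashTable(ctx,
+     *         new int[] { 0 },
+     *         new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+     *         new FieldHashPartitionComputerFactory(new int[] { 0 },
+     *                 new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+     *         aggregatorFactory, inRecordDescriptor, outputRecordDescriptor, framesLimit, tableSize);
+     */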
 
-	public List<ByteBuffer> getFrames() {
-		return frames;
-	}
+    public void reset() {
+        dataFrameCount = -1;
+        tPointers = null;
+        // Reset the grouping hash table
+        for (int i = 0; i < table.length; i++) {
+            table[i] = new Link();
+        }
+    }
 
-	/**
-	 * Set the working frame to the next available frame in the
-	 * frame list. There are two cases:<br>
-	 * 
-	 * 1) If the next frame is not initialized, allocate
-	 * a new frame.
-	 * 
-	 * 2) When frames are already created, they are recycled.
-	 * 
-	 * @return Whether a new frame is added successfully.
-	 */
-	private boolean nextAvailableFrame() {
-		// Return false if the number of frames is equal to the limit.
-		if (dataFrameCount + 1 >= framesLimit)
-			return false;
-		
-		if (frames.size() < framesLimit) {
-			// Insert a new frame
-			ByteBuffer frame = ctx.allocateFrame();
-			frame.position(0);
-			frame.limit(frame.capacity());
-			frames.add(frame);
-			appender.reset(frame, true);
-			dataFrameCount++;
-		} else {
-			// Reuse an old frame
-			dataFrameCount++;
-			ByteBuffer frame = frames.get(dataFrameCount);
-			frame.position(0);
-			frame.limit(frame.capacity());
-			appender.reset(frame, true);
-		}
-		return true;
-	}
+    public int getFrameCount() {
+        return dataFrameCount;
+    }
 
-	/**
-	 * Insert a new record from the input frame.
-	 * 
-	 * @param accessor
-	 * @param tIndex
-	 * @return
-	 * @throws HyracksDataException
-	 */
-	public boolean insert(FrameTupleAccessor accessor, int tIndex)
-			throws HyracksDataException {
-		if (dataFrameCount < 0)
-			nextAvailableFrame();
-		// Get the partition for the inserting tuple
-		int entry = tpc.partition(accessor, tIndex, table.length);
-		Link link = table[entry];
-		if (link == null) {
-			link = table[entry] = new Link();
-		}
-		// Find the corresponding aggregator from existing aggregators
-		ISpillableAccumulatingAggregator aggregator = null;
-		for (int i = 0; i < link.size; i += 3) {
-			int sbIndex = link.pointers[i];
-			int stIndex = link.pointers[i + 1];
-			int saIndex = link.pointers[i + 2];
-			storedKeysAccessor1.reset(frames.get(sbIndex));
-			int c = ftpc
-					.compare(accessor, tIndex, storedKeysAccessor1, stIndex);
-			if (c == 0) {
-				aggregator = accumulators[saIndex];
-				break;
-			}
-		}
-		// Do insert
-		if (aggregator == null) {
-			// Did not find the aggregator. Insert a new aggregator entry
-			if (!appender.appendProjection(accessor, tIndex, fields)) {
-				if (!nextAvailableFrame()) {
-					// If buffer is full, return false to trigger a run file
-					// write
-					return false;
-				} else {
-					// Try to do insert after adding a new frame.
-					if (!appender.appendProjection(accessor, tIndex, fields)) {
-						throw new IllegalStateException();
-					}
-				}
-			}
-			int sbIndex = dataFrameCount;
-			int stIndex = appender.getTupleCount() - 1;
-			if (accumulatorSize >= accumulators.length) {
-				accumulators = Arrays.copyOf(accumulators,
-						accumulators.length * 2);
-			}
-			int saIndex = accumulatorSize++;
-			aggregator = accumulators[saIndex] = aggregatorFactory
-					.createSpillableAggregator(ctx, inRecordDescriptor,
-							outputRecordDescriptor);
-			aggregator.init(accessor, tIndex);
-			link.add(sbIndex, stIndex, saIndex);
-		}
-		aggregator.accumulate(accessor, tIndex);
-		return true;
-	}
+    /**
+     * Get the pointers to the partially aggregated results, as built by {@link #sortFrames()}.
+     * 
+     * @return The pointer array, or null if the results have not been sorted.
+     */
+    public int[] getTPointers() {
+        return tPointers;
+    }
 
-	/**
-	 * Sort partial results
-	 */
-	public void sortFrames() {
-		int totalTCount = 0;
-		// Get the number of records
-		for (int i = 0; i < table.length; i++) {
-			if (table[i] == null)
-				continue;
-			totalTCount += table[i].size / 3;
-		}
-		// Start sorting:
-		/*
-		 * Based on the data structure for the partial aggregates, the
-		 * pointers should be initialized.
-		 */
-		tPointers = new int[totalTCount * getPtrFields()];
-		// Initialize pointers
-		int ptr = 0;
-		// Maintain two pointers to each entry of the hashing group table
-		for (int i = 0; i < table.length; i++) {
-			if (table[i] == null)
-				continue;
-			for (int j = 0; j < table[i].size; j = j + 3) {
-				tPointers[ptr * getPtrFields()] = i;
-				tPointers[ptr * getPtrFields() + 1] = j;
-				ptr++;
-			}
-		}
-		// Sort using quick sort
-		if (tPointers.length > 0) {
-			sort(tPointers, 0, totalTCount);
-		}
-	}
+    /**
+     * The number of fields used for each pointer.
+     * Only two fields are necessary for external grouping: one is the entry
+     * index into the hash table, and the other is the row index inside of the
+     * corresponding entry.
+     * 
+     * @return The number of fields per pointer (always 2).
+     */
+    public int getPtrFields() {
+        return 2;
+    }
 
-	/**
-	 * 
-	 * @param writer
-	 * @throws HyracksDataException
-	 */
-	public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
-		FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+    public List<ByteBuffer> getFrames() {
+        return frames;
+    }
 
-		ISpillableAccumulatingAggregator aggregator = null;
-		writer.open();
-		appender.reset(outFrame, true);
-		if (sorted){
-			sortFrames();
-		}
-		if(tPointers == null){
-			// Not sorted
-			for (int i = 0; i < table.length; ++i) {
-	            Link link = table[i];
-	            if (link != null) {
-	                for (int j = 0; j < link.size; j += 3) {
-	                    int bIndex = link.pointers[j];
-	                    int tIndex = link.pointers[j + 1];
-	                    int aIndex = link.pointers[j + 2];
-	                    ByteBuffer keyBuffer = frames.get(bIndex);
-	                    storedKeysAccessor1.reset(keyBuffer);
-	                    aggregator = accumulators[aIndex];
-	                    while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
-	                    	FrameUtils.flushFrame(outFrame, writer);
-	                    	appender.reset(outFrame, true);
-	                    }
-	                }
-	            }
-	        }
-	        if (appender.getTupleCount() != 0) {
-	        	FrameUtils.flushFrame(outFrame, writer);
-	        }
-	        return;
-		}
-		int n = tPointers.length / getPtrFields();
-		for (int ptr = 0; ptr < n; ptr++) {
-			int tableIndex = tPointers[ptr * 2];
-			int rowIndex = tPointers[ptr * 2 + 1];
-			int frameIndex = table[tableIndex].pointers[rowIndex];
-			int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
-			int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
-			// Get the frame containing the value
-			ByteBuffer buffer = frames.get(frameIndex);
-			storedKeysAccessor1.reset(buffer);
+    /**
+     * Set the working frame to the next available frame in the
+     * frame list. There are two cases:<br>
+     * 1) If the next frame has not been allocated yet, allocate a new frame.
+     * 2) If frames have already been created, recycle the next one.
+     * 
+     * @return Whether the working frame could be advanced; false if the frame limit has been reached.
+     */
+    private boolean nextAvailableFrame() {
+        // Return false if the number of frames is equal to the limit.
+        if (dataFrameCount + 1 >= framesLimit)
+            return false;
 
-			// Get the aggregator
-			aggregator = accumulators[aggregatorIndex];
-			// Insert
-			if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex,
-					fields)) {
-				FrameUtils.flushFrame(outFrame, writer);
-				appender.reset(outFrame, true);
-				if (!aggregator.output(appender, storedKeysAccessor1,
-						tupleIndex, fields)) {
-					throw new IllegalStateException();
-				} else {
-					accumulators[aggregatorIndex] = null;
-				}
-			} else {
-				accumulators[aggregatorIndex] = null;
-			}
-		}
-		if (appender.getTupleCount() > 0) {
-			FrameUtils.flushFrame(outFrame, writer);
-		}
-	}
+        if (frames.size() < framesLimit) {
+            // Insert a new frame
+            ByteBuffer frame = ctx.allocateFrame();
+            frame.position(0);
+            frame.limit(frame.capacity());
+            frames.add(frame);
+            appender.reset(frame, true);
+            dataFrameCount++;
+        } else {
+            // Reuse an old frame
+            dataFrameCount++;
+            ByteBuffer frame = frames.get(dataFrameCount);
+            frame.position(0);
+            frame.limit(frame.capacity());
+            appender.reset(frame, true);
+        }
+        return true;
+    }
 
-	private void sort(int[] tPointers, int offset, int length) {
-		int m = offset + (length >> 1);
-		// Get table index
-		int mTable = tPointers[m * 2];
-		int mRow = tPointers[m * 2 + 1];
-		// Get frame and tuple index
-		int mFrame = table[mTable].pointers[mRow];
-		int mTuple = table[mTable].pointers[mRow + 1];
-		storedKeysAccessor1.reset(frames.get(mFrame));
+    /**
+     * Insert a new record from the input frame.
+     * 
+     * @param accessor Accessor for the input frame.
+     * @param tIndex Index of the tuple to be inserted.
+     * @return Whether the record is inserted successfully; false indicates that
+     *         the in-memory table is full and a run file write should be triggered.
+     * @throws HyracksDataException
+     */
+    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+        if (dataFrameCount < 0)
+            nextAvailableFrame();
+        // Get the partition for the inserting tuple
+        int entry = tpc.partition(accessor, tIndex, table.length);
+        Link link = table[entry];
+        if (link == null) {
+            link = table[entry] = new Link();
+        }
+        // Find the corresponding aggregator from existing aggregators
+        ISpillableAccumulatingAggregator aggregator = null;
+        for (int i = 0; i < link.size; i += 3) {
+            int sbIndex = link.pointers[i];
+            int stIndex = link.pointers[i + 1];
+            int saIndex = link.pointers[i + 2];
+            storedKeysAccessor1.reset(frames.get(sbIndex));
+            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor1, stIndex);
+            if (c == 0) {
+                aggregator = accumulators[saIndex];
+                break;
+            }
+        }
+        // Do insert
+        if (aggregator == null) {
+            // Did not find the aggregator. Insert a new aggregator entry
+            if (!appender.appendProjection(accessor, tIndex, fields)) {
+                if (!nextAvailableFrame()) {
+                    // If buffer is full, return false to trigger a run file
+                    // write
+                    return false;
+                } else {
+                    // Try to do insert after adding a new frame.
+                    if (!appender.appendProjection(accessor, tIndex, fields)) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+            int sbIndex = dataFrameCount;
+            int stIndex = appender.getTupleCount() - 1;
+            if (accumulatorSize >= accumulators.length) {
+                accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
+            }
+            int saIndex = accumulatorSize++;
+            aggregator = accumulators[saIndex] = aggregatorFactory.createSpillableAggregator(ctx, inRecordDescriptor,
+                    outputRecordDescriptor);
+            aggregator.init(accessor, tIndex);
+            link.add(sbIndex, stIndex, saIndex);
+        }
+        aggregator.accumulate(accessor, tIndex);
+        return true;
+    }
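+
+    /*
+     * Illustrative caller-side usage (a sketch, not part of this change): when
+     * insert() returns false the in-memory table is full, so the caller is expected
+     * to spill and retry; runWriter below stands for whatever IFrameWriter the
+     * caller uses for run files:
+     *
+     * if (!table.insert(accessor, i)) {
+     *     table.flushFrames(runWriter, true); // sort and spill the in-memory partial results
+     *     table.reset();                      // clear the table for the next run
+     *     table.insert(accessor, i);          // re-insert into the emptied table
+     * }
+     */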
 
-		int a = offset;
-		int b = a;
-		int c = offset + length - 1;
-		int d = c;
-		while (true) {
-			while (b <= c) {
-				int bTable = tPointers[b * 2];
-				int bRow = tPointers[b * 2 + 1];
-				int bFrame = table[bTable].pointers[bRow];
-				int bTuple = table[bTable].pointers[bRow + 1];
-				storedKeysAccessor2.reset(frames.get(bFrame));
-				int cmp = ftpc.compare(storedKeysAccessor2, bTuple,
-						storedKeysAccessor1, mTuple);
-				// int cmp = compare(tPointers, b, mi, mj, mv);
-				if (cmp > 0) {
-					break;
-				}
-				if (cmp == 0) {
-					swap(tPointers, a++, b);
-				}
-				++b;
-			}
-			while (c >= b) {
-				int cTable = tPointers[c * 2];
-				int cRow = tPointers[c * 2 + 1];
-				int cFrame = table[cTable].pointers[cRow];
-				int cTuple = table[cTable].pointers[cRow + 1];
-				storedKeysAccessor2.reset(frames.get(cFrame));
-				int cmp = ftpc.compare(storedKeysAccessor2, cTuple,
-						storedKeysAccessor1, mTuple);
-				// int cmp = compare(tPointers, c, mi, mj, mv);
-				if (cmp < 0) {
-					break;
-				}
-				if (cmp == 0) {
-					swap(tPointers, c, d--);
-				}
-				--c;
-			}
-			if (b > c)
-				break;
-			swap(tPointers, b++, c--);
-		}
+    /**
+     * Sort partial results
+     */
+    public void sortFrames() {
+        int totalTCount = 0;
+        // Get the number of records
+        for (int i = 0; i < table.length; i++) {
+            if (table[i] == null)
+                continue;
+            totalTCount += table[i].size / 3;
+        }
+        // Start sorting:
+        /*
+         * Initialize one pointer per partial aggregate, following the entry
+         * layout described for the hash table.
+         */
+        tPointers = new int[totalTCount * getPtrFields()];
+        // Initialize pointers
+        int ptr = 0;
+        // Maintain two pointers to each entry of the hashing group table
+        for (int i = 0; i < table.length; i++) {
+            if (table[i] == null)
+                continue;
+            for (int j = 0; j < table[i].size; j = j + 3) {
+                tPointers[ptr * getPtrFields()] = i;
+                tPointers[ptr * getPtrFields() + 1] = j;
+                ptr++;
+            }
+        }
+        // Sort using quick sort
+        if (tPointers.length > 0) {
+            sort(tPointers, 0, totalTCount);
+        }
+    }
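+
+    /*
+     * For illustration (not part of this change): with getPtrFields() == 2, the
+     * pointer array built above is laid out as
+     *
+     * [tableIdx0, rowIdx0, tableIdx1, rowIdx1, ...]
+     *
+     * where tableIdx selects the entry of the hash table and rowIdx is the offset
+     * of the (frame, tuple, aggregator) triple inside that entry's pointers array.
+     */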
 
-		int s;
-		int n = offset + length;
-		s = Math.min(a - offset, b - a);
-		vecswap(tPointers, offset, b - s, s);
-		s = Math.min(d - c, n - d - 1);
-		vecswap(tPointers, b, n - s, s);
+    /**
+     * Flush the aggregation results in the hash table to the writer, optionally
+     * sorting them first.
+     * 
+     * @param writer The writer receiving the output frames.
+     * @param sorted Whether the results should be sorted before being flushed.
+     * @throws HyracksDataException
+     */
+    public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
 
-		if ((s = b - a) > 1) {
-			sort(tPointers, offset, s);
-		}
-		if ((s = d - c) > 1) {
-			sort(tPointers, n - s, s);
-		}
-	}
+        ISpillableAccumulatingAggregator aggregator = null;
+        writer.open();
+        appender.reset(outFrame, true);
+        if (sorted) {
+            sortFrames();
+        }
+        if (tPointers == null) {
+            // Not sorted
+            for (int i = 0; i < table.length; ++i) {
+                Link link = table[i];
+                if (link != null) {
+                    for (int j = 0; j < link.size; j += 3) {
+                        int bIndex = link.pointers[j];
+                        int tIndex = link.pointers[j + 1];
+                        int aIndex = link.pointers[j + 2];
+                        ByteBuffer keyBuffer = frames.get(bIndex);
+                        storedKeysAccessor1.reset(keyBuffer);
+                        aggregator = accumulators[aIndex];
+                        while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            appender.reset(outFrame, true);
+                        }
+                    }
+                }
+            }
+            if (appender.getTupleCount() != 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+            }
+            return;
+        }
+        int n = tPointers.length / getPtrFields();
+        for (int ptr = 0; ptr < n; ptr++) {
+            int tableIndex = tPointers[ptr * 2];
+            int rowIndex = tPointers[ptr * 2 + 1];
+            int frameIndex = table[tableIndex].pointers[rowIndex];
+            int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
+            int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
+            // Get the frame containing the value
+            ByteBuffer buffer = frames.get(frameIndex);
+            storedKeysAccessor1.reset(buffer);
 
-	private void swap(int x[], int a, int b) {
-		for (int i = 0; i < 2; ++i) {
-			int t = x[a * 2 + i];
-			x[a * 2 + i] = x[b * 2 + i];
-			x[b * 2 + i] = t;
-		}
-	}
+            // Get the aggregator
+            aggregator = accumulators[aggregatorIndex];
+            // Insert
+            if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
+                FrameUtils.flushFrame(outFrame, writer);
+                appender.reset(outFrame, true);
+                if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
+                    throw new IllegalStateException();
+                } else {
+                    accumulators[aggregatorIndex] = null;
+                }
+            } else {
+                accumulators[aggregatorIndex] = null;
+            }
+        }
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outFrame, writer);
+        }
+    }
 
-	private void vecswap(int x[], int a, int b, int n) {
-		for (int i = 0; i < n; i++, a++, b++) {
-			swap(x, a, b);
-		}
-	}
+    private void sort(int[] tPointers, int offset, int length) {
+        int m = offset + (length >> 1);
+        // Get table index
+        int mTable = tPointers[m * 2];
+        int mRow = tPointers[m * 2 + 1];
+        // Get frame and tuple index
+        int mFrame = table[mTable].pointers[mRow];
+        int mTuple = table[mTable].pointers[mRow + 1];
+        storedKeysAccessor1.reset(frames.get(mFrame));
 
-	/**
-	 * The pointers in the link store 3 int values for each entry in the
-	 * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-	 * 
-	 * @author vinayakb
-	 */
-	private static class Link {
-		private static final int INIT_POINTERS_SIZE = 9;
+        int a = offset;
+        int b = a;
+        int c = offset + length - 1;
+        int d = c;
+        while (true) {
+            while (b <= c) {
+                int bTable = tPointers[b * 2];
+                int bRow = tPointers[b * 2 + 1];
+                int bFrame = table[bTable].pointers[bRow];
+                int bTuple = table[bTable].pointers[bRow + 1];
+                storedKeysAccessor2.reset(frames.get(bFrame));
+                int cmp = ftpc.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+                // int cmp = compare(tPointers, b, mi, mj, mv);
+                if (cmp > 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, a++, b);
+                }
+                ++b;
+            }
+            while (c >= b) {
+                int cTable = tPointers[c * 2];
+                int cRow = tPointers[c * 2 + 1];
+                int cFrame = table[cTable].pointers[cRow];
+                int cTuple = table[cTable].pointers[cRow + 1];
+                storedKeysAccessor2.reset(frames.get(cFrame));
+                int cmp = ftpc.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+                // int cmp = compare(tPointers, c, mi, mj, mv);
+                if (cmp < 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, c, d--);
+                }
+                --c;
+            }
+            if (b > c)
+                break;
+            swap(tPointers, b++, c--);
+        }
 
-		int[] pointers;
-		int size;
+        int s;
+        int n = offset + length;
+        s = Math.min(a - offset, b - a);
+        vecswap(tPointers, offset, b - s, s);
+        s = Math.min(d - c, n - d - 1);
+        vecswap(tPointers, b, n - s, s);
 
-		Link() {
-			pointers = new int[INIT_POINTERS_SIZE];
-			size = 0;
-		}
+        if ((s = b - a) > 1) {
+            sort(tPointers, offset, s);
+        }
+        if ((s = d - c) > 1) {
+            sort(tPointers, n - s, s);
+        }
+    }
 
-		void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-			while (size + 3 > pointers.length) {
-				pointers = Arrays.copyOf(pointers, pointers.length * 2);
-			}
-			pointers[size++] = bufferIdx;
-			pointers[size++] = tIndex;
-			pointers[size++] = accumulatorIdx;
-		}
+    private void swap(int x[], int a, int b) {
+        for (int i = 0; i < 2; ++i) {
+            int t = x[a * 2 + i];
+            x[a * 2 + i] = x[b * 2 + i];
+            x[b * 2 + i] = t;
+        }
+    }
 
-		public String toString() {
-			StringBuilder sb = new StringBuilder();
-			sb.append("[Size=" + size + "]");
-			for (int i = 0; i < pointers.length; i = i + 3) {
-				sb.append(pointers[i] + ",");
-				sb.append(pointers[i + 1] + ",");
-				sb.append(pointers[i + 2] + "; ");
-			}
-			return sb.toString();
-		}
-	}
+    private void vecswap(int x[], int a, int b, int n) {
+        for (int i = 0; i < n; i++, a++, b++) {
+            swap(x, a, b);
+        }
+    }
+
+    /**
+     * The pointers in the link store 3 int values for each entry in the
+     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * 
+     * @author vinayakb
+     */
+    private static class Link {
+        private static final int INIT_POINTERS_SIZE = 9;
+
+        int[] pointers;
+        int size;
+
+        Link() {
+            pointers = new int[INIT_POINTERS_SIZE];
+            size = 0;
+        }
+
+        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+            while (size + 3 > pointers.length) {
+                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+            }
+            pointers[size++] = bufferIdx;
+            pointers[size++] = tIndex;
+            pointers[size++] = accumulatorIdx;
+        }
+
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[Size=" + size + "]");
+            for (int i = 0; i < size; i = i + 3) {
+                sb.append(pointers[i] + ",");
+                sb.append(pointers[i + 1] + ",");
+                sb.append(pointers[i + 2] + "; ");
+            }
+            return sb.toString();
+        }
+    }
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 421b47c..2621049 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -60,517 +60,371 @@
 
 /**
  * Test cases for external hash group operator.
- * 
  */
 public class ExternalAggregateTest extends AbstractIntegrationTest {
 
-	/**
-	 * Test 01: aggregate (count) on single field, on a simple data set.
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void externalAggregateTestSingleFieldSimpleData() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    /**
+     * Test 01: aggregate (count) on single field, on a simple data set.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void externalAggregateTestSingleFieldSimpleData() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-				new FileSplit[] {
-						new FileSplit(NC2_ID, new FileReference(new File(
-								"data/wordcount.tsv"))),
-						new FileSplit(NC1_ID, new FileReference(new File(
-								"data/wordcount.tsv"))) });
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
+                new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
 
-		// Input format: a string field as the key
-		RecordDescriptor desc = new RecordDescriptor(
-				new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+        // Input format: a string field as the key
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-		// Output format: a string field as the key, and an integer field as the
-		// count
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        // Output format: a string field as the key, and an integer field as the
+        // count
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-		// Data set format: word(string),count(int)
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec,
-				splitProvider,
-				new DelimitedDataTupleParserFactory(
-						new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
-						','), desc);
-		PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		csvScanner.setPartitionConstraint(csvPartitionConstraint);
+        // Data set format: word(string),count(int)
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-		int[] keys = new int[] { 0 };
-		int tableSize = 8;
+        int[] keys = new int[] { 0 };
+        int tableSize = 8;
 
-		ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
-				spec, // Job conf
-				keys, // Group key
-				3, // Number of frames
-				false, // Whether to sort the output
-				// Hash partitioner
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				// Key comparator
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				// Aggregator factory
-				new MultiAggregatorFactory(
-						new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-				outputRec, // Output format
-				tableSize // Size of the hashing table, which is used to control
-							// the partition when hashing
-		);
+        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
+                spec, // Job conf
+                keys, // Group key
+                3, // Number of frames
+                false, // Whether to sort the output
+                // Hash partitioner
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                // Key comparator
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                // Aggregator factory
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                outputRec, // Output format
+                tableSize // Size of the hashing table, which is used to control
+                          // the partition when hashing
+        );
 
-		PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		grouper.setPartitionConstraint(grouperPartitionConstraint);
+        PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        grouper.setPartitionConstraint(grouperPartitionConstraint);
 
-		IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-		PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		printer.setPartitionConstraint(printerPartitionConstraint);
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	/**
-	 * Test 02: Control experiment using in-memory aggregator, on the same data
-	 * set of {@link #externalAggregateTest01()}
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void externalAggregateTestSingleFieldSimpleDataInMemControl()
-			throws Exception {
-		JobSpecification spec = new JobSpecification();
+    /**
+     * Test 02: Control experiment using in-memory aggregator, on the same data
+     * set as {@link #externalAggregateTestSingleFieldSimpleData()}
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void externalAggregateTestSingleFieldSimpleDataInMemControl() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-				new FileSplit[] {
-						new FileSplit(NC2_ID, new FileReference(new File(
-								"data/wordcount.tsv"))),
-						new FileSplit(NC1_ID, new FileReference(new File(
-								"data/wordcount.tsv"))) });
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
+                new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
 
-		RecordDescriptor desc = new RecordDescriptor(
-				new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec,
-				splitProvider,
-				new DelimitedDataTupleParserFactory(
-						new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
-						','), desc);
-		PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		csvScanner.setPartitionConstraint(csvPartitionConstraint);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-		int[] keys = new int[] { 0 };
-		int tableSize = 8;
+        int[] keys = new int[] { 0 };
+        int tableSize = 8;
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keys,
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiAggregatorFactory(
-						new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-				outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keys,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                outputRec, tableSize);
 
-		PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		grouper.setPartitionConstraint(grouperPartitionConstraint);
+        PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        grouper.setPartitionConstraint(grouperPartitionConstraint);
 
-		IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-		PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] {
-						new AbsoluteLocationConstraint(NC2_ID),
-						new AbsoluteLocationConstraint(NC1_ID) });
-		printer.setPartitionConstraint(printerPartitionConstraint);
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	/**
-	 * Test 03: aggregates on multiple fields
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void externalAggregateTestMultiAggFields() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    /**
+     * Test 03: aggregates on multiple fields
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void externalAggregateTestMultiAggFields() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
-				new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
-		IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
-				ordersSplits);
-		RecordDescriptor ordersDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+        "data/tpch0.001/lineitem.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-				spec, ordersSplitsProvider,
-				new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-						UTF8StringParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-		PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(csvPartitionConstraint);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(csvPartitionConstraint);
 
-		int[] keys = new int[] { 0 };
-		int tableSize = 8;
+        int[] keys = new int[] { 0 };
+        int tableSize = 8;
 
-		ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
-				spec,
-				keys,
-				3,
-				false,
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiAggregatorFactory(
-						new IFieldValueResultingAggregatorFactory[] {
-								new CountAggregatorFactory(),
-								new SumAggregatorFactory(4),
-								new MinMaxAggregatorFactory(true, 5) }),
-				outputRec, tableSize);
+        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+                        new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec, tableSize);
 
-		PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		grouper.setPartitionConstraint(grouperPartitionConstraint);
+        PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        grouper.setPartitionConstraint(grouperPartitionConstraint);
 
-		IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keys,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, ordScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, ordScanner, 0, grouper, 0);
 
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-		PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		printer.setPartitionConstraint(printerPartitionConstraint);
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	/**
-	 * Test 05: aggregate on multiple key fields
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void externalAggregateTestMultiKeys() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    /**
+     * Test 05: aggregate on multiple key fields
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void externalAggregateTestMultiKeys() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
-				new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
-		IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
-				ordersSplits);
-		RecordDescriptor ordersDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+        "data/tpch0.001/lineitem.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-				spec, ordersSplitsProvider,
-				new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-		PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(csvPartitionConstraint);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(csvPartitionConstraint);
 
-		// Group on two fields
-		int[] keys = new int[] { 0, 1 };
-		int tableSize = 8;
+        // Group on two fields
+        int[] keys = new int[] { 0, 1 };
+        int tableSize = 8;
 
-		ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
-				spec,
-				keys,
-				3,
-				false,
-				new FieldHashPartitionComputerFactory(keys,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiAggregatorFactory(
-						new IFieldValueResultingAggregatorFactory[] {
-								new CountAggregatorFactory(),
-								new SumAggregatorFactory(4),
-								new MinMaxAggregatorFactory(true, 5) }),
-				outputRec, tableSize);
+        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
+                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+                        new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }),
+                outputRec, tableSize);
 
-		PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		grouper.setPartitionConstraint(grouperPartitionConstraint);
+        PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        grouper.setPartitionConstraint(grouperPartitionConstraint);
 
-		IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keys,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, ordScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, ordScanner, 0, grouper, 0);
 
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-		PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		printer.setPartitionConstraint(printerPartitionConstraint);
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	/**
-	 * Test 06: tests on non-string key field
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void externalAggregateTestNonStringKey() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    /**
+     * Test 06: aggregate on non-string (integer) key fields
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void externalAggregateTestNonStringKey() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
-				new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
-		IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
-				ordersSplits);
-		RecordDescriptor ordersDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/lineitem.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-				spec, ordersSplitsProvider,
-				new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						IntegerParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						FloatParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE,
-						UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-		PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		ordScanner.setPartitionConstraint(csvPartitionConstraint);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(csvPartitionConstraint);
 
-		// Group on two fields
-		int[] keys = new int[] { 0, 1 };
-		int tableSize = 8;
+        // Group on two fields
+        int[] keys = new int[] { 0, 1 };
+        int tableSize = 8;
 
-		ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
-				spec, keys, 3000, true, new FieldHashPartitionComputerFactory(
-						keys, new IBinaryHashFunctionFactory[] {
-								IntegerBinaryHashFunctionFactory.INSTANCE,
-								IntegerBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] {
-						IntegerBinaryComparatorFactory.INSTANCE,
-						IntegerBinaryComparatorFactory.INSTANCE },
-				new MultiAggregatorFactory(
-						new IFieldValueResultingAggregatorFactory[] {
-								new CountAggregatorFactory(),
-								new SumAggregatorFactory(4),
-								new MinMaxAggregatorFactory(true, 5) }),
-				outputRec, tableSize);
+        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3000, true,
+                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE,
+                        IntegerBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+                        new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }),
+                outputRec, tableSize);
 
-		PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		grouper.setPartitionConstraint(grouperPartitionConstraint);
+        PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        grouper.setPartitionConstraint(grouperPartitionConstraint);
 
-		IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keys,
-						new IBinaryHashFunctionFactory[] {
-								IntegerBinaryHashFunctionFactory.INSTANCE,
-								IntegerBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, ordScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, ordScanner, 0, grouper, 0);
 
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-		PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
-				new LocationConstraint[] { new AbsoluteLocationConstraint(
-						NC1_ID) });
-		printer.setPartitionConstraint(printerPartitionConstraint);
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 }
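
Each of the reformatted tests above wires the same pipeline: file scan, hash-partitioned exchange on the grouping key, external hash group, and a printer sink. The following condensed sketch shows that pattern for a single UTF8 string key with a COUNT aggregate; it is illustrative only and not part of this change, the imports, NC1_ID and runTest() are assumed to be those of the surrounding test class, and "data/words.txt" is a placeholder input path.

    /**
     * Illustrative sketch (not part of this change): the wiring pattern shared by the
     * external-group tests above, reduced to one string key and a COUNT aggregate.
     * Partition constraints are omitted for brevity; the input path is a placeholder.
     */
    @Test
    public void externalAggregateSketch() throws Exception {
        JobSpecification spec = new JobSpecification();

        IFileSplitProvider splits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                new FileReference(new File("data/words.txt"))) });
        RecordDescriptor inDesc = new RecordDescriptor(
                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

        // Scan the '|'-delimited input into single-field string tuples.
        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splits,
                new DelimitedDataTupleParserFactory(
                        new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '|'), inDesc);

        // Hash-based external group-by on field 0 with a COUNT aggregate.
        int[] keys = new int[] { 0 };
        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
                new FieldHashPartitionComputerFactory(keys,
                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                new MultiAggregatorFactory(
                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
                outDesc, 8);

        // Repartition the scanned tuples on the grouping key before they reach the grouper,
        // then print the aggregated groups.
        spec.connect(new MToNHashPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(keys,
                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE })),
                scanner, 0, grouper, 0);
        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
        spec.connect(new OneToOneConnectorDescriptor(spec), grouper, 0, printer, 0);

        spec.addRoot(printer);
        runTest(spec);
    }
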
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 9b88062..f02c855 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -65,308 +65,243 @@
 /**
  * The application client for the performance tests of the external hash group
  * operator.
- * 
  */
 public class ExternalGroupClient {
-	private static class Options {
-		@Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
-		public String host;
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
 
-		@Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-		public int port = 1099;
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        public int port = 1099;
 
-		@Option(name = "-app", usage = "Hyracks Application name", required = true)
-		public String app;
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
 
-		@Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
-		public String inFileSplits;
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
 
-		@Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
-		public String outFileSplits;
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
 
-		@Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
-		public int htSize = 8191;
+        @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+        public int htSize = 8191;
 
-		@Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
-		public int framesLimit = 32768;
+        @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
+        public int framesLimit = 32768;
 
-		@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
-		public int sbSize = 512;
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
+        public int sbSize = 512;
 
-		@Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
-		public boolean sortOutput = false;
+        @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
+        public boolean sortOutput = false;
 
-		@Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
-		public boolean outPlain = true;
-	}
+        @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+        public boolean outPlain = true;
+    }
 
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) throws Exception {
-		Options options = new Options();
-		CmdLineParser parser = new CmdLineParser(options);
-		parser.parseArgument(args);
+    /**
+     * @param args
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
 
-		IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
-				options.port);
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
 
-		JobSpecification job;
+        JobSpecification job;
 
-		for (int i = 0; i < 3; i++) {
-			long start = System.currentTimeMillis();
-			job = createJob(parseFileSplits(options.inFileSplits),
-					parseFileSplits(options.outFileSplits, i % 2),
-					options.htSize, options.sbSize, options.framesLimit,
-					options.sortOutput, i % 2, options.outPlain);
+        for (int i = 0; i < 3; i++) {
+            long start = System.currentTimeMillis();
+            job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
+                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 2, options.outPlain);
 
-			System.out.print(i + "\t" + (System.currentTimeMillis() - start));
-			start = System.currentTimeMillis();
-			UUID jobId = hcc.createJob(options.app, job);
-			hcc.start(jobId);
-			hcc.waitForCompletion(jobId);
-			System.out.println("\t" + (System.currentTimeMillis() - start));
-		}
-	}
+            System.out.print(i + "\t" + (System.currentTimeMillis() - start));
+            start = System.currentTimeMillis();
+            UUID jobId = hcc.createJob(options.app, job);
+            hcc.start(jobId);
+            hcc.waitForCompletion(jobId);
+            System.out.println("\t" + (System.currentTimeMillis() - start));
+        }
+    }
 
-	private static FileSplit[] parseFileSplits(String fileSplits) {
-		String[] splits = fileSplits.split(",");
-		FileSplit[] fSplits = new FileSplit[splits.length];
-		for (int i = 0; i < splits.length; ++i) {
-			String s = splits[i].trim();
-			int idx = s.indexOf(':');
-			if (idx < 0) {
-				throw new IllegalArgumentException("File split " + s
-						+ " not well formed");
-			}
-			fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-					new File(s.substring(idx + 1))));
-		}
-		return fSplits;
-	}
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
 
-	private static FileSplit[] parseFileSplits(String fileSplits, int count) {
-		String[] splits = fileSplits.split(",");
-		FileSplit[] fSplits = new FileSplit[splits.length];
-		for (int i = 0; i < splits.length; ++i) {
-			String s = splits[i].trim();
-			int idx = s.indexOf(':');
-			if (idx < 0) {
-				throw new IllegalArgumentException("File split " + s
-						+ " not well formed");
-			}
-			fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-					new File(s.substring(idx + 1) + "_" + count)));
-		}
-		return fSplits;
-	}
+    private static FileSplit[] parseFileSplits(String fileSplits, int count) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+                    + count)));
+        }
+        return fSplits;
+    }
 
-	private static JobSpecification createJob(FileSplit[] inSplits,
-			FileSplit[] outSplits, int htSize, int sbSize, int framesLimit,
-			boolean sortOutput, int alg, boolean outPlain) {
-		JobSpecification spec = new JobSpecification();
-		IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(
-				inSplits);
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
+            int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
+        JobSpecification spec = new JobSpecification();
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
 
-		RecordDescriptor inDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(
-				spec, splitsProvider, new DelimitedDataTupleParserFactory(
-						new IValueParserFactory[] {
-								IntegerParserFactory.INSTANCE,
-								IntegerParserFactory.INSTANCE,
-								IntegerParserFactory.INSTANCE,
-								IntegerParserFactory.INSTANCE,
-								IntegerParserFactory.INSTANCE,
-								FloatParserFactory.INSTANCE,
-								FloatParserFactory.INSTANCE,
-								FloatParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE, }, '|'),
-				inDesc);
+        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
 
-		fileScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
+        fileScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
 
-		// Output: each unique string with an integer count
-		RecordDescriptor outDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						IntegerSerializerDeserializer.INSTANCE,
-						// IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        // Output: each unique key with an integer count
+        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE,
+                // IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
-		// Specify the grouping key, which will be the string extracted during
-		// the scan.
-		int[] keys = new int[] { 0,
-		// 1
-		};
+        // Specify the grouping key, which is the first integer field extracted
+        // during the scan.
+        int[] keys = new int[] { 0,
+                // 1
+        };
 
-		AbstractOperatorDescriptor grouper;
+        AbstractOperatorDescriptor grouper;
 
-		switch (alg) {
-		case 0: // External hash group
-			grouper = new ExternalHashGroupOperatorDescriptor(
-					spec,
-					keys,
-					framesLimit,
-					false,
-					new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }),
-					new IBinaryComparatorFactory[] {
-					// IntegerBinaryComparatorFactory.INSTANCE,
-					IntegerBinaryComparatorFactory.INSTANCE },
-					new MultiAggregatorFactory(
-							new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-					outDesc, htSize);
+        switch (alg) {
+            case 0: // External hash group
+                grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }),
+                        new IBinaryComparatorFactory[] {
+                                // IntegerBinaryComparatorFactory.INSTANCE,
+                                IntegerBinaryComparatorFactory.INSTANCE },
+                        new MultiAggregatorFactory(
+                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                        outDesc, htSize);
 
-			grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+                grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
 
-			// Connect scanner with the grouper
-			IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(
-					spec, new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }));
-			spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
-			break;
-		case 1: // External sort + pre-cluster
-			ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-					spec, framesLimit, keys, new IBinaryComparatorFactory[] {
-					// IntegerBinaryComparatorFactory.INSTANCE,
-					IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
-			sorter.setPartitionConstraint(createPartitionConstraint(inSplits));
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }));
+                spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
+                break;
+            case 1: // External sort + pre-cluster
+                ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+                        new IBinaryComparatorFactory[] {
+                        // IntegerBinaryComparatorFactory.INSTANCE,
+                        IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+                sorter.setPartitionConstraint(createPartitionConstraint(inSplits));
 
-			// Connect scan operator with the sorter
-			IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(
-					spec, new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }));
-			spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
+                // Connect scan operator with the sorter
+                IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }));
+                spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
 
-			grouper = new PreclusteredGroupOperatorDescriptor(
-					spec,
-					keys,
-					new IBinaryComparatorFactory[] {
-					// IntegerBinaryComparatorFactory.INSTANCE,
-					IntegerBinaryComparatorFactory.INSTANCE },
-					new MultiAggregatorFactory(
-							new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-					outDesc);
+                grouper = new PreclusteredGroupOperatorDescriptor(spec, keys,
+                        new IBinaryComparatorFactory[] {
+                                // IntegerBinaryComparatorFactory.INSTANCE,
+                                IntegerBinaryComparatorFactory.INSTANCE },
+                        new MultiAggregatorFactory(
+                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                        outDesc);
 
-			grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+                grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
 
-			// Connect sorter with the pre-cluster
-			OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
-					spec);
-			spec.connect(sortGroupConn, sorter, 0, grouper, 0);
-			break;
-		case 2: // In-memory hash group
-			grouper = new HashGroupOperatorDescriptor(
-					spec,
-					keys,
-					new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }),
-					new IBinaryComparatorFactory[] {
-					// IntegerBinaryComparatorFactory.INSTANCE,
-					IntegerBinaryComparatorFactory.INSTANCE },
-					new MultiAggregatorFactory(
-							new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-					outDesc, htSize);
+                // Connect the sorter with the pre-clustered grouper
+                OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+                spec.connect(sortGroupConn, sorter, 0, grouper, 0);
+                break;
+            case 2: // In-memory hash group
+                grouper = new HashGroupOperatorDescriptor(spec, keys,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }),
+                        new IBinaryComparatorFactory[] {
+                                // IntegerBinaryComparatorFactory.INSTANCE,
+                                IntegerBinaryComparatorFactory.INSTANCE },
+                        new MultiAggregatorFactory(
+                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                        outDesc, htSize);
 
-			grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+                grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
 
-			// Connect scanner with the grouper
-			IConnectorDescriptor scanConn = new MToNHashPartitioningConnectorDescriptor(
-					spec, new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }));
-			spec.connect(scanConn, fileScanner, 0, grouper, 0);
-			break;
-		default:
-			grouper = new ExternalHashGroupOperatorDescriptor(
-					spec,
-					keys,
-					framesLimit,
-					false,
-					new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }),
-					new IBinaryComparatorFactory[] {
-					// IntegerBinaryComparatorFactory.INSTANCE,
-					IntegerBinaryComparatorFactory.INSTANCE },
-					new MultiAggregatorFactory(
-							new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-					outDesc, htSize);
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }));
+                spec.connect(scanConn, fileScanner, 0, grouper, 0);
+                break;
+            default:
+                grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }),
+                        new IBinaryComparatorFactory[] {
+                                // IntegerBinaryComparatorFactory.INSTANCE,
+                                IntegerBinaryComparatorFactory.INSTANCE },
+                        new MultiAggregatorFactory(
+                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                        outDesc, htSize);
 
-			grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+                grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
 
-			// Connect scanner with the grouper
-			IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(
-					spec, new FieldHashPartitionComputerFactory(keys,
-							new IBinaryHashFunctionFactory[] {
-							// IntegerBinaryHashFunctionFactory.INSTANCE,
-							IntegerBinaryHashFunctionFactory.INSTANCE }));
-			spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
-		}
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }));
+                spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+        }
 
-		IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
-				outSplits);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
 
-		AbstractSingleActivityOperatorDescriptor writer;
+        AbstractSingleActivityOperatorDescriptor writer;
 
-		if (outPlain)
-			writer = new PlainFileWriterOperatorDescriptor(spec,
-					outSplitProvider, "|");
-		else
-			writer = new FrameFileWriterOperatorDescriptor(spec,
-					outSplitProvider);
+        if (outPlain)
+            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+        else
+            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
 
-		writer.setPartitionConstraint(createPartitionConstraint(outSplits));
+        writer.setPartitionConstraint(createPartitionConstraint(outSplits));
 
-		IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(
-				spec);
-		spec.connect(groupOutConn, grouper, 0, writer, 0);
+        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(groupOutConn, grouper, 0, writer, 0);
 
-		spec.addRoot(writer);
-		return spec;
-	}
+        spec.addRoot(writer);
+        return spec;
+    }
 
-	private static PartitionConstraint createPartitionConstraint(
-			FileSplit[] splits) {
-		LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
-		for (int i = 0; i < splits.length; ++i) {
-			lConstraints[i] = new AbsoluteLocationConstraint(
-					splits[i].getNodeName());
-		}
-		return new ExplicitPartitionConstraint(lConstraints);
-	}
+    private static PartitionConstraint createPartitionConstraint(FileSplit[] splits) {
+        LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            lConstraints[i] = new AbsoluteLocationConstraint(splits[i].getNodeName());
+        }
+        return new ExplicitPartitionConstraint(lConstraints);
+    }
 }
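
For reference, ExternalGroupClient is driven entirely by the options declared in Options above; only -host, -app, -infile-splits, and -outfile-splits are required, and the rest keep their declared defaults. A minimal programmatic invocation might look like the sketch below, where the host, application name, node name, and output path are placeholder values.

        // Hypothetical invocation; "cc-host", "text", "NC1" and the output path are placeholders.
        // main() declares "throws Exception", so call it from a context that can propagate it.
        String[] args = new String[] { "-host", "cc-host", "-app", "text",
                "-infile-splits", "NC1:data/tpch0.001/lineitem.tbl",
                "-outfile-splits", "NC1:output/groupby_result" };
        ExternalGroupClient.main(args);
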