Reimplemented min/max aggregate functions to support all totally ordered types that have an IBinaryComparator implementation. Added tests.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@619 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
index 1d044f8..8eefced 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
@@ -15,5 +15,7 @@
 let $i64 := max([int64("1"), int64("2"), int64("3")])
 let $f := max([float("1"), float("2"), float("3")])
 let $d := max([double("1"), double("2"), double("3")])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := max(["foo", "bar", "world"])
+let $dt := max([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
 return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
index 7dcd368..04436bf 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
@@ -15,5 +15,7 @@
 let $i64 := max([int64("1"), int64("2"), int64("3"), null])
 let $f := max([float("1"), float("2"), float("3"), null])
 let $d := max([double("1"), double("2"), double("3"), null])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := max(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
 return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
index 3089758..04aa735 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
@@ -15,5 +15,7 @@
 let $i64 := min([int64("1"), int64("2"), int64("3")])
 let $f := min([float("1"), float("2"), float("3")])
 let $d := min([double("1"), double("2"), double("3")])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := min(["foo", "bar", "world"])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
 return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
index 8601228..523ca26 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
@@ -15,5 +15,7 @@
 let $i64 := min([int64("1"), int64("2"), int64("3"), null])
 let $f := min([float("1"), float("2"), float("3"), null])
 let $d := min([double("1"), double("2"), double("3"), null])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := min(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
 return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
index a2a1efe..d420c2f 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
@@ -3,4 +3,6 @@
 3
 3i64
 3.0f
-3.0d
\ No newline at end of file
+3.0d
+"world"
+datetime("2012-03-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
index 0800a91..c9f3cb3 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
@@ -3,4 +3,6 @@
 null
 null
 null
+null
+null
 null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
index ebfec4e..6acf213 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
@@ -3,4 +3,6 @@
 1
 1i64
 1.0f
-1.0d
\ No newline at end of file
+1.0d
+"bar"
+datetime("2012-01-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
index 0800a91..c9f3cb3 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
@@ -3,4 +3,6 @@
 null
 null
 null
+null
+null
 null
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
index 93167cf..e4db8da 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
@@ -6,7 +6,6 @@
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.BooleanBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.LongBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.RectangleBinaryComparatorFactory;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -15,17 +14,26 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.BytePointable;
 import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
 import edu.uci.ics.hyracks.data.std.primitive.FloatPointable;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class AqlBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable {
 
     private static final long serialVersionUID = 1L;
     public static final AqlBinaryComparatorFactoryProvider INSTANCE = new AqlBinaryComparatorFactoryProvider();
+    public static final PointableBinaryComparatorFactory BYTE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            BytePointable.FACTORY);
+    public static final PointableBinaryComparatorFactory SHORT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            ShortPointable.FACTORY);
     public static final PointableBinaryComparatorFactory INTEGER_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
             IntegerPointable.FACTORY);
+    public static final PointableBinaryComparatorFactory LONG_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            LongPointable.FACTORY);
     public static final PointableBinaryComparatorFactory FLOAT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
             FloatPointable.FACTORY);
     public static final PointableBinaryComparatorFactory DOUBLE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
@@ -58,7 +66,11 @@
             return anyBinaryComparatorFactory(ascending);
         }        
         IAType aqlType = (IAType) type;
-        switch (aqlType.getTypeTag()) {
+        return getBinaryComparatorFactory(aqlType.getTypeTag(), ascending);
+    }
+    
+    public IBinaryComparatorFactory getBinaryComparatorFactory(ATypeTag type, boolean ascending) {
+        switch (type) {
             case ANY:
             case UNION: { // we could do smth better for nullable fields
                 return anyBinaryComparatorFactory(ascending);
@@ -83,11 +95,17 @@
             case BOOLEAN: {
                 return addOffset(BooleanBinaryComparatorFactory.INSTANCE, ascending);
             }
+            case INT8: {
+                return addOffset(BYTE_POINTABLE_INSTANCE, ascending);
+            }
+            case INT16: {
+                return addOffset(SHORT_POINTABLE_INSTANCE, ascending);
+            }
             case INT32: {
                 return addOffset(INTEGER_POINTABLE_INSTANCE, ascending);
             }
             case INT64: {
-                return addOffset(LongBinaryComparatorFactory.INSTANCE, ascending);
+                return addOffset(LONG_POINTABLE_INSTANCE, ascending);
             }
             case FLOAT: {
                 return addOffset(FLOAT_POINTABLE_INSTANCE, ascending);
@@ -101,12 +119,14 @@
             case RECTANGLE: {
                 return addOffset(RectangleBinaryComparatorFactory.INSTANCE, ascending);
             }
+            case DATE:
+            case TIME:
             case DATETIME: {
-            	return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
+                return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
             }
             default: {
                 throw new NotImplementedException("No binary comparator factory implemented for type "
-                        + aqlType.getTypeTag() + " .");
+                        + type + " .");
             }
         }
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 5376e57..08bb063 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -116,19 +116,21 @@
                     @Override
                     public void step(IFrameTupleReference tuple) throws AlgebricksException {
                         inputVal.reset();
-                        eval.evaluate(tuple);
-                        ++count;
+                        eval.evaluate(tuple);                        
                         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
                                 .deserialize(inputVal.getByteArray()[0]);
                         if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
                             aggType = ATypeTag.NULL;
                             return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {                           
                             aggType = typeTag;
                         } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
                             throw new AlgebricksException("Unexpected type " + typeTag
                                     + " in aggregation input stream. Expected type " + aggType + ".");
                         }
+                        if (typeTag != ATypeTag.SYSTEM_NULL) {
+                            ++count;
+                        }                        
                         switch (typeTag) {
                             case INT8: {
                                 byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index b629c32..02d8c91 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -81,6 +81,7 @@
                     private DataOutput out = provider.getDataOutput();
                     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
                     private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+                    private ATypeTag aggType;
                     private double sum;
                     private int count;
 
@@ -104,65 +105,72 @@
                             .getSerializerDeserializer(BuiltinType.ANULL);
                     private AMutableDouble aDouble = new AMutableDouble(0);
                     private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private boolean metNull;
 
                     @Override
                     public void init() {
+                        aggType = ATypeTag.SYSTEM_NULL;
                         sum = 0.0;
                         count = 0;
-                        metNull = false;
                     }
 
                     @Override
                     public void step(IFrameTupleReference tuple) throws AlgebricksException {
                         inputVal.reset();
                         eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
                         }
+                        if (typeTag != ATypeTag.SYSTEM_NULL) {
+                            ++count;
+                        }
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
+                                        + typeTag);
+                            }
+                        }
+                        inputVal.reset();
                     }
 
                     @Override
@@ -172,7 +180,7 @@
                                 out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
                                 return;
                             }
-                            if (metNull) {
+                            if (aggType == ATypeTag.NULL) {
                                 sumBytes.reset();
                                 nullSerde.serialize(ANull.NULL, sumBytesOutput);
                             } else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
index 4d234f3..59287d5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -36,7 +36,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-                return new MaxAggregateFunction(args, provider, true);
+                return new MinMaxAggregateFunction(args, provider, false, true);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index f3c8ea6..ca32e2f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -36,7 +36,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-                return new MinAggregateFunction(args, provider, true);
+                return new MinMaxAggregateFunction(args, provider, true, true);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index e59c19d..18bfe9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -35,7 +35,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-                return new MaxAggregateFunction(args, provider, false);
+                return new MinMaxAggregateFunction(args, provider, false, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
deleted file mode 100644
index df244e1..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package edu.uci.ics.asterix.runtime.aggregates.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class MaxAggregateFunction implements ICopyAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private DataOutput out;    
-    private ICopyEvaluator eval;
-    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
-    private byte byteVal = Byte.MIN_VALUE;
-    private short shortVal = Short.MIN_VALUE;
-    private int intVal = Integer.MIN_VALUE;
-    private long longVal = Long.MIN_VALUE;
-    private float floatVal = Float.MIN_VALUE;
-    private double doubleVal = Double.MIN_VALUE;
-
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-    private final boolean isLocalAgg;
-    
-    public MaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
-            throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
-    }
-    
-    @Override
-    public void init() {
-        byteVal = Byte.MIN_VALUE;
-        shortVal = Short.MIN_VALUE;
-        intVal = Integer.MIN_VALUE;
-        longVal = Long.MIN_VALUE;
-        floatVal = Float.MIN_VALUE;
-        doubleVal = Double.MIN_VALUE;
-
-        metInt8s = false;
-        metInt16s = false;
-        metInt32s = false;
-        metInt64s = false;
-        metFloats = false;
-        metDoubles = false;
-        metNull = false;
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        if (inputVal.getLength() > 0) {
-            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                    .getByteArray()[0]);
-            switch (typeTag) {
-                case INT8: {
-                    metInt8s = true;
-                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                    if (val > byteVal)
-                        byteVal = val;
-                    break;
-                }
-                case INT16: {
-                    metInt16s = true;
-                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                    if (val > shortVal)
-                        shortVal = val;
-                    break;
-                }
-                case INT32: {
-                    metInt32s = true;
-                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                    if (val > intVal)
-                        intVal = val;
-                    break;
-                }
-                case INT64: {
-                    metInt64s = true;
-                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                    if (val > longVal)
-                        longVal = val;
-                    break;
-                }
-                case FLOAT: {
-                    metFloats = true;
-                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                    if (val > floatVal)
-                        floatVal = val;
-                    break;
-                }
-                case DOUBLE: {
-                    metDoubles = true;
-                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                    if (val > doubleVal)
-                        doubleVal = val;
-                    break;
-                }
-                case NULL: {
-                    metNull = true;
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // For global aggregates simply ignore system null here,
-                    // but if all input value are system null, then we should return
-                    // null in finish().
-                    if (isLocalAgg) {
-                        throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                    }
-                    break;
-                }
-                default: {
-                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                            + typeTag);
-                }
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            if (metNull) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.ANULL);
-                serde.serialize(ANull.NULL, out);
-            } else if (metDoubles) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                aDouble.setValue(doubleVal);
-                serde.serialize(aDouble, out);
-            } else if (metFloats) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                aFloat.setValue(floatVal);
-                serde.serialize(aFloat, out);
-            } else if (metInt64s) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AINT64);
-                aInt64.setValue(longVal);
-                serde.serialize(aInt64, out);
-            } else if (metInt32s) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AINT32);
-                aInt32.setValue(intVal);
-                serde.serialize(aInt32, out);
-            } else if (metInt16s) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AINT16);
-                aInt16.setValue(shortVal);
-                serde.serialize(aInt16, out);
-            } else if (metInt8s) {
-                serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AINT8);
-                aInt8.setValue(byteVal);
-                serde.serialize(aInt8, out);
-            } else {
-                // Empty stream. For local agg return system null. For global agg return null.
-                if (isLocalAgg) {
-                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                } else {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
-}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index fd9fc0d..0bf6ddc 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -35,7 +35,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-                return new MinAggregateFunction(args, provider, false);
+                return new MinMaxAggregateFunction(args, provider, true, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
deleted file mode 100644
index 6581d07..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package edu.uci.ics.asterix.runtime.aggregates.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class MinAggregateFunction implements ICopyAggregateFunction {	
-	private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-	private DataOutput out;
-	private ICopyEvaluator eval;
-	private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats,
-			metDoubles, metNull;
-	private byte byteVal = Byte.MAX_VALUE;
-	private short shortVal = Short.MAX_VALUE;
-	private int intVal = Integer.MAX_VALUE;
-	private long longVal = Long.MAX_VALUE;
-	private float floatVal = Float.MAX_VALUE;
-	private double doubleVal = Double.MAX_VALUE;
-
-	private AMutableDouble aDouble = new AMutableDouble(0);
-	private AMutableFloat aFloat = new AMutableFloat(0);
-	private AMutableInt64 aInt64 = new AMutableInt64(0);
-	private AMutableInt32 aInt32 = new AMutableInt32(0);
-	private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-	private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-	@SuppressWarnings("rawtypes")
-	private ISerializerDeserializer serde;
-	private final boolean isLocalAgg;
-	
-    public MinAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
-            throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
-    }
-	
-	@Override
-	public void init() {
-	    byteVal = Byte.MAX_VALUE;
-	    shortVal = Short.MAX_VALUE;
-		intVal = Integer.MAX_VALUE;
-		longVal = Long.MAX_VALUE;
-		floatVal = Float.MAX_VALUE;
-		doubleVal = Double.MAX_VALUE;
-
-		metInt8s = false;
-		metInt16s = false;
-		metInt32s = false;
-		metInt64s = false;
-		metFloats = false;
-		metDoubles = false;
-		metNull = false;
-	}
-
-	@Override
-	public void step(IFrameTupleReference tuple) throws AlgebricksException {
-		inputVal.reset();
-		eval.evaluate(tuple);
-		if (inputVal.getLength() > 0) {
-			ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-					.deserialize(inputVal.getByteArray()[0]);
-			switch (typeTag) {
-			case INT8: {
-				metInt8s = true;
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                if (val < byteVal)
-                    byteVal = val;
-                break;
-			}
-			case INT16: {
-				metInt16s = true;
-				short val = AInt16SerializerDeserializer.getShort(
-						inputVal.getByteArray(), 1);
-				if (val < shortVal)
-					shortVal = val;
-				break;
-			}
-			case INT32: {
-				metInt32s = true;
-				int val = AInt32SerializerDeserializer.getInt(
-						inputVal.getByteArray(), 1);
-				if (val < intVal)
-					intVal = val;
-				break;
-			}
-			case INT64: {
-				metInt64s = true;
-				long val = AInt64SerializerDeserializer.getLong(
-						inputVal.getByteArray(), 1);
-				if (val < longVal)
-					longVal = val;
-				break;
-			}
-			case FLOAT: {
-				metFloats = true;
-				float val = AFloatSerializerDeserializer.getFloat(
-						inputVal.getByteArray(), 1);
-				if (val < floatVal)
-					floatVal = val;
-				break;
-			}
-			case DOUBLE: {
-				metDoubles = true;
-				double val = ADoubleSerializerDeserializer.getDouble(
-						inputVal.getByteArray(), 1);
-				if (val < doubleVal)
-					doubleVal = val;
-				break;
-			}
-			case NULL: {
-				metNull = true;
-				break;
-			}
-			case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-			}
-			default: {
-				throw new NotImplementedException(
-						"Cannot compute SUM for values of type " + typeTag);
-			}
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-    @Override
-	public void finish() throws AlgebricksException {
-		try {
-			if (metNull) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.ANULL);
-				serde.serialize(ANull.NULL, out);
-			} else if (metDoubles) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.ADOUBLE);
-				aDouble.setValue(doubleVal);
-				serde.serialize(aDouble, out);
-			} else if (metFloats) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.AFLOAT);
-				aFloat.setValue(floatVal);
-				serde.serialize(aFloat, out);
-			} else if (metInt64s) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.AINT64);
-				aInt64.setValue(longVal);
-				serde.serialize(aInt64, out);
-			} else if (metInt32s) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.AINT32);
-				aInt32.setValue(intVal);
-				serde.serialize(aInt32, out);
-			} else if (metInt16s) {
-				serde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(BuiltinType.AINT16);
-				aInt16.setValue(shortVal);
-				serde.serialize(aInt16, out);
-			} else if (metInt8s) {
-			    serde = AqlSerializerDeserializerProvider.INSTANCE
-                        .getSerializerDeserializer(BuiltinType.AINT8);
-                aInt8.setValue(byteVal);
-                serde.serialize(aInt8, out);
-			} else {
-                // Empty stream. For local agg return system null. For global agg return null.
-                if (isLocalAgg) {
-                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                } else {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                }
-            }
-		} catch (IOException e) {
-			throw new AlgebricksException(e);
-		}
-	}
-
-	@Override
-	public void finishPartial() throws AlgebricksException {
-		finish();
-	}
-}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
new file mode 100644
index 0000000..bc3508f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -0,0 +1,106 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinMaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private DataOutput out;
+    private ICopyEvaluator eval;
+    private ATypeTag aggType;
+    private IBinaryComparator cmp;
+    private final boolean isMin;
+    private final boolean isLocalAgg;
+
+    public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isMin = isMin;
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        outputVal.reset();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        }
+        if (aggType == ATypeTag.SYSTEM_NULL) {
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                // Ignore.
+                return;
+            }
+            // First value encountered. Set type, comparator, and initial value.
+            aggType = typeTag;
+            // Set comparator.
+            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(aggType, isMin);
+            cmp = cmpFactory.createBinaryComparator();
+            // Initialize min value.
+            outputVal.assign(inputVal);
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        }
+        if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+            outputVal.assign(inputVal);
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case NULL: {
+                    out.writeByte(ATypeTag.NULL.serialize());
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        out.writeByte(ATypeTag.NULL.serialize());
+                    }
+                    break;
+                }
+                default: {
+                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index fd83725..d0e82ce 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -65,9 +65,6 @@
         inputVal.reset();
         eval.evaluate(tuple);
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-        }
         if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
             aggType = ATypeTag.NULL;
             return;