Support variance and standard deviation
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 20038bf..88c5973f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -91,6 +91,8 @@
                                 <argument>-c</argument>
                                 <argument>io.druid.extensions:druid-s3-extensions</argument>
                                 <argument>-c</argument>
+                                <argument>io.druid.extensions:druid-stats</argument>
+                                <argument>-c</argument>
                                 <argument>io.druid.extensions:mysql-metadata-storage</argument>
                                 <argument>-c</argument>
                                 <argument>io.druid.extensions:postgresql-metadata-storage</argument>
diff --git a/docs/content/development/extensions-core/stats.md b/docs/content/development/extensions-core/stats.md
new file mode 100644
index 0000000..7b6afd7
--- /dev/null
+++ b/docs/content/development/extensions-core/stats.md
@@ -0,0 +1,152 @@
+---
+layout: doc_page
+---
+
+# Stats aggregator
+
+Includes stat-related aggregators, including variance and standard deviations, etc. Make sure to [include](../../operations/including-extensions.html) `druid-stats` as an extension.
+
+## Variance aggregator
+
+Algorithm of the aggregator is the same with that of apache hive. This is the description in GenericUDAFVariance in hive.
+
+Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
+"Algorithms for computing the sample variance: analysis and recommendations"
+The American Statistician, 37 (1983) pp. 242--247.
+
+variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
+
+where: - variance is sum[x-avg^2] (this is actually n times the variance)
+and is updated at every step. - n is the count of elements in chunk1 - m is
+the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 =
+sum of elements in chunk2.
+
+This algorithm was proven to be numerically stable by J.L. Barlow in
+"Error analysis of a pairwise summation algorithm to compute sample variance"
+Numer. Math, 58 (1991) pp. 583--590
+
+### Pre-aggregating variance at ingestion time
+
+To use this feature, an "variance" aggregator must be included at indexing time.
+The ingestion aggregator can only apply to numeric values. If you use "variance"
+then any input rows missing the value will be considered to have a value of 0.
+
+User can specify expected input type as one of "float", "long", "variance" for ingestion, which is by default "float".
+
+```json
+{
+  "type" : "variance",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "inputType" : <input_type>,
+  "estimator" : <string>
+}
+```
+
+To query for results, "variance" aggregator with "variance" input type or simply a "varianceFold" aggregator must be included in the query.
+
+```json
+{
+  "type" : "varianceFold",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "estimator" : <string>
+}
+```
+
+|Property                 |Description                   |Default                           |
+|-------------------------|------------------------------|----------------------------------|
+|`estimator`|Set "population" to get variance_pop rather than variance_sample, which is default.|null|
+
+
+### Standard Deviation post-aggregator
+
+To acquire standard deviation from variance, user can use "stddev" post aggregator.
+
+```json
+{
+  "type": "stddev",
+  "name": "<output_name>",
+  "fieldName": "<aggregator_name>",
+  "estimator": <string>
+}
+```
+
+## Query Examples:
+
+### Timeseries Query
+
+```json
+{
+  "queryType": "timeseries",
+  "dataSource": "testing",
+  "granularity": "day",
+  "aggregations": [
+    {
+      "type": "variance",
+      "name": "index_var",
+      "fieldName": "index_var"
+    }
+  ]
+  "intervals": [
+    "2016-03-01T00:00:00.000/2013-03-20T00:00:00.000"
+  ]
+}
+```
+
+### TopN Query
+
+```json
+{
+  "queryType": "topN",
+  "dataSource": "testing",
+  "dimensions": ["alias"],
+  "threshold": 5,
+  "granularity": "all",
+  "aggregations": [
+    {
+      "type": "variance",
+      "name": "index_var",
+      "fieldName": "index"
+    }
+  ],
+  "postAggregations": [
+    {
+      "type": "stddev",
+      "name": "index_stddev",
+      "fieldName": "index_var"
+    }
+  ],
+  "intervals": [
+    "2016-03-06T00:00:00/2016-03-06T23:59:59"
+  ]
+}
+```
+
+### GroupBy Query
+
+```json
+{
+  "queryType": "groupBy",
+  "dataSource": "testing",
+  "dimensions": ["alias"],
+  "granularity": "all",
+  "aggregations": [
+    {
+      "type": "variance",
+      "name": "index_var",
+      "fieldName": "index"
+    }
+  ],
+  "postAggregations": [
+    {
+      "type": "stddev",
+      "name": "index_stddev",
+      "fieldName": "index_var"
+    }
+  ],
+  "intervals": [
+    "2016-03-06T00:00:00/2016-03-06T23:59:59"
+  ]
+}
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index e95c397..5a4aec4 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -30,6 +30,7 @@
 |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
 |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
 |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
+|druid-stats|Statistics related module including variance and standard deviation, which is using the same algorithm with that of hive.|[link](../development/extensions-core/stats.html)|
 |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
 |postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|
 
diff --git a/extensions-core/stats/pom.xml b/extensions-core/stats/pom.xml
new file mode 100644
index 0000000..cb9379c
--- /dev/null
+++ b/extensions-core/stats/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. Metamarkets licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied. See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.druid.extensions</groupId>
+    <artifactId>druid-stats</artifactId>
+    <name>druid-stats</name>
+    <description>druid-stats</description>
+
+    <parent>
+        <groupId>io.druid</groupId>
+        <artifactId>druid</artifactId>
+        <version>0.9.2-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java
new file mode 100644
index 0000000..cc136f5
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.stats;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+import io.druid.query.aggregation.variance.StandardDeviationPostAggregator;
+import io.druid.query.aggregation.variance.VarianceAggregatorFactory;
+import io.druid.query.aggregation.variance.VarianceFoldingAggregatorFactory;
+import io.druid.query.aggregation.variance.VarianceSerde;
+import io.druid.segment.serde.ComplexMetrics;
+
+import java.util.List;
+
+/**
+ */
+public class DruidStatsModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.of(
+        new SimpleModule().registerSubtypes(
+            VarianceAggregatorFactory.class,
+            VarianceFoldingAggregatorFactory.class,
+            StandardDeviationPostAggregator.class
+        )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    if (ComplexMetrics.getSerdeForType("variance") == null) {
+      ComplexMetrics.registerSerde("variance", new VarianceSerde());
+    }
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java
new file mode 100644
index 0000000..2bdcb0d
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+@JsonTypeName("stddev")
+public class StandardDeviationPostAggregator implements PostAggregator
+{
+  protected final String name;
+  protected final String fieldName;
+  protected final String estimator;
+
+  protected final boolean isVariancePop;
+
+  @JsonCreator
+  public StandardDeviationPostAggregator(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName,
+      @JsonProperty("estimator") String estimator
+  )
+  {
+    this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null");
+    this.name = Preconditions.checkNotNull(name, "name is null");
+    this.estimator = estimator;
+    this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
+  }
+
+  @Override
+  public Set<String> getDependentFields()
+  {
+    return Sets.newHashSet(fieldName);
+  }
+
+  @Override
+  public Comparator<Double> getComparator()
+  {
+    return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
+  }
+
+  @Override
+  public Object compute(Map<String, Object> combinedAggregators)
+  {
+    return Math.sqrt(((VarianceAggregatorCollector) combinedAggregators.get(fieldName)).getVariance(isVariancePop));
+  }
+
+  @Override
+  @JsonProperty("name")
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty("fieldName")
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @JsonProperty("estimator")
+  public String getEstimator()
+  {
+    return estimator;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "StandardDeviationPostAggregator{" +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", isVariancePop='" + isVariancePop + '\'' +
+           '}';
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java
new file mode 100644
index 0000000..2553322
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.query.aggregation.Aggregator;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.LongColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+
+/**
+ */
+public abstract class VarianceAggregator implements Aggregator
+{
+  protected final String name;
+
+  protected final VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+
+  public VarianceAggregator(String name)
+  {
+    this.name = name;
+  }
+
+  @Override
+  public void reset()
+  {
+    holder.reset();
+  }
+
+  @Override
+  public Object get()
+  {
+    return holder;
+  }
+
+  @Override
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("VarianceAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("VarianceAggregator does not support getLong()");
+  }
+
+  public static final class FloatVarianceAggregator extends VarianceAggregator
+  {
+    private final FloatColumnSelector selector;
+
+    public FloatVarianceAggregator(String name, FloatColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate()
+    {
+      holder.add(selector.get());
+    }
+  }
+
+  public static final class LongVarianceAggregator extends VarianceAggregator
+  {
+    private final LongColumnSelector selector;
+
+    public LongVarianceAggregator(String name, LongColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate()
+    {
+      holder.add(selector.get());
+    }
+  }
+
+  public static final class ObjectVarianceAggregator extends VarianceAggregator
+  {
+    private final ObjectColumnSelector selector;
+
+    public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate()
+    {
+      VarianceAggregatorCollector.combineValues(holder, selector.get());
+    }
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java
new file mode 100644
index 0000000..4ab6bc2
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Longs;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ *
+ * Algorithm used here is copied from apache hive. This is description in GenericUDAFVariance
+ *
+ * Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
+ * "Algorithms for computing the sample variance: analysis and recommendations"
+ * The American Statistician, 37 (1983) pp. 242--247.
+ *
+ * variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
+ *
+ * where: - variance is sum[x-avg^2] (this is actually n times the variance)
+ * and is updated at every step. - n is the count of elements in chunk1 - m is
+ * the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 =
+ * sum of elements in chunk2.
+ *
+ * This algorithm was proven to be numerically stable by J.L. Barlow in
+ * "Error analysis of a pairwise summation algorithm to compute sample variance"
+ * Numer. Math, 58 (1991) pp. 583--590
+ */
+public class VarianceAggregatorCollector
+{
+  public static boolean isVariancePop(String estimator) {
+    return estimator != null && estimator.equalsIgnoreCase("population");
+  }
+
+  public static VarianceAggregatorCollector from(ByteBuffer buffer)
+  {
+    return new VarianceAggregatorCollector(buffer.getLong(), buffer.getDouble(), buffer.getDouble());
+  }
+
+  public static final Comparator<VarianceAggregatorCollector> COMPARATOR = new Comparator<VarianceAggregatorCollector>()
+  {
+    @Override
+    public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
+    {
+      int compare = Longs.compare(o1.count, o2.count);
+      if (compare == 0) {
+        compare = Doubles.compare(o1.sum, o2.sum);
+        if (compare == 0) {
+          compare = Doubles.compare(o1.nvariance, o2.nvariance);
+        }
+      }
+      return compare;
+    }
+  };
+
+  static Object combineValues(Object lhs, Object rhs)
+  {
+    final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs;
+    final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs;
+
+    if (holder2.count == 0) {
+      return holder1;
+    }
+    if (holder1.count == 0) {
+      holder1.nvariance = holder2.nvariance;
+      holder1.count = holder2.count;
+      holder1.sum = holder2.sum;
+      return holder1;
+    }
+
+    final double ratio = holder1.count / (double) holder2.count;
+    final double t = holder1.sum / ratio - holder2.sum;
+
+    holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t);
+    holder1.count += holder2.count;
+    holder1.sum += holder2.sum;
+
+    return holder1;
+  }
+
+  static int getMaxIntermediateSize()
+  {
+    return Longs.BYTES + Doubles.BYTES + Doubles.BYTES;
+  }
+
+  long count; // number of elements
+  double sum; // sum of elements
+  double nvariance; // sum[x-avg^2] (this is actually n times of the variance)
+
+  public VarianceAggregatorCollector()
+  {
+    this(0, 0, 0);
+  }
+
+  public void reset()
+  {
+    count = 0;
+    sum = 0;
+    nvariance = 0;
+  }
+
+  public VarianceAggregatorCollector(long count, double sum, double nvariance)
+  {
+    this.count = count;
+    this.sum = sum;
+    this.nvariance = nvariance;
+  }
+
+  public VarianceAggregatorCollector add(float v)
+  {
+    count++;
+    sum += v;
+    if (count > 1) {
+      double t = count * v - sum;
+      nvariance += (t * t) / ((double) count * (count - 1));
+    }
+    return this;
+  }
+
+  public VarianceAggregatorCollector add(long v)
+  {
+    count++;
+    sum += v;
+    if (count > 1) {
+      double t = count * v - sum;
+      nvariance += (t * t) / ((double) count * (count - 1));
+    }
+    return this;
+  }
+
+  public double getVariance(boolean variancePop)
+  {
+    if (count == 0) {
+      // in SQL standard, we should return null for zero elements. But druid there should not be such a case
+      throw new IllegalStateException("should not be empty holder");
+    } else if (count == 1) {
+      return 0d;
+    } else {
+      return variancePop ? nvariance / count : nvariance / (count - 1);
+    }
+  }
+
+  @JsonValue
+  public byte[] toByteArray()
+  {
+    final ByteBuffer buffer = toByteBuffer();
+    buffer.flip();
+    byte[] theBytes = new byte[buffer.remaining()];
+    buffer.get(theBytes);
+
+    return theBytes;
+  }
+
+  public ByteBuffer toByteBuffer()
+  {
+    return ByteBuffer.allocate(Longs.BYTES + Doubles.BYTES + Doubles.BYTES)
+                     .putLong(count)
+                     .putDouble(sum)
+                     .putDouble(nvariance);
+  }
+
+  @VisibleForTesting
+  boolean equalsWithEpsilon(VarianceAggregatorCollector o, double epsilon)
+  {
+    if (this == o) {
+      return true;
+    }
+
+    if (count != o.count) {
+      return false;
+    }
+    if (Math.abs(sum - o.sum) > epsilon) {
+      return false;
+    }
+    if (Math.abs(nvariance - o.nvariance) > epsilon) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    VarianceAggregatorCollector that = (VarianceAggregatorCollector) o;
+
+    if (count != that.count) {
+      return false;
+    }
+    if (Double.compare(that.sum, sum) != 0) {
+      return false;
+    }
+    if (Double.compare(that.nvariance, nvariance) != 0) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result;
+    long temp;
+    result = (int) (count ^ (count >>> 32));
+    temp = Double.doubleToLongBits(sum);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(nvariance);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "VarianceHolder{" +
+           "count=" + count +
+           ", sum=" + sum +
+           ", nvariance=" + nvariance +
+           '}';
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java
new file mode 100644
index 0000000..d5b8571
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.metamx.common.IAE;
+import com.metamx.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
+import io.druid.query.aggregation.Aggregators;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.ObjectColumnSelector;
+import org.apache.commons.codec.binary.Base64;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ */
+@JsonTypeName("variance")
+public class VarianceAggregatorFactory extends AggregatorFactory
+{
+  protected static final byte CACHE_TYPE_ID = 16;
+
+  protected final String fieldName;
+  protected final String name;
+  protected final String estimator;
+  private final String inputType;
+
+  protected final boolean isVariancePop;
+
+  @JsonCreator
+  public VarianceAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName,
+      @JsonProperty("estimator") String estimator,
+      @JsonProperty("inputType") String inputType
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    this.name = name;
+    this.fieldName = fieldName;
+    this.estimator = estimator;
+    this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
+    this.inputType = inputType == null ? "float" : inputType;
+  }
+
+  public VarianceAggregatorFactory(String name, String fieldName)
+  {
+    this(name, fieldName, null, null);
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "variance";
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return VarianceAggregatorCollector.getMaxIntermediateSize();
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
+    if (selector == null) {
+      return Aggregators.noopAggregator();
+    }
+
+    if ("float".equalsIgnoreCase(inputType)) {
+      return new VarianceAggregator.FloatVarianceAggregator(
+          name,
+          metricFactory.makeFloatColumnSelector(fieldName)
+      );
+    } else if ("long".equalsIgnoreCase(inputType)) {
+      return new VarianceAggregator.LongVarianceAggregator(
+          name,
+          metricFactory.makeLongColumnSelector(fieldName)
+      );
+    } else if ("variance".equalsIgnoreCase(inputType)) {
+      return new VarianceAggregator.ObjectVarianceAggregator(name, selector);
+    }
+    throw new IAE(
+        "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
+    );
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
+    if (selector == null) {
+      return Aggregators.noopBufferAggregator();
+    }
+    if ("float".equalsIgnoreCase(inputType)) {
+      return new VarianceBufferAggregator.FloatVarianceAggregator(
+          name,
+          metricFactory.makeFloatColumnSelector(fieldName)
+      );
+    } else if ("long".equalsIgnoreCase(inputType)) {
+      return new VarianceBufferAggregator.LongVarianceAggregator(
+          name,
+          metricFactory.makeLongColumnSelector(fieldName)
+      );
+    } else if ("variance".equalsIgnoreCase(inputType)) {
+      return new VarianceBufferAggregator.ObjectVarianceAggregator(name, selector);
+    }
+    throw new IAE(
+        "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
+    );
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new VarianceFoldingAggregatorFactory(name, name, estimator);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new VarianceAggregatorFactory(fieldName, fieldName, estimator, inputType));
+  }
+
+  @Override
+  public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
+  {
+    if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) {
+      return getCombiningFactory();
+    } else {
+      throw new AggregatorFactoryNotMergeableException(this, other);
+    }
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return VarianceAggregatorCollector.COMPARATOR;
+  }
+
+  @Override
+  public Object getAggregatorStartValue()
+  {
+    return new VarianceAggregatorCollector();
+  }
+
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    return VarianceAggregatorCollector.combineValues(lhs, rhs);
+  }
+
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    return ((VarianceAggregatorCollector) object).getVariance(isVariancePop);
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    if (object instanceof byte[]) {
+      return VarianceAggregatorCollector.from(ByteBuffer.wrap((byte[]) object));
+    } else if (object instanceof ByteBuffer) {
+      return VarianceAggregatorCollector.from((ByteBuffer) object);
+    } else if (object instanceof String) {
+      return VarianceAggregatorCollector.from(
+          ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
+      );
+    }
+    return object;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getEstimator()
+  {
+    return estimator;
+  }
+
+  @JsonProperty
+  public String getInputType()
+  {
+    return inputType;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Arrays.asList(fieldName);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
+    byte[] inputTypeBytes = StringUtils.toUtf8(inputType);
+    return ByteBuffer.allocate(2 + fieldNameBytes.length + 1 + inputTypeBytes.length)
+                     .put(CACHE_TYPE_ID)
+                     .put(isVariancePop ? (byte) 1 : 0)
+                     .put(fieldNameBytes)
+                     .put((byte) 0xFF)
+                     .put(inputTypeBytes)
+                     .array();
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "fieldName='" + fieldName + '\'' +
+           ", name='" + name + '\'' +
+           ", isVariancePop='" + isVariancePop + '\'' +
+           ", inputType='" + inputType + '\'' +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    VarianceAggregatorFactory that = (VarianceAggregatorFactory) o;
+
+    if (!Objects.equals(name, that.name)) {
+      return false;
+    }
+    if (!Objects.equals(isVariancePop, that.isVariancePop)) {
+      return false;
+    }
+    if (!Objects.equals(inputType, that.inputType)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = fieldName.hashCode();
+    result = 31 * result + Objects.hashCode(name);
+    result = 31 * result + Objects.hashCode(isVariancePop);
+    result = 31 * result + Objects.hashCode(inputType);
+    return result;
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java
new file mode 100644
index 0000000..77017de
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Longs;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.LongColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class VarianceBufferAggregator implements BufferAggregator
+{
+  private static final int COUNT_OFFSET = 0;
+  private static final int SUM_OFFSET = Longs.BYTES;
+  private static final int NVARIANCE_OFFSET = SUM_OFFSET + Doubles.BYTES;
+
+  protected final String name;
+
+  public VarianceBufferAggregator(String name)
+  {
+    this.name = name;
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    buf.putLong(position + COUNT_OFFSET, 0)
+       .putDouble(position + SUM_OFFSET, 0)
+       .putDouble(position + NVARIANCE_OFFSET, 0);
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+    holder.count = buf.getLong(position);
+    holder.sum = buf.getDouble(position + SUM_OFFSET);
+    holder.nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
+    return holder;
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()");
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  public static final class FloatVarianceAggregator extends VarianceBufferAggregator
+  {
+    private final FloatColumnSelector selector;
+
+    public FloatVarianceAggregator(String name, FloatColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate(ByteBuffer buf, int position)
+    {
+      float v = selector.get();
+      long count = buf.getLong(position + COUNT_OFFSET) + 1;
+      double sum = buf.getDouble(position + SUM_OFFSET) + v;
+      buf.putLong(position, count);
+      buf.putDouble(position + SUM_OFFSET, sum);
+      if (count > 1) {
+        double t = count * v - sum;
+        double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
+        buf.putDouble(position + NVARIANCE_OFFSET, variance);
+      }
+    }
+  }
+
+  public static final class LongVarianceAggregator extends VarianceBufferAggregator
+  {
+    private final LongColumnSelector selector;
+
+    public LongVarianceAggregator(String name, LongColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate(ByteBuffer buf, int position)
+    {
+      long v = selector.get();
+      long count = buf.getLong(position + COUNT_OFFSET) + 1;
+      double sum = buf.getDouble(position + SUM_OFFSET) + v;
+      buf.putLong(position, count);
+      buf.putDouble(position + SUM_OFFSET, sum);
+      if (count > 1) {
+        double t = count * v - sum;
+        double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
+        buf.putDouble(position + NVARIANCE_OFFSET, variance);
+      }
+    }
+  }
+
+  public static final class ObjectVarianceAggregator extends VarianceBufferAggregator
+  {
+    private final ObjectColumnSelector selector;
+
+    public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
+    {
+      super(name);
+      this.selector = selector;
+    }
+
+    @Override
+    public void aggregate(ByteBuffer buf, int position)
+    {
+      VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) selector.get();
+
+      long count = buf.getLong(position + COUNT_OFFSET);
+      if (count == 0) {
+        buf.putLong(position, holder2.count);
+        buf.putDouble(position + SUM_OFFSET, holder2.sum);
+        buf.putDouble(position + NVARIANCE_OFFSET, holder2.nvariance);
+        return;
+      }
+
+      double sum = buf.getDouble(position + SUM_OFFSET);
+      double nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
+
+      final double ratio = count / (double) holder2.count;
+      final double t = sum / ratio - holder2.sum;
+
+      nvariance += holder2.nvariance + (ratio / (count + holder2.count) * t * t);
+      count += holder2.count;
+      sum += holder2.sum;
+
+      buf.putLong(position, count);
+      buf.putDouble(position + SUM_OFFSET, sum);
+      buf.putDouble(position + NVARIANCE_OFFSET, nvariance);
+    }
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java
new file mode 100644
index 0000000..a113418
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ */
+@JsonTypeName("varianceFold")
+public class VarianceFoldingAggregatorFactory extends VarianceAggregatorFactory
+{
+  public VarianceFoldingAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName,
+      @JsonProperty("estimator") String estimator
+  )
+  {
+    super(name, fieldName, estimator, "variance");
+  }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java
new file mode 100644
index 0000000..4bb83ad
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Ordering;
+import io.druid.data.input.InputRow;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ */
+public class VarianceSerde extends ComplexMetricSerde
+{
+  private static final Ordering<VarianceAggregatorCollector> comparator =
+      Ordering.from(VarianceAggregatorCollector.COMPARATOR).nullsFirst();
+
+  @Override
+  public String getTypeName()
+  {
+    return "variance";
+  }
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<VarianceAggregatorCollector> extractedClass()
+      {
+        return VarianceAggregatorCollector.class;
+      }
+
+      @Override
+      public VarianceAggregatorCollector extractValue(InputRow inputRow, String metricName)
+      {
+        Object rawValue = inputRow.getRaw(metricName);
+
+        if (rawValue instanceof VarianceAggregatorCollector) {
+          return (VarianceAggregatorCollector) rawValue;
+        }
+        VarianceAggregatorCollector collector = new VarianceAggregatorCollector();
+
+        List<String> dimValues = inputRow.getDimension(metricName);
+        if (dimValues != null && dimValues.size() > 0) {
+          for (String dimValue : dimValues) {
+            float value = Float.parseFloat(dimValue);
+            collector.add(value);
+          }
+        }
+        return collector;
+      }
+    };
+  }
+
+  @Override
+  public void deserializeColumn(
+      ByteBuffer byteBuffer, ColumnBuilder columnBuilder
+  )
+  {
+    final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
+    columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
+  }
+
+  @Override
+  public ObjectStrategy getObjectStrategy()
+  {
+    return new ObjectStrategy<VarianceAggregatorCollector>()
+    {
+      @Override
+      public Class<? extends VarianceAggregatorCollector> getClazz()
+      {
+        return VarianceAggregatorCollector.class;
+      }
+
+      @Override
+      public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
+      {
+        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+        readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
+        return VarianceAggregatorCollector.from(readOnlyBuffer);
+      }
+
+      @Override
+      public byte[] toBytes(VarianceAggregatorCollector collector)
+      {
+        return collector == null ? new byte[]{} : collector.toByteArray();
+      }
+
+      @Override
+      public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
+      {
+        return comparator.compare(o1, o2);
+      }
+    };
+  }
+}
diff --git a/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 0000000..823ccff
--- /dev/null
+++ b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.query.aggregation.stats.DruidStatsModule
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java
new file mode 100644
index 0000000..89d9217
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.Pair;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class VarianceAggregatorCollectorTest
+{
+  private static final float[] market_upfront = new float[]{
+      800.0f, 800.0f, 826.0602f, 1564.6177f, 1006.4021f, 869.64374f, 809.04175f, 1458.4027f, 852.4375f, 879.9881f,
+      950.1468f, 712.7746f, 846.2675f, 682.8855f, 1109.875f, 594.3817f, 870.1159f, 677.511f, 1410.2781f, 1219.4321f,
+      979.306f, 1224.5016f, 1215.5898f, 716.6092f, 1301.0233f, 786.3633f, 989.9315f, 1609.0967f, 1023.2952f, 1367.6381f,
+      1627.598f, 810.8894f, 1685.5001f, 545.9906f, 1870.061f, 555.476f, 1643.3408f, 943.4972f, 1667.4978f, 913.5611f,
+      1218.5619f, 1273.7074f, 888.70526f, 1113.1141f, 864.5689f, 1308.582f, 785.07886f, 1363.6149f, 787.1253f,
+      826.0392f, 1107.2438f, 872.6257f, 1188.3693f, 911.9568f, 794.0988f, 1299.0933f, 1212.9283f, 901.3273f, 723.5143f,
+      1061.9734f, 602.97955f, 879.4061f, 724.2625f, 862.93134f, 1133.1351f, 948.65796f, 807.6017f, 914.525f, 1553.3485f,
+      1208.4567f, 679.6193f, 645.1777f, 1120.0887f, 1649.5333f, 1433.3988f, 1598.1793f, 1192.5631f, 1022.85455f,
+      1228.5024f, 1298.4158f, 1345.9644f, 1291.898f, 1306.4957f, 1287.7667f, 1631.5844f, 578.79596f, 1017.5732f,
+      782.0135f, 829.91626f, 1862.7379f, 873.3065f, 1427.0167f, 1430.2573f, 1101.9182f, 1166.1411f, 1004.94086f,
+      740.1837f, 865.7779f, 901.30756f, 691.9589f, 1674.3317f, 975.57794f, 1360.6948f, 755.89935f, 771.34845f,
+      869.30835f, 1095.6376f, 906.3738f, 988.8938f, 835.76263f, 776.70294f, 875.6834f, 1070.8363f, 835.46124f,
+      715.5161f, 755.64655f, 771.1005f, 764.50806f, 736.40924f, 884.8373f, 918.72284f, 893.98505f, 832.8749f,
+      850.995f, 767.9733f, 848.3399f, 878.6838f, 906.1019f, 1403.8302f, 936.4296f, 846.2884f, 856.4901f, 1032.2576f,
+      954.7542f, 1031.99f, 907.02155f, 1110.789f, 843.95215f, 1362.6506f, 884.8015f, 1684.2688f, 873.65204f, 855.7177f,
+      996.56415f, 1061.6786f, 962.2358f, 1019.8985f, 1056.4193f, 1198.7231f, 1108.1361f, 1289.0095f,
+      1069.4318f, 1001.13403f, 1030.4995f, 1734.2749f, 1063.2012f, 1447.3412f, 1234.2476f, 1144.3424f, 1049.7385f,
+      811.9913f, 768.4231f, 1151.0692f, 877.0794f, 1146.4231f, 902.6157f, 1355.8434f, 897.39343f, 1260.1431f, 762.8625f,
+      935.168f, 782.10785f, 996.2054f, 767.69214f, 1031.7415f, 775.9656f, 1374.9684f, 853.163f, 1456.6118f, 811.92523f,
+      989.0328f, 744.7446f, 1166.4012f, 753.105f, 962.7312f, 780.272f
+  };
+
+  private static final float[] market_total_market = new float[]{
+      1000.0f, 1000.0f, 1040.9456f, 1689.0128f, 1049.142f, 1073.4766f, 1007.36554f, 1545.7089f, 1016.9652f, 1077.6127f,
+      1075.0896f, 953.9954f, 1022.7833f, 937.06195f, 1156.7448f, 849.8775f, 1066.208f, 904.34064f, 1240.5255f,
+      1343.2325f, 1088.9431f, 1349.2544f, 1102.8667f, 939.2441f, 1109.8754f, 997.99457f, 1037.4495f, 1686.4197f,
+      1074.007f, 1486.2013f, 1300.3022f, 1021.3345f, 1314.6195f, 792.32605f, 1233.4489f, 805.9301f, 1184.9207f,
+      1127.231f, 1203.4656f, 1100.9048f, 1097.2112f, 1410.793f, 1033.4012f, 1283.166f, 1025.6333f, 1331.861f,
+      1039.5005f, 1332.4684f, 1011.20544f, 1029.9952f, 1047.2129f, 1057.08f, 1064.9727f, 1082.7277f, 971.0508f,
+      1320.6383f, 1070.1655f, 1089.6478f, 980.3866f, 1179.6959f, 959.2362f, 1092.417f, 987.0674f, 1103.4583f,
+      1091.2231f, 1199.6074f, 1044.3843f, 1183.2408f, 1289.0973f, 1360.0325f, 993.59125f, 1021.07117f, 1105.3834f,
+      1601.8295f, 1200.5272f, 1600.7233f, 1317.4584f, 1304.3262f, 1544.1082f, 1488.7378f, 1224.8271f, 1421.6487f,
+      1251.9062f, 1414.619f, 1350.1754f, 970.7283f, 1057.4272f, 1073.9673f, 996.4337f, 1743.9218f, 1044.5629f,
+      1474.5911f, 1159.2788f, 1292.5428f, 1124.2014f, 1243.354f, 1051.809f, 1143.0784f, 1097.4907f, 1010.3703f,
+      1326.8291f, 1179.8038f, 1281.6012f, 994.73126f, 1081.6504f, 1103.2397f, 1177.8584f, 1152.5477f, 1117.954f,
+      1084.3325f, 1029.8025f, 1121.3854f, 1244.85f, 1077.2794f, 1098.5432f, 998.65076f, 1088.8076f, 1008.74554f,
+      998.75397f, 1129.7233f, 1075.243f, 1141.5884f, 1037.3811f, 1099.1973f, 981.5773f, 1092.942f, 1072.2394f,
+      1154.4156f, 1311.1786f, 1176.6052f, 1107.2202f, 1102.699f, 1285.0901f, 1217.5475f, 1283.957f, 1178.8302f,
+      1301.7781f, 1119.2472f, 1403.3389f, 1156.6019f, 1429.5802f, 1137.8423f, 1124.9352f, 1256.4998f, 1217.8774f,
+      1247.8909f, 1185.71f, 1345.7817f, 1250.1667f, 1390.754f, 1224.1162f, 1361.0802f, 1190.9337f, 1310.7971f,
+      1466.2094f, 1366.4476f, 1314.8397f, 1522.0437f, 1193.5563f, 1321.375f, 1055.7837f, 1021.6387f, 1197.0084f,
+      1131.532f, 1192.1443f, 1154.2896f, 1272.6771f, 1141.5146f, 1190.8961f, 1009.36316f, 1006.9138f, 1032.5999f,
+      1137.3857f, 1030.0756f, 1005.25305f, 1030.0947f, 1112.7948f, 1113.3575f, 1153.9747f, 1069.6409f, 1016.13745f,
+      994.9023f, 1032.1543f, 999.5864f, 994.75275f, 1029.057f
+  };
+
+  @Test
+  public void testVariance()
+  {
+    Random random = new Random();
+    for (float[] values : Arrays.asList(market_upfront, market_total_market)) {
+      double sum = 0;
+      for (float f : values) {
+        sum += f;
+      }
+      final double mean = sum / values.length;
+      double temp = 0;
+      for (float f : values) {
+        temp += Math.pow(f - mean, 2);
+      }
+
+      final double variance_pop = temp / values.length;
+      final double variance_sample = temp / (values.length - 1);
+
+      VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+      for (float f : values) {
+        holder.add(f);
+      }
+      Assert.assertEquals(holder.getVariance(true), variance_pop, 0.001);
+      Assert.assertEquals(holder.getVariance(false), variance_sample, 0.001);
+
+      for (int mergeOn : new int[] {2, 3, 5, 9}) {
+        List<VarianceAggregatorCollector> holders1 = Lists.newArrayListWithCapacity(mergeOn);
+        List<Pair<VarianceBufferAggregator, ByteBuffer>> holders2 = Lists.newArrayListWithCapacity(mergeOn);
+
+        FloatHandOver valueHandOver = new FloatHandOver();
+        for (int i = 0; i < mergeOn; i++) {
+          holders1.add(new VarianceAggregatorCollector());
+          holders2.add(Pair.<VarianceBufferAggregator, ByteBuffer>of(
+                           new VarianceBufferAggregator.FloatVarianceAggregator("XX", valueHandOver),
+                           ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize())
+                       ));
+        }
+        for (float f : values) {
+          valueHandOver.v = f;
+          int index = random.nextInt(mergeOn);
+          holders1.get(index).add(f);
+          holders2.get(index).lhs.aggregate(holders2.get(index).rhs, 0);
+        }
+        VarianceAggregatorCollector holder1 = holders1.get(0);
+        for (int i = 1; i < mergeOn; i++) {
+          holder1 = (VarianceAggregatorCollector) VarianceAggregatorCollector.combineValues(holder1, holders1.get(i));
+        }
+        ObjectHandOver collectHandOver = new ObjectHandOver();
+        ByteBuffer buffer = ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize());
+        VarianceBufferAggregator.ObjectVarianceAggregator merger = new VarianceBufferAggregator.ObjectVarianceAggregator("xxx", collectHandOver);
+        for (int i = 0; i < mergeOn; i++) {
+          collectHandOver.v = holders2.get(i).lhs.get(holders2.get(i).rhs, 0);
+          merger.aggregate(buffer, 0);
+        }
+        VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) merger.get(buffer, 0);
+        Assert.assertEquals(holder2.getVariance(true), variance_pop, 0.01);
+        Assert.assertEquals(holder2.getVariance(false), variance_sample, 0.01);
+      }
+    }
+  }
+
+  private static class FloatHandOver implements FloatColumnSelector
+  {
+    float v;
+
+    @Override
+    public float get()
+    {
+      return v;
+    }
+  }
+
+  private static class ObjectHandOver implements ObjectColumnSelector
+  {
+    Object v;
+
+    @Override
+    public Class classOfObject()
+    {
+      return v == null ? Object.class : v.getClass();
+    }
+
+    @Override
+    public Object get()
+    {
+      return v;
+    }
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java
new file mode 100644
index 0000000..9beb980
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.TestFloatColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public class VarianceAggregatorTest
+{
+  private VarianceAggregatorFactory aggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestFloatColumnSelector selector;
+
+  private final float[] values = {1.1f, 2.7f, 3.5f, 1.3f};
+  private final double[] variances_pop = new double[values.length]; // calculated
+  private final double[] variances_samp = new double[values.length]; // calculated
+
+  public VarianceAggregatorTest() throws Exception
+  {
+    String aggSpecJson = "{\"type\": \"variance\", \"name\": \"billy\", \"fieldName\": \"nilly\"}";
+    aggFactory = new DefaultObjectMapper().readValue(aggSpecJson, VarianceAggregatorFactory.class);
+    double sum = 0;
+    for (int i = 0; i < values.length; i++) {
+      sum += values[i];
+      if (i > 0) {
+        double mean = sum / (i + 1);
+        double temp = 0;
+        for (int j = 0; j <= i; j++) {
+          temp += Math.pow(values[j] - mean, 2);
+        }
+        variances_pop[i] = temp / (i + 1);
+        variances_samp[i] = temp / i;
+      }
+    }
+  }
+
+  @Before
+  public void setup()
+  {
+    selector = new TestFloatColumnSelector(values);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("nilly")).andReturn(new TestObjectColumnSelector(0.0f));
+    EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(selector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testDoubleVarianceAggregator()
+  {
+    VarianceAggregator agg = (VarianceAggregator) aggFactory.factorize(colSelectorFactory);
+
+    Assert.assertEquals("billy", agg.getName());
+
+    assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
+    aggregate(selector, agg);
+    assertValues((VarianceAggregatorCollector) agg.get(), 1, 1.1d, 0d);
+    aggregate(selector, agg);
+    assertValues((VarianceAggregatorCollector) agg.get(), 2, 3.8d, 1.28d);
+    aggregate(selector, agg);
+    assertValues((VarianceAggregatorCollector) agg.get(), 3, 7.3d, 2.9866d);
+    aggregate(selector, agg);
+    assertValues((VarianceAggregatorCollector) agg.get(), 4, 8.6d, 3.95d);
+
+    agg.reset();
+    assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
+  }
+
+  private void assertValues(VarianceAggregatorCollector holder, long count, double sum, double nvariance)
+  {
+    Assert.assertEquals(count, holder.count);
+    Assert.assertEquals(sum, holder.sum, 0.0001);
+    Assert.assertEquals(nvariance, holder.nvariance, 0.0001);
+    if (count == 0) {
+      try {
+        holder.getVariance(false);
+        Assert.fail("Should throw ISE");
+      }
+      catch (IllegalStateException e) {
+        Assert.assertTrue(e.getMessage().contains("should not be empty holder"));
+      }
+    } else {
+      Assert.assertEquals(holder.getVariance(true), variances_pop[(int) count - 1], 0.0001);
+      Assert.assertEquals(holder.getVariance(false), variances_samp[(int) count - 1], 0.0001);
+    }
+  }
+
+  @Test
+  public void testDoubleVarianceBufferAggregator()
+  {
+    VarianceBufferAggregator agg = (VarianceBufferAggregator) aggFactory.factorizeBuffered(
+        colSelectorFactory
+    );
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[aggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 0, 0d, 0d);
+    aggregate(selector, agg, buffer, 0);
+    assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 1, 1.1d, 0d);
+    aggregate(selector, agg, buffer, 0);
+    assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 2, 3.8d, 1.28d);
+    aggregate(selector, agg, buffer, 0);
+    assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 3, 7.3d, 2.9866d);
+    aggregate(selector, agg, buffer, 0);
+    assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 4, 8.6d, 3.95d);
+  }
+
+  @Test
+  public void testCombine()
+  {
+    VarianceAggregatorCollector holder1 = new VarianceAggregatorCollector().add(1.1f).add(2.7f);
+    VarianceAggregatorCollector holder2 = new VarianceAggregatorCollector().add(3.5f).add(1.3f);
+    VarianceAggregatorCollector expected = new VarianceAggregatorCollector(4, 8.6d, 3.95d);
+    Assert.assertTrue(expected.equalsWithEpsilon((VarianceAggregatorCollector) aggFactory.combine(holder1, holder2), 0.00001));
+  }
+
+  @Test
+  public void testEqualsAndHashCode() throws Exception
+  {
+    VarianceAggregatorFactory one = new VarianceAggregatorFactory("name1", "fieldName1");
+    VarianceAggregatorFactory oneMore = new VarianceAggregatorFactory("name1", "fieldName1");
+    VarianceAggregatorFactory two = new VarianceAggregatorFactory("name2", "fieldName2");
+
+    Assert.assertEquals(one.hashCode(), oneMore.hashCode());
+
+    Assert.assertTrue(one.equals(oneMore));
+    Assert.assertFalse(one.equals(two));
+  }
+
+  private void aggregate(TestFloatColumnSelector selector, VarianceAggregator agg)
+  {
+    agg.aggregate();
+    selector.increment();
+  }
+
+  private void aggregate(
+      TestFloatColumnSelector selector,
+      VarianceBufferAggregator agg,
+      ByteBuffer buff,
+      int position
+  )
+  {
+    agg.aggregate(buff, position);
+    selector.increment();
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
new file mode 100644
index 0000000..a45901b
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.data.input.Row;
+import io.druid.granularity.PeriodGranularity;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.query.groupby.GroupByQueryRunnerTest;
+import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import io.druid.query.groupby.having.GreaterThanHavingSpec;
+import io.druid.query.groupby.having.HavingSpec;
+import io.druid.query.groupby.having.OrHavingSpec;
+import io.druid.query.groupby.orderby.DefaultLimitSpec;
+import io.druid.query.groupby.orderby.OrderByColumnSpec;
+import io.druid.segment.TestHelper;
+import org.joda.time.Period;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ */
+@RunWith(Parameterized.class)
+public class VarianceGroupByQueryTest
+{
+  private final GroupByQueryConfig config;
+  private final QueryRunner<Row> runner;
+  private final GroupByQueryRunnerFactory factory;
+
+  @Parameterized.Parameters
+  public static Collection<?> constructorFeeder() throws IOException
+  {
+    return GroupByQueryRunnerTest.constructorFeeder();
+  }
+
+  public VarianceGroupByQueryTest(GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
+  {
+    this.config = config;
+    this.factory = factory;
+    this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
+  }
+
+  @Test
+  public void testGroupByVarianceOnly()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(VarianceTestHelper.indexVarianceAggr))
+        .setPostAggregatorSpecs(Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr))
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    VarianceTestHelper.RowBuilder builder =
+        new VarianceTestHelper.RowBuilder(new String[]{"alias", "index_stddev", "index_var"});
+
+    List<Row> expectedResults = builder
+        .add("2011-04-01", "automotive", 0d, 0d)
+        .add("2011-04-01", "business", 0d, 0d)
+        .add("2011-04-01", "entertainment", 0d, 0d)
+        .add("2011-04-01", "health", 0d, 0d)
+        .add("2011-04-01", "mezzanine", 737.0179286322613d, 543195.4271253889d)
+        .add("2011-04-01", "news", 0d, 0d)
+        .add("2011-04-01", "premium", 726.6322593583996d, 527994.4403402924d)
+        .add("2011-04-01", "technology", 0d, 0d)
+        .add("2011-04-01", "travel", 0d, 0d)
+
+        .add("2011-04-02", "automotive", 0d, 0d)
+        .add("2011-04-02", "business", 0d, 0d)
+        .add("2011-04-02", "entertainment", 0d, 0d)
+        .add("2011-04-02", "health", 0d, 0d)
+        .add("2011-04-02", "mezzanine", 611.3420766546617d, 373739.13468843425d)
+        .add("2011-04-02", "news", 0d, 0d)
+        .add("2011-04-02", "premium", 621.3898134843073d, 386125.30030206224d)
+        .add("2011-04-02", "technology", 0d, 0d)
+        .add("2011-04-02", "travel", 0d, 0d)
+        .build();
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
+  @Test
+  public void testGroupBy()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                VarianceTestHelper.rowsCount,
+                VarianceTestHelper.indexVarianceAggr,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr)
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    VarianceTestHelper.RowBuilder builder =
+        new VarianceTestHelper.RowBuilder(new String[]{"alias", "rows", "idx", "index_stddev", "index_var"});
+
+    List<Row> expectedResults = builder
+        .add("2011-04-01", "automotive", 1L, 135L, 0d, 0d)
+        .add("2011-04-01", "business", 1L, 118L, 0d, 0d)
+        .add("2011-04-01", "entertainment", 1L, 158L, 0d, 0d)
+        .add("2011-04-01", "health", 1L, 120L, 0d, 0d)
+        .add("2011-04-01", "mezzanine", 3L, 2870L, 737.0179286322613d, 543195.4271253889d)
+        .add("2011-04-01", "news", 1L, 121L, 0d, 0d)
+        .add("2011-04-01", "premium", 3L, 2900L, 726.6322593583996d, 527994.4403402924d)
+        .add("2011-04-01", "technology", 1L, 78L, 0d, 0d)
+        .add("2011-04-01", "travel", 1L, 119L, 0d, 0d)
+
+        .add("2011-04-02", "automotive", 1L, 147L, 0d, 0d)
+        .add("2011-04-02", "business", 1L, 112L, 0d, 0d)
+        .add("2011-04-02", "entertainment", 1L, 166L, 0d, 0d)
+        .add("2011-04-02", "health", 1L, 113L, 0d, 0d)
+        .add("2011-04-02", "mezzanine", 3L, 2447L, 611.3420766546617d, 373739.13468843425d)
+        .add("2011-04-02", "news", 1L, 114L, 0d, 0d)
+        .add("2011-04-02", "premium", 3L, 2505L, 621.3898134843073d, 386125.30030206224d)
+        .add("2011-04-02", "technology", 1L, 97L, 0d, 0d)
+        .add("2011-04-02", "travel", 1L, 126L, 0d, 0d)
+        .build();
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
+  @Test
+  public void testPostAggHavingSpec()
+  {
+    VarianceTestHelper.RowBuilder expect = new VarianceTestHelper.RowBuilder(
+        new String[]{"alias", "rows", "index", "index_var", "index_stddev"}
+    );
+
+    List<Row> expectedResults = expect
+        .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
+        .add("2011-04-01", "mezzanine", 6L, 4420L, 254083.76447001836, 504.06722217380724)
+        .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
+        .build();
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(VarianceTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                VarianceTestHelper.rowsCount,
+                VarianceTestHelper.indexLongSum,
+                VarianceTestHelper.indexVarianceAggr
+            )
+        )
+        .setPostAggregatorSpecs(ImmutableList.<PostAggregator>of(VarianceTestHelper.stddevOfIndexPostAggr))
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
+        .setHavingSpec(
+            new OrHavingSpec(
+                ImmutableList.<HavingSpec>of(
+                    new GreaterThanHavingSpec(VarianceTestHelper.stddevOfIndexMetric, 15L) // 3 rows
+                )
+            )
+        )
+        .build();
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+
+    query = query.withLimitSpec(
+        new DefaultLimitSpec(
+            Arrays.<OrderByColumnSpec>asList(
+                OrderByColumnSpec.asc(
+                    VarianceTestHelper.stddevOfIndexMetric
+                )
+            ), 2
+        )
+    );
+
+    expectedResults = expect
+        .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
+        .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
+        .build();
+
+    results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java
new file mode 100644
index 0000000..47f5b00
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.segment.data.ObjectStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public class VarianceSerdeTest
+{
+  @Test
+  public void testSerde()
+  {
+    Random r = new Random();
+    VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+    ObjectStrategy strategy = new VarianceSerde().getObjectStrategy();
+    Assert.assertEquals(VarianceAggregatorCollector.class, strategy.getClazz());
+
+    for (int i = 0; i < 100; i++) {
+      byte[] array = strategy.toBytes(holder);
+      Assert.assertArrayEquals(array, holder.toByteArray());
+      Assert.assertEquals(holder, strategy.fromByteBuffer(ByteBuffer.wrap(array), array.length));
+      holder.add(r.nextFloat());
+    }
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java
new file mode 100644
index 0000000..3799d03
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.druid.data.input.MapBasedRow;
+import io.druid.data.input.Row;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.stats.DruidStatsModule;
+import org.joda.time.DateTime;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class VarianceTestHelper extends QueryRunnerTestHelper
+{
+  static {
+    DruidStatsModule module = new DruidStatsModule();
+    module.configure(null);
+  }
+
+  public static final String indexVarianceMetric = "index_var";
+
+  public static final VarianceAggregatorFactory indexVarianceAggr = new VarianceAggregatorFactory(
+      indexVarianceMetric,
+      indexMetric
+  );
+
+  public static final String stddevOfIndexMetric = "index_stddev";
+
+  public static final PostAggregator stddevOfIndexPostAggr = new StandardDeviationPostAggregator(
+      stddevOfIndexMetric,
+      indexVarianceMetric,
+      null
+  );
+
+  public static final List<AggregatorFactory> commonPlusVarAggregators = Arrays.asList(
+      rowsCount,
+      indexDoubleSum,
+      qualityUniques,
+      indexVarianceAggr
+  );
+
+  public static class RowBuilder
+  {
+    private final String[] names;
+    private final List<Row> rows = Lists.newArrayList();
+
+    public RowBuilder(String[] names)
+    {
+      this.names = names;
+    }
+
+    public RowBuilder add(final String timestamp, Object... values)
+    {
+      rows.add(build(timestamp, values));
+      return this;
+    }
+
+    public List<Row> build()
+    {
+      try {
+        return Lists.newArrayList(rows);
+      }
+      finally {
+        rows.clear();
+      }
+    }
+
+    public Row build(final String timestamp, Object... values)
+    {
+      Preconditions.checkArgument(names.length == values.length);
+
+      Map<String, Object> theVals = Maps.newHashMap();
+      for (int i = 0; i < values.length; i++) {
+        theVals.put(names[i], values[i]);
+      }
+      DateTime ts = new DateTime(timestamp);
+      return new MapBasedRow(ts, theVals);
+    }
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java
new file mode 100644
index 0000000..8080c08
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunner;
+import io.druid.query.Result;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class VarianceTimeseriesQueryTest
+{
+  @Parameterized.Parameters(name="{0}:descending={1}")
+  public static Iterable<Object[]> constructorFeeder() throws IOException
+  {
+    return TimeseriesQueryRunnerTest.constructorFeeder();
+  }
+
+  private final QueryRunner runner;
+  private final boolean descending;
+
+  public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending)
+  {
+    this.runner = runner;
+    this.descending = descending;
+  }
+
+  @Test
+  public void testTimeseriesWithNullFilterOnNonExistentDimension()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(VarianceTestHelper.dataSource)
+                                  .granularity(VarianceTestHelper.dayGran)
+                                  .filters("bobby", null)
+                                  .intervals(VarianceTestHelper.firstToThird)
+                                  .aggregators(VarianceTestHelper.commonPlusVarAggregators)
+                                  .postAggregators(
+                                      Arrays.<PostAggregator>asList(
+                                          VarianceTestHelper.addRowsIndexConstant,
+                                          VarianceTestHelper.stddevOfIndexPostAggr
+                                      )
+                                  )
+                                  .descending(descending)
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            new DateTime("2011-04-01"),
+            new TimeseriesResultValue(
+                VarianceTestHelper.of(
+                    "rows", 13L,
+                    "index", 6626.151596069336,
+                    "addRowsIndexConstant", 6640.151596069336,
+                    "uniques", VarianceTestHelper.UNIQUES_9,
+                    "index_var", descending ? 368885.6897238851 : 368885.689155086,
+                    "index_stddev", descending ? 607.3596049490657 : 607.35960448081
+                )
+            )
+        ),
+        new Result<>(
+            new DateTime("2011-04-02"),
+            new TimeseriesResultValue(
+                VarianceTestHelper.of(
+                    "rows", 13L,
+                    "index", 5833.2095947265625,
+                    "addRowsIndexConstant", 5847.2095947265625,
+                    "uniques", VarianceTestHelper.UNIQUES_9,
+                    "index_var", descending ? 259061.6037088883 : 259061.60216419376,
+                    "index_stddev", descending ? 508.9809463122252 : 508.98094479478675
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
+        runner.run(query, new HashMap<String, Object>()),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    assertExpectedResults(expectedResults, results);
+  }
+
+  private <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results)
+  {
+    if (descending) {
+      expectedResults = TestHelper.revert(expectedResults);
+    }
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java
new file mode 100644
index 0000000..eb2665d
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequence;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNQueryBuilder;
+import io.druid.query.topn.TopNQueryConfig;
+import io.druid.query.topn.TopNQueryQueryToolChest;
+import io.druid.query.topn.TopNQueryRunnerTest;
+import io.druid.query.topn.TopNResultValue;
+import io.druid.segment.TestHelper;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class VarianceTopNQueryTest
+{
+  @Parameterized.Parameters
+  public static Iterable<Object[]> constructorFeeder() throws IOException
+  {
+    return TopNQueryRunnerTest.constructorFeeder();
+  }
+
+  private final QueryRunner runner;
+
+  public VarianceTopNQueryTest(
+      QueryRunner runner
+  )
+  {
+    this.runner = runner;
+  }
+
+  @Test
+  public void testFullOnTopNOverUniques()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(QueryRunnerTestHelper.marketDimension)
+        .metric(QueryRunnerTestHelper.uniqueMetric)
+        .threshold(3)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .aggregators(
+            Lists.<AggregatorFactory>newArrayList(
+                Iterables.concat(
+                    VarianceTestHelper.commonPlusVarAggregators,
+                    Lists.newArrayList(
+                        new DoubleMaxAggregatorFactory("maxIndex", "index"),
+                        new DoubleMinAggregatorFactory("minIndex", "index")
+                    )
+                )
+            )
+        )
+        .postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<TopNResultValue>(
+            new DateTime("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                                .put("market", "spot")
+                                .put("rows", 837L)
+                                .put("index", 95606.57232284546D)
+                                .put("addRowsIndexConstant", 96444.57232284546D)
+                                .put("uniques", QueryRunnerTestHelper.UNIQUES_9)
+                                .put("maxIndex", 277.2735290527344D)
+                                .put("minIndex", 59.02102279663086D)
+                                .put("index_var", 439.3851694586573D)
+                                .build(),
+                    ImmutableMap.<String, Object>builder()
+                                .put("market", "total_market")
+                                .put("rows", 186L)
+                                .put("index", 215679.82879638672D)
+                                .put("addRowsIndexConstant", 215866.82879638672D)
+                                .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
+                                .put("maxIndex", 1743.9217529296875D)
+                                .put("minIndex", 792.3260498046875D)
+                                .put("index_var", 27679.900887366413D)
+                                .build(),
+                    ImmutableMap.<String, Object>builder()
+                                .put("market", "upfront")
+                                .put("rows", 186L)
+                                .put("index", 192046.1060180664D)
+                                .put("addRowsIndexConstant", 192233.1060180664D)
+                                .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
+                                .put("maxIndex", 1870.06103515625D)
+                                .put("minIndex", 545.9906005859375D)
+                                .put("index_var", 79699.9780741607D)
+                                .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  private Sequence<Result<TopNResultValue>> assertExpectedResults(
+      Iterable<Result<TopNResultValue>> expectedResults,
+      TopNQuery query
+  )
+  {
+    final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
+        new TopNQueryConfig(),
+        QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+    );
+    final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
+    final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(query, ImmutableMap.<String, Object>of());
+    TestHelper.assertExpectedResults(expectedResults, retval);
+    return retval;
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index d7d2f5a..2f7471b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
         <module>extensions-core/datasketches</module>
         <module>extensions-core/hdfs-storage</module>
         <module>extensions-core/histogram</module>
+        <module>extensions-core/stats</module>
         <module>extensions-core/kafka-eight</module>
         <module>extensions-core/kafka-extraction-namespace</module>
         <module>extensions-core/kafka-indexing-service</module>
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
index def4115..7fd4cb1 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
@@ -37,7 +37,7 @@
  */
 public class ArithmeticPostAggregator implements PostAggregator
 {
-  private static final Comparator DEFAULT_COMPARATOR = new Comparator()
+  public static final Comparator DEFAULT_COMPARATOR = new Comparator()
   {
     @Override
     public int compare(Object o, Object o1)
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index c76274e..b209dee 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -386,6 +386,22 @@
     );
   }
 
+  public GroupByQuery withLimitSpec(final LimitSpec limitSpec)
+  {
+    return new GroupByQuery(
+        getDataSource(),
+        getQuerySegmentSpec(),
+        getDimFilter(),
+        getGranularity(),
+        getDimensions(),
+        getAggregatorSpecs(),
+        getPostAggregatorSpecs(),
+        getHavingSpec(),
+        limitSpec,
+        getContext()
+    );
+  }
+
   public static class Builder
   {
     private DataSource dataSource;
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index 9707fad..7464f5e 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -758,6 +758,22 @@
     return metricDesc != null ? metricDesc.getType() : null;
   }
 
+  public Class getMetricClass(String metric)
+  {
+    MetricDesc metricDesc = metricDescs.get(metric);
+    switch (metricDesc.getCapabilities().getType()) {
+      case COMPLEX:
+        return ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
+      case FLOAT:
+        return Float.TYPE;
+      case LONG:
+        return Long.TYPE;
+      case STRING:
+        return String.class;
+    }
+    return null;
+  }
+
   public Interval getInterval()
   {
     return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index ca2b22d..7df01f6 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -21,7 +21,6 @@
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
@@ -52,8 +51,6 @@
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.ListIndexed;
 import io.druid.segment.filter.BooleanValueMatcher;
-import io.druid.segment.serde.ComplexMetricSerde;
-import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -70,7 +67,6 @@
  */
 public class IncrementalIndexStorageAdapter implements StorageAdapter
 {
-  private static final Splitter SPLITTER = Splitter.on(",");
   private static final NullDimensionSelector NULL_DIMENSION_SELECTOR = new NullDimensionSelector();
 
   private final IncrementalIndex index;
@@ -307,7 +303,7 @@
                 }
 
                 if (Thread.interrupted()) {
-                  throw new QueryInterruptedException( new InterruptedException());
+                  throw new QueryInterruptedException(new InterruptedException());
                 }
 
                 boolean foundMatched = false;
@@ -532,14 +528,13 @@
                 final Integer metricIndexInt = index.getMetricIndex(column);
                 if (metricIndexInt != null) {
                   final int metricIndex = metricIndexInt;
-
-                  final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column));
+                  final Class classOfObject = index.getMetricClass(column);
                   return new ObjectColumnSelector()
                   {
                     @Override
                     public Class classOfObject()
                     {
-                      return serde.getObjectStrategy().getClazz();
+                      return classOfObject;
                     }
 
                     @Override
diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index c3b4b3d..6198951 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -100,6 +101,7 @@
 
   public static final QueryGranularity dayGran = QueryGranularities.DAY;
   public static final QueryGranularity allGran = QueryGranularities.ALL;
+  public static final String timeDimension = "__time";
   public static final String marketDimension = "market";
   public static final String qualityDimension = "quality";
   public static final String placementDimension = "placement";
@@ -119,9 +121,9 @@
 
   public static String dependentPostAggMetric = "dependentPostAgg";
   public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
-  public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
-  public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", "__time");
-  public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
+  public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", indexMetric);
+  public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", timeDimension);
+  public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", indexMetric);
   public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
   public static final String JS_RESET_0 = "function reset() { return 0; }";
   public static final JavaScriptAggregatorFactory jsIndexSumIfPlacementishA = new JavaScriptAggregatorFactory(
@@ -515,4 +517,13 @@
       }
     };
   }
+
+  public static Map<String, Object> of(Object... keyvalues)
+  {
+    ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+    for (int i = 0; i < keyvalues.length; i += 2) {
+      builder.put(String.valueOf(keyvalues[i]), keyvalues[i + 1]);
+    }
+    return builder.build();
+  }
 }
diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
index a7bc025..20ae203 100644
--- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
@@ -695,7 +695,7 @@
           if (acHolder.getEvent().get(ex.getKey()) instanceof Double) {
             actVal = ((Double) actVal).floatValue();
           }
-          Assert.assertEquals(ex.getValue(), actVal);
+          Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
         }
       }