Support variance and standard deviation
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 20038bf..88c5973f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -91,6 +91,8 @@
<argument>-c</argument>
<argument>io.druid.extensions:druid-s3-extensions</argument>
<argument>-c</argument>
+ <argument>io.druid.extensions:druid-stats</argument>
+ <argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>
diff --git a/docs/content/development/extensions-core/stats.md b/docs/content/development/extensions-core/stats.md
new file mode 100644
index 0000000..7b6afd7
--- /dev/null
+++ b/docs/content/development/extensions-core/stats.md
@@ -0,0 +1,152 @@
+---
+layout: doc_page
+---
+
+# Stats aggregator
+
+This extension includes statistics-related aggregators, such as variance and standard deviation. Make sure to [include](../../operations/including-extensions.html) `druid-stats` as an extension.
+
+## Variance aggregator
+
+The algorithm used by this aggregator is the same as that of Apache Hive. The following is the description from GenericUDAFVariance in Hive.
+
+Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
+"Algorithms for computing the sample variance: analysis and recommendations"
+The American Statistician, 37 (1983) pp. 242--247.
+
+variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
+
+where:
+
+- variance is sum[x-avg^2] (this is actually n times the variance) and is updated at every step
+- n is the count of elements in chunk1
+- m is the count of elements in chunk2
+- t1 = sum of elements in chunk1, t2 = sum of elements in chunk2
+
+This algorithm was proven to be numerically stable by J.L. Barlow in
+"Error analysis of a pairwise summation algorithm to compute sample variance"
+Numer. Math, 58 (1991) pp. 583--590
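+
+As a worked illustration of the combine step (this example is not part of the Hive description): merging chunk1 = {1, 2} (n = 2, t1 = 3, variance1 = 0.5) with chunk2 = {4} (m = 1, t2 = 4, variance2 = 0) gives
+
+variance = 0.5 + 0 + 2/(1*3) * pow((1/2)*3 - 4, 2) = 0.5 + 4.1667 = 4.6667
+
+which is exactly the sum of squared deviations of {1, 2, 4} from their mean 7/3. Dividing by the count gives the population variance (4.6667 / 3 ≈ 1.5556) and dividing by count - 1 gives the sample variance (4.6667 / 2 ≈ 2.3333).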
+
+### Pre-aggregating variance at ingestion time
+
+To use this feature, a "variance" aggregator must be included at indexing time.
+The ingestion aggregator can only apply to numeric values. If you use "variance"
+at ingestion time, any input rows missing the value will be considered to have a value of 0.
+
+You can specify the expected input type for ingestion as one of "float", "long", or "variance"; the default is "float".
+
+```json
+{
+ "type" : "variance",
+ "name" : <output_name>,
+ "fieldName" : <metric_name>,
+ "inputType" : <input_type>,
+ "estimator" : <string>
+}
+```
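+
+For example, a hypothetical metricsSpec entry that pre-aggregates the variance of a raw metric named "index" (the metric and output names here are illustrative, not part of this patch) might look like:
+
+```json
+{
+  "type" : "variance",
+  "name" : "index_var",
+  "fieldName" : "index",
+  "inputType" : "float"
+}
+```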
+
+To query the pre-aggregated results, the query must include either a "variance" aggregator with the "variance" input type, or simply a "varianceFold" aggregator.
+
+```json
+{
+ "type" : "varianceFold",
+ "name" : <output_name>,
+ "fieldName" : <metric_name>,
+ "estimator" : <string>
+}
+```
+
+|Property |Description |Default |
+|-------------------------|------------------------------|----------------------------------|
+|`estimator`|Set to "population" to get variance_pop rather than variance_sample, which is the default.|null|
+
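+For example, a query-time aggregator that folds a pre-aggregated "index_var" column and reports the population variance could look like the following sketch (the column name is illustrative):
+
+```json
+{
+  "type" : "varianceFold",
+  "name" : "index_var",
+  "fieldName" : "index_var",
+  "estimator" : "population"
+}
+```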
+
+### Standard Deviation post-aggregator
+
+To obtain the standard deviation from a variance, use the "stddev" post-aggregator.
+
+```json
+{
+ "type": "stddev",
+ "name": "<output_name>",
+ "fieldName": "<aggregator_name>",
+ "estimator": <string>
+}
+```
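+
+The `fieldName` must refer to a "variance" (or "varianceFold") aggregator computed in the same query. For instance, a post-aggregator that reports the sample standard deviation of an "index_var" aggregation (names illustrative) would be:
+
+```json
+{
+  "type": "stddev",
+  "name": "index_stddev",
+  "fieldName": "index_var"
+}
+```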
+
+## Query examples
+
+### Timeseries Query
+
+```json
+{
+ "queryType": "timeseries",
+ "dataSource": "testing",
+ "granularity": "day",
+ "aggregations": [
+ {
+ "type": "variance",
+ "name": "index_var",
+ "fieldName": "index_var"
+ }
+ ],
+ "intervals": [
+ "2016-03-01T00:00:00.000/2013-03-20T00:00:00.000"
+ ]
+}
+```
+
+### TopN Query
+
+```json
+{
+ "queryType": "topN",
+ "dataSource": "testing",
+ "dimensions": ["alias"],
+ "threshold": 5,
+ "granularity": "all",
+ "aggregations": [
+ {
+ "type": "variance",
+ "name": "index_var",
+ "fieldName": "index"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type": "stddev",
+ "name": "index_stddev",
+ "fieldName": "index_var"
+ }
+ ],
+ "intervals": [
+ "2016-03-06T00:00:00/2016-03-06T23:59:59"
+ ]
+}
+```
+
+### GroupBy Query
+
+```json
+{
+ "queryType": "groupBy",
+ "dataSource": "testing",
+ "dimensions": ["alias"],
+ "granularity": "all",
+ "aggregations": [
+ {
+ "type": "variance",
+ "name": "index_var",
+ "fieldName": "index"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type": "stddev",
+ "name": "index_stddev",
+ "fieldName": "index_var"
+ }
+ ],
+ "intervals": [
+ "2016-03-06T00:00:00/2016-03-06T23:59:59"
+ ]
+}
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index e95c397..5a4aec4 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -30,6 +30,7 @@
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
+|druid-stats|Statistics-related module, including variance and standard deviation, using the same algorithm as Apache Hive.|[link](../development/extensions-core/stats.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
|postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|
diff --git a/extensions-core/stats/pom.xml b/extensions-core/stats/pom.xml
new file mode 100644
index 0000000..cb9379c
--- /dev/null
+++ b/extensions-core/stats/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. Metamarkets licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>druid-stats</artifactId>
+ <name>druid-stats</name>
+ <description>druid-stats</description>
+
+ <parent>
+ <groupId>io.druid</groupId>
+ <artifactId>druid</artifactId>
+ <version>0.9.2-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Tests -->
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java
new file mode 100644
index 0000000..cc136f5
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.stats;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+import io.druid.query.aggregation.variance.StandardDeviationPostAggregator;
+import io.druid.query.aggregation.variance.VarianceAggregatorFactory;
+import io.druid.query.aggregation.variance.VarianceFoldingAggregatorFactory;
+import io.druid.query.aggregation.variance.VarianceSerde;
+import io.druid.segment.serde.ComplexMetrics;
+
+import java.util.List;
+
+/**
+ */
+public class DruidStatsModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule().registerSubtypes(
+ VarianceAggregatorFactory.class,
+ VarianceFoldingAggregatorFactory.class,
+ StandardDeviationPostAggregator.class
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ if (ComplexMetrics.getSerdeForType("variance") == null) {
+ ComplexMetrics.registerSerde("variance", new VarianceSerde());
+ }
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java
new file mode 100644
index 0000000..2bdcb0d
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+@JsonTypeName("stddev")
+public class StandardDeviationPostAggregator implements PostAggregator
+{
+ protected final String name;
+ protected final String fieldName;
+ protected final String estimator;
+
+ protected final boolean isVariancePop;
+
+ @JsonCreator
+ public StandardDeviationPostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("estimator") String estimator
+ )
+ {
+ this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null");
+ this.name = Preconditions.checkNotNull(name, "name is null");
+ this.estimator = estimator;
+ this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
+ }
+
+ @Override
+ public Set<String> getDependentFields()
+ {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public Comparator<Double> getComparator()
+ {
+ return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
+ }
+
+ @Override
+ public Object compute(Map<String, Object> combinedAggregators)
+ {
+ return Math.sqrt(((VarianceAggregatorCollector) combinedAggregators.get(fieldName)).getVariance(isVariancePop));
+ }
+
+ @Override
+ @JsonProperty("name")
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty("fieldName")
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @JsonProperty("estimator")
+ public String getEstimator()
+ {
+ return estimator;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StandardDeviationPostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", isVariancePop='" + isVariancePop + '\'' +
+ '}';
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java
new file mode 100644
index 0000000..2553322
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.query.aggregation.Aggregator;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.LongColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+
+/**
+ */
+public abstract class VarianceAggregator implements Aggregator
+{
+ protected final String name;
+
+ protected final VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+
+ public VarianceAggregator(String name)
+ {
+ this.name = name;
+ }
+
+ @Override
+ public void reset()
+ {
+ holder.reset();
+ }
+
+ @Override
+ public Object get()
+ {
+ return holder;
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("VarianceAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("VarianceAggregator does not support getLong()");
+ }
+
+ public static final class FloatVarianceAggregator extends VarianceAggregator
+ {
+ private final FloatColumnSelector selector;
+
+ public FloatVarianceAggregator(String name, FloatColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ holder.add(selector.get());
+ }
+ }
+
+ public static final class LongVarianceAggregator extends VarianceAggregator
+ {
+ private final LongColumnSelector selector;
+
+ public LongVarianceAggregator(String name, LongColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ holder.add(selector.get());
+ }
+ }
+
+ public static final class ObjectVarianceAggregator extends VarianceAggregator
+ {
+ private final ObjectColumnSelector selector;
+
+ public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ VarianceAggregatorCollector.combineValues(holder, selector.get());
+ }
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java
new file mode 100644
index 0000000..4ab6bc2
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Longs;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ *
+ * The algorithm used here is copied from Apache Hive. This is the description from GenericUDAFVariance:
+ *
+ * Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
+ * "Algorithms for computing the sample variance: analysis and recommendations"
+ * The American Statistician, 37 (1983) pp. 242--247.
+ *
+ * variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
+ *
+ * where: - variance is sum[x-avg^2] (this is actually n times the variance)
+ * and is updated at every step. - n is the count of elements in chunk1 - m is
+ * the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 =
+ * sum of elements in chunk2.
+ *
+ * This algorithm was proven to be numerically stable by J.L. Barlow in
+ * "Error analysis of a pairwise summation algorithm to compute sample variance"
+ * Numer. Math, 58 (1991) pp. 583--590
+ */
+public class VarianceAggregatorCollector
+{
+ public static boolean isVariancePop(String estimator)
+ {
+ return estimator != null && estimator.equalsIgnoreCase("population");
+ }
+
+ public static VarianceAggregatorCollector from(ByteBuffer buffer)
+ {
+ return new VarianceAggregatorCollector(buffer.getLong(), buffer.getDouble(), buffer.getDouble());
+ }
+
+ public static final Comparator<VarianceAggregatorCollector> COMPARATOR = new Comparator<VarianceAggregatorCollector>()
+ {
+ @Override
+ public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
+ {
+ int compare = Longs.compare(o1.count, o2.count);
+ if (compare == 0) {
+ compare = Doubles.compare(o1.sum, o2.sum);
+ if (compare == 0) {
+ compare = Doubles.compare(o1.nvariance, o2.nvariance);
+ }
+ }
+ return compare;
+ }
+ };
+
+ static Object combineValues(Object lhs, Object rhs)
+ {
+ final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs;
+ final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs;
+
+ if (holder2.count == 0) {
+ return holder1;
+ }
+ if (holder1.count == 0) {
+ holder1.nvariance = holder2.nvariance;
+ holder1.count = holder2.count;
+ holder1.sum = holder2.sum;
+ return holder1;
+ }
+
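+ // pairwise merge (Chan et al.): the term added to nvariance equals
+ // count1 * count2 / (count1 + count2) * (mean1 - mean2)^2, expressed below via ratio = count1 / count2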
+ final double ratio = holder1.count / (double) holder2.count;
+ final double t = holder1.sum / ratio - holder2.sum;
+
+ holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t);
+ holder1.count += holder2.count;
+ holder1.sum += holder2.sum;
+
+ return holder1;
+ }
+
+ static int getMaxIntermediateSize()
+ {
+ return Longs.BYTES + Doubles.BYTES + Doubles.BYTES;
+ }
+
+ long count; // number of elements
+ double sum; // sum of elements
+ double nvariance; // sum[x-avg^2] (this is actually n times the variance)
+
+ public VarianceAggregatorCollector()
+ {
+ this(0, 0, 0);
+ }
+
+ public void reset()
+ {
+ count = 0;
+ sum = 0;
+ nvariance = 0;
+ }
+
+ public VarianceAggregatorCollector(long count, double sum, double nvariance)
+ {
+ this.count = count;
+ this.sum = sum;
+ this.nvariance = nvariance;
+ }
+
+ public VarianceAggregatorCollector add(float v)
+ {
+ count++;
+ sum += v;
+ if (count > 1) {
+ double t = count * v - sum;
+ nvariance += (t * t) / ((double) count * (count - 1));
+ }
+ return this;
+ }
+
+ public VarianceAggregatorCollector add(long v)
+ {
+ count++;
+ sum += v;
+ if (count > 1) {
+ double t = count * v - sum;
+ nvariance += (t * t) / ((double) count * (count - 1));
+ }
+ return this;
+ }
+
+ public double getVariance(boolean variancePop)
+ {
+ if (count == 0) {
+ // the SQL standard says to return null for zero elements, but in Druid there should not be such a case
+ throw new IllegalStateException("should not be empty holder");
+ } else if (count == 1) {
+ return 0d;
+ } else {
+ return variancePop ? nvariance / count : nvariance / (count - 1);
+ }
+ }
+
+ @JsonValue
+ public byte[] toByteArray()
+ {
+ final ByteBuffer buffer = toByteBuffer();
+ buffer.flip();
+ byte[] theBytes = new byte[buffer.remaining()];
+ buffer.get(theBytes);
+
+ return theBytes;
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ return ByteBuffer.allocate(Longs.BYTES + Doubles.BYTES + Doubles.BYTES)
+ .putLong(count)
+ .putDouble(sum)
+ .putDouble(nvariance);
+ }
+
+ @VisibleForTesting
+ boolean equalsWithEpsilon(VarianceAggregatorCollector o, double epsilon)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (count != o.count) {
+ return false;
+ }
+ if (Math.abs(sum - o.sum) > epsilon) {
+ return false;
+ }
+ if (Math.abs(nvariance - o.nvariance) > epsilon) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ VarianceAggregatorCollector that = (VarianceAggregatorCollector) o;
+
+ if (count != that.count) {
+ return false;
+ }
+ if (Double.compare(that.sum, sum) != 0) {
+ return false;
+ }
+ if (Double.compare(that.nvariance, nvariance) != 0) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result;
+ long temp;
+ result = (int) (count ^ (count >>> 32));
+ temp = Double.doubleToLongBits(sum);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(nvariance);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "VarianceHolder{" +
+ "count=" + count +
+ ", sum=" + sum +
+ ", nvariance=" + nvariance +
+ '}';
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java
new file mode 100644
index 0000000..d5b8571
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.metamx.common.IAE;
+import com.metamx.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
+import io.druid.query.aggregation.Aggregators;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.ObjectColumnSelector;
+import org.apache.commons.codec.binary.Base64;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ */
+@JsonTypeName("variance")
+public class VarianceAggregatorFactory extends AggregatorFactory
+{
+ protected static final byte CACHE_TYPE_ID = 16;
+
+ protected final String fieldName;
+ protected final String name;
+ protected final String estimator;
+ private final String inputType;
+
+ protected final boolean isVariancePop;
+
+ @JsonCreator
+ public VarianceAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("estimator") String estimator,
+ @JsonProperty("inputType") String inputType
+ )
+ {
+ Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+ Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+ this.name = name;
+ this.fieldName = fieldName;
+ this.estimator = estimator;
+ this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
+ this.inputType = inputType == null ? "float" : inputType;
+ }
+
+ public VarianceAggregatorFactory(String name, String fieldName)
+ {
+ this(name, fieldName, null, null);
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ return "variance";
+ }
+
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return VarianceAggregatorCollector.getMaxIntermediateSize();
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
+ if (selector == null) {
+ return Aggregators.noopAggregator();
+ }
+
+ if ("float".equalsIgnoreCase(inputType)) {
+ return new VarianceAggregator.FloatVarianceAggregator(
+ name,
+ metricFactory.makeFloatColumnSelector(fieldName)
+ );
+ } else if ("long".equalsIgnoreCase(inputType)) {
+ return new VarianceAggregator.LongVarianceAggregator(
+ name,
+ metricFactory.makeLongColumnSelector(fieldName)
+ );
+ } else if ("variance".equalsIgnoreCase(inputType)) {
+ return new VarianceAggregator.ObjectVarianceAggregator(name, selector);
+ }
+ throw new IAE(
+ "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
+ if (selector == null) {
+ return Aggregators.noopBufferAggregator();
+ }
+ if ("float".equalsIgnoreCase(inputType)) {
+ return new VarianceBufferAggregator.FloatVarianceAggregator(
+ name,
+ metricFactory.makeFloatColumnSelector(fieldName)
+ );
+ } else if ("long".equalsIgnoreCase(inputType)) {
+ return new VarianceBufferAggregator.LongVarianceAggregator(
+ name,
+ metricFactory.makeLongColumnSelector(fieldName)
+ );
+ } else if ("variance".equalsIgnoreCase(inputType)) {
+ return new VarianceBufferAggregator.ObjectVarianceAggregator(name, selector);
+ }
+ throw new IAE(
+ "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
+ );
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ return new VarianceFoldingAggregatorFactory(name, name, estimator);
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.<AggregatorFactory>asList(new VarianceAggregatorFactory(fieldName, fieldName, estimator, inputType));
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
+ {
+ if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) {
+ return getCombiningFactory();
+ } else {
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return VarianceAggregatorCollector.COMPARATOR;
+ }
+
+ @Override
+ public Object getAggregatorStartValue()
+ {
+ return new VarianceAggregatorCollector();
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs)
+ {
+ return VarianceAggregatorCollector.combineValues(lhs, rhs);
+ }
+
+ @Override
+ public Object finalizeComputation(Object object)
+ {
+ return ((VarianceAggregatorCollector) object).getVariance(isVariancePop);
+ }
+
+ @Override
+ public Object deserialize(Object object)
+ {
+ if (object instanceof byte[]) {
+ return VarianceAggregatorCollector.from(ByteBuffer.wrap((byte[]) object));
+ } else if (object instanceof ByteBuffer) {
+ return VarianceAggregatorCollector.from((ByteBuffer) object);
+ } else if (object instanceof String) {
+ return VarianceAggregatorCollector.from(
+ ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
+ );
+ }
+ return object;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public String getEstimator()
+ {
+ return estimator;
+ }
+
+ @JsonProperty
+ public String getInputType()
+ {
+ return inputType;
+ }
+
+ @Override
+ public List<String> requiredFields()
+ {
+ return Arrays.asList(fieldName);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
+ byte[] inputTypeBytes = StringUtils.toUtf8(inputType);
+ return ByteBuffer.allocate(2 + fieldNameBytes.length + 1 + inputTypeBytes.length)
+ .put(CACHE_TYPE_ID)
+ .put(isVariancePop ? (byte) 1 : 0)
+ .put(fieldNameBytes)
+ .put((byte) 0xFF)
+ .put(inputTypeBytes)
+ .array();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "fieldName='" + fieldName + '\'' +
+ ", name='" + name + '\'' +
+ ", isVariancePop='" + isVariancePop + '\'' +
+ ", inputType='" + inputType + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ VarianceAggregatorFactory that = (VarianceAggregatorFactory) o;
+
+ if (!Objects.equals(fieldName, that.fieldName)) {
+ return false;
+ }
+ if (!Objects.equals(name, that.name)) {
+ return false;
+ }
+ if (!Objects.equals(isVariancePop, that.isVariancePop)) {
+ return false;
+ }
+ if (!Objects.equals(inputType, that.inputType)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = fieldName.hashCode();
+ result = 31 * result + Objects.hashCode(name);
+ result = 31 * result + Objects.hashCode(isVariancePop);
+ result = 31 * result + Objects.hashCode(inputType);
+ return result;
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java
new file mode 100644
index 0000000..77017de
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Longs;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.LongColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class VarianceBufferAggregator implements BufferAggregator
+{
+ private static final int COUNT_OFFSET = 0;
+ private static final int SUM_OFFSET = Longs.BYTES;
+ private static final int NVARIANCE_OFFSET = SUM_OFFSET + Doubles.BYTES;
+
+ protected final String name;
+
+ public VarianceBufferAggregator(String name)
+ {
+ this.name = name;
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ buf.putLong(position + COUNT_OFFSET, 0)
+ .putDouble(position + SUM_OFFSET, 0)
+ .putDouble(position + NVARIANCE_OFFSET, 0);
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+ holder.count = buf.getLong(position);
+ holder.sum = buf.getDouble(position + SUM_OFFSET);
+ holder.nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
+ return holder;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("VarianceBufferAggregator does not support getLong()");
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ public static final class FloatVarianceAggregator extends VarianceBufferAggregator
+ {
+ private final FloatColumnSelector selector;
+
+ public FloatVarianceAggregator(String name, FloatColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ float v = selector.get();
+ long count = buf.getLong(position + COUNT_OFFSET) + 1;
+ double sum = buf.getDouble(position + SUM_OFFSET) + v;
+ buf.putLong(position, count);
+ buf.putDouble(position + SUM_OFFSET, sum);
+ if (count > 1) {
+ double t = count * v - sum;
+ double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
+ buf.putDouble(position + NVARIANCE_OFFSET, variance);
+ }
+ }
+ }
+
+ public static final class LongVarianceAggregator extends VarianceBufferAggregator
+ {
+ private final LongColumnSelector selector;
+
+ public LongVarianceAggregator(String name, LongColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ long v = selector.get();
+ long count = buf.getLong(position + COUNT_OFFSET) + 1;
+ double sum = buf.getDouble(position + SUM_OFFSET) + v;
+ buf.putLong(position, count);
+ buf.putDouble(position + SUM_OFFSET, sum);
+ if (count > 1) {
+ double t = count * v - sum;
+ double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
+ buf.putDouble(position + NVARIANCE_OFFSET, variance);
+ }
+ }
+ }
+
+ public static final class ObjectVarianceAggregator extends VarianceBufferAggregator
+ {
+ private final ObjectColumnSelector selector;
+
+ public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
+ {
+ super(name);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) selector.get();
+
+ long count = buf.getLong(position + COUNT_OFFSET);
+ if (count == 0) {
+ buf.putLong(position, holder2.count);
+ buf.putDouble(position + SUM_OFFSET, holder2.sum);
+ buf.putDouble(position + NVARIANCE_OFFSET, holder2.nvariance);
+ return;
+ }
+
+ double sum = buf.getDouble(position + SUM_OFFSET);
+ double nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
+
+ final double ratio = count / (double) holder2.count;
+ final double t = sum / ratio - holder2.sum;
+
+ nvariance += holder2.nvariance + (ratio / (count + holder2.count) * t * t);
+ count += holder2.count;
+ sum += holder2.sum;
+
+ buf.putLong(position, count);
+ buf.putDouble(position + SUM_OFFSET, sum);
+ buf.putDouble(position + NVARIANCE_OFFSET, nvariance);
+ }
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java
new file mode 100644
index 0000000..a113418
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ */
+@JsonTypeName("varianceFold")
+public class VarianceFoldingAggregatorFactory extends VarianceAggregatorFactory
+{
+ public VarianceFoldingAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("estimator") String estimator
+ )
+ {
+ super(name, fieldName, estimator, "variance");
+ }
+}
diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java
new file mode 100644
index 0000000..4bb83ad
--- /dev/null
+++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Ordering;
+import io.druid.data.input.InputRow;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ */
+public class VarianceSerde extends ComplexMetricSerde
+{
+ private static final Ordering<VarianceAggregatorCollector> comparator =
+ Ordering.from(VarianceAggregatorCollector.COMPARATOR).nullsFirst();
+
+ @Override
+ public String getTypeName()
+ {
+ return "variance";
+ }
+
+ @Override
+ public ComplexMetricExtractor getExtractor()
+ {
+ return new ComplexMetricExtractor()
+ {
+ @Override
+ public Class<VarianceAggregatorCollector> extractedClass()
+ {
+ return VarianceAggregatorCollector.class;
+ }
+
+ @Override
+ public VarianceAggregatorCollector extractValue(InputRow inputRow, String metricName)
+ {
+ Object rawValue = inputRow.getRaw(metricName);
+
+ if (rawValue instanceof VarianceAggregatorCollector) {
+ return (VarianceAggregatorCollector) rawValue;
+ }
+ VarianceAggregatorCollector collector = new VarianceAggregatorCollector();
+
+ List<String> dimValues = inputRow.getDimension(metricName);
+ if (dimValues != null && dimValues.size() > 0) {
+ for (String dimValue : dimValues) {
+ float value = Float.parseFloat(dimValue);
+ collector.add(value);
+ }
+ }
+ return collector;
+ }
+ };
+ }
+
+ @Override
+ public void deserializeColumn(
+ ByteBuffer byteBuffer, ColumnBuilder columnBuilder
+ )
+ {
+ final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
+ columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
+ }
+
+ @Override
+ public ObjectStrategy getObjectStrategy()
+ {
+ return new ObjectStrategy<VarianceAggregatorCollector>()
+ {
+ @Override
+ public Class<? extends VarianceAggregatorCollector> getClazz()
+ {
+ return VarianceAggregatorCollector.class;
+ }
+
+ @Override
+ public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
+ {
+ final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+ readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
+ return VarianceAggregatorCollector.from(readOnlyBuffer);
+ }
+
+ @Override
+ public byte[] toBytes(VarianceAggregatorCollector collector)
+ {
+ return collector == null ? new byte[]{} : collector.toByteArray();
+ }
+
+ @Override
+ public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
+ {
+ return comparator.compare(o1, o2);
+ }
+ };
+ }
+}
diff --git a/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 0000000..823ccff
--- /dev/null
+++ b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.query.aggregation.stats.DruidStatsModule
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java
new file mode 100644
index 0000000..89d9217
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.Pair;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class VarianceAggregatorCollectorTest
+{
+ private static final float[] market_upfront = new float[]{
+ 800.0f, 800.0f, 826.0602f, 1564.6177f, 1006.4021f, 869.64374f, 809.04175f, 1458.4027f, 852.4375f, 879.9881f,
+ 950.1468f, 712.7746f, 846.2675f, 682.8855f, 1109.875f, 594.3817f, 870.1159f, 677.511f, 1410.2781f, 1219.4321f,
+ 979.306f, 1224.5016f, 1215.5898f, 716.6092f, 1301.0233f, 786.3633f, 989.9315f, 1609.0967f, 1023.2952f, 1367.6381f,
+ 1627.598f, 810.8894f, 1685.5001f, 545.9906f, 1870.061f, 555.476f, 1643.3408f, 943.4972f, 1667.4978f, 913.5611f,
+ 1218.5619f, 1273.7074f, 888.70526f, 1113.1141f, 864.5689f, 1308.582f, 785.07886f, 1363.6149f, 787.1253f,
+ 826.0392f, 1107.2438f, 872.6257f, 1188.3693f, 911.9568f, 794.0988f, 1299.0933f, 1212.9283f, 901.3273f, 723.5143f,
+ 1061.9734f, 602.97955f, 879.4061f, 724.2625f, 862.93134f, 1133.1351f, 948.65796f, 807.6017f, 914.525f, 1553.3485f,
+ 1208.4567f, 679.6193f, 645.1777f, 1120.0887f, 1649.5333f, 1433.3988f, 1598.1793f, 1192.5631f, 1022.85455f,
+ 1228.5024f, 1298.4158f, 1345.9644f, 1291.898f, 1306.4957f, 1287.7667f, 1631.5844f, 578.79596f, 1017.5732f,
+ 782.0135f, 829.91626f, 1862.7379f, 873.3065f, 1427.0167f, 1430.2573f, 1101.9182f, 1166.1411f, 1004.94086f,
+ 740.1837f, 865.7779f, 901.30756f, 691.9589f, 1674.3317f, 975.57794f, 1360.6948f, 755.89935f, 771.34845f,
+ 869.30835f, 1095.6376f, 906.3738f, 988.8938f, 835.76263f, 776.70294f, 875.6834f, 1070.8363f, 835.46124f,
+ 715.5161f, 755.64655f, 771.1005f, 764.50806f, 736.40924f, 884.8373f, 918.72284f, 893.98505f, 832.8749f,
+ 850.995f, 767.9733f, 848.3399f, 878.6838f, 906.1019f, 1403.8302f, 936.4296f, 846.2884f, 856.4901f, 1032.2576f,
+ 954.7542f, 1031.99f, 907.02155f, 1110.789f, 843.95215f, 1362.6506f, 884.8015f, 1684.2688f, 873.65204f, 855.7177f,
+ 996.56415f, 1061.6786f, 962.2358f, 1019.8985f, 1056.4193f, 1198.7231f, 1108.1361f, 1289.0095f,
+ 1069.4318f, 1001.13403f, 1030.4995f, 1734.2749f, 1063.2012f, 1447.3412f, 1234.2476f, 1144.3424f, 1049.7385f,
+ 811.9913f, 768.4231f, 1151.0692f, 877.0794f, 1146.4231f, 902.6157f, 1355.8434f, 897.39343f, 1260.1431f, 762.8625f,
+ 935.168f, 782.10785f, 996.2054f, 767.69214f, 1031.7415f, 775.9656f, 1374.9684f, 853.163f, 1456.6118f, 811.92523f,
+ 989.0328f, 744.7446f, 1166.4012f, 753.105f, 962.7312f, 780.272f
+ };
+
+ private static final float[] market_total_market = new float[]{
+ 1000.0f, 1000.0f, 1040.9456f, 1689.0128f, 1049.142f, 1073.4766f, 1007.36554f, 1545.7089f, 1016.9652f, 1077.6127f,
+ 1075.0896f, 953.9954f, 1022.7833f, 937.06195f, 1156.7448f, 849.8775f, 1066.208f, 904.34064f, 1240.5255f,
+ 1343.2325f, 1088.9431f, 1349.2544f, 1102.8667f, 939.2441f, 1109.8754f, 997.99457f, 1037.4495f, 1686.4197f,
+ 1074.007f, 1486.2013f, 1300.3022f, 1021.3345f, 1314.6195f, 792.32605f, 1233.4489f, 805.9301f, 1184.9207f,
+ 1127.231f, 1203.4656f, 1100.9048f, 1097.2112f, 1410.793f, 1033.4012f, 1283.166f, 1025.6333f, 1331.861f,
+ 1039.5005f, 1332.4684f, 1011.20544f, 1029.9952f, 1047.2129f, 1057.08f, 1064.9727f, 1082.7277f, 971.0508f,
+ 1320.6383f, 1070.1655f, 1089.6478f, 980.3866f, 1179.6959f, 959.2362f, 1092.417f, 987.0674f, 1103.4583f,
+ 1091.2231f, 1199.6074f, 1044.3843f, 1183.2408f, 1289.0973f, 1360.0325f, 993.59125f, 1021.07117f, 1105.3834f,
+ 1601.8295f, 1200.5272f, 1600.7233f, 1317.4584f, 1304.3262f, 1544.1082f, 1488.7378f, 1224.8271f, 1421.6487f,
+ 1251.9062f, 1414.619f, 1350.1754f, 970.7283f, 1057.4272f, 1073.9673f, 996.4337f, 1743.9218f, 1044.5629f,
+ 1474.5911f, 1159.2788f, 1292.5428f, 1124.2014f, 1243.354f, 1051.809f, 1143.0784f, 1097.4907f, 1010.3703f,
+ 1326.8291f, 1179.8038f, 1281.6012f, 994.73126f, 1081.6504f, 1103.2397f, 1177.8584f, 1152.5477f, 1117.954f,
+ 1084.3325f, 1029.8025f, 1121.3854f, 1244.85f, 1077.2794f, 1098.5432f, 998.65076f, 1088.8076f, 1008.74554f,
+ 998.75397f, 1129.7233f, 1075.243f, 1141.5884f, 1037.3811f, 1099.1973f, 981.5773f, 1092.942f, 1072.2394f,
+ 1154.4156f, 1311.1786f, 1176.6052f, 1107.2202f, 1102.699f, 1285.0901f, 1217.5475f, 1283.957f, 1178.8302f,
+ 1301.7781f, 1119.2472f, 1403.3389f, 1156.6019f, 1429.5802f, 1137.8423f, 1124.9352f, 1256.4998f, 1217.8774f,
+ 1247.8909f, 1185.71f, 1345.7817f, 1250.1667f, 1390.754f, 1224.1162f, 1361.0802f, 1190.9337f, 1310.7971f,
+ 1466.2094f, 1366.4476f, 1314.8397f, 1522.0437f, 1193.5563f, 1321.375f, 1055.7837f, 1021.6387f, 1197.0084f,
+ 1131.532f, 1192.1443f, 1154.2896f, 1272.6771f, 1141.5146f, 1190.8961f, 1009.36316f, 1006.9138f, 1032.5999f,
+ 1137.3857f, 1030.0756f, 1005.25305f, 1030.0947f, 1112.7948f, 1113.3575f, 1153.9747f, 1069.6409f, 1016.13745f,
+ 994.9023f, 1032.1543f, 999.5864f, 994.75275f, 1029.057f
+ };
+
+ @Test
+ public void testVariance()
+ {
+ Random random = new Random();
+ for (float[] values : Arrays.asList(market_upfront, market_total_market)) {
+ double sum = 0;
+ for (float f : values) {
+ sum += f;
+ }
+ final double mean = sum / values.length;
+ double temp = 0;
+ for (float f : values) {
+ temp += Math.pow(f - mean, 2);
+ }
+
+ final double variance_pop = temp / values.length;
+ final double variance_sample = temp / (values.length - 1);
+
+ VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+ for (float f : values) {
+ holder.add(f);
+ }
+ Assert.assertEquals(holder.getVariance(true), variance_pop, 0.001);
+ Assert.assertEquals(holder.getVariance(false), variance_sample, 0.001);
+
+ for (int mergeOn : new int[] {2, 3, 5, 9}) {
+ List<VarianceAggregatorCollector> holders1 = Lists.newArrayListWithCapacity(mergeOn);
+ List<Pair<VarianceBufferAggregator, ByteBuffer>> holders2 = Lists.newArrayListWithCapacity(mergeOn);
+
+ FloatHandOver valueHandOver = new FloatHandOver();
+ for (int i = 0; i < mergeOn; i++) {
+ holders1.add(new VarianceAggregatorCollector());
+ holders2.add(Pair.<VarianceBufferAggregator, ByteBuffer>of(
+ new VarianceBufferAggregator.FloatVarianceAggregator("XX", valueHandOver),
+ ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize())
+ ));
+ }
+ for (float f : values) {
+ valueHandOver.v = f;
+ int index = random.nextInt(mergeOn);
+ holders1.get(index).add(f);
+ holders2.get(index).lhs.aggregate(holders2.get(index).rhs, 0);
+ }
+ VarianceAggregatorCollector holder1 = holders1.get(0);
+ for (int i = 1; i < mergeOn; i++) {
+ holder1 = (VarianceAggregatorCollector) VarianceAggregatorCollector.combineValues(holder1, holders1.get(i));
+ }
+ ObjectHandOver collectHandOver = new ObjectHandOver();
+ ByteBuffer buffer = ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize());
+ VarianceBufferAggregator.ObjectVarianceAggregator merger = new VarianceBufferAggregator.ObjectVarianceAggregator("xxx", collectHandOver);
+ for (int i = 0; i < mergeOn; i++) {
+ collectHandOver.v = holders2.get(i).lhs.get(holders2.get(i).rhs, 0);
+ merger.aggregate(buffer, 0);
+ }
+ VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) merger.get(buffer, 0);
+ Assert.assertEquals(holder2.getVariance(true), variance_pop, 0.01);
+ Assert.assertEquals(holder2.getVariance(false), variance_sample, 0.01);
+ }
+ }
+ }
+
+ private static class FloatHandOver implements FloatColumnSelector
+ {
+ float v;
+
+ @Override
+ public float get()
+ {
+ return v;
+ }
+ }
+
+ private static class ObjectHandOver implements ObjectColumnSelector
+ {
+ Object v;
+
+ @Override
+ public Class classOfObject()
+ {
+ return v == null ? Object.class : v.getClass();
+ }
+
+ @Override
+ public Object get()
+ {
+ return v;
+ }
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java
new file mode 100644
index 0000000..9beb980
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.TestFloatColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public class VarianceAggregatorTest
+{
+ private VarianceAggregatorFactory aggFactory;
+ private ColumnSelectorFactory colSelectorFactory;
+ private TestFloatColumnSelector selector;
+
+ private final float[] values = {1.1f, 2.7f, 3.5f, 1.3f};
+ private final double[] variances_pop = new double[values.length]; // calculated
+ private final double[] variances_samp = new double[values.length]; // calculated
+
+ public VarianceAggregatorTest() throws Exception
+ {
+ String aggSpecJson = "{\"type\": \"variance\", \"name\": \"billy\", \"fieldName\": \"nilly\"}";
+ aggFactory = new DefaultObjectMapper().readValue(aggSpecJson, VarianceAggregatorFactory.class);
+ double sum = 0;
+ for (int i = 0; i < values.length; i++) {
+ sum += values[i];
+ if (i > 0) {
+ double mean = sum / (i + 1);
+ double temp = 0;
+ for (int j = 0; j <= i; j++) {
+ temp += Math.pow(values[j] - mean, 2);
+ }
+ variances_pop[i] = temp / (i + 1);
+ variances_samp[i] = temp / i;
+ }
+ }
+ }
+
+ @Before
+ public void setup()
+ {
+ selector = new TestFloatColumnSelector(values);
+ colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+ EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("nilly")).andReturn(new TestObjectColumnSelector(0.0f));
+ EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(selector);
+ EasyMock.replay(colSelectorFactory);
+ }
+
+ @Test
+ public void testDoubleVarianceAggregator()
+ {
+ VarianceAggregator agg = (VarianceAggregator) aggFactory.factorize(colSelectorFactory);
+
+ Assert.assertEquals("billy", agg.getName());
+
+ assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
+ aggregate(selector, agg);
+ assertValues((VarianceAggregatorCollector) agg.get(), 1, 1.1d, 0d);
+ aggregate(selector, agg);
+ assertValues((VarianceAggregatorCollector) agg.get(), 2, 3.8d, 1.28d);
+ aggregate(selector, agg);
+ assertValues((VarianceAggregatorCollector) agg.get(), 3, 7.3d, 2.9866d);
+ aggregate(selector, agg);
+ assertValues((VarianceAggregatorCollector) agg.get(), 4, 8.6d, 3.95d);
+
+ agg.reset();
+ assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
+ }
+
+ private void assertValues(VarianceAggregatorCollector holder, long count, double sum, double nvariance)
+ {
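+ // 'nvariance' is the running sum of squared deviations from the mean, i.e. n times the population variance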
+ Assert.assertEquals(count, holder.count);
+ Assert.assertEquals(sum, holder.sum, 0.0001);
+ Assert.assertEquals(nvariance, holder.nvariance, 0.0001);
+ if (count == 0) {
+ try {
+ holder.getVariance(false);
+ Assert.fail("Should throw ISE");
+ }
+ catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("should not be empty holder"));
+ }
+ } else {
+ Assert.assertEquals(variances_pop[(int) count - 1], holder.getVariance(true), 0.0001);
+ Assert.assertEquals(variances_samp[(int) count - 1], holder.getVariance(false), 0.0001);
+ }
+ }
+
+ @Test
+ public void testDoubleVarianceBufferAggregator()
+ {
+ VarianceBufferAggregator agg = (VarianceBufferAggregator) aggFactory.factorizeBuffered(
+ colSelectorFactory
+ );
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[aggFactory.getMaxIntermediateSize()]);
+ agg.init(buffer, 0);
+
+ assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 0, 0d, 0d);
+ aggregate(selector, agg, buffer, 0);
+ assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 1, 1.1d, 0d);
+ aggregate(selector, agg, buffer, 0);
+ assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 2, 3.8d, 1.28d);
+ aggregate(selector, agg, buffer, 0);
+ assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 3, 7.3d, 2.9866d);
+ aggregate(selector, agg, buffer, 0);
+ assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 4, 8.6d, 3.95d);
+ }
+
+ @Test
+ public void testCombine()
+ {
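+ // combining two partial collectors must match the result of aggregating all four values at once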
+ VarianceAggregatorCollector holder1 = new VarianceAggregatorCollector().add(1.1f).add(2.7f);
+ VarianceAggregatorCollector holder2 = new VarianceAggregatorCollector().add(3.5f).add(1.3f);
+ VarianceAggregatorCollector expected = new VarianceAggregatorCollector(4, 8.6d, 3.95d);
+ Assert.assertTrue(expected.equalsWithEpsilon((VarianceAggregatorCollector) aggFactory.combine(holder1, holder2), 0.00001));
+ }
+
+ @Test
+ public void testEqualsAndHashCode() throws Exception
+ {
+ VarianceAggregatorFactory one = new VarianceAggregatorFactory("name1", "fieldName1");
+ VarianceAggregatorFactory oneMore = new VarianceAggregatorFactory("name1", "fieldName1");
+ VarianceAggregatorFactory two = new VarianceAggregatorFactory("name2", "fieldName2");
+
+ Assert.assertEquals(one.hashCode(), oneMore.hashCode());
+
+ Assert.assertTrue(one.equals(oneMore));
+ Assert.assertFalse(one.equals(two));
+ }
+
+ private void aggregate(TestFloatColumnSelector selector, VarianceAggregator agg)
+ {
+ agg.aggregate();
+ selector.increment();
+ }
+
+ private void aggregate(
+ TestFloatColumnSelector selector,
+ VarianceBufferAggregator agg,
+ ByteBuffer buff,
+ int position
+ )
+ {
+ agg.aggregate(buff, position);
+ selector.increment();
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
new file mode 100644
index 0000000..a45901b
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.data.input.Row;
+import io.druid.granularity.PeriodGranularity;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.query.groupby.GroupByQueryRunnerTest;
+import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import io.druid.query.groupby.having.GreaterThanHavingSpec;
+import io.druid.query.groupby.having.HavingSpec;
+import io.druid.query.groupby.having.OrHavingSpec;
+import io.druid.query.groupby.orderby.DefaultLimitSpec;
+import io.druid.query.groupby.orderby.OrderByColumnSpec;
+import io.druid.segment.TestHelper;
+import org.joda.time.Period;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * GroupBy query tests covering the variance aggregator, the stddev post-aggregator, having specs and limit specs.
+ */
+@RunWith(Parameterized.class)
+public class VarianceGroupByQueryTest
+{
+ private final GroupByQueryConfig config;
+ private final QueryRunner<Row> runner;
+ private final GroupByQueryRunnerFactory factory;
+
+ @Parameterized.Parameters
+ public static Collection<?> constructorFeeder() throws IOException
+ {
+ return GroupByQueryRunnerTest.constructorFeeder();
+ }
+
+ public VarianceGroupByQueryTest(GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
+ {
+ this.config = config;
+ this.factory = factory;
+ this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
+ }
+
+ @Test
+ public void testGroupByVarianceOnly()
+ {
+ GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+ .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(VarianceTestHelper.indexVarianceAggr))
+ .setPostAggregatorSpecs(Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr))
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .build();
+
+ VarianceTestHelper.RowBuilder builder =
+ new VarianceTestHelper.RowBuilder(new String[]{"alias", "index_stddev", "index_var"});
+
+ List<Row> expectedResults = builder
+ .add("2011-04-01", "automotive", 0d, 0d)
+ .add("2011-04-01", "business", 0d, 0d)
+ .add("2011-04-01", "entertainment", 0d, 0d)
+ .add("2011-04-01", "health", 0d, 0d)
+ .add("2011-04-01", "mezzanine", 737.0179286322613d, 543195.4271253889d)
+ .add("2011-04-01", "news", 0d, 0d)
+ .add("2011-04-01", "premium", 726.6322593583996d, 527994.4403402924d)
+ .add("2011-04-01", "technology", 0d, 0d)
+ .add("2011-04-01", "travel", 0d, 0d)
+
+ .add("2011-04-02", "automotive", 0d, 0d)
+ .add("2011-04-02", "business", 0d, 0d)
+ .add("2011-04-02", "entertainment", 0d, 0d)
+ .add("2011-04-02", "health", 0d, 0d)
+ .add("2011-04-02", "mezzanine", 611.3420766546617d, 373739.13468843425d)
+ .add("2011-04-02", "news", 0d, 0d)
+ .add("2011-04-02", "premium", 621.3898134843073d, 386125.30030206224d)
+ .add("2011-04-02", "technology", 0d, 0d)
+ .add("2011-04-02", "travel", 0d, 0d)
+ .build();
+
+ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "");
+ }
+
+ @Test
+ public void testGroupBy()
+ {
+ GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ VarianceTestHelper.rowsCount,
+ VarianceTestHelper.indexVarianceAggr,
+ new LongSumAggregatorFactory("idx", "index")
+ )
+ )
+ .setPostAggregatorSpecs(
+ Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr)
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .build();
+
+ VarianceTestHelper.RowBuilder builder =
+ new VarianceTestHelper.RowBuilder(new String[]{"alias", "rows", "idx", "index_stddev", "index_var"});
+
+ List<Row> expectedResults = builder
+ .add("2011-04-01", "automotive", 1L, 135L, 0d, 0d)
+ .add("2011-04-01", "business", 1L, 118L, 0d, 0d)
+ .add("2011-04-01", "entertainment", 1L, 158L, 0d, 0d)
+ .add("2011-04-01", "health", 1L, 120L, 0d, 0d)
+ .add("2011-04-01", "mezzanine", 3L, 2870L, 737.0179286322613d, 543195.4271253889d)
+ .add("2011-04-01", "news", 1L, 121L, 0d, 0d)
+ .add("2011-04-01", "premium", 3L, 2900L, 726.6322593583996d, 527994.4403402924d)
+ .add("2011-04-01", "technology", 1L, 78L, 0d, 0d)
+ .add("2011-04-01", "travel", 1L, 119L, 0d, 0d)
+
+ .add("2011-04-02", "automotive", 1L, 147L, 0d, 0d)
+ .add("2011-04-02", "business", 1L, 112L, 0d, 0d)
+ .add("2011-04-02", "entertainment", 1L, 166L, 0d, 0d)
+ .add("2011-04-02", "health", 1L, 113L, 0d, 0d)
+ .add("2011-04-02", "mezzanine", 3L, 2447L, 611.3420766546617d, 373739.13468843425d)
+ .add("2011-04-02", "news", 1L, 114L, 0d, 0d)
+ .add("2011-04-02", "premium", 3L, 2505L, 621.3898134843073d, 386125.30030206224d)
+ .add("2011-04-02", "technology", 1L, 97L, 0d, 0d)
+ .add("2011-04-02", "travel", 1L, 126L, 0d, 0d)
+ .build();
+
+ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "");
+ }
+
+ @Test
+ public void testPostAggHavingSpec()
+ {
+ VarianceTestHelper.RowBuilder expect = new VarianceTestHelper.RowBuilder(
+ new String[]{"alias", "rows", "index", "index_var", "index_stddev"}
+ );
+
+ List<Row> expectedResults = expect
+ .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
+ .add("2011-04-01", "mezzanine", 6L, 4420L, 254083.76447001836, 504.06722217380724)
+ .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
+ .build();
+
+ GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(VarianceTestHelper.dataSource)
+ .setInterval("2011-04-02/2011-04-04")
+ .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ VarianceTestHelper.rowsCount,
+ VarianceTestHelper.indexLongSum,
+ VarianceTestHelper.indexVarianceAggr
+ )
+ )
+ .setPostAggregatorSpecs(ImmutableList.<PostAggregator>of(VarianceTestHelper.stddevOfIndexPostAggr))
+ .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
+ .setHavingSpec(
+ new OrHavingSpec(
+ ImmutableList.<HavingSpec>of(
+ new GreaterThanHavingSpec(VarianceTestHelper.stddevOfIndexMetric, 15L) // 3 rows
+ )
+ )
+ )
+ .build();
+
+ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "");
+
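+ // re-run with an ascending limit on stddev; only the two rows with the smallest stddev should remain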
+ query = query.withLimitSpec(
+ new DefaultLimitSpec(
+ Arrays.<OrderByColumnSpec>asList(
+ OrderByColumnSpec.asc(
+ VarianceTestHelper.stddevOfIndexMetric
+ )
+ ), 2
+ )
+ );
+
+ expectedResults = expect
+ .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
+ .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
+ .build();
+
+ results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "");
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java
new file mode 100644
index 0000000..47f5b00
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import io.druid.segment.data.ObjectStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public class VarianceSerdeTest
+{
+ @Test
+ public void testSerde()
+ {
+ Random r = new Random();
+ VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
+ ObjectStrategy strategy = new VarianceSerde().getObjectStrategy();
+ Assert.assertEquals(VarianceAggregatorCollector.class, strategy.getClazz());
+
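+ // round-trip the collector through the serde after each added value and check equality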
+ for (int i = 0; i < 100; i++) {
+ byte[] array = strategy.toBytes(holder);
+ Assert.assertArrayEquals(array, holder.toByteArray());
+ Assert.assertEquals(holder, strategy.fromByteBuffer(ByteBuffer.wrap(array), array.length));
+ holder.add(r.nextFloat());
+ }
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java
new file mode 100644
index 0000000..3799d03
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.druid.data.input.MapBasedRow;
+import io.druid.data.input.Row;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.stats.DruidStatsModule;
+import org.joda.time.DateTime;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shared aggregator and post-aggregator constants plus a row builder used by the variance query tests.
+ */
+public class VarianceTestHelper extends QueryRunnerTestHelper
+{
+ static {
+ DruidStatsModule module = new DruidStatsModule();
+ module.configure(null);
+ }
+
+ public static final String indexVarianceMetric = "index_var";
+
+ public static final VarianceAggregatorFactory indexVarianceAggr = new VarianceAggregatorFactory(
+ indexVarianceMetric,
+ indexMetric
+ );
+
+ public static final String stddevOfIndexMetric = "index_stddev";
+
+ public static final PostAggregator stddevOfIndexPostAggr = new StandardDeviationPostAggregator(
+ stddevOfIndexMetric,
+ indexVarianceMetric,
+ null
+ );
+
+ public static final List<AggregatorFactory> commonPlusVarAggregators = Arrays.asList(
+ rowsCount,
+ indexDoubleSum,
+ qualityUniques,
+ indexVarianceAggr
+ );
+
+ public static class RowBuilder
+ {
+ private final String[] names;
+ private final List<Row> rows = Lists.newArrayList();
+
+ public RowBuilder(String[] names)
+ {
+ this.names = names;
+ }
+
+ public RowBuilder add(final String timestamp, Object... values)
+ {
+ rows.add(build(timestamp, values));
+ return this;
+ }
+
+ public List<Row> build()
+ {
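+ // returns a copy of the accumulated rows and clears the builder so it can be reused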
+ try {
+ return Lists.newArrayList(rows);
+ }
+ finally {
+ rows.clear();
+ }
+ }
+
+ public Row build(final String timestamp, Object... values)
+ {
+ Preconditions.checkArgument(names.length == values.length);
+
+ Map<String, Object> theVals = Maps.newHashMap();
+ for (int i = 0; i < values.length; i++) {
+ theVals.put(names[i], values[i]);
+ }
+ DateTime ts = new DateTime(timestamp);
+ return new MapBasedRow(ts, theVals);
+ }
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java
new file mode 100644
index 0000000..8080c08
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunner;
+import io.druid.query.Result;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class VarianceTimeseriesQueryTest
+{
+ @Parameterized.Parameters(name = "{0}:descending={1}")
+ public static Iterable<Object[]> constructorFeeder() throws IOException
+ {
+ return TimeseriesQueryRunnerTest.constructorFeeder();
+ }
+
+ private final QueryRunner runner;
+ private final boolean descending;
+
+ public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending)
+ {
+ this.runner = runner;
+ this.descending = descending;
+ }
+
+ @Test
+ public void testTimeseriesWithNullFilterOnNonExistentDimension()
+ {
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(VarianceTestHelper.dataSource)
+ .granularity(VarianceTestHelper.dayGran)
+ .filters("bobby", null)
+ .intervals(VarianceTestHelper.firstToThird)
+ .aggregators(VarianceTestHelper.commonPlusVarAggregators)
+ .postAggregators(
+ Arrays.<PostAggregator>asList(
+ VarianceTestHelper.addRowsIndexConstant,
+ VarianceTestHelper.stddevOfIndexPostAggr
+ )
+ )
+ .descending(descending)
+ .build();
+
+ List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
+ new Result<>(
+ new DateTime("2011-04-01"),
+ new TimeseriesResultValue(
+ VarianceTestHelper.of(
+ "rows", 13L,
+ "index", 6626.151596069336,
+ "addRowsIndexConstant", 6640.151596069336,
+ "uniques", VarianceTestHelper.UNIQUES_9,
+ "index_var", descending ? 368885.6897238851 : 368885.689155086,
+ "index_stddev", descending ? 607.3596049490657 : 607.35960448081
+ )
+ )
+ ),
+ new Result<>(
+ new DateTime("2011-04-02"),
+ new TimeseriesResultValue(
+ VarianceTestHelper.of(
+ "rows", 13L,
+ "index", 5833.2095947265625,
+ "addRowsIndexConstant", 5847.2095947265625,
+ "uniques", VarianceTestHelper.UNIQUES_9,
+ "index_var", descending ? 259061.6037088883 : 259061.60216419376,
+ "index_stddev", descending ? 508.9809463122252 : 508.98094479478675
+ )
+ )
+ )
+ );
+
+ Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
+ runner.run(query, new HashMap<String, Object>()),
+ Lists.<Result<TimeseriesResultValue>>newArrayList()
+ );
+ assertExpectedResults(expectedResults, results);
+ }
+
+ private <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results)
+ {
+ if (descending) {
+ expectedResults = TestHelper.revert(expectedResults);
+ }
+ TestHelper.assertExpectedResults(expectedResults, results);
+ }
+}
diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java
new file mode 100644
index 0000000..eb2665d
--- /dev/null
+++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.variance;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequence;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNQueryBuilder;
+import io.druid.query.topn.TopNQueryConfig;
+import io.druid.query.topn.TopNQueryQueryToolChest;
+import io.druid.query.topn.TopNQueryRunnerTest;
+import io.druid.query.topn.TopNResultValue;
+import io.druid.segment.TestHelper;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class VarianceTopNQueryTest
+{
+ @Parameterized.Parameters
+ public static Iterable<Object[]> constructorFeeder() throws IOException
+ {
+ return TopNQueryRunnerTest.constructorFeeder();
+ }
+
+ private final QueryRunner runner;
+
+ public VarianceTopNQueryTest(
+ QueryRunner runner
+ )
+ {
+ this.runner = runner;
+ }
+
+ @Test
+ public void testFullOnTopNOverUniques()
+ {
+ TopNQuery query = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .dimension(QueryRunnerTestHelper.marketDimension)
+ .metric(QueryRunnerTestHelper.uniqueMetric)
+ .threshold(3)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .aggregators(
+ Lists.<AggregatorFactory>newArrayList(
+ Iterables.concat(
+ VarianceTestHelper.commonPlusVarAggregators,
+ Lists.newArrayList(
+ new DoubleMaxAggregatorFactory("maxIndex", "index"),
+ new DoubleMinAggregatorFactory("minIndex", "index")
+ )
+ )
+ )
+ )
+ .postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
+ .build();
+
+ List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+ new Result<TopNResultValue>(
+ new DateTime("2011-01-12T00:00:00.000Z"),
+ new TopNResultValue(
+ Arrays.<Map<String, Object>>asList(
+ ImmutableMap.<String, Object>builder()
+ .put("market", "spot")
+ .put("rows", 837L)
+ .put("index", 95606.57232284546D)
+ .put("addRowsIndexConstant", 96444.57232284546D)
+ .put("uniques", QueryRunnerTestHelper.UNIQUES_9)
+ .put("maxIndex", 277.2735290527344D)
+ .put("minIndex", 59.02102279663086D)
+ .put("index_var", 439.3851694586573D)
+ .build(),
+ ImmutableMap.<String, Object>builder()
+ .put("market", "total_market")
+ .put("rows", 186L)
+ .put("index", 215679.82879638672D)
+ .put("addRowsIndexConstant", 215866.82879638672D)
+ .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
+ .put("maxIndex", 1743.9217529296875D)
+ .put("minIndex", 792.3260498046875D)
+ .put("index_var", 27679.900887366413D)
+ .build(),
+ ImmutableMap.<String, Object>builder()
+ .put("market", "upfront")
+ .put("rows", 186L)
+ .put("index", 192046.1060180664D)
+ .put("addRowsIndexConstant", 192233.1060180664D)
+ .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
+ .put("maxIndex", 1870.06103515625D)
+ .put("minIndex", 545.9906005859375D)
+ .put("index_var", 79699.9780741607D)
+ .build()
+ )
+ )
+ )
+ );
+ assertExpectedResults(expectedResults, query);
+ }
+
+ private Sequence<Result<TopNResultValue>> assertExpectedResults(
+ Iterable<Result<TopNResultValue>> expectedResults,
+ TopNQuery query
+ )
+ {
+ final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
+ new TopNQueryConfig(),
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ );
+ final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
+ final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(query, ImmutableMap.<String, Object>of());
+ TestHelper.assertExpectedResults(expectedResults, retval);
+ return retval;
+ }
+}
diff --git a/pom.xml b/pom.xml
index d7d2f5a..2f7471b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
<module>extensions-core/datasketches</module>
<module>extensions-core/hdfs-storage</module>
<module>extensions-core/histogram</module>
+ <module>extensions-core/stats</module>
<module>extensions-core/kafka-eight</module>
<module>extensions-core/kafka-extraction-namespace</module>
<module>extensions-core/kafka-indexing-service</module>
diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
index def4115..7fd4cb1 100644
--- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java
@@ -37,7 +37,7 @@
*/
public class ArithmeticPostAggregator implements PostAggregator
{
- private static final Comparator DEFAULT_COMPARATOR = new Comparator()
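+ // visibility widened from private to public so the default numeric comparator can be reused outside this class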
+ public static final Comparator DEFAULT_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o, Object o1)
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index c76274e..b209dee 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -386,6 +386,22 @@
);
}
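+ // returns a copy of this query with the given limit spec; all other fields are carried over unchanged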
+ public GroupByQuery withLimitSpec(final LimitSpec limitSpec)
+ {
+ return new GroupByQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ getDimFilter(),
+ getGranularity(),
+ getDimensions(),
+ getAggregatorSpecs(),
+ getPostAggregatorSpecs(),
+ getHavingSpec(),
+ limitSpec,
+ getContext()
+ );
+ }
+
public static class Builder
{
private DataSource dataSource;
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index 9707fad..7464f5e 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -758,6 +758,22 @@
return metricDesc != null ? metricDesc.getType() : null;
}
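+ // resolves the Java class of a metric's values: the complex serde's class for COMPLEX metrics, otherwise the matching primitive or String type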
+ public Class getMetricClass(String metric)
+ {
+ MetricDesc metricDesc = metricDescs.get(metric);
+ switch (metricDesc.getCapabilities().getType()) {
+ case COMPLEX:
+ return ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
+ case FLOAT:
+ return Float.TYPE;
+ case LONG:
+ return Long.TYPE;
+ case STRING:
+ return String.class;
+ }
+ return null;
+ }
+
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index ca2b22d..7df01f6 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -21,7 +21,6 @@
import com.google.common.base.Function;
import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
@@ -52,8 +51,6 @@
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.filter.BooleanValueMatcher;
-import io.druid.segment.serde.ComplexMetricSerde;
-import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -70,7 +67,6 @@
*/
public class IncrementalIndexStorageAdapter implements StorageAdapter
{
- private static final Splitter SPLITTER = Splitter.on(",");
private static final NullDimensionSelector NULL_DIMENSION_SELECTOR = new NullDimensionSelector();
private final IncrementalIndex index;
@@ -307,7 +303,7 @@
}
if (Thread.interrupted()) {
- throw new QueryInterruptedException( new InterruptedException());
+ throw new QueryInterruptedException(new InterruptedException());
}
boolean foundMatched = false;
@@ -532,14 +528,13 @@
final Integer metricIndexInt = index.getMetricIndex(column);
if (metricIndexInt != null) {
final int metricIndex = metricIndexInt;
-
- final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column));
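+ // resolve the value class via the index so primitive (float/long) metrics are handled as well as complex ones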
+ final Class classOfObject = index.getMetricClass(column);
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
- return serde.getObjectStrategy().getClazz();
+ return classOfObject;
}
@Override
diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index c3b4b3d..6198951 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -21,6 +21,7 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
@@ -100,6 +101,7 @@
public static final QueryGranularity dayGran = QueryGranularities.DAY;
public static final QueryGranularity allGran = QueryGranularities.ALL;
+ public static final String timeDimension = "__time";
public static final String marketDimension = "market";
public static final String qualityDimension = "quality";
public static final String placementDimension = "placement";
@@ -119,9 +121,9 @@
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
- public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
- public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", "__time");
- public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
+ public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", indexMetric);
+ public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", timeDimension);
+ public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", indexMetric);
public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
public static final String JS_RESET_0 = "function reset() { return 0; }";
public static final JavaScriptAggregatorFactory jsIndexSumIfPlacementishA = new JavaScriptAggregatorFactory(
@@ -515,4 +517,13 @@
}
};
}
+
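+ // builds an ImmutableMap from alternating key/value pairs, e.g. of("rows", 13L, "index", 6626.15)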
+ public static Map<String, Object> of(Object... keyvalues)
+ {
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+ for (int i = 0; i < keyvalues.length; i += 2) {
+ builder.put(String.valueOf(keyvalues[i]), keyvalues[i + 1]);
+ }
+ return builder.build();
+ }
}
diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
index a7bc025..20ae203 100644
--- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
@@ -695,7 +695,7 @@
if (acHolder.getEvent().get(ex.getKey()) instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
- Assert.assertEquals(ex.getValue(), actVal);
+ Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}
}