Add vectorization for druid-histogram extension (#10304)
* First draft
* Remove redundant code from FixedBucketsHistogramAggregator classes
* Add test cases for new classes
* Fix tests in sql compatible mode
* Typo fix
* Fix comment
* Add spelling
* Vectorize only for supported types
* Rename internal aggregator files
* Fix tests
diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index 12d51c9..a937d84 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -90,7 +90,8 @@
include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
- All filters in filtered aggregators must offer vectorized row-matchers.
- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
- "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", and "filtered".
+ "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram",
+ "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
- No virtual columns.
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
- For GroupBy: No multi-value dimensions.
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
index c961f50..cde33fd 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
@@ -32,10 +32,14 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -104,6 +108,24 @@
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
+ {
+ return new ApproximateHistogramVectorAggregator(
+ metricVectorFactory.makeValueSelector(fieldName),
+ resolution
+ );
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ /* skip vectorization for string types which may be parseable to numbers. There is no vector equivalent of
+ string value selector*/
+ ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+ return (capabilities != null) && capabilities.getType().isNumeric();
+ }
+
+ @Override
public Comparator getComparator()
{
return ApproximateHistogramAggregator.COMPARATOR;
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
index 4390a63..a33d27b 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
@@ -28,54 +28,30 @@
public class ApproximateHistogramBufferAggregator implements BufferAggregator
{
private final BaseFloatColumnValueSelector selector;
- private final int resolution;
+ private final ApproximateHistogramBufferAggregatorHelper innerAggregator;
public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution)
{
this.selector = selector;
- this.resolution = resolution;
+ this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
}
@Override
public void init(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
-
- mutationBuffer.putInt(resolution);
- mutationBuffer.putInt(0); //initial binCount
- for (int i = 0; i < resolution; ++i) {
- mutationBuffer.putFloat(0f);
- }
- for (int i = 0; i < resolution; ++i) {
- mutationBuffer.putLong(0L);
- }
-
- // min
- mutationBuffer.putFloat(Float.POSITIVE_INFINITY);
- // max
- mutationBuffer.putFloat(Float.NEGATIVE_INFINITY);
+ innerAggregator.init(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
-
- ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
- h0.offer(selector.getFloat());
-
- mutationBuffer.position(position);
- h0.toBytesDense(mutationBuffer);
+ innerAggregator.aggregate(buf, position, selector.getFloat());
}
@Override
public Object get(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- return ApproximateHistogram.fromBytes(mutationBuffer);
+ return innerAggregator.get(buf, position);
}
@Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java
new file mode 100644
index 0000000..b597c4d
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link ApproximateHistogramBufferAggregator} and {@link ApproximateHistogramVectorAggregator}
+ * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class.
+ */
+final class ApproximateHistogramBufferAggregatorHelper
+{
+ private final int resolution;
+
+ public ApproximateHistogramBufferAggregatorHelper(int resolution)
+ {
+ this.resolution = resolution;
+ }
+
+ public void init(final ByteBuffer buf, final int position)
+ {
+ ApproximateHistogram histogram = new ApproximateHistogram(resolution);
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ histogram.toBytesDense(mutationBuffer);
+ }
+
+ public ApproximateHistogram get(final ByteBuffer buf, final int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ return ApproximateHistogram.fromBytesDense(mutationBuffer);
+ }
+
+ public void put(final ByteBuffer buf, final int position, final ApproximateHistogram histogram)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ histogram.toBytesDense(mutationBuffer);
+ }
+
+ public void aggregate(final ByteBuffer buf, final int position, final float value)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
+ h0.offer(value);
+
+ mutationBuffer.position(position);
+ h0.toBytesDense(mutationBuffer);
+ }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
index becbfe1..7a8a80c 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
@@ -27,9 +27,15 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -94,9 +100,32 @@
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
+ {
+ VectorObjectSelector selector = metricVectorFactory.makeObjectSelector(fieldName);
+ return new ApproximateHistogramFoldingVectorAggregator(selector, resolution, lowerLimit, upperLimit);
+ }
+
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+ return (capabilities != null) && (capabilities.getType() == ValueType.COMPLEX);
+ }
+
+ @Override
public AggregatorFactory getCombiningFactory()
{
- return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit, finalizeAsBase64Binary);
+ return new ApproximateHistogramFoldingAggregatorFactory(
+ name,
+ name,
+ resolution,
+ numBuckets,
+ lowerLimit,
+ upperLimit,
+ finalizeAsBase64Binary
+ );
}
@Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
index bd9d0cd..811e2bf 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
@@ -28,12 +28,7 @@
public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector<ApproximateHistogram> selector;
- private final int resolution;
- private final float upperLimit;
- private final float lowerLimit;
-
- private float[] tmpBufferP;
- private long[] tmpBufferB;
+ private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
public ApproximateHistogramFoldingBufferAggregator(
BaseObjectColumnValueSelector<ApproximateHistogram> selector,
@@ -43,50 +38,26 @@
)
{
this.selector = selector;
- this.resolution = resolution;
- this.lowerLimit = lowerLimit;
- this.upperLimit = upperLimit;
-
- tmpBufferP = new float[resolution];
- tmpBufferB = new long[resolution];
+ this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
}
@Override
public void init(ByteBuffer buf, int position)
{
- ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit);
-
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- // use dense storage for aggregation
- h.toBytesDense(mutationBuffer);
+ innerAggregator.init(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
ApproximateHistogram hNext = selector.getObject();
- if (hNext == null) {
- return;
- }
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
-
- ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
- h0.setLowerLimit(lowerLimit);
- h0.setUpperLimit(upperLimit);
- h0.foldFast(hNext, tmpBufferP, tmpBufferB);
-
- mutationBuffer.position(position);
- h0.toBytesDense(mutationBuffer);
+ innerAggregator.aggregate(buf, position, hNext);
}
@Override
public Object get(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.asReadOnlyBuffer();
- mutationBuffer.position(position);
- return ApproximateHistogram.fromBytesDense(mutationBuffer);
+ return innerAggregator.get(buf, position);
}
@Override
@@ -106,6 +77,7 @@
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()");
}
+
@Override
public void close()
{
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java
new file mode 100644
index 0000000..64c6a4c
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link ApproximateHistogramFoldingBufferAggregator} and
+ * {@link ApproximateHistogramFoldingVectorAggregator} for aggregation operations on byte buffers.
+ * Getting the object from value selectors is outside this class.
+ */
+final class ApproximateHistogramFoldingBufferAggregatorHelper
+{
+ private final int resolution;
+ private final float upperLimit;
+ private final float lowerLimit;
+
+ private float[] tmpBufferA;
+ private long[] tmpBufferB;
+
+ public ApproximateHistogramFoldingBufferAggregatorHelper(
+ int resolution,
+ float lowerLimit,
+ float upperLimit
+ )
+ {
+ this.resolution = resolution;
+ this.lowerLimit = lowerLimit;
+ this.upperLimit = upperLimit;
+
+ tmpBufferA = new float[resolution];
+ tmpBufferB = new long[resolution];
+ }
+
+ public void init(ByteBuffer buf, int position)
+ {
+ ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit);
+
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ // use dense storage for aggregation
+ h.toBytesDense(mutationBuffer);
+ }
+
+ public void aggregate(ByteBuffer buf, int position, @Nullable ApproximateHistogram hNext)
+ {
+ if (hNext == null) {
+ return;
+ }
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
+ foldFast(h0, hNext);
+
+ mutationBuffer.position(position);
+ h0.toBytesDense(mutationBuffer);
+ }
+
+ public void foldFast(ApproximateHistogram left, ApproximateHistogram right)
+ {
+ //These have to set in every call since limits are transient and lost during serialization-deserialization
+ left.setLowerLimit(lowerLimit);
+ left.setUpperLimit(upperLimit);
+ left.foldFast(right, tmpBufferA, tmpBufferB);
+ }
+
+ public ApproximateHistogram get(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.asReadOnlyBuffer();
+ mutationBuffer.position(position);
+ return ApproximateHistogram.fromBytesDense(mutationBuffer);
+ }
+
+ public void put(ByteBuffer buf, int position, ApproximateHistogram histogram)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ histogram.toBytesDense(mutationBuffer);
+ }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java
new file mode 100644
index 0000000..69c6e59
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class ApproximateHistogramFoldingVectorAggregator implements VectorAggregator
+{
+ private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
+ private final VectorObjectSelector selector;
+
+ public ApproximateHistogramFoldingVectorAggregator(
+ final VectorObjectSelector selector,
+ final int resolution,
+ final float lowerLimit,
+ final float upperLimit
+ )
+ {
+ this.selector = selector;
+ this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ innerAggregator.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ Object[] vector = selector.getObjectVector();
+ ApproximateHistogram histogram = innerAggregator.get(buf, position);
+ for (int i = startRow; i < endRow; i++) {
+ ApproximateHistogram other = (ApproximateHistogram) vector[i];
+ if (null != other) {
+ innerAggregator.foldFast(histogram, other);
+ }
+ }
+ innerAggregator.put(buf, position, histogram);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ Object[] vector = selector.getObjectVector();
+ for (int i = 0; i < numRows; i++) {
+ ApproximateHistogram other = (ApproximateHistogram) vector[null != rows ? rows[i] : i];
+ if (null == other) {
+ continue;
+ }
+ int position = positions[i] + positionOffset;
+ innerAggregator.aggregate(buf, position, other);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return innerAggregator.get(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close
+ }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java
new file mode 100644
index 0000000..728271f
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class ApproximateHistogramVectorAggregator implements VectorAggregator
+{
+
+ private final VectorValueSelector selector;
+ private final ApproximateHistogramBufferAggregatorHelper innerAggregator;
+
+ public ApproximateHistogramVectorAggregator(
+ VectorValueSelector selector,
+ int resolution
+ )
+ {
+ this.selector = selector;
+ this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ innerAggregator.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final boolean[] isValueNull = selector.getNullVector();
+ final float[] vector = selector.getFloatVector();
+ ApproximateHistogram histogram = innerAggregator.get(buf, position);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (isValueNull != null && isValueNull[i]) {
+ continue;
+ }
+ histogram.offer(vector[i]);
+ }
+ innerAggregator.put(buf, position, histogram);
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return innerAggregator.get(buf, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ final float[] vector = selector.getFloatVector();
+ final boolean[] isValueNull = selector.getNullVector();
+
+ for (int i = 0; i < numRows; i++) {
+ if (isValueNull != null && isValueNull[i]) {
+ continue;
+ }
+ final int position = positions[i] + positionOffset;
+ innerAggregator.aggregate(buf, position, vector[rows != null ? rows[i] : i]);
+ }
+ }
+
+
+ @Override
+ public void close()
+ {
+ // no resources to cleanup
+ }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
index f408dbf..e10203c 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
@@ -25,9 +25,11 @@
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -432,6 +434,34 @@
}
/**
+ * Merge another datapoint into this one. The other datapoint could be
+ * - base64 encoded string of {@code FixedBucketsHistogram}
+ * - {@code FixedBucketsHistogram} object
+ * - Numeric value
+ *
+ * @param val
+ */
+ @VisibleForTesting
+ public void combine(@Nullable Object val)
+ {
+ if (val == null) {
+ if (NullHandling.replaceWithDefault()) {
+ add(NullHandling.defaultDoubleValue());
+ } else {
+ incrementMissing();
+ }
+ } else if (val instanceof String) {
+ combineHistogram(fromBase64((String) val));
+ } else if (val instanceof FixedBucketsHistogram) {
+ combineHistogram((FixedBucketsHistogram) val);
+ } else if (val instanceof Number) {
+ add(((Number) val).doubleValue());
+ } else {
+ throw new ISE("Unknown class for object: " + val.getClass());
+ }
+ }
+
+ /**
* Merge another histogram into this one. Only the state of this histogram is updated.
*
* If the two histograms have identical buckets, a simpler algorithm is used.
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
index 2f5b565..eed1c9e 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
@@ -20,8 +20,6 @@
package org.apache.druid.query.aggregation.histogram;
import com.google.common.primitives.Longs;
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
@@ -66,22 +64,7 @@
public void aggregate()
{
Object val = selector.getObject();
-
- if (val == null) {
- if (NullHandling.replaceWithDefault()) {
- histogram.add(NullHandling.defaultDoubleValue());
- } else {
- histogram.incrementMissing();
- }
- } else if (val instanceof String) {
- histogram.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
- } else if (val instanceof FixedBucketsHistogram) {
- histogram.combineHistogram((FixedBucketsHistogram) val);
- } else if (val instanceof Number) {
- histogram.add(((Number) val).doubleValue());
- } else {
- throw new ISE("Unknown class for object: " + val.getClass());
- }
+ histogram.combine(val);
}
@Nullable
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
index 7cd8de8..705c420 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
@@ -29,10 +30,14 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -101,6 +106,34 @@
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
+ {
+ ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
+ if (null == capabilities) {
+ throw new IAE("could not find the column type for column %s", fieldName);
+ }
+ ValueType type = capabilities.getType();
+ if (type.isNumeric()) {
+ return new FixedBucketsHistogramVectorAggregator(
+ columnSelectorFactory.makeValueSelector(fieldName),
+ lowerLimit,
+ upperLimit,
+ numBuckets,
+ outlierHandlingMode
+ );
+ } else {
+ throw new IAE("cannot vectorize fixed bucket histogram aggregation for type %s", type);
+ }
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+ return (capabilities != null) && capabilities.getType().isNumeric();
+ }
+
+ @Override
public Comparator getComparator()
{
return FixedBucketsHistogramAggregator.COMPARATOR;
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
index 0e894d7..1ecebb5 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
@@ -19,7 +19,6 @@
package org.apache.druid.query.aggregation.histogram;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
@@ -29,8 +28,7 @@
public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector selector;
-
- private FixedBucketsHistogram histogram;
+ private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;
public FixedBucketsHistogramBufferAggregator(
BaseObjectColumnValueSelector selector,
@@ -41,7 +39,7 @@
)
{
this.selector = selector;
- this.histogram = new FixedBucketsHistogram(
+ this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
lowerLimit,
upperLimit,
numBuckets,
@@ -52,45 +50,20 @@
@Override
public void init(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- mutationBuffer.put(histogram.toBytesFull(false));
+ innerAggregator.init(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
-
- FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
-
Object val = selector.getObject();
- if (val == null) {
- if (NullHandling.replaceWithDefault()) {
- h0.incrementMissing();
- } else {
- h0.add(NullHandling.defaultDoubleValue());
- }
- } else if (val instanceof String) {
- h0.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
- } else if (val instanceof FixedBucketsHistogram) {
- h0.combineHistogram((FixedBucketsHistogram) val);
- } else {
- Double x = ((Number) val).doubleValue();
- h0.add(x);
- }
-
- mutationBuffer.position(position);
- mutationBuffer.put(h0.toBytesFull(false));
+ innerAggregator.aggregate(buf, position, val);
}
@Override
public Object get(ByteBuffer buf, int position)
{
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+ return innerAggregator.get(buf, position);
}
@Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java
new file mode 100644
index 0000000..8844e25
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link FixedBucketsHistogramBufferAggregator} and
+ * {@link FixedBucketsHistogramVectorAggregator} for aggregation operations on byte buffers.
+ * Getting the object from value selectors is outside this class.
+ */
+final class FixedBucketsHistogramBufferAggregatorHelper
+{
+ private final double lowerLimit;
+ private final double upperLimit;
+ private final int numBuckets;
+ private final FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode;
+
+ public FixedBucketsHistogramBufferAggregatorHelper(
+ double lowerLimit,
+ double upperLimit,
+ int numBuckets,
+ FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
+ )
+ {
+ this.lowerLimit = lowerLimit;
+ this.upperLimit = upperLimit;
+ this.numBuckets = numBuckets;
+ this.outlierHandlingMode = outlierHandlingMode;
+ }
+
+ public void init(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ FixedBucketsHistogram histogram = new FixedBucketsHistogram(
+ lowerLimit,
+ upperLimit,
+ numBuckets,
+ outlierHandlingMode
+ );
+ mutationBuffer.put(histogram.toBytesFull(false));
+ }
+
+ public void aggregate(ByteBuffer buf, int position, @Nullable Object val)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+
+ FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+ h0.combine(val);
+
+ mutationBuffer.position(position);
+ mutationBuffer.put(h0.toBytesFull(false));
+ }
+
+ public FixedBucketsHistogram get(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+ }
+
+ public void put(ByteBuffer buf, int position, FixedBucketsHistogram histogram)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ mutationBuffer.put(histogram.toBytesFull(false));
+ }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java
new file mode 100644
index 0000000..8bfbe6f
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class FixedBucketsHistogramVectorAggregator implements VectorAggregator
+{
+ private final VectorValueSelector selector;
+ private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;
+
+ public FixedBucketsHistogramVectorAggregator(
+ VectorValueSelector selector,
+ double lowerLimit,
+ double upperLimit,
+ int numBuckets,
+ FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
+ )
+ {
+ this.selector = selector;
+ this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
+ lowerLimit,
+ upperLimit,
+ numBuckets,
+ outlierHandlingMode
+ );
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ innerAggregator.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ double[] vector = selector.getDoubleVector();
+ boolean[] isNull = selector.getNullVector();
+ FixedBucketsHistogram histogram = innerAggregator.get(buf, position);
+ for (int i = startRow; i < endRow; i++) {
+ histogram.combine(toObject(vector, isNull, i));
+ }
+ innerAggregator.put(buf, position, histogram);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ double[] vector = selector.getDoubleVector();
+ boolean[] isNull = selector.getNullVector();
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int index = rows != null ? rows[i] : i;
+ Double val = toObject(vector, isNull, index);
+ innerAggregator.aggregate(buf, position, val);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return innerAggregator.get(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close
+ }
+
+ @Nullable
+ private Double toObject(double[] vector, @Nullable boolean[] isNull, int index)
+ {
+ return (isNull != null && isNull[index]) ? null : vector[index];
+ }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java
new file mode 100644
index 0000000..ee7283b
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class ApproximateHistogramFoldingVectorAggregatorTest
+{
+ private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
+ private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+ private ApproximateHistogram h1;
+ private ApproximateHistogram h2;
+
+ @Before
+ public void setup()
+ {
+
+ h1 = new ApproximateHistogram(5);
+ h2 = new ApproximateHistogram(5);
+
+ for (int i = 0; i < 5; ++i) {
+ h1.offer(FLOATS[i]);
+ }
+ for (int i = 5; i < FLOATS.length; ++i) {
+ h2.offer(FLOATS[i]);
+ }
+
+ VectorObjectSelector vectorObjectSelector = createMock(VectorObjectSelector.class);
+ expect(vectorObjectSelector.getObjectVector()).andReturn(new Object[]{h1, null, h2, null}).anyTimes();
+
+ EasyMock.replay(vectorObjectSelector);
+
+ vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+ expect(vectorColumnSelectorFactory.makeObjectSelector("field"))
+ .andReturn(vectorObjectSelector).anyTimes();
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("field")).andReturn(
+ new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
+ );
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
+ new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+ );
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("double_field")).andReturn(
+ new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+ );
+ EasyMock.replay(vectorColumnSelectorFactory);
+ }
+
+ @Test
+ public void doNotVectorizedNonComplexTypes()
+ {
+ ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory("string_field");
+ Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+
+ factory = buildHistogramFactory("double_field");
+ Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+ }
+
+ @Test
+ public void testAggregateSinglePosition()
+ {
+ ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+ Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.aggregate(byteBuffer, 0, 0, 4);
+ ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertArrayEquals(new float[]{19.6f, 45.0f}, h.positions(), 0.1f);
+ Assert.assertArrayEquals(new long[]{9, 1}, h.bins());
+ Assert.assertEquals(10, h.count());
+ Assert.assertEquals(2.0f, h.min(), 0.1f);
+ Assert.assertEquals(45.0f, h.max(), 0.1f);
+ }
+
+ @Test
+ public void testAggregateMultiPositions()
+ {
+ ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize() * 2);
+ int[] positions = new int[]{0, factory.getMaxIntermediateSize()};
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.init(byteBuffer, positions[1]);
+
+ vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+ vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{1, 2}, 0); // indirection
+ ApproximateHistogram actualH1 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+ ApproximateHistogram actualH2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+ Assert.assertEquals(actualH1, h1);
+ Assert.assertEquals(actualH2, h2);
+
+ }
+
+ private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory()
+ {
+ return buildHistogramFactory("field");
+ }
+
+ private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory(String fieldName)
+ {
+ return new ApproximateHistogramFoldingAggregatorFactory(
+ "approximateHistoFold",
+ fieldName,
+ 5,
+ 5,
+ 0f,
+ 50.0f,
+ false
+ );
+ }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java
new file mode 100644
index 0000000..9958194
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class ApproximateHistogramVectorAggregatorTest
+{
+ private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 33}; // Last value is never included
+ private static final boolean[] NULL_VECTOR =
+ {false, false, false, false, false, false, false, false, false, false, true};
+ private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+
+ @Before
+ public void setup()
+ {
+ NullHandling.initializeForTests();
+ VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class);
+ expect(vectorValueSelector_1.getFloatVector()).andReturn(FLOATS).anyTimes();
+ expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
+
+ VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class);
+ expect(vectorValueSelector_2.getFloatVector()).andReturn(FLOATS).anyTimes();
+ expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes();
+
+ EasyMock.replay(vectorValueSelector_1);
+ EasyMock.replay(vectorValueSelector_2);
+
+ ColumnCapabilities columnCapabilities
+ = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);
+ vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes();
+ expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
+ .andReturn(vectorValueSelector_1).anyTimes();
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes();
+ expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
+ .andReturn(vectorValueSelector_2).anyTimes();
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
+ new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+ );
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("complex_field")).andReturn(
+ new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
+ );
+ EasyMock.replay(vectorColumnSelectorFactory);
+ }
+
+ @Test
+ public void doNotVectorizedNonNumericTypes()
+ {
+ ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("string_field");
+ Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+
+ factory = buildHistogramAggFactory("complex_field");
+ Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+ }
+
+ @Test
+ public void testAggregateSinglePosition()
+ {
+ ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
+ ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls());
+ Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.aggregate(byteBuffer, 0, 0, 11);
+ ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ // (2, 1), (9.5, 2), (19.33, 3), (32.67, 3), (45, 1)
+ Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f);
+ Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins());
+
+ factory = buildHistogramAggFactory("field_2");
+ vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.aggregate(byteBuffer, 0, 0, 10);
+ h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f);
+ Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins());
+
+ }
+
+ @Test
+ public void testAggregateMultiPositions()
+ {
+ ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
+ int size = factory.getMaxIntermediateSize();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2);
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ int[] positions = new int[]{0, size};
+ vectorAggregator.init(byteBuffer, positions[0]);
+ vectorAggregator.init(byteBuffer, positions[1]);
+ vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+ // Put rest of 10 elements using the access indirection. Second vector gets the same element always
+ for (int i = 1; i < 10; i++) {
+ vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{i, 1}, 0);
+ }
+
+ ApproximateHistogram h0 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+ Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h0.positions(), 0.1f);
+ Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h0.bins());
+
+ ApproximateHistogram h2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, size);
+ Assert.assertArrayEquals(new float[]{19}, h2.positions(), 0.1f);
+ Assert.assertArrayEquals(new long[]{10}, h2.bins());
+ }
+
+ private ApproximateHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
+ {
+ return new ApproximateHistogramAggregatorFactory(
+ "approxHisto",
+ fieldName,
+ 5,
+ 5,
+ 0.0f,
+ 45.0f,
+ false
+ );
+ }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
index bf92974..630d6e8 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
@@ -1243,6 +1243,94 @@
}
@Test
+ public void testCombineBase64()
+ {
+ FixedBucketsHistogram h = buildHistogram(
+ 0,
+ 20,
+ 5,
+ FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+ new float[]{1, 2, 7, 12, 18}
+ );
+
+ FixedBucketsHistogram h2 = buildHistogram(
+ 0,
+ 20,
+ 7,
+ FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+ new float[]{3, 8, 9, 19, 99, -50}
+ );
+
+ h.combine(h2.toBase64());
+ Assert.assertEquals(5, h.getNumBuckets());
+ Assert.assertEquals(4.0, h.getBucketSize(), 0.01);
+ Assert.assertEquals(0, h.getLowerLimit(), 0.01);
+ Assert.assertEquals(20, h.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram());
+ Assert.assertEquals(9, h.getCount());
+ Assert.assertEquals(1, h.getMin(), 0.01);
+ Assert.assertEquals(18, h.getMax(), 0.01);
+ Assert.assertEquals(0, h.getMissingValueCount());
+ Assert.assertEquals(1, h.getLowerOutlierCount());
+ Assert.assertEquals(1, h.getUpperOutlierCount());
+ }
+
+ @Test
+ public void testCombineAnotherHistogram()
+ {
+ FixedBucketsHistogram h = buildHistogram(
+ 0,
+ 20,
+ 5,
+ FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+ new float[]{1, 2, 7, 12, 18}
+ );
+
+ FixedBucketsHistogram h2 = buildHistogram(
+ 0,
+ 20,
+ 7,
+ FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+ new float[]{3, 8, 9, 19, 99, -50}
+ );
+
+ h.combine(h2);
+ Assert.assertEquals(5, h.getNumBuckets());
+ Assert.assertEquals(4.0, h.getBucketSize(), 0.01);
+ Assert.assertEquals(0, h.getLowerLimit(), 0.01);
+ Assert.assertEquals(20, h.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram());
+ Assert.assertEquals(9, h.getCount());
+ Assert.assertEquals(1, h.getMin(), 0.01);
+ Assert.assertEquals(18, h.getMax(), 0.01);
+ Assert.assertEquals(0, h.getMissingValueCount());
+ Assert.assertEquals(1, h.getLowerOutlierCount());
+ Assert.assertEquals(1, h.getUpperOutlierCount());
+ }
+
+ @Test
+ public void testCombineNumber()
+ {
+ FixedBucketsHistogram h = new FixedBucketsHistogram(
+ 0,
+ 200,
+ 200,
+ FixedBucketsHistogram.OutlierHandlingMode.IGNORE
+ );
+
+ h.combine(10);
+ h.combine(20);
+
+ Assert.assertEquals(0, h.getUpperOutlierCount());
+ Assert.assertEquals(0, h.getLowerOutlierCount());
+ Assert.assertEquals(2, h.getCount());
+ Assert.assertEquals(10, h.getMin(), 0.01);
+ Assert.assertEquals(20, h.getMax(), 0.01);
+ }
+
+ @Test
public void testMissing()
{
FixedBucketsHistogram h = new FixedBucketsHistogram(
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java
new file mode 100644
index 0000000..78f74ed
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class FixedBucketsHistogramVectorAggregatorTest
+{
+ private static final double[] DOUBLES = {1.0, 12.0, 3.0, 14.0, 15.0, 16.0};
+ private static final boolean[] NULL_VECTOR = {false, false, false, false, true, false};
+ private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+
+ @Before
+ public void setup()
+ {
+ NullHandling.initializeForTests();
+ VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class);
+ expect(vectorValueSelector_1.getDoubleVector()).andReturn(DOUBLES).anyTimes();
+ expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
+
+ VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class);
+ expect(vectorValueSelector_2.getDoubleVector()).andReturn(DOUBLES).anyTimes();
+ expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes();
+
+ EasyMock.replay(vectorValueSelector_1);
+ EasyMock.replay(vectorValueSelector_2);
+
+ ColumnCapabilities columnCapabilities
+ = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);
+ vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes();
+ expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
+ .andReturn(vectorValueSelector_1).anyTimes();
+ expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes();
+ expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
+ .andReturn(vectorValueSelector_2).anyTimes();
+ EasyMock.replay(vectorColumnSelectorFactory);
+ }
+
+ @Test
+ public void testAggregateSinglePosition()
+ {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(FixedBucketsHistogram.getFullStorageSize(2));
+ FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
+ Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.aggregate(byteBuffer, 0, 0, 6);
+ FixedBucketsHistogram h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertEquals(2, h.getNumBuckets());
+ Assert.assertEquals(10.0, h.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{2, 3}, h.getHistogram());
+ Assert.assertEquals(5, h.getCount());
+ Assert.assertEquals(1.0, h.getMin(), 0.01);
+ Assert.assertEquals(16.0, h.getMax(), 0.01);
+ // Default value of null is 0 which is an outlier.
+ Assert.assertEquals(NullHandling.replaceWithDefault() ? 0 : 1, h.getMissingValueCount());
+ Assert.assertEquals(NullHandling.replaceWithDefault() ? 1 : 0, h.getLowerOutlierCount());
+ Assert.assertEquals(0, h.getUpperOutlierCount());
+
+ factory = buildHistogramAggFactory("field_2");
+ vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ vectorAggregator.init(byteBuffer, 0);
+ vectorAggregator.aggregate(byteBuffer, 0, 0, 6);
+ h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertEquals(2, h.getNumBuckets());
+ Assert.assertEquals(10.0, h.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{2, 4}, h.getHistogram());
+ Assert.assertEquals(6, h.getCount());
+ Assert.assertEquals(1.0, h.getMin(), 0.01);
+ Assert.assertEquals(16.0, h.getMax(), 0.01);
+ Assert.assertEquals(0, h.getMissingValueCount());
+ Assert.assertEquals(0, h.getLowerOutlierCount());
+ Assert.assertEquals(0, h.getUpperOutlierCount());
+ }
+
+ @Test
+ public void testAggregateMultiPositions()
+ {
+ int size = FixedBucketsHistogram.getFullStorageSize(2);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2);
+ FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
+ VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+ int[] positions = new int[]{0, size};
+ vectorAggregator.init(byteBuffer, positions[0]);
+ vectorAggregator.init(byteBuffer, positions[1]);
+ vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+
+ FixedBucketsHistogram h0 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertEquals(2, h0.getNumBuckets());
+ Assert.assertEquals(10.0, h0.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h0.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h0.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h0.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{1, 0}, h0.getHistogram());
+ Assert.assertEquals(1, h0.getCount());
+ Assert.assertEquals(1.0, h0.getMin(), 0.01);
+ Assert.assertEquals(1.0, h0.getMax(), 0.01);
+ Assert.assertEquals(0, h0.getMissingValueCount());
+ Assert.assertEquals(0, h0.getLowerOutlierCount());
+ Assert.assertEquals(0, h0.getUpperOutlierCount());
+
+ FixedBucketsHistogram h1 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+ Assert.assertEquals(2, h1.getNumBuckets());
+ Assert.assertEquals(10.0, h1.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h1.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h1.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h1.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{0, 1}, h1.getHistogram());
+ Assert.assertEquals(1, h1.getCount());
+ Assert.assertEquals(12.0, h1.getMin(), 0.01);
+ Assert.assertEquals(12.0, h1.getMax(), 0.01);
+ Assert.assertEquals(0, h1.getMissingValueCount());
+ Assert.assertEquals(0, h1.getLowerOutlierCount());
+ Assert.assertEquals(0, h1.getUpperOutlierCount());
+
+ // Tests when there is a level of indirection in accessing the vector
+ byteBuffer = ByteBuffer.allocate(size * 2);
+ vectorAggregator.init(byteBuffer, positions[0]);
+ vectorAggregator.init(byteBuffer, positions[1]);
+ vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{2, 3}, 0);
+
+ FixedBucketsHistogram h2 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+ Assert.assertEquals(2, h2.getNumBuckets());
+ Assert.assertEquals(10.0, h2.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h2.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h2.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h2.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{1, 0}, h2.getHistogram());
+ Assert.assertEquals(1, h2.getCount());
+ Assert.assertEquals(3.0, h2.getMin(), 0.01);
+ Assert.assertEquals(3.0, h2.getMax(), 0.01);
+ Assert.assertEquals(0, h2.getMissingValueCount());
+ Assert.assertEquals(0, h2.getLowerOutlierCount());
+ Assert.assertEquals(0, h2.getUpperOutlierCount());
+
+ FixedBucketsHistogram h3 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+ Assert.assertEquals(2, h3.getNumBuckets());
+ Assert.assertEquals(10.0, h3.getBucketSize(), 0.01);
+ Assert.assertEquals(1, h3.getLowerLimit(), 0.01);
+ Assert.assertEquals(21, h3.getUpperLimit(), 0.01);
+ Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h3.getOutlierHandlingMode());
+ Assert.assertArrayEquals(new long[]{0, 1}, h3.getHistogram());
+ Assert.assertEquals(1, h3.getCount());
+ Assert.assertEquals(14.0, h3.getMin(), 0.01);
+ Assert.assertEquals(14.0, h3.getMax(), 0.01);
+ Assert.assertEquals(0, h3.getMissingValueCount());
+ Assert.assertEquals(0, h3.getLowerOutlierCount());
+ Assert.assertEquals(0, h3.getUpperOutlierCount());
+
+ }
+
+ private FixedBucketsHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
+ {
+ return new FixedBucketsHistogramAggregatorFactory(
+ "fixedHisto",
+ fieldName,
+ 2,
+ 1,
+ 21,
+ FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+ false
+ );
+ }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 49fd806..beb275f 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -48,6 +48,7 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -120,6 +121,7 @@
ApproximateHistogramDruidModule.registerSerde();
for (Module mod : new ApproximateHistogramDruidModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
+ TestHelper.JSON_MAPPER.registerModule(mod);
}
final QueryableIndex index = IndexBuilder.create()
diff --git a/website/.spelling b/website/.spelling
index 249db61..14fd910 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -596,6 +596,7 @@
- ../docs/development/extensions-core/approximate-histograms.md
approxHistogram
approxHistogramFold
+fixedBucketsHistogram
bucketNum
lowerLimit
numBuckets