Add vectorization for druid-histogram extension (#10304)

* First draft

* Remove redundant code from FixedBucketsHistogramAggregator classes

* Add test cases for new classes

* Fix tests in sql compatible mode

* Typo fix

* Fix comment

* Add spelling

* Vectorize only for supported types

* Rename internal aggregator files

* Fix tests
diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index 12d51c9..a937d84 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -90,7 +90,8 @@
 include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "not".
 - All filters in filtered aggregators must offer vectorized row-matchers.
 - All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
- "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", and "filtered".
+ "longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "hyperUnique", "filtered", "approxHistogram", 
+ "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input). 
 - No virtual columns.
 - For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
 - For GroupBy: No multi-value dimensions.
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
index c961f50..cde33fd 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java
@@ -32,10 +32,14 @@
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -104,6 +108,24 @@
   }
 
   @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
+  {
+    return new ApproximateHistogramVectorAggregator(
+        metricVectorFactory.makeValueSelector(fieldName),
+        resolution
+    );
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    /* skip vectorization for string types which may be parseable to numbers. There is no vector equivalent of
+     string value selector*/
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return (capabilities != null) && capabilities.getType().isNumeric();
+  }
+
+  @Override
   public Comparator getComparator()
   {
     return ApproximateHistogramAggregator.COMPARATOR;
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
index 4390a63..a33d27b 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
@@ -28,54 +28,30 @@
 public class ApproximateHistogramBufferAggregator implements BufferAggregator
 {
   private final BaseFloatColumnValueSelector selector;
-  private final int resolution;
+  private final ApproximateHistogramBufferAggregatorHelper innerAggregator;
 
   public ApproximateHistogramBufferAggregator(BaseFloatColumnValueSelector selector, int resolution)
   {
     this.selector = selector;
-    this.resolution = resolution;
+    this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
   }
 
   @Override
   public void init(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-
-    mutationBuffer.putInt(resolution);
-    mutationBuffer.putInt(0); //initial binCount
-    for (int i = 0; i < resolution; ++i) {
-      mutationBuffer.putFloat(0f);
-    }
-    for (int i = 0; i < resolution; ++i) {
-      mutationBuffer.putLong(0L);
-    }
-
-    // min
-    mutationBuffer.putFloat(Float.POSITIVE_INFINITY);
-    // max
-    mutationBuffer.putFloat(Float.NEGATIVE_INFINITY);
+    innerAggregator.init(buf, position);
   }
 
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-
-    ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
-    h0.offer(selector.getFloat());
-
-    mutationBuffer.position(position);
-    h0.toBytesDense(mutationBuffer);
+    innerAggregator.aggregate(buf, position, selector.getFloat());
   }
 
   @Override
   public Object get(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    return ApproximateHistogram.fromBytes(mutationBuffer);
+    return innerAggregator.get(buf, position);
   }
 
   @Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java
new file mode 100644
index 0000000..b597c4d
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregatorHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link ApproximateHistogramBufferAggregator} and {@link ApproximateHistogramVectorAggregator}
+ * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class.
+ */
+final class ApproximateHistogramBufferAggregatorHelper
+{
+  private final int resolution;
+
+  public ApproximateHistogramBufferAggregatorHelper(int resolution)
+  {
+    this.resolution = resolution;
+  }
+
+  public void init(final ByteBuffer buf, final int position)
+  {
+    ApproximateHistogram histogram = new ApproximateHistogram(resolution);
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    histogram.toBytesDense(mutationBuffer);
+  }
+
+  public ApproximateHistogram get(final ByteBuffer buf, final int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    return ApproximateHistogram.fromBytesDense(mutationBuffer);
+  }
+
+  public void put(final ByteBuffer buf, final int position, final ApproximateHistogram histogram)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    histogram.toBytesDense(mutationBuffer);
+  }
+
+  public void aggregate(final ByteBuffer buf, final int position, final float value)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
+    h0.offer(value);
+
+    mutationBuffer.position(position);
+    h0.toBytesDense(mutationBuffer);
+  }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
index becbfe1..7a8a80c 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregatorFactory.java
@@ -27,9 +27,15 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -94,9 +100,32 @@
   }
 
   @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory metricVectorFactory)
+  {
+    VectorObjectSelector selector = metricVectorFactory.makeObjectSelector(fieldName);
+    return new ApproximateHistogramFoldingVectorAggregator(selector, resolution, lowerLimit, upperLimit);
+  }
+
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return (capabilities != null) && (capabilities.getType() == ValueType.COMPLEX);
+  }
+
+  @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit, finalizeAsBase64Binary);
+    return new ApproximateHistogramFoldingAggregatorFactory(
+        name,
+        name,
+        resolution,
+        numBuckets,
+        lowerLimit,
+        upperLimit,
+        finalizeAsBase64Binary
+    );
   }
 
   @Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
index bd9d0cd..811e2bf 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
@@ -28,12 +28,7 @@
 public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator
 {
   private final BaseObjectColumnValueSelector<ApproximateHistogram> selector;
-  private final int resolution;
-  private final float upperLimit;
-  private final float lowerLimit;
-
-  private float[] tmpBufferP;
-  private long[] tmpBufferB;
+  private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
 
   public ApproximateHistogramFoldingBufferAggregator(
       BaseObjectColumnValueSelector<ApproximateHistogram> selector,
@@ -43,50 +38,26 @@
   )
   {
     this.selector = selector;
-    this.resolution = resolution;
-    this.lowerLimit = lowerLimit;
-    this.upperLimit = upperLimit;
-
-    tmpBufferP = new float[resolution];
-    tmpBufferB = new long[resolution];
+    this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
   }
 
   @Override
   public void init(ByteBuffer buf, int position)
   {
-    ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit);
-
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    // use dense storage for aggregation
-    h.toBytesDense(mutationBuffer);
+    innerAggregator.init(buf, position);
   }
 
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
     ApproximateHistogram hNext = selector.getObject();
-    if (hNext == null) {
-      return;
-    }
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-
-    ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
-    h0.setLowerLimit(lowerLimit);
-    h0.setUpperLimit(upperLimit);
-    h0.foldFast(hNext, tmpBufferP, tmpBufferB);
-
-    mutationBuffer.position(position);
-    h0.toBytesDense(mutationBuffer);
+    innerAggregator.aggregate(buf, position, hNext);
   }
 
   @Override
   public Object get(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.asReadOnlyBuffer();
-    mutationBuffer.position(position);
-    return ApproximateHistogram.fromBytesDense(mutationBuffer);
+    return innerAggregator.get(buf, position);
   }
 
   @Override
@@ -106,6 +77,7 @@
   {
     throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()");
   }
+
   @Override
   public void close()
   {
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java
new file mode 100644
index 0000000..64c6a4c
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregatorHelper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link ApproximateHistogramFoldingBufferAggregator} and
+ * {@link ApproximateHistogramFoldingVectorAggregator} for aggregation operations on byte buffers.
+ * Getting the object from value selectors is outside this class.
+ */
+final class ApproximateHistogramFoldingBufferAggregatorHelper
+{
+  private final int resolution;
+  private final float upperLimit;
+  private final float lowerLimit;
+
+  private float[] tmpBufferA;
+  private long[] tmpBufferB;
+
+  public ApproximateHistogramFoldingBufferAggregatorHelper(
+      int resolution,
+      float lowerLimit,
+      float upperLimit
+  )
+  {
+    this.resolution = resolution;
+    this.lowerLimit = lowerLimit;
+    this.upperLimit = upperLimit;
+
+    tmpBufferA = new float[resolution];
+    tmpBufferB = new long[resolution];
+  }
+
+  public void init(ByteBuffer buf, int position)
+  {
+    ApproximateHistogram h = new ApproximateHistogram(resolution, lowerLimit, upperLimit);
+
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    // use dense storage for aggregation
+    h.toBytesDense(mutationBuffer);
+  }
+
+  public void aggregate(ByteBuffer buf, int position, @Nullable ApproximateHistogram hNext)
+  {
+    if (hNext == null) {
+      return;
+    }
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    ApproximateHistogram h0 = ApproximateHistogram.fromBytesDense(mutationBuffer);
+    foldFast(h0, hNext);
+
+    mutationBuffer.position(position);
+    h0.toBytesDense(mutationBuffer);
+  }
+
+  public void foldFast(ApproximateHistogram left, ApproximateHistogram right)
+  {
+    //These have to set in every call since limits are transient and lost during serialization-deserialization
+    left.setLowerLimit(lowerLimit);
+    left.setUpperLimit(upperLimit);
+    left.foldFast(right, tmpBufferA, tmpBufferB);
+  }
+
+  public ApproximateHistogram get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.asReadOnlyBuffer();
+    mutationBuffer.position(position);
+    return ApproximateHistogram.fromBytesDense(mutationBuffer);
+  }
+
+  public void put(ByteBuffer buf, int position, ApproximateHistogram histogram)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    histogram.toBytesDense(mutationBuffer);
+  }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java
new file mode 100644
index 0000000..69c6e59
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class ApproximateHistogramFoldingVectorAggregator implements VectorAggregator
+{
+  private final ApproximateHistogramFoldingBufferAggregatorHelper innerAggregator;
+  private final VectorObjectSelector selector;
+
+  public ApproximateHistogramFoldingVectorAggregator(
+      final VectorObjectSelector selector,
+      final int resolution,
+      final float lowerLimit,
+      final float upperLimit
+  )
+  {
+    this.selector = selector;
+    this.innerAggregator = new ApproximateHistogramFoldingBufferAggregatorHelper(resolution, lowerLimit, upperLimit);
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    Object[] vector = selector.getObjectVector();
+    ApproximateHistogram histogram = innerAggregator.get(buf, position);
+    for (int i = startRow; i < endRow; i++) {
+      ApproximateHistogram other = (ApproximateHistogram) vector[i];
+      if (null != other) {
+        innerAggregator.foldFast(histogram, other);
+      }
+    }
+    innerAggregator.put(buf, position, histogram);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    Object[] vector = selector.getObjectVector();
+    for (int i = 0; i < numRows; i++) {
+      ApproximateHistogram other = (ApproximateHistogram) vector[null != rows ? rows[i] : i];
+      if (null == other) {
+        continue;
+      }
+      int position = positions[i] + positionOffset;
+      innerAggregator.aggregate(buf, position, other);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return innerAggregator.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close
+  }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java
new file mode 100644
index 0000000..728271f
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class ApproximateHistogramVectorAggregator implements VectorAggregator
+{
+
+  private final VectorValueSelector selector;
+  private final ApproximateHistogramBufferAggregatorHelper innerAggregator;
+
+  public ApproximateHistogramVectorAggregator(
+      VectorValueSelector selector,
+      int resolution
+  )
+  {
+    this.selector = selector;
+    this.innerAggregator = new ApproximateHistogramBufferAggregatorHelper(resolution);
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final boolean[] isValueNull = selector.getNullVector();
+    final float[] vector = selector.getFloatVector();
+    ApproximateHistogram histogram = innerAggregator.get(buf, position);
+
+    for (int i = startRow; i < endRow; i++) {
+      if (isValueNull != null && isValueNull[i]) {
+        continue;
+      }
+      histogram.offer(vector[i]);
+    }
+    innerAggregator.put(buf, position, histogram);
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return innerAggregator.get(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    final float[] vector = selector.getFloatVector();
+    final boolean[] isValueNull = selector.getNullVector();
+
+    for (int i = 0; i < numRows; i++) {
+      if (isValueNull != null && isValueNull[i]) {
+        continue;
+      }
+      final int position = positions[i] + positionOffset;
+      innerAggregator.aggregate(buf, position, vector[rows != null ? rows[i] : i]);
+    }
+  }
+
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
index f408dbf..e10203c 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
@@ -25,9 +25,11 @@
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -432,6 +434,34 @@
   }
 
   /**
+   * Merge another datapoint into this one. The other datapoint could be
+   *  - base64 encoded string of {@code FixedBucketsHistogram}
+   *  - {@code FixedBucketsHistogram} object
+   *  - Numeric value
+   *
+   * @param val
+   */
+  @VisibleForTesting
+  public void combine(@Nullable Object val)
+  {
+    if (val == null) {
+      if (NullHandling.replaceWithDefault()) {
+        add(NullHandling.defaultDoubleValue());
+      } else {
+        incrementMissing();
+      }
+    } else if (val instanceof String) {
+      combineHistogram(fromBase64((String) val));
+    } else if (val instanceof FixedBucketsHistogram) {
+      combineHistogram((FixedBucketsHistogram) val);
+    } else if (val instanceof Number) {
+      add(((Number) val).doubleValue());
+    } else {
+      throw new ISE("Unknown class for object: " + val.getClass());
+    }
+  }
+
+  /**
    * Merge another histogram into this one. Only the state of this histogram is updated.
    *
    * If the two histograms have identical buckets, a simpler algorithm is used.
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
index 2f5b565..eed1c9e 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregator.java
@@ -20,8 +20,6 @@
 package org.apache.druid.query.aggregation.histogram;
 
 import com.google.common.primitives.Longs;
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
@@ -66,22 +64,7 @@
   public void aggregate()
   {
     Object val = selector.getObject();
-
-    if (val == null) {
-      if (NullHandling.replaceWithDefault()) {
-        histogram.add(NullHandling.defaultDoubleValue());
-      } else {
-        histogram.incrementMissing();
-      }
-    } else if (val instanceof String) {
-      histogram.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
-    } else if (val instanceof FixedBucketsHistogram) {
-      histogram.combineHistogram((FixedBucketsHistogram) val);
-    } else if (val instanceof Number) {
-      histogram.add(((Number) val).doubleValue());
-    } else {
-      throw new ISE("Unknown class for object: " + val.getClass());
-    }
+    histogram.combine(val);
   }
 
   @Nullable
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
index 7cd8de8..705c420 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramAggregatorFactory.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -29,10 +30,14 @@
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -101,6 +106,34 @@
   }
 
   @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
+  {
+    ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
+    if (null == capabilities) {
+      throw new IAE("could not find the column type for column %s", fieldName);
+    }
+    ValueType type = capabilities.getType();
+    if (type.isNumeric()) {
+      return new FixedBucketsHistogramVectorAggregator(
+          columnSelectorFactory.makeValueSelector(fieldName),
+          lowerLimit,
+          upperLimit,
+          numBuckets,
+          outlierHandlingMode
+      );
+    } else {
+      throw new IAE("cannot vectorize fixed bucket histogram aggregation for type %s", type);
+    }
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
+    return (capabilities != null) && capabilities.getType().isNumeric();
+  }
+
+  @Override
   public Comparator getComparator()
   {
     return FixedBucketsHistogramAggregator.COMPARATOR;
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
index 0e894d7..1ecebb5 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregator.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.query.aggregation.histogram;
 
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
@@ -29,8 +28,7 @@
 public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
 {
   private final BaseObjectColumnValueSelector selector;
-
-  private FixedBucketsHistogram histogram;
+  private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;
 
   public FixedBucketsHistogramBufferAggregator(
       BaseObjectColumnValueSelector selector,
@@ -41,7 +39,7 @@
   )
   {
     this.selector = selector;
-    this.histogram = new FixedBucketsHistogram(
+    this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
         lowerLimit,
         upperLimit,
         numBuckets,
@@ -52,45 +50,20 @@
   @Override
   public void init(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    mutationBuffer.put(histogram.toBytesFull(false));
+    innerAggregator.init(buf, position);
   }
 
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-
-    FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
-
     Object val = selector.getObject();
-    if (val == null) {
-      if (NullHandling.replaceWithDefault()) {
-        h0.incrementMissing();
-      } else {
-        h0.add(NullHandling.defaultDoubleValue());
-      }
-    } else if (val instanceof String) {
-      h0.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
-    } else if (val instanceof FixedBucketsHistogram) {
-      h0.combineHistogram((FixedBucketsHistogram) val);
-    } else {
-      Double x = ((Number) val).doubleValue();
-      h0.add(x);
-    }
-
-    mutationBuffer.position(position);
-    mutationBuffer.put(h0.toBytesFull(false));
+    innerAggregator.aggregate(buf, position, val);
   }
 
   @Override
   public Object get(ByteBuffer buf, int position)
   {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+    return innerAggregator.get(buf, position);
   }
 
   @Override
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java
new file mode 100644
index 0000000..8844e25
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramBufferAggregatorHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A helper class used by {@link FixedBucketsHistogramBufferAggregator} and
+ * {@link FixedBucketsHistogramVectorAggregator} for aggregation operations on byte buffers.
+ * Getting the object from value selectors is outside this class.
+ */
+final class FixedBucketsHistogramBufferAggregatorHelper
+{
+  private final double lowerLimit;
+  private final double upperLimit;
+  private final int numBuckets;
+  private final FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode;
+
+  public FixedBucketsHistogramBufferAggregatorHelper(
+      double lowerLimit,
+      double upperLimit,
+      int numBuckets,
+      FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
+  )
+  {
+    this.lowerLimit = lowerLimit;
+    this.upperLimit = upperLimit;
+    this.numBuckets = numBuckets;
+    this.outlierHandlingMode = outlierHandlingMode;
+  }
+
+  public void init(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    FixedBucketsHistogram histogram = new FixedBucketsHistogram(
+        lowerLimit,
+        upperLimit,
+        numBuckets,
+        outlierHandlingMode
+    );
+    mutationBuffer.put(histogram.toBytesFull(false));
+  }
+
+  public void aggregate(ByteBuffer buf, int position, @Nullable Object val)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+    h0.combine(val);
+
+    mutationBuffer.position(position);
+    mutationBuffer.put(h0.toBytesFull(false));
+  }
+
+  public FixedBucketsHistogram get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
+  }
+
+  public void put(ByteBuffer buf, int position, FixedBucketsHistogram histogram)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    mutationBuffer.put(histogram.toBytesFull(false));
+  }
+}
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java
new file mode 100644
index 0000000..8bfbe6f
--- /dev/null
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class FixedBucketsHistogramVectorAggregator implements VectorAggregator
+{
+  private final VectorValueSelector selector;
+  private final FixedBucketsHistogramBufferAggregatorHelper innerAggregator;
+
+  public FixedBucketsHistogramVectorAggregator(
+      VectorValueSelector selector,
+      double lowerLimit,
+      double upperLimit,
+      int numBuckets,
+      FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
+  )
+  {
+    this.selector = selector;
+    this.innerAggregator = new FixedBucketsHistogramBufferAggregatorHelper(
+        lowerLimit,
+        upperLimit,
+        numBuckets,
+        outlierHandlingMode
+    );
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    innerAggregator.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    double[] vector = selector.getDoubleVector();
+    boolean[] isNull = selector.getNullVector();
+    FixedBucketsHistogram histogram = innerAggregator.get(buf, position);
+    for (int i = startRow; i < endRow; i++) {
+      histogram.combine(toObject(vector, isNull, i));
+    }
+    innerAggregator.put(buf, position, histogram);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    double[] vector = selector.getDoubleVector();
+    boolean[] isNull = selector.getNullVector();
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int index = rows != null ? rows[i] : i;
+      Double val = toObject(vector, isNull, index);
+      innerAggregator.aggregate(buf, position, val);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return innerAggregator.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close
+  }
+
+  @Nullable
+  private Double toObject(double[] vector, @Nullable boolean[] isNull, int index)
+  {
+    return (isNull != null && isNull[index]) ? null : vector[index];
+  }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java
new file mode 100644
index 0000000..ee7283b
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramFoldingVectorAggregatorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class ApproximateHistogramFoldingVectorAggregatorTest
+{
+  private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
+  private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+  private ApproximateHistogram h1;
+  private ApproximateHistogram h2;
+
+  @Before
+  public void setup()
+  {
+
+    h1 = new ApproximateHistogram(5);
+    h2 = new ApproximateHistogram(5);
+
+    for (int i = 0; i < 5; ++i) {
+      h1.offer(FLOATS[i]);
+    }
+    for (int i = 5; i < FLOATS.length; ++i) {
+      h2.offer(FLOATS[i]);
+    }
+
+    VectorObjectSelector vectorObjectSelector = createMock(VectorObjectSelector.class);
+    expect(vectorObjectSelector.getObjectVector()).andReturn(new Object[]{h1, null, h2, null}).anyTimes();
+
+    EasyMock.replay(vectorObjectSelector);
+
+    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+    expect(vectorColumnSelectorFactory.makeObjectSelector("field"))
+        .andReturn(vectorObjectSelector).anyTimes();
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("field")).andReturn(
+        new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
+    );
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
+        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+    );
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("double_field")).andReturn(
+        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+    );
+    EasyMock.replay(vectorColumnSelectorFactory);
+  }
+
+  @Test
+  public void doNotVectorizedNonComplexTypes()
+  {
+    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory("string_field");
+    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+
+    factory = buildHistogramFactory("double_field");
+    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+  }
+
+  @Test
+  public void testAggregateSinglePosition()
+  {
+    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.aggregate(byteBuffer, 0, 0, 4);
+    ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertArrayEquals(new float[]{19.6f, 45.0f}, h.positions(), 0.1f);
+    Assert.assertArrayEquals(new long[]{9, 1}, h.bins());
+    Assert.assertEquals(10, h.count());
+    Assert.assertEquals(2.0f, h.min(), 0.1f);
+    Assert.assertEquals(45.0f, h.max(), 0.1f);
+  }
+
+  @Test
+  public void testAggregateMultiPositions()
+  {
+    ApproximateHistogramFoldingAggregatorFactory factory = buildHistogramFactory();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSize() * 2);
+    int[] positions = new int[]{0, factory.getMaxIntermediateSize()};
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.init(byteBuffer, positions[1]);
+
+    vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+    vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{1, 2}, 0);  // indirection
+    ApproximateHistogram actualH1 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+    ApproximateHistogram actualH2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+    Assert.assertEquals(actualH1, h1);
+    Assert.assertEquals(actualH2, h2);
+
+  }
+
+  private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory()
+  {
+    return buildHistogramFactory("field");
+  }
+
+  private ApproximateHistogramFoldingAggregatorFactory buildHistogramFactory(String fieldName)
+  {
+    return new ApproximateHistogramFoldingAggregatorFactory(
+        "approximateHistoFold",
+        fieldName,
+        5,
+        5,
+        0f,
+        50.0f,
+        false
+    );
+  }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java
new file mode 100644
index 0000000..9958194
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogramVectorAggregatorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class ApproximateHistogramVectorAggregatorTest
+{
+  private static final float[] FLOATS = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 33};   // Last value is never included
+  private static final boolean[] NULL_VECTOR =
+      {false, false, false, false, false, false, false, false, false, false, true};
+  private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+
+  @Before
+  public void setup()
+  {
+    NullHandling.initializeForTests();
+    VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class);
+    expect(vectorValueSelector_1.getFloatVector()).andReturn(FLOATS).anyTimes();
+    expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
+
+    VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class);
+    expect(vectorValueSelector_2.getFloatVector()).andReturn(FLOATS).anyTimes();
+    expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes();
+
+    EasyMock.replay(vectorValueSelector_1);
+    EasyMock.replay(vectorValueSelector_2);
+
+    ColumnCapabilities columnCapabilities
+        = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);
+    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes();
+    expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
+        .andReturn(vectorValueSelector_1).anyTimes();
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes();
+    expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
+        .andReturn(vectorValueSelector_2).anyTimes();
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("string_field")).andReturn(
+        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+    );
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("complex_field")).andReturn(
+        new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)
+    );
+    EasyMock.replay(vectorColumnSelectorFactory);
+  }
+
+  @Test
+  public void doNotVectorizedNonNumericTypes()
+  {
+    ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("string_field");
+    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+
+    factory = buildHistogramAggFactory("complex_field");
+    Assert.assertFalse(factory.canVectorize(vectorColumnSelectorFactory));
+  }
+
+  @Test
+  public void testAggregateSinglePosition()
+  {
+    ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
+    ByteBuffer byteBuffer = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls());
+    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.aggregate(byteBuffer, 0, 0, 11);
+    ApproximateHistogram h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    // (2, 1), (9.5, 2), (19.33, 3), (32.67, 3), (45, 1)
+    Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f);
+    Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins());
+
+    factory = buildHistogramAggFactory("field_2");
+    vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.aggregate(byteBuffer, 0, 0, 10);
+    h = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h.positions(), 0.1f);
+    Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h.bins());
+
+  }
+
+  @Test
+  public void testAggregateMultiPositions()
+  {
+    ApproximateHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
+    int size = factory.getMaxIntermediateSize();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2);
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    int[] positions = new int[]{0, size};
+    vectorAggregator.init(byteBuffer, positions[0]);
+    vectorAggregator.init(byteBuffer, positions[1]);
+    vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+    // Put rest of 10 elements using the access indirection. Second vector gets the same element always
+    for (int i = 1; i < 10; i++) {
+      vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{i, 1}, 0);
+    }
+
+    ApproximateHistogram h0 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, 0);
+    Assert.assertArrayEquals(new float[]{2, 9.5f, 19.33f, 32.67f, 45f}, h0.positions(), 0.1f);
+    Assert.assertArrayEquals(new long[]{1, 2, 3, 3, 1}, h0.bins());
+
+    ApproximateHistogram h2 = (ApproximateHistogram) vectorAggregator.get(byteBuffer, size);
+    Assert.assertArrayEquals(new float[]{19}, h2.positions(), 0.1f);
+    Assert.assertArrayEquals(new long[]{10}, h2.bins());
+  }
+
+  private ApproximateHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
+  {
+    return new ApproximateHistogramAggregatorFactory(
+        "approxHisto",
+        fieldName,
+        5,
+        5,
+        0.0f,
+        45.0f,
+        false
+    );
+  }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
index bf92974..630d6e8 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramTest.java
@@ -1243,6 +1243,94 @@
   }
 
   @Test
+  public void testCombineBase64()
+  {
+    FixedBucketsHistogram h = buildHistogram(
+        0,
+        20,
+        5,
+        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+        new float[]{1, 2, 7, 12, 18}
+    );
+
+    FixedBucketsHistogram h2 = buildHistogram(
+        0,
+        20,
+        7,
+        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+        new float[]{3, 8, 9, 19, 99, -50}
+    );
+
+    h.combine(h2.toBase64());
+    Assert.assertEquals(5, h.getNumBuckets());
+    Assert.assertEquals(4.0, h.getBucketSize(), 0.01);
+    Assert.assertEquals(0, h.getLowerLimit(), 0.01);
+    Assert.assertEquals(20, h.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram());
+    Assert.assertEquals(9, h.getCount());
+    Assert.assertEquals(1, h.getMin(), 0.01);
+    Assert.assertEquals(18, h.getMax(), 0.01);
+    Assert.assertEquals(0, h.getMissingValueCount());
+    Assert.assertEquals(1, h.getLowerOutlierCount());
+    Assert.assertEquals(1, h.getUpperOutlierCount());
+  }
+
+  @Test
+  public void testCombineAnotherHistogram()
+  {
+    FixedBucketsHistogram h = buildHistogram(
+        0,
+        20,
+        5,
+        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+        new float[]{1, 2, 7, 12, 18}
+    );
+
+    FixedBucketsHistogram h2 = buildHistogram(
+        0,
+        20,
+        7,
+        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+        new float[]{3, 8, 9, 19, 99, -50}
+    );
+
+    h.combine(h2);
+    Assert.assertEquals(5, h.getNumBuckets());
+    Assert.assertEquals(4.0, h.getBucketSize(), 0.01);
+    Assert.assertEquals(0, h.getLowerLimit(), 0.01);
+    Assert.assertEquals(20, h.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{2, 3, 1, 1, 2}, h.getHistogram());
+    Assert.assertEquals(9, h.getCount());
+    Assert.assertEquals(1, h.getMin(), 0.01);
+    Assert.assertEquals(18, h.getMax(), 0.01);
+    Assert.assertEquals(0, h.getMissingValueCount());
+    Assert.assertEquals(1, h.getLowerOutlierCount());
+    Assert.assertEquals(1, h.getUpperOutlierCount());
+  }
+
+  @Test
+  public void testCombineNumber()
+  {
+    FixedBucketsHistogram h = new FixedBucketsHistogram(
+        0,
+        200,
+        200,
+        FixedBucketsHistogram.OutlierHandlingMode.IGNORE
+    );
+
+    h.combine(10);
+    h.combine(20);
+
+    Assert.assertEquals(0, h.getUpperOutlierCount());
+    Assert.assertEquals(0, h.getLowerOutlierCount());
+    Assert.assertEquals(2, h.getCount());
+    Assert.assertEquals(10, h.getMin(), 0.01);
+    Assert.assertEquals(20, h.getMax(), 0.01);
+  }
+
+  @Test
   public void testMissing()
   {
     FixedBucketsHistogram h = new FixedBucketsHistogram(
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java
new file mode 100644
index 0000000..78f74ed
--- /dev/null
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogramVectorAggregatorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.histogram;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+
+public class FixedBucketsHistogramVectorAggregatorTest
+{
+  private static final double[] DOUBLES = {1.0, 12.0, 3.0, 14.0, 15.0, 16.0};
+  private static final boolean[] NULL_VECTOR = {false, false, false, false, true, false};
+  private VectorColumnSelectorFactory vectorColumnSelectorFactory;
+
+  @Before
+  public void setup()
+  {
+    NullHandling.initializeForTests();
+    VectorValueSelector vectorValueSelector_1 = createMock(VectorValueSelector.class);
+    expect(vectorValueSelector_1.getDoubleVector()).andReturn(DOUBLES).anyTimes();
+    expect(vectorValueSelector_1.getNullVector()).andReturn(NULL_VECTOR).anyTimes();
+
+    VectorValueSelector vectorValueSelector_2 = createMock(VectorValueSelector.class);
+    expect(vectorValueSelector_2.getDoubleVector()).andReturn(DOUBLES).anyTimes();
+    expect(vectorValueSelector_2.getNullVector()).andReturn(null).anyTimes();
+
+    EasyMock.replay(vectorValueSelector_1);
+    EasyMock.replay(vectorValueSelector_2);
+
+    ColumnCapabilities columnCapabilities
+        = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE);
+    vectorColumnSelectorFactory = createMock(VectorColumnSelectorFactory.class);
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_1")).andReturn(columnCapabilities).anyTimes();
+    expect(vectorColumnSelectorFactory.makeValueSelector("field_1"))
+        .andReturn(vectorValueSelector_1).anyTimes();
+    expect(vectorColumnSelectorFactory.getColumnCapabilities("field_2")).andReturn(columnCapabilities).anyTimes();
+    expect(vectorColumnSelectorFactory.makeValueSelector("field_2"))
+        .andReturn(vectorValueSelector_2).anyTimes();
+    EasyMock.replay(vectorColumnSelectorFactory);
+  }
+
+  @Test
+  public void testAggregateSinglePosition()
+  {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(FixedBucketsHistogram.getFullStorageSize(2));
+    FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_1");
+    Assert.assertTrue(factory.canVectorize(vectorColumnSelectorFactory));
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.aggregate(byteBuffer, 0, 0, 6);
+    FixedBucketsHistogram h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertEquals(2, h.getNumBuckets());
+    Assert.assertEquals(10.0, h.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{2, 3}, h.getHistogram());
+    Assert.assertEquals(5, h.getCount());
+    Assert.assertEquals(1.0, h.getMin(), 0.01);
+    Assert.assertEquals(16.0, h.getMax(), 0.01);
+    // Default value of null is 0 which is an outlier.
+    Assert.assertEquals(NullHandling.replaceWithDefault() ? 0 : 1, h.getMissingValueCount());
+    Assert.assertEquals(NullHandling.replaceWithDefault() ? 1 : 0, h.getLowerOutlierCount());
+    Assert.assertEquals(0, h.getUpperOutlierCount());
+
+    factory = buildHistogramAggFactory("field_2");
+    vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    vectorAggregator.init(byteBuffer, 0);
+    vectorAggregator.aggregate(byteBuffer, 0, 0, 6);
+    h = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertEquals(2, h.getNumBuckets());
+    Assert.assertEquals(10.0, h.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{2, 4}, h.getHistogram());
+    Assert.assertEquals(6, h.getCount());
+    Assert.assertEquals(1.0, h.getMin(), 0.01);
+    Assert.assertEquals(16.0, h.getMax(), 0.01);
+    Assert.assertEquals(0, h.getMissingValueCount());
+    Assert.assertEquals(0, h.getLowerOutlierCount());
+    Assert.assertEquals(0, h.getUpperOutlierCount());
+  }
+
+  @Test
+  public void testAggregateMultiPositions()
+  {
+    int size = FixedBucketsHistogram.getFullStorageSize(2);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size * 2);
+    FixedBucketsHistogramAggregatorFactory factory = buildHistogramAggFactory("field_2");
+    VectorAggregator vectorAggregator = factory.factorizeVector(vectorColumnSelectorFactory);
+    int[] positions = new int[]{0, size};
+    vectorAggregator.init(byteBuffer, positions[0]);
+    vectorAggregator.init(byteBuffer, positions[1]);
+    vectorAggregator.aggregate(byteBuffer, 2, positions, null, 0);
+
+    FixedBucketsHistogram h0 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertEquals(2, h0.getNumBuckets());
+    Assert.assertEquals(10.0, h0.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h0.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h0.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h0.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{1, 0}, h0.getHistogram());
+    Assert.assertEquals(1, h0.getCount());
+    Assert.assertEquals(1.0, h0.getMin(), 0.01);
+    Assert.assertEquals(1.0, h0.getMax(), 0.01);
+    Assert.assertEquals(0, h0.getMissingValueCount());
+    Assert.assertEquals(0, h0.getLowerOutlierCount());
+    Assert.assertEquals(0, h0.getUpperOutlierCount());
+
+    FixedBucketsHistogram h1 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+    Assert.assertEquals(2, h1.getNumBuckets());
+    Assert.assertEquals(10.0, h1.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h1.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h1.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h1.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{0, 1}, h1.getHistogram());
+    Assert.assertEquals(1, h1.getCount());
+    Assert.assertEquals(12.0, h1.getMin(), 0.01);
+    Assert.assertEquals(12.0, h1.getMax(), 0.01);
+    Assert.assertEquals(0, h1.getMissingValueCount());
+    Assert.assertEquals(0, h1.getLowerOutlierCount());
+    Assert.assertEquals(0, h1.getUpperOutlierCount());
+
+    // Tests when there is a level of indirection in accessing the vector
+    byteBuffer = ByteBuffer.allocate(size * 2);
+    vectorAggregator.init(byteBuffer, positions[0]);
+    vectorAggregator.init(byteBuffer, positions[1]);
+    vectorAggregator.aggregate(byteBuffer, 2, positions, new int[]{2, 3}, 0);
+
+    FixedBucketsHistogram h2 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, 0);
+
+    Assert.assertEquals(2, h2.getNumBuckets());
+    Assert.assertEquals(10.0, h2.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h2.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h2.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h2.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{1, 0}, h2.getHistogram());
+    Assert.assertEquals(1, h2.getCount());
+    Assert.assertEquals(3.0, h2.getMin(), 0.01);
+    Assert.assertEquals(3.0, h2.getMax(), 0.01);
+    Assert.assertEquals(0, h2.getMissingValueCount());
+    Assert.assertEquals(0, h2.getLowerOutlierCount());
+    Assert.assertEquals(0, h2.getUpperOutlierCount());
+
+    FixedBucketsHistogram h3 = (FixedBucketsHistogram) vectorAggregator.get(byteBuffer, positions[1]);
+
+    Assert.assertEquals(2, h3.getNumBuckets());
+    Assert.assertEquals(10.0, h3.getBucketSize(), 0.01);
+    Assert.assertEquals(1, h3.getLowerLimit(), 0.01);
+    Assert.assertEquals(21, h3.getUpperLimit(), 0.01);
+    Assert.assertEquals(FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW, h3.getOutlierHandlingMode());
+    Assert.assertArrayEquals(new long[]{0, 1}, h3.getHistogram());
+    Assert.assertEquals(1, h3.getCount());
+    Assert.assertEquals(14.0, h3.getMin(), 0.01);
+    Assert.assertEquals(14.0, h3.getMax(), 0.01);
+    Assert.assertEquals(0, h3.getMissingValueCount());
+    Assert.assertEquals(0, h3.getLowerOutlierCount());
+    Assert.assertEquals(0, h3.getUpperOutlierCount());
+
+  }
+
+  private FixedBucketsHistogramAggregatorFactory buildHistogramAggFactory(String fieldName)
+  {
+    return new FixedBucketsHistogramAggregatorFactory(
+        "fixedHisto",
+        fieldName,
+        2,
+        1,
+        21,
+        FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
+        false
+    );
+  }
+}
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 49fd806..beb275f 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -48,6 +48,7 @@
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -120,6 +121,7 @@
     ApproximateHistogramDruidModule.registerSerde();
     for (Module mod : new ApproximateHistogramDruidModule().getJacksonModules()) {
       CalciteTests.getJsonMapper().registerModule(mod);
+      TestHelper.JSON_MAPPER.registerModule(mod);
     }
 
     final QueryableIndex index = IndexBuilder.create()
diff --git a/website/.spelling b/website/.spelling
index 249db61..14fd910 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -596,6 +596,7 @@
  - ../docs/development/extensions-core/approximate-histograms.md
 approxHistogram
 approxHistogramFold
+fixedBucketsHistogram
 bucketNum
 lowerLimit
 numBuckets