Vectorize the DataSketches quantiles aggregator. (#11183)

* Vectorize the DataSketches quantiles aggregator.

Also removes synchronization for the BufferAggregator and VectorAggregator
implementations, since it is not necessary (similar to #11115).

Extends DoublesSketchAggregatorTest and DoublesSketchSqlAggregatorTest
to run all test cases in vectorized mode.

* Style fix.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 2ea6a2a..27da4a2 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -32,11 +32,22 @@
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -63,7 +74,8 @@
   public DoublesSketchAggregatorFactory(
       @JsonProperty("name") final String name,
       @JsonProperty("fieldName") final String fieldName,
-      @JsonProperty("k") final Integer k)
+      @JsonProperty("k") final Integer k
+  )
   {
     this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
   }
@@ -106,7 +118,7 @@
   {
     if (metricFactory.getColumnCapabilities(fieldName) != null
         && ValueType.isNumeric(metricFactory.getColumnCapabilities(fieldName).getType())) {
-      final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
+      final BaseDoubleColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
       if (selector instanceof NilColumnValueSelector) {
         return new NoopDoublesSketchBufferAggregator();
       }
@@ -120,6 +132,65 @@
   }
 
   @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    return ColumnProcessors.makeVectorProcessor(
+        fieldName,
+        new VectorColumnProcessorFactory<VectorAggregator>()
+        {
+          @Override
+          public VectorAggregator makeSingleValueDimensionProcessor(
+              ColumnCapabilities capabilities,
+              SingleValueDimensionVectorSelector selector
+          )
+          {
+            return new NoopDoublesSketchBufferAggregator();
+          }
+
+          @Override
+          public VectorAggregator makeMultiValueDimensionProcessor(
+              ColumnCapabilities capabilities,
+              MultiValueDimensionVectorSelector selector
+          )
+          {
+            return new NoopDoublesSketchBufferAggregator();
+          }
+
+          @Override
+          public VectorAggregator makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+          {
+            return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+          }
+
+          @Override
+          public VectorAggregator makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+          {
+            return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+          }
+
+          @Override
+          public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+          {
+            return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+          }
+
+          @Override
+          public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
+          {
+            return new DoublesSketchMergeVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+          }
+        },
+        selectorFactory
+    );
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;
+  }
+
+  @Override
   public Object deserialize(final Object object)
   {
     return DoublesSketchOperations.deserialize(object);
@@ -217,8 +288,9 @@
         new DoublesSketchAggregatorFactory(
             fieldName,
             fieldName,
-            k)
-        );
+            k
+        )
+    );
   }
 
   @Override
@@ -306,10 +378,10 @@
   public String toString()
   {
     return getClass().getSimpleName() + "{"
-        + "name=" + name
-        + ", fieldName=" + fieldName
-        + ", k=" + k
-        + "}";
+           + "name=" + name
+           + ", fieldName=" + fieldName
+           + ", k=" + k
+           + "}";
   }
 
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
index e330310..74be19b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
@@ -19,60 +19,52 @@
 
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantiles.DoublesSketch;
 import org.apache.datasketches.quantiles.UpdateDoublesSketch;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
 
 public class DoublesSketchBuildBufferAggregator implements BufferAggregator
 {
 
-  private final ColumnValueSelector<Double> selector;
-  private final int size;
-  private final int maxIntermediateSize;
+  private final BaseDoubleColumnValueSelector selector;
+  private final DoublesSketchBuildBufferAggregatorHelper helper;
 
-  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
-  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
-
-  public DoublesSketchBuildBufferAggregator(final ColumnValueSelector<Double> valueSelector, final int size,
-      final int maxIntermediateSize)
+  public DoublesSketchBuildBufferAggregator(
+      final BaseDoubleColumnValueSelector valueSelector,
+      final int size,
+      final int maxIntermediateSize
+  )
   {
     this.selector = valueSelector;
-    this.size = size;
-    this.maxIntermediateSize = maxIntermediateSize;
+    this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
   }
 
   @Override
-  public synchronized void init(final ByteBuffer buffer, final int position)
+  public void init(ByteBuffer buf, int position)
   {
-    final WritableMemory mem = getMemory(buffer);
-    final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
-    final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
-    putSketch(buffer, position, sketch);
+    helper.init(buf, position);
   }
 
   @Override
-  public synchronized void aggregate(final ByteBuffer buffer, final int position)
+  public void aggregate(final ByteBuffer buffer, final int position)
   {
     if (selector.isNull()) {
       return;
     }
-    final UpdateDoublesSketch sketch = sketches.get(buffer).get(position);
+
+    final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
     sketch.update(selector.getDouble());
   }
 
+  @Nullable
   @Override
-  public synchronized Object get(final ByteBuffer buffer, final int position)
+  public Object get(ByteBuffer buf, int position)
   {
-    return sketches.get(buffer).get(position).compact();
+    return helper.get(buf, position);
   }
 
   @Override
@@ -88,42 +80,17 @@
   }
 
   @Override
-  public synchronized void close()
+  public void close()
   {
-    sketches.clear();
-    memCache.clear();
+    helper.clear();
   }
 
   // A small number of sketches may run out of the given memory, request more memory on heap and move there.
   // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
   @Override
-  public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
   {
-    UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
-    final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
-    if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
-      final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
-      sketch = UpdateDoublesSketch.wrap(newRegion);
-    }
-    putSketch(newBuffer, newPosition, sketch);
-
-    final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
-    map.remove(oldPosition);
-    if (map.isEmpty()) {
-      sketches.remove(oldBuffer);
-      memCache.remove(oldBuffer);
-    }
-  }
-
-  private WritableMemory getMemory(final ByteBuffer buffer)
-  {
-    return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
-  }
-
-  private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
-  {
-    Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
-    map.put(position, sketch);
+    helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
   }
 
   @Override
@@ -131,5 +98,4 @@
   {
     inspector.visit("selector", selector);
   }
-
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java
new file mode 100644
index 0000000..f5286bd
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.quantiles.CompactDoublesSketch;
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class DoublesSketchBuildBufferAggregatorHelper
+{
+  private final int size;
+  private final int maxIntermediateSize;
+  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
+
+  public DoublesSketchBuildBufferAggregatorHelper(final int size, final int maxIntermediateSize)
+  {
+    this.size = size;
+    this.maxIntermediateSize = maxIntermediateSize;
+  }
+
+  public void init(final ByteBuffer buffer, final int position)
+  {
+    final WritableMemory mem = getMemory(buffer);
+    final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+    final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
+    putSketch(buffer, position, sketch);
+  }
+
+  public CompactDoublesSketch get(final ByteBuffer buffer, final int position)
+  {
+    return sketches.get(buffer).get(position).compact();
+  }
+
+  // A small number of sketches may run out of the given memory, request more memory on heap and move there.
+  // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
+    final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
+    if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
+      final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
+      sketch = UpdateDoublesSketch.wrap(newRegion);
+    }
+    putSketch(newBuffer, newPosition, sketch);
+
+    final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
+    map.remove(oldPosition);
+    if (map.isEmpty()) {
+      sketches.remove(oldBuffer);
+      memCache.remove(oldBuffer);
+    }
+  }
+
+  public void clear()
+  {
+    sketches.clear();
+    memCache.clear();
+  }
+
+  /**
+   * Retrieves the sketch at a particular position.
+   */
+  public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int position)
+  {
+    return sketches.get(buf).get(position);
+  }
+
+  private WritableMemory getMemory(final ByteBuffer buffer)
+  {
+    return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+  }
+
+  private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
+  {
+    Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+    map.put(position, sketch);
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
new file mode 100644
index 0000000..af29c5b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoublesSketchBuildVectorAggregator implements VectorAggregator
+{
+  private final VectorValueSelector selector;
+  private final DoublesSketchBuildBufferAggregatorHelper helper;
+
+  DoublesSketchBuildVectorAggregator(
+      final VectorValueSelector selector,
+      final int size,
+      final int maxIntermediateSize
+  )
+  {
+    this.selector = selector;
+    this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    helper.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final double[] doubles = selector.getDoubleVector();
+    final boolean[] nulls = selector.getNullVector();
+
+    final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position);
+
+    for (int i = startRow; i < endRow; i++) {
+      if (nulls == null || !nulls[i]) {
+        sketch.update(doubles[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final double[] doubles = selector.getDoubleVector();
+    final boolean[] nulls = selector.getNullVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final int idx = rows != null ? rows[i] : i;
+
+      if (nulls == null || !nulls[idx]) {
+        final int position = positions[i] + positionOffset;
+        helper.getSketchAtPosition(buf, position).update(doubles[idx]);
+      }
+    }
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return helper.get(buf, position);
+  }
+
+  @Override
+  public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
+  {
+    helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
+  }
+
+  @Override
+  public void close()
+  {
+    helper.clear();
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
index 41ae453..c788adc 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
@@ -19,57 +19,44 @@
 
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantiles.DoublesUnion;
+import org.apache.datasketches.quantiles.DoublesSketch;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
 
 public class DoublesSketchMergeBufferAggregator implements BufferAggregator
 {
-
-  private final ColumnValueSelector selector;
-  private final int k;
-  private final int maxIntermediateSize;
-  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
-  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
+  private final ColumnValueSelector<DoublesSketch> selector;
+  private final DoublesSketchMergeBufferAggregatorHelper helper;
 
   public DoublesSketchMergeBufferAggregator(
-      final ColumnValueSelector selector,
+      final ColumnValueSelector<DoublesSketch> selector,
       final int k,
-      final int maxIntermediateSize)
+      final int maxIntermediateSize
+  )
   {
     this.selector = selector;
-    this.k = k;
-    this.maxIntermediateSize = maxIntermediateSize;
+    this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
   }
 
   @Override
-  public synchronized void init(final ByteBuffer buffer, final int position)
+  public void init(final ByteBuffer buffer, final int position)
   {
-    final WritableMemory mem = getMemory(buffer);
-    final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
-    final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
-    putUnion(buffer, position, union);
+    helper.init(buffer, position);
   }
 
   @Override
-  public synchronized void aggregate(final ByteBuffer buffer, final int position)
+  public void aggregate(final ByteBuffer buffer, final int position)
   {
-    final DoublesUnion union = unions.get(buffer).get(position);
-    DoublesSketchMergeAggregator.updateUnion(selector, union);
+    DoublesSketchMergeAggregator.updateUnion(selector, helper.getSketchAtPosition(buffer, position));
   }
 
   @Override
-  public synchronized Object get(final ByteBuffer buffer, final int position)
+  public Object get(final ByteBuffer buffer, final int position)
   {
-    return unions.get(buffer).get(position).getResult();
+    return helper.getSketchAtPosition(buffer, position).getResult();
   }
 
   @Override
@@ -87,8 +74,7 @@
   @Override
   public synchronized void close()
   {
-    unions.clear();
-    memCache.clear();
+    helper.clear();
   }
 
   // A small number of sketches may run out of the given memory, request more memory on heap and move there.
@@ -96,31 +82,7 @@
   @Override
   public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
   {
-    DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
-    final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
-    if (union.isSameResource(oldMem)) { // union was not relocated on heap
-      final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
-      union = DoublesUnion.wrap(newMem);
-    }
-    putUnion(newBuffer, newPosition, union);
-
-    Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
-    map.remove(oldPosition);
-    if (map.isEmpty()) {
-      unions.remove(oldBuffer);
-      memCache.remove(oldBuffer);
-    }
-  }
-
-  private WritableMemory getMemory(final ByteBuffer buffer)
-  {
-    return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
-  }
-
-  private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
-  {
-    Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
-    map.put(position, union);
+    helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
   }
 
   @Override
@@ -128,5 +90,4 @@
   {
     inspector.visit("selector", selector);
   }
-
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java
new file mode 100644
index 0000000..378e41d
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.quantiles.DoublesUnion;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class DoublesSketchMergeBufferAggregatorHelper
+{
+  private final int k;
+  private final int maxIntermediateSize;
+  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
+
+  public DoublesSketchMergeBufferAggregatorHelper(
+      final int k,
+      final int maxIntermediateSize
+  )
+  {
+    this.k = k;
+    this.maxIntermediateSize = maxIntermediateSize;
+  }
+
+  public void init(final ByteBuffer buffer, final int position)
+  {
+    final WritableMemory mem = getMemory(buffer);
+    final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+    final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
+    putUnion(buffer, position, union);
+  }
+
+  public Object get(final ByteBuffer buffer, final int position)
+  {
+    return unions.get(buffer).get(position).getResult();
+  }
+
+  public void clear()
+  {
+    unions.clear();
+    memCache.clear();
+  }
+
+  // A small number of sketches may run out of the given memory, request more memory on heap and move there.
+  // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
+    final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
+    if (union.isSameResource(oldMem)) { // union was not relocated on heap
+      final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
+      union = DoublesUnion.wrap(newMem);
+    }
+    putUnion(newBuffer, newPosition, union);
+
+    Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
+    map.remove(oldPosition);
+    if (map.isEmpty()) {
+      unions.remove(oldBuffer);
+      memCache.remove(oldBuffer);
+    }
+  }
+
+  /**
+   * Retrieves the sketch at a particular position.
+   */
+  public DoublesUnion getSketchAtPosition(final ByteBuffer buf, final int position)
+  {
+    return unions.get(buf).get(position);
+  }
+
+  private WritableMemory getMemory(final ByteBuffer buffer)
+  {
+    return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+  }
+
+  private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
+  {
+    Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+    map.put(position, union);
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
new file mode 100644
index 0000000..8a8e10b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.DoublesUnion;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoublesSketchMergeVectorAggregator implements VectorAggregator
+{
+  private final VectorObjectSelector selector;
+  private final DoublesSketchMergeBufferAggregatorHelper helper;
+
+  public DoublesSketchMergeVectorAggregator(
+      final VectorObjectSelector selector,
+      final int k,
+      final int maxIntermediateSize
+  )
+  {
+    this.selector = selector;
+    this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    helper.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final Object[] vector = selector.getObjectVector();
+
+    final DoublesUnion union = helper.getSketchAtPosition(buf, position);
+
+    for (int i = startRow; i < endRow; i++) {
+      final DoublesSketch sketch = (DoublesSketch) vector[i];
+      if (sketch != null) {
+        union.update(sketch);
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final Object[] vector = selector.getObjectVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
+
+      if (sketch != null) {
+        final int position = positions[i] + positionOffset;
+        final DoublesUnion union = helper.getSketchAtPosition(buf, position);
+        union.update(sketch);
+      }
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return helper.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    helper.clear();
+  }
+
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
index cbb40c4..e30fb9b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
@@ -41,8 +41,9 @@
       return (DoublesSketch) serializedSketch;
     }
     throw new ISE(
-        "Object is not of a type that can be deserialized to a quantiles DoublsSketch: "
-            + serializedSketch.getClass());
+        "Object is not of a type that can be deserialized to a quantiles DoublesSketch: %s",
+        serializedSketch == null ? "null" : serializedSketch.getClass()
+    );
   }
 
   public static DoublesSketch deserializeFromBase64EncodedString(final String str)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
index 5f7808c..2a0a40b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
@@ -20,20 +20,42 @@
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 
-public class NoopDoublesSketchBufferAggregator implements BufferAggregator
+public class NoopDoublesSketchBufferAggregator implements BufferAggregator, VectorAggregator
 {
   @Override
   public void init(final ByteBuffer buf, final int position)
   {
+    // Nothing to do.
   }
 
   @Override
   public void aggregate(final ByteBuffer buf, final int position)
   {
+    // Nothing to do.
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Nothing to do.
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    // Nothing to do.
   }
 
   @Override
@@ -55,12 +77,20 @@
   }
 
   @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    // Nothing to do.
+  }
+
+  @Override
   public void close()
   {
+    // Nothing to do.
   }
 
   @Override
   public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
   {
+    // Nothing to do.
   }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
index c0a28d3..8152369 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
@@ -20,10 +20,12 @@
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregationTestHelper;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
@@ -54,24 +56,29 @@
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-  public DoublesSketchAggregatorTest(final GroupByQueryConfig config)
+  public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize)
   {
     DoublesSketchModule.registerSerde();
     DoublesSketchModule module = new DoublesSketchModule();
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
-        module.getJacksonModules(), config, tempFolder);
+        module.getJacksonModules(),
+        config,
+        tempFolder
+    ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
     timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
         module.getJacksonModules(),
         tempFolder
-    );
+    ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
   }
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[]{config});
+      for (String vectorize : new String[]{"false", "true", "force"}) {
+        constructors.add(new Object[]{config, vectorize});
+      }
     }
     return constructors;
   }
@@ -381,7 +388,11 @@
     // post agg with nulls
     Object quantileObjectWithNulls = row.get(5);
     Assert.assertTrue(quantileObjectWithNulls instanceof Double);
-    Assert.assertEquals(NullHandling.replaceWithDefault() ? 7.4 : 7.5, (double) quantileObjectWithNulls, 0.1); // median value
+    Assert.assertEquals(
+        NullHandling.replaceWithDefault() ? 7.4 : 7.5,
+        (double) quantileObjectWithNulls,
+        0.1
+    ); // median value
 
     // post agg with nulls
     Object quantilesObjectWithNulls = row.get(6);
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index c6c8e13..647f300 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -23,12 +23,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -70,16 +68,15 @@
 import org.apache.druid.sql.http.SqlParameter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
 {
-  private static final AuthenticationResult AUTH_RESULT = CalciteTests.REGULAR_USER_AUTH_RESULT;
   private static final DruidOperatorTable OPERATOR_TABLE = new DruidOperatorTable(
       ImmutableSet.of(
           new DoublesSketchApproxQuantileSqlAggregator(),
@@ -172,166 +169,136 @@
   @Test
   public void testQuantileOnFloatAndLongs() throws Exception
   {
-    SqlLifecycle sqlLifecycle = getSqlLifecycle();
-    final String sql = "SELECT\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.01),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.99),\n"
-                       + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
-                       + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n"
-                       + "APPROX_QUANTILE_DS(cnt, 0.5)\n"
-                       + "FROM foo";
-
-    // Verify results
-    final List<Object[]> results = sqlLifecycle.runSimple(
-        sql,
-        TIMESERIES_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
-    final List<Object[]> expectedResults = ImmutableList.of(
-        new Object[]{
-            1.0,
-            4.0,
-            6.0,
-            6.0,
-            12.0,
-            6.0,
-            5.0,
-            6.0,
-            1.0
-        }
-    );
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
-
-    // Verify query
-    Assert.assertEquals(
-        Druids.newTimeseriesQueryBuilder()
-              .dataSource(CalciteTests.DATASOURCE1)
-              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-              .granularity(Granularities.ALL)
-              .virtualColumns(
-                  new ExpressionVirtualColumn(
-                      "v0",
-                      "(\"m1\" * 2)",
-                      ValueType.FLOAT,
-                      TestExprMacroTable.INSTANCE
+    testQuery(
+        "SELECT\n"
+        + "APPROX_QUANTILE_DS(m1, 0.01),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.99),\n"
+        + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+        + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n"
+        + "APPROX_QUANTILE_DS(cnt, 0.5)\n"
+        + "FROM foo",
+        Collections.singletonList(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(
+                      new ExpressionVirtualColumn(
+                          "v0",
+                          "(\"m1\" * 2)",
+                          ValueType.FLOAT,
+                          TestExprMacroTable.INSTANCE
+                      )
                   )
-              )
-              .aggregators(ImmutableList.of(
-                  new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
-                  new DoublesSketchAggregatorFactory("a1:agg", "m1", 64),
-                  new DoublesSketchAggregatorFactory("a2:agg", "m1", 256),
-                  new DoublesSketchAggregatorFactory("a4:agg", "v0", null),
-                  new FilteredAggregatorFactory(
-                      new DoublesSketchAggregatorFactory("a5:agg", "m1", null),
-                      new SelectorDimFilter("dim1", "abc", null)
-                  ),
-                  new FilteredAggregatorFactory(
-                      new DoublesSketchAggregatorFactory("a6:agg", "m1", null),
-                      new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
-                  ),
-                  new DoublesSketchAggregatorFactory("a8:agg", "cnt", null)
-              ))
-              .postAggregators(
-                  new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
-                  new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
-                  new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
-                  new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
-                  new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
-                  new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
-                  new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
-                  new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f),
-                  new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f)
-              )
-              .context(TIMESERIES_CONTEXT_DEFAULT)
-              .build(),
-        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+                  .aggregators(ImmutableList.of(
+                      new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
+                      new DoublesSketchAggregatorFactory("a1:agg", "m1", 64),
+                      new DoublesSketchAggregatorFactory("a2:agg", "m1", 256),
+                      new DoublesSketchAggregatorFactory("a4:agg", "v0", null),
+                      new FilteredAggregatorFactory(
+                          new DoublesSketchAggregatorFactory("a5:agg", "m1", null),
+                          new SelectorDimFilter("dim1", "abc", null)
+                      ),
+                      new FilteredAggregatorFactory(
+                          new DoublesSketchAggregatorFactory("a6:agg", "m1", null),
+                          new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+                      ),
+                      new DoublesSketchAggregatorFactory("a8:agg", "cnt", null)
+                  ))
+                  .postAggregators(
+                      new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
+                      new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
+                      new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
+                      new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
+                      new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
+                      new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
+                      new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
+                      new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f),
+                      new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f)
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{
+                1.0,
+                4.0,
+                6.0,
+                6.0,
+                12.0,
+                6.0,
+                5.0,
+                6.0,
+                1.0
+            }
+        )
     );
   }
 
   @Test
   public void testQuantileOnComplexColumn() throws Exception
   {
-    SqlLifecycle lifecycle = getSqlLifecycle();
-    final String sql = "SELECT\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
-                       + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
-                       + "FROM foo";
-
-    // Verify results
-    final List<Object[]> results = lifecycle.runSimple(
-        sql,
-        TIMESERIES_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
-    final List<Object[]> expectedResults = ImmutableList.of(
-        new Object[]{
-            1.0,
-            4.0,
-            6.0,
-            6.0,
-            6.0,
-            5.0,
-            6.0
-        }
-    );
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
-
-    // Verify query
-    Assert.assertEquals(
-        Druids.newTimeseriesQueryBuilder()
-              .dataSource(CalciteTests.DATASOURCE1)
-              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-              .granularity(Granularities.ALL)
-              .aggregators(ImmutableList.of(
-                  new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null),
-                  new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64),
-                  new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256),
-                  new FilteredAggregatorFactory(
-                      new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null),
-                      new SelectorDimFilter("dim1", "abc", null)
-                  ),
-                  new FilteredAggregatorFactory(
-                      new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null),
-                      new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+    testQuery(
+        "SELECT\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+        + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
+        + "FROM foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                  .granularity(Granularities.ALL)
+                  .aggregators(ImmutableList.of(
+                      new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null),
+                      new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64),
+                      new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256),
+                      new FilteredAggregatorFactory(
+                          new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null),
+                          new SelectorDimFilter("dim1", "abc", null)
+                      ),
+                      new FilteredAggregatorFactory(
+                          new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null),
+                          new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+                      )
+                  ))
+                  .postAggregators(
+                      new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
+                      new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
+                      new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
+                      new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
+                      new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f),
+                      new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f),
+                      new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f)
                   )
-              ))
-              .postAggregators(
-                  new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
-                  new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
-                  new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
-                  new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
-                  new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f),
-                  new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f),
-                  new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f)
-              )
-              .context(TIMESERIES_CONTEXT_DEFAULT)
-              .build(),
-        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{
+                1.0,
+                4.0,
+                6.0,
+                6.0,
+                6.0,
+                5.0,
+                6.0
+            }
+        )
     );
   }
 
   @Test
   public void testQuantileOnCastedString() throws Exception
   {
-    cannotVectorize();
-
     final List<Object[]> expectedResults;
     if (NullHandling.replaceWithDefault()) {
       expectedResults = ImmutableList.of(
@@ -425,92 +392,72 @@
   @Test
   public void testQuantileOnInnerQuery() throws Exception
   {
-    SqlLifecycle sqlLifecycle = getSqlLifecycle();
-    final String sql = "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n"
-                       + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)";
-
-    // Verify results
-    final List<Object[]> results = sqlLifecycle.runSimple(
-        sql,
-        QUERY_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
     final List<Object[]> expectedResults;
     if (NullHandling.replaceWithDefault()) {
       expectedResults = ImmutableList.of(new Object[]{7.0, 11.0});
     } else {
       expectedResults = ImmutableList.of(new Object[]{5.25, 8.0});
     }
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
 
-    // Verify query
-    Assert.assertEquals(
-        GroupByQuery.builder()
-                    .setDataSource(
-                        new QueryDataSource(
-                            GroupByQuery.builder()
-                                        .setDataSource(CalciteTests.DATASOURCE1)
-                                        .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                                        .setGranularity(Granularities.ALL)
-                                        .setDimensions(new DefaultDimensionSpec("dim2", "d0"))
-                                        .setAggregatorSpecs(
-                                            ImmutableList.of(
-                                                new DoubleSumAggregatorFactory("a0", "m1")
+    testQuery(
+        "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n"
+        + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)",
+        Collections.singletonList(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            new QueryDataSource(
+                                GroupByQuery.builder()
+                                            .setDataSource(CalciteTests.DATASOURCE1)
+                                            .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                                            .setGranularity(Granularities.ALL)
+                                            .setDimensions(new DefaultDimensionSpec("dim2", "d0"))
+                                            .setAggregatorSpecs(
+                                                ImmutableList.of(
+                                                    new DoubleSumAggregatorFactory("a0", "m1")
+                                                )
                                             )
-                                        )
-                                        .setContext(QUERY_CONTEXT_DEFAULT)
-                                        .build()
+                                            .setContext(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                            )
                         )
-                    )
-                    .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                    .setGranularity(Granularities.ALL)
-                    .setAggregatorSpecs(
-                        new DoubleSumAggregatorFactory("_a0:sum", "a0"),
-                        new CountAggregatorFactory("_a0:count"),
-                        new DoublesSketchAggregatorFactory(
-                            "_a1:agg",
-                            "a0",
-                            null
+                        .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(
+                            new DoubleSumAggregatorFactory("_a0:sum", "a0"),
+                            new CountAggregatorFactory("_a0:count"),
+                            new DoublesSketchAggregatorFactory(
+                                "_a1:agg",
+                                "a0",
+                                null
+                            )
                         )
-                    )
-                    .setPostAggregatorSpecs(
-                        ImmutableList.of(
-                            new ArithmeticPostAggregator(
-                                "_a0",
-                                "quotient",
-                                ImmutableList.of(
-                                    new FieldAccessPostAggregator(null, "_a0:sum"),
-                                    new FieldAccessPostAggregator(null, "_a0:count")
+                        .setPostAggregatorSpecs(
+                            ImmutableList.of(
+                                new ArithmeticPostAggregator(
+                                    "_a0",
+                                    "quotient",
+                                    ImmutableList.of(
+                                        new FieldAccessPostAggregator(null, "_a0:sum"),
+                                        new FieldAccessPostAggregator(null, "_a0:count")
+                                    )
+                                ),
+                                new DoublesSketchToQuantilePostAggregator(
+                                    "_a1",
+                                    makeFieldAccessPostAgg("_a1:agg"),
+                                    0.98f
                                 )
-                            ),
-                            new DoublesSketchToQuantilePostAggregator("_a1", makeFieldAccessPostAgg("_a1:agg"), 0.98f)
+                            )
                         )
-                    )
-                    .setContext(QUERY_CONTEXT_DEFAULT)
-                    .build(),
-        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        expectedResults
     );
   }
 
   @Test
   public void testQuantileOnInnerQuantileQuery() throws Exception
   {
-    SqlLifecycle sqlLifecycle = getSqlLifecycle();
-    final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
-                       + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";
-
-
-    final List<Object[]> results = sqlLifecycle.runSimple(
-        sql,
-        QUERY_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
-
     ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
     builder.add(new Object[]{"", 1.0});
     builder.add(new Object[]{"1", 4.0});
@@ -519,296 +466,292 @@
     builder.add(new Object[]{"abc", 6.0});
     builder.add(new Object[]{"def", 5.0});
     final List<Object[]> expectedResults = builder.build();
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
 
-    // Verify query
-    Assert.assertEquals(
-        GroupByQuery.builder()
-                    .setDataSource(
-                        new QueryDataSource(
-                            GroupByQuery.builder()
-                                        .setDataSource(CalciteTests.DATASOURCE1)
-                                        .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                                        .setGranularity(Granularities.ALL)
-                                        .setDimensions(
-                                            new DefaultDimensionSpec("dim1", "d0"),
-                                            new DefaultDimensionSpec("dim2", "d1")
-                                        )
-                                        .setAggregatorSpecs(
-                                            ImmutableList.of(
-                                                new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
+    testQuery(
+        "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+        + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1",
+        Collections.singletonList(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            new QueryDataSource(
+                                GroupByQuery.builder()
+                                            .setDataSource(CalciteTests.DATASOURCE1)
+                                            .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                                            .setGranularity(Granularities.ALL)
+                                            .setDimensions(
+                                                new DefaultDimensionSpec("dim1", "d0"),
+                                                new DefaultDimensionSpec("dim2", "d1")
                                             )
-                                        )
-                                        .setPostAggregatorSpecs(
-                                            ImmutableList.of(
-                                                new DoublesSketchToQuantilePostAggregator(
-                                                    "a0",
-                                                    makeFieldAccessPostAgg("a0:agg"),
-                                                    0.5f
+                                            .setAggregatorSpecs(
+                                                ImmutableList.of(
+                                                    new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
                                                 )
                                             )
-                                        )
-                                        .setContext(QUERY_CONTEXT_DEFAULT)
-                                        .build()
+                                            .setPostAggregatorSpecs(
+                                                ImmutableList.of(
+                                                    new DoublesSketchToQuantilePostAggregator(
+                                                        "a0",
+                                                        makeFieldAccessPostAgg("a0:agg"),
+                                                        0.5f
+                                                    )
+                                                )
+                                            )
+                                            .setContext(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                            )
                         )
-                    )
-                    .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                    .setGranularity(Granularities.ALL)
-                    .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
-                    .setAggregatorSpecs(
-                        new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
-                    )
-                    .setPostAggregatorSpecs(
-                        ImmutableList.of(
-                            new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
+                        .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
+                        .setAggregatorSpecs(
+                            new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
                         )
-                    )
-                    .setContext(QUERY_CONTEXT_DEFAULT)
-                    .build(),
-        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+                        .setPostAggregatorSpecs(
+                            ImmutableList.of(
+                                new DoublesSketchToQuantilePostAggregator(
+                                    "_a0",
+                                    makeFieldAccessPostAgg("_a0:agg"),
+                                    0.5f
+                                )
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        expectedResults
     );
   }
 
   @Test
   public void testDoublesSketchPostAggs() throws Exception
   {
-    SqlLifecycle sqlLifecycle = getSqlLifecycle();
-    final String sql = "SELECT\n"
-                       + "  SUM(cnt),\n"
-                       + "  APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
-                       + "  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
-                       + "  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
-                       + "  ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n"
-                       + "  DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n"
-                       + "  DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
-                       + "  DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n"
-                       + "  DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
-                       + "  DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt))\n"
-                       + "FROM foo";
-
-    // Verify results
-    final List<Object[]> results = sqlLifecycle.runSimple(
-        sql,
-        TIMESERIES_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
-    final List<Object[]> expectedResults = ImmutableList.of(
-        new Object[]{
-            6L,
-            2.0d,
-            1001.0d,
-            1124.0d,
-            1.0d,
-            "[1.0,1.0]",
-            "[0.0,0.0,6.0]",
-            1.0d,
-            "[0.0,0.0,1.0]",
-            "\n"
-              + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n"
-              + "   Empty                        : false\n"
-              + "   Direct, Capacity bytes       : false, \n"
-              + "   Estimation Mode              : false\n"
-              + "   K                            : 128\n"
-              + "   N                            : 6\n"
-              + "   Levels (Needed, Total, Valid): 0, 0, 0\n"
-              + "   Level Bit Pattern            : 0\n"
-              + "   BaseBufferCount              : 6\n"
-              + "   Combined Buffer Capacity     : 8\n"
-              + "   Retained Items               : 6\n"
-              + "   Compact Storage Bytes        : 80\n"
-              + "   Updatable Storage Bytes      : 96\n"
-              + "   Normalized Rank Error        : 1.406%\n"
-              + "   Normalized Rank Error (PMF)  : 1.711%\n"
-              + "   Min Value                    : 1.000000e+00\n"
-              + "   Max Value                    : 1.000000e+00\n"
-              + "### END SKETCH SUMMARY\n"
-        }
+    testQuery(
+        "SELECT\n"
+        + "  SUM(cnt),\n"
+        + "  APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
+        + "  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
+        + "  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
+        + "  ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n"
+        + "  DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n"
+        + "  DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+        + "  DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n"
+        + "  DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+        + "  -- The nonvectorized query uses a regular Aggregator, and the vectorized query uses a buffer-based\n"
+        + "  -- VectorAggregator. The buffer-based aggregators return HeapCompactDoublesSketch instead of\n"
+        + "  -- HeapUpdateDoublesSketch since they must make a copy out of the buffer before returning something.\n"
+        + "  -- Use REPLACE to normalize summaries.\n"
+        + "  REPLACE("
+        + "    REPLACE("
+        + "      DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt)),"
+        + "      'HeapCompactDoublesSketch',"
+        + "      'HeapUpdateDoublesSketch'"
+        + "    ),"
+        + "    'Combined Buffer Capacity     : 6',"
+        + "    'Combined Buffer Capacity     : 8'"
+        + "  )\n"
+        + "FROM foo",
+        Collections.singletonList(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(
+                      new ExpressionVirtualColumn(
+                          "v0",
+                          "(\"cnt\" + 123)",
+                          ValueType.FLOAT,
+                          TestExprMacroTable.INSTANCE
+                      )
+                  )
+                  .aggregators(ImmutableList.of(
+                      new LongSumAggregatorFactory("a0", "cnt"),
+                      new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
+                      new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
+                      new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
+                  ))
+                  .postAggregators(
+                      new DoublesSketchToQuantilePostAggregator(
+                          "a1",
+                          makeFieldAccessPostAgg("a1:agg"),
+                          0.5f
+                      ),
+                      new ExpressionPostAggregator(
+                          "p0",
+                          "(\"a1\" + 1)",
+                          null,
+                          TestExprMacroTable.INSTANCE
+                      ),
+                      new DoublesSketchToQuantilePostAggregator(
+                          "p2",
+                          new FieldAccessPostAggregator(
+                              "p1",
+                              "a2:agg"
+                          ),
+                          0.5f
+                      ),
+                      new ExpressionPostAggregator(
+                          "p3",
+                          "(p2 + 1000)",
+                          null,
+                          TestExprMacroTable.INSTANCE
+                      ),
+                      new DoublesSketchToQuantilePostAggregator(
+                          "p5",
+                          new FieldAccessPostAggregator(
+                              "p4",
+                              "a3:agg"
+                          ),
+                          0.5f
+                      ),
+                      new ExpressionPostAggregator(
+                          "p6",
+                          "(p5 + 1000)",
+                          null,
+                          TestExprMacroTable.INSTANCE
+                      ),
+                      new DoublesSketchToQuantilePostAggregator(
+                          "p8",
+                          new FieldAccessPostAggregator(
+                              "p7",
+                              "a2:agg"
+                          ),
+                          0.5f
+                      ),
+                      new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE),
+                      new DoublesSketchToQuantilesPostAggregator(
+                          "p11",
+                          new FieldAccessPostAggregator(
+                              "p10",
+                              "a2:agg"
+                          ),
+                          new double[]{0.5d, 0.8d}
+                      ),
+                      new DoublesSketchToHistogramPostAggregator(
+                          "p13",
+                          new FieldAccessPostAggregator(
+                              "p12",
+                              "a2:agg"
+                          ),
+                          new double[]{0.2d, 0.6d},
+                          null
+                      ),
+                      new DoublesSketchToRankPostAggregator(
+                          "p15",
+                          new FieldAccessPostAggregator(
+                              "p14",
+                              "a2:agg"
+                          ),
+                          3.0d
+                      ),
+                      new DoublesSketchToCDFPostAggregator(
+                          "p17",
+                          new FieldAccessPostAggregator(
+                              "p16",
+                              "a2:agg"
+                          ),
+                          new double[]{0.2d, 0.6d}
+                      ),
+                      new DoublesSketchToStringPostAggregator(
+                          "p19",
+                          new FieldAccessPostAggregator(
+                              "p18",
+                              "a2:agg"
+                          )
+                      ),
+                      new ExpressionPostAggregator(
+                          "p20",
+                          "replace(replace(p19,'HeapCompactDoublesSketch','HeapUpdateDoublesSketch'),"
+                          + "'Combined Buffer Capacity     : 6',"
+                          + "'Combined Buffer Capacity     : 8')",
+                          null,
+                          ExprMacroTable.nil()
+                      )
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{
+                6L,
+                2.0d,
+                1001.0d,
+                1124.0d,
+                1.0d,
+                "[1.0,1.0]",
+                "[0.0,0.0,6.0]",
+                1.0d,
+                "[0.0,0.0,1.0]",
+                "\n"
+                  + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n"
+                  + "   Empty                        : false\n"
+                  + "   Direct, Capacity bytes       : false, \n"
+                  + "   Estimation Mode              : false\n"
+                  + "   K                            : 128\n"
+                  + "   N                            : 6\n"
+                  + "   Levels (Needed, Total, Valid): 0, 0, 0\n"
+                  + "   Level Bit Pattern            : 0\n"
+                  + "   BaseBufferCount              : 6\n"
+                  + "   Combined Buffer Capacity     : 8\n"
+                  + "   Retained Items               : 6\n"
+                  + "   Compact Storage Bytes        : 80\n"
+                  + "   Updatable Storage Bytes      : 96\n"
+                  + "   Normalized Rank Error        : 1.406%\n"
+                  + "   Normalized Rank Error (PMF)  : 1.711%\n"
+                  + "   Min Value                    : 1.000000e+00\n"
+                  + "   Max Value                    : 1.000000e+00\n"
+                  + "### END SKETCH SUMMARY\n"
+            }
+        )
     );
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
-
-    Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
-    Query expectedQuery = Druids.newTimeseriesQueryBuilder()
-                                .dataSource(CalciteTests.DATASOURCE1)
-                                .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-                                .granularity(Granularities.ALL)
-                                .virtualColumns(
-                                    new ExpressionVirtualColumn(
-                                        "v0",
-                                        "(\"cnt\" + 123)",
-                                        ValueType.FLOAT,
-                                        TestExprMacroTable.INSTANCE
-                                    )
-                                )
-                                .aggregators(ImmutableList.of(
-                                    new LongSumAggregatorFactory("a0", "cnt"),
-                                    new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
-                                    new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
-                                    new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
-                                ))
-                                .postAggregators(
-                                    new DoublesSketchToQuantilePostAggregator(
-                                        "a1",
-                                        makeFieldAccessPostAgg("a1:agg"),
-                                        0.5f
-                                    ),
-                                    new ExpressionPostAggregator(
-                                        "p0",
-                                        "(\"a1\" + 1)",
-                                        null,
-                                        TestExprMacroTable.INSTANCE
-                                    ),
-                                    new DoublesSketchToQuantilePostAggregator(
-                                        "p2",
-                                        new FieldAccessPostAggregator(
-                                            "p1",
-                                            "a2:agg"
-                                        ),
-                                        0.5f
-                                    ),
-                                    new ExpressionPostAggregator(
-                                        "p3",
-                                        "(p2 + 1000)",
-                                        null,
-                                        TestExprMacroTable.INSTANCE
-                                    ),
-                                    new DoublesSketchToQuantilePostAggregator(
-                                        "p5",
-                                        new FieldAccessPostAggregator(
-                                            "p4",
-                                            "a3:agg"
-                                        ),
-                                        0.5f
-                                    ),
-                                    new ExpressionPostAggregator(
-                                        "p6",
-                                        "(p5 + 1000)",
-                                        null,
-                                        TestExprMacroTable.INSTANCE
-                                    ),
-                                    new DoublesSketchToQuantilePostAggregator(
-                                        "p8",
-                                        new FieldAccessPostAggregator(
-                                            "p7",
-                                            "a2:agg"
-                                        ),
-                                        0.5f
-                                    ),
-                                    new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE),
-                                    new DoublesSketchToQuantilesPostAggregator(
-                                        "p11",
-                                        new FieldAccessPostAggregator(
-                                            "p10",
-                                            "a2:agg"
-                                        ),
-                                        new double[]{0.5d, 0.8d}
-                                    ),
-                                    new DoublesSketchToHistogramPostAggregator(
-                                        "p13",
-                                        new FieldAccessPostAggregator(
-                                            "p12",
-                                            "a2:agg"
-                                        ),
-                                        new double[]{0.2d, 0.6d},
-                                        null
-                                    ),
-                                    new DoublesSketchToRankPostAggregator(
-                                        "p15",
-                                        new FieldAccessPostAggregator(
-                                            "p14",
-                                            "a2:agg"
-                                        ),
-                                        3.0d
-                                    ),
-                                    new DoublesSketchToCDFPostAggregator(
-                                        "p17",
-                                        new FieldAccessPostAggregator(
-                                            "p16",
-                                            "a2:agg"
-                                        ),
-                                        new double[]{0.2d, 0.6d}
-                                    ),
-                                    new DoublesSketchToStringPostAggregator(
-                                        "p19",
-                                        new FieldAccessPostAggregator(
-                                            "p18",
-                                            "a2:agg"
-                                        )
-                                    )
-                                )
-                                .context(TIMESERIES_CONTEXT_DEFAULT)
-                                .build();
-
-    // Verify query
-    Assert.assertEquals(expectedQuery, actualQuery);
   }
 
   @Test
   public void testDoublesSketchPostAggsPostSort() throws Exception
   {
-    SqlLifecycle sqlLifecycle = getSqlLifecycle();
-
-    final String sql = "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10";
-    final String sql2 = StringUtils.format("SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (%s)", sql);
-
-    // Verify results
-    final List<Object[]> results = sqlLifecycle.runSimple(
-        sql2,
-        TIMESERIES_CONTEXT_DEFAULT,
-        DEFAULT_PARAMETERS,
-        AUTH_RESULT
-    ).toList();
-    final List<Object[]> expectedResults = ImmutableList.of(
-        new Object[]{
-            4.0d,
-            6.0d
-        }
-    );
-
-    Assert.assertEquals(expectedResults.size(), results.size());
-    for (int i = 0; i < expectedResults.size(); i++) {
-      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
-    }
-
-    Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
-
-    Query expectedQuery =
-        Druids.newTimeseriesQueryBuilder()
-              .dataSource(CalciteTests.DATASOURCE1)
-              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
-              .granularity(Granularities.ALL)
-              .aggregators(
-                  ImmutableList.of(
-                      new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
-                  )
-              )
-              .postAggregators(
-                  ImmutableList.of(
-                      new FieldAccessPostAggregator("p0", "a0:agg"),
-                      new DoublesSketchToQuantilePostAggregator(
-                          "p2",
-                          new FieldAccessPostAggregator("p1", "a0:agg"),
-                          0.5
-                      ),
-                      new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5),
-                      new DoublesSketchToQuantilePostAggregator(
-                          "s3",
-                          new FieldAccessPostAggregator("s2", "p0"),
-                          0.9800000190734863
+    testQuery(
+        "SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from ("
+        + "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY  DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10"
+        + ")",
+        Collections.singletonList(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                  .granularity(Granularities.ALL)
+                  .aggregators(
+                      ImmutableList.of(
+                          new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
                       )
                   )
-              )
-              .context(TIMESERIES_CONTEXT_DEFAULT)
-              .build();
-
-    // Verify query
-    Assert.assertEquals(expectedQuery, actualQuery);
+                  .postAggregators(
+                      ImmutableList.of(
+                          new FieldAccessPostAggregator("p0", "a0:agg"),
+                          new DoublesSketchToQuantilePostAggregator(
+                              "p2",
+                              new FieldAccessPostAggregator("p1", "a0:agg"),
+                              0.5
+                          ),
+                          new DoublesSketchToQuantilePostAggregator(
+                              "s1",
+                              new FieldAccessPostAggregator("s0", "p0"),
+                              0.5
+                          ),
+                          new DoublesSketchToQuantilePostAggregator(
+                              "s3",
+                              new FieldAccessPostAggregator("s2", "p0"),
+                              0.9800000190734863
+                          )
+                      )
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{
+                4.0d,
+                6.0d
+            }
+        )
+    );
   }
 
   private static PostAggregator makeFieldAccessPostAgg(String name)
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 0d6e8b8..1ed990b 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -96,8 +96,10 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This class provides general utility to test any druid aggregation implementation given raw data,
@@ -116,6 +118,8 @@
   private final TemporaryFolder tempFolder;
   private final Closer resourceCloser;
 
+  private final Map<String, Object> queryContext;
+
   private AggregationTestHelper(
       ObjectMapper mapper,
       IndexMerger indexMerger,
@@ -124,7 +128,8 @@
       QueryRunnerFactory factory,
       TemporaryFolder tempFolder,
       List<? extends Module> jsonModulesToRegister,
-      Closer resourceCloser
+      Closer resourceCloser,
+      Map<String, Object> queryContext
   )
   {
     this.mapper = mapper;
@@ -134,6 +139,7 @@
     this.factory = factory;
     this.tempFolder = tempFolder;
     this.resourceCloser = resourceCloser;
+    this.queryContext = queryContext;
 
     for (Module mod : jsonModulesToRegister) {
       mapper.registerModule(mod);
@@ -174,7 +180,8 @@
         factory,
         tempFolder,
         jsonModulesToRegister,
-        closer
+        closer,
+        Collections.emptyMap()
     );
   }
 
@@ -213,7 +220,8 @@
         factory,
         tempFolder,
         jsonModulesToRegister,
-        Closer.create()
+        Closer.create(),
+        Collections.emptyMap()
     );
   }
 
@@ -264,7 +272,8 @@
         factory,
         tempFolder,
         jsonModulesToRegister,
-        resourceCloser
+        resourceCloser,
+        Collections.emptyMap()
     );
   }
 
@@ -307,7 +316,25 @@
         factory,
         tempFolder,
         jsonModulesToRegister,
-        resourceCloser
+        resourceCloser,
+        Collections.emptyMap()
+    );
+  }
+
+  public AggregationTestHelper withQueryContext(final Map<String, Object> queryContext)
+  {
+    final Map<String, Object> newContext = new HashMap<>(this.queryContext);
+    newContext.putAll(queryContext);
+    return new AggregationTestHelper(
+        mapper,
+        indexMerger,
+        indexIO,
+        toolChest,
+        factory,
+        tempFolder,
+        Collections.emptyList(),
+        resourceCloser,
+        newContext
     );
   }
 
@@ -658,7 +685,7 @@
   //from each segment, later deserialize and merge and finally return the results
   public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final String queryJson)
   {
-    return runQueryOnSegments(segmentDirs, readQuery(queryJson));
+    return runQueryOnSegments(segmentDirs, readQuery(queryJson).withOverriddenContext(queryContext));
   }
 
   public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final Query<T> query)