Vectorized versions of HllSketch aggregators. (#11115)

* Vectorized versions of HllSketch aggregators.

The patch uses the sameĀ "helper" approach as #10767 and #10304, and
extends the tests to run in both vectorized and non-vectorized modes.

Also includes some minor changes to the theta sketch vector aggregator:

- Cosmetic changes to make the hll and theta implementations look
  more similar.
- Extends the theta SQL tests to run in vectorized mode.

* Updates post-code-review.

* Fix javadoc.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
index 8abc305..df68180 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
@@ -26,8 +26,11 @@
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 
@@ -81,6 +84,24 @@
     );
   }
 
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    return new HllSketchBuildVectorAggregator(
+        selectorFactory,
+        getFieldName(),
+        getLgK(),
+        TgtHllType.valueOf(getTgtHllType()),
+        getMaxIntermediateSize()
+    );
+  }
+
   /**
    * For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases.
    * The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 4c39259..ab54215 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -19,22 +19,12 @@
 
 package org.apache.druid.query.aggregation.datasketches.hll;
 
-import com.google.common.util.concurrent.Striped;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
-import org.apache.datasketches.hll.Union;
-import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 
 /**
  * This aggregator builds sketches from raw data.
@@ -42,26 +32,8 @@
  */
 public class HllSketchBuildBufferAggregator implements BufferAggregator
 {
-
-  /**
-   * for locking per buffer position (power of 2 to make index computation faster)
-   */
-  private static final int NUM_STRIPES = 64;
-
   private final ColumnValueSelector<Object> selector;
-  private final int lgK;
-  private final TgtHllType tgtHllType;
-  private final int size;
-  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
-  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
-  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
-
-  /**
-   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
-   * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
-   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
-   */
-  private final byte[] emptySketch;
+  private final HllSketchBuildBufferAggregatorHelper helper;
 
   public HllSketchBuildBufferAggregator(
       final ColumnValueSelector<Object> selector,
@@ -71,39 +43,15 @@
   )
   {
     this.selector = selector;
-    this.lgK = lgK;
-    this.tgtHllType = tgtHllType;
-    this.size = size;
-    this.emptySketch = new byte[size];
-
-    //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
-    new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
+    this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
   }
 
   @Override
   public void init(final ByteBuffer buf, final int position)
   {
-    // Copy prebuilt empty sketch object.
-
-    final int oldPosition = buf.position();
-    try {
-      buf.position(position);
-      buf.put(emptySketch);
-    }
-    finally {
-      buf.position(oldPosition);
-    }
-
-    // Add an HllSketch for this chunk to our sketchCache.
-    final WritableMemory mem = getMemory(buf).writableRegion(position, size);
-    putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
+    helper.init(buf, position);
   }
 
-  /**
-   * This method uses locks because it can be used during indexing,
-   * and Druid can call aggregate() and get() concurrently
-   * See https://github.com/druid-io/druid/pull/3956
-   */
   @Override
   public void aggregate(final ByteBuffer buf, final int position)
   {
@@ -111,40 +59,20 @@
     if (value == null) {
       return;
     }
-    final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
-    lock.lock();
-    try {
-      final HllSketch sketch = sketchCache.get(buf).get(position);
-      HllSketchBuildAggregator.updateSketch(sketch, value);
-    }
-    finally {
-      lock.unlock();
-    }
+
+    HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
   }
 
-  /**
-   * This method uses locks because it can be used during indexing,
-   * and Druid can call aggregate() and get() concurrently
-   * See https://github.com/druid-io/druid/pull/3956
-   */
   @Override
   public Object get(final ByteBuffer buf, final int position)
   {
-    final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
-    lock.lock();
-    try {
-      return sketchCache.get(buf).get(position).copy();
-    }
-    finally {
-      lock.unlock();
-    }
+    return helper.get(buf, position);
   }
 
   @Override
   public void close()
   {
-    memCache.clear();
-    sketchCache.clear();
+    helper.clear();
   }
 
   @Override
@@ -159,11 +87,6 @@
     throw new UnsupportedOperationException("Not implemented");
   }
 
-  private WritableMemory getMemory(final ByteBuffer buf)
-  {
-    return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
-  }
-
   /**
    * In very rare cases sketches can exceed given memory, request on-heap memory and move there.
    * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
@@ -171,44 +94,7 @@
   @Override
   public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
   {
-    HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
-    final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
-    if (sketch.isSameResource(oldMem)) { // sketch has not moved
-      final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
-      sketch = HllSketch.writableWrap(newMem);
-    }
-    putSketchIntoCache(newBuf, newPosition, sketch);
-  }
-
-  private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
-  {
-    final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
-    map.put(position, sketch);
-  }
-
-  /**
-   * compute lock index to avoid boxing in Striped.get() call
-   *
-   * @param position
-   *
-   * @return index
-   */
-  static int lockIndex(final int position)
-  {
-    return smear(position) % NUM_STRIPES;
-  }
-
-  /**
-   * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
-   *
-   * @param hashCode
-   *
-   * @return smeared hashCode
-   */
-  private static int smear(int hashCode)
-  {
-    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
-    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
+    helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
   }
 
   @Override
@@ -218,6 +104,6 @@
     // lgK should be inspected because different execution paths exist in HllSketch.update() that is called from
     // @CalledFromHotLoop-annotated aggregate() depending on the lgK.
     // See https://github.com/apache/druid/pull/6893#discussion_r250726028
-    inspector.visit("lgK", lgK);
+    inspector.visit("lgK", helper.getLgK());
   }
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java
new file mode 100644
index 0000000..466f1ac
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.memory.WritableMemory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class HllSketchBuildBufferAggregatorHelper
+{
+  private final int lgK;
+  private final int size;
+  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
+
+  /**
+   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
+   * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
+   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link org.apache.datasketches.hll.Union} objects.
+   */
+  private final byte[] emptySketch;
+
+  public HllSketchBuildBufferAggregatorHelper(final int lgK, final TgtHllType tgtHllType, final int size)
+  {
+    this.lgK = lgK;
+    this.size = size;
+    this.emptySketch = new byte[size];
+
+    //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
+    new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
+  }
+
+  /**
+   * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
+   * {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
+   */
+  public void init(final ByteBuffer buf, final int position)
+  {
+    // Copy prebuilt empty sketch object.
+
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptySketch);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+
+    // Add an HllSketch for this chunk to our sketchCache.
+    final WritableMemory mem = getMemory(buf).writableRegion(position, size);
+    putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
+  }
+
+  /**
+   * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
+   * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
+   */
+  public Object get(ByteBuffer buf, int position)
+  {
+    return sketchCache.get(buf).get(position).copy();
+  }
+
+  /**
+   * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
+   * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
+   */
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf)
+  {
+    HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
+    final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
+    if (sketch.isSameResource(oldMem)) { // sketch has not moved
+      final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
+      sketch = HllSketch.writableWrap(newMem);
+    }
+    putSketchIntoCache(newBuf, newPosition, sketch);
+  }
+
+  /**
+   * Retrieves the sketch at a particular position.
+   */
+  public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position)
+  {
+    return sketchCache.get(buf).get(position);
+  }
+
+  /**
+   * Clean up resources used by this helper.
+   */
+  public void clear()
+  {
+    memCache.clear();
+    sketchCache.clear();
+  }
+
+  public int getLgK()
+  {
+    return lgK;
+  }
+
+  private WritableMemory getMemory(final ByteBuffer buf)
+  {
+    return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
+  }
+
+  private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
+  {
+    final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
+    map.put(position, sketch);
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java
new file mode 100644
index 0000000..506c9c3
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class HllSketchBuildVectorAggregator implements VectorAggregator
+{
+  private final HllSketchBuildBufferAggregatorHelper helper;
+  private final Supplier<Object[]> objectSupplier;
+
+  HllSketchBuildVectorAggregator(
+      final VectorColumnSelectorFactory columnSelectorFactory,
+      final String column,
+      final int lgK,
+      final TgtHllType tgtHllType,
+      final int size
+  )
+  {
+    this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
+    this.objectSupplier =
+        ColumnProcessors.makeVectorProcessor(
+            column,
+            ToObjectVectorColumnProcessorFactory.INSTANCE,
+            columnSelectorFactory
+        );
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    helper.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final Object[] vector = objectSupplier.get();
+    for (int i = startRow; i < endRow; i++) {
+      final Object value = vector[i];
+      if (value != null) {
+        HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final Object[] vector = objectSupplier.get();
+
+    for (int i = 0; i < numRows; i++) {
+      final Object o = vector[rows != null ? rows[i] : i];
+
+      if (o != null) {
+        final int position = positions[i] + positionOffset;
+        HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o);
+      }
+    }
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return helper.get(buf, position);
+  }
+
+  /**
+   * In very rare cases sketches can exceed given memory, request on-heap memory and move there.
+   * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
+   */
+  @Override
+  public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
+  {
+    helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
+  }
+
+  @Override
+  public void close()
+  {
+    helper.clear();
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
index 74afea3..050cf59 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
@@ -29,8 +29,11 @@
 import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 
@@ -103,6 +106,24 @@
   }
 
   @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    return new HllSketchMergeVectorAggregator(
+        selectorFactory,
+        getFieldName(),
+        getLgK(),
+        TgtHllType.valueOf(getTgtHllType()),
+        getMaxIntermediateSize()
+    );
+  }
+
+  @Override
   public int getMaxIntermediateSize()
   {
     return Union.getMaxSerializationBytes(getLgK());
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 7161c25..278a5c5 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.query.aggregation.datasketches.hll;
 
-import com.google.common.util.concurrent.Striped;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.datasketches.hll.Union;
@@ -30,8 +29,6 @@
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 
 /**
  * This aggregator merges existing sketches.
@@ -39,24 +36,8 @@
  */
 public class HllSketchMergeBufferAggregator implements BufferAggregator
 {
-
-  /**
-   * for locking per buffer position (power of 2 to make index computation faster)
-   */
-  private static final int NUM_STRIPES = 64;
-
   private final ColumnValueSelector<HllSketch> selector;
-  private final int lgK;
-  private final TgtHllType tgtHllType;
-  private final int size;
-  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
-
-  /**
-   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
-   * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
-   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
-   */
-  private final byte[] emptyUnion;
+  private final HllSketchMergeBufferAggregatorHelper helper;
 
   public HllSketchMergeBufferAggregator(
       final ColumnValueSelector<HllSketch> selector,
@@ -66,39 +47,15 @@
   )
   {
     this.selector = selector;
-    this.lgK = lgK;
-    this.tgtHllType = tgtHllType;
-    this.size = size;
-    this.emptyUnion = new byte[size];
-
-    //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
-    new Union(lgK, WritableMemory.wrap(emptyUnion));
+    this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
   }
 
   @Override
   public void init(final ByteBuffer buf, final int position)
   {
-    // Copy prebuilt empty union object.
-    // Not necessary to cache a Union wrapper around the initialized memory, because:
-    //  - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
-    //  - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
-    //    max size and therefore do not need to be potentially moved in-heap.
-
-    final int oldPosition = buf.position();
-    try {
-      buf.position(position);
-      buf.put(emptyUnion);
-    }
-    finally {
-      buf.position(oldPosition);
-    }
+    helper.init(buf, position);
   }
 
-  /**
-   * This method uses locks because it can be used during indexing,
-   * and Druid can call aggregate() and get() concurrently
-   * See https://github.com/druid-io/druid/pull/3956
-   */
   @Override
   public void aggregate(final ByteBuffer buf, final int position)
   {
@@ -106,36 +63,18 @@
     if (sketch == null) {
       return;
     }
-    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
-    final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock();
-    lock.lock();
-    try {
-      final Union union = Union.writableWrap(mem);
-      union.update(sketch);
-    }
-    finally {
-      lock.unlock();
-    }
+
+    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+                                             .writableRegion(position, helper.getSize());
+
+    final Union union = Union.writableWrap(mem);
+    union.update(sketch);
   }
 
-  /**
-   * This method uses locks because it can be used during indexing,
-   * and Druid can call aggregate() and get() concurrently
-   * See https://github.com/druid-io/druid/pull/3956
-   */
   @Override
   public Object get(final ByteBuffer buf, final int position)
   {
-    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
-    final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock();
-    lock.lock();
-    try {
-      final Union union = Union.writableWrap(mem);
-      return union.getResult(tgtHllType);
-    }
-    finally {
-      lock.unlock();
-    }
+    return helper.get(buf, position);
   }
 
   @Override
@@ -163,6 +102,6 @@
     // lgK should be inspected because different execution paths exist in Union.update() that is called from
     // @CalledFromHotLoop-annotated aggregate() depending on the lgK.
     // See https://github.com/apache/druid/pull/6893#discussion_r250726028
-    inspector.visit("lgK", lgK);
+    inspector.visit("lgK", helper.getLgK());
   }
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
new file mode 100644
index 0000000..d6030e8
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.hll.Union;
+import org.apache.datasketches.memory.WritableMemory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class HllSketchMergeBufferAggregatorHelper
+{
+  private final int lgK;
+  private final TgtHllType tgtHllType;
+  private final int size;
+
+  /**
+   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
+   * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
+   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+   */
+  private final byte[] emptyUnion;
+
+  public HllSketchMergeBufferAggregatorHelper(int lgK, TgtHllType tgtHllType, int size)
+  {
+    this.lgK = lgK;
+    this.tgtHllType = tgtHllType;
+    this.size = size;
+    this.emptyUnion = new byte[size];
+
+    //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
+    new Union(lgK, WritableMemory.wrap(emptyUnion));
+  }
+
+  /**
+   * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
+   * {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
+   */
+  public void init(final ByteBuffer buf, final int position)
+  {
+    // Copy prebuilt empty union object.
+    // Not necessary to cache a Union wrapper around the initialized memory, because:
+    //  - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
+    //  - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
+    //    max size and therefore do not need to be potentially moved in-heap.
+
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptyUnion);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+  }
+
+  /**
+   * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
+   * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
+   */
+  public Object get(ByteBuffer buf, int position)
+  {
+    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
+    final Union union = Union.writableWrap(mem);
+    return union.getResult(tgtHllType);
+  }
+
+  public int getLgK()
+  {
+    return lgK;
+  }
+
+  public int getSize()
+  {
+    return size;
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
new file mode 100644
index 0000000..d97c0b5
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.hll.Union;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Supplier;
+
+public class HllSketchMergeVectorAggregator implements VectorAggregator
+{
+  private final HllSketchMergeBufferAggregatorHelper helper;
+  private final Supplier<Object[]> objectSupplier;
+
+  HllSketchMergeVectorAggregator(
+      final VectorColumnSelectorFactory columnSelectorFactory,
+      final String column,
+      final int lgK,
+      final TgtHllType tgtHllType,
+      final int size
+  )
+  {
+    this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
+    this.objectSupplier =
+        ColumnProcessors.makeVectorProcessor(
+            column,
+            ToObjectVectorColumnProcessorFactory.INSTANCE,
+            columnSelectorFactory
+        );
+  }
+
+  @Override
+  public void init(final ByteBuffer buf, final int position)
+  {
+    helper.init(buf, position);
+  }
+
+  @Override
+  public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+  {
+    final Object[] vector = objectSupplier.get();
+
+    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+                                             .writableRegion(position, helper.getSize());
+
+    final Union union = Union.writableWrap(mem);
+    for (int i = startRow; i < endRow; i++) {
+      union.update((HllSketch) vector[i]);
+    }
+  }
+
+  @Override
+  public void aggregate(
+      final ByteBuffer buf,
+      final int numRows,
+      final int[] positions,
+      @Nullable final int[] rows,
+      final int positionOffset
+  )
+  {
+    final Object[] vector = objectSupplier.get();
+
+    for (int i = 0; i < numRows; i++) {
+      final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i];
+
+      if (o != null) {
+        final int position = positions[i] + positionOffset;
+
+        final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+                                                 .writableRegion(position, helper.getSize());
+
+        final Union union = Union.writableWrap(mem);
+        union.update(o);
+      }
+    }
+  }
+
+  @Override
+  public Object get(final ByteBuffer buf, final int position)
+  {
+    return helper.get(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close.
+  }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
index b5b9ad8..a862265 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
@@ -31,18 +31,18 @@
 
 public class SketchVectorAggregator implements VectorAggregator
 {
-  private final Supplier<Object[]> toObjectProcessor;
   private final SketchBufferAggregatorHelper helper;
+  private final Supplier<Object[]> objectSupplier;
 
-  public SketchVectorAggregator(
-      VectorColumnSelectorFactory columnSelectorFactory,
-      String column,
-      int size,
-      int maxIntermediateSize
+  SketchVectorAggregator(
+      final VectorColumnSelectorFactory columnSelectorFactory,
+      final String column,
+      final int size,
+      final int maxIntermediateSize
   )
   {
     this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
-    this.toObjectProcessor =
+    this.objectSupplier =
         ColumnProcessors.makeVectorProcessor(
             column,
             ToObjectVectorColumnProcessorFactory.INSTANCE,
@@ -60,7 +60,7 @@
   public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
   {
     final Union union = helper.getOrCreateUnion(buf, position);
-    final Object[] vector = toObjectProcessor.get();
+    final Object[] vector = objectSupplier.get();
 
     for (int i = startRow; i < endRow; i++) {
       final Object o = vector[i];
@@ -79,7 +79,7 @@
       final int positionOffset
   )
   {
-    final Object[] vector = toObjectProcessor.get();
+    final Object[] vector = objectSupplier.get();
 
     for (int i = 0; i < numRows; i++) {
       final Object o = vector[rows != null ? rows[i] : i];
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
index a299257..afac573 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
@@ -26,6 +26,7 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregationTestHelper;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -54,23 +55,27 @@
   private static final boolean ROUND = true;
 
   private final AggregationTestHelper helper;
+  private final QueryContexts.Vectorize vectorize;
 
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-  public HllSketchAggregatorTest(GroupByQueryConfig config)
+  public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize)
   {
     HllSketchModule.registerSerde();
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
         new HllSketchModule().getJacksonModules(), config, tempFolder);
+    this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
   }
 
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[]{config});
+      for (String vectorize : new String[]{"false", "true", "force"}) {
+        constructors.add(new Object[]{config, vectorize});
+      }
     }
     return constructors;
   }
@@ -224,10 +229,32 @@
                         )
                         .setPostAggregatorSpecs(
                             ImmutableList.of(
-                              new HllSketchToEstimatePostAggregator("estimate", new FieldAccessPostAggregator("f1", "sketch"), false),
-                              new HllSketchToEstimateWithBoundsPostAggregator("estimateWithBounds", new FieldAccessPostAggregator("f1", "sketch"), 2),
-                              new HllSketchToStringPostAggregator("summary", new FieldAccessPostAggregator("f1", "sketch")),
-                              new HllSketchUnionPostAggregator("union", ImmutableList.of(new FieldAccessPostAggregator("f1", "sketch"), new FieldAccessPostAggregator("f2", "sketch")), null, null)
+                                new HllSketchToEstimatePostAggregator(
+                                    "estimate",
+                                    new FieldAccessPostAggregator("f1", "sketch"),
+                                    false
+                                ),
+                                new HllSketchToEstimateWithBoundsPostAggregator(
+                                    "estimateWithBounds",
+                                    new FieldAccessPostAggregator(
+                                        "f1",
+                                        "sketch"
+                                    ),
+                                    2
+                                ),
+                                new HllSketchToStringPostAggregator(
+                                    "summary",
+                                    new FieldAccessPostAggregator("f1", "sketch")
+                                ),
+                                new HllSketchUnionPostAggregator(
+                                    "union",
+                                    ImmutableList.of(new FieldAccessPostAggregator(
+                                        "f1",
+                                        "sketch"
+                                    ), new FieldAccessPostAggregator("f2", "sketch")),
+                                    null,
+                                    null
+                                )
                             )
                         )
                         .build()
@@ -320,7 +347,7 @@
     );
   }
 
-  private static String buildGroupByQueryJson(
+  private String buildGroupByQueryJson(
       String aggregationType,
       String aggregationFieldName,
       boolean aggregationRound
@@ -338,6 +365,7 @@
         .put("dimensions", Collections.emptyList())
         .put("aggregations", Collections.singletonList(aggregation))
         .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"))
+        .put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString()))
         .build();
     return toJson(object);
   }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 039801c..b1faed1 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -32,6 +32,7 @@
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -84,34 +85,60 @@
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@RunWith(Parameterized.class)
 public class HllSketchSqlAggregatorTest extends CalciteTestBase
 {
   private static final String DATA_SOURCE = "foo";
   private static final boolean ROUND = true;
-  private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
-      PlannerContext.CTX_SQL_QUERY_ID, "dummy"
-  );
   private static QueryRunnerFactoryConglomerate conglomerate;
   private static Closer resourceCloser;
   private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
 
   @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Rule
   public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
-  
+
+  private final Map<String, Object> queryContext;
   private SpecificSegmentsQuerySegmentWalker walker;
   private SqlLifecycleFactory sqlLifecycleFactory;
 
+  public HllSketchSqlAggregatorTest(final String vectorize)
+  {
+    this.queryContext = ImmutableMap.of(
+        PlannerContext.CTX_SQL_QUERY_ID, "dummy",
+        QueryContexts.VECTORIZE_KEY, vectorize,
+        QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
+    );
+  }
+
+  @Parameterized.Parameters(name = "vectorize = {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+    for (String vectorize : new String[]{"false", "true", "force"}) {
+      constructors.add(new Object[]{vectorize});
+    }
+    return constructors;
+  }
+
   @BeforeClass
   public static void setUpClass()
   {
@@ -207,6 +234,9 @@
   @Test
   public void testApproxCountDistinctHllSketch() throws Exception
   {
+    // Can't vectorize due to CONCAT expression.
+    cannotVectorize();
+
     SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
 
     final String sql = "SELECT\n"
@@ -222,7 +252,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -317,8 +347,9 @@
                       new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND)
                   )
               )
-              .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
-              .build(),
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
         Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
     );
   }
@@ -327,6 +358,9 @@
   @Test
   public void testAvgDailyCountDistinctHllSketch() throws Exception
   {
+    // Can't vectorize due to outer query, which runs on an inline datasource.
+    cannotVectorize();
+
     SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
 
     final String sql = "SELECT\n"
@@ -340,7 +374,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -379,11 +413,14 @@
                                                        new FinalizingFieldAccessPostAggregator("a0", "a0:a")
                                                    )
                                                )
-                                               .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
-                                                   ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
-                                                   "d0"
-                                               ))
+                                               .context(queryContext)
                                                .build()
+                                               .withOverriddenContext(
+                                                   BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
+                                                       ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
+                                                       "d0"
+                                                   )
+                                               )
                                      )
                                  )
                                  .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@@ -414,7 +451,7 @@
                                          )
                                      )
                                  )
-                                 .setContext(QUERY_CONTEXT_DEFAULT)
+                                 .setContext(queryContext)
                                  .build();
 
     Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@@ -437,7 +474,7 @@
 
     // Verify results
     final List<Object[]> results =
-        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+        sqlLifecycle.runSimple(sql, queryContext, DEFAULT_PARAMETERS, authenticationResult).toList();
     final int expected = NullHandling.replaceWithDefault() ? 1 : 2;
     Assert.assertEquals(expected, results.size());
   }
@@ -466,7 +503,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -598,11 +635,9 @@
                       new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true)
                   )
               )
-              .context(ImmutableMap.of(
-                  "skipEmptyBuckets", true,
-                  PlannerContext.CTX_SQL_QUERY_ID, "dummy"
-              ))
-              .build();
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
 
     // Verify query
     Assert.assertEquals(expectedQuery, actualQuery);
@@ -619,7 +654,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql2,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -670,13 +705,19 @@
                       new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"))
                   )
               )
-              .context(ImmutableMap.of(
-                  "skipEmptyBuckets", true,
-                  PlannerContext.CTX_SQL_QUERY_ID, "dummy"
-              ))
-              .build();
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
 
     // Verify query
     Assert.assertEquals(expectedQuery, actualQuery);
   }
+
+  private void cannotVectorize()
+  {
+    if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
+        == QueryContexts.Vectorize.FORCE) {
+      expectedException.expectMessage("Cannot vectorize");
+    }
+  }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
index 9201c91..0aac2b0 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
@@ -32,6 +32,7 @@
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -81,14 +82,20 @@
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@RunWith(Parameterized.class)
 public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
 {
   private static final String DATA_SOURCE = "foo";
@@ -96,9 +103,6 @@
   private static QueryRunnerFactoryConglomerate conglomerate;
   private static Closer resourceCloser;
   private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
-  private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
-      PlannerContext.CTX_SQL_QUERY_ID, "dummy"
-  );
 
   @BeforeClass
   public static void setUpClass()
@@ -114,14 +118,37 @@
   }
 
   @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Rule
   public QueryLogHook queryLogHook = QueryLogHook.create();
 
+  private final Map<String, Object> queryContext;
   private SpecificSegmentsQuerySegmentWalker walker;
   private SqlLifecycleFactory sqlLifecycleFactory;
 
+  public ThetaSketchSqlAggregatorTest(final String vectorize)
+  {
+    this.queryContext = ImmutableMap.of(
+        PlannerContext.CTX_SQL_QUERY_ID, "dummy",
+        QueryContexts.VECTORIZE_KEY, vectorize,
+        QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
+    );
+  }
+
+  @Parameterized.Parameters(name = "vectorize = {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+    for (String vectorize : new String[]{"false", "true", "force"}) {
+      constructors.add(new Object[]{vectorize});
+    }
+    return constructors;
+  }
+
   @Before
   public void setUp() throws Exception
   {
@@ -206,21 +233,30 @@
   @Test
   public void testApproxCountDistinctThetaSketch() throws Exception
   {
+    // Cannot vectorize due to SUBSTRING.
+    cannotVectorize();
+
     SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
     final String sql = "SELECT\n"
                        + "  SUM(cnt),\n"
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" // uppercase
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" // on native theta sketch column
-                       + "  APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" // on native theta sketch column
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n"
+                       // uppercase
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n"
+                       // lowercase; also, filtered
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n"
+                       // on extractionFn
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n"
+                       // on expression
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n"
+                       // on native theta sketch column
+                       + "  APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n"
+                       // on native theta sketch column
                        + "FROM druid.foo";
 
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -319,8 +355,9 @@
                       new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null)
                   )
               )
-              .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
-              .build(),
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
         Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
     );
   }
@@ -328,6 +365,9 @@
   @Test
   public void testAvgDailyCountDistinctThetaSketch() throws Exception
   {
+    // Can't vectorize due to outer query (it operates on an inlined data source, which cannot be vectorized).
+    cannotVectorize();
+
     SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
 
     final String sql = "SELECT\n"
@@ -337,7 +377,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -358,7 +398,11 @@
                                                                .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(
                                                                    Filtration.eternity()
                                                                )))
-                                                               .granularity(new PeriodGranularity(Period.days(1), null, DateTimeZone.UTC))
+                                                               .granularity(new PeriodGranularity(
+                                                                   Period.days(1),
+                                                                   null,
+                                                                   DateTimeZone.UTC
+                                                               ))
                                                                .aggregators(
                                                                    Collections.singletonList(
                                                                        new SketchMergeAggregatorFactory(
@@ -373,14 +417,25 @@
                                                                )
                                                                .postAggregators(
                                                                    ImmutableList.of(
-                                                                       new FinalizingFieldAccessPostAggregator("a0", "a0:a")
+                                                                       new FinalizingFieldAccessPostAggregator(
+                                                                           "a0",
+                                                                           "a0:a"
+                                                                       )
                                                                    )
                                                                )
-                                                               .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
-                                                                   ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
-                                                                   "d0"
-                                                               ))
+                                                               .context(queryContext)
                                                                .build()
+                                                               .withOverriddenContext(
+                                                                   BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
+                                                                       ImmutableMap.of(
+                                                                           "skipEmptyBuckets",
+                                                                           true,
+                                                                           "sqlQueryId",
+                                                                           "dummy"
+                                                                       ),
+                                                                       "d0"
+                                                                   )
+                                                               )
                                      )
                                  )
                                  .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@@ -388,8 +443,8 @@
                                  .setAggregatorSpecs(
                                      NullHandling.replaceWithDefault()
                                      ? Arrays.asList(
-                                       new LongSumAggregatorFactory("_a0:sum", "a0"),
-                                       new CountAggregatorFactory("_a0:count")
+                                         new LongSumAggregatorFactory("_a0:sum", "a0"),
+                                         new CountAggregatorFactory("_a0:count")
                                      )
                                      : Arrays.asList(
                                          new LongSumAggregatorFactory("_a0:sum", "a0"),
@@ -411,7 +466,7 @@
                                          )
                                      )
                                  )
-                                 .setContext(QUERY_CONTEXT_DEFAULT)
+                                 .setContext(queryContext)
                                  .build();
 
     Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@@ -439,7 +494,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -598,8 +653,9 @@
                       null
                   )
               )
-              .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
-              .build();
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
 
 
     // Verify query
@@ -617,7 +673,7 @@
     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(
         sql2,
-        QUERY_CONTEXT_DEFAULT,
+        queryContext,
         DEFAULT_PARAMETERS,
         authenticationResult
     ).toList();
@@ -664,11 +720,19 @@
                       null
                   )
               )
-              .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
-              .build();
-
+              .context(queryContext)
+              .build()
+              .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
 
     // Verify query
     Assert.assertEquals(expectedQuery, actualQuery);
   }
+
+  private void cannotVectorize()
+  {
+    if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
+        == QueryContexts.Vectorize.FORCE) {
+      expectedException.expectMessage("Cannot vectorize");
+    }
+  }
 }