Vectorize the cardinality aggregator. (#11182)

* Vectorize the cardinality aggregator.

Does not include a byRow implementation, so if byRow is true then
the aggregator still goes through the non-vectorized path.

Testing strategy:

- New tests that exercise both styles of "aggregate" for supported types.
- Some existing tests have also become active (note the deleted
  "cannotVectorize" lines).

* Adjust whitespace.
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
index 4990d5d..57c5c7e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@ -35,15 +35,21 @@
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.NoopAggregator;
 import org.apache.druid.query.aggregation.NoopBufferAggregator;
+import org.apache.druid.query.aggregation.NoopVectorAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
 import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
 import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
+import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -146,7 +152,6 @@
     return new CardinalityAggregator(selectorPluses, byRow);
   }
 
-
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
   {
@@ -164,6 +169,32 @@
   }
 
   @Override
+  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+  {
+    if (fields.isEmpty()) {
+      return NoopVectorAggregator.instance();
+    }
+
+    return new CardinalityVectorAggregator(
+        fields.stream().map(
+            field ->
+                ColumnProcessors.makeVectorProcessor(
+                    field,
+                    CardinalityVectorProcessorFactory.INSTANCE,
+                    selectorFactory
+                )
+        ).collect(Collectors.toList())
+    );
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    // !byRow because there is not yet a vector implementation.
+    return !byRow && fields.stream().allMatch(DimensionSpec::canVectorize);
+  }
+
+  @Override
   public Comparator getComparator()
   {
     return new Comparator<HyperLogLogCollector>()
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
index 64d70d4..0e3d698 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
@@ -55,10 +55,11 @@
     // Save position, limit and restore later instead of allocating a new ByteBuffer object
     final int oldPosition = buf.position();
     final int oldLimit = buf.limit();
-    buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
-    buf.position(position);
 
     try {
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
       final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
       if (byRow) {
         CardinalityAggregator.hashRow(selectorPluses, collector);
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java
new file mode 100644
index 0000000..59d1ecf
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesBufferAggregator;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class CardinalityVectorAggregator implements VectorAggregator
+{
+  private final List<CardinalityVectorProcessor> processors;
+
+  CardinalityVectorAggregator(List<CardinalityVectorProcessor> processors)
+  {
+    this.processors = processors;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    HyperUniquesBufferAggregator.doInit(buf, position);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    for (final CardinalityVectorProcessor processor : processors) {
+      processor.aggregate(buf, position, startRow, endRow);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    for (final CardinalityVectorProcessor processor : processors) {
+      processor.aggregate(buf, numRows, positions, rows, positionOffset);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return HyperUniquesBufferAggregator.doGet(buf, position);
+  }
+
+  @Override
+  public void close()
+  {
+    // Nothing to close.
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
index 1471475..4fc7a32 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
 public class DoubleCardinalityAggregatorColumnSelectorStrategy
     implements CardinalityAggregatorColumnSelectorStrategy<BaseDoubleColumnValueSelector>
 {
+  public static void addDoubleToCollector(final HyperLogLogCollector collector, final double n)
+  {
+    collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(n)).asBytes());
+  }
+
   @Override
   public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher)
   {
@@ -46,7 +51,7 @@
   public void hashValues(BaseDoubleColumnValueSelector selector, HyperLogLogCollector collector)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(selector.getDouble())).asBytes());
+      addDoubleToCollector(collector, selector.getDouble());
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
index 59a242f..2b04e1f 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
 public class FloatCardinalityAggregatorColumnSelectorStrategy
     implements CardinalityAggregatorColumnSelectorStrategy<BaseFloatColumnValueSelector>
 {
+  public static void addFloatToCollector(final HyperLogLogCollector collector, final float n)
+  {
+    collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(n)).asBytes());
+  }
+
   @Override
   public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
   {
@@ -46,7 +51,7 @@
   public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(selector.getFloat())).asBytes());
+      addFloatToCollector(collector, selector.getFloat());
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
index d6ffea5..75a0f3f 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
 public class LongCardinalityAggregatorColumnSelectorStrategy
     implements CardinalityAggregatorColumnSelectorStrategy<BaseLongColumnValueSelector>
 {
+  public static void addLongToCollector(final HyperLogLogCollector collector, final long n)
+  {
+    collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(n).asBytes());
+  }
+
   @Override
   public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher)
   {
@@ -46,7 +51,7 @@
   public void hashValues(BaseLongColumnValueSelector selector, HyperLogLogCollector collector)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(selector.getLong()).asBytes());
+      addLongToCollector(collector, selector.getLong());
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
index ca4c69c..9d49e30 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
@@ -26,6 +26,7 @@
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.data.IndexedInts;
 
+import javax.annotation.Nullable;
 import java.util.Arrays;
 
 public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
@@ -33,6 +34,16 @@
   public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
   public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';
 
+  public static void addStringToCollector(final HyperLogLogCollector collector, @Nullable final String s)
+  {
+    // SQL standard spec does not count null values,
+    // Skip counting null values when we are not replacing null with default value.
+    // A special value for null in case null handling is configured to use empty string for null.
+    if (NullHandling.replaceWithDefault() || s != null) {
+      collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(s)).asBytes());
+    }
+  }
+
   @Override
   public void hashRow(DimensionSelector dimSelector, Hasher hasher)
   {
@@ -80,16 +91,11 @@
     for (int i = 0, rowSize = row.size(); i < rowSize; i++) {
       int index = row.get(i);
       final String value = dimSelector.lookupName(index);
-      // SQL standard spec does not count null values,
-      // Skip counting null values when we are not replacing null with default value.
-      // A special value for null in case null handling is configured to use empty string for null.
-      if (NullHandling.replaceWithDefault() || value != null) {
-        collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(value)).asBytes());
-      }
+      addStringToCollector(collector, value);
     }
   }
 
-  private String nullToSpecial(String value)
+  private static String nullToSpecial(String value)
   {
     return value == null ? CARDINALITY_AGG_NULL_STRING : value;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java
new file mode 100644
index 0000000..9c79d0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Processor for {@link org.apache.druid.query.aggregation.cardinality.CardinalityVectorAggregator}.
+ */
+public interface CardinalityVectorProcessor
+{
+  /**
+   * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)}
+   * in byRow = false mode.
+   */
+  void aggregate(ByteBuffer buf, int position, int startRow, int endRow);
+
+  /**
+   * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)}
+   * in byRow = false mode.
+   */
+  void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset);
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java
new file mode 100644
index 0000000..3d74583
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory<CardinalityVectorProcessor>
+{
+  public static final CardinalityVectorProcessorFactory INSTANCE = new CardinalityVectorProcessorFactory();
+
+  @Override
+  public CardinalityVectorProcessor makeSingleValueDimensionProcessor(
+      ColumnCapabilities capabilities,
+      SingleValueDimensionVectorSelector selector
+  )
+  {
+    return new SingleValueStringCardinalityVectorProcessor(selector);
+  }
+
+  @Override
+  public CardinalityVectorProcessor makeMultiValueDimensionProcessor(
+      ColumnCapabilities capabilities,
+      MultiValueDimensionVectorSelector selector
+  )
+  {
+    return new MultiValueStringCardinalityVectorProcessor(selector);
+  }
+
+  @Override
+  public CardinalityVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+  {
+    return new FloatCardinalityVectorProcessor(selector);
+  }
+
+  @Override
+  public CardinalityVectorProcessor makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+  {
+    return new DoubleCardinalityVectorProcessor(selector);
+  }
+
+  @Override
+  public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+  {
+    return new LongCardinalityVectorProcessor(selector);
+  }
+
+  @Override
+  public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
+  {
+    return NilCardinalityVectorProcessor.INSTANCE;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java
new file mode 100644
index 0000000..ad63f1a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.DoubleCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoubleCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final VectorValueSelector selector;
+
+  public DoubleCardinalityVectorProcessor(final VectorValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final double[] vector = selector.getDoubleVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+          DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[i]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final double[] vector = selector.getDoubleVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final int idx = rows != null ? rows[i] : i;
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+          final int position = positions[i] + positionOffset;
+          buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(position);
+          final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+          DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[idx]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java
new file mode 100644
index 0000000..1be4dd3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.FloatCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class FloatCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final VectorValueSelector selector;
+
+  public FloatCardinalityVectorProcessor(final VectorValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final float[] vector = selector.getFloatVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+          FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[i]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final float[] vector = selector.getFloatVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final int idx = rows != null ? rows[i] : i;
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+          final int position = positions[i] + positionOffset;
+          buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(position);
+          final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+          FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[idx]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java
new file mode 100644
index 0000000..f69ab5f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.LongCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final VectorValueSelector selector;
+
+  public LongCardinalityVectorProcessor(final VectorValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final long[] vector = selector.getLongVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+          LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[i]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final long[] vector = selector.getLongVector();
+      final boolean[] nullVector = selector.getNullVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final int idx = rows != null ? rows[i] : i;
+        if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+          final int position = positions[i] + positionOffset;
+          buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(position);
+          final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+          LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[idx]);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java
new file mode 100644
index 0000000..1ccee04
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class MultiValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final MultiValueDimensionVectorSelector selector;
+
+  public MultiValueStringCardinalityVectorProcessor(final MultiValueDimensionVectorSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final IndexedInts[] vector = selector.getRowVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        final IndexedInts ids = vector[i];
+        final int sz = ids.size();
+
+        for (int j = 0; j < sz; j++) {
+          final String value = selector.lookupName(ids.get(j));
+          StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final IndexedInts[] vector = selector.getRowVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final IndexedInts ids = vector[rows != null ? rows[i] : i];
+        final int sz = ids.size();
+
+        for (int j = 0; j < sz; j++) {
+          final String s = selector.lookupName(ids.get(j));
+          if (NullHandling.replaceWithDefault() || s != null) {
+            final int position = positions[i] + positionOffset;
+            buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+            buf.position(position);
+            final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+            StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s);
+          }
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java
new file mode 100644
index 0000000..42f1e0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor();
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Do nothing.
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    // Do nothing.
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java
new file mode 100644
index 0000000..080ce04
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SingleValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final SingleValueDimensionVectorSelector selector;
+
+  public SingleValueStringCardinalityVectorProcessor(final SingleValueDimensionVectorSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final int[] vector = selector.getRowVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        final String value = selector.lookupName(vector[i]);
+        StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value);
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final int[] vector = selector.getRowVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final String s = selector.lookupName(vector[rows != null ? rows[i] : i]);
+
+        if (NullHandling.replaceWithDefault() || s != null) {
+          final int position = positions[i] + positionOffset;
+          buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(position);
+          final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+          StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java
new file mode 100644
index 0000000..0206d11
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.vector.DoubleCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.FloatCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.LongCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.MultiValueStringCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.SingleValueStringCardinalityVectorProcessor;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.data.ArrayBasedIndexedInts;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
+import org.apache.druid.segment.vector.BaseFloatVectorValueSelector;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.NoFilterVectorOffset;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class CardinalityVectorAggregatorTest extends InitializedNullHandlingTest
+{
+  @Test
+  public void testAggregateLong()
+  {
+    final long[] values = {1, 2, 2, 3, 3, 3, 0};
+    final boolean[] nulls = NullHandling.replaceWithDefault()
+                            ? null
+                            : new boolean[]{false, false, false, false, false, false, true};
+
+    final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+        Collections.singletonList(
+            new LongCardinalityVectorProcessor(
+                new BaseLongVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+                {
+                  @Override
+                  public long[] getLongVector()
+                  {
+                    return values;
+                  }
+
+                  @Nullable
+                  @Override
+                  public boolean[] getNullVector()
+                  {
+                    return nulls;
+                  }
+                }
+            )
+        )
+    );
+
+    testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+  }
+
+  @Test
+  public void testAggregateDouble()
+  {
+    final double[] values = {1, 2, 2, 3, 3, 3, 0};
+    final boolean[] nulls = NullHandling.replaceWithDefault()
+                            ? null
+                            : new boolean[]{false, false, false, false, false, false, true};
+
+    final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+        Collections.singletonList(
+            new DoubleCardinalityVectorProcessor(
+                new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+                {
+                  @Override
+                  public double[] getDoubleVector()
+                  {
+                    return values;
+                  }
+
+                  @Nullable
+                  @Override
+                  public boolean[] getNullVector()
+                  {
+                    return nulls;
+                  }
+                }
+            )
+        )
+    );
+
+    testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+  }
+
+  @Test
+  public void testAggregateFloat()
+  {
+    final float[] values = {1, 2, 2, 3, 3, 3, 0};
+    final boolean[] nulls = NullHandling.replaceWithDefault()
+                            ? null
+                            : new boolean[]{false, false, false, false, false, false, true};
+
+    final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+        Collections.singletonList(
+            new FloatCardinalityVectorProcessor(
+                new BaseFloatVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+                {
+                  @Override
+                  public float[] getFloatVector()
+                  {
+                    return values;
+                  }
+
+                  @Nullable
+                  @Override
+                  public boolean[] getNullVector()
+                  {
+                    return nulls;
+                  }
+                }
+            )
+        )
+    );
+
+    testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+  }
+
+  @Test
+  public void testAggregateSingleValueString()
+  {
+    final int[] ids = {1, 2, 2, 3, 3, 3, 0};
+    final String[] dict = {null, "abc", "def", "foo"};
+
+    final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+        Collections.singletonList(
+            new SingleValueStringCardinalityVectorProcessor(
+                new SingleValueDimensionVectorSelector()
+                {
+                  @Override
+                  public int[] getRowVector()
+                  {
+                    return ids;
+                  }
+
+                  @Override
+                  public int getValueCardinality()
+                  {
+                    return dict.length;
+                  }
+
+                  @Nullable
+                  @Override
+                  public String lookupName(int id)
+                  {
+                    return dict[id];
+                  }
+
+                  @Override
+                  public boolean nameLookupPossibleInAdvance()
+                  {
+                    return true;
+                  }
+
+                  @Nullable
+                  @Override
+                  public IdLookup idLookup()
+                  {
+                    return null;
+                  }
+
+                  @Override
+                  public int getMaxVectorSize()
+                  {
+                    return ids.length;
+                  }
+
+                  @Override
+                  public int getCurrentVectorSize()
+                  {
+                    return ids.length;
+                  }
+                }
+            )
+        )
+    );
+
+    testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
+  }
+
+  @Test
+  public void testAggregateMultiValueString()
+  {
+    final IndexedInts[] ids = {
+        new ArrayBasedIndexedInts(new int[]{1, 2}),
+        new ArrayBasedIndexedInts(new int[]{2, 3}),
+        new ArrayBasedIndexedInts(new int[]{3, 3}),
+        new ArrayBasedIndexedInts(new int[]{0})
+    };
+
+    final String[] dict = {null, "abc", "def", "foo"};
+
+    final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+        Collections.singletonList(
+            new MultiValueStringCardinalityVectorProcessor(
+                new MultiValueDimensionVectorSelector()
+                {
+                  @Override
+                  public IndexedInts[] getRowVector()
+                  {
+                    return ids;
+                  }
+
+                  @Override
+                  public int getValueCardinality()
+                  {
+                    return dict.length;
+                  }
+
+                  @Nullable
+                  @Override
+                  public String lookupName(int id)
+                  {
+                    return dict[id];
+                  }
+
+                  @Override
+                  public boolean nameLookupPossibleInAdvance()
+                  {
+                    return true;
+                  }
+
+                  @Nullable
+                  @Override
+                  public IdLookup idLookup()
+                  {
+                    return null;
+                  }
+
+                  @Override
+                  public int getMaxVectorSize()
+                  {
+                    return ids.length;
+                  }
+
+                  @Override
+                  public int getCurrentVectorSize()
+                  {
+                    return ids.length;
+                  }
+                }
+            )
+        )
+    );
+
+    testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
+  }
+
+  private static void testAggregate(
+      final CardinalityVectorAggregator aggregator,
+      final int numRows,
+      final double expectedResult
+  )
+  {
+    testAggregateStyle1(aggregator, numRows, expectedResult);
+    testAggregateStyle2(aggregator, numRows, expectedResult);
+  }
+
+  private static void testAggregateStyle1(
+      final CardinalityVectorAggregator aggregator,
+      final int numRows,
+      final double expectedResult
+  )
+  {
+    final int position = 1;
+    final ByteBuffer buf = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage() + position);
+    aggregator.init(buf, position);
+    aggregator.aggregate(buf, position, 0, numRows);
+
+    Assert.assertEquals(
+        "style1",
+        expectedResult,
+        ((HyperLogLogCollector) aggregator.get(buf, position)).estimateCardinality(),
+        0.01
+    );
+  }
+
+  private static void testAggregateStyle2(
+      final CardinalityVectorAggregator aggregator,
+      final int numRows,
+      final double expectedResult
+  )
+  {
+    final int positionOffset = 1;
+
+    final int aggregatorSize = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
+    final ByteBuffer buf = ByteBuffer.allocate(positionOffset + 2 * aggregatorSize);
+    aggregator.init(buf, positionOffset);
+    aggregator.init(buf, positionOffset + aggregatorSize);
+
+    final int[] positions = new int[numRows];
+    final int[] rows = new int[numRows];
+
+    for (int i = 0; i < numRows; i++) {
+      positions[i] = (i % 2) * aggregatorSize;
+      rows[i] = (i + 1) % numRows;
+    }
+
+    aggregator.aggregate(buf, numRows, positions, rows, positionOffset);
+
+    Assert.assertEquals(
+        "style2",
+        expectedResult,
+        ((HyperLogLogCollector) aggregator.get(buf, positionOffset))
+            .fold((HyperLogLogCollector) aggregator.get(buf, positionOffset + aggregatorSize))
+            .estimateCardinality(),
+        0.01
+    );
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 8d0c864..4139c8c 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -2566,9 +2566,6 @@
   @Test
   public void testGroupByWithCardinality()
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     GroupByQuery query = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
@@ -2684,7 +2681,7 @@
   @Test
   public void testGroupByWithNoResult()
   {
-    // Cannot vectorize due to "cardinality" aggregator.
+    // Cannot vectorize due to first, last aggregators.
     cannotVectorize();
 
     GroupByQuery query = makeQueryBuilder()
@@ -8684,7 +8681,7 @@
   @Test
   public void testGroupByCardinalityAggWithExtractionFn()
   {
-    // Cannot vectorize due to "cardinality" aggregator.
+    // Cannot vectorize due to extraction dimension spec.
     cannotVectorize();
 
     String helloJsFn = "function(str) { return 'hello' }";
@@ -8776,9 +8773,6 @@
   @Test
   public void testGroupByCardinalityAggOnFloat()
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     GroupByQuery query = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0ce2d2b..cf30419 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -3459,9 +3459,6 @@
   @Test
   public void testHavingOnApproximateCountDistinct() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         "SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1",
         ImmutableList.of(
@@ -6269,9 +6266,6 @@
   @Test
   public void testFilteredAggregations() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         "SELECT "
         + "SUM(case dim1 when 'abc' then cnt end), "
@@ -7657,9 +7651,6 @@
   @Test
   public void testCountDistinct() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         "SELECT SUM(cnt), COUNT(distinct dim2), COUNT(distinct unique_dim1) FROM druid.foo",
         ImmutableList.of(
@@ -7692,9 +7683,6 @@
   @Test
   public void testCountDistinctOfCaseWhen() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         "SELECT\n"
         + "COUNT(DISTINCT CASE WHEN m1 >= 4 THEN m1 END),\n"
@@ -7787,9 +7775,6 @@
   {
     // When HLL is disabled, APPROX_COUNT_DISTINCT is still approximate.
 
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         PLANNER_CONFIG_NO_HLL,
         "SELECT APPROX_COUNT_DISTINCT(dim2) FROM druid.foo",
@@ -8362,7 +8347,7 @@
   @Test
   public void testAvgDailyCountDistinct() throws Exception
   {
-    // Cannot vectorize due to virtual columns.
+    // Cannot vectorize outer query due to inlined inner query.
     cannotVectorize();
 
     testQuery(
@@ -9034,9 +9019,6 @@
   @Test
   public void testCountDistinctArithmetic() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
-    cannotVectorize();
-
     testQuery(
         "SELECT\n"
         + "  SUM(cnt),\n"
@@ -9081,7 +9063,7 @@
   @Test
   public void testCountDistinctOfSubstring() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
+    // Cannot vectorize due to extraction dimension spec.
     cannotVectorize();
 
     testQuery(
@@ -11808,7 +11790,7 @@
   @Test
   public void testCountDistinctOfLookup() throws Exception
   {
-    // Cannot vectorize due to "cardinality" aggregator.
+    // Cannot vectorize due to extraction dimension spec.
     cannotVectorize();
 
     final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
@@ -15219,7 +15201,7 @@
         )
     );
 
-    // Cannot vectorize next test due to "cardinality" aggregator.
+    // Cannot vectorize next test due to extraction dimension spec.
     cannotVectorize();
 
     // semi-join requires time condition on both left and right query