Vectorize the cardinality aggregator. (#11182)
* Vectorize the cardinality aggregator.
Does not include a byRow implementation, so if byRow is true then
the aggregator still goes through the non-vectorized path.
Testing strategy:
- New tests that exercise both styles of "aggregate" for supported types.
- Some existing tests have also become active (note the deleted
"cannotVectorize" lines).
* Adjust whitespace.
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
index 4990d5d..57c5c7e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@ -35,15 +35,21 @@
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NoopAggregator;
import org.apache.druid.query.aggregation.NoopBufferAggregator;
+import org.apache.druid.query.aggregation.NoopVectorAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
+import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -146,7 +152,6 @@
return new CardinalityAggregator(selectorPluses, byRow);
}
-
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
@@ -164,6 +169,32 @@
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+ {
+ if (fields.isEmpty()) {
+ return NoopVectorAggregator.instance();
+ }
+
+ return new CardinalityVectorAggregator(
+ fields.stream().map(
+ field ->
+ ColumnProcessors.makeVectorProcessor(
+ field,
+ CardinalityVectorProcessorFactory.INSTANCE,
+ selectorFactory
+ )
+ ).collect(Collectors.toList())
+ );
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ // !byRow because there is not yet a vectorized implementation of byRow mode.
+ return !byRow && fields.stream().allMatch(DimensionSpec::canVectorize);
+ }
+
+ @Override
public Comparator getComparator()
{
return new Comparator<HyperLogLogCollector>()
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
index 64d70d4..0e3d698 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
@@ -55,10 +55,11 @@
// Save position, limit and restore later instead of allocating a new ByteBuffer object
final int oldPosition = buf.position();
final int oldLimit = buf.limit();
- buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
- buf.position(position);
try {
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
if (byRow) {
CardinalityAggregator.hashRow(selectorPluses, collector);
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java
new file mode 100644
index 0000000..59d1ecf
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality;
+
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesBufferAggregator;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class CardinalityVectorAggregator implements VectorAggregator
+{
+ private final List<CardinalityVectorProcessor> processors;
+
+ CardinalityVectorAggregator(List<CardinalityVectorProcessor> processors)
+ {
+ this.processors = processors;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ HyperUniquesBufferAggregator.doInit(buf, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ for (final CardinalityVectorProcessor processor : processors) {
+ processor.aggregate(buf, position, startRow, endRow);
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ for (final CardinalityVectorProcessor processor : processors) {
+ processor.aggregate(buf, numRows, positions, rows, positionOffset);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return HyperUniquesBufferAggregator.doGet(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close.
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
index 1471475..4fc7a32 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
public class DoubleCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseDoubleColumnValueSelector>
{
+ public static void addDoubleToCollector(final HyperLogLogCollector collector, final double n)
+ {
+ collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(n)).asBytes());
+ }
+
@Override
public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher)
{
@@ -46,7 +51,7 @@
public void hashValues(BaseDoubleColumnValueSelector selector, HyperLogLogCollector collector)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(selector.getDouble())).asBytes());
+ addDoubleToCollector(collector, selector.getDouble());
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
index 59a242f..2b04e1f 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
public class FloatCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseFloatColumnValueSelector>
{
+ public static void addFloatToCollector(final HyperLogLogCollector collector, final float n)
+ {
+ collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(n)).asBytes());
+ }
+
@Override
public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
{
@@ -46,7 +51,7 @@
public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(selector.getFloat())).asBytes());
+ addFloatToCollector(collector, selector.getFloat());
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
index d6ffea5..75a0f3f 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java
@@ -34,6 +34,11 @@
public class LongCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseLongColumnValueSelector>
{
+ public static void addLongToCollector(final HyperLogLogCollector collector, final long n)
+ {
+ collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(n).asBytes());
+ }
+
@Override
public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher)
{
@@ -46,7 +51,7 @@
public void hashValues(BaseLongColumnValueSelector selector, HyperLogLogCollector collector)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(selector.getLong()).asBytes());
+ addLongToCollector(collector, selector.getLong());
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
index ca4c69c..9d49e30 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java
@@ -26,6 +26,7 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;
+import javax.annotation.Nullable;
import java.util.Arrays;
public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
@@ -33,6 +34,16 @@
public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';
+ public static void addStringToCollector(final HyperLogLogCollector collector, @Nullable final String s)
+ {
+ // The SQL standard does not count null values, so nulls are skipped when we are not replacing null with a
+ // default value. When null handling is configured to use empty string for null, nulls are still counted,
+ // but are hashed as a special placeholder value (see nullToSpecial).
+ if (NullHandling.replaceWithDefault() || s != null) {
+ collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(s)).asBytes());
+ }
+ }
+
@Override
public void hashRow(DimensionSelector dimSelector, Hasher hasher)
{
@@ -80,16 +91,11 @@
for (int i = 0, rowSize = row.size(); i < rowSize; i++) {
int index = row.get(i);
final String value = dimSelector.lookupName(index);
- // SQL standard spec does not count null values,
- // Skip counting null values when we are not replacing null with default value.
- // A special value for null in case null handling is configured to use empty string for null.
- if (NullHandling.replaceWithDefault() || value != null) {
- collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(value)).asBytes());
- }
+ addStringToCollector(collector, value);
}
}
- private String nullToSpecial(String value)
+ private static String nullToSpecial(String value)
{
return value == null ? CARDINALITY_AGG_NULL_STRING : value;
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java
new file mode 100644
index 0000000..9c79d0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Processor for {@link org.apache.druid.query.aggregation.cardinality.CardinalityVectorAggregator}.
+ */
+public interface CardinalityVectorProcessor
+{
+ /**
+ * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)}
+ * in byRow = false mode.
+ */
+ void aggregate(ByteBuffer buf, int position, int startRow, int endRow);
+
+ /**
+ * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int)}
+ * in byRow = false mode.
+ */
+ void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset);
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java
new file mode 100644
index 0000000..3d74583
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory<CardinalityVectorProcessor>
+{
+ public static final CardinalityVectorProcessorFactory INSTANCE = new CardinalityVectorProcessorFactory();
+
+ @Override
+ public CardinalityVectorProcessor makeSingleValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ SingleValueDimensionVectorSelector selector
+ )
+ {
+ return new SingleValueStringCardinalityVectorProcessor(selector);
+ }
+
+ @Override
+ public CardinalityVectorProcessor makeMultiValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ MultiValueDimensionVectorSelector selector
+ )
+ {
+ return new MultiValueStringCardinalityVectorProcessor(selector);
+ }
+
+ @Override
+ public CardinalityVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new FloatCardinalityVectorProcessor(selector);
+ }
+
+ @Override
+ public CardinalityVectorProcessor makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new DoubleCardinalityVectorProcessor(selector);
+ }
+
+ @Override
+ public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new LongCardinalityVectorProcessor(selector);
+ }
+
+ @Override
+ public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
+ {
+ return NilCardinalityVectorProcessor.INSTANCE;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java
new file mode 100644
index 0000000..ad63f1a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.DoubleCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoubleCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+ private final VectorValueSelector selector;
+
+ public DoubleCardinalityVectorProcessor(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final double[] vector = selector.getDoubleVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+ DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[i]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final double[] vector = selector.getDoubleVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int idx = rows != null ? rows[i] : i;
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+ final int position = positions[i] + positionOffset;
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+ DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[idx]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java
new file mode 100644
index 0000000..1be4dd3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.FloatCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class FloatCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+ private final VectorValueSelector selector;
+
+ public FloatCardinalityVectorProcessor(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final float[] vector = selector.getFloatVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+ FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[i]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final float[] vector = selector.getFloatVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int idx = rows != null ? rows[i] : i;
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+ final int position = positions[i] + positionOffset;
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+ FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[idx]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java
new file mode 100644
index 0000000..f69ab5f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.LongCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+ private final VectorValueSelector selector;
+
+ public LongCardinalityVectorProcessor(final VectorValueSelector selector)
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final long[] vector = selector.getLongVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) {
+ LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[i]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+ {
+ // Save position, limit and restore later instead of allocating a new ByteBuffer object
+ final int oldPosition = buf.position();
+ final int oldLimit = buf.limit();
+
+ try {
+ final long[] vector = selector.getLongVector();
+ final boolean[] nullVector = selector.getNullVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int idx = rows != null ? rows[i] : i;
+ if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) {
+ final int position = positions[i] + positionOffset;
+ buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+ buf.position(position);
+ final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+ LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[idx]);
+ }
+ }
+ }
+ finally {
+ buf.limit(oldLimit);
+ buf.position(oldPosition);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java
new file mode 100644
index 0000000..1ccee04
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Cardinality processor for multi-value string dimensions. Every dictionary id of every processed row is resolved
+ * with {@code selector.lookupName} and handed to a {@link HyperLogLogCollector} made over the aggregation buffer.
+ */
+public class MultiValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final MultiValueDimensionVectorSelector selector;
+
+  public MultiValueStringCardinalityVectorProcessor(final MultiValueDimensionVectorSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // The buffer's own cursor is moved while aggregating (no duplicate is allocated) and put back in "finally".
+    final int savedLimit = buf.limit();
+    final int savedPosition = buf.position();
+
+    try {
+      final IndexedInts[] rowIds = selector.getRowVector();
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+      final HyperLogLogCollector hll = HyperLogLogCollector.makeCollector(buf);
+
+      for (int row = startRow; row < endRow; row++) {
+        final IndexedInts ids = rowIds[row];
+        final int numValues = ids.size();
+        for (int k = 0; k < numValues; k++) {
+          StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(hll, selector.lookupName(ids.get(k)));
+        }
+      }
+    }
+    finally {
+      buf.limit(savedLimit);
+      buf.position(savedPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // The buffer's own cursor is moved while aggregating (no duplicate is allocated) and put back in "finally".
+    final int savedLimit = buf.limit();
+    final int savedPosition = buf.position();
+
+    try {
+      final IndexedInts[] rowIds = selector.getRowVector();
+
+      for (int n = 0; n < numRows; n++) {
+        final IndexedInts ids = rowIds[rows == null ? n : rows[n]];
+        final int numValues = ids.size();
+        for (int k = 0; k < numValues; k++) {
+          final String value = selector.lookupName(ids.get(k));
+          if (!NullHandling.replaceWithDefault() && value == null) {
+            continue; // null strings are only added when nulls are replaced with default values
+          }
+          final int start = positions[n] + positionOffset;
+          buf.limit(start + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(start);
+          final HyperLogLogCollector hll = HyperLogLogCollector.makeCollector(buf);
+          StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(hll, value);
+        }
+      }
+    }
+    finally {
+      buf.limit(savedLimit);
+      buf.position(savedPosition);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java
new file mode 100644
index 0000000..42f1e0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link CardinalityVectorProcessor} that aggregates nothing: both {@code aggregate} overloads return without
+ * reading or writing the buffer. The class holds no state, so the shared {@link #INSTANCE} can be used everywhere.
+ */
+public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor();
+
+  /** Single-position variant; intentionally a no-op. */
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Do nothing.
+  }
+
+  /** Per-row-position variant; intentionally a no-op. */
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Do nothing.
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java
new file mode 100644
index 0000000..080ce04
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Cardinality processor for single-value string dimensions. The dictionary id of every processed row is resolved
+ * with {@code selector.lookupName} and handed to a {@link HyperLogLogCollector} made over the aggregation buffer.
+ */
+public class SingleValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final SingleValueDimensionVectorSelector selector;
+
+  public SingleValueStringCardinalityVectorProcessor(final SingleValueDimensionVectorSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // The buffer's own cursor is moved while aggregating (no duplicate is allocated) and put back in "finally".
+    final int savedLimit = buf.limit();
+    final int savedPosition = buf.position();
+
+    try {
+      final int[] ids = selector.getRowVector();
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+      final HyperLogLogCollector hll = HyperLogLogCollector.makeCollector(buf);
+
+      for (int row = startRow; row < endRow; row++) {
+        StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(hll, selector.lookupName(ids[row]));
+      }
+    }
+    finally {
+      buf.limit(savedLimit);
+      buf.position(savedPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // The buffer's own cursor is moved while aggregating (no duplicate is allocated) and put back in "finally".
+    final int savedLimit = buf.limit();
+    final int savedPosition = buf.position();
+
+    try {
+      final int[] ids = selector.getRowVector();
+      for (int n = 0; n < numRows; n++) {
+        final String value = selector.lookupName(ids[rows == null ? n : rows[n]]);
+        if (!NullHandling.replaceWithDefault() && value == null) {
+          continue; // null strings are only added when nulls are replaced with default values
+        }
+        final int start = positions[n] + positionOffset;
+        buf.limit(start + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+        buf.position(start);
+        final HyperLogLogCollector hll = HyperLogLogCollector.makeCollector(buf);
+        StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(hll, value);
+      }
+    }
+    finally {
+      buf.limit(savedLimit);
+      buf.position(savedPosition);
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java
new file mode 100644
index 0000000..0206d11
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.vector.DoubleCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.FloatCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.LongCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.MultiValueStringCardinalityVectorProcessor;
+import org.apache.druid.query.aggregation.cardinality.vector.SingleValueStringCardinalityVectorProcessor;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.data.ArrayBasedIndexedInts;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
+import org.apache.druid.segment.vector.BaseFloatVectorValueSelector;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.NoFilterVectorOffset;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+/** Runs both aggregate(...) overloads of CardinalityVectorAggregator over fixed-data selectors of each type. */
+public class CardinalityVectorAggregatorTest extends InitializedNullHandlingTest
+{
+ @Test
+ public void testAggregateLong()
+ {
+ final long[] values = {1, 2, 2, 3, 3, 3, 0};
+ final boolean[] nulls = NullHandling.replaceWithDefault()
+ ? null
+ : new boolean[]{false, false, false, false, false, false, true};
+ // Distinct values are 1, 2, 3 and 0; the final 0 is flagged null unless NullHandling.replaceWithDefault().
+ final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+ Collections.singletonList(
+ new LongCardinalityVectorProcessor(
+ new BaseLongVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+ {
+ @Override
+ public long[] getLongVector()
+ {
+ return values;
+ }
+
+ @Nullable
+ @Override
+ public boolean[] getNullVector()
+ {
+ return nulls;
+ }
+ }
+ )
+ )
+ );
+ // Expected estimate: 4 when nulls are replaced with defaults (the 0 counts), otherwise 3.
+ testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+ }
+
+ @Test
+ public void testAggregateDouble()
+ {
+ final double[] values = {1, 2, 2, 3, 3, 3, 0};
+ final boolean[] nulls = NullHandling.replaceWithDefault()
+ ? null
+ : new boolean[]{false, false, false, false, false, false, true};
+ // Distinct values are 1, 2, 3 and 0; the final 0 is flagged null unless NullHandling.replaceWithDefault().
+ final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+ Collections.singletonList(
+ new DoubleCardinalityVectorProcessor(
+ new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+ {
+ @Override
+ public double[] getDoubleVector()
+ {
+ return values;
+ }
+
+ @Nullable
+ @Override
+ public boolean[] getNullVector()
+ {
+ return nulls;
+ }
+ }
+ )
+ )
+ );
+ // Expected estimate: 4 when nulls are replaced with defaults (the 0 counts), otherwise 3.
+ testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+ }
+
+ @Test
+ public void testAggregateFloat()
+ {
+ final float[] values = {1, 2, 2, 3, 3, 3, 0};
+ final boolean[] nulls = NullHandling.replaceWithDefault()
+ ? null
+ : new boolean[]{false, false, false, false, false, false, true};
+ // Distinct values are 1, 2, 3 and 0; the final 0 is flagged null unless NullHandling.replaceWithDefault().
+ final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+ Collections.singletonList(
+ new FloatCardinalityVectorProcessor(
+ new BaseFloatVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length))
+ {
+ @Override
+ public float[] getFloatVector()
+ {
+ return values;
+ }
+
+ @Nullable
+ @Override
+ public boolean[] getNullVector()
+ {
+ return nulls;
+ }
+ }
+ )
+ )
+ );
+ // Expected estimate: 4 when nulls are replaced with defaults (the 0 counts), otherwise 3.
+ testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3);
+ }
+
+ @Test
+ public void testAggregateSingleValueString()
+ {
+ final int[] ids = {1, 2, 2, 3, 3, 3, 0};
+ final String[] dict = {null, "abc", "def", "foo"};
+ // Each id indexes into "dict"; id 0 (the last row) resolves to null.
+ final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+ Collections.singletonList(
+ new SingleValueStringCardinalityVectorProcessor(
+ new SingleValueDimensionVectorSelector()
+ {
+ @Override
+ public int[] getRowVector()
+ {
+ return ids;
+ }
+
+ @Override
+ public int getValueCardinality()
+ {
+ return dict.length;
+ }
+
+ @Nullable
+ @Override
+ public String lookupName(int id)
+ {
+ return dict[id];
+ }
+
+ @Override
+ public boolean nameLookupPossibleInAdvance()
+ {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ public IdLookup idLookup()
+ {
+ return null;
+ }
+
+ @Override
+ public int getMaxVectorSize()
+ {
+ return ids.length;
+ }
+
+ @Override
+ public int getCurrentVectorSize()
+ {
+ return ids.length;
+ }
+ }
+ )
+ )
+ );
+ // Expected estimate: 4 when nulls are replaced with defaults (the null entry counts), otherwise 3.
+ testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
+ }
+
+ @Test
+ public void testAggregateMultiValueString()
+ {
+ final IndexedInts[] ids = {
+ new ArrayBasedIndexedInts(new int[]{1, 2}),
+ new ArrayBasedIndexedInts(new int[]{2, 3}),
+ new ArrayBasedIndexedInts(new int[]{3, 3}),
+ new ArrayBasedIndexedInts(new int[]{0})
+ };
+ // Ids index into "dict": the rows hold {abc, def}, {def, foo}, {foo, foo} and {null}.
+ final String[] dict = {null, "abc", "def", "foo"};
+
+ final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator(
+ Collections.singletonList(
+ new MultiValueStringCardinalityVectorProcessor(
+ new MultiValueDimensionVectorSelector()
+ {
+ @Override
+ public IndexedInts[] getRowVector()
+ {
+ return ids;
+ }
+
+ @Override
+ public int getValueCardinality()
+ {
+ return dict.length;
+ }
+
+ @Nullable
+ @Override
+ public String lookupName(int id)
+ {
+ return dict[id];
+ }
+
+ @Override
+ public boolean nameLookupPossibleInAdvance()
+ {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ public IdLookup idLookup()
+ {
+ return null;
+ }
+
+ @Override
+ public int getMaxVectorSize()
+ {
+ return ids.length;
+ }
+
+ @Override
+ public int getCurrentVectorSize()
+ {
+ return ids.length;
+ }
+ }
+ )
+ )
+ );
+ // Expected estimate: 4 when nulls are replaced with defaults (the null entry counts), otherwise 3.
+ testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3);
+ }
+
+ private static void testAggregate(
+ final CardinalityVectorAggregator aggregator,
+ final int numRows,
+ final double expectedResult
+ )
+ {
+ testAggregateStyle1(aggregator, numRows, expectedResult); // aggregate(buf, position, startRow, endRow)
+ testAggregateStyle2(aggregator, numRows, expectedResult); // aggregate(buf, numRows, positions, rows, positionOffset)
+ }
+
+ private static void testAggregateStyle1(
+ final CardinalityVectorAggregator aggregator,
+ final int numRows,
+ final double expectedResult
+ )
+ {
+ final int position = 1;
+ final ByteBuffer buf = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage() + position);
+ aggregator.init(buf, position);
+ aggregator.aggregate(buf, position, 0, numRows);
+ // Rows 0..numRows-1 all went into the single collector at "position", so its estimate alone is checked.
+ Assert.assertEquals(
+ "style1",
+ expectedResult,
+ ((HyperLogLogCollector) aggregator.get(buf, position)).estimateCardinality(),
+ 0.01
+ );
+ }
+
+ private static void testAggregateStyle2(
+ final CardinalityVectorAggregator aggregator,
+ final int numRows,
+ final double expectedResult
+ )
+ {
+ final int positionOffset = 1;
+ // Two collectors are laid out back to back, starting "positionOffset" bytes into the buffer.
+ final int aggregatorSize = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
+ final ByteBuffer buf = ByteBuffer.allocate(positionOffset + 2 * aggregatorSize);
+ aggregator.init(buf, positionOffset);
+ aggregator.init(buf, positionOffset + aggregatorSize);
+
+ final int[] positions = new int[numRows];
+ final int[] rows = new int[numRows];
+ // Even entries target the first collector, odd entries the second; row numbers are rotated by one.
+ for (int i = 0; i < numRows; i++) {
+ positions[i] = (i % 2) * aggregatorSize;
+ rows[i] = (i + 1) % numRows;
+ }
+
+ aggregator.aggregate(buf, numRows, positions, rows, positionOffset);
+ // The rows were split across two collectors, so they are folded together before estimating.
+ Assert.assertEquals(
+ "style2",
+ expectedResult,
+ ((HyperLogLogCollector) aggregator.get(buf, positionOffset))
+ .fold((HyperLogLogCollector) aggregator.get(buf, positionOffset + aggregatorSize))
+ .estimateCardinality(),
+ 0.01
+ );
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 8d0c864..4139c8c 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -2566,9 +2566,6 @@
@Test
public void testGroupByWithCardinality()
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
@@ -2684,7 +2681,7 @@
@Test
public void testGroupByWithNoResult()
{
- // Cannot vectorize due to "cardinality" aggregator.
+ // Cannot vectorize due to first, last aggregators.
cannotVectorize();
GroupByQuery query = makeQueryBuilder()
@@ -8684,7 +8681,7 @@
@Test
public void testGroupByCardinalityAggWithExtractionFn()
{
- // Cannot vectorize due to "cardinality" aggregator.
+ // Cannot vectorize due to extraction dimension spec.
cannotVectorize();
String helloJsFn = "function(str) { return 'hello' }";
@@ -8776,9 +8773,6 @@
@Test
public void testGroupByCardinalityAggOnFloat()
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0ce2d2b..cf30419 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -3459,9 +3459,6 @@
@Test
public void testHavingOnApproximateCountDistinct() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
"SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1",
ImmutableList.of(
@@ -6269,9 +6266,6 @@
@Test
public void testFilteredAggregations() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
"SELECT "
+ "SUM(case dim1 when 'abc' then cnt end), "
@@ -7657,9 +7651,6 @@
@Test
public void testCountDistinct() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
"SELECT SUM(cnt), COUNT(distinct dim2), COUNT(distinct unique_dim1) FROM druid.foo",
ImmutableList.of(
@@ -7692,9 +7683,6 @@
@Test
public void testCountDistinctOfCaseWhen() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
"SELECT\n"
+ "COUNT(DISTINCT CASE WHEN m1 >= 4 THEN m1 END),\n"
@@ -7787,9 +7775,6 @@
{
// When HLL is disabled, APPROX_COUNT_DISTINCT is still approximate.
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
PLANNER_CONFIG_NO_HLL,
"SELECT APPROX_COUNT_DISTINCT(dim2) FROM druid.foo",
@@ -8362,7 +8347,7 @@
@Test
public void testAvgDailyCountDistinct() throws Exception
{
- // Cannot vectorize due to virtual columns.
+ // Cannot vectorize outer query due to inlined inner query.
cannotVectorize();
testQuery(
@@ -9034,9 +9019,6 @@
@Test
public void testCountDistinctArithmetic() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
- cannotVectorize();
-
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
@@ -9081,7 +9063,7 @@
@Test
public void testCountDistinctOfSubstring() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
+ // Cannot vectorize due to extraction dimension spec.
cannotVectorize();
testQuery(
@@ -11808,7 +11790,7 @@
@Test
public void testCountDistinctOfLookup() throws Exception
{
- // Cannot vectorize due to "cardinality" aggregator.
+ // Cannot vectorize due to extraction dimension spec.
cannotVectorize();
final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
@@ -15219,7 +15201,7 @@
)
);
- // Cannot vectorize next test due to "cardinality" aggregator.
+ // Cannot vectorize next test due to extraction dimension spec.
cannotVectorize();
// semi-join requires time condition on both left and right query