Vectorized versions of HllSketch aggregators. (#11115)
* Vectorized versions of HllSketch aggregators.
The patch uses the same "helper" approach as #10767 and #10304, and
extends the tests to run in both vectorized and non-vectorized modes.
Also includes some minor changes to the theta sketch vector aggregator:
- Cosmetic changes to make the hll and theta implementations look
more similar.
- Extends the theta SQL tests to run in vectorized mode.
* Updates following code review.
* Fix javadoc.
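For reference, the "helper" approach boils down to the shape sketched below. The names are illustrative only (the real classes appear in the diff that follows): the buffer bookkeeping that the row-at-a-time BufferAggregator and the new VectorAggregator would otherwise duplicate lives in a shared helper object, and both flavors delegate to it.

import java.nio.ByteBuffer;

// Shared bookkeeping both aggregator flavors need (empty-sketch images, per-buffer caches, etc.).
class ExampleAggregatorHelper
{
  void init(ByteBuffer buf, int position)
  {
    // write a prebuilt empty sketch image at "position"
  }

  Object get(ByteBuffer buf, int position)
  {
    return null; // read the sketch back out of the buffer
  }
}

// Row-at-a-time flavor; the real class would implement org.apache.druid.query.aggregation.BufferAggregator.
class ExampleBufferAggregator
{
  private final ExampleAggregatorHelper helper = new ExampleAggregatorHelper();

  public void init(ByteBuffer buf, int position)
  {
    helper.init(buf, position);
  }
}

// Vectorized flavor; the real class would implement org.apache.druid.query.aggregation.VectorAggregator.
class ExampleVectorAggregator
{
  private final ExampleAggregatorHelper helper = new ExampleAggregatorHelper();

  public void init(ByteBuffer buf, int position)
  {
    helper.init(buf, position);
  }
}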
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
index 8abc305..df68180 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
@@ -26,8 +26,11 @@
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
@@ -81,6 +84,24 @@
);
}
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ return true;
+ }
+
+ @Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+ {
+ return new HllSketchBuildVectorAggregator(
+ selectorFactory,
+ getFieldName(),
+ getLgK(),
+ TgtHllType.valueOf(getTgtHllType()),
+ getMaxIntermediateSize()
+ );
+ }
+
/**
* For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases.
* The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 4c39259..ab54215 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -19,22 +19,12 @@
package org.apache.druid.query.aggregation.datasketches.hll;
-import com.google.common.util.concurrent.Striped;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
-import org.apache.datasketches.hll.Union;
-import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
/**
* This aggregator builds sketches from raw data.
@@ -42,26 +32,8 @@
*/
public class HllSketchBuildBufferAggregator implements BufferAggregator
{
-
- /**
- * for locking per buffer position (power of 2 to make index computation faster)
- */
- private static final int NUM_STRIPES = 64;
-
private final ColumnValueSelector<Object> selector;
- private final int lgK;
- private final TgtHllType tgtHllType;
- private final int size;
- private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
- private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
- private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
-
- /**
- * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
- * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
- * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
- */
- private final byte[] emptySketch;
+ private final HllSketchBuildBufferAggregatorHelper helper;
public HllSketchBuildBufferAggregator(
final ColumnValueSelector<Object> selector,
@@ -71,39 +43,15 @@
)
{
this.selector = selector;
- this.lgK = lgK;
- this.tgtHllType = tgtHllType;
- this.size = size;
- this.emptySketch = new byte[size];
-
- //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
- new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
+ this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
}
@Override
public void init(final ByteBuffer buf, final int position)
{
- // Copy prebuilt empty sketch object.
-
- final int oldPosition = buf.position();
- try {
- buf.position(position);
- buf.put(emptySketch);
- }
- finally {
- buf.position(oldPosition);
- }
-
- // Add an HllSketch for this chunk to our sketchCache.
- final WritableMemory mem = getMemory(buf).writableRegion(position, size);
- putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
+ helper.init(buf, position);
}
- /**
- * This method uses locks because it can be used during indexing,
- * and Druid can call aggregate() and get() concurrently
- * See https://github.com/druid-io/druid/pull/3956
- */
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
@@ -111,40 +59,20 @@
if (value == null) {
return;
}
- final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
- lock.lock();
- try {
- final HllSketch sketch = sketchCache.get(buf).get(position);
- HllSketchBuildAggregator.updateSketch(sketch, value);
- }
- finally {
- lock.unlock();
- }
+
+ HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
}
- /**
- * This method uses locks because it can be used during indexing,
- * and Druid can call aggregate() and get() concurrently
- * See https://github.com/druid-io/druid/pull/3956
- */
@Override
public Object get(final ByteBuffer buf, final int position)
{
- final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
- lock.lock();
- try {
- return sketchCache.get(buf).get(position).copy();
- }
- finally {
- lock.unlock();
- }
+ return helper.get(buf, position);
}
@Override
public void close()
{
- memCache.clear();
- sketchCache.clear();
+ helper.clear();
}
@Override
@@ -159,11 +87,6 @@
throw new UnsupportedOperationException("Not implemented");
}
- private WritableMemory getMemory(final ByteBuffer buf)
- {
- return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
- }
-
/**
* In very rare cases sketches can exceed given memory, request on-heap memory and move there.
* We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
@@ -171,44 +94,7 @@
@Override
public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
{
- HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
- final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
- if (sketch.isSameResource(oldMem)) { // sketch has not moved
- final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
- sketch = HllSketch.writableWrap(newMem);
- }
- putSketchIntoCache(newBuf, newPosition, sketch);
- }
-
- private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
- {
- final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
- map.put(position, sketch);
- }
-
- /**
- * compute lock index to avoid boxing in Striped.get() call
- *
- * @param position
- *
- * @return index
- */
- static int lockIndex(final int position)
- {
- return smear(position) % NUM_STRIPES;
- }
-
- /**
- * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
- *
- * @param hashCode
- *
- * @return smeared hashCode
- */
- private static int smear(int hashCode)
- {
- hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
- return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
+ helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
}
@Override
@@ -218,6 +104,6 @@
// lgK should be inspected because different execution paths exist in HllSketch.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028
- inspector.visit("lgK", lgK);
+ inspector.visit("lgK", helper.getLgK());
}
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java
new file mode 100644
index 0000000..466f1ac
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.memory.WritableMemory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class HllSketchBuildBufferAggregatorHelper
+{
+ private final int lgK;
+ private final int size;
+ private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
+
+ /**
+ * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
+ * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
+ * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link org.apache.datasketches.hll.Union} objects.
+ */
+ private final byte[] emptySketch;
+
+ public HllSketchBuildBufferAggregatorHelper(final int lgK, final TgtHllType tgtHllType, final int size)
+ {
+ this.lgK = lgK;
+ this.size = size;
+ this.emptySketch = new byte[size];
+
+ //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
+ new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
+ }
+
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
+ */
+ public void init(final ByteBuffer buf, final int position)
+ {
+ // Copy prebuilt empty sketch object.
+
+ final int oldPosition = buf.position();
+ try {
+ buf.position(position);
+ buf.put(emptySketch);
+ }
+ finally {
+ buf.position(oldPosition);
+ }
+
+ // Add an HllSketch for this chunk to our sketchCache.
+ final WritableMemory mem = getMemory(buf).writableRegion(position, size);
+ putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
+ }
+
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
+ */
+ public Object get(ByteBuffer buf, int position)
+ {
+ return sketchCache.get(buf).get(position).copy();
+ }
+
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
+ */
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf)
+ {
+ HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
+ final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
+ if (sketch.isSameResource(oldMem)) { // sketch has not moved
+ final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
+ sketch = HllSketch.writableWrap(newMem);
+ }
+ putSketchIntoCache(newBuf, newPosition, sketch);
+ }
+
+ /**
+ * Retrieves the sketch at a particular position.
+ */
+ public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position)
+ {
+ return sketchCache.get(buf).get(position);
+ }
+
+ /**
+ * Clean up resources used by this helper.
+ */
+ public void clear()
+ {
+ memCache.clear();
+ sketchCache.clear();
+ }
+
+ public int getLgK()
+ {
+ return lgK;
+ }
+
+ private WritableMemory getMemory(final ByteBuffer buf)
+ {
+ return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
+ }
+
+ private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
+ {
+ final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
+ map.put(position, sketch);
+ }
+}
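A minimal usage sketch of the new helper (not part of the patch): it writes the prebuilt empty sketch image into a buffer slot, updates the cached HllSketch wrapper directly, and reads a copy back out. Sizing here uses the DataSketches HllSketch.getMaxUpdatableSerializationBytes method; the class and flow below are illustrative only.

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;

import java.nio.ByteBuffer;

class HllSketchBuildHelperUsageSketch
{
  public static void main(String[] args)
  {
    final int lgK = 12;
    final int size = HllSketch.getMaxUpdatableSerializationBytes(lgK, TgtHllType.HLL_4);
    final HllSketchBuildBufferAggregatorHelper helper =
        new HllSketchBuildBufferAggregatorHelper(lgK, TgtHllType.HLL_4, size);

    final ByteBuffer buf = ByteBuffer.allocate(size);

    helper.init(buf, 0);                               // copy the empty sketch image into position 0
    helper.getSketchAtPosition(buf, 0).update("foo");  // aggregate a single value

    final HllSketch snapshot = (HllSketch) helper.get(buf, 0);  // returns a copy, safe to read
    System.out.println(snapshot.getEstimate());                 // prints roughly 1.0

    helper.clear();
  }
}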
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java
new file mode 100644
index 0000000..506c9c3
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class HllSketchBuildVectorAggregator implements VectorAggregator
+{
+ private final HllSketchBuildBufferAggregatorHelper helper;
+ private final Supplier<Object[]> objectSupplier;
+
+ HllSketchBuildVectorAggregator(
+ final VectorColumnSelectorFactory columnSelectorFactory,
+ final String column,
+ final int lgK,
+ final TgtHllType tgtHllType,
+ final int size
+ )
+ {
+ this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
+ this.objectSupplier =
+ ColumnProcessors.makeVectorProcessor(
+ column,
+ ToObjectVectorColumnProcessorFactory.INSTANCE,
+ columnSelectorFactory
+ );
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ helper.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final Object[] vector = objectSupplier.get();
+ for (int i = startRow; i < endRow; i++) {
+ final Object value = vector[i];
+ if (value != null) {
+ HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final Object[] vector = objectSupplier.get();
+
+ for (int i = 0; i < numRows; i++) {
+ final Object o = vector[rows != null ? rows[i] : i];
+
+ if (o != null) {
+ final int position = positions[i] + positionOffset;
+ HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o);
+ }
+ }
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ return helper.get(buf, position);
+ }
+
+ /**
+ * In very rare cases sketches can exceed given memory, request on-heap memory and move there.
+ * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
+ */
+ @Override
+ public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
+ {
+ helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
+ }
+
+ @Override
+ public void close()
+ {
+ helper.clear();
+ }
+}
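For context, the two aggregate() overloads above follow the VectorAggregator contract: the first folds a contiguous row range of the current vector into a single buffer position, the second folds each selected row into its own position, with optional row indirection and a position offset. The sketch below is a hypothetical driver, shown only to illustrate how an engine invokes the two paths; it is not Druid code.

import java.nio.ByteBuffer;

class VectorAggregateCallSketch
{
  // Simplified stand-in for org.apache.druid.query.aggregation.VectorAggregator.
  interface SimpleVectorAggregator
  {
    void aggregate(ByteBuffer buf, int position, int startRow, int endRow);

    void aggregate(ByteBuffer buf, int numRows, int[] positions, int[] rows, int positionOffset);
  }

  static void run(SimpleVectorAggregator agg, ByteBuffer buf)
  {
    // Non-grouping path (e.g. a timeseries query): rows [0, 512) of the current vector
    // all land in the single aggregation slot at buffer offset 0.
    agg.aggregate(buf, 0, 0, 512);

    // Grouping path: three rows of the vector, each written to its own slot.
    // A null "rows" array means rows 0..numRows-1 are used directly; positionOffset shifts all slots.
    final int[] positions = {0, 64, 128};
    agg.aggregate(buf, 3, positions, null, 0);
  }
}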
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
index 74afea3..050cf59 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java
@@ -29,8 +29,11 @@
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
@@ -103,6 +106,24 @@
}
@Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ return true;
+ }
+
+ @Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+ {
+ return new HllSketchMergeVectorAggregator(
+ selectorFactory,
+ getFieldName(),
+ getLgK(),
+ TgtHllType.valueOf(getTgtHllType()),
+ getMaxIntermediateSize()
+ );
+ }
+
+ @Override
public int getMaxIntermediateSize()
{
return Union.getMaxSerializationBytes(getLgK());
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 7161c25..278a5c5 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -19,7 +19,6 @@
package org.apache.druid.query.aggregation.datasketches.hll;
-import com.google.common.util.concurrent.Striped;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
@@ -30,8 +29,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
/**
* This aggregator merges existing sketches.
@@ -39,24 +36,8 @@
*/
public class HllSketchMergeBufferAggregator implements BufferAggregator
{
-
- /**
- * for locking per buffer position (power of 2 to make index computation faster)
- */
- private static final int NUM_STRIPES = 64;
-
private final ColumnValueSelector<HllSketch> selector;
- private final int lgK;
- private final TgtHllType tgtHllType;
- private final int size;
- private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
-
- /**
- * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
- * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
- * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
- */
- private final byte[] emptyUnion;
+ private final HllSketchMergeBufferAggregatorHelper helper;
public HllSketchMergeBufferAggregator(
final ColumnValueSelector<HllSketch> selector,
@@ -66,39 +47,15 @@
)
{
this.selector = selector;
- this.lgK = lgK;
- this.tgtHllType = tgtHllType;
- this.size = size;
- this.emptyUnion = new byte[size];
-
- //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
- new Union(lgK, WritableMemory.wrap(emptyUnion));
+ this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
}
@Override
public void init(final ByteBuffer buf, final int position)
{
- // Copy prebuilt empty union object.
- // Not necessary to cache a Union wrapper around the initialized memory, because:
- // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
- // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
- // max size and therefore do not need to be potentially moved in-heap.
-
- final int oldPosition = buf.position();
- try {
- buf.position(position);
- buf.put(emptyUnion);
- }
- finally {
- buf.position(oldPosition);
- }
+ helper.init(buf, position);
}
- /**
- * This method uses locks because it can be used during indexing,
- * and Druid can call aggregate() and get() concurrently
- * See https://github.com/druid-io/druid/pull/3956
- */
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
@@ -106,36 +63,18 @@
if (sketch == null) {
return;
}
- final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
- final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock();
- lock.lock();
- try {
- final Union union = Union.writableWrap(mem);
- union.update(sketch);
- }
- finally {
- lock.unlock();
- }
+
+ final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+ .writableRegion(position, helper.getSize());
+
+ final Union union = Union.writableWrap(mem);
+ union.update(sketch);
}
- /**
- * This method uses locks because it can be used during indexing,
- * and Druid can call aggregate() and get() concurrently
- * See https://github.com/druid-io/druid/pull/3956
- */
@Override
public Object get(final ByteBuffer buf, final int position)
{
- final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
- final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock();
- lock.lock();
- try {
- final Union union = Union.writableWrap(mem);
- return union.getResult(tgtHllType);
- }
- finally {
- lock.unlock();
- }
+ return helper.get(buf, position);
}
@Override
@@ -163,6 +102,6 @@
// lgK should be inspected because different execution paths exist in Union.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028
- inspector.visit("lgK", lgK);
+ inspector.visit("lgK", helper.getLgK());
}
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
new file mode 100644
index 0000000..d6030e8
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.hll.Union;
+import org.apache.datasketches.memory.WritableMemory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class HllSketchMergeBufferAggregatorHelper
+{
+ private final int lgK;
+ private final TgtHllType tgtHllType;
+ private final int size;
+
+ /**
+ * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
+ * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
+ * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+ */
+ private final byte[] emptyUnion;
+
+ public HllSketchMergeBufferAggregatorHelper(int lgK, TgtHllType tgtHllType, int size)
+ {
+ this.lgK = lgK;
+ this.tgtHllType = tgtHllType;
+ this.size = size;
+ this.emptyUnion = new byte[size];
+
+ //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
+ new Union(lgK, WritableMemory.wrap(emptyUnion));
+ }
+
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
+ */
+ public void init(final ByteBuffer buf, final int position)
+ {
+ // Copy prebuilt empty union object.
+ // Not necessary to cache a Union wrapper around the initialized memory, because:
+ // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
+ // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
+ // max size and therefore do not need to be potentially moved in-heap.
+
+ final int oldPosition = buf.position();
+ try {
+ buf.position(position);
+ buf.put(emptyUnion);
+ }
+ finally {
+ buf.position(oldPosition);
+ }
+ }
+
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
+ */
+ public Object get(ByteBuffer buf, int position)
+ {
+ final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
+ final Union union = Union.writableWrap(mem);
+ return union.getResult(tgtHllType);
+ }
+
+ public int getLgK()
+ {
+ return lgK;
+ }
+
+ public int getSize()
+ {
+ return size;
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
new file mode 100644
index 0000000..d97c0b5
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.TgtHllType;
+import org.apache.datasketches.hll.Union;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Supplier;
+
+public class HllSketchMergeVectorAggregator implements VectorAggregator
+{
+ private final HllSketchMergeBufferAggregatorHelper helper;
+ private final Supplier<Object[]> objectSupplier;
+
+ HllSketchMergeVectorAggregator(
+ final VectorColumnSelectorFactory columnSelectorFactory,
+ final String column,
+ final int lgK,
+ final TgtHllType tgtHllType,
+ final int size
+ )
+ {
+ this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
+ this.objectSupplier =
+ ColumnProcessors.makeVectorProcessor(
+ column,
+ ToObjectVectorColumnProcessorFactory.INSTANCE,
+ columnSelectorFactory
+ );
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ helper.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final Object[] vector = objectSupplier.get();
+
+ final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+ .writableRegion(position, helper.getSize());
+
+ final Union union = Union.writableWrap(mem);
+ for (int i = startRow; i < endRow; i++) {
+ union.update((HllSketch) vector[i]);
+ }
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final Object[] vector = objectSupplier.get();
+
+ for (int i = 0; i < numRows; i++) {
+ final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i];
+
+ if (o != null) {
+ final int position = positions[i] + positionOffset;
+
+ final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
+ .writableRegion(position, helper.getSize());
+
+ final Union union = Union.writableWrap(mem);
+ union.update(o);
+ }
+ }
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ return helper.get(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close.
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
index b5b9ad8..a862265 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
@@ -31,18 +31,18 @@
public class SketchVectorAggregator implements VectorAggregator
{
- private final Supplier<Object[]> toObjectProcessor;
private final SketchBufferAggregatorHelper helper;
+ private final Supplier<Object[]> objectSupplier;
- public SketchVectorAggregator(
- VectorColumnSelectorFactory columnSelectorFactory,
- String column,
- int size,
- int maxIntermediateSize
+ SketchVectorAggregator(
+ final VectorColumnSelectorFactory columnSelectorFactory,
+ final String column,
+ final int size,
+ final int maxIntermediateSize
)
{
this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
- this.toObjectProcessor =
+ this.objectSupplier =
ColumnProcessors.makeVectorProcessor(
column,
ToObjectVectorColumnProcessorFactory.INSTANCE,
@@ -60,7 +60,7 @@
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final Union union = helper.getOrCreateUnion(buf, position);
- final Object[] vector = toObjectProcessor.get();
+ final Object[] vector = objectSupplier.get();
for (int i = startRow; i < endRow; i++) {
final Object o = vector[i];
@@ -79,7 +79,7 @@
final int positionOffset
)
{
- final Object[] vector = toObjectProcessor.get();
+ final Object[] vector = objectSupplier.get();
for (int i = 0; i < numRows; i++) {
final Object o = vector[rows != null ? rows[i] : i];
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
index a299257..afac573 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
@@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.groupby.GroupByQuery;
@@ -54,23 +55,27 @@
private static final boolean ROUND = true;
private final AggregationTestHelper helper;
+ private final QueryContexts.Vectorize vectorize;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
- public HllSketchAggregatorTest(GroupByQueryConfig config)
+ public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize)
{
HllSketchModule.registerSerde();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
new HllSketchModule().getJacksonModules(), config, tempFolder);
+ this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
}
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
- constructors.add(new Object[]{config});
+ for (String vectorize : new String[]{"false", "true", "force"}) {
+ constructors.add(new Object[]{config, vectorize});
+ }
}
return constructors;
}
@@ -224,10 +229,32 @@
)
.setPostAggregatorSpecs(
ImmutableList.of(
- new HllSketchToEstimatePostAggregator("estimate", new FieldAccessPostAggregator("f1", "sketch"), false),
- new HllSketchToEstimateWithBoundsPostAggregator("estimateWithBounds", new FieldAccessPostAggregator("f1", "sketch"), 2),
- new HllSketchToStringPostAggregator("summary", new FieldAccessPostAggregator("f1", "sketch")),
- new HllSketchUnionPostAggregator("union", ImmutableList.of(new FieldAccessPostAggregator("f1", "sketch"), new FieldAccessPostAggregator("f2", "sketch")), null, null)
+ new HllSketchToEstimatePostAggregator(
+ "estimate",
+ new FieldAccessPostAggregator("f1", "sketch"),
+ false
+ ),
+ new HllSketchToEstimateWithBoundsPostAggregator(
+ "estimateWithBounds",
+ new FieldAccessPostAggregator(
+ "f1",
+ "sketch"
+ ),
+ 2
+ ),
+ new HllSketchToStringPostAggregator(
+ "summary",
+ new FieldAccessPostAggregator("f1", "sketch")
+ ),
+ new HllSketchUnionPostAggregator(
+ "union",
+ ImmutableList.of(new FieldAccessPostAggregator(
+ "f1",
+ "sketch"
+ ), new FieldAccessPostAggregator("f2", "sketch")),
+ null,
+ null
+ )
)
)
.build()
@@ -320,7 +347,7 @@
);
}
- private static String buildGroupByQueryJson(
+ private String buildGroupByQueryJson(
String aggregationType,
String aggregationFieldName,
boolean aggregationRound
@@ -338,6 +365,7 @@
.put("dimensions", Collections.emptyList())
.put("aggregations", Collections.singletonList(aggregation))
.put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"))
+ .put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString()))
.build();
return toJson(object);
}
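The only functional change to the test query itself is the added "context" entry: the vectorize query-context key is what toggles the new code path, and the tests now run with the three values "false", "true", and "force". A minimal sketch of building such a context, using the same QueryContexts constant as the test (class name below is illustrative):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;

import java.util.Map;

class VectorizeContextSketch
{
  // "false" disables vectorization, "true" enables it where supported, and "force" makes the
  // query fail with a "Cannot vectorize" error if any part of it cannot run vectorized.
  static Map<String, Object> contextFor(String vectorize)
  {
    return ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize);
  }
}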
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 039801c..b1faed1 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -84,34 +85,60 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+@RunWith(Parameterized.class)
public class HllSketchSqlAggregatorTest extends CalciteTestBase
{
private static final String DATA_SOURCE = "foo";
private static final boolean ROUND = true;
- private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- );
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
@Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
-
+
+ private final Map<String, Object> queryContext;
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
+ public HllSketchSqlAggregatorTest(final String vectorize)
+ {
+ this.queryContext = ImmutableMap.of(
+ PlannerContext.CTX_SQL_QUERY_ID, "dummy",
+ QueryContexts.VECTORIZE_KEY, vectorize,
+ QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
+ );
+ }
+
+ @Parameterized.Parameters(name = "vectorize = {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ final List<Object[]> constructors = new ArrayList<>();
+ for (String vectorize : new String[]{"false", "true", "force"}) {
+ constructors.add(new Object[]{vectorize});
+ }
+ return constructors;
+ }
+
@BeforeClass
public static void setUpClass()
{
@@ -207,6 +234,9 @@
@Test
public void testApproxCountDistinctHllSketch() throws Exception
{
+ // Can't vectorize due to CONCAT expression.
+ cannotVectorize();
+
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
@@ -222,7 +252,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -317,8 +347,9 @@
new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND)
)
)
- .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
- .build(),
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@@ -327,6 +358,9 @@
@Test
public void testAvgDailyCountDistinctHllSketch() throws Exception
{
+ // Can't vectorize due to outer query, which runs on an inline datasource.
+ cannotVectorize();
+
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
@@ -340,7 +374,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -379,11 +413,14 @@
new FinalizingFieldAccessPostAggregator("a0", "a0:a")
)
)
- .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
- ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
- "d0"
- ))
+ .context(queryContext)
.build()
+ .withOverriddenContext(
+ BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
+ ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
+ "d0"
+ )
+ )
)
)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@@ -414,7 +451,7 @@
)
)
)
- .setContext(QUERY_CONTEXT_DEFAULT)
+ .setContext(queryContext)
.build();
Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@@ -437,7 +474,7 @@
// Verify results
final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ sqlLifecycle.runSimple(sql, queryContext, DEFAULT_PARAMETERS, authenticationResult).toList();
final int expected = NullHandling.replaceWithDefault() ? 1 : 2;
Assert.assertEquals(expected, results.size());
}
@@ -466,7 +503,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -598,11 +635,9 @@
new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true)
)
)
- .context(ImmutableMap.of(
- "skipEmptyBuckets", true,
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- ))
- .build();
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
@@ -619,7 +654,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql2,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -670,13 +705,19 @@
new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"))
)
)
- .context(ImmutableMap.of(
- "skipEmptyBuckets", true,
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- ))
- .build();
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
+
+ private void cannotVectorize()
+ {
+ if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
+ == QueryContexts.Vectorize.FORCE) {
+ expectedException.expectMessage("Cannot vectorize");
+ }
+ }
}
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
index 9201c91..0aac2b0 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -81,14 +82,20 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+@RunWith(Parameterized.class)
public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
{
private static final String DATA_SOURCE = "foo";
@@ -96,9 +103,6 @@
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
- private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- );
@BeforeClass
public static void setUpClass()
@@ -114,14 +118,37 @@
}
@Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
+ private final Map<String, Object> queryContext;
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
+ public ThetaSketchSqlAggregatorTest(final String vectorize)
+ {
+ this.queryContext = ImmutableMap.of(
+ PlannerContext.CTX_SQL_QUERY_ID, "dummy",
+ QueryContexts.VECTORIZE_KEY, vectorize,
+ QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
+ );
+ }
+
+ @Parameterized.Parameters(name = "vectorize = {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ final List<Object[]> constructors = new ArrayList<>();
+ for (String vectorize : new String[]{"false", "true", "force"}) {
+ constructors.add(new Object[]{vectorize});
+ }
+ return constructors;
+ }
+
@Before
public void setUp() throws Exception
{
@@ -206,21 +233,30 @@
@Test
public void testApproxCountDistinctThetaSketch() throws Exception
{
+ // Cannot vectorize due to SUBSTRING.
+ cannotVectorize();
+
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" // uppercase
- + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
- + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
- + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
- + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" // on native theta sketch column
- + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" // on native theta sketch column
+ + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n"
+ // uppercase
+ + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n"
+ // lowercase; also, filtered
+ + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n"
+ // on extractionFn
+ + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n"
+ // on expression
+ + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n"
+ // on native theta sketch column
+ + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n"
+ // on native theta sketch column
+ "FROM druid.foo";
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -319,8 +355,9 @@
new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null)
)
)
- .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
- .build(),
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@@ -328,6 +365,9 @@
@Test
public void testAvgDailyCountDistinctThetaSketch() throws Exception
{
+ // Can't vectorize due to outer query (it operates on an inlined data source, which cannot be vectorized).
+ cannotVectorize();
+
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
@@ -337,7 +377,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -358,7 +398,11 @@
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(
Filtration.eternity()
)))
- .granularity(new PeriodGranularity(Period.days(1), null, DateTimeZone.UTC))
+ .granularity(new PeriodGranularity(
+ Period.days(1),
+ null,
+ DateTimeZone.UTC
+ ))
.aggregators(
Collections.singletonList(
new SketchMergeAggregatorFactory(
@@ -373,14 +417,25 @@
)
.postAggregators(
ImmutableList.of(
- new FinalizingFieldAccessPostAggregator("a0", "a0:a")
+ new FinalizingFieldAccessPostAggregator(
+ "a0",
+ "a0:a"
+ )
)
)
- .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
- ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
- "d0"
- ))
+ .context(queryContext)
.build()
+ .withOverriddenContext(
+ BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
+ ImmutableMap.of(
+ "skipEmptyBuckets",
+ true,
+ "sqlQueryId",
+ "dummy"
+ ),
+ "d0"
+ )
+ )
)
)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@@ -388,8 +443,8 @@
.setAggregatorSpecs(
NullHandling.replaceWithDefault()
? Arrays.asList(
- new LongSumAggregatorFactory("_a0:sum", "a0"),
- new CountAggregatorFactory("_a0:count")
+ new LongSumAggregatorFactory("_a0:sum", "a0"),
+ new CountAggregatorFactory("_a0:count")
)
: Arrays.asList(
new LongSumAggregatorFactory("_a0:sum", "a0"),
@@ -411,7 +466,7 @@
)
)
)
- .setContext(QUERY_CONTEXT_DEFAULT)
+ .setContext(queryContext)
.build();
Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@@ -439,7 +494,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -598,8 +653,9 @@
null
)
)
- .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
- .build();
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query
@@ -617,7 +673,7 @@
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(
sql2,
- QUERY_CONTEXT_DEFAULT,
+ queryContext,
DEFAULT_PARAMETERS,
authenticationResult
).toList();
@@ -664,11 +720,19 @@
null
)
)
- .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
- .build();
-
+ .context(queryContext)
+ .build()
+ .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
+
+ private void cannotVectorize()
+ {
+ if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
+ == QueryContexts.Vectorize.FORCE) {
+ expectedException.expectMessage("Cannot vectorize");
+ }
+ }
}