Vectorize the DataSketches quantiles aggregator. (#11183)
* Vectorize the DataSketches quantiles aggregator.
Also removes synchronization for the BufferAggregator and VectorAggregator
implementations, since it is not necessary (similar to #11115).
Extends DoublesSketchAggregatorTest and DoublesSketchSqlAggregatorTest
to run all test cases in vectorized mode.
* Style fix.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 2ea6a2a..27da4a2 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -32,11 +32,22 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -63,7 +74,8 @@
public DoublesSketchAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
- @JsonProperty("k") final Integer k)
+ @JsonProperty("k") final Integer k
+ )
{
this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
}
@@ -106,7 +118,7 @@
{
if (metricFactory.getColumnCapabilities(fieldName) != null
&& ValueType.isNumeric(metricFactory.getColumnCapabilities(fieldName).getType())) {
- final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
+ final BaseDoubleColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
if (selector instanceof NilColumnValueSelector) {
return new NoopDoublesSketchBufferAggregator();
}
@@ -120,6 +132,65 @@
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+ {
+ return ColumnProcessors.makeVectorProcessor(
+ fieldName,
+ new VectorColumnProcessorFactory<VectorAggregator>()
+ {
+ @Override
+ public VectorAggregator makeSingleValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ SingleValueDimensionVectorSelector selector
+ )
+ {
+ return new NoopDoublesSketchBufferAggregator();
+ }
+
+ @Override
+ public VectorAggregator makeMultiValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ MultiValueDimensionVectorSelector selector
+ )
+ {
+ return new NoopDoublesSketchBufferAggregator();
+ }
+
+ @Override
+ public VectorAggregator makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+ }
+
+ @Override
+ public VectorAggregator makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+ }
+
+ @Override
+ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+ }
+
+ @Override
+ public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
+ {
+ return new DoublesSketchMergeVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
+ }
+ },
+ selectorFactory
+ );
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ return true;
+ }
+
+ @Override
public Object deserialize(final Object object)
{
return DoublesSketchOperations.deserialize(object);
@@ -217,8 +288,9 @@
new DoublesSketchAggregatorFactory(
fieldName,
fieldName,
- k)
- );
+ k
+ )
+ );
}
@Override
@@ -306,10 +378,10 @@
public String toString()
{
return getClass().getSimpleName() + "{"
- + "name=" + name
- + ", fieldName=" + fieldName
- + ", k=" + k
- + "}";
+ + "name=" + name
+ + ", fieldName=" + fieldName
+ + ", k=" + k
+ + "}";
}
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
index e330310..74be19b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
@@ -19,60 +19,52 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
public class DoublesSketchBuildBufferAggregator implements BufferAggregator
{
- private final ColumnValueSelector<Double> selector;
- private final int size;
- private final int maxIntermediateSize;
+ private final BaseDoubleColumnValueSelector selector;
+ private final DoublesSketchBuildBufferAggregatorHelper helper;
- private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
- private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
-
- public DoublesSketchBuildBufferAggregator(final ColumnValueSelector<Double> valueSelector, final int size,
- final int maxIntermediateSize)
+ public DoublesSketchBuildBufferAggregator(
+ final BaseDoubleColumnValueSelector valueSelector,
+ final int size,
+ final int maxIntermediateSize
+ )
{
this.selector = valueSelector;
- this.size = size;
- this.maxIntermediateSize = maxIntermediateSize;
+ this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
}
@Override
- public synchronized void init(final ByteBuffer buffer, final int position)
+ public void init(ByteBuffer buf, int position)
{
- final WritableMemory mem = getMemory(buffer);
- final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
- final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
- putSketch(buffer, position, sketch);
+ helper.init(buf, position);
}
@Override
- public synchronized void aggregate(final ByteBuffer buffer, final int position)
+ public void aggregate(final ByteBuffer buffer, final int position)
{
if (selector.isNull()) {
return;
}
- final UpdateDoublesSketch sketch = sketches.get(buffer).get(position);
+
+ final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
sketch.update(selector.getDouble());
}
+ @Nullable
@Override
- public synchronized Object get(final ByteBuffer buffer, final int position)
+ public Object get(ByteBuffer buf, int position)
{
- return sketches.get(buffer).get(position).compact();
+ return helper.get(buf, position);
}
@Override
@@ -88,42 +80,17 @@
}
@Override
- public synchronized void close()
+ public void close()
{
- sketches.clear();
- memCache.clear();
+ helper.clear();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
@Override
- public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
- UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
- final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
- if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
- final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
- sketch = UpdateDoublesSketch.wrap(newRegion);
- }
- putSketch(newBuffer, newPosition, sketch);
-
- final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
- map.remove(oldPosition);
- if (map.isEmpty()) {
- sketches.remove(oldBuffer);
- memCache.remove(oldBuffer);
- }
- }
-
- private WritableMemory getMemory(final ByteBuffer buffer)
- {
- return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
- }
-
- private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
- {
- Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
- map.put(position, sketch);
+ helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
@Override
@@ -131,5 +98,4 @@
{
inspector.visit("selector", selector);
}
-
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java
new file mode 100644
index 0000000..f5286bd
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregatorHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.quantiles.CompactDoublesSketch;
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class DoublesSketchBuildBufferAggregatorHelper
+{
+ private final int size;
+ private final int maxIntermediateSize;
+ private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
+
+ public DoublesSketchBuildBufferAggregatorHelper(final int size, final int maxIntermediateSize)
+ {
+ this.size = size;
+ this.maxIntermediateSize = maxIntermediateSize;
+ }
+
+ public void init(final ByteBuffer buffer, final int position)
+ {
+ final WritableMemory mem = getMemory(buffer);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
+ putSketch(buffer, position, sketch);
+ }
+
+ public CompactDoublesSketch get(final ByteBuffer buffer, final int position)
+ {
+ return sketches.get(buffer).get(position).compact();
+ }
+
+ // A small number of sketches may run out of the given memory, request more memory on heap and move there.
+ // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
+ final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
+ if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
+ final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
+ sketch = UpdateDoublesSketch.wrap(newRegion);
+ }
+ putSketch(newBuffer, newPosition, sketch);
+
+ final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
+ map.remove(oldPosition);
+ if (map.isEmpty()) {
+ sketches.remove(oldBuffer);
+ memCache.remove(oldBuffer);
+ }
+ }
+
+ public void clear()
+ {
+ sketches.clear();
+ memCache.clear();
+ }
+
+ /**
+ * Retrieves the sketch at a particular position.
+ */
+ public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int position)
+ {
+ return sketches.get(buf).get(position);
+ }
+
+ private WritableMemory getMemory(final ByteBuffer buffer)
+ {
+ return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+ }
+
+ private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
+ {
+ Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+ map.put(position, sketch);
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
new file mode 100644
index 0000000..af29c5b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoublesSketchBuildVectorAggregator implements VectorAggregator
+{
+ private final VectorValueSelector selector;
+ private final DoublesSketchBuildBufferAggregatorHelper helper;
+
+ DoublesSketchBuildVectorAggregator(
+ final VectorValueSelector selector,
+ final int size,
+ final int maxIntermediateSize
+ )
+ {
+ this.selector = selector;
+ this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ helper.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final double[] doubles = selector.getDoubleVector();
+ final boolean[] nulls = selector.getNullVector();
+
+ final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (nulls == null || !nulls[i]) {
+ sketch.update(doubles[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final double[] doubles = selector.getDoubleVector();
+ final boolean[] nulls = selector.getNullVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int idx = rows != null ? rows[i] : i;
+
+ if (nulls == null || !nulls[idx]) {
+ final int position = positions[i] + positionOffset;
+ helper.getSketchAtPosition(buf, position).update(doubles[idx]);
+ }
+ }
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ return helper.get(buf, position);
+ }
+
+ @Override
+ public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
+ {
+ helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
+ }
+
+ @Override
+ public void close()
+ {
+ helper.clear();
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
index 41ae453..c788adc 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregator.java
@@ -19,57 +19,44 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantiles.DoublesUnion;
+import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{
-
- private final ColumnValueSelector selector;
- private final int k;
- private final int maxIntermediateSize;
- private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
- private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
+ private final ColumnValueSelector<DoublesSketch> selector;
+ private final DoublesSketchMergeBufferAggregatorHelper helper;
public DoublesSketchMergeBufferAggregator(
- final ColumnValueSelector selector,
+ final ColumnValueSelector<DoublesSketch> selector,
final int k,
- final int maxIntermediateSize)
+ final int maxIntermediateSize
+ )
{
this.selector = selector;
- this.k = k;
- this.maxIntermediateSize = maxIntermediateSize;
+ this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
}
@Override
- public synchronized void init(final ByteBuffer buffer, final int position)
+ public void init(final ByteBuffer buffer, final int position)
{
- final WritableMemory mem = getMemory(buffer);
- final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
- final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
- putUnion(buffer, position, union);
+ helper.init(buffer, position);
}
@Override
- public synchronized void aggregate(final ByteBuffer buffer, final int position)
+ public void aggregate(final ByteBuffer buffer, final int position)
{
- final DoublesUnion union = unions.get(buffer).get(position);
- DoublesSketchMergeAggregator.updateUnion(selector, union);
+ DoublesSketchMergeAggregator.updateUnion(selector, helper.getSketchAtPosition(buffer, position));
}
@Override
- public synchronized Object get(final ByteBuffer buffer, final int position)
+ public Object get(final ByteBuffer buffer, final int position)
{
- return unions.get(buffer).get(position).getResult();
+ return helper.getSketchAtPosition(buffer, position).getResult();
}
@Override
@@ -87,8 +74,7 @@
@Override
public synchronized void close()
{
- unions.clear();
- memCache.clear();
+ helper.clear();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
@@ -96,31 +82,7 @@
@Override
public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
- DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
- final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
- if (union.isSameResource(oldMem)) { // union was not relocated on heap
- final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
- union = DoublesUnion.wrap(newMem);
- }
- putUnion(newBuffer, newPosition, union);
-
- Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
- map.remove(oldPosition);
- if (map.isEmpty()) {
- unions.remove(oldBuffer);
- memCache.remove(oldBuffer);
- }
- }
-
- private WritableMemory getMemory(final ByteBuffer buffer)
- {
- return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
- }
-
- private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
- {
- Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
- map.put(position, union);
+ helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
@Override
@@ -128,5 +90,4 @@
{
inspector.visit("selector", selector);
}
-
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java
new file mode 100644
index 0000000..378e41d
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeBufferAggregatorHelper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.quantiles.DoublesUnion;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+
+public class DoublesSketchMergeBufferAggregatorHelper
+{
+ private final int k;
+ private final int maxIntermediateSize;
+ private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
+
+ public DoublesSketchMergeBufferAggregatorHelper(
+ final int k,
+ final int maxIntermediateSize
+ )
+ {
+ this.k = k;
+ this.maxIntermediateSize = maxIntermediateSize;
+ }
+
+ public void init(final ByteBuffer buffer, final int position)
+ {
+ final WritableMemory mem = getMemory(buffer);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
+ putUnion(buffer, position, union);
+ }
+
+ public Object get(final ByteBuffer buffer, final int position)
+ {
+ return unions.get(buffer).get(position).getResult();
+ }
+
+ public void clear()
+ {
+ unions.clear();
+ memCache.clear();
+ }
+
+ // A small number of sketches may run out of the given memory, request more memory on heap and move there.
+ // In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
+ final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
+ if (union.isSameResource(oldMem)) { // union was not relocated on heap
+ final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
+ union = DoublesUnion.wrap(newMem);
+ }
+ putUnion(newBuffer, newPosition, union);
+
+ Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
+ map.remove(oldPosition);
+ if (map.isEmpty()) {
+ unions.remove(oldBuffer);
+ memCache.remove(oldBuffer);
+ }
+ }
+
+  /**
+   * Retrieves the union at a particular position. Note that despite the method name,
+   * the returned object is a {@link DoublesUnion}, not a sketch.
+   */
+ public DoublesUnion getSketchAtPosition(final ByteBuffer buf, final int position)
+ {
+ return unions.get(buf).get(position);
+ }
+
+ private WritableMemory getMemory(final ByteBuffer buffer)
+ {
+ return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+ }
+
+ private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
+ {
+ Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+ map.put(position, union);
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
new file mode 100644
index 0000000..8a8e10b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.DoublesUnion;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class DoublesSketchMergeVectorAggregator implements VectorAggregator
+{
+ private final VectorObjectSelector selector;
+ private final DoublesSketchMergeBufferAggregatorHelper helper;
+
+ public DoublesSketchMergeVectorAggregator(
+ final VectorObjectSelector selector,
+ final int k,
+ final int maxIntermediateSize
+ )
+ {
+ this.selector = selector;
+ this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ helper.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final Object[] vector = selector.getObjectVector();
+
+ final DoublesUnion union = helper.getSketchAtPosition(buf, position);
+
+ for (int i = startRow; i < endRow; i++) {
+ final DoublesSketch sketch = (DoublesSketch) vector[i];
+ if (sketch != null) {
+ union.update(sketch);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final Object[] vector = selector.getObjectVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
+
+ if (sketch != null) {
+ final int position = positions[i] + positionOffset;
+ final DoublesUnion union = helper.getSketchAtPosition(buf, position);
+ union.update(sketch);
+ }
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return helper.get(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ helper.clear();
+ }
+
+ @Override
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
index cbb40c4..e30fb9b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchOperations.java
@@ -41,8 +41,9 @@
return (DoublesSketch) serializedSketch;
}
throw new ISE(
- "Object is not of a type that can be deserialized to a quantiles DoublsSketch: "
- + serializedSketch.getClass());
+ "Object is not of a type that can be deserialized to a quantiles DoublesSketch: %s",
+ serializedSketch == null ? "null" : serializedSketch.getClass()
+ );
}
public static DoublesSketch deserializeFromBase64EncodedString(final String str)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
index 5f7808c..2a0a40b 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/NoopDoublesSketchBufferAggregator.java
@@ -20,20 +20,42 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-public class NoopDoublesSketchBufferAggregator implements BufferAggregator
+public class NoopDoublesSketchBufferAggregator implements BufferAggregator, VectorAggregator
{
@Override
public void init(final ByteBuffer buf, final int position)
{
+ // Nothing to do.
}
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
+ // Nothing to do.
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ // Nothing to do.
+ }
+
+ @Override
+ public void aggregate(
+ ByteBuffer buf,
+ int numRows,
+ int[] positions,
+ @Nullable int[] rows,
+ int positionOffset
+ )
+ {
+ // Nothing to do.
}
@Override
@@ -55,12 +77,20 @@
}
@Override
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ // Nothing to do.
+ }
+
+ @Override
public void close()
{
+ // Nothing to do.
}
@Override
public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
{
+ // Nothing to do.
}
}
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
index c0a28d3..8152369 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
@@ -20,10 +20,12 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@@ -54,24 +56,29 @@
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
- public DoublesSketchAggregatorTest(final GroupByQueryConfig config)
+ public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize)
{
DoublesSketchModule.registerSerde();
DoublesSketchModule module = new DoublesSketchModule();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
- module.getJacksonModules(), config, tempFolder);
+ module.getJacksonModules(),
+ config,
+ tempFolder
+ ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
module.getJacksonModules(),
tempFolder
- );
+ ).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
}
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
- constructors.add(new Object[]{config});
+ for (String vectorize : new String[]{"false", "true", "force"}) {
+ constructors.add(new Object[]{config, vectorize});
+ }
}
return constructors;
}
@@ -381,7 +388,11 @@
// post agg with nulls
Object quantileObjectWithNulls = row.get(5);
Assert.assertTrue(quantileObjectWithNulls instanceof Double);
- Assert.assertEquals(NullHandling.replaceWithDefault() ? 7.4 : 7.5, (double) quantileObjectWithNulls, 0.1); // median value
+ Assert.assertEquals(
+ NullHandling.replaceWithDefault() ? 7.4 : 7.5,
+ (double) quantileObjectWithNulls,
+ 0.1
+ ); // median value
// post agg with nulls
Object quantilesObjectWithNulls = row.get(6);
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index c6c8e13..647f300 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -23,12 +23,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -70,16 +68,15 @@
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
{
- private static final AuthenticationResult AUTH_RESULT = CalciteTests.REGULAR_USER_AUTH_RESULT;
private static final DruidOperatorTable OPERATOR_TABLE = new DruidOperatorTable(
ImmutableSet.of(
new DoublesSketchApproxQuantileSqlAggregator(),
@@ -172,166 +169,136 @@
@Test
public void testQuantileOnFloatAndLongs() throws Exception
{
- SqlLifecycle sqlLifecycle = getSqlLifecycle();
- final String sql = "SELECT\n"
- + "APPROX_QUANTILE_DS(m1, 0.01),\n"
- + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n"
- + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n"
- + "APPROX_QUANTILE_DS(m1, 0.99),\n"
- + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n"
- + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
- + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
- + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n"
- + "APPROX_QUANTILE_DS(cnt, 0.5)\n"
- + "FROM foo";
-
- // Verify results
- final List<Object[]> results = sqlLifecycle.runSimple(
- sql,
- TIMESERIES_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
- final List<Object[]> expectedResults = ImmutableList.of(
- new Object[]{
- 1.0,
- 4.0,
- 6.0,
- 6.0,
- 12.0,
- 6.0,
- 5.0,
- 6.0,
- 1.0
- }
- );
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
-
- // Verify query
- Assert.assertEquals(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .granularity(Granularities.ALL)
- .virtualColumns(
- new ExpressionVirtualColumn(
- "v0",
- "(\"m1\" * 2)",
- ValueType.FLOAT,
- TestExprMacroTable.INSTANCE
+ testQuery(
+ "SELECT\n"
+ + "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.5, 64),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.98, 256),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.99),\n"
+ + "APPROX_QUANTILE_DS(m1 * 2, 0.97),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+ + "APPROX_QUANTILE_DS(m1, 0.999) FILTER(WHERE dim1 = 'abc'),\n"
+ + "APPROX_QUANTILE_DS(cnt, 0.5)\n"
+ + "FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .virtualColumns(
+ new ExpressionVirtualColumn(
+ "v0",
+ "(\"m1\" * 2)",
+ ValueType.FLOAT,
+ TestExprMacroTable.INSTANCE
+ )
)
- )
- .aggregators(ImmutableList.of(
- new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
- new DoublesSketchAggregatorFactory("a1:agg", "m1", 64),
- new DoublesSketchAggregatorFactory("a2:agg", "m1", 256),
- new DoublesSketchAggregatorFactory("a4:agg", "v0", null),
- new FilteredAggregatorFactory(
- new DoublesSketchAggregatorFactory("a5:agg", "m1", null),
- new SelectorDimFilter("dim1", "abc", null)
- ),
- new FilteredAggregatorFactory(
- new DoublesSketchAggregatorFactory("a6:agg", "m1", null),
- new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
- ),
- new DoublesSketchAggregatorFactory("a8:agg", "cnt", null)
- ))
- .postAggregators(
- new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
- new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
- new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
- new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
- new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
- new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
- new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
- new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f),
- new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f)
- )
- .context(TIMESERIES_CONTEXT_DEFAULT)
- .build(),
- Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+ .aggregators(ImmutableList.of(
+ new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
+ new DoublesSketchAggregatorFactory("a1:agg", "m1", 64),
+ new DoublesSketchAggregatorFactory("a2:agg", "m1", 256),
+ new DoublesSketchAggregatorFactory("a4:agg", "v0", null),
+ new FilteredAggregatorFactory(
+ new DoublesSketchAggregatorFactory("a5:agg", "m1", null),
+ new SelectorDimFilter("dim1", "abc", null)
+ ),
+ new FilteredAggregatorFactory(
+ new DoublesSketchAggregatorFactory("a6:agg", "m1", null),
+ new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+ ),
+ new DoublesSketchAggregatorFactory("a8:agg", "cnt", null)
+ ))
+ .postAggregators(
+ new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
+ new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
+ new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
+ new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
+ new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.97f),
+ new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.99f),
+ new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a6:agg"), 0.999f),
+ new DoublesSketchToQuantilePostAggregator("a7", makeFieldAccessPostAgg("a5:agg"), 0.999f),
+ new DoublesSketchToQuantilePostAggregator("a8", makeFieldAccessPostAgg("a8:agg"), 0.50f)
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{
+ 1.0,
+ 4.0,
+ 6.0,
+ 6.0,
+ 12.0,
+ 6.0,
+ 5.0,
+ 6.0,
+ 1.0
+ }
+ )
);
}
@Test
public void testQuantileOnComplexColumn() throws Exception
{
- SqlLifecycle lifecycle = getSqlLifecycle();
- final String sql = "SELECT\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
- + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
- + "FROM foo";
-
- // Verify results
- final List<Object[]> results = lifecycle.runSimple(
- sql,
- TIMESERIES_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
- final List<Object[]> expectedResults = ImmutableList.of(
- new Object[]{
- 1.0,
- 4.0,
- 6.0,
- 6.0,
- 6.0,
- 5.0,
- 6.0
- }
- );
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
-
- // Verify query
- Assert.assertEquals(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .granularity(Granularities.ALL)
- .aggregators(ImmutableList.of(
- new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null),
- new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64),
- new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256),
- new FilteredAggregatorFactory(
- new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null),
- new SelectorDimFilter("dim1", "abc", null)
- ),
- new FilteredAggregatorFactory(
- new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null),
- new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+ testQuery(
+ "SELECT\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.5, 64),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.98, 256),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.99),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.99) FILTER(WHERE dim1 = 'abc'),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 <> 'abc'),\n"
+ + "APPROX_QUANTILE_DS(qsketch_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
+ + "FROM foo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new DoublesSketchAggregatorFactory("a0:agg", "qsketch_m1", null),
+ new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", 64),
+ new DoublesSketchAggregatorFactory("a2:agg", "qsketch_m1", 256),
+ new FilteredAggregatorFactory(
+ new DoublesSketchAggregatorFactory("a4:agg", "qsketch_m1", null),
+ new SelectorDimFilter("dim1", "abc", null)
+ ),
+ new FilteredAggregatorFactory(
+ new DoublesSketchAggregatorFactory("a5:agg", "qsketch_m1", null),
+ new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+ )
+ ))
+ .postAggregators(
+ new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
+ new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
+ new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
+ new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
+ new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f),
+ new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f),
+ new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f)
)
- ))
- .postAggregators(
- new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
- new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f),
- new DoublesSketchToQuantilePostAggregator("a2", makeFieldAccessPostAgg("a2:agg"), 0.98f),
- new DoublesSketchToQuantilePostAggregator("a3", makeFieldAccessPostAgg("a0:agg"), 0.99f),
- new DoublesSketchToQuantilePostAggregator("a4", makeFieldAccessPostAgg("a4:agg"), 0.99f),
- new DoublesSketchToQuantilePostAggregator("a5", makeFieldAccessPostAgg("a5:agg"), 0.999f),
- new DoublesSketchToQuantilePostAggregator("a6", makeFieldAccessPostAgg("a4:agg"), 0.999f)
- )
- .context(TIMESERIES_CONTEXT_DEFAULT)
- .build(),
- Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{
+ 1.0,
+ 4.0,
+ 6.0,
+ 6.0,
+ 6.0,
+ 5.0,
+ 6.0
+ }
+ )
);
}
@Test
public void testQuantileOnCastedString() throws Exception
{
- cannotVectorize();
-
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
@@ -425,92 +392,72 @@
@Test
public void testQuantileOnInnerQuery() throws Exception
{
- SqlLifecycle sqlLifecycle = getSqlLifecycle();
- final String sql = "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n"
- + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)";
-
- // Verify results
- final List<Object[]> results = sqlLifecycle.runSimple(
- sql,
- QUERY_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(new Object[]{7.0, 11.0});
} else {
expectedResults = ImmutableList.of(new Object[]{5.25, 8.0});
}
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
- // Verify query
- Assert.assertEquals(
- GroupByQuery.builder()
- .setDataSource(
- new QueryDataSource(
- GroupByQuery.builder()
- .setDataSource(CalciteTests.DATASOURCE1)
- .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .setGranularity(Granularities.ALL)
- .setDimensions(new DefaultDimensionSpec("dim2", "d0"))
- .setAggregatorSpecs(
- ImmutableList.of(
- new DoubleSumAggregatorFactory("a0", "m1")
+ testQuery(
+ "SELECT AVG(x), APPROX_QUANTILE_DS(x, 0.98)\n"
+ + "FROM (SELECT dim2, SUM(m1) AS x FROM foo GROUP BY dim2)",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("dim2", "d0"))
+ .setAggregatorSpecs(
+ ImmutableList.of(
+ new DoubleSumAggregatorFactory("a0", "m1")
+ )
)
- )
- .setContext(QUERY_CONTEXT_DEFAULT)
- .build()
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
)
- )
- .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .setGranularity(Granularities.ALL)
- .setAggregatorSpecs(
- new DoubleSumAggregatorFactory("_a0:sum", "a0"),
- new CountAggregatorFactory("_a0:count"),
- new DoublesSketchAggregatorFactory(
- "_a1:agg",
- "a0",
- null
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(
+ new DoubleSumAggregatorFactory("_a0:sum", "a0"),
+ new CountAggregatorFactory("_a0:count"),
+ new DoublesSketchAggregatorFactory(
+ "_a1:agg",
+ "a0",
+ null
+ )
)
- )
- .setPostAggregatorSpecs(
- ImmutableList.of(
- new ArithmeticPostAggregator(
- "_a0",
- "quotient",
- ImmutableList.of(
- new FieldAccessPostAggregator(null, "_a0:sum"),
- new FieldAccessPostAggregator(null, "_a0:count")
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "_a0",
+ "quotient",
+ ImmutableList.of(
+ new FieldAccessPostAggregator(null, "_a0:sum"),
+ new FieldAccessPostAggregator(null, "_a0:count")
+ )
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "_a1",
+ makeFieldAccessPostAgg("_a1:agg"),
+ 0.98f
)
- ),
- new DoublesSketchToQuantilePostAggregator("_a1", makeFieldAccessPostAgg("_a1:agg"), 0.98f)
+ )
)
- )
- .setContext(QUERY_CONTEXT_DEFAULT)
- .build(),
- Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expectedResults
);
}
@Test
public void testQuantileOnInnerQuantileQuery() throws Exception
{
- SqlLifecycle sqlLifecycle = getSqlLifecycle();
- final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
- + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";
-
-
- final List<Object[]> results = sqlLifecycle.runSimple(
- sql,
- QUERY_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
-
ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
builder.add(new Object[]{"", 1.0});
builder.add(new Object[]{"1", 4.0});
@@ -519,296 +466,292 @@
builder.add(new Object[]{"abc", 6.0});
builder.add(new Object[]{"def", 5.0});
final List<Object[]> expectedResults = builder.build();
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
- // Verify query
- Assert.assertEquals(
- GroupByQuery.builder()
- .setDataSource(
- new QueryDataSource(
- GroupByQuery.builder()
- .setDataSource(CalciteTests.DATASOURCE1)
- .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .setGranularity(Granularities.ALL)
- .setDimensions(
- new DefaultDimensionSpec("dim1", "d0"),
- new DefaultDimensionSpec("dim2", "d1")
- )
- .setAggregatorSpecs(
- ImmutableList.of(
- new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
+ testQuery(
+ "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+ + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(
+ new DefaultDimensionSpec("dim1", "d0"),
+ new DefaultDimensionSpec("dim2", "d1")
)
- )
- .setPostAggregatorSpecs(
- ImmutableList.of(
- new DoublesSketchToQuantilePostAggregator(
- "a0",
- makeFieldAccessPostAgg("a0:agg"),
- 0.5f
+ .setAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
- )
- .setContext(QUERY_CONTEXT_DEFAULT)
- .build()
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchToQuantilePostAggregator(
+ "a0",
+ makeFieldAccessPostAgg("a0:agg"),
+ 0.5f
+ )
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
)
- )
- .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .setGranularity(Granularities.ALL)
- .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
- .setAggregatorSpecs(
- new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
- )
- .setPostAggregatorSpecs(
- ImmutableList.of(
- new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
+ .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
+ .setAggregatorSpecs(
+ new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
)
- )
- .setContext(QUERY_CONTEXT_DEFAULT)
- .build(),
- Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new DoublesSketchToQuantilePostAggregator(
+ "_a0",
+ makeFieldAccessPostAgg("_a0:agg"),
+ 0.5f
+ )
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expectedResults
);
}
@Test
public void testDoublesSketchPostAggs() throws Exception
{
- SqlLifecycle sqlLifecycle = getSqlLifecycle();
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
- + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
- + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
- + " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n"
- + " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n"
- + " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
- + " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n"
- + " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
- + " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt))\n"
- + "FROM foo";
-
- // Verify results
- final List<Object[]> results = sqlLifecycle.runSimple(
- sql,
- TIMESERIES_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
- final List<Object[]> expectedResults = ImmutableList.of(
- new Object[]{
- 6L,
- 2.0d,
- 1001.0d,
- 1124.0d,
- 1.0d,
- "[1.0,1.0]",
- "[0.0,0.0,6.0]",
- 1.0d,
- "[0.0,0.0,1.0]",
- "\n"
- + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n"
- + " Empty : false\n"
- + " Direct, Capacity bytes : false, \n"
- + " Estimation Mode : false\n"
- + " K : 128\n"
- + " N : 6\n"
- + " Levels (Needed, Total, Valid): 0, 0, 0\n"
- + " Level Bit Pattern : 0\n"
- + " BaseBufferCount : 6\n"
- + " Combined Buffer Capacity : 8\n"
- + " Retained Items : 6\n"
- + " Compact Storage Bytes : 80\n"
- + " Updatable Storage Bytes : 96\n"
- + " Normalized Rank Error : 1.406%\n"
- + " Normalized Rank Error (PMF) : 1.711%\n"
- + " Min Value : 1.000000e+00\n"
- + " Max Value : 1.000000e+00\n"
- + "### END SKETCH SUMMARY\n"
- }
+ testQuery(
+ "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
+ + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
+ + " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
+ + " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5)),\n"
+ + " DS_GET_QUANTILES(DS_QUANTILES_SKETCH(cnt), 0.5, 0.8),\n"
+ + " DS_HISTOGRAM(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+ + " DS_RANK(DS_QUANTILES_SKETCH(cnt), 3),\n"
+ + " DS_CDF(DS_QUANTILES_SKETCH(cnt), 0.2, 0.6),\n"
+ + " -- The nonvectorized query uses a regular Aggregator, and the vectorized query uses a buffer-based\n"
+ + " -- VectorAggregator. The buffer-based aggregators return HeapCompactDoublesSketch instead of\n"
+ + " -- HeapUpdateDoublesSketch since they must make a copy out of the buffer before returning something.\n"
+ + " -- Use REPLACE to normalize summaries.\n"
+ + " REPLACE("
+ + " REPLACE("
+ + " DS_QUANTILE_SUMMARY(DS_QUANTILES_SKETCH(cnt)),"
+ + " 'HeapCompactDoublesSketch',"
+ + " 'HeapUpdateDoublesSketch'"
+ + " ),"
+ + " 'Combined Buffer Capacity : 6',"
+ + " 'Combined Buffer Capacity : 8'"
+ + " )\n"
+ + "FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .virtualColumns(
+ new ExpressionVirtualColumn(
+ "v0",
+ "(\"cnt\" + 123)",
+ ValueType.FLOAT,
+ TestExprMacroTable.INSTANCE
+ )
+ )
+ .aggregators(ImmutableList.of(
+ new LongSumAggregatorFactory("a0", "cnt"),
+ new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
+ new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
+ new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
+ ))
+ .postAggregators(
+ new DoublesSketchToQuantilePostAggregator(
+ "a1",
+ makeFieldAccessPostAgg("a1:agg"),
+ 0.5f
+ ),
+ new ExpressionPostAggregator(
+ "p0",
+ "(\"a1\" + 1)",
+ null,
+ TestExprMacroTable.INSTANCE
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "p2",
+ new FieldAccessPostAggregator(
+ "p1",
+ "a2:agg"
+ ),
+ 0.5f
+ ),
+ new ExpressionPostAggregator(
+ "p3",
+ "(p2 + 1000)",
+ null,
+ TestExprMacroTable.INSTANCE
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "p5",
+ new FieldAccessPostAggregator(
+ "p4",
+ "a3:agg"
+ ),
+ 0.5f
+ ),
+ new ExpressionPostAggregator(
+ "p6",
+ "(p5 + 1000)",
+ null,
+ TestExprMacroTable.INSTANCE
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "p8",
+ new FieldAccessPostAggregator(
+ "p7",
+ "a2:agg"
+ ),
+ 0.5f
+ ),
+ new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE),
+ new DoublesSketchToQuantilesPostAggregator(
+ "p11",
+ new FieldAccessPostAggregator(
+ "p10",
+ "a2:agg"
+ ),
+ new double[]{0.5d, 0.8d}
+ ),
+ new DoublesSketchToHistogramPostAggregator(
+ "p13",
+ new FieldAccessPostAggregator(
+ "p12",
+ "a2:agg"
+ ),
+ new double[]{0.2d, 0.6d},
+ null
+ ),
+ new DoublesSketchToRankPostAggregator(
+ "p15",
+ new FieldAccessPostAggregator(
+ "p14",
+ "a2:agg"
+ ),
+ 3.0d
+ ),
+ new DoublesSketchToCDFPostAggregator(
+ "p17",
+ new FieldAccessPostAggregator(
+ "p16",
+ "a2:agg"
+ ),
+ new double[]{0.2d, 0.6d}
+ ),
+ new DoublesSketchToStringPostAggregator(
+ "p19",
+ new FieldAccessPostAggregator(
+ "p18",
+ "a2:agg"
+ )
+ ),
+ new ExpressionPostAggregator(
+ "p20",
+ "replace(replace(p19,'HeapCompactDoublesSketch','HeapUpdateDoublesSketch'),"
+ + "'Combined Buffer Capacity : 6',"
+ + "'Combined Buffer Capacity : 8')",
+ null,
+ ExprMacroTable.nil()
+ )
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{
+ 6L,
+ 2.0d,
+ 1001.0d,
+ 1124.0d,
+ 1.0d,
+ "[1.0,1.0]",
+ "[0.0,0.0,6.0]",
+ 1.0d,
+ "[0.0,0.0,1.0]",
+ "\n"
+ + "### Quantiles HeapUpdateDoublesSketch SUMMARY: \n"
+ + " Empty : false\n"
+ + " Direct, Capacity bytes : false, \n"
+ + " Estimation Mode : false\n"
+ + " K : 128\n"
+ + " N : 6\n"
+ + " Levels (Needed, Total, Valid): 0, 0, 0\n"
+ + " Level Bit Pattern : 0\n"
+ + " BaseBufferCount : 6\n"
+ + " Combined Buffer Capacity : 8\n"
+ + " Retained Items : 6\n"
+ + " Compact Storage Bytes : 80\n"
+ + " Updatable Storage Bytes : 96\n"
+ + " Normalized Rank Error : 1.406%\n"
+ + " Normalized Rank Error (PMF) : 1.711%\n"
+ + " Min Value : 1.000000e+00\n"
+ + " Max Value : 1.000000e+00\n"
+ + "### END SKETCH SUMMARY\n"
+ }
+ )
);
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
-
- Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
- Query expectedQuery = Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .granularity(Granularities.ALL)
- .virtualColumns(
- new ExpressionVirtualColumn(
- "v0",
- "(\"cnt\" + 123)",
- ValueType.FLOAT,
- TestExprMacroTable.INSTANCE
- )
- )
- .aggregators(ImmutableList.of(
- new LongSumAggregatorFactory("a0", "cnt"),
- new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
- new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
- new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
- ))
- .postAggregators(
- new DoublesSketchToQuantilePostAggregator(
- "a1",
- makeFieldAccessPostAgg("a1:agg"),
- 0.5f
- ),
- new ExpressionPostAggregator(
- "p0",
- "(\"a1\" + 1)",
- null,
- TestExprMacroTable.INSTANCE
- ),
- new DoublesSketchToQuantilePostAggregator(
- "p2",
- new FieldAccessPostAggregator(
- "p1",
- "a2:agg"
- ),
- 0.5f
- ),
- new ExpressionPostAggregator(
- "p3",
- "(p2 + 1000)",
- null,
- TestExprMacroTable.INSTANCE
- ),
- new DoublesSketchToQuantilePostAggregator(
- "p5",
- new FieldAccessPostAggregator(
- "p4",
- "a3:agg"
- ),
- 0.5f
- ),
- new ExpressionPostAggregator(
- "p6",
- "(p5 + 1000)",
- null,
- TestExprMacroTable.INSTANCE
- ),
- new DoublesSketchToQuantilePostAggregator(
- "p8",
- new FieldAccessPostAggregator(
- "p7",
- "a2:agg"
- ),
- 0.5f
- ),
- new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE),
- new DoublesSketchToQuantilesPostAggregator(
- "p11",
- new FieldAccessPostAggregator(
- "p10",
- "a2:agg"
- ),
- new double[]{0.5d, 0.8d}
- ),
- new DoublesSketchToHistogramPostAggregator(
- "p13",
- new FieldAccessPostAggregator(
- "p12",
- "a2:agg"
- ),
- new double[]{0.2d, 0.6d},
- null
- ),
- new DoublesSketchToRankPostAggregator(
- "p15",
- new FieldAccessPostAggregator(
- "p14",
- "a2:agg"
- ),
- 3.0d
- ),
- new DoublesSketchToCDFPostAggregator(
- "p17",
- new FieldAccessPostAggregator(
- "p16",
- "a2:agg"
- ),
- new double[]{0.2d, 0.6d}
- ),
- new DoublesSketchToStringPostAggregator(
- "p19",
- new FieldAccessPostAggregator(
- "p18",
- "a2:agg"
- )
- )
- )
- .context(TIMESERIES_CONTEXT_DEFAULT)
- .build();
-
- // Verify query
- Assert.assertEquals(expectedQuery, actualQuery);
}
@Test
public void testDoublesSketchPostAggsPostSort() throws Exception
{
- SqlLifecycle sqlLifecycle = getSqlLifecycle();
-
- final String sql = "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10";
- final String sql2 = StringUtils.format("SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (%s)", sql);
-
- // Verify results
- final List<Object[]> results = sqlLifecycle.runSimple(
- sql2,
- TIMESERIES_CONTEXT_DEFAULT,
- DEFAULT_PARAMETERS,
- AUTH_RESULT
- ).toList();
- final List<Object[]> expectedResults = ImmutableList.of(
- new Object[]{
- 4.0d,
- 6.0d
- }
- );
-
- Assert.assertEquals(expectedResults.size(), results.size());
- for (int i = 0; i < expectedResults.size(); i++) {
- Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
- }
-
- Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
-
- Query expectedQuery =
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
- .granularity(Granularities.ALL)
- .aggregators(
- ImmutableList.of(
- new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
- )
- )
- .postAggregators(
- ImmutableList.of(
- new FieldAccessPostAggregator("p0", "a0:agg"),
- new DoublesSketchToQuantilePostAggregator(
- "p2",
- new FieldAccessPostAggregator("p1", "a0:agg"),
- 0.5
- ),
- new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5),
- new DoublesSketchToQuantilePostAggregator(
- "s3",
- new FieldAccessPostAggregator("s2", "p0"),
- 0.9800000190734863
+ testQuery(
+ "SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from ("
+ + "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10"
+ + ")",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(
+ ImmutableList.of(
+ new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
- )
- .context(TIMESERIES_CONTEXT_DEFAULT)
- .build();
-
- // Verify query
- Assert.assertEquals(expectedQuery, actualQuery);
+ .postAggregators(
+ ImmutableList.of(
+ new FieldAccessPostAggregator("p0", "a0:agg"),
+ new DoublesSketchToQuantilePostAggregator(
+ "p2",
+ new FieldAccessPostAggregator("p1", "a0:agg"),
+ 0.5
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "s1",
+ new FieldAccessPostAggregator("s0", "p0"),
+ 0.5
+ ),
+ new DoublesSketchToQuantilePostAggregator(
+ "s3",
+ new FieldAccessPostAggregator("s2", "p0"),
+ 0.9800000190734863
+ )
+ )
+ )
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{
+ 4.0d,
+ 6.0d
+ }
+ )
+ );
}
private static PostAggregator makeFieldAccessPostAgg(String name)
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 0d6e8b8..1ed990b 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -96,8 +96,10 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* This class provides general utility to test any druid aggregation implementation given raw data,
@@ -116,6 +118,8 @@
private final TemporaryFolder tempFolder;
private final Closer resourceCloser;
+ private final Map<String, Object> queryContext;
+
private AggregationTestHelper(
ObjectMapper mapper,
IndexMerger indexMerger,
@@ -124,7 +128,8 @@
QueryRunnerFactory factory,
TemporaryFolder tempFolder,
List<? extends Module> jsonModulesToRegister,
- Closer resourceCloser
+ Closer resourceCloser,
+ Map<String, Object> queryContext
)
{
this.mapper = mapper;
@@ -134,6 +139,7 @@
this.factory = factory;
this.tempFolder = tempFolder;
this.resourceCloser = resourceCloser;
+ this.queryContext = queryContext;
for (Module mod : jsonModulesToRegister) {
mapper.registerModule(mod);
@@ -174,7 +180,8 @@
factory,
tempFolder,
jsonModulesToRegister,
- closer
+ closer,
+ Collections.emptyMap()
);
}
@@ -213,7 +220,8 @@
factory,
tempFolder,
jsonModulesToRegister,
- Closer.create()
+ Closer.create(),
+ Collections.emptyMap()
);
}
@@ -264,7 +272,8 @@
factory,
tempFolder,
jsonModulesToRegister,
- resourceCloser
+ resourceCloser,
+ Collections.emptyMap()
);
}
@@ -307,7 +316,25 @@
factory,
tempFolder,
jsonModulesToRegister,
- resourceCloser
+ resourceCloser,
+ Collections.emptyMap()
+ );
+ }
+
+ public AggregationTestHelper withQueryContext(final Map<String, Object> queryContext)
+ {
+ final Map<String, Object> newContext = new HashMap<>(this.queryContext);
+ newContext.putAll(queryContext);
+ return new AggregationTestHelper(
+ mapper,
+ indexMerger,
+ indexIO,
+ toolChest,
+ factory,
+ tempFolder,
+ Collections.emptyList(),
+ resourceCloser,
+ newContext
);
}
@@ -658,7 +685,7 @@
//from each segment, later deserialize and merge and finally return the results
public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final String queryJson)
{
- return runQueryOnSegments(segmentDirs, readQuery(queryJson));
+ return runQueryOnSegments(segmentDirs, readQuery(queryJson).withOverriddenContext(queryContext));
}
public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final Query<T> query)