adjust topn heap algorithm to only use known cardinality path when dictionary is unique (#11186)
* adjust topn heap algorithm to only use known cardinality path when dictionary is unique
* better check and add comment
* adjust comment more
diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 87f7956..14f3b72 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -42,7 +42,6 @@
)
{
super(storageAdapter);
-
this.query = query;
}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index cee1e3a..b036ba6 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -87,10 +87,6 @@
Map<Comparable<?>, Aggregator[]> aggregatesStore
)
{
- if (params.getCardinality() < 0) {
- throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
- }
-
final Cursor cursor = params.getCursor();
final DimensionSelector dimSelector = params.getDimSelector();
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index daad69f..ad03f6a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -202,7 +202,7 @@
}
/**
- * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since the
+ * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
* which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
* query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
* {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index 92b2e8a..eacbde0 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -29,6 +29,7 @@
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
@@ -38,18 +39,25 @@
public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
{
+ private final ColumnCapabilities capabilities;
private final Function<Object, Comparable<?>> dimensionValueConverter;
private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
- public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
+ public StringTopNColumnAggregatesProcessor(final ColumnCapabilities capabilities, final ValueType dimensionType)
{
+ this.capabilities = capabilities;
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
}
@Override
public int getCardinality(DimensionSelector selector)
{
- return selector.getValueCardinality();
+ // only report the underlying selector cardinality if the column the selector is for is dictionary encoded, and
+ // the dictionary values are unique, that is they have a 1:1 mapping between dictionaryId and column value
+ if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) {
+ return selector.getValueCardinality();
+ }
+ return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
}
@Override
@@ -108,7 +116,18 @@
Aggregator[][] rowSelector
)
{
- if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
+ final boolean notUnknown = selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN;
+ final boolean unique = capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue();
+ // we must know cardinality to use array based aggregation
+ // we check for uniquely dictionary encoded values because non-unique (meaning dictionary ids do not have a 1:1
+ // relation with values) negates many of the benefits of array aggregation:
+ // - if different dictionary ids map to the same value but dictionary ids are unique to that value (*:1), then
+ // array aggregation will be correct but will still have to potentially perform many map lookups and lose the
+ // performance benefit array aggregation is trying to provide
+ // - in cases where the same dictionary ids map to different values (1:* or *:*), results can be entirely
+ // incorrect since an aggregator for a different value might be chosen from the array based on the re-used
+ // dictionary id
+ if (notUnknown && unique) {
return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
} else {
return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
index 56a2943..be6eb08 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
@@ -48,7 +48,7 @@
final ValueType selectorType = capabilities.getType();
if (selectorType.equals(ValueType.STRING)) {
- return new StringTopNColumnAggregatesProcessor(dimensionType);
+ return new StringTopNColumnAggregatesProcessor(capabilities, dimensionType);
} else if (selectorType.isNumeric()) {
final Function<Object, Comparable<?>> converter;
final ValueType strategyType;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1e32e76..f724cd0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -16929,6 +16929,45 @@
}
@Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map<String, Object> queryContext) throws Exception
+ {
+ testQuery(
+ "SELECT druid.broadcast.dim4, COUNT(*)\n"
+ + "FROM druid.numfoo\n"
+ + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+ + "GROUP BY 1 ORDER BY 1 DESC LIMIT 4",
+ queryContext,
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE3),
+ new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+ "j0.",
+ equalsCondition(
+ DruidExpression.fromColumn("dim4"),
+ DruidExpression.fromColumn("j0.dim4")
+ ),
+ JoinType.INNER
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+ .threshold(4)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .context(queryContext)
+ .metric(new InvertedTopNMetricSpec(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)))
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"b", 9L},
+ new Object[]{"a", 9L}
+ )
+ );
+ }
+
+ @Test
public void testTimeStampAddZeroDayPeriod() throws Exception
{
testQuery(