adjust topn heap algorithm to only use known cardinality path when dictionary is unique (#11186)

* adjust topn heap algorithm to only use known cardinality path when dictionary is unique

* better check and add comment

* adjust comment more
diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 87f7956..14f3b72 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -42,7 +42,6 @@
   )
   {
     super(storageAdapter);
-
     this.query = query;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index cee1e3a..b036ba6 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -87,10 +87,6 @@
       Map<Comparable<?>, Aggregator[]> aggregatesStore
   )
   {
-    if (params.getCardinality() < 0) {
-      throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
-    }
-
     final Cursor cursor = params.getCursor();
     final DimensionSelector dimSelector = params.getDimSelector();
 
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index daad69f..ad03f6a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -202,7 +202,7 @@
   }
 
   /**
-   * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since the
+   * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
    * which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
    * query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
    * {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index 92b2e8a..eacbde0 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -29,6 +29,7 @@
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.IndexedInts;
 
@@ -38,18 +39,25 @@
 
 public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
 {
+  private final ColumnCapabilities capabilities;
   private final Function<Object, Comparable<?>> dimensionValueConverter;
   private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
 
-  public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
+  public StringTopNColumnAggregatesProcessor(final ColumnCapabilities capabilities, final ValueType dimensionType)
   {
+    this.capabilities = capabilities;
     this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
   }
 
   @Override
   public int getCardinality(DimensionSelector selector)
   {
-    return selector.getValueCardinality();
+    // only report the underlying selector cardinality if the column the selector is for is dictionary encoded, and
+    // the dictionary values are unique, that is they have a 1:1 mapping between dictionaryId and column value
+    if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) {
+      return selector.getValueCardinality();
+    }
+    return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
   }
 
   @Override
@@ -108,7 +116,18 @@
       Aggregator[][] rowSelector
   )
   {
-    if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
+    final boolean notUnknown = selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN;
+    final boolean unique = capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue();
+    // we must know cardinality to use array based aggregation
+    // we check for uniquely dictionary encoded values because non-unique (meaning dictionary ids do not have a 1:1
+    // relation with values) negates many of the benefits of array aggregation:
+    // - if different dictionary ids map to the same value but dictionary ids are unique to that value (*:1), then
+    //   array aggregation will be correct but will still have to potentially perform many map lookups and lose the
+    //   performance benefit array aggregation is trying to provide
+    // - in cases where the same dictionary ids map to different values (1:* or *:*), results can be entirely
+    //   incorrect since an aggregator for a different value might be chosen from the array based on the re-used
+    //   dictionary id
+    if (notUnknown && unique) {
       return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
     } else {
       return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
index 56a2943..be6eb08 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
@@ -48,7 +48,7 @@
     final ValueType selectorType = capabilities.getType();
 
     if (selectorType.equals(ValueType.STRING)) {
-      return new StringTopNColumnAggregatesProcessor(dimensionType);
+      return new StringTopNColumnAggregatesProcessor(capabilities, dimensionType);
     } else if (selectorType.isNumeric()) {
       final Function<Object, Comparable<?>> converter;
       final ValueType strategyType;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1e32e76..f724cd0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -16929,6 +16929,45 @@
   }
 
   @Test
+  @Parameters(source = QueryContextForJoinProvider.class)
+  public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map<String, Object> queryContext) throws Exception
+  {
+    testQuery(
+        "SELECT druid.broadcast.dim4, COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+        + "GROUP BY 1 ORDER BY 1 DESC LIMIT 4",
+        queryContext,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE3),
+                        new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+                        "j0.",
+                        equalsCondition(
+                            DruidExpression.fromColumn("dim4"),
+                            DruidExpression.fromColumn("j0.dim4")
+                        ),
+                        JoinType.INNER
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+                .threshold(4)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .context(queryContext)
+                .metric(new InvertedTopNMetricSpec(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)))
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b", 9L},
+            new Object[]{"a", 9L}
+        )
+    );
+  }
+
+  @Test
   public void testTimeStampAddZeroDayPeriod() throws Exception
   {
     testQuery(