Grouping on complex columns aka unifying GroupBy strategies  (#16068)

Users can pass complex types as dimensions to the group by queries. For example:

SELECT nested_col1, count(*) FROM foo GROUP BY nested_col1
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlGroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlGroupByBenchmark.java
new file mode 100644
index 0000000..52745e6
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlGroupByBenchmark.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.guice.NestedDataModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.column.StringEncodingStrategy;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.sql.calcite.SqlVectorizedExpressionSanityTest;
+import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.planner.DruidPlanner;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerFactory;
+import org.apache.druid.sql.calcite.planner.PlannerResult;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class SqlGroupByBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+    ExpressionProcessing.initializeForTests();
+    NestedDataModule.registerHandlersAndSerde();
+  }
+
+  private static final DruidProcessingConfig PROCESSING_CONFIG = new DruidProcessingConfig()
+  {
+    @Override
+    public int intermediateComputeSizeBytes()
+    {
+      return 512 * 1024 * 1024;
+    }
+
+    @Override
+    public int getNumMergeBuffers()
+    {
+      return 3;
+    }
+
+    @Override
+    public int getNumThreads()
+    {
+      return 1;
+    }
+
+    @Override
+    public String getFormatString()
+    {
+      return "benchmarks-processing-%s";
+    }
+  };
+
+  @Param({
+      "string-Sequential-100_000",
+      "string-Sequential-10_000_000",
+      // "string-Sequential-1_000_000_000",
+      "string-ZipF-1_000_000",
+      "string-Uniform-1_000_000",
+
+      "multi-string-Sequential-100_000",
+      "multi-string-Sequential-10_000_000",
+      // "multi-string-Sequential-1_000_000_000",
+      "multi-string-ZipF-1_000_000",
+      "multi-string-Uniform-1_000_000",
+
+      "long-Sequential-100_000",
+      "long-Sequential-10_000_000",
+      // "long-Sequential-1_000_000_000",
+      "long-ZipF-1_000_000",
+      "long-Uniform-1_000_000",
+
+      "double-ZipF-1_000_000",
+      "double-Uniform-1_000_000",
+
+      "float-ZipF-1_000_000",
+      "float-Uniform-1_000_000",
+
+      "stringArray-Sequential-100_000",
+      "stringArray-Sequential-3_000_000",
+      // "stringArray-Sequential-1_000_000_000",
+      "stringArray-ZipF-1_000_000",
+      "stringArray-Uniform-1_000_000",
+
+      "longArray-Sequential-100_000",
+      "longArray-Sequential-3_000_000",
+      // "longArray-Sequential-1_000_000_000",
+      "longArray-ZipF-1_000_000",
+      "longArray-Uniform-1_000_000",
+
+      "nested-Sequential-100_000",
+      "nested-Sequential-3_000_000",
+      // "nested-Sequential-1_000_000_000",
+      "nested-ZipF-1_000_000",
+      "nested-Uniform-1_000_000",
+  })
+  private String groupingDimension;
+
+  private SqlEngine engine;
+  @Nullable
+  private PlannerFactory plannerFactory;
+  private Closer closer = Closer.create();
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("groupBy-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               .interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+    final DataSegment dataSegment2 = DataSegment.builder()
+                                               .dataSource("foo")
+                                               .interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new LinearShardSpec(1))
+                                               .size(0)
+                                               .build();
+
+
+    final PlannerConfig plannerConfig = new PlannerConfig();
+
+    String columnCardinalityWithUnderscores = groupingDimension.substring(groupingDimension.lastIndexOf('-') + 1);
+    int rowsPerSegment = Integer.parseInt(StringUtils.replace(columnCardinalityWithUnderscores, "_", ""));
+
+    final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
+
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            // string array dims
+            new ExpressionTransform(
+                "stringArray-Sequential-100_000",
+                "array(\"string-Sequential-100_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "stringArray-Sequential-3_000_000",
+                "array(\"string-Sequential-10_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            /*
+            new ExpressionTransform(
+                "stringArray-Sequential-1_000_000_000",
+                "array(\"string-Sequential-1_000_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),*/
+            new ExpressionTransform(
+                "stringArray-ZipF-1_000_000",
+                "array(\"string-ZipF-1_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "stringArray-Uniform-1_000_000",
+                "array(\"string-Uniform-1_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+
+            // long array dims
+            new ExpressionTransform(
+                "longArray-Sequential-100_000",
+                "array(\"long-Sequential-100_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "longArray-Sequential-3_000_000",
+                "array(\"long-Sequential-10_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            /*
+            new ExpressionTransform(
+                "longArray-Sequential-1_000_000_000",
+                "array(\"long-Sequential-1_000_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),*/
+            new ExpressionTransform(
+                "longArray-ZipF-1_000_000",
+                "array(\"long-ZipF-1_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "longArray-Uniform-1_000_000",
+                "array(\"long-Uniform-1_000_000\")",
+                TestExprMacroTable.INSTANCE
+            ),
+
+            // nested complex json dim
+            new ExpressionTransform(
+                "nested-Sequential-100_000",
+                "json_object('long1', \"long-Sequential-100_000\", 'nesteder', json_object('long1', \"long-Sequential-100_000\"))",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "nested-Sequential-3_000_000",
+                "json_object('long1', \"long-Sequential-10_000_000\", 'nesteder', json_object('long1', \"long-Sequential-10_000_000\"))",
+                TestExprMacroTable.INSTANCE
+            ),
+            /*new ExpressionTransform(
+                "nested-Sequential-1_000_000_000",
+                "json_object('long1', \"long-Sequential-1_000_000_000\", 'nesteder', json_object('long1', \"long-Sequential-1_000_000_000\"))",
+                TestExprMacroTable.INSTANCE
+            ),*/
+            new ExpressionTransform(
+                "nested-ZipF-1_000_000",
+                "json_object('long1', \"long-ZipF-1_000_000\", 'nesteder', json_object('long1', \"long-ZipF-1_000_000\"))",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "nested-Uniform-1_000_000",
+                "json_object('long1', \"long-Uniform-1_000_000\", 'nesteder', json_object('long1', \"long-Uniform-1_000_000\"))",
+                TestExprMacroTable.INSTANCE
+            )
+        )
+    );
+
+    List<DimensionSchema> columnSchemas = schemaInfo.getDimensionsSpec()
+                                                    .getDimensions()
+                                                    .stream()
+                                                    .map(x -> new AutoTypeColumnSchema(x.getName(), null))
+                                                    .collect(Collectors.toList());
+
+    List<DimensionSchema> transformSchemas = transformSpec
+        .getTransforms()
+        .stream()
+        .map(
+            transform -> new AutoTypeColumnSchema(transform.getName(), null)
+        )
+        .collect(Collectors.toList());
+
+
+
+    final QueryableIndex index = segmentGenerator.generate(
+        dataSegment,
+        schemaInfo,
+        DimensionsSpec.builder()
+                      .setDimensions(ImmutableList.<DimensionSchema>builder()
+                                                  .addAll(columnSchemas)
+                                                  .addAll(transformSchemas)
+                                                  .build()
+                      )
+                      .build(),
+        transformSpec,
+        IndexSpec.builder().withStringDictionaryEncoding(new StringEncodingStrategy.Utf8()).build(),
+        Granularities.NONE,
+        rowsPerSegment
+    );
+
+    final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
+        closer,
+        PROCESSING_CONFIG
+    );
+
+    final SpecificSegmentsQuerySegmentWalker walker = SpecificSegmentsQuerySegmentWalker.createWalker(conglomerate)
+                                                                                        .add(dataSegment, index)
+                                                                                        .add(dataSegment2, index);
+    closer.register(walker);
+
+    // Hacky and pollutes global namespace, but it is fine since benchmarks are run in isolation. Wasn't able
+    // to work up a cleaner way of doing it by modifying the injector.
+    CalciteTests.getJsonMapper().registerModules(NestedDataModule.getJacksonModulesList());
+
+    final DruidSchemaCatalog rootSchema =
+        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+    engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
+    plannerFactory = new PlannerFactory(
+        rootSchema,
+        CalciteTests.createOperatorTable(),
+        CalciteTests.createExprMacroTable(),
+        plannerConfig,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        CalciteTests.getJsonMapper(),
+        CalciteTests.DRUID_SCHEMA_NAME,
+        new CalciteRulesManager(ImmutableSet.of()),
+        CalciteTests.createJoinableFactoryWrapper(),
+        CatalogResolver.NULL_RESOLVER,
+        new AuthConfig()
+    );
+
+    try {
+      SqlVectorizedExpressionSanityTest.sanityTestVectorizedSqlQueries(
+          plannerFactory,
+          sqlQuery(groupingDimension)
+      );
+    }
+    catch (Throwable ignored) {
+      // the show must go on
+    }
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() throws Exception
+  {
+    closer.close();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void querySql(Blackhole blackhole)
+  {
+    final String sql = sqlQuery(groupingDimension);
+    try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, Collections.emptyMap())) {
+      final PlannerResult plannerResult = planner.plan();
+      final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
+      final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
+      blackhole.consume(lastRow);
+    }
+  }
+
+  private static String sqlQuery(String groupingDimension)
+  {
+    return StringUtils.format("SELECT \"%s\", COUNT(*) FROM foo GROUP BY 1", groupingDimension);
+  }
+}
diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
index 8e1bba4..9271b9b 100644
--- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
+++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
@@ -24,6 +24,7 @@
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -138,10 +139,10 @@
     );
 
     Throwable t = Assert.assertThrows(
-        UnsupportedOperationException.class,
+        DruidException.class,
         () -> runner.run(QueryPlus.wrap(query)).toList()
     );
-    Assert.assertEquals("Map column doesn't support getRow()", t.getMessage());
+    Assert.assertEquals("Unable to group on the column[params]", t.getMessage());
   }
 
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index c2d7e1a..7c4af73 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -2028,7 +2028,7 @@
         .setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
             CoreMatchers.instanceOf(DruidException.class),
             ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
-                "SQL requires a group-by on a column of type COMPLEX<hyperUnique> that is unsupported"))
+                "SQL requires a group-by on a column with type [COMPLEX<hyperUnique>] that is unsupported."))
         ))
         .verifyExecutionError();
   }
diff --git a/processing/src/main/java/org/apache/druid/common/config/NullHandling.java b/processing/src/main/java/org/apache/druid/common/config/NullHandling.java
index 7b47220..747512c 100644
--- a/processing/src/main/java/org/apache/druid/common/config/NullHandling.java
+++ b/processing/src/main/java/org/apache/druid/common/config/NullHandling.java
@@ -143,6 +143,33 @@
   }
 
   @Nullable
+  public static Long nullToEmptyIfNeeded(@Nullable Long value)
+  {
+    if (replaceWithDefault() && value == null) {
+      return defaultLongValue();
+    }
+    return value;
+  }
+
+  @Nullable
+  public static Float nullToEmptyIfNeeded(@Nullable Float value)
+  {
+    if (replaceWithDefault() && value == null) {
+      return defaultFloatValue();
+    }
+    return value;
+  }
+
+  @Nullable
+  public static Double nullToEmptyIfNeeded(@Nullable Double value)
+  {
+    if (replaceWithDefault() && value == null) {
+      return defaultDoubleValue();
+    }
+    return value;
+  }
+
+  @Nullable
   public static String emptyToNullIfNeeded(@Nullable String value)
   {
     //CHECKSTYLE.OFF: Regexp
diff --git a/processing/src/main/java/org/apache/druid/frame/write/FrameWriterUtils.java b/processing/src/main/java/org/apache/druid/frame/write/FrameWriterUtils.java
index 864a9cc..0bb78b2 100644
--- a/processing/src/main/java/org/apache/druid/frame/write/FrameWriterUtils.java
+++ b/processing/src/main/java/org/apache/druid/frame/write/FrameWriterUtils.java
@@ -33,7 +33,6 @@
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.data.ComparableIntArray;
 import org.apache.druid.segment.data.IndexedInts;
 
 import javax.annotation.Nullable;
@@ -195,10 +194,6 @@
       for (Object value : (Object[]) row) {
         retVal.add((Number) value);
       }
-    } else if (row instanceof ComparableIntArray) {
-      for (int value : ((ComparableIntArray) row).getDelegate()) {
-        retVal.add(value);
-      }
     } else {
       throw new ISE("Unexpected type %s found", row.getClass().getName());
     }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java
index 12ed3c5..ad22956 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java
@@ -30,7 +30,8 @@
   @Override
   public CardinalityAggregatorColumnSelectorStrategy makeColumnSelectorStrategy(
       ColumnCapabilities capabilities,
-      ColumnValueSelector selector
+      ColumnValueSelector selector,
+      String dimension
   )
   {
     switch (capabilities.getType()) {
@@ -46,4 +47,10 @@
         throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
     }
   }
+
+  @Override
+  public boolean supportsComplexTypes()
+  {
+    return false;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/dimension/ColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/dimension/ColumnSelectorStrategyFactory.java
index dc9304d..7e4db58 100644
--- a/processing/src/main/java/org/apache/druid/query/dimension/ColumnSelectorStrategyFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/dimension/ColumnSelectorStrategyFactory.java
@@ -24,5 +24,11 @@
 
 public interface ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
 {
-  ColumnSelectorStrategyClass makeColumnSelectorStrategy(ColumnCapabilities capabilities, ColumnValueSelector selector);
+  ColumnSelectorStrategyClass makeColumnSelectorStrategy(ColumnCapabilities capabilities, ColumnValueSelector selector, String dimension);
+
+  /**
+   * Whether the strategy supports complex types. If a strategy doesn't support the complex types, they can either throw an
+   * unsupported exception or treat them like strings.
+   */
+  boolean supportsComplexTypes();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/filter/ArrayContainsElementFilter.java b/processing/src/main/java/org/apache/druid/query/filter/ArrayContainsElementFilter.java
index 129c857..6e6a418 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/ArrayContainsElementFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/ArrayContainsElementFilter.java
@@ -26,6 +26,7 @@
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.RangeSet;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.math.expr.ExprEval;
@@ -108,7 +109,15 @@
     final NullableTypeStrategy<Object> typeStrategy = elementMatchValueEval.type().getNullableStrategy();
     final int size = typeStrategy.estimateSizeBytes(elementMatchValueEval.value());
     final ByteBuffer valueBuffer = ByteBuffer.allocate(size);
-    typeStrategy.write(valueBuffer, elementMatchValueEval.value(), size);
+    if (typeStrategy.write(valueBuffer, elementMatchValueEval.value(), size) < 0) {
+      // Defensive check, since the size had already been estimated from the same type strategy
+      throw DruidException.defensive(
+          "Unable to write the for the column [%s] with value [%s] and size [%d]",
+          elementMatchValueEval.value(),
+          column,
+          size
+      );
+    }
     return new CacheKeyBuilder(DimFilterUtils.ARRAY_CONTAINS_CACHE_ID)
         .appendByte(DimFilterUtils.STRING_SEPARATOR)
         .appendString(column)
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 4ca5b09..cdcf9e3 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -43,6 +43,7 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DimensionComparisonUtils;
 import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
@@ -597,11 +598,7 @@
         needsReverseList.add(false);
         final ColumnType type = dimensions.get(i).getOutputType();
         dimensionTypes.add(type);
-        if (type.isNumeric()) {
-          comparators.add(StringComparators.NUMERIC);
-        } else {
-          comparators.add(StringComparators.LEXICOGRAPHIC);
-        }
+        comparators.add(StringComparators.NATURAL);
       }
     }
 
@@ -762,6 +759,12 @@
     }
   }
 
+  /**
+   * Compares the dimensions for limit pushdown.
+   *
+   * Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we
+   * get rid of the StringComparators for array types
+   */
   private static int compareDimsForLimitPushDown(
       final IntList fields,
       final List<Boolean> needsReverseList,
@@ -781,7 +784,7 @@
       final Object rhsObj = rhs.get(fieldNumber);
 
       if (dimensionType.isNumeric()) {
-        if (comparator.equals(StringComparators.NUMERIC)) {
+        if (DimensionComparisonUtils.isNaturalComparator(dimensionType.getType(), comparator)) {
           dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType);
         } else {
           dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
@@ -795,6 +798,8 @@
         final Object[] lhsArr = DimensionHandlerUtils.convertToArray(lhsObj, dimensionType.getElementType());
         final Object[] rhsArr = DimensionHandlerUtils.convertToArray(rhsObj, dimensionType.getElementType());
         dimCompare = dimensionType.getNullableStrategy().compare(lhsArr, rhsArr);
+      } else if (DimensionComparisonUtils.isNaturalComparator(dimensionType.getType(), comparator)) {
+        dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType);
       } else {
         dimCompare = comparator.compare((String) lhsObj, (String) rhsObj);
       }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java
deleted file mode 100644
index a8f16d2..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae;
-
-import it.unimi.dsi.fastutil.Hash;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.segment.DimensionDictionary;
-import org.apache.druid.segment.column.TypeSignature;
-import org.apache.druid.segment.column.ValueType;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Utilities for parts of the groupBy engine that need to build dictionaries.
- */
-public class DictionaryBuilding
-{
-  // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
-  private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;
-
-  /**
-   * Creates a forward dictionary (dictionary ID -> value).
-   */
-  public static <T> List<T> createDictionary()
-  {
-    return new ArrayList<>();
-  }
-
-  /**
-   * Creates a reverse dictionary (value -> dictionary ID). If a value is not present in the reverse dictionary,
-   * {@link Object2IntMap#getInt} will return {@link DimensionDictionary#ABSENT_VALUE_ID}.
-   *
-   * WARNING: This assumes that the .hashCode and the .equals of the method are implemented correctly. This does not
-   * apply for primitive array types, which donot consider new Object[]{1L, 2L} = new Object[]{1, 2}. For such objects,
-   * (especially arrays), a custom hash strategy must be passed.
-   */
-  public static <T> Object2IntMap<T> createReverseDictionary()
-  {
-    final Object2IntOpenHashMap<T> m = new Object2IntOpenHashMap<>();
-    m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
-    return m;
-  }
-
-  private static <T> Object2IntMap<T> createReverseDictionary(final Hash.Strategy<T> hashStrategy)
-  {
-    final Object2IntOpenCustomHashMap<T> m = new Object2IntOpenCustomHashMap<>(hashStrategy);
-    m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
-    return m;
-  }
-
-  /**
-   * Creates a reverse dictionary for arrays of primitive types.
-   */
-  public static Object2IntMap<Object[]> createReverseDictionaryForPrimitiveArray(TypeSignature<ValueType> arrayType)
-  {
-    if (!arrayType.isPrimitiveArray()) {
-      throw DruidException.defensive("Dictionary building function expected an array of a primitive type");
-    }
-    return createReverseDictionary(new Hash.Strategy<Object[]>()
-    {
-      @Override
-      public int hashCode(Object[] o)
-      {
-        // We don't do a deep comparison, because the array type is primitive, therefore we don't need to incur the extra
-        // overhead of checking the nestings
-        return Arrays.hashCode(o);
-      }
-
-      @Override
-      public boolean equals(Object[] a, Object[] b)
-      {
-        return arrayType.getNullableStrategy().compare(a, b) == 0;
-      }
-    });
-  }
-
-  /**
-   * Estimated footprint of a new entry.
-   */
-  public static int estimateEntryFootprint(final int valueFootprint)
-  {
-    return valueFootprint + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuildingUtils.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuildingUtils.java
new file mode 100644
index 0000000..243aedf
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuildingUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
+import org.apache.druid.segment.DimensionDictionary;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities for parts of the groupBy engine that need to build dictionaries.
+ */
+public class DictionaryBuildingUtils
+{
+  // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
+  private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;
+
+  /**
+   * Creates a forward dictionary (dictionary ID -> value).
+   */
+  public static <T> List<T> createDictionary()
+  {
+    return new ArrayList<>();
+  }
+
+  /**
+   * Create reverse dictionary (value -> dictionary ID) that relies on the given {@link Hash.Strategy} for
+   * hashing and comparing equality. It explicitly requires a hashing strategy, so that callers are aware of the
+   * correct implementation of the .hashCode and the .equals method used to store and address the objects
+   *
+   * If a value is not present in the reverse dictionary, {@link Object2IntMap#getInt} will
+   * return {@link DimensionDictionary#ABSENT_VALUE_ID}.
+   *
+   * The object's {@link org.apache.druid.segment.column.NullableTypeStrategy} is often enough to create a reverse
+   * dictionary for those objects
+   */
+  public static <T> Object2IntMap<T> createReverseDictionary(final Hash.Strategy<T> hashStrategy)
+  {
+    final Object2IntOpenCustomHashMap<T> m = new Object2IntOpenCustomHashMap<>(hashStrategy);
+    m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
+    return m;
+  }
+
+  /**
+   * Estimated footprint of a new entry.
+   */
+  public static int estimateEntryFootprint(final int valueFootprint)
+  {
+    return valueFootprint + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByColumnSelectorStrategyFactory.java
new file mode 100644
index 0000000..53ae47d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByColumnSelectorStrategyFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
+import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy;
+import org.apache.druid.query.groupby.epinephelinae.column.FixedWidthGroupByColumnSelectorStrategy;
+import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
+import org.apache.druid.query.groupby.epinephelinae.column.KeyMappingMultiValueGroupByColumnSelectorStrategy;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+
+/**
+ * Creates {@link org.apache.druid.query.dimension.ColumnSelectorStrategy}s for grouping dimensions
+ * If the type is STRING, then it delegates the group by handling to {@link KeyMappingMultiValueGroupByColumnSelectorStrategy}
+ * which is specialized for {@link DimensionSelector}s and multi-value dimensions.
+ * If the type is numeric, then it delegates the handling to the {@link FixedWidthGroupByColumnSelectorStrategy}
+ * Else, it delegates the handling to {@link DictionaryBuildingGroupByColumnSelectorStrategy} which is a generic strategy
+ * and builds dictionaries on the fly.
+ */
+public class GroupByColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<GroupByColumnSelectorStrategy>
+{
+  @Override
+  public GroupByColumnSelectorStrategy makeColumnSelectorStrategy(
+      ColumnCapabilities capabilities,
+      ColumnValueSelector selector,
+      String dimension
+  )
+  {
+    if (capabilities == null || capabilities.getType() == null) {
+      throw DruidException.defensive("Unable to deduce type for the grouping dimension");
+    }
+    try {
+      if (!capabilities.toColumnType().getNullableStrategy().groupable()) {
+        // InvalidInput because the SQL planner would have already flagged these dimensions, therefore this will only happen
+        // if native queries have been submitted.
+        throw InvalidInput.exception(
+            "Unable to group on the column[%s] with type[%s]",
+            dimension,
+            capabilities.toColumnType()
+        );
+      }
+    }
+    catch (Exception e) {
+      throw InvalidInput.exception(e, "Unable to group on the column[%s]", dimension);
+    }
+
+    switch (capabilities.getType()) {
+      case STRING:
+        return KeyMappingMultiValueGroupByColumnSelectorStrategy.create(capabilities, (DimensionSelector) selector);
+      case LONG:
+        return new FixedWidthGroupByColumnSelectorStrategy<>(
+            Byte.BYTES + Long.BYTES,
+            ColumnType.LONG,
+            ColumnValueSelector::getLong,
+            ColumnValueSelector::isNull
+        );
+      case FLOAT:
+        return new FixedWidthGroupByColumnSelectorStrategy<>(
+            Byte.BYTES + Float.BYTES,
+            ColumnType.FLOAT,
+            ColumnValueSelector::getFloat,
+            ColumnValueSelector::isNull
+        );
+      case DOUBLE:
+        return new FixedWidthGroupByColumnSelectorStrategy<>(
+            Byte.BYTES + Double.BYTES,
+            ColumnType.DOUBLE,
+            ColumnValueSelector::getDouble,
+            ColumnValueSelector::isNull
+        );
+      case ARRAY:
+        switch (capabilities.getElementType().getType()) {
+          case LONG:
+          case STRING:
+          case DOUBLE:
+            return DictionaryBuildingGroupByColumnSelectorStrategy.forType(capabilities.toColumnType());
+          case FLOAT:
+            // Array<Float> not supported in expressions, ingestion
+          default:
+            throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
+
+        }
+      case COMPLEX:
+        return DictionaryBuildingGroupByColumnSelectorStrategy.forType(capabilities.toColumnType());
+      default:
+        throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
+    }
+  }
+
+  @Override
+  public boolean supportsComplexTypes()
+  {
+    return true;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
index eff19df..085e602 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
@@ -33,7 +33,6 @@
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -41,23 +40,13 @@
 import org.apache.druid.query.groupby.GroupByQueryMetrics;
 import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.ArrayStringGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.DoubleGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.FloatGroupByColumnSelectorStrategy;
 import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus;
 import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.NullableNumericGroupByColumnSelectorStrategy;
-import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy;
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.DimensionSelector;
@@ -95,7 +84,7 @@
  */
 public class GroupByQueryEngine
 {
-  private static final GroupByStrategyFactory STRATEGY_FACTORY = new GroupByStrategyFactory();
+  private static final GroupByColumnSelectorStrategyFactory STRATEGY_FACTORY = new GroupByColumnSelectorStrategyFactory();
 
   private GroupByQueryEngine()
   {
@@ -145,7 +134,7 @@
                       curPos,
                       query.getResultRowDimensionStart() + i
                   );
-                  curPos += dims[i].getColumnSelectorStrategy().getGroupingKeySize();
+                  curPos += dims[i].getColumnSelectorStrategy().getGroupingKeySizeBytes();
                 }
 
                 final int cardinalityForArrayAggregation = GroupingEngine.getCardinalityForArrayAggregation(
@@ -242,57 +231,6 @@
             });
   }
 
-  private static class GroupByStrategyFactory implements ColumnSelectorStrategyFactory<GroupByColumnSelectorStrategy>
-  {
-    @Override
-    public GroupByColumnSelectorStrategy makeColumnSelectorStrategy(
-        ColumnCapabilities capabilities,
-        ColumnValueSelector selector
-    )
-    {
-      switch (capabilities.getType()) {
-        case STRING:
-          DimensionSelector dimSelector = (DimensionSelector) selector;
-          if (dimSelector.getValueCardinality() >= 0) {
-            return new StringGroupByColumnSelectorStrategy(dimSelector::lookupName, capabilities);
-          } else {
-            return new DictionaryBuildingStringGroupByColumnSelectorStrategy();
-          }
-        case LONG:
-          return makeNullableNumericStrategy(new LongGroupByColumnSelectorStrategy());
-        case FLOAT:
-          return makeNullableNumericStrategy(new FloatGroupByColumnSelectorStrategy());
-        case DOUBLE:
-          return makeNullableNumericStrategy(new DoubleGroupByColumnSelectorStrategy());
-        case ARRAY:
-          switch (capabilities.getElementType().getType()) {
-            case LONG:
-              return new ArrayLongGroupByColumnSelectorStrategy();
-            case STRING:
-              return new ArrayStringGroupByColumnSelectorStrategy();
-            case DOUBLE:
-              return new ArrayDoubleGroupByColumnSelectorStrategy();
-            case FLOAT:
-              // Array<Float> not supported in expressions, ingestion
-            default:
-              throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
-
-          }
-        default:
-          throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
-      }
-    }
-
-    private GroupByColumnSelectorStrategy makeNullableNumericStrategy(GroupByColumnSelectorStrategy delegate)
-    {
-      if (NullHandling.sqlCompatible()) {
-        return new NullableNumericGroupByColumnSelectorStrategy(delegate);
-      } else {
-        return delegate;
-      }
-    }
-  }
-
   private abstract static class GroupByEngineIterator<KeyType> implements Iterator<ResultRow>, Closeable
   {
     protected final GroupByQuery query;
@@ -846,7 +784,7 @@
       this.dims = dims;
       int keySize = 0;
       for (GroupByColumnSelectorPlus selectorPlus : dims) {
-        keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize();
+        keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySizeBytes();
       }
       this.keySize = keySize;
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index e98af6a..491c28d 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -89,8 +89,10 @@
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -743,7 +745,8 @@
     @Override
     public InputRawSupplierColumnSelectorStrategy makeColumnSelectorStrategy(
         ColumnCapabilities capabilities,
-        ColumnValueSelector selector
+        ColumnValueSelector selector,
+        String dimension
     )
     {
       switch (capabilities.getType()) {
@@ -759,28 +762,20 @@
           return (InputRawSupplierColumnSelectorStrategy<BaseDoubleColumnValueSelector>)
               columnSelector -> () -> columnSelector.isNull() ? null : columnSelector.getDouble();
         case ARRAY:
-          switch (capabilities.getElementType().getType()) {
-            case STRING:
-              return (InputRawSupplierColumnSelectorStrategy<ColumnValueSelector>)
-                  columnSelector ->
-                      () -> DimensionHandlerUtils.coerceToStringArray(columnSelector.getObject());
-            case FLOAT:
-            case LONG:
-            case DOUBLE:
-              return (InputRawSupplierColumnSelectorStrategy<ColumnValueSelector>)
-                  columnSelector ->
-                      () -> DimensionHandlerUtils.convertToArray(columnSelector.getObject(),
-                                                                 capabilities.getElementType());
-            default:
-              throw new IAE(
-                  "Cannot create query type helper from invalid type [%s]",
-                  capabilities.asTypeString()
-              );
-          }
+        case COMPLEX:
+          return (InputRawSupplierColumnSelectorStrategy<ColumnValueSelector>)
+              columnSelector ->
+                  () -> DimensionHandlerUtils.convertObjectToType(columnSelector.getObject(), capabilities.toColumnType());
         default:
           throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
       }
     }
+
+    @Override
+    public boolean supportsComplexTypes()
+    {
+      return true;
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -1157,7 +1152,7 @@
 
   static long estimateStringKeySize(@Nullable String key)
   {
-    return DictionaryBuilding.estimateEntryFootprint((key == null ? 0 : key.length()) * Character.BYTES);
+    return DictionaryBuildingUtils.estimateEntryFootprint((key == null ? 0 : key.length()) * Character.BYTES);
   }
 
   private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGrouperHelper.RowBasedKey>
@@ -1172,7 +1167,6 @@
     private final BufferComparator[] serdeHelperComparators;
     private final DefaultLimitSpec limitSpec;
     private final List<ColumnType> valueTypes;
-
     private final boolean enableRuntimeDictionaryGeneration;
 
     private final List<String> dictionary;
@@ -1190,6 +1184,8 @@
     private final List<Object[]> doubleArrayDictionary;
     private final Object2IntMap<Object[]> reverseDoubleArrayDictionary;
 
+    private final Map<String, List<Object>> genericDictionaries = new HashMap<>();
+    private final Map<String, Object2IntMap<Object>> genericReverseDictionaries = new HashMap<>();
 
     // Size limiting for the dictionary, in (roughly estimated) bytes.
     private final long maxDictionarySize;
@@ -1219,20 +1215,21 @@
       this.valueTypes = valueTypes;
       this.limitSpec = limitSpec;
       this.enableRuntimeDictionaryGeneration = dictionary == null;
-      this.dictionary = enableRuntimeDictionaryGeneration ? DictionaryBuilding.createDictionary() : dictionary;
-      this.reverseDictionary = DictionaryBuilding.createReverseDictionary();
 
-      this.stringArrayDictionary = DictionaryBuilding.createDictionary();
-      this.reverseStringArrayDictionary = DictionaryBuilding.createReverseDictionaryForPrimitiveArray(ColumnType.STRING_ARRAY);
+      this.dictionary = enableRuntimeDictionaryGeneration ? DictionaryBuildingUtils.createDictionary() : dictionary;
+      this.reverseDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.STRING.getNullableStrategy());
 
-      this.longArrayDictionary = DictionaryBuilding.createDictionary();
-      this.reverseLongArrayDictionary = DictionaryBuilding.createReverseDictionaryForPrimitiveArray(ColumnType.LONG_ARRAY);
+      this.stringArrayDictionary = DictionaryBuildingUtils.createDictionary();
+      this.reverseStringArrayDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.STRING_ARRAY.getNullableStrategy());
 
-      this.floatArrayDictionary = DictionaryBuilding.createDictionary();
-      this.reverseFloatArrayDictionary = DictionaryBuilding.createReverseDictionaryForPrimitiveArray(ColumnType.FLOAT_ARRAY);
+      this.longArrayDictionary = DictionaryBuildingUtils.createDictionary();
+      this.reverseLongArrayDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.LONG_ARRAY.getNullableStrategy());
 
-      this.doubleArrayDictionary = DictionaryBuilding.createDictionary();
-      this.reverseDoubleArrayDictionary = DictionaryBuilding.createReverseDictionaryForPrimitiveArray(ColumnType.DOUBLE_ARRAY);
+      this.floatArrayDictionary = DictionaryBuildingUtils.createDictionary();
+      this.reverseFloatArrayDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.FLOAT_ARRAY.getNullableStrategy());
+
+      this.doubleArrayDictionary = DictionaryBuildingUtils.createDictionary();
+      this.reverseDoubleArrayDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.DOUBLE_ARRAY.getNullableStrategy());
 
       this.maxDictionarySize = maxDictionarySize;
       this.serdeHelpers = makeSerdeHelpers(limitSpec != null, enableRuntimeDictionaryGeneration);
@@ -1388,6 +1385,8 @@
         reverseFloatArrayDictionary.clear();
         longArrayDictionary.clear();
         reverseLongArrayDictionary.clear();
+        genericDictionaries.clear();
+        genericReverseDictionaries.clear();
         rankOfDictionaryIds = null;
         currentEstimatedSize = 0;
       }
@@ -1443,6 +1442,12 @@
     )
     {
       switch (valueType.getType()) {
+        case COMPLEX:
+          if (stringComparator != null
+              && !DimensionComparisonUtils.isNaturalComparator(valueType.getType(), stringComparator)) {
+            throw DruidException.defensive("Unexpected string comparator supplied");
+          }
+          return new GenericRowBasedKeySerdeHelper(keyBufferPosition, valueType);
         case ARRAY:
           switch (valueType.getElementType().getType()) {
             case STRING:
@@ -1531,11 +1536,123 @@
       }
     }
 
-    private class ArrayNumericRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
+    private abstract class DictionaryBuildingSingleValuedRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
     {
-      final int keyBufferPosition;
+      private final int keyBufferPosition;
+
+      public DictionaryBuildingSingleValuedRowBasedKeySerdeHelper(final int keyBufferPosition)
+      {
+        this.keyBufferPosition = keyBufferPosition;
+      }
+
+      @Override
+      public int getKeyBufferValueSize()
+      {
+        return Integer.BYTES;
+      }
+
+      @Override
+      public boolean putToKeyBuffer(RowBasedKey key, int idx)
+      {
+        final Object obj = key.getKey()[idx];
+        int id = getReverseDictionary().getInt(obj);
+        if (id == DimensionDictionary.ABSENT_VALUE_ID) {
+          id = getDictionary().size();
+          getReverseDictionary().put(obj, id);
+          getDictionary().add(obj);
+        }
+        keyBuffer.putInt(id);
+        return true;
+      }
+
+      @Override
+      public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
+      {
+        dimValues[dimValIdx] = getDictionary().get(buffer.getInt(initialOffset + keyBufferPosition));
+      }
+
+      /**
+       * Raw type used because arrays and object dictionaries differ
+       */
+      @SuppressWarnings("rawtypes")
+      public abstract List getDictionary();
+
+      /**
+       * Raw types used because arrays and object dictionaries differ
+       */
+      @SuppressWarnings("rawtypes")
+      public abstract Object2IntMap getReverseDictionary();
+    }
+
+    private class GenericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
+    {
       final BufferComparator bufferComparator;
-      final TypeSignature<ValueType> elementType;
+      final String columnTypeName;
+
+      final List<Object> dictionary;
+      final Object2IntMap<Object> reverseDictionary;
+
+      public GenericRowBasedKeySerdeHelper(
+          int keyBufferPosition,
+          ColumnType columnType
+      )
+      {
+        super(keyBufferPosition);
+        validateColumnType(columnType);
+        this.columnTypeName = columnType.asTypeString();
+        this.dictionary = genericDictionaries.computeIfAbsent(
+            columnTypeName,
+            ignored -> DictionaryBuildingUtils.createDictionary()
+        );
+        this.reverseDictionary = genericReverseDictionaries.computeIfAbsent(
+            columnTypeName,
+            ignored -> DictionaryBuildingUtils.createReverseDictionary(columnType.getNullableStrategy())
+        );
+        this.bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) ->
+            columnType.getNullableStrategy().compare(
+                dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
+                dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
+            );
+      }
+
+      // Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of
+      // different types.
+      private void validateColumnType(TypeSignature<ValueType> columnType)
+      {
+        if (columnType.isArray()) {
+          validateColumnType(columnType.getElementType());
+        } else if (columnType.is(ValueType.COMPLEX)) {
+          if (columnType.getComplexTypeName() == null) {
+            throw DruidException.defensive("complex type name expected");
+          }
+        }
+      }
+
+      @Override
+      public BufferComparator getBufferComparator()
+      {
+        return bufferComparator;
+      }
+
+      @Override
+      public List<Object> getDictionary()
+      {
+        return dictionary;
+      }
+
+      @Override
+      public Object2IntMap<Object> getReverseDictionary()
+      {
+        return reverseDictionary;
+      }
+    }
+
+
+    private class ArrayNumericRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
+    {
+      private final BufferComparator bufferComparator;
+      private final List<Object[]> dictionary;
+      private final Object2IntMap<Object[]> reverseDictionary;
 
       public ArrayNumericRowBasedKeySerdeHelper(
           int keyBufferPosition,
@@ -1543,20 +1660,22 @@
           ColumnType arrayType
       )
       {
-        this.keyBufferPosition = keyBufferPosition;
-        this.elementType = arrayType.getElementType();
+        super(keyBufferPosition);
+        final TypeSignature<ValueType> elementType = arrayType.getElementType();
+        this.dictionary = getDictionaryForType(elementType);
+        this.reverseDictionary = getReverseDictionaryForType(elementType);
         this.bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
           if (stringComparator == null
               || StringComparators.NUMERIC.equals(stringComparator)
               || StringComparators.NATURAL.equals(stringComparator)) {
             return arrayType.getNullableStrategy().compare(
-                getDictionaryForType(elementType).get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
-                getDictionaryForType(elementType).get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
+                this.dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
+                this.dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
             );
           } else {
             return new DimensionComparisonUtils.ArrayComparatorForUnnaturalStringComparator(stringComparator).compare(
-                getDictionaryForType(elementType).get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
-                getDictionaryForType(elementType).get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
+                this.dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
+                this.dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
             );
           }
         };
@@ -1590,43 +1709,27 @@
         }
       }
 
-
-      @Override
-      public int getKeyBufferValueSize()
-      {
-        return Integer.BYTES;
-      }
-
-      @Override
-      public boolean putToKeyBuffer(RowBasedKey key, int idx)
-      {
-        final Object[] listArray = (Object[]) key.getKey()[idx];
-        int id = getReverseDictionaryForType(elementType).getInt(listArray);
-        if (id == DimensionDictionary.ABSENT_VALUE_ID) {
-          id = getDictionaryForType(elementType).size();
-          getReverseDictionaryForType(elementType).put(listArray, id);
-          getDictionaryForType(elementType).add(listArray);
-        }
-        keyBuffer.putInt(id);
-        return true;
-      }
-
-      @Override
-      public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
-      {
-        dimValues[dimValIdx] = getDictionaryForType(elementType).get(buffer.getInt(initialOffset + keyBufferPosition));
-      }
-
       @Override
       public BufferComparator getBufferComparator()
       {
         return bufferComparator;
       }
+
+      @Override
+      public List<Object[]> getDictionary()
+      {
+        return dictionary;
+      }
+
+      @Override
+      public Object2IntMap<Object[]> getReverseDictionary()
+      {
+        return reverseDictionary;
+      }
     }
 
-    private class ArrayStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
+    private class ArrayStringRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
     {
-      final int keyBufferPosition;
       final BufferComparator bufferComparator;
 
       ArrayStringRowBasedKeySerdeHelper(
@@ -1634,9 +1737,10 @@
           @Nullable StringComparator stringComparator
       )
       {
-        this.keyBufferPosition = keyBufferPosition;
+        super(keyBufferPosition);
         bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) ->
-            new DimensionComparisonUtils.ArrayComparator<String>(stringComparator == null ? StringComparators.LEXICOGRAPHIC : stringComparator)
+            new DimensionComparisonUtils.ArrayComparator<>(
+                stringComparator == null ? StringComparators.LEXICOGRAPHIC : stringComparator)
                 .compare(
                     stringArrayDictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
                     stringArrayDictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
@@ -1650,38 +1754,21 @@
       }
 
       @Override
-      public boolean putToKeyBuffer(RowBasedKey key, int idx)
-      {
-        Object[] stringArray = (Object[]) key.getKey()[idx];
-        final int id = addToArrayDictionary(stringArray);
-        if (id < 0) {
-          return false;
-        }
-        keyBuffer.putInt(id);
-        return true;
-      }
-
-      @Override
-      public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
-      {
-        dimValues[dimValIdx] = stringArrayDictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
-      }
-
-      @Override
       public BufferComparator getBufferComparator()
       {
         return bufferComparator;
       }
 
-      private int addToArrayDictionary(final Object[] s)
+      @Override
+      public List<Object[]> getDictionary()
       {
-        int idx = reverseStringArrayDictionary.getInt(s);
-        if (idx == DimensionDictionary.ABSENT_VALUE_ID) {
-          idx = stringArrayDictionary.size();
-          reverseStringArrayDictionary.put(s, idx);
-          stringArrayDictionary.add(s);
-        }
-        return idx;
+        return stringArrayDictionary;
+      }
+
+      @Override
+      public Object2IntMap<Object[]> getReverseDictionary()
+      {
+        return reverseStringArrayDictionary;
       }
     }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java
deleted file mode 100644
index e1a7e94..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.ValueType;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class ArrayDoubleGroupByColumnSelectorStrategy extends ArrayNumericGroupByColumnSelectorStrategy
-{
-  public ArrayDoubleGroupByColumnSelectorStrategy()
-  {
-    super(Double.BYTES, ColumnType.DOUBLE_ARRAY);
-  }
-
-  @Override
-  protected int computeDictionaryId(ColumnValueSelector selector)
-  {
-    Object object = selector.getObject();
-    if (object == null) {
-      return GROUP_BY_MISSING_VALUE;
-    } else if (object instanceof Double) {
-      return addToIndexedDictionary(new Object[]{object});
-    } else if (object instanceof List) {
-      return addToIndexedDictionary(((List) object).toArray());
-    } else if (object instanceof Double[]) {
-      // Defensive check, since we don't usually expect to encounter Double[] objects from selectors
-      return addToIndexedDictionary(Arrays.stream((Double[]) object).toArray());
-    } else if (object instanceof Object[]) {
-      return addToIndexedDictionary((Object[]) object);
-    } else {
-      throw new ISE("Found unexpected object type [%s] in %s array.", object.getClass().getName(), ValueType.DOUBLE);
-    }
-  }
-}
-
-
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 49cd91b..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.ValueType;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class ArrayLongGroupByColumnSelectorStrategy extends ArrayNumericGroupByColumnSelectorStrategy
-{
-  public ArrayLongGroupByColumnSelectorStrategy()
-  {
-    super(Long.BYTES, ColumnType.LONG_ARRAY);
-  }
-
-  @Override
-  protected int computeDictionaryId(ColumnValueSelector selector)
-  {
-    Object object = selector.getObject();
-    if (object == null) {
-      return GROUP_BY_MISSING_VALUE;
-    } else if (object instanceof Long) {
-      return addToIndexedDictionary(new Object[]{object});
-    } else if (object instanceof List) {
-      return addToIndexedDictionary(((List) object).toArray());
-    } else if (object instanceof Long[]) {
-      // Defensive check, since we don't usually expect to encounter Long[] objects from selectors
-      return addToIndexedDictionary(Arrays.stream((Long[]) object).toArray());
-    } else if (object instanceof Object[]) {
-      return addToIndexedDictionary((Object[]) object);
-    } else {
-      throw new ISE("Found unexpected object type [%s] in %s array.", object.getClass().getName(), ValueType.LONG);
-    }
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 62c4798..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.annotations.VisibleForTesting;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public abstract class ArrayNumericGroupByColumnSelectorStrategy
-    implements GroupByColumnSelectorStrategy
-{
-  protected static final int GROUP_BY_MISSING_VALUE = -1;
-
-  private final List<Object[]> dictionary;
-  private final Object2IntMap<Object[]> reverseDictionary;
-  private long estimatedFootprint = 0L;
-  private final int valueFootprint;
-
-  public ArrayNumericGroupByColumnSelectorStrategy(final int valueFootprint, final ColumnType arrayType)
-  {
-    this.dictionary = DictionaryBuilding.createDictionary();
-    this.reverseDictionary = DictionaryBuilding.createReverseDictionaryForPrimitiveArray(arrayType);
-    this.valueFootprint = valueFootprint;
-  }
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Integer.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final int id = key.getInt(keyBufferPosition);
-
-    // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
-    if (id != GROUP_BY_MISSING_VALUE) {
-      final Object[] value = dictionary.get(id);
-      resultRow.set(selectorPlus.getResultRowPosition(), value);
-    } else {
-      resultRow.set(selectorPlus.getResultRowPosition(), null);
-    }
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
-  {
-    final long priorFootprint = estimatedFootprint;
-    valuess[columnIndex] = computeDictionaryId(selector);
-    return (int) (estimatedFootprint - priorFootprint);
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    final int groupingKey = (int) rowObj;
-    writeToKeyBuffer(keyBufferPosition, groupingKey, keyBuffer);
-    if (groupingKey == GROUP_BY_MISSING_VALUE) {
-      stack[dimensionIndex] = 0;
-    } else {
-      stack[dimensionIndex] = 1;
-    }
-
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    return false;
-  }
-
-  protected abstract int computeDictionaryId(ColumnValueSelector selector);
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    final long priorFootprint = estimatedFootprint;
-
-    // computeDictionaryId updates estimatedFootprint
-    keyBuffer.putInt(keyBufferPosition, computeDictionaryId(selector));
-
-    return (int) (estimatedFootprint - priorFootprint);
-  }
-
-  protected int addToIndexedDictionary(Object[] t)
-  {
-    final int dictId = reverseDictionary.getInt(t);
-    if (dictId < 0) {
-      final int size = dictionary.size();
-      dictionary.add(t);
-      reverseDictionary.put(t, size);
-
-      // Footprint estimate: one pointer, one value per list entry.
-      estimatedFootprint += DictionaryBuilding.estimateEntryFootprint(t.length * (Long.BYTES + valueFootprint));
-      return size;
-    }
-    return dictId;
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
-  {
-    StringComparator comparator = stringComparator == null ? StringComparators.NUMERIC : stringComparator;
-    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
-      Object[] lhs = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
-      Object[] rhs = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
-
-      int minLength = Math.min(lhs.length, rhs.length);
-      //noinspection ArrayEquality
-      if (lhs == rhs) {
-        return 0;
-      } else {
-        for (int i = 0; i < minLength; i++) {
-          final Object left = lhs[i];
-          final Object right = rhs[i];
-          final int cmp;
-          if (left == null && right == null) {
-            cmp = 0;
-          } else if (left == null) {
-            cmp = -1;
-          } else {
-            cmp = comparator.compare(String.valueOf(left), String.valueOf(right));
-          }
-          if (cmp == 0) {
-            continue;
-          }
-          return cmp;
-        }
-        if (lhs.length == rhs.length) {
-          return 0;
-        } else if (lhs.length < rhs.length) {
-          return -1;
-        }
-        return 1;
-      }
-    };
-  }
-
-  @Override
-  public void reset()
-  {
-    dictionary.clear();
-    reverseDictionary.clear();
-    estimatedFootprint = 0;
-  }
-
-  @VisibleForTesting
-  void writeToKeyBuffer(int keyBufferPosition, int groupingKey, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putInt(keyBufferPosition, groupingKey);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 3754966..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.segment.data.ComparableIntArray;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class ArrayStringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-  private static final int GROUP_BY_MISSING_VALUE = -1;
-
-
-  // contains string <-> id for each element of the multi value grouping column
-  // for eg : [a,b,c] is the col value. dictionaryToInt will contain { a <-> 1, b <-> 2, c <-> 3}
-  private final BiMap<String, Integer> dictionaryToInt;
-
-  // stores each row as an integer array where the int represents the value in dictionaryToInt
-  // for eg : [a,b,c] would be converted to [1,2,3] and assigned a integer value 1.
-  // [1,2,3] <-> 1
-  private final BiMap<ComparableIntArray, Integer> intListToInt;
-
-  private long estimatedFootprint = 0L;
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Integer.BYTES;
-  }
-
-  public ArrayStringGroupByColumnSelectorStrategy()
-  {
-    dictionaryToInt = HashBiMap.create();
-    intListToInt = HashBiMap.create();
-  }
-
-  @VisibleForTesting
-  ArrayStringGroupByColumnSelectorStrategy(
-      BiMap<String, Integer> dictionaryToInt,
-      BiMap<ComparableIntArray, Integer> intArrayToInt
-  )
-  {
-    this.dictionaryToInt = dictionaryToInt;
-    this.intListToInt = intArrayToInt;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final int id = key.getInt(keyBufferPosition);
-
-    // GROUP_BY_MISSING_VALUE is used to indicate empty rows
-    if (id != GROUP_BY_MISSING_VALUE) {
-      final int[] intRepresentation = intListToInt.inverse()
-                                                  .get(id)
-                                                  .getDelegate();
-      final Object[] stringRepresentaion = new Object[intRepresentation.length];
-      for (int i = 0; i < intRepresentation.length; i++) {
-        stringRepresentaion[i] = dictionaryToInt.inverse().get(intRepresentation[i]);
-      }
-      resultRow.set(selectorPlus.getResultRowPosition(), stringRepresentaion);
-    } else {
-      resultRow.set(selectorPlus.getResultRowPosition(), null);
-    }
-
-  }
-
-  @Override
-  public int initColumnValues(
-      ColumnValueSelector selector,
-      int columnIndex,
-      Object[] valuess
-  )
-  {
-    final long priorFootprint = estimatedFootprint;
-    final int groupingKey = computeDictionaryId(selector);
-    valuess[columnIndex] = groupingKey;
-    return (int) (estimatedFootprint - priorFootprint);
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    final int groupingKey = (int) rowObj;
-    writeToKeyBuffer(keyBufferPosition, groupingKey, keyBuffer);
-    if (groupingKey == GROUP_BY_MISSING_VALUE) {
-      stack[dimensionIndex] = 0;
-    } else {
-      stack[dimensionIndex] = 1;
-    }
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    return false;
-  }
-
-  /**
-   * Compute dictionary ID for the given selector. Updates {@link #estimatedFootprint} as necessary.
-   */
-  @VisibleForTesting
-  int computeDictionaryId(ColumnValueSelector selector)
-  {
-    final int[] intRepresentation;
-    Object object = selector.getObject();
-    if (object == null) {
-      return GROUP_BY_MISSING_VALUE;
-    } else if (object instanceof String) {
-      intRepresentation = new int[1];
-      intRepresentation[0] = addToIndexedDictionary((String) object);
-    } else if (object instanceof List) {
-      final int size = ((List<?>) object).size();
-      intRepresentation = new int[size];
-      for (int i = 0; i < size; i++) {
-        intRepresentation[i] = addToIndexedDictionary((String) ((List<?>) object).get(i));
-      }
-    } else if (object instanceof String[]) {
-      final int size = ((String[]) object).length;
-      intRepresentation = new int[size];
-      for (int i = 0; i < size; i++) {
-        intRepresentation[i] = addToIndexedDictionary(((String[]) object)[i]);
-      }
-    } else if (object instanceof Object[]) {
-      final int size = ((Object[]) object).length;
-      intRepresentation = new int[size];
-      for (int i = 0; i < size; i++) {
-        intRepresentation[i] = addToIndexedDictionary((String) ((Object[]) object)[i]);
-      }
-    } else {
-      throw new ISE("Found unexpected object type [%s] in %s array.", object.getClass().getName(), ValueType.STRING);
-    }
-
-    final ComparableIntArray comparableIntArray = ComparableIntArray.of(intRepresentation);
-    final int dictId = intListToInt.getOrDefault(comparableIntArray, GROUP_BY_MISSING_VALUE);
-    if (dictId == GROUP_BY_MISSING_VALUE) {
-      final int nextId = intListToInt.keySet().size();
-      intListToInt.put(comparableIntArray, nextId);
-
-      // We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough
-      // that we expect this footprint calculation to still be useful. (It doesn't have to be exact.)
-      estimatedFootprint +=
-          DictionaryBuilding.estimateEntryFootprint(comparableIntArray.getDelegate().length * Integer.BYTES);
-
-      return nextId;
-    } else {
-      return dictId;
-    }
-  }
-
-  private int addToIndexedDictionary(String value)
-  {
-    final Integer dictId = dictionaryToInt.get(value);
-    if (dictId == null) {
-      final int nextId = dictionaryToInt.size();
-      dictionaryToInt.put(value, nextId);
-
-      // We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough
-      // that we expect this footprint calculation to still be useful. (It doesn't have to be exact.)
-      estimatedFootprint +=
-          DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
-
-      return nextId;
-    } else {
-      return dictId;
-    }
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    final long priorFootprint = estimatedFootprint;
-
-    // computeDictionaryId updates estimatedFootprint
-    keyBuffer.putInt(keyBufferPosition, computeDictionaryId(selector));
-
-    return (int) (estimatedFootprint - priorFootprint);
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
-  {
-    final StringComparator comparator = stringComparator == null ? StringComparators.LEXICOGRAPHIC : stringComparator;
-    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
-      int[] lhs = intListToInt.inverse().get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)).getDelegate();
-      int[] rhs = intListToInt.inverse().get(rhsBuffer.getInt(rhsPosition + keyBufferPosition)).getDelegate();
-
-      int minLength = Math.min(lhs.length, rhs.length);
-      //noinspection ArrayEquality
-      if (lhs == rhs) {
-        return 0;
-      } else {
-        for (int i = 0; i < minLength; i++) {
-          final int cmp = comparator.compare(
-              dictionaryToInt.inverse().get(lhs[i]),
-              dictionaryToInt.inverse().get(rhs[i])
-          );
-          if (cmp == 0) {
-            continue;
-          }
-          return cmp;
-        }
-        if (lhs.length == rhs.length) {
-          return 0;
-        } else if (lhs.length < rhs.length) {
-          return -1;
-        }
-        return 1;
-      }
-    };
-  }
-
-  @Override
-  public void reset()
-  {
-    dictionaryToInt.clear();
-    intListToInt.clear();
-    estimatedFootprint = 0;
-  }
-
-  @VisibleForTesting
-  void writeToKeyBuffer(int keyBufferPosition, int groupingKey, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putInt(keyBufferPosition, groupingKey);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingGroupByColumnSelectorStrategy.java
new file mode 100644
index 0000000..cf033ea
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingGroupByColumnSelectorStrategy.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.groupby.epinephelinae.DictionaryBuildingUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Strategy for grouping dimensions which can have variable-width objects, and aren't backed by prebuilt dictionaries. It
+ * encapsulates the dictionary building logic, along with providing the implementations for dimension to dictionary id
+ * encoding-decoding.
+ * <p>
+ * This strategy can handle any dimension that can be addressed on a reverse-dictionary. Reverse dictionary uses
+ * a sorted map, rather than a hashmap.
+ * <p>
+ * This is the most expensive of all the strategies, and hence must be used only when other strategies aren't valid.
+ */
+@NotThreadSafe
+public class DictionaryBuildingGroupByColumnSelectorStrategy<DimensionType>
+    extends KeyMappingGroupByColumnSelectorStrategy<DimensionType>
+{
+
+  private DictionaryBuildingGroupByColumnSelectorStrategy(
+      DimensionIdCodec<DimensionType> dimensionIdCodec,
+      ColumnType columnType,
+      NullableTypeStrategy<DimensionType> nullableTypeStrategy,
+      DimensionType defaultValue
+  )
+  {
+    super(dimensionIdCodec, columnType, nullableTypeStrategy, defaultValue);
+  }
+
+  /**
+   * Creates an implementation of the strategy for the given type
+   */
+  public static GroupByColumnSelectorStrategy forType(final ColumnType columnType)
+  {
+    if (columnType.equals(ColumnType.STRING)) {
+      // String types are handled specially because they can have multi-value dimensions
+      throw DruidException.defensive("Should use special variant which handles multi-value dimensions");
+    } else if (
+      // Defensive check, primitives should be using a faster fixed-width strategy
+        columnType.equals(ColumnType.DOUBLE)
+        || columnType.equals(ColumnType.FLOAT)
+        || columnType.equals(ColumnType.LONG)) {
+      throw DruidException.defensive("Could used a fixed width strategy");
+    }
+
+    if (ColumnType.STRING_ARRAY.equals(columnType)) {
+      forStringArrays();
+    }
+
+    // Catch-all for all other types, that can only have single-valued dimensions
+    return forArrayAndComplexTypes(columnType);
+  }
+
+  /**
+   * Implemenatation of dictionary building strategy for types other than strings (since they can be multi-valued and need
+   * to be handled separately) and numeric primitives (since they can be handled by fixed-width strategy).
+   * This also means that we handle array and complex types here, which simplifies the generics a lot, as everything can be
+   * treated as Object in this class.
+   * <p>
+   * Also, there isn't any concept of multi-values here, therefore Dimension == DimensionHolderType == Object. We still
+   * homogenize rogue selectors which can return non-standard implementation of arrays (like Long[] for long arrays instead of
+   * Object[]) to what the callers would expect (i.e. Object[] in this case).
+   */
+  private static GroupByColumnSelectorStrategy forArrayAndComplexTypes(final ColumnType columnType)
+  {
+    return new DictionaryBuildingGroupByColumnSelectorStrategy<>(
+        new UniValueDimensionIdCodec(columnType.getNullableStrategy()),
+        columnType,
+        columnType.getNullableStrategy(),
+        null
+    );
+  }
+
+  private static GroupByColumnSelectorStrategy forStringArrays()
+  {
+    return new DictionaryBuildingGroupByColumnSelectorStrategy<>(
+        new StringArrayDimensionIdCodec(),
+        ColumnType.STRING_ARRAY,
+        ColumnType.STRING_ARRAY.getNullableStrategy(),
+        null
+    );
+  }
+
+  private static class UniValueDimensionIdCodec implements DimensionIdCodec<Object>
+  {
+    /**
+     * Dictionary for mapping the dimension value to an index. i-th position in the dictionary holds the value represented
+     * by the dictionaryId "i".
+     * Therefore, if a value has a dictionary id "i", dictionary.get(i) = value
+     * If a -> 0, b -> 1, c -> 2 (value -> dictionaryId), then the dictionary would be laid out like: [a, b, c]
+     */
+    private final List<Object> dictionary;
+
+    /**
+     * Reverse dictionary for faster lookup into the dictionary, and reusing pre-existing dictionary ids.
+     * <p>
+     * An entry of form (value, i) in the reverse dictionary represents that "value" is present at the i-th location in the
+     * {@link #dictionary}.
+     * Absence of mapping of a "value" (denoted by returning {@link GroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE})
+     * represents that the value is absent in the dictionary
+     * If a -> 0, b -> 1, c -> 2 (value -> dictionaryId), then the reverse dictionary would have the entries (a, 0), (b, 1),
+     * (c, 2)
+     */
+    private final Object2IntMap<Object> reverseDictionary;
+
+    @SuppressWarnings("rawtypes")
+    private final NullableTypeStrategy nullableTypeStrategy;
+
+    public UniValueDimensionIdCodec(final NullableTypeStrategy nullableTypeStrategy)
+    {
+      this.dictionary = DictionaryBuildingUtils.createDictionary();
+      this.reverseDictionary = DictionaryBuildingUtils.createReverseDictionary(nullableTypeStrategy);
+      this.nullableTypeStrategy = nullableTypeStrategy;
+    }
+
+    @Override
+    public MemoryFootprint<Integer> lookupId(Object dimension)
+    {
+      int dictId = reverseDictionary.getInt(dimension);
+      int footprintIncrease = 0;
+      // Even if called again, then this is no-op
+      if (dictId < 0) {
+        final int size = dictionary.size();
+        dictionary.add(dimension);
+        reverseDictionary.put(dimension, size);
+        dictId = size;
+        // MultiValueHOlder is always expected to handle the type, once the coercion is complete
+        //noinspection unchecked
+        footprintIncrease = DictionaryBuildingUtils.estimateEntryFootprint(
+            nullableTypeStrategy.estimateSizeBytes(dimension)
+        );
+      }
+      return new MemoryFootprint<>(dictId, footprintIncrease);
+    }
+
+    @Override
+    public Object idToKey(int id)
+    {
+      if (id >= dictionary.size()) {
+        throw DruidException.defensive("Unknown dictionary id [%d]", id);
+      }
+      // No need to handle GROUP_BY_MISSING_VALUE, by contract
+      return dictionary.get(id);
+    }
+
+    @Override
+    public boolean canCompareIds()
+    {
+      // Dictionaries are built on the fly, and ids are assigned in the order in which the value is added to the
+      // dictionary.
+      return false;
+    }
+
+    @Override
+    public void reset()
+    {
+      dictionary.clear();
+      reverseDictionary.clear();
+    }
+  }
+
+  /**
+   * {@link DimensionIdCodec} for string arrays. Dictionary building for string arrays is optimised to have a dual
+   * dictionary - one that maps the string values to an id, and another which maps an array of these ids, to the returned
+   * dictionary id. This reduces the amount of heap memory required to build the dictionaries
+   */
+  private static class StringArrayDimensionIdCodec implements DimensionIdCodec<Object>
+  {
+    // contains string <-> id for each element of the multi value grouping column
+    // for eg : [a,b,c] is the col value. dictionaryToInt will contain { a <-> 1, b <-> 2, c <-> 3}
+    private final BiMap<String, Integer> elementBiDictionary = HashBiMap.create();
+
+    // stores each row as an integer array where the int represents the value in dictionaryToInt
+    // for eg : [a,b,c] would be converted to [1,2,3] and assigned a integer value 1.
+    // [1,2,3] <-> 1
+    private final BiMap<ArrayList<Integer>, Integer> arrayBiDictionary = HashBiMap.create();
+
+    @Override
+    public MemoryFootprint<Integer> lookupId(Object dimension)
+    {
+      // dimension IS non-null, by contract of this method
+      Object[] stringArray = (Object[]) dimension;
+      ArrayList<Integer> dictionaryEncodedStringArray = new ArrayList<>();
+      int estimatedFootprint = 0;
+      for (Object element : stringArray) {
+        String elementCasted = (String) element;
+        Integer elementDictId = elementBiDictionary.get(elementCasted);
+        if (elementDictId == null) {
+          elementDictId = elementBiDictionary.size();
+          elementBiDictionary.put(elementCasted, elementDictId);
+          // We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough
+          // that we expect this footprint calculation to still be useful.
+          estimatedFootprint +=
+              DictionaryBuildingUtils.estimateEntryFootprint(elementCasted == null
+                                                             ? 0
+                                                             : elementCasted.length() * Character.BYTES);
+        }
+        dictionaryEncodedStringArray.add(elementDictId);
+      }
+
+      Integer arrayDictId = arrayBiDictionary.get(dictionaryEncodedStringArray);
+      if (arrayDictId == null) {
+        arrayDictId = arrayBiDictionary.size();
+        arrayBiDictionary.put(dictionaryEncodedStringArray, arrayDictId);
+        estimatedFootprint +=
+            DictionaryBuildingUtils.estimateEntryFootprint(dictionaryEncodedStringArray.size() * Integer.BYTES);
+      }
+      return new MemoryFootprint<>(arrayDictId, estimatedFootprint);
+    }
+
+    @Override
+    public Object idToKey(int id)
+    {
+      ArrayList<Integer> dictionaryEncodedStringArray = arrayBiDictionary.inverse().get(id);
+      final Object[] stringRepresentation = new Object[dictionaryEncodedStringArray.size()];
+      for (int i = 0; i < dictionaryEncodedStringArray.size(); ++i) {
+        stringRepresentation[i] = elementBiDictionary.inverse().get(dictionaryEncodedStringArray.get(i));
+      }
+      return stringRepresentation;
+    }
+
+    @Override
+    public boolean canCompareIds()
+    {
+      return false;
+    }
+
+    @Override
+    public void reset()
+    {
+      arrayBiDictionary.clear();
+      elementBiDictionary.clear();
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java
deleted file mode 100644
index dfc5149..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.DimensionDictionary;
-import org.apache.druid.segment.DimensionSelector;
-import org.apache.druid.segment.data.ArrayBasedIndexedInts;
-import org.apache.druid.segment.data.IndexedInts;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * A String strategy that builds an internal String<->Integer dictionary for
- * DimensionSelectors that return false for nameLookupPossibleInAdvance()
- */
-public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends StringGroupByColumnSelectorStrategy
-{
-  private static final int GROUP_BY_MISSING_VALUE = -1;
-
-  private final List<String> dictionary = DictionaryBuilding.createDictionary();
-  private final Object2IntMap<String> reverseDictionary = DictionaryBuilding.createReverseDictionary();
-
-  public DictionaryBuildingStringGroupByColumnSelectorStrategy()
-  {
-    super(null, null);
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final int id = key.getInt(keyBufferPosition);
-
-    // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
-    if (id != GROUP_BY_MISSING_VALUE) {
-      final String value = dictionary.get(id);
-      resultRow.set(selectorPlus.getResultRowPosition(), value);
-    } else {
-      resultRow.set(selectorPlus.getResultRowPosition(), NullHandling.defaultStringValue());
-    }
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
-  {
-    final DimensionSelector dimSelector = (DimensionSelector) selector;
-    final IndexedInts row = dimSelector.getRow();
-    int stateFootprintIncrease = 0;
-    ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex];
-    if (newRow == null) {
-      newRow = new ArrayBasedIndexedInts();
-      valuess[columnIndex] = newRow;
-    }
-    int rowSize = row.size();
-    newRow.ensureSize(rowSize);
-    for (int i = 0; i < rowSize; i++) {
-      final String value = dimSelector.lookupName(row.get(i));
-      final int dictId = reverseDictionary.getInt(value);
-      if (dictId < 0) {
-        final int nextId = dictionary.size();
-        dictionary.add(value);
-        reverseDictionary.put(value, nextId);
-        newRow.setValue(i, nextId);
-        stateFootprintIncrease +=
-            DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
-      } else {
-        newRow.setValue(i, dictId);
-      }
-    }
-    newRow.setSize(rowSize);
-    return stateFootprintIncrease;
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    final DimensionSelector dimSelector = (DimensionSelector) selector;
-    final IndexedInts row = dimSelector.getRow();
-
-    Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
-
-    if (row.size() == 0) {
-      writeToKeyBuffer(keyBufferPosition, GROUP_BY_MISSING_VALUE, keyBuffer);
-      return 0;
-    }
-
-    final String value = dimSelector.lookupName(row.get(0));
-    final int dictId = reverseDictionary.getInt(value);
-    if (dictId == DimensionDictionary.ABSENT_VALUE_ID) {
-      final int nextId = dictionary.size();
-      dictionary.add(value);
-      reverseDictionary.put(value, nextId);
-      writeToKeyBuffer(keyBufferPosition, nextId, keyBuffer);
-      return DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
-    } else {
-      writeToKeyBuffer(keyBufferPosition, dictId, keyBuffer);
-      return 0;
-    }
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
-  {
-    final StringComparator realComparator = stringComparator == null ?
-                                            StringComparators.LEXICOGRAPHIC :
-                                            stringComparator;
-    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
-      String lhsStr = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
-      String rhsStr = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
-      return realComparator.compare(lhsStr, rhsStr);
-    };
-  }
-
-  @Override
-  public void reset()
-  {
-    dictionary.clear();
-    reverseDictionary.clear();
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DimensionIdCodec.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DimensionIdCodec.java
new file mode 100644
index 0000000..3f5a207
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DimensionIdCodec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+/**
+ * Dimension to integer id encoder - decoder i.e. it is an interface for converters of dimension to dictionary id and back.
+ * It only handles single value dimensions. Handle multi-value dimensions (i.e. strings) using the {@link KeyMappingMultiValueGroupByColumnSelectorStrategy}.
+ * <p>
+ * <strong>Encoding</strong><br />
+ * The caller is expected to handle non-null values. Null values must be filtered by the caller, and assigned {@link GroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}
+ * 1. {@link DimensionIdCodec} extracts the dimension from the selector
+ * 2. The value gets encoded into a dictionaryId, using {@link DimensionIdCodec#lookupId}
+ * 3. The callers can use this integer dictionaryID to represent the grouping key
+ * <p>
+ * <strong>Decoding</strong><br />
+ * Converts back the dictionaryId to the dimension value. The implementations are not expected to handle {@link GroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}.
+ * The callers should handle those values appropriately ontheir own, and filter those out before trying to convert
+ * the dictionary id back to value.
+ *
+ * @param <DimensionType> Type of the dimension holder
+ */
+public interface DimensionIdCodec<DimensionType>
+{
+  /**
+   * @return DictionaryId of the object at the given index and the memory increase associated with it
+   */
+  MemoryFootprint<Integer> lookupId(DimensionType dimension);
+
+  DimensionType idToKey(int id);
+
+  /**
+   * Returns if the object comparison can be optimised directly by comparing the dictionaryIds, instead of decoding the
+   * objects and comparing those. Therefore, it returns true iff the "dict" function defined by dict(id) = value is
+   * monotonically increasing.
+   *
+   * Ids backed by dictionaries built on the fly can never be compared, therefore those should always return false.
+   */
+  boolean canCompareIds();
+
+  void reset();
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 6fcbf9c..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.DimensionHandlerUtils;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-
-public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Double.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final double val = key.getDouble(keyBufferPosition);
-    resultRow.set(selectorPlus.getResultRowPosition(), val);
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
-  {
-    values[columnIndex] = selector.getDouble();
-    return 0;
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putDouble(keyBufferPosition, selector.getDouble());
-    return 0;
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Double) rowObj), keyBuffer);
-    stack[dimensionIndex] = 1;
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    // rows from a double column always have a single value, multi-value is not currently supported
-    // this method handles row values after the first in a multivalued row, so just return false
-    return false;
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
-  {
-    return GrouperBufferComparatorUtils.makeBufferComparatorForDouble(
-        keyBufferPosition,
-        true,
-        stringComparator
-    );
-  }
-
-  @Override
-  public void reset()
-  {
-    // Nothing to do.
-  }
-
-  private void writeToKeyBuffer(int keyBufferPosition, double value, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putDouble(keyBufferPosition, value);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategy.java
new file mode 100644
index 0000000..378689a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategy.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.DimensionComparisonUtils;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.Grouper;
+import org.apache.druid.query.ordering.StringComparator;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.nio.ByteBuffer;
+import java.util.function.Function;
+
+/**
+ * Strategy for grouping dimensions which have fixed-width objects. It is only used for numeric primitive types,
+ * however complex types can reuse this strategy if they can hint the engine that they are always fixed width
+ * (for e.g. IP types). Such types donot need to be backed by a dictionary, and hence are faster to group by.
+ *
+ * @param <T> Class of the dimension
+ */
+@NotThreadSafe
+public class FixedWidthGroupByColumnSelectorStrategy<T> implements GroupByColumnSelectorStrategy
+{
+  /**
+   * Size of the key when materialized as bytes
+   */
+  final int keySizeBytes;
+
+  /**
+   * Type of the dimension on which the grouping strategy is being used
+   */
+  final ColumnType columnType;
+
+  /**
+   * Nullable type strategy of the dimension
+   */
+  final NullableTypeStrategy<T> nullableTypeStrategy;
+
+  final Function<ColumnValueSelector<?>, T> valueGetter;
+  final Function<ColumnValueSelector<?>, Boolean> nullityGetter;
+
+  public FixedWidthGroupByColumnSelectorStrategy(
+      int keySizeBytes,
+      ColumnType columnType,
+      Function<ColumnValueSelector<?>, T> valueGetter,
+      Function<ColumnValueSelector<?>, Boolean> nullityGetter
+  )
+  {
+    this.keySizeBytes = keySizeBytes;
+    this.columnType = columnType;
+    this.nullableTypeStrategy = columnType.getNullableStrategy();
+    this.valueGetter = valueGetter;
+    this.nullityGetter = nullityGetter;
+  }
+
+  @Override
+  public int getGroupingKeySizeBytes()
+  {
+    return keySizeBytes;
+  }
+
+  @Override
+  public void processValueFromGroupingKey(
+      GroupByColumnSelectorPlus selectorPlus,
+      ByteBuffer key,
+      ResultRow resultRow,
+      int keyBufferPosition
+  )
+  {
+    resultRow.set(
+        selectorPlus.getResultRowPosition(),
+        nullableTypeStrategy.read(key, keyBufferPosition)
+    );
+  }
+
+  @Override
+  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
+  {
+    valuess[columnIndex] = getValue(selector);
+    return 0;
+  }
+
+  @Override
+  public void initGroupingKeyColumnValue(
+      int keyBufferPosition,
+      int dimensionIndex,
+      Object rowObj,
+      ByteBuffer keyBuffer,
+      int[] stack
+  )
+  {
+    int written;
+    if (rowObj == null) {
+      written = nullableTypeStrategy.write(keyBuffer, keyBufferPosition, null, keySizeBytes);
+      stack[dimensionIndex] = 0;
+    } else {
+      written = nullableTypeStrategy.write(keyBuffer, keyBufferPosition, (T) rowObj, keySizeBytes);
+      stack[dimensionIndex] = 1;
+    }
+    // Since this is a fixed width strategy, the caller should already have allocated enough space to materialize the
+    // key object, and the type strategy should always be able to write to the buffer
+    if (written < 0) {
+      throw DruidException.defensive("Unable to serialize the value [%s] to buffer", rowObj);
+    }
+  }
+
+  /**
+   * This is used for multi-valued dimensions, for values after the first one. None of the current types supported by
+   * this strategy handle multi-valued dimensions, therefore this short circuits and returns false
+   */
+  @Override
+  public boolean checkRowIndexAndAddValueToGroupingKey(
+      int keyBufferPosition,
+      Object rowObj,
+      int rowValIdx,
+      ByteBuffer keyBuffer
+  )
+  {
+    return false;
+  }
+
+  @Override
+  public int writeToKeyBuffer(
+      int keyBufferPosition,
+      ColumnValueSelector selector,
+      ByteBuffer keyBuffer
+  )
+  {
+    T value = getValue(selector);
+    int written = nullableTypeStrategy.write(keyBuffer, keyBufferPosition, value, keySizeBytes);
+    if (written < 0) {
+      throw DruidException.defensive("Unable to serialize the value [%s] to buffer", value);
+    }
+    // This strategy doesn't use dictionary building and doesn't hold any internal state, therefore size increase is nil.
+    return 0;
+  }
+
+  @Override
+  public Grouper.BufferComparator bufferComparator(
+      int keyBufferPosition,
+      @Nullable StringComparator stringComparator
+  )
+  {
+    return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+      T lhs = nullableTypeStrategy.read(lhsBuffer, lhsPosition + keyBufferPosition);
+      T rhs = nullableTypeStrategy.read(rhsBuffer, rhsPosition + keyBufferPosition);
+      if (stringComparator != null
+          && !DimensionComparisonUtils.isNaturalComparator(columnType.getType(), stringComparator)) {
+        return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
+      }
+      // Nulls are allowed while comparing
+      //noinspection ConstantConditions
+      return nullableTypeStrategy.compare(lhs, rhs);
+    };
+  }
+
+
+  @Override
+  public void reset()
+  {
+    // Nothing to reset
+  }
+
+  /**
+   * Returns the value of the selector. It handles nullity of the value and casts it to the proper type so that the
+   * upstream callers donot need to worry about handling incorrect types (for example, if a double column value selector
+   * returns a long)
+   */
+  @Nullable
+  private T getValue(ColumnValueSelector columnValueSelector)
+  {
+    if (nullityGetter.apply(columnValueSelector)) {
+      return null;
+    }
+    // Convert the object to the desired type
+    return valueGetter.apply(columnValueSelector);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java
deleted file mode 100644
index a01c3c3..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.DimensionHandlerUtils;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-
-public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Float.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final float val = key.getFloat(keyBufferPosition);
-    resultRow.set(selectorPlus.getResultRowPosition(), val);
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
-  {
-    valuess[columnIndex] = selector.getFloat();
-    return 0;
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putFloat(keyBufferPosition, selector.getFloat());
-    return 0;
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(
-      int keyBufferPosition,
-      @Nullable StringComparator stringComparator
-  )
-  {
-    return GrouperBufferComparatorUtils.makeBufferComparatorForFloat(
-        keyBufferPosition,
-        true,
-        stringComparator
-    );
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) rowObj), keyBuffer);
-    stack[dimensionIndex] = 1;
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    // rows from a float column always have a single value, multi-value is not currently supported
-    // this method handles row values after the first in a multivalued row, so just return false
-    return false;
-  }
-
-  @Override
-  public void reset()
-  {
-    // Nothing to do.
-  }
-
-  private void writeToKeyBuffer(int keyBufferPosition, float value, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putFloat(keyBufferPosition, value);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java
index 26095b5..bb7ac01 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java
@@ -36,7 +36,7 @@
  * Each GroupByColumnSelectorStrategy is associated with a single dimension.
  *
  * Strategies may have internal state, such as the dictionary maintained by
- * {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}. Callers should assume that the internal
+ * {@link DictionaryBuildingGroupByColumnSelectorStrategy}. Callers should assume that the internal
  * state footprint starts out empty (zero bytes) and is also reset to zero on each call to {@link #reset()}. Each call
  * to {@link #initColumnValues} or {@link #writeToKeyBuffer(int, ColumnValueSelector, ByteBuffer)} returns the
  * incremental increase in internal state footprint that happened as a result of that particular call.
@@ -45,6 +45,9 @@
  */
 public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
 {
+  /**
+   * Index to indicate the absence of a key in the dictionary
+   */
   int GROUP_BY_MISSING_VALUE = -1;
 
   /**
@@ -54,7 +57,7 @@
    *
    * @return size, in bytes, of this dimension's values in the grouping key.
    */
-  int getGroupingKeySize();
+  int getGroupingKeySizeBytes();
 
   /**
    * Read a value from a grouping key and add it to the group by query result row, using the output name specified
@@ -117,7 +120,9 @@
   /**
    * If rowValIdx is less than the size of rowObj (haven't handled all of the row values):
    * First, read the value at rowValIdx from a rowObj and write that value to the keyBuffer at keyBufferPosition.
-   * Then return true
+   * Then return true.
+   * This method assumes that the size increase associated with the dictionary building has occurred already when calling
+   * {@link #initColumnValues}
    *
    * Otherwise, return false.
    *
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingGroupByColumnSelectorStrategy.java
new file mode 100644
index 0000000..350118c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingGroupByColumnSelectorStrategy.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import org.apache.druid.query.DimensionComparisonUtils;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.Grouper;
+import org.apache.druid.query.ordering.StringComparator;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.NullableTypeStrategy;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.nio.ByteBuffer;
+
+/**
+ * Strategy for grouping single value dimensions which can have variable-width objects. Materializing such objects on the buffer
+ * require an additional step of mapping them to an integer index. The integer index can be materialized on the buffer within
+ * a fixed width, and is often backed by a dictionary representing the actual dimension object. It is used for arrays,
+ * and complex types.
+ * <p>
+ * The visibility of the class is limited, and the callers must use the following variant of the mapping strategy:
+ * 2. {@link DictionaryBuildingGroupByColumnSelectorStrategy}
+ * <p>
+ * {@code null} can be represented by either -1 or the position of null in the dictionary it was stored when it was
+ * encountered. This is fine, because most of the time, the dictionary id has no value of its own, and is converted back to
+ * the value it represents, before doing further operations. The only place where it would matter would be when
+ * {@link DimensionIdCodec#canCompareIds()} is true, and we compare directly on the dictionary ids for prebuilt
+ * dictionaries (we can't compare ids for the dictionaries built on the fly in the grouping strategy). However, in that case,
+ * it is guaranteed that the dictionaryId of null represented by the pre-built dictionary would be the lowest (most likely 0)
+ * and therefore nulls (-1) would be adjacent to nulls (represented by the lowest non-negative dictionary id), and would get
+ * grouped in the later merge stages.
+ * <p>
+ * It only handles single value dimensions, i.e. every type except for strings. Strings are handled by the implementations
+ * of {@link KeyMappingMultiValueGroupByColumnSelectorStrategy}
+ * <p>
+ * It only handles non-primitive types, because numeric primitives are handled by the {@link FixedWidthGroupByColumnSelectorStrategy}
+ * and the string primitives are handled by the {@link KeyMappingMultiValueGroupByColumnSelectorStrategy}
+ *
+ * @param <DimensionType>   Class of the dimension
+ * @see DimensionIdCodec encoding decoding logic for converting value to dictionary
+ */
+@NotThreadSafe
+class KeyMappingGroupByColumnSelectorStrategy<DimensionType> implements GroupByColumnSelectorStrategy
+{
+  /**
+   * Converts the dimension to equivalent dictionaryId.
+   */
+  final DimensionIdCodec<DimensionType> dimensionIdCodec;
+
+  /**
+   * Type of the dimension on which the grouping strategy is used
+   */
+  final ColumnType columnType;
+
+  /**
+   * Nullable type strategy of the dimension
+   */
+  final NullableTypeStrategy<DimensionType> nullableTypeStrategy;
+
+  /**
+   * Default value of the dimension
+   */
+  final DimensionType defaultValue;
+
+  KeyMappingGroupByColumnSelectorStrategy(
+      final DimensionIdCodec<DimensionType> dimensionIdCodec,
+      final ColumnType columnType,
+      final NullableTypeStrategy<DimensionType> nullableTypeStrategy,
+      final DimensionType defaultValue
+  )
+  {
+    this.dimensionIdCodec = dimensionIdCodec;
+    this.columnType = columnType;
+    this.nullableTypeStrategy = nullableTypeStrategy;
+    this.defaultValue = defaultValue;
+  }
+
+  /**
+   * Strategy maps to integer dictionary ids
+   */
+  @Override
+  public int getGroupingKeySizeBytes()
+  {
+    return Integer.BYTES;
+  }
+
+  @Override
+  public void processValueFromGroupingKey(
+      GroupByColumnSelectorPlus selectorPlus,
+      ByteBuffer key,
+      ResultRow resultRow,
+      int keyBufferPosition
+  )
+  {
+    final int id = key.getInt(keyBufferPosition);
+    if (id != GROUP_BY_MISSING_VALUE) {
+      resultRow.set(selectorPlus.getResultRowPosition(), dimensionIdCodec.idToKey(id));
+    } else {
+      resultRow.set(selectorPlus.getResultRowPosition(), defaultValue);
+    }
+  }
+
+  @Override
+  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
+  {
+    //noinspection unchecked
+    final DimensionType value = (DimensionType) DimensionHandlerUtils.convertObjectToType(
+        selector.getObject(),
+        columnType
+    );
+    if (value == null) {
+      valuess[columnIndex] = GROUP_BY_MISSING_VALUE;
+      return 0;
+    } else {
+      MemoryFootprint<Integer> idAndMemoryFootprint = dimensionIdCodec.lookupId(value);
+      valuess[columnIndex] = idAndMemoryFootprint.value();
+      return idAndMemoryFootprint.memoryIncrease();
+    }
+  }
+
+  @Override
+  public void initGroupingKeyColumnValue(
+      int keyBufferPosition,
+      int dimensionIndex,
+      Object rowObj,
+      ByteBuffer keyBuffer,
+      int[] stack
+  )
+  {
+    // It is always called with the dictionaryId that we'd have initialized
+    //noinspection unchecked
+    int dictId = (int) rowObj;
+    keyBuffer.putInt(keyBufferPosition, dictId);
+    if (dictId == GROUP_BY_MISSING_VALUE) {
+      stack[dimensionIndex] = 0;
+    } else {
+      stack[dimensionIndex] = 1;
+    }
+  }
+
+  // The method is only used for single value dimensions, therefore doesn't have any actual implementation of this
+  // method, which is only called for multi-value dimensions
+  @Override
+  public boolean checkRowIndexAndAddValueToGroupingKey(
+      int keyBufferPosition,
+      Object rowObj,
+      int rowValIdx,
+      ByteBuffer keyBuffer
+  )
+  {
+    return false;
+  }
+
+  @Override
+  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
+  {
+    //noinspection unchecked
+    final DimensionType value = (DimensionType) DimensionHandlerUtils.convertObjectToType(
+        selector.getObject(),
+        columnType
+    );
+    final int memoryIncrease;
+    if (value == null) {
+      keyBuffer.putInt(keyBufferPosition, GROUP_BY_MISSING_VALUE);
+      return 0;
+    } else {
+      MemoryFootprint<Integer> idAndMemoryIncrease = dimensionIdCodec.lookupId(value);
+      keyBuffer.putInt(keyBufferPosition, idAndMemoryIncrease.value());
+      memoryIncrease = idAndMemoryIncrease.memoryIncrease();
+    }
+    return memoryIncrease;
+  }
+
+  @Override
+  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
+  {
+    boolean usesNaturalComparator =
+        stringComparator == null
+        || DimensionComparisonUtils.isNaturalComparator(columnType.getType(), stringComparator);
+    if (dimensionIdCodec.canCompareIds() && usesNaturalComparator) {
+      return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Integer.compare(
+          lhsBuffer.getInt(lhsPosition + keyBufferPosition),
+          rhsBuffer.getInt(rhsPosition + keyBufferPosition)
+      );
+    } else {
+      return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+        int lhsDictId = lhsBuffer.getInt(lhsPosition + keyBufferPosition);
+        int rhsDictId = rhsBuffer.getInt(rhsPosition + keyBufferPosition);
+
+        Object lhsObject = lhsDictId == GROUP_BY_MISSING_VALUE ? null : dimensionIdCodec.idToKey(lhsDictId);
+        Object rhsObject = rhsDictId == GROUP_BY_MISSING_VALUE ? null : dimensionIdCodec.idToKey(rhsDictId);
+        if (usesNaturalComparator) {
+          return nullableTypeStrategy.compare(
+              (DimensionType) DimensionHandlerUtils.convertObjectToType(lhsObject, columnType),
+              (DimensionType) DimensionHandlerUtils.convertObjectToType(rhsObject, columnType)
+          );
+        } else {
+          return stringComparator.compare(String.valueOf(lhsObject), String.valueOf(rhsObject));
+        }
+      };
+    }
+  }
+
+  @Override
+  public void reset()
+  {
+    dimensionIdCodec.reset();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingMultiValueGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingMultiValueGroupByColumnSelectorStrategy.java
new file mode 100644
index 0000000..aac9f2f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/KeyMappingMultiValueGroupByColumnSelectorStrategy.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.DictionaryBuildingUtils;
+import org.apache.druid.query.groupby.epinephelinae.Grouper;
+import org.apache.druid.query.ordering.StringComparator;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionDictionary;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.ArrayBasedIndexedInts;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+
+/**
+ * Like {@link KeyMappingGroupByColumnSelectorStrategy}, but for multi-value dimensions, i.e. strings. It can only handle
+ * {@link DimensionSelector}
+ */
+public abstract class KeyMappingMultiValueGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
+{
+
+  public static GroupByColumnSelectorStrategy create(
+      ColumnCapabilities capabilities,
+      DimensionSelector dimensionSelector
+  )
+  {
+    if (dimensionSelector.getValueCardinality() >= 0 && dimensionSelector.nameLookupPossibleInAdvance()) {
+      return new PrebuiltDictionary(capabilities, dimensionSelector::lookupName);
+    }
+    return new DictionaryBuilding();
+  }
+
+  @Override
+  public int getGroupingKeySizeBytes()
+  {
+    return Integer.BYTES;
+  }
+
+
+  @Override
+  public void initGroupingKeyColumnValue(
+      int keyBufferPosition,
+      int dimensionIndex,
+      Object rowObj,
+      ByteBuffer keyBuffer,
+      int[] stack
+  )
+  {
+    IndexedInts row = (IndexedInts) rowObj;
+    int rowSize = row.size();
+
+    keyBuffer.putInt(
+        keyBufferPosition,
+        rowSize == 0 ? GROUP_BY_MISSING_VALUE : row.get(0)
+    );
+
+    stack[dimensionIndex] = rowSize == 0 ? 0 : 1;
+  }
+
+  @Override
+  public boolean checkRowIndexAndAddValueToGroupingKey(
+      int keyBufferPosition,
+      Object rowObj,
+      int rowValIdx,
+      ByteBuffer keyBuffer
+  )
+  {
+    IndexedInts row = (IndexedInts) rowObj;
+    int rowSize = row.size();
+
+    if (rowValIdx < rowSize) {
+      keyBuffer.putInt(
+          keyBufferPosition,
+          row.get(rowValIdx)
+      );
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public static class PrebuiltDictionary extends KeyMappingMultiValueGroupByColumnSelectorStrategy
+  {
+    private final ColumnCapabilities capabilities;
+    private final IntFunction<String> dictionaryLookup;
+
+    public PrebuiltDictionary(
+        ColumnCapabilities capabilities,
+        IntFunction<String> dictionaryLookup
+    )
+    {
+      this.capabilities = capabilities;
+      this.dictionaryLookup = dictionaryLookup;
+    }
+
+    @Override
+    public void processValueFromGroupingKey(
+        GroupByColumnSelectorPlus selectorPlus,
+        ByteBuffer key, ResultRow resultRow,
+        int keyBufferPosition
+    )
+    {
+      final int id = key.getInt(keyBufferPosition);
+      if (id != GROUP_BY_MISSING_VALUE) {
+        resultRow.set(
+            selectorPlus.getResultRowPosition(),
+            ((DimensionSelector) selectorPlus.getSelector()).lookupName(id)
+        );
+      } else {
+        // Since this is used for String dimensions only, we can directly put the default string value here
+        resultRow.set(selectorPlus.getResultRowPosition(), NullHandling.defaultStringValue());
+      }
+    }
+
+    @Override
+    public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
+    {
+      DimensionSelector dimSelector = (DimensionSelector) selector;
+      IndexedInts row = dimSelector.getRow();
+      valuess[columnIndex] = row;
+      return 0;
+    }
+
+    @Override
+    public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
+    {
+      final DimensionSelector dimSelector = (DimensionSelector) selector;
+      final IndexedInts row = dimSelector.getRow();
+      Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
+      final int dictId = row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE;
+      keyBuffer.putInt(keyBufferPosition, dictId);
+      return 0;
+    }
+
+    @Override
+    public Grouper.BufferComparator bufferComparator(
+        int keyBufferPosition,
+        @Nullable StringComparator stringComparator
+    )
+    {
+      final boolean canCompareInts =
+          capabilities != null &&
+          capabilities.isDictionaryEncoded().and(
+              capabilities.areDictionaryValuesSorted().and(
+                  capabilities.areDictionaryValuesUnique()
+              )
+          ).isTrue();
+
+      final StringComparator comparator = stringComparator == null ? StringComparators.LEXICOGRAPHIC : stringComparator;
+      if (canCompareInts && StringComparators.LEXICOGRAPHIC.equals(comparator)) {
+        return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Integer.compare(
+            lhsBuffer.getInt(lhsPosition + keyBufferPosition),
+            rhsBuffer.getInt(rhsPosition + keyBufferPosition)
+        );
+      } else {
+        Preconditions.checkState(dictionaryLookup != null, "null dictionary lookup");
+        return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+          String lhsStr = dictionaryLookup.apply(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
+          String rhsStr = dictionaryLookup.apply(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
+          return comparator.compare(lhsStr, rhsStr);
+        };
+      }
+    }
+
+    @Override
+    public void reset()
+    {
+      // Nothing to do.
+    }
+  }
+
+  public static class DictionaryBuilding extends KeyMappingMultiValueGroupByColumnSelectorStrategy
+  {
+
+    private final List<String> dictionary = DictionaryBuildingUtils.createDictionary();
+    private final Object2IntMap<String> reverseDictionary = DictionaryBuildingUtils.createReverseDictionary(ColumnType.STRING.getNullableStrategy());
+
+    @Override
+    public void processValueFromGroupingKey(
+        GroupByColumnSelectorPlus selectorPlus,
+        ByteBuffer key,
+        ResultRow resultRow,
+        int keyBufferPosition
+    )
+    {
+      final int id = key.getInt(keyBufferPosition);
+
+      // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
+      if (id != GROUP_BY_MISSING_VALUE) {
+        final String value = dictionary.get(id);
+        resultRow.set(selectorPlus.getResultRowPosition(), value);
+      } else {
+        resultRow.set(selectorPlus.getResultRowPosition(), NullHandling.defaultStringValue());
+      }
+    }
+
+    @Override
+    public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
+    {
+      final DimensionSelector dimSelector = (DimensionSelector) selector;
+      final IndexedInts row = dimSelector.getRow();
+      int stateFootprintIncrease = 0;
+      ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex];
+      if (newRow == null) {
+        newRow = new ArrayBasedIndexedInts();
+        valuess[columnIndex] = newRow;
+      }
+      int rowSize = row.size();
+      newRow.ensureSize(rowSize);
+      for (int i = 0; i < rowSize; i++) {
+        final String value = dimSelector.lookupName(row.get(i));
+        final int dictId = reverseDictionary.getInt(value);
+        if (dictId < 0) {
+          final int nextId = dictionary.size();
+          dictionary.add(value);
+          reverseDictionary.put(value, nextId);
+          newRow.setValue(i, nextId);
+          stateFootprintIncrease +=
+              DictionaryBuildingUtils.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
+        } else {
+          newRow.setValue(i, dictId);
+        }
+      }
+      newRow.setSize(rowSize);
+      return stateFootprintIncrease;
+    }
+
+    @Override
+    public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
+    {
+      final DimensionSelector dimSelector = (DimensionSelector) selector;
+      final IndexedInts row = dimSelector.getRow();
+
+      Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
+
+      if (row.size() == 0) {
+        keyBuffer.putInt(keyBufferPosition, GROUP_BY_MISSING_VALUE);
+        return 0;
+      }
+
+      final String value = dimSelector.lookupName(row.get(0));
+      final int dictId = reverseDictionary.getInt(value);
+      if (dictId == DimensionDictionary.ABSENT_VALUE_ID) {
+        final int nextId = dictionary.size();
+        dictionary.add(value);
+        reverseDictionary.put(value, nextId);
+        keyBuffer.putInt(keyBufferPosition, nextId);
+        return DictionaryBuildingUtils.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
+      } else {
+        keyBuffer.putInt(keyBufferPosition, dictId);
+        return 0;
+      }
+    }
+
+    @Override
+    public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
+    {
+      final StringComparator realComparator = stringComparator == null ?
+                                              StringComparators.LEXICOGRAPHIC :
+                                              stringComparator;
+      return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
+        String lhsStr = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
+        String rhsStr = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
+        return realComparator.compare(lhsStr, rhsStr);
+      };
+    }
+
+    @Override
+    public void reset()
+    {
+      dictionary.clear();
+      reverseDictionary.clear();
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 95d57e0..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.DimensionHandlerUtils;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-
-public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Long.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final long val = key.getLong(keyBufferPosition);
-    resultRow.set(selectorPlus.getResultRowPosition(), val);
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
-  {
-    valuess[columnIndex] = selector.getLong();
-    return 0;
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putLong(keyBufferPosition, selector.getLong());
-    return 0;
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(
-      int keyBufferPosition,
-      @Nullable StringComparator stringComparator
-  )
-  {
-    return GrouperBufferComparatorUtils.makeBufferComparatorForLong(
-        keyBufferPosition,
-        true,
-        stringComparator
-    );
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) rowObj), keyBuffer);
-    stack[dimensionIndex] = 1;
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    // rows from a long column always have a single value, multi-value is not currently supported
-    // this method handles row values after the first in a multivalued row, so just return false
-    return false;
-  }
-
-  @Override
-  public void reset()
-  {
-    // Nothing to do.
-  }
-
-  public void writeToKeyBuffer(int keyBufferPosition, long value, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putLong(keyBufferPosition, value);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/MemoryFootprint.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/MemoryFootprint.java
new file mode 100644
index 0000000..6430377
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/MemoryFootprint.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+/**
+ * Holder for a value and the memory increase in the internal dictionary associated with the increase
+ */
+public class MemoryFootprint<T>
+{
+  private final T value;
+  private final int footprintIncrease;
+
+  // Reduced visibility
+  MemoryFootprint(T value, int footprintIncrease)
+  {
+    this.value = value;
+    this.footprintIncrease = footprintIncrease;
+  }
+
+  public T value()
+  {
+    return value;
+  }
+
+  public int memoryIncrease()
+  {
+    return footprintIncrease;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 34af762..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.groupby.epinephelinae.GrouperBufferComparatorUtils;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-
-/**
- * A wrapper around a numeric {@link GroupByColumnSelectorStrategy} that makes it null-aware. Should only be used
- * for numeric strategies, not for string strategies.
- *
- * @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case
- */
-public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-  private final GroupByColumnSelectorStrategy delegate;
-  private final byte[] nullKeyBytes;
-
-  public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate)
-  {
-    this.delegate = delegate;
-    this.nullKeyBytes = new byte[delegate.getGroupingKeySize() + 1];
-    this.nullKeyBytes[0] = NullHandling.IS_NULL_BYTE;
-  }
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return delegate.getGroupingKeySize() + Byte.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    if (key.get(keyBufferPosition) == NullHandling.IS_NULL_BYTE) {
-      resultRow.set(selectorPlus.getResultRowPosition(), null);
-    } else {
-      delegate.processValueFromGroupingKey(selectorPlus, key, resultRow, keyBufferPosition + Byte.BYTES);
-    }
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
-  {
-    if (selector.isNull()) {
-      values[columnIndex] = null;
-      return 0;
-    } else {
-      return delegate.initColumnValues(selector, columnIndex, values);
-    }
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    if (selector.isNull()) {
-      keyBuffer.position(keyBufferPosition);
-      keyBuffer.put(nullKeyBytes);
-      return 0;
-    } else {
-      keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE);
-      return delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, selector, keyBuffer);
-    }
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(
-      int keyBufferPosition,
-      @Nullable StringComparator stringComparator
-  )
-  {
-    return GrouperBufferComparatorUtils.makeNullHandlingBufferComparatorForNumericData(
-        keyBufferPosition,
-        delegate.bufferComparator(keyBufferPosition + Byte.BYTES, stringComparator)
-    );
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    if (rowObj == null) {
-      keyBuffer.position(keyBufferPosition);
-      keyBuffer.put(nullKeyBytes);
-    } else {
-      keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE);
-
-      // No need to update stack ourselves; we expect the delegate to do this.
-      delegate.initGroupingKeyColumnValue(
-          keyBufferPosition + Byte.BYTES,
-          dimensionIndex,
-          rowObj,
-          keyBuffer,
-          stack
-      );
-    }
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    // rows from a nullable column always have a single value, multi-value is not currently supported
-    // this method handles row values after the first in a multivalued row, so just return false
-    return false;
-  }
-
-  @Override
-  public void reset()
-  {
-    delegate.reset();
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java
deleted file mode 100644
index 8c25c77..0000000
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparator;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.DimensionSelector;
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.data.IndexedInts;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-import java.util.function.IntFunction;
-
-public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
-{
-  @Nullable
-  private final ColumnCapabilities capabilities;
-
-  @Nullable
-  private final IntFunction<String> dictionaryLookup;
-
-  public StringGroupByColumnSelectorStrategy(IntFunction<String> dictionaryLookup, ColumnCapabilities capabilities)
-  {
-    this.dictionaryLookup = dictionaryLookup;
-    this.capabilities = capabilities;
-  }
-
-  @Override
-  public int getGroupingKeySize()
-  {
-    return Integer.BYTES;
-  }
-
-  @Override
-  public void processValueFromGroupingKey(
-      GroupByColumnSelectorPlus selectorPlus,
-      ByteBuffer key,
-      ResultRow resultRow,
-      int keyBufferPosition
-  )
-  {
-    final int id = key.getInt(keyBufferPosition);
-
-    // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
-    if (id != GROUP_BY_MISSING_VALUE) {
-      resultRow.set(
-          selectorPlus.getResultRowPosition(),
-          ((DimensionSelector) selectorPlus.getSelector()).lookupName(id)
-      );
-    } else {
-      resultRow.set(selectorPlus.getResultRowPosition(), NullHandling.defaultStringValue());
-    }
-  }
-
-  @Override
-  public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
-  {
-    DimensionSelector dimSelector = (DimensionSelector) selector;
-    IndexedInts row = dimSelector.getRow();
-    valuess[columnIndex] = row;
-    return 0;
-  }
-
-  /**
-   * Writes a dictionary ID to the grouping key.
-   *
-   * Protected so subclasses can access it, like {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}.
-   */
-  protected void writeToKeyBuffer(int keyBufferPosition, int dictId, ByteBuffer keyBuffer)
-  {
-    keyBuffer.putInt(keyBufferPosition, dictId);
-  }
-
-  @Override
-  public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
-  {
-    final DimensionSelector dimSelector = (DimensionSelector) selector;
-    final IndexedInts row = dimSelector.getRow();
-    Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
-    final int dictId = row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE;
-    keyBuffer.putInt(keyBufferPosition, dictId);
-    return 0;
-  }
-
-  @Override
-  public void initGroupingKeyColumnValue(
-      int keyBufferPosition,
-      int dimensionIndex,
-      Object rowObj,
-      ByteBuffer keyBuffer,
-      int[] stack
-  )
-  {
-    IndexedInts row = (IndexedInts) rowObj;
-    int rowSize = row.size();
-
-    initializeGroupingKeyV2Dimension(row, rowSize, keyBuffer, keyBufferPosition);
-    stack[dimensionIndex] = rowSize == 0 ? 0 : 1;
-  }
-
-  @Override
-  public boolean checkRowIndexAndAddValueToGroupingKey(
-      int keyBufferPosition,
-      Object rowObj,
-      int rowValIdx,
-      ByteBuffer keyBuffer
-  )
-  {
-    IndexedInts row = (IndexedInts) rowObj;
-    int rowSize = row.size();
-
-    if (rowValIdx < rowSize) {
-      keyBuffer.putInt(
-          keyBufferPosition,
-          row.get(rowValIdx)
-      );
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private void initializeGroupingKeyV2Dimension(
-      final IndexedInts values,
-      final int rowSize,
-      final ByteBuffer keyBuffer,
-      final int keyBufferPosition
-  )
-  {
-    if (rowSize == 0) {
-      keyBuffer.putInt(keyBufferPosition, GROUP_BY_MISSING_VALUE);
-    } else {
-      keyBuffer.putInt(keyBufferPosition, values.get(0));
-    }
-  }
-
-  @Override
-  public Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator)
-  {
-    final boolean canCompareInts =
-        capabilities != null &&
-        capabilities.hasBitmapIndexes() &&
-        capabilities.areDictionaryValuesSorted().and(capabilities.areDictionaryValuesUnique()).isTrue();
-    final StringComparator comparator = stringComparator == null ? StringComparators.LEXICOGRAPHIC : stringComparator;
-    if (canCompareInts && StringComparators.LEXICOGRAPHIC.equals(comparator)) {
-      return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Integer.compare(
-          lhsBuffer.getInt(lhsPosition + keyBufferPosition),
-          rhsBuffer.getInt(rhsPosition + keyBufferPosition)
-      );
-    } else {
-      Preconditions.checkState(dictionaryLookup != null, "null dictionary lookup");
-      return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
-        String lhsStr = dictionaryLookup.apply(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
-        String rhsStr = dictionaryLookup.apply(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
-        return comparator.compare(lhsStr, rhsStr);
-      };
-    }
-  }
-
-  @Override
-  public void reset()
-  {
-    // Nothing to do.
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java
index 83f49e1..393c717 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java
@@ -23,7 +23,7 @@
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
+import org.apache.druid.query.groupby.epinephelinae.DictionaryBuildingUtils;
 import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.vector.VectorObjectSelector;
@@ -36,7 +36,8 @@
  * single-valued STRING columns which are not natively dictionary encoded, e.g. expression virtual columns.
  *
  * This is effectively the {@link VectorGroupByEngine} analog of
- * {@link org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy}
+ * {@link org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy} for
+ * String columns
  */
 public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
 {
@@ -82,7 +83,7 @@
 
         // Use same ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY as the nonvectorized version; dictionary structure is the same.
         stateFootprintIncrease +=
-            DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
+            DictionaryBuildingUtils.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
       } else {
         keySpace.putInt(j, dictId);
       }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index a877237..a2fc9ce 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -219,8 +219,8 @@
                 return false;
               }
 
-              if (dimension.getOutputType().isArray()) {
-                // group by on arrays is not currently supported in the vector processing engine
+              if (!dimension.getOutputType().isPrimitive()) {
+                // group by on arrays and complex types is not currently supported in the vector processing engine
                 return false;
               }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
index 6f7fab5..a34d58b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -478,16 +478,33 @@
         throw new ISE("Cannot create comparator for array type %s.", columnType.toString());
       }
     }
+    final Comparator comparatorToUse;
+    if (arrayComparator != null) {
+      comparatorToUse = arrayComparator;
+    } else {
+      comparatorToUse = DimensionComparisonUtils.isNaturalComparator(columnType.getType(), stringComparator)
+                        ? columnType.getNullableStrategy()
+                        : stringComparator;
+    }
+
     return Ordering.from(
         Comparator.comparing(
             (ResultRow row) -> {
               if (columnType.isArray()) {
+                // Arrays have a specialized comparator, that applies the ordering per element. That will handle the casting
+                // and the comparison
+                return row.get(column);
+              } else if (DimensionComparisonUtils.isNaturalComparator(columnType.getType(), stringComparator)) {
+                // If the natural comparator is used, we can directly extract the dimension value, and the type strategy's comparison
+                // function will handle it, without casting
                 return row.get(column);
               } else {
+                // If the comparator is not natural, we will be using the string comparator, and we need to cast the dimension to string
+                // before comparison
                 return getDimensionValue(row, column);
               }
             },
-            Comparator.nullsFirst(arrayComparator == null ? stringComparator : arrayComparator)
+            Comparator.nullsFirst(comparatorToUse)
         )
     );
   }
diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunner.java
index 776c115..49d9a5a 100644
--- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunner.java
@@ -68,7 +68,8 @@
     @Override
     public SearchColumnSelectorStrategy makeColumnSelectorStrategy(
         ColumnCapabilities capabilities,
-        ColumnValueSelector selector
+        ColumnValueSelector selector,
+        String dimension
     )
     {
       switch (capabilities.getType()) {
@@ -84,6 +85,12 @@
           throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
       }
     }
+
+    @Override
+    public boolean supportsComplexTypes()
+    {
+      return false;
+    }
   }
 
   public interface SearchColumnSelectorStrategy<ValueSelectorType> extends ColumnSelectorStrategy
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
index 0eafee2..e921e12 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
@@ -43,7 +43,8 @@
   @Override
   public TopNColumnAggregatesProcessor<?> makeColumnSelectorStrategy(
       ColumnCapabilities capabilities,
-      ColumnValueSelector selector
+      ColumnValueSelector selector,
+      String dimension
   )
   {
     if (capabilities.is(ValueType.STRING)) {
@@ -78,4 +79,10 @@
 
     throw new IAE("Cannot create query type helper from invalid type [%s]", capabilities.asTypeString());
   }
+
+  @Override
+  public boolean supportsComplexTypes()
+  {
+    return false;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
index 1ca911a..da63413 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
@@ -25,6 +25,7 @@
 import com.google.common.primitives.Floats;
 import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.parsers.ParseException;
@@ -201,7 +202,8 @@
       final String dimName = dimSpec.getDimension();
       final ColumnValueSelector<?> selector = getColumnValueSelectorFromDimensionSpec(
           dimSpec,
-          columnSelectorFactory
+          columnSelectorFactory,
+          strategyFactory.supportsComplexTypes()
       );
       Strategy strategy = makeStrategy(
           strategyFactory,
@@ -222,12 +224,13 @@
 
   private static ColumnValueSelector<?> getColumnValueSelectorFromDimensionSpec(
       DimensionSpec dimSpec,
-      ColumnSelectorFactory columnSelectorFactory
+      ColumnSelectorFactory columnSelectorFactory,
+      boolean supportsComplexTypes
   )
   {
     String dimName = dimSpec.getDimension();
     ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName);
-    capabilities = getEffectiveCapabilities(dimSpec, capabilities);
+    capabilities = getEffectiveCapabilities(dimSpec, capabilities, supportsComplexTypes);
     if (capabilities.is(ValueType.STRING)) {
       return columnSelectorFactory.makeDimensionSelector(dimSpec);
     }
@@ -241,7 +244,8 @@
    */
   private static ColumnCapabilities getEffectiveCapabilities(
       DimensionSpec dimSpec,
-      @Nullable ColumnCapabilities capabilities
+      @Nullable ColumnCapabilities capabilities,
+      boolean supportsComplexTypes
   )
   {
     if (capabilities == null) {
@@ -249,7 +253,7 @@
     }
 
     // Complex dimension type is not supported
-    if (capabilities.is(ValueType.COMPLEX)) {
+    if (!supportsComplexTypes && capabilities.is(ValueType.COMPLEX)) {
       capabilities = DEFAULT_STRING_CAPABILITIES;
     }
 
@@ -289,8 +293,8 @@
       ColumnValueSelector<?> selector
   )
   {
-    capabilities = getEffectiveCapabilities(dimSpec, capabilities);
-    return strategyFactory.makeColumnSelectorStrategy(capabilities, selector);
+    capabilities = getEffectiveCapabilities(dimSpec, capabilities, strategyFactory.supportsComplexTypes());
+    return strategyFactory.makeColumnSelectorStrategy(capabilities, selector, dimSpec.getDimension());
   }
 
   @Nullable
@@ -403,19 +407,16 @@
       case STRING:
         return convertObjectToString(obj);
       case ARRAY:
-        switch (type.getElementType().getType()) {
-          case STRING:
-            return coerceToStringArray(obj);
-          case LONG:
-            return coerceToObjectArrayWithElementCoercionFunction(obj, DimensionHandlerUtils::convertObjectToLong);
-          case FLOAT:
-            return coerceToObjectArrayWithElementCoercionFunction(obj, DimensionHandlerUtils::convertObjectToFloat);
-          case DOUBLE:
-            return coerceToObjectArrayWithElementCoercionFunction(obj, DimensionHandlerUtils::convertObjectToDouble);
-        }
-
+        return coerceToObjectArrayWithElementCoercionFunction(
+            obj,
+            x -> DimensionHandlerUtils.convertObjectToType(x, type.getElementType())
+        );
+      case COMPLEX:
+        // Can't coerce complex objects, and we shouldn't need to. If in future selectors behave weirdly, or we need to
+        // cast them (for some unknown reason), we can have that casting knowledge in the type strategy
+        return obj;
       default:
-        throw new IAE("Type[%s] is not supported for dimensions!", type);
+        throw DruidException.defensive("Type[%s] is not supported for dimensions!", type);
     }
   }
 
@@ -426,8 +427,9 @@
   }
 
 
+  @Nullable
   public static Object[] coerceToObjectArrayWithElementCoercionFunction(
-      Object obj,
+      @Nullable Object obj,
       Function<Object, Object> coercionFunction
   )
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
index f7ed029..044a092 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/NullableTypeStrategy.java
@@ -19,8 +19,10 @@
 
 package org.apache.druid.segment.column;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.common.config.NullHandling;
 
+import javax.annotation.CheckReturnValue;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -37,7 +39,7 @@
  *
  * @see TypeStrategy
  */
-public final class NullableTypeStrategy<T> implements Comparator<T>
+public final class NullableTypeStrategy<T> implements Comparator<T>, Hash.Strategy<T>
 {
   private final TypeStrategy<T> delegate;
   private final Comparator<T> delegateComparator;
@@ -66,6 +68,7 @@
     return delegate.read(buffer);
   }
 
+  @CheckReturnValue
   public int write(ByteBuffer buffer, @Nullable T value, int maxSizeBytes)
   {
     final int max = Math.min(buffer.limit() - buffer.position(), maxSizeBytes);
@@ -112,6 +115,7 @@
     return delegate.readRetainsBufferReference();
   }
 
+  @CheckReturnValue
   public int write(ByteBuffer buffer, int offset, @Nullable T value, int maxSizeBytes)
   {
     final int oldPosition = buffer.position();
@@ -129,4 +133,24 @@
   {
     return delegateComparator.compare(o1, o2);
   }
+
+  public boolean groupable()
+  {
+    return delegate.groupable();
+  }
+
+  @Override
+  public int hashCode(@Nullable T o)
+  {
+    return o == null ? 0 : delegate.hashCode(o);
+  }
+
+  @Override
+  public boolean equals(@Nullable T a, @Nullable T b)
+  {
+    if (a == null) {
+      return b == null;
+    }
+    return b != null && delegate.equals(a, b);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
index d40ee5d..267477e 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.segment.column;
 
+import it.unimi.dsi.fastutil.Hash;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.segment.data.ObjectStrategy;
 
 import javax.annotation.Nullable;
@@ -27,7 +29,7 @@
 /**
  * Default implementation of {@link TypeStrategy} for all {@link org.apache.druid.segment.serde.ComplexMetricSerde}
  * implementations that just wraps the {@link ObjectStrategy} they are required to implement.
- *
+ * <p>
  * This is not likely to be the most efficient way to do things, especially since writing must first produce a byte
  * array before it can be written to the buffer, but it is cheap and should work correctly, which is important.
  */
@@ -35,11 +37,24 @@
 {
   private final ObjectStrategy<T> objectStrategy;
   private final TypeSignature<?> typeSignature;
+  @Nullable
+  private final Hash.Strategy<T> hashStrategy;
 
   public ObjectStrategyComplexTypeStrategy(ObjectStrategy<T> objectStrategy, TypeSignature<?> signature)
   {
+    this(objectStrategy, signature, null);
+  }
+
+  public ObjectStrategyComplexTypeStrategy(
+      ObjectStrategy<T> objectStrategy,
+      TypeSignature<?> signature,
+      @Nullable final Hash.Strategy<T> hashStrategy
+  )
+  {
     this.objectStrategy = objectStrategy;
     this.typeSignature = signature;
+    this.hashStrategy = hashStrategy;
+
   }
 
   @Override
@@ -94,4 +109,28 @@
   {
     return objectStrategy.fromByteBufferSafe(ByteBuffer.wrap(value), value.length);
   }
+
+  @Override
+  public boolean groupable()
+  {
+    return hashStrategy != null;
+  }
+
+  @Override
+  public int hashCode(T o)
+  {
+    if (hashStrategy == null) {
+      throw DruidException.defensive("hashStrategy not provided");
+    }
+    return hashStrategy.hashCode(o);
+  }
+
+  @Override
+  public boolean equals(T a, T b)
+  {
+    if (hashStrategy == null) {
+      throw DruidException.defensive("hashStrategy not provided");
+    }
+    return hashStrategy.equals(a, b);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
index 3afcfdb..bae2917 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
@@ -258,6 +258,12 @@
     }
 
     @Override
+    public boolean groupable()
+    {
+      return true;
+    }
+
+    @Override
     public boolean readRetainsBufferReference()
     {
       return false;
@@ -281,6 +287,18 @@
     {
       return Longs.compare(((Number) o1).longValue(), ((Number) o2).longValue());
     }
+
+    @Override
+    public int hashCode(Long o)
+    {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(Long a, Long b)
+    {
+      return a.equals(b);
+    }
   }
 
   /**
@@ -309,6 +327,12 @@
     }
 
     @Override
+    public boolean groupable()
+    {
+      return true;
+    }
+
+    @Override
     public boolean readRetainsBufferReference()
     {
       return false;
@@ -332,6 +356,18 @@
     {
       return Floats.compare(((Number) o1).floatValue(), ((Number) o2).floatValue());
     }
+
+    @Override
+    public int hashCode(Float o)
+    {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(Float a, Float b)
+    {
+      return a.equals(b);
+    }
   }
 
   /**
@@ -361,6 +397,12 @@
     }
 
     @Override
+    public boolean groupable()
+    {
+      return true;
+    }
+
+    @Override
     public boolean readRetainsBufferReference()
     {
       return false;
@@ -384,6 +426,18 @@
     {
       return Double.compare(((Number) o1).doubleValue(), ((Number) o2).doubleValue());
     }
+
+    @Override
+    public int hashCode(Double o)
+    {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(Double a, Double b)
+    {
+      return a.equals(b);
+    }
   }
 
   /**
@@ -435,6 +489,12 @@
     }
 
     @Override
+    public boolean groupable()
+    {
+      return true;
+    }
+
+    @Override
     public int compare(Object s, Object s2)
     {
       // copy of lexicographical string comparator in druid processing
@@ -447,6 +507,18 @@
 
       return ORDERING.compare((String) s, (String) s2);
     }
+
+    @Override
+    public int hashCode(String o)
+    {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(String a, String b)
+    {
+      return a.equals(b);
+    }
   }
 
   /**
@@ -519,6 +591,12 @@
     }
 
     @Override
+    public boolean groupable()
+    {
+      return elementStrategy.groupable();
+    }
+
+    @Override
     public int compare(@Nullable Object o1Obj, @Nullable Object o2Obj)
     {
       Object[] o1 = (Object[]) o1Obj;
@@ -544,5 +622,47 @@
       }
       return Integer.compare(o1.length, o2.length);
     }
+
+    /**
+     * Implements {@link Arrays#hashCode(Object[])} but the element hashing uses the element's type strategy
+     */
+    @Override
+    public int hashCode(Object[] o)
+    {
+      if (o == null) {
+        return 0;
+      } else {
+        int result = 1;
+        for (Object element : o) {
+          result = 31 * result + (element == null ? 0 : elementStrategy.hashCode(element));
+        }
+        return result;
+      }
+    }
+    /**
+     * Implements {@link Arrays#equals} but the element equality uses the element's type strategy
+     */
+    @Override
+    public boolean equals(@Nullable Object[] a, @Nullable Object[] b)
+    {
+      //noinspection ArrayEquality
+      if (a == b) {
+        return true;
+      } else if (a != null && b != null) {
+        int length = a.length;
+        if (b.length != length) {
+          return false;
+        } else {
+          for (int i = 0; i < length; ++i) {
+            if (!elementStrategy.equals(a[i], b[i])) {
+              return false;
+            }
+          }
+          return true;
+        }
+      } else {
+        return false;
+      }
+    }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
index e4856f8..3d2493b 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.segment.column;
 
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
 
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -63,7 +65,7 @@
  * {@code Comparator<Number>}.  So, we fall back to effectively erasing the generic type and having them all be
  * {@code Comparator<Object>}.
  */
-public interface TypeStrategy<T> extends Comparator<Object>
+public interface TypeStrategy<T> extends Comparator<Object>, Hash.Strategy<T>
 {
   /**
    * Estimate the size in bytes that writing this value to memory would require. This method is not required to be
@@ -170,4 +172,30 @@
   {
     throw new IllegalStateException("Not supported");
   }
+
+  /**
+   * Whether the type is groupable or not. This is always true for all the primitive types, arrays, and nested arrays
+   * therefore the SQL and the native layer might ignore this flag for those types. For complex types, this flag can be
+   * true or false, depending on whether the semantics and implementation of the type naturally leads to groupability
+   * or not. For example, it makes sense for JSON columns to be groupable, however there is little sense in grouping
+   * sketches (before finalizing).
+   *
+   * If a type is groupable, it MUST implement the {@link #hashCode} and {@link #equals} correctly
+   */
+  default boolean groupable()
+  {
+    return false;
+  }
+
+  @Override
+  default int hashCode(T o)
+  {
+    throw DruidException.defensive("Not implemented. Check groupable() first");
+  }
+
+  @Override
+  default boolean equals(T a, T b)
+  {
+    throw DruidException.defensive("Not implemented. Check groupable() first");
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ComparableIntArray.java b/processing/src/main/java/org/apache/druid/segment/data/ComparableIntArray.java
deleted file mode 100644
index 7769e98..0000000
--- a/processing/src/main/java/org/apache/druid/segment/data/ComparableIntArray.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.data;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonValue;
-
-import java.util.Arrays;
-
-public class ComparableIntArray implements Comparable<ComparableIntArray>
-{
-  public static final ComparableIntArray EMPTY_ARRAY = new ComparableIntArray(new int[0]);
-
-  final int[] delegate;
-  private int hashCode;
-  private boolean hashCodeComputed;
-
-  private ComparableIntArray(int[] array)
-  {
-    delegate = array;
-  }
-
-  @JsonCreator
-  public static ComparableIntArray of(int... array)
-  {
-    if (array.length == 0) {
-      return EMPTY_ARRAY;
-    } else {
-      return new ComparableIntArray(array);
-    }
-  }
-
-  @JsonValue
-  public int[] getDelegate()
-  {
-    return delegate;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    // Check is not thread-safe, but that's fine. Even if used by multiple threads, it's ok to write these primitive
-    // fields more than once.
-    // As ComparableIntArray is used in hot loop caching the hashcode
-    if (!hashCodeComputed) {
-      hashCode = Arrays.hashCode(delegate);
-      hashCodeComputed = true;
-    }
-
-    return hashCode;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-
-    return Arrays.equals(delegate, ((ComparableIntArray) obj).getDelegate());
-  }
-
-  @Override
-  public int compareTo(ComparableIntArray rhs)
-  {
-    // rhs.getDelegate() cannot be null
-    if (rhs == null) {
-      return 1;
-    }
-    final int minSize = Math.min(this.getDelegate().length, rhs.getDelegate().length);
-    //noinspection ArrayEquality
-    if (this.delegate == rhs.getDelegate()) {
-      return 0;
-    } else {
-      for (int i = 0; i < minSize; i++) {
-        //int's cant be null
-        final int cmp = Integer.compare(delegate[i], rhs.getDelegate()[i]);
-        if (cmp == 0) {
-          continue;
-        }
-        return cmp;
-      }
-      if (this.getDelegate().length == rhs.getDelegate().length) {
-        return 0;
-      } else if (this.getDelegate().length < rhs.getDelegate().length) {
-        return -1;
-      } else {
-        return 1;
-      }
-    }
-  }
-
-  @Override
-  public String toString()
-  {
-    return Arrays.toString(delegate);
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
index c6b05a4..0727309 100644
--- a/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
+++ b/processing/src/main/java/org/apache/druid/segment/generator/GeneratorBasicSchemas.java
@@ -35,6 +35,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -365,6 +366,50 @@
   }
 
   static {
+    // schema for benchmarking group-by
+    List<GeneratorColumnSchema> expressionsTestsSchemaColumns = ImmutableList.of(
+        // string dims
+        GeneratorColumnSchema.makeSequential("string-Sequential-100_000", ValueType.STRING, false, 1, null, 0, 100_000),
+        GeneratorColumnSchema.makeSequential("string-Sequential-10_000_000", ValueType.STRING, false, 1, null, 0, 10_000_000),
+        // GeneratorColumnSchema.makeSequential("string-Sequential-1_000_000_000", ValueType.STRING, false, 1, null, 0, 1_000_000_000),
+        GeneratorColumnSchema.makeLazyZipf("string-ZipF-1_000_000", ValueType.STRING, false, 1, 0.1, 0, 1_000_000, 2.0),
+        GeneratorColumnSchema.makeLazyDiscreteUniform("string-Uniform-1_000_000", ValueType.STRING, false, 1, 0.3, 0, 1_000_000),
+
+        // multi string dims
+        GeneratorColumnSchema.makeSequential("multi-string-Sequential-100_000", ValueType.STRING, false, 8, null, 0, 100_000),
+        GeneratorColumnSchema.makeSequential("multi-string-Sequential-10_000_000", ValueType.STRING, false, 8, null, 0, 10_000_000),
+        // GeneratorColumnSchema.makeSequential("multi-string-Sequential-1_000_000_000", ValueType.STRING, false, 8, null, 0, 1_000_000_000),
+        GeneratorColumnSchema.makeLazyZipf("multi-string-ZipF-1_000_000", ValueType.STRING, false, 16, 0.1, 0, 1_000_000, 2.0),
+        GeneratorColumnSchema.makeLazyDiscreteUniform("multi-string-Uniform-1_000_000", ValueType.STRING, false, 4, null, 0, 1_000_000),
+
+        // numeric dims
+        GeneratorColumnSchema.makeSequential("long-Sequential-100_000", ValueType.LONG, false, 1, null, 0, 100_000),
+        GeneratorColumnSchema.makeSequential("long-Sequential-10_000_000", ValueType.LONG, false, 1, null, 0, 10_000_000),
+        // GeneratorColumnSchema.makeSequential("long-Sequential-1_000_000_000", ValueType.LONG, false, 1, null, 0, 1_000_000_000),
+        GeneratorColumnSchema.makeLazyZipf("long-ZipF-1_000_000", ValueType.LONG, false, 1, 0.1, 0, 1_000_000, 2.0),
+        GeneratorColumnSchema.makeLazyDiscreteUniform("long-Uniform-1_000_000", ValueType.LONG, false, 1, 0.3, 0, 1_000_000),
+
+        GeneratorColumnSchema.makeLazyZipf("double-ZipF-1_000_000", ValueType.DOUBLE, false, 1, 0.1, 0, 1_000_000, 2.0),
+        GeneratorColumnSchema.makeContinuousUniform("double-Uniform-1_000_000", ValueType.DOUBLE, false, 1, null, 0.0, 1_000_000.0),
+
+        GeneratorColumnSchema.makeLazyZipf("float-ZipF-1_000_000", ValueType.FLOAT, false, 1, 0.1, 0, 1_000_000, 2.0),
+        GeneratorColumnSchema.makeContinuousUniform("float-Uniform-1_000_000", ValueType.FLOAT, false, 1, null, 0.0, 1_000_000.0)
+        // Generate the array dims, and the complex value dims by wrapping the pre-existing primitive dims within simple expressions
+    );
+
+    Interval interval = Intervals.of("2000-01-01/P1D");
+
+    GeneratorSchemaInfo groupByTestsSchema = new GeneratorSchemaInfo(
+        expressionsTestsSchemaColumns,
+        Collections.emptyList(),
+        interval,
+        false
+    );
+
+    SCHEMA_INFO_BUILDER.put("groupBy-testbench", groupByTestsSchema);
+  }
+
+  static {
     List<GeneratorColumnSchema> inTestsSchemaColumns = ImmutableList.of(
         GeneratorColumnSchema.makeSequential("long1", ValueType.LONG, false, 1, null, 0, 40000),
         GeneratorColumnSchema.makeSequential("string1", ValueType.STRING, false, 1, null, 0, 40000)
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
index c7c5695..4d1bb34 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.guice.NestedDataModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -37,6 +38,8 @@
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnFormat;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
+import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.ComplexMetricExtractor;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
@@ -157,6 +160,29 @@
     };
   }
 
+  @Override
+  public <T extends Comparable<T>> TypeStrategy<T> getTypeStrategy()
+  {
+    return new ObjectStrategyComplexTypeStrategy<>(
+        getObjectStrategy(),
+        ColumnType.ofComplex(TYPE_NAME),
+        new Hash.Strategy<Object>()
+        {
+          @Override
+          public int hashCode(Object o)
+          {
+            return StructuredData.wrap(o).equalityHash();
+          }
+
+          @Override
+          public boolean equals(Object a, Object b)
+          {
+            return StructuredData.wrap(a).compareTo(StructuredData.wrap(b)) == 0;
+          }
+        }
+    );
+  }
+
   public static class NestedColumnFormatV4 implements ColumnFormat
   {
     @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java b/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java
index 32513d6..9dfa79b 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.primitives.Longs;
 import net.jpountz.xxhash.XXHash64;
 import net.jpountz.xxhash.XXHashFactory;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -185,6 +186,12 @@
     return Objects.hash(value);
   }
 
+  // hashCode that relies on the object equality. Translates the hashcode to an integer as well
+  public int equalityHash()
+  {
+    return Longs.hashCode(hash.getAsLong());
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 0b4d899..54cdcf1 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -9932,31 +9932,22 @@
   @Test
   public void testGroupByComplexColumn()
   {
+    cannotVectorize();
     GroupByQuery query = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
-        .setDimensions(new DefaultDimensionSpec("quality_uniques", "quality_uniques"))
+        .setDimensions(new DefaultDimensionSpec(
+            "quality_uniques",
+            "quality_uniques",
+            HyperUniquesAggregatorFactory.TYPE
+        ))
         .setDimFilter(new SelectorDimFilter("quality_uniques", null, null))
         .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
         .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
         .build();
 
-    Assert.assertEquals(Functions.<Sequence<ResultRow>>identity(), query.getLimitSpec().build(query));
-
-    List<ResultRow> expectedResults = Collections.singletonList(
-        makeRow(
-            query,
-            "2011-04-01",
-            "quality_uniques",
-            null,
-            "rows",
-            26L,
-            "idx",
-            12446L
-        )
-    );
-    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
-    TestHelper.assertExpectedObjects(expectedResults, results, "long");
+    expectedException.expect(RuntimeException.class);
+    GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
   }
 
   @Test
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java
deleted file mode 100644
index ad0c96c..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.nio.ByteBuffer;
-
-public class ArrayDoubleGroupByColumnSelectorStrategyTest
-{
-  private final ByteBuffer buffer1 = ByteBuffer.allocate(4);
-  private final ByteBuffer buffer2 = ByteBuffer.allocate(4);
-
-  private ArrayNumericGroupByColumnSelectorStrategy strategy;
-
-  @Before
-  public void setup()
-  {
-    strategy = new ArrayDoubleGroupByColumnSelectorStrategy();
-    addToStrategy(new Object[]{1.0, 2.0});
-    addToStrategy(ImmutableList.of(2.0, 3.0));
-    addToStrategy(new Double[]{1.0});
-  }
-
-  @Test
-  public void testKeySize()
-  {
-    Assert.assertEquals(Integer.BYTES, strategy.getGroupingKeySize());
-  }
-
-  @Test
-  public void testWriteKey()
-  {
-    strategy.writeToKeyBuffer(0, 1, buffer1);
-    Assert.assertEquals(1, buffer1.getInt(0));
-  }
-
-  @Test
-  public void testBufferComparatorsWithNullAndNonNullStringComprators()
-  {
-    buffer1.putInt(1);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-
-    comparator = strategy.bufferComparator(0, StringComparators.LEXICOGRAPHIC);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-
-    comparator = strategy.bufferComparator(0, StringComparators.STRLEN);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-  }
-
-  @Test
-  public void testBufferComparator()
-  {
-    buffer1.putInt(0);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-
-  }
-
-  @Test
-  public void testSanity()
-  {
-    testSanity(new Object[]{1.0, 2.0}, 0);
-    testSanity(new Object[]{2.0, 3.0}, 1);
-    testSanity(new Object[]{1.0}, 2);
-  }
-
-  private void testSanity(Object[] storedValue, int expectedIndex)
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(storedValue);
-    Assert.assertEquals(expectedIndex, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(0, expectedIndex);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(storedValue, (Object[]) row.get(0));
-  }
-
-  @Test
-  public void testAddingInDictionary()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4.0, 2.0));
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{4.0, 2.0}, (Object[]) row.get(0));
-  }
-
-  @Test
-  public void testAddingInDictionaryWithObjects()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4.0D, 2.0D});
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{4.0, 2.0}, (Object[]) row.get(0));
-  }
-
-  private void addToStrategy(Object value)
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(value);
-    strategy.computeDictionaryId(columnValueSelector);
-  }
-
-  @After
-  public void tearDown()
-  {
-    buffer1.clear();
-    buffer2.clear();
-  }
-}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java
deleted file mode 100644
index d8bc537..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.nio.ByteBuffer;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ArrayLongGroupByColumnSelectorStrategyTest
-{
-  private final ByteBuffer buffer1 = ByteBuffer.allocate(4);
-  private final ByteBuffer buffer2 = ByteBuffer.allocate(4);
-
-  private ArrayNumericGroupByColumnSelectorStrategy strategy;
-
-  @Before
-  public void setup()
-  {
-    strategy = new ArrayLongGroupByColumnSelectorStrategy();
-    addToStrategy(new Object[]{1L, 2L});
-    addToStrategy(ImmutableList.of(2L, 3L));
-    addToStrategy(new Long[]{1L});
-  }
-
-  @Test
-  public void testKeySize()
-  {
-    Assert.assertEquals(Integer.BYTES, strategy.getGroupingKeySize());
-  }
-
-  @Test
-  public void testWriteKey()
-  {
-    strategy.writeToKeyBuffer(0, 1, buffer1);
-    Assert.assertEquals(1, buffer1.getInt(0));
-  }
-
-  @Test
-  public void testBufferComparatorsWithNullAndNonNullStringComprators()
-  {
-    buffer1.putInt(1);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-
-    comparator = strategy.bufferComparator(0, StringComparators.LEXICOGRAPHIC);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-
-    comparator = strategy.bufferComparator(0, StringComparators.STRLEN);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-  }
-
-  @Test
-  public void testBufferComparator()
-  {
-    buffer1.putInt(0);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-
-  }
-
-  @Test
-  public void testSanity()
-  {
-    testSanity(new Object[]{1L, 2L}, 0);
-    testSanity(new Object[]{2L, 3L}, 1);
-    testSanity(new Object[]{1L}, 2);
-  }
-
-  private void testSanity(Object[] storedValue, int expectedIndex)
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(storedValue);
-    Assert.assertEquals(expectedIndex, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(0, expectedIndex);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(storedValue, (Object[]) row.get(0));
-  }
-
-  @Test
-  public void testAddingInDictionary()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4L, 2L));
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{4L, 2L}, (Object[]) row.get(0));
-  }
-
-  @Test
-  public void testAddingInDictionaryWithObjects()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4L, 2L});
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{4L, 2L}, (Object[]) row.get(0));
-  }
-
-  private void addToStrategy(Object value)
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(value);
-    strategy.computeDictionaryId(columnValueSelector);
-  }
-
-  @After
-  public void tearDown()
-  {
-    buffer1.clear();
-    buffer2.clear();
-  }
-}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java
deleted file mode 100644
index b3ed046..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.data.ComparableIntArray;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.nio.ByteBuffer;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ArrayStringGroupByColumnSelectorStrategyTest
-{
-  private final BiMap<String, Integer> dictionaryInt = HashBiMap.create();
-
-  // The dictionary has been constructed such that the values are not sorted lexicographically
-  // so we can tell when the comparator uses a lexicographic comparison and when it uses the indexes.
-  private final BiMap<ComparableIntArray, Integer> indexedIntArrays = HashBiMap.create();
-
-  private final ByteBuffer buffer1 = ByteBuffer.allocate(4);
-  private final ByteBuffer buffer2 = ByteBuffer.allocate(4);
-
-  private ArrayStringGroupByColumnSelectorStrategy strategy;
-
-  @Before
-  public void setup()
-  {
-    strategy = new ArrayStringGroupByColumnSelectorStrategy(dictionaryInt, indexedIntArrays);
-
-    dictionaryInt.put("a", 0);
-    dictionaryInt.put("b", 1);
-    dictionaryInt.put("bd", 2);
-    dictionaryInt.put("d", 3);
-    dictionaryInt.put("e", 4);
-
-    indexedIntArrays.put(ComparableIntArray.of(0, 1), 0);
-    indexedIntArrays.put(ComparableIntArray.of(2, 4), 1);
-    indexedIntArrays.put(ComparableIntArray.of(0, 2), 2);
-  }
-
-  @Test
-  public void testKeySize()
-  {
-    Assert.assertEquals(Integer.BYTES, strategy.getGroupingKeySize());
-  }
-
-  @Test
-  public void testWriteKey()
-  {
-    strategy.writeToKeyBuffer(0, 1, buffer1);
-    Assert.assertEquals(1, buffer1.getInt(0));
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndNullStringComparatorShouldUseLexicographicComparator()
-  {
-    buffer1.putInt(1);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndLexicographicStringComparatorShouldUseLexicographicComparator()
-  {
-    buffer1.putInt(1);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, StringComparators.LEXICOGRAPHIC);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndStrLenStringComparatorShouldUseLexicographicComparator()
-  {
-    buffer1.putInt(1);
-    buffer2.putInt(2);
-    Grouper.BufferComparator comparator = strategy.bufferComparator(0, StringComparators.STRLEN);
-    Assert.assertTrue(comparator.compare(buffer1, buffer2, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(buffer2, buffer1, 0, 0) < 0);
-  }
-
-  @Test
-  public void testSanity()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("a", "b"));
-    Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(0);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{"a", "b"}, (Object[]) row.get(0));
-  }
-
-
-  @Test
-  public void testAddingInDictionary()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("f", "a"));
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{"f", "a"}, (Object[]) row.get(0));
-  }
-
-  @Test
-  public void testAddingInDictionaryWithObjects()
-  {
-    ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
-    Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{"f", "a"});
-    Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
-
-    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
-    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
-    ResultRow row = ResultRow.create(1);
-
-    buffer1.putInt(3);
-    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
-    Assert.assertArrayEquals(new Object[]{"f", "a"}, (Object[]) row.get(0));
-  }
-
-  @After
-  public void tearDown()
-  {
-    buffer1.clear();
-    buffer2.clear();
-  }
-}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategyTest.java
new file mode 100644
index 0000000..d427cb8
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/FixedWidthGroupByColumnSelectorStrategyTest.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.GroupByColumnSelectorStrategyFactory;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class FixedWidthGroupByColumnSelectorStrategyTest extends InitializedNullHandlingTest
+{
+  private static final List<Object[]> DATASOURCE_ROWS = ImmutableList.of(
+      new Object[]{1L, 1.0f, 1.0d},
+      new Object[]{2L, 2.0f, 2.0d},
+      new Object[]{null, null, null},
+      new Object[]{3L, 3.0f, 3.0d}
+  );
+  private static final GroupByColumnSelectorStrategyFactory STRATEGY_FACTORY = new GroupByColumnSelectorStrategyFactory();
+  private static final ByteBuffer BUFFER1 = ByteBuffer.allocate(10);
+  private static final ByteBuffer BUFFER2 = ByteBuffer.allocate(10);
+  private static final String LONG_COLUMN = "long";
+  private static final String FLOAT_COLUMN = "float";
+  private static final String DOUBLE_COLUMN = "double";
+
+  public static class LongGroupByColumnSelectorStrategyTest
+  {
+    private static final GroupByColumnSelectorStrategy STRATEGY =
+        STRATEGY_FACTORY.makeColumnSelectorStrategy(
+            createCursor().getColumnSelectorFactory().getColumnCapabilities(LONG_COLUMN),
+            createCursor().getColumnSelectorFactory().makeColumnValueSelector(LONG_COLUMN),
+            "dimension"
+        );
+
+    @Test
+    public void testKeySize()
+    {
+      Assert.assertEquals(Byte.BYTES + Long.BYTES, STRATEGY.getGroupingKeySizeBytes());
+    }
+
+    @Test
+    public void testWriteToKeyBuffer()
+    {
+      Cursor cursor = createCursor();
+      ResultRow resultRow = ResultRow.create(1);
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(LONG_COLUMN);
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        // Check if the round trip serde produces the same results
+        int sizeIncrease = STRATEGY.writeToKeyBuffer(0, columnValueSelector, BUFFER1);
+        STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+        // There shouldn't be any internal size increase associated with the fixed width types
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(NullHandling.nullToEmptyIfNeeded((Long) DATASOURCE_ROWS.get(rowNum)[0]), resultRow.get(0));
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testInitColumnValues()
+    {
+      Cursor cursor = createCursor();
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(LONG_COLUMN);
+      Object[] valuess = new Object[1];
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        int sizeIncrease = STRATEGY.initColumnValues(columnValueSelector, 0, valuess);
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(NullHandling.nullToEmptyIfNeeded((Long) DATASOURCE_ROWS.get(rowNum)[0]), valuess[0]);
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testBufferComparator()
+    {
+      // lhs < rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100L);
+      writeGroupingKeyToBuffer(BUFFER2, 200L);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs == rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100L);
+      writeGroupingKeyToBuffer(BUFFER2, 100L);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs > rhs
+      writeGroupingKeyToBuffer(BUFFER1, 200L);
+      writeGroupingKeyToBuffer(BUFFER2, 100L);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs is null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, 0L);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // rhs is null
+      writeGroupingKeyToBuffer(BUFFER1, 0L);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs and rhs are null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // stringComparator is provided, for lexicographic comparator "2" > "100"
+      writeGroupingKeyToBuffer(BUFFER1, 2L);
+      writeGroupingKeyToBuffer(BUFFER2, 100L);
+      Assert.assertEquals(
+          1,
+          STRATEGY.bufferComparator(0, StringComparators.LEXICOGRAPHIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+
+      // stringComparator is provided, for alphanumeric comparator number("2") < number("100")
+      writeGroupingKeyToBuffer(BUFFER1, 2L);
+      writeGroupingKeyToBuffer(BUFFER2, 100L);
+      Assert.assertEquals(
+          -1,
+          STRATEGY.bufferComparator(0, StringComparators.ALPHANUMERIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+    }
+
+    private static void writeGroupingKeyToBuffer(final ByteBuffer buffer, @Nullable Long key)
+    {
+      ColumnValueSelector columnValueSelector1 = Mockito.mock(ColumnValueSelector.class);
+
+      Mockito.when(columnValueSelector1.getObject()).thenReturn(key);
+      Mockito.when(columnValueSelector1.getLong()).thenReturn(key == null ? 0 : key);
+      Mockito.when(columnValueSelector1.isNull()).thenReturn(key == null);
+
+      Assert.assertEquals(0, STRATEGY.writeToKeyBuffer(0, columnValueSelector1, buffer));
+    }
+
+    @Test
+    public void testMultiValueHandling()
+    {
+      // Returns false, because fixed width strategy doesn't handle multi-value dimensions, therefore it returns false
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1L, 0, BUFFER1));
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1L, 10, BUFFER1));
+    }
+
+    @Test
+    public void testInitGroupingKeyColumnValue()
+    {
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+      int[] stack = new int[1];
+      ResultRow resultRow = ResultRow.create(1);
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, 1001L, BUFFER1, stack);
+      Assert.assertEquals(1, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(1001L, resultRow.get(0));
+
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, null, BUFFER1, stack);
+      Assert.assertEquals(0, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(null, resultRow.get(0));
+    }
+  }
+
+  public static class FloatGroupByColumnSelectorStrategyTest
+  {
+    private static final GroupByColumnSelectorStrategy STRATEGY =
+        STRATEGY_FACTORY.makeColumnSelectorStrategy(
+            createCursor().getColumnSelectorFactory().getColumnCapabilities(FLOAT_COLUMN),
+            createCursor().getColumnSelectorFactory().makeColumnValueSelector(FLOAT_COLUMN),
+            "dimension"
+        );
+
+    @Test
+    public void testKeySize()
+    {
+      Assert.assertEquals(Byte.BYTES + Float.BYTES, STRATEGY.getGroupingKeySizeBytes());
+    }
+
+    @Test
+    public void testWriteToKeyBuffer()
+    {
+      Cursor cursor = createCursor();
+      ResultRow resultRow = ResultRow.create(1);
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(FLOAT_COLUMN);
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        int sizeIncrease = STRATEGY.writeToKeyBuffer(0, columnValueSelector, BUFFER1);
+        STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(NullHandling.nullToEmptyIfNeeded((Float) DATASOURCE_ROWS.get(rowNum)[1]), resultRow.get(0));
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testInitColumnValues()
+    {
+      Cursor cursor = createCursor();
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(FLOAT_COLUMN);
+      Object[] valuess = new Object[1];
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        int sizeIncrease = STRATEGY.initColumnValues(columnValueSelector, 0, valuess);
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(NullHandling.nullToEmptyIfNeeded((Float) DATASOURCE_ROWS.get(rowNum)[1]), valuess[0]);
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testBufferComparator()
+    {
+      // lhs < rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100.0F);
+      writeGroupingKeyToBuffer(BUFFER2, 200.0F);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs == rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100.0F);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0F);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs > rhs
+      writeGroupingKeyToBuffer(BUFFER1, 200.0F);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0F);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs is null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, 0.0F);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // rhs is null
+      writeGroupingKeyToBuffer(BUFFER1, 0.0F);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs and rhs are null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // stringComparator is provided, for lexicographic comparator "2.0" > "100.0"
+      writeGroupingKeyToBuffer(BUFFER1, 2.0F);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0F);
+      Assert.assertEquals(
+          1,
+          STRATEGY.bufferComparator(0, StringComparators.LEXICOGRAPHIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+
+      // stringComparator is provided, for alphanumeric comparator number("2") < number("100")
+      writeGroupingKeyToBuffer(BUFFER1, 2.0F);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0F);
+      Assert.assertEquals(
+          -1,
+          STRATEGY.bufferComparator(0, StringComparators.ALPHANUMERIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+    }
+
+    private static void writeGroupingKeyToBuffer(final ByteBuffer buffer, @Nullable Float key)
+    {
+      ColumnValueSelector columnValueSelector1 = Mockito.mock(ColumnValueSelector.class);
+
+      Mockito.when(columnValueSelector1.getObject()).thenReturn(key);
+      Mockito.when(columnValueSelector1.getFloat()).thenReturn(key == null ? 0.0f : key);
+      Mockito.when(columnValueSelector1.isNull()).thenReturn(key == null);
+
+      Assert.assertEquals(0, STRATEGY.writeToKeyBuffer(0, columnValueSelector1, buffer));
+    }
+
+    @Test
+    public void testMultiValueHandling()
+    {
+      // Returns false, because fixed width strategy doesn't handle multi-value dimensions, therefore it returns false
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1.0F, 0, BUFFER1));
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1.0F, 10, BUFFER1));
+    }
+
+    @Test
+    public void testInitGroupingKeyColumnValue()
+    {
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+      int[] stack = new int[1];
+      ResultRow resultRow = ResultRow.create(1);
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, 1001.0F, BUFFER1, stack);
+      Assert.assertEquals(1, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(1001.0F, resultRow.get(0));
+
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, null, BUFFER1, stack);
+      Assert.assertEquals(0, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(null, resultRow.get(0));
+    }
+  }
+
+  public static class DoubleGroupByColumnSelectorStrategyTest
+  {
+    private static final GroupByColumnSelectorStrategy STRATEGY =
+        STRATEGY_FACTORY.makeColumnSelectorStrategy(
+            createCursor().getColumnSelectorFactory().getColumnCapabilities(DOUBLE_COLUMN),
+            createCursor().getColumnSelectorFactory().makeColumnValueSelector(DOUBLE_COLUMN),
+            "dimension"
+        );
+
+    @Test
+    public void testKeySize()
+    {
+      Assert.assertEquals(Byte.BYTES + Double.BYTES, STRATEGY.getGroupingKeySizeBytes());
+    }
+
+    @Test
+    public void testWriteToKeyBuffer()
+    {
+      Cursor cursor = createCursor();
+      ResultRow resultRow = ResultRow.create(1);
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory()
+                                                      .makeColumnValueSelector(DOUBLE_COLUMN);
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        int sizeIncrease = STRATEGY.writeToKeyBuffer(0, columnValueSelector, BUFFER1);
+        STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(
+            NullHandling.nullToEmptyIfNeeded((Double) DATASOURCE_ROWS.get(rowNum)[2]),
+            resultRow.get(0)
+        );
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testInitColumnValues()
+    {
+      Cursor cursor = createCursor();
+      ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory()
+                                                      .makeColumnValueSelector(DOUBLE_COLUMN);
+      Object[] valuess = new Object[1];
+
+      int rowNum = 0;
+      while (!cursor.isDone()) {
+        int sizeIncrease = STRATEGY.initColumnValues(columnValueSelector, 0, valuess);
+        Assert.assertEquals(0, sizeIncrease);
+        Assert.assertEquals(NullHandling.nullToEmptyIfNeeded((Double) DATASOURCE_ROWS.get(rowNum)[2]), valuess[0]);
+        cursor.advance();
+        ++rowNum;
+      }
+    }
+
+    @Test
+    public void testBufferComparator()
+    {
+      // lhs < rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100.0D);
+      writeGroupingKeyToBuffer(BUFFER2, 200.0D);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs == rhs
+      writeGroupingKeyToBuffer(BUFFER1, 100.0D);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0D);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs > rhs
+      writeGroupingKeyToBuffer(BUFFER1, 200.0D);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0D);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs is null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, 0.0D);
+      Assert.assertEquals(-1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // rhs is null
+      writeGroupingKeyToBuffer(BUFFER1, 0.0D);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(1, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // lhs and rhs are null
+      writeGroupingKeyToBuffer(BUFFER1, null);
+      writeGroupingKeyToBuffer(BUFFER2, null);
+      Assert.assertEquals(0, STRATEGY.bufferComparator(0, null).compare(BUFFER1, BUFFER2, 0, 0));
+
+      // stringComparator is provided, for lexicographic comparator "2.0" > "100.0"
+      writeGroupingKeyToBuffer(BUFFER1, 2.0D);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0D);
+      Assert.assertEquals(
+          1,
+          STRATEGY.bufferComparator(0, StringComparators.LEXICOGRAPHIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+
+      // stringComparator is provided, for alphanumeric comparator number("2.0D") < number("100.0D")
+      writeGroupingKeyToBuffer(BUFFER1, 2.0D);
+      writeGroupingKeyToBuffer(BUFFER2, 100.0D);
+      Assert.assertEquals(
+          -1,
+          STRATEGY.bufferComparator(0, StringComparators.ALPHANUMERIC)
+                  .compare(BUFFER1, BUFFER2, 0, 0)
+      );
+    }
+
+    private static void writeGroupingKeyToBuffer(final ByteBuffer buffer, @Nullable Double key)
+    {
+      ColumnValueSelector columnValueSelector1 = Mockito.mock(ColumnValueSelector.class);
+
+      Mockito.when(columnValueSelector1.getObject()).thenReturn(key);
+      Mockito.when(columnValueSelector1.getDouble()).thenReturn(key == null ? 0.0d : key);
+      Mockito.when(columnValueSelector1.isNull()).thenReturn(key == null);
+
+      Assert.assertEquals(0, STRATEGY.writeToKeyBuffer(0, columnValueSelector1, buffer));
+    }
+
+    @Test
+    public void testMultiValueHandling()
+    {
+      // Returns false, because fixed width strategy doesn't handle multi-value dimensions, therefore it returns false
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1.0D, 0, BUFFER1));
+      Assert.assertFalse(STRATEGY.checkRowIndexAndAddValueToGroupingKey(0, 1.0D, 10, BUFFER1));
+    }
+
+    @Test
+    public void testInitGroupingKeyColumnValue()
+    {
+      GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+      Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+      int[] stack = new int[1];
+      ResultRow resultRow = ResultRow.create(1);
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, 1001.0D, BUFFER1, stack);
+      Assert.assertEquals(1, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(1001.0D, resultRow.get(0));
+
+
+      STRATEGY.initGroupingKeyColumnValue(0, 0, null, BUFFER1, stack);
+      Assert.assertEquals(0, stack[0]);
+      STRATEGY.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(null, resultRow.get(0));
+    }
+  }
+
+  private static Cursor createCursor()
+  {
+    return IterableRowsCursorHelper.getCursorFromIterable(
+        DATASOURCE_ROWS,
+        RowSignature.builder()
+                    .add(LONG_COLUMN, ColumnType.LONG)
+                    .add(FLOAT_COLUMN, ColumnType.FLOAT)
+                    .add(DOUBLE_COLUMN, ColumnType.DOUBLE)
+                    .build()
+    ).lhs;
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/NestedColumnGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/NestedColumnGroupByColumnSelectorStrategyTest.java
new file mode 100644
index 0000000..a35432c
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/NestedColumnGroupByColumnSelectorStrategyTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.column;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.guice.NestedDataModule;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.epinephelinae.GroupByColumnSelectorStrategyFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Serves as tests for class {@link DictionaryBuildingGroupByColumnSelectorStrategy} when a complex type is specified
+ */
+public class NestedColumnGroupByColumnSelectorStrategyTest extends InitializedNullHandlingTest
+{
+  static {
+    NestedDataModule.registerHandlersAndSerde();
+  }
+
+  private static final GroupByColumnSelectorStrategyFactory STRATEGY_FACTORY = new GroupByColumnSelectorStrategyFactory();
+
+  // No datasource would exist like this, however the inline datasource is an easy way to create the required column value selectors
+  private static final List<Object[]> DATASOURCE_ROWS = ImmutableList.of(
+      new Object[]{StructuredData.wrap(ImmutableList.of("x", "y", "z"))},
+      new Object[]{StructuredData.wrap(ImmutableMap.of("x", 1.1, "y", 2L))},
+      new Object[]{null},
+      new Object[]{StructuredData.wrap("hello")}
+  );
+
+  // Dictionary ids alloted to each object, in the column-0 of the DATASOURCE_ROWS, when building from scratch.
+  // null's dictionary id would be -1
+  private static final int[] DICT_IDS = new int[]{0, 1, -1, 2};
+
+  private static final String NESTED_COLUMN = "nested";
+  /**
+   * Row with null value in the column
+   */
+  private static final int NULL_ROW_NUMBER = 2;
+  private static final ByteBuffer BUFFER1 = ByteBuffer.allocate(10);
+  private static final ByteBuffer BUFFER2 = ByteBuffer.allocate(10);
+
+  @Test
+  public void testKeySize()
+  {
+    Assert.assertEquals(Integer.BYTES, createStrategy().getGroupingKeySizeBytes());
+  }
+
+  @Test
+  public void testInitColumnValues()
+  {
+    GroupByColumnSelectorStrategy strategy = createStrategy();
+    Cursor cursor = createCursor();
+    ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(NESTED_COLUMN);
+    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+    Object[] valuess = new Object[1];
+    int rowNum = 0;
+    while (!cursor.isDone()) {
+      int sz = strategy.initColumnValues(columnValueSelector, 0, valuess);
+      // While adding the values for the first time, the initialisation should have a non-zero footprint, apart from the
+      // row with the null value
+      if (DATASOURCE_ROWS.get(rowNum)[0] == null) {
+        Assert.assertEquals(0, sz);
+      } else {
+        Assert.assertTrue(sz > 0);
+      }
+      Assert.assertEquals(DICT_IDS[rowNum], valuess[0]);
+
+      cursor.advance();
+      ++rowNum;
+    }
+
+    cursor = createCursor();
+    columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(NESTED_COLUMN);
+    rowNum = 0;
+    while (!cursor.isDone()) {
+      int sz = strategy.initColumnValues(columnValueSelector, 0, valuess);
+      // While adding the values for the first time, the initialisation should have a non-zero footprint
+      Assert.assertEquals(0, sz);
+      Assert.assertEquals(DICT_IDS[rowNum], valuess[0]);
+
+      cursor.advance();
+      ++rowNum;
+    }
+  }
+
+  @Test
+  public void testWriteToKeyBuffer()
+  {
+    GroupByColumnSelectorStrategy strategy = createStrategy();
+    ResultRow resultRow = ResultRow.create(1);
+    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+    Cursor cursor = createCursor();
+    ColumnValueSelector columnValueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(NESTED_COLUMN);
+
+    int rowNum = 0;
+    while (!cursor.isDone()) {
+      int sz = strategy.writeToKeyBuffer(0, columnValueSelector, BUFFER1);
+      if (DATASOURCE_ROWS.get(rowNum)[0] == null) {
+        Assert.assertEquals(0, sz);
+      } else {
+        Assert.assertTrue(sz > 0);
+      }
+      // null is represented by GROUP_BY_MISSING_VALUE on the buffer, even though it gets its own dictionaryId in the dictionary
+      Assert.assertEquals(DICT_IDS[rowNum], BUFFER1.getInt(0));
+      // Readback the value
+      strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+      Assert.assertEquals(DATASOURCE_ROWS.get(rowNum)[0], resultRow.get(0));
+
+      cursor.advance();
+      ++rowNum;
+    }
+  }
+
+  @Test
+  public void testInitGroupingKeyColumnValue()
+  {
+    GroupByColumnSelectorStrategy strategy = createStrategy();
+    GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
+    Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
+    int[] stack = new int[1];
+    ResultRow resultRow = ResultRow.create(1);
+
+    // Test nulls
+    strategy.initGroupingKeyColumnValue(0, 0, -1, BUFFER1, stack);
+    Assert.assertEquals(0, stack[0]);
+    strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, BUFFER1, resultRow, 0);
+    Assert.assertNull(resultRow.get(0));
+  }
+
+  // test reset works fine
+
+  private static GroupByColumnSelectorStrategy createStrategy()
+  {
+    return STRATEGY_FACTORY.makeColumnSelectorStrategy(
+        createCursor().getColumnSelectorFactory().getColumnCapabilities(NESTED_COLUMN),
+        createCursor().getColumnSelectorFactory().makeColumnValueSelector(NESTED_COLUMN),
+        "dimension"
+    );
+  }
+
+
+  private static Cursor createCursor()
+  {
+    return IterableRowsCursorHelper.getCursorFromIterable(
+        DATASOURCE_ROWS,
+        RowSignature.builder()
+                    .add(NESTED_COLUMN, ColumnType.NESTED_DATA)
+                    .build()
+    ).lhs;
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategyTest.java
deleted file mode 100644
index 86524bb..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategyTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.epinephelinae.column;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import org.apache.druid.query.groupby.epinephelinae.Grouper;
-import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.nio.ByteBuffer;
-import java.util.function.IntFunction;
-
-@RunWith(MockitoJUnitRunner.class)
-public class StringGroupByColumnSelectorStrategyTest
-{
-  // The dictionary has been constructed such that the values are not sorted lexicographically
-  // so we can tell when the comparator uses a lexicographic comparison and when it uses the indexes.
-  private static final Int2ObjectMap<String> DICTIONARY = new Int2ObjectArrayMap<>(
-      new int[] {0, 1, 2},
-      new String[] {"A", "F1", "D"}
-  );
-
-  private final ByteBuffer lhsBuffer = ByteBuffer.allocate(4);
-  private final ByteBuffer rhsBuffer = ByteBuffer.allocate(4);
-
-  @Mock
-  private ColumnCapabilities capabilities;
-  private final IntFunction<String> dictionaryLookup = DICTIONARY::get;
-
-  private StringGroupByColumnSelectorStrategy target;
-
-  @Before
-  public void setUp()
-  {
-    lhsBuffer.putInt(1);
-    rhsBuffer.putInt(2);
-    Mockito.doReturn(true).when(capabilities).hasBitmapIndexes();
-    Mockito.doReturn(ColumnCapabilities.Capable.TRUE).when(capabilities).areDictionaryValuesSorted();
-    Mockito.doReturn(ColumnCapabilities.Capable.TRUE).when(capabilities).areDictionaryValuesUnique();
-    target = new StringGroupByColumnSelectorStrategy(dictionaryLookup, capabilities);
-  }
-
-  @Test
-  public void testBufferComparatorCannotCompareIntsAndNullStringComparatorShouldUseLexicographicComparator()
-  {
-    Mockito.when(capabilities.areDictionaryValuesSorted()).thenReturn(ColumnCapabilities.Capable.FALSE);
-    // The comparator is not using the short circuit so it isn't comparing indexes.
-    Grouper.BufferComparator comparator = target.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(lhsBuffer, rhsBuffer, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(rhsBuffer, lhsBuffer, 0, 0) < 0);
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndNullStringComparatorShouldUseLexicographicComparator()
-  {
-    Grouper.BufferComparator comparator = target.bufferComparator(0, null);
-    Assert.assertTrue(comparator.compare(lhsBuffer, rhsBuffer, 0, 0) < 0);
-    Assert.assertTrue(comparator.compare(rhsBuffer, lhsBuffer, 0, 0) > 0);
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndLexicographicStringComparatorShouldUseLexicographicComparator()
-  {
-    Grouper.BufferComparator comparator = target.bufferComparator(0, StringComparators.LEXICOGRAPHIC);
-    Assert.assertTrue(comparator.compare(lhsBuffer, rhsBuffer, 0, 0) < 0);
-    Assert.assertTrue(comparator.compare(rhsBuffer, lhsBuffer, 0, 0) > 0);
-  }
-
-  @Test
-  public void testBufferComparatorCanCompareIntsAndStrLenStringComparatorShouldUseLexicographicComparator()
-  {
-    Grouper.BufferComparator comparator = target.bufferComparator(0, StringComparators.STRLEN);
-    Assert.assertTrue(comparator.compare(lhsBuffer, rhsBuffer, 0, 0) > 0);
-    Assert.assertTrue(comparator.compare(rhsBuffer, lhsBuffer, 0, 0) < 0);
-  }
-
-  @After
-  public void tearDown()
-  {
-    lhsBuffer.clear();
-    rhsBuffer.clear();
-  }
-}
diff --git a/processing/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java b/processing/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java
index 19b4921..670da7a 100644
--- a/processing/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java
@@ -110,6 +110,12 @@
       {
         return 0;
       }
+
+      @Override
+      public boolean groupable()
+      {
+        return false;
+      }
     });
   }
 
@@ -693,6 +699,12 @@
     {
       return read(ByteBuffer.wrap(value));
     }
+
+    @Override
+    public boolean groupable()
+    {
+      return false;
+    }
   }
 
   @Test
diff --git a/processing/src/test/java/org/apache/druid/segment/data/ComparableIntArrayTest.java b/processing/src/test/java/org/apache/druid/segment/data/ComparableIntArrayTest.java
deleted file mode 100644
index cfc4e34..0000000
--- a/processing/src/test/java/org/apache/druid/segment/data/ComparableIntArrayTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.data;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-public class ComparableIntArrayTest
-{
-  private final int[] array = new int[]{1, 2, 3};
-  private final ComparableIntArray comparableIntArray = ComparableIntArray.of(1, 2, 3);
-
-  @Test
-  public void testDelegate()
-  {
-    Assert.assertArrayEquals(array, comparableIntArray.getDelegate());
-    Assert.assertEquals(0, ComparableIntArray.of(new int[0]).getDelegate().length);
-    Assert.assertEquals(0, ComparableIntArray.of().getDelegate().length);
-  }
-
-  @Test
-  public void testHashCode()
-  {
-    Assert.assertEquals(Arrays.hashCode(array), comparableIntArray.hashCode());
-    Set<ComparableIntArray> set = new HashSet<>();
-    set.add(comparableIntArray);
-    set.add(ComparableIntArray.of(array));
-    Assert.assertEquals(1, set.size());
-  }
-
-  @Test
-  public void testEquals()
-  {
-    Assert.assertTrue(comparableIntArray.equals(ComparableIntArray.of(array)));
-    Assert.assertFalse(comparableIntArray.equals(ComparableIntArray.of(1, 2, 5)));
-    Assert.assertFalse(comparableIntArray.equals(ComparableIntArray.EMPTY_ARRAY));
-    Assert.assertFalse(comparableIntArray.equals(null));
-  }
-
-  @Test
-  public void testCompareTo()
-  {
-    Assert.assertEquals(0, comparableIntArray.compareTo(ComparableIntArray.of(array)));
-    Assert.assertEquals(1, comparableIntArray.compareTo(null));
-    Assert.assertEquals(1, comparableIntArray.compareTo(ComparableIntArray.of(1, 2)));
-    Assert.assertEquals(-1, comparableIntArray.compareTo(ComparableIntArray.of(1, 2, 3, 4)));
-    Assert.assertTrue(comparableIntArray.compareTo(ComparableIntArray.of(2)) < 0);
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 1bd3f80..95a3379 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -90,8 +90,6 @@
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.column.Types;
-import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.sql.calcite.aggregation.Aggregation;
 import org.apache.druid.sql.calcite.aggregation.DimensionExpression;
@@ -486,15 +484,16 @@
 
       final RelDataType dataType = rexNode.getType();
       final ColumnType outputType = Calcites.getColumnTypeForRelDataType(dataType);
-      if (Types.isNullOr(outputType, ValueType.COMPLEX)) {
-        // Can't group on unknown or COMPLEX types.
-        plannerContext.setPlanningError(
-            "SQL requires a group-by on a column of type %s that is unsupported.",
-            outputType
-        );
+      if (outputType == null) {
+        // Can't group on unknown types.
+        plannerContext.setPlanningError("SQL requires a group-by on a column with unknown type that is unsupported.");
         throw new CannotBuildQueryException(aggregate, rexNode);
       }
-
+      if (!outputType.getNullableStrategy().groupable()) {
+        // Can't group on 'ungroupable' types.
+        plannerContext.setPlanningError("SQL requires a group-by on a column with type [%s] that is unsupported.", outputType);
+        throw new CannotBuildQueryException(aggregate, rexNode);
+      }
       final String dimOutputName = outputNamePrefix + outputNameCounter++;
       if (!druidExpression.isSimpleExtraction()) {
         final String virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
@@ -1250,8 +1249,9 @@
     }
 
     final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
-    // grouping col cannot be type array
-    if (dimensionSpec.getOutputType().isArray()) {
+    // TopN queries can't handle arrays or complex dimensions. Return's null so that they get planned as a group by query
+    // which does support complex and array dimensions
+    if (!dimensionSpec.getOutputType().isPrimitive()) {
       return null;
     }
     final OrderByColumnSpec limitColumn;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteGroupByQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteGroupByQueryTest.java
new file mode 100644
index 0000000..4595de2
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteGroupByQueryTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+public class CalciteGroupByQueryTest
+{
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
index 07e6898..dceddba 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
@@ -54,6 +54,8 @@
 import org.apache.druid.query.filter.ExpressionDimFilter;
 import org.apache.druid.query.filter.LikeDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.topn.DimensionTopNMetricSpec;
@@ -539,6 +541,212 @@
   }
 
   @Test
+  public void testGroupByOnNestedColumn()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn("v0", "strlen(\"string\")", ColumnType.LONG, queryFramework().macroTable())
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L},
+            new Object[]{"2", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":\"hello\"}}", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":1}}", 3L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByOnNestedColumnWithOrderBy()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn("v0", "strlen(\"string\")", ColumnType.LONG, queryFramework().macroTable())
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L},
+            new Object[]{"2", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":\"hello\"}}", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":1}}", 3L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByOnNestedColumnWithOrderByAndLimit()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1 ORDER BY 1 LIMIT 100",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn(
+                                "v0",
+                                "strlen(\"string\")",
+                                ColumnType.LONG,
+                                queryFramework().macroTable()
+                            )
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "d0",
+                                OrderByColumnSpec.Direction.ASCENDING,
+                                StringComparators.NATURAL
+                            )),
+                            100
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L},
+            new Object[]{"2", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":\"hello\"}}", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":1}}", 3L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByOnNestedColumnWithOrderByAndLimit2()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1 ORDER BY 1 LIMIT 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn(
+                                "v0",
+                                "strlen(\"string\")",
+                                ColumnType.LONG,
+                                queryFramework().macroTable()
+                            )
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setLimitSpec(new DefaultLimitSpec(
+                            ImmutableList.of(new OrderByColumnSpec(
+                                "d0",
+                                OrderByColumnSpec.Direction.ASCENDING,
+                                StringComparators.NATURAL
+                            )),
+                            2
+                        ))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByOnNestedColumnWithLimit()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1 LIMIT 100",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn(
+                                "v0",
+                                "strlen(\"string\")",
+                                ColumnType.LONG,
+                                queryFramework().macroTable()
+                            )
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setLimitSpec(new DefaultLimitSpec(null, 100))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L},
+            new Object[]{"2", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":\"hello\"}}", 3L},
+            new Object[]{"{\"array\":[\"a\",\"b\"],\"n\":{\"x\":1}}", 3L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByOnNestedColumnWithLimit2()
+  {
+    cannotVectorize();
+    testQuery(
+        "SELECT nester, SUM(strlen(string)) FROM druid.nested GROUP BY 1 LIMIT 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(DATA_SOURCE)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            new ExpressionVirtualColumn(
+                                "v0",
+                                "strlen(\"string\")",
+                                ColumnType.LONG,
+                                queryFramework().macroTable()
+                            )
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("nester", "d0", ColumnType.NESTED_DATA)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "v0")))
+                        .setLimitSpec(new DefaultLimitSpec(null, 2))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{null, 9L},
+            new Object[]{"\"hello\"", 3L}
+        )
+    );
+  }
+
+  @Test
   public void testGroupByRootPath()
   {
     testQuery(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index d851ee0..661785c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -5847,8 +5847,8 @@
     // COUNT DISTINCT on a sketch cannot be exact.
     assertQueryIsUnplannable(
         PLANNER_CONFIG_NO_HLL,
-        "SELECT COUNT(distinct unique_dim1) FROM druid.foo",
-        "SQL requires a group-by on a column of type COMPLEX<hyperUnique> that is unsupported."
+        "SELECT unique_dim1, COUNT(*) FROM druid.foo GROUP BY 1",
+        "SQL requires a group-by on a column with type [COMPLEX<hyperUnique>] that is unsupported."
     );
   }