MSQ: Support multiple result columns with the same name. (#14025)

* MSQ: Support multiple result columns with the same name.

This is allowed in SQL, and is supported by the regular SQL endpoint.
We retain a validation that INSERT ... SELECT does not allow multiple
columns with the same name, because column names in segments must be
unique.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9a928ff..e2ec3ce 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -36,6 +36,7 @@
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntArraySet;
+import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.StringTuple;
@@ -186,6 +187,7 @@
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -1435,7 +1437,7 @@
 
                 final List<Object[]> retVal = new ArrayList<>();
                 while (!cursor.isDone()) {
-                  final Object[] row = new Object[columnMappings.getMappings().size()];
+                  final Object[] row = new Object[columnMappings.size()];
                   for (int i = 0; i < row.length; i++) {
                     row[i] = selectors.get(i).getObject();
                   }
@@ -1499,6 +1501,8 @@
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
+    final ColumnMappings columnMappings = querySpec.getColumnMappings();
+    final Query<?> queryToPlan;
     final ShuffleSpecFactory shuffleSpecFactory;
 
     if (MSQControllerTask.isIngestion(querySpec)) {
@@ -1508,25 +1512,33 @@
               tuningConfig.getRowsPerSegment(),
               aggregate
           );
+
+      if (!columnMappings.hasUniqueOutputColumnNames()) {
+        // We do not expect to hit this case in production, because the SQL validator checks that column names
+        // are unique for INSERT and REPLACE statements (i.e. anything where MSQControllerTask.isIngestion would
+        // be true). This check is here as defensive programming.
+        throw new ISE("Column names are not unique: [%s]", columnMappings.getOutputColumnNames());
+      }
+
+      if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
+        // We know there's a single time column, because we've checked columnMappings.hasUniqueOutputColumnNames().
+        final int timeColumn = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);
+        queryToPlan = querySpec.getQuery().withOverriddenContext(
+            ImmutableMap.of(
+                QueryKitUtils.CTX_TIME_COLUMN_NAME,
+                columnMappings.getQueryColumnName(timeColumn)
+            )
+        );
+      } else {
+        queryToPlan = querySpec.getQuery();
+      }
     } else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
       shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
+      queryToPlan = querySpec.getQuery();
     } else {
       throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
     }
 
-    final Query<?> queryToPlan;
-
-    if (querySpec.getColumnMappings().hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
-      queryToPlan = querySpec.getQuery().withOverriddenContext(
-          ImmutableMap.of(
-              QueryKitUtils.CTX_TIME_COLUMN_NAME,
-              querySpec.getColumnMappings().getQueryColumnForOutputColumn(ColumnHolder.TIME_COLUMN_NAME)
-          )
-      );
-    } else {
-      queryToPlan = querySpec.getQuery();
-    }
-
     final QueryDefinition queryDef;
 
     try {
@@ -1550,7 +1562,6 @@
     if (MSQControllerTask.isIngestion(querySpec)) {
       final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();
       final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
-      final ColumnMappings columnMappings = querySpec.getColumnMappings();
 
       // Find the stage that provides shuffled input to the final segment-generation stage.
       StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
@@ -1679,8 +1690,10 @@
    */
   private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings)
   {
-    if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
-      final String queryTimeColumn = columnMappings.getQueryColumnForOutputColumn(ColumnHolder.TIME_COLUMN_NAME);
+    final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME);
+
+    if (positions.size() == 1) {
+      final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0));
       return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD));
     } else {
       return false;
@@ -1740,7 +1753,7 @@
 
     for (int i = clusterBy.getBucketByCount(); i < clusterBy.getBucketByCount() + numShardColumns; i++) {
       final KeyColumn column = clusterByColumns.get(i);
-      final List<String> outputColumns = columnMappings.getOutputColumnsForQueryColumn(column.columnName());
+      final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(column.columnName());
 
       // DimensionRangeShardSpec only handles ascending order.
       if (column.order() != KeyOrder.ASCENDING) {
@@ -1759,7 +1772,7 @@
         return Collections.emptyList();
       }
 
-      shardColumns.add(outputColumns.get(0));
+      shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
     }
 
     return shardColumns;
@@ -1830,7 +1843,10 @@
         throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
       }
 
-      outputColumnsInOrder.addAll(columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()));
+      final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
+      for (final int outputColumn : outputColumns) {
+        outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));
+      }
     }
 
     // Then all other columns.
@@ -1841,13 +1857,17 @@
     if (isRollupQuery) {
       // Populate aggregators from the native query when doing an ingest in rollup mode.
       for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) {
-        String outputColumn = Iterables.getOnlyElement(columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()));
-        if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
-          throw new ISE("There can only be one aggregator factory for column [%s].", outputColumn);
+        final int outputColumn = CollectionUtils.getOnlyElement(
+            columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()),
+            xs -> new ISE("Expected single output for query column[%s] but got[%s]", aggregatorFactory.getName(), xs)
+        );
+        final String outputColumnName = columnMappings.getOutputColumnName(outputColumn);
+        if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
+          throw new ISE("There can only be one aggregation for column [%s].", outputColumnName);
         } else {
           outputColumnAggregatorFactories.put(
-              outputColumn,
-              aggregatorFactory.withName(outputColumn).getCombiningFactory()
+              outputColumnName,
+              aggregatorFactory.withName(outputColumnName).getCombiningFactory()
           );
         }
       }
@@ -1856,13 +1876,19 @@
     // Each column can be of either time, dimension, aggregator. For this method. we can ignore the time column.
     // For non-complex columns, If the aggregator factory of the column is not available, we treat the column as
     // a dimension. For complex columns, certains hacks are in place.
-    for (final String outputColumn : outputColumnsInOrder) {
-      final String queryColumn = columnMappings.getQueryColumnForOutputColumn(outputColumn);
+    for (final String outputColumnName : outputColumnsInOrder) {
+      // CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require
+      // that output names be unique.
+      final int outputColumn = CollectionUtils.getOnlyElement(
+          columnMappings.getOutputColumnsByName(outputColumnName),
+          xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs)
+      );
+      final String queryColumn = columnMappings.getQueryColumnName(outputColumn);
       final ColumnType type =
           querySignature.getColumnType(queryColumn)
-                        .orElseThrow(() -> new ISE("No type for column [%s]", outputColumn));
+                        .orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName));
 
-      if (!outputColumn.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+      if (!outputColumnName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
 
         if (!type.is(ValueType.COMPLEX)) {
           // non complex columns
@@ -1870,21 +1896,21 @@
               dimensions,
               aggregators,
               outputColumnAggregatorFactories,
-              outputColumn,
+              outputColumnName,
               type
           );
         } else {
           // complex columns only
           if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
-            dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumn, type));
+            dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumnName, type));
           } else if (!isRollupQuery) {
-            aggregators.add(new PassthroughAggregatorFactory(outputColumn, type.getComplexTypeName()));
+            aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
           } else {
             populateDimensionsAndAggregators(
                 dimensions,
                 aggregators,
                 outputColumnAggregatorFactories,
-                outputColumn,
+                outputColumnName,
                 type
             );
           }
@@ -1972,12 +1998,14 @@
   )
   {
     final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();
-    final RowSignature.Builder mappedSignature = RowSignature.builder();
+    final ImmutableList.Builder<MSQResultsReport.ColumnAndType> mappedSignature = ImmutableList.builder();
 
     for (final ColumnMapping mapping : columnMappings.getMappings()) {
       mappedSignature.add(
-          mapping.getOutputColumn(),
-          querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
+          new MSQResultsReport.ColumnAndType(
+              mapping.getOutputColumn(),
+              querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
+          )
       );
     }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ColumnMappings.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ColumnMappings.java
index fddfbee..5dc502f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ColumnMappings.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ColumnMappings.java
@@ -22,12 +22,12 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntLists;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.column.RowSignature;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -36,23 +36,32 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Maps column names from {@link MSQSpec#getQuery()} to output names desired by the user, in the order
+ * desired by the user.
+ *
+ * The {@link MSQSpec#getQuery()} is translated by {@link org.apache.druid.msq.querykit.QueryKit} into
+ * a {@link org.apache.druid.msq.kernel.QueryDefinition}. So, this class also represents mappings from
+ * {@link org.apache.druid.msq.kernel.QueryDefinition#getFinalStageDefinition()} into the output names desired
+ * by the user.
+ */
 public class ColumnMappings
 {
   private final List<ColumnMapping> mappings;
-  private final Map<String, String> outputToQueryColumnMap;
-  private final Map<String, List<String>> queryToOutputColumnsMap;
+  private final Map<String, IntList> outputColumnNameToPositionMap;
+  private final Map<String, IntList> queryColumnNameToPositionMap;
 
   @JsonCreator
   public ColumnMappings(final List<ColumnMapping> mappings)
   {
-    this.mappings = validateNoDuplicateOutputColumns(Preconditions.checkNotNull(mappings, "mappings"));
-    this.outputToQueryColumnMap = new HashMap<>();
-    this.queryToOutputColumnsMap = new HashMap<>();
+    this.mappings = Preconditions.checkNotNull(mappings, "mappings");
+    this.outputColumnNameToPositionMap = new HashMap<>();
+    this.queryColumnNameToPositionMap = new HashMap<>();
 
-    for (final ColumnMapping mapping : mappings) {
-      outputToQueryColumnMap.put(mapping.getOutputColumn(), mapping.getQueryColumn());
-      queryToOutputColumnsMap.computeIfAbsent(mapping.getQueryColumn(), k -> new ArrayList<>())
-                             .add(mapping.getOutputColumn());
+    for (int i = 0; i < mappings.size(); i++) {
+      final ColumnMapping mapping = mappings.get(i);
+      outputColumnNameToPositionMap.computeIfAbsent(mapping.getOutputColumn(), k -> new IntArrayList()).add(i);
+      queryColumnNameToPositionMap.computeIfAbsent(mapping.getQueryColumn(), k -> new IntArrayList()).add(i);
     }
   }
 
@@ -66,34 +75,95 @@
     );
   }
 
+  /**
+   * Number of output columns.
+   */
+  public int size()
+  {
+    return mappings.size();
+  }
+
+  /**
+   * All output column names, in order. Some names may appear more than once, unless
+   * {@link #hasUniqueOutputColumnNames()} is true.
+   */
   public List<String> getOutputColumnNames()
   {
     return mappings.stream().map(ColumnMapping::getOutputColumn).collect(Collectors.toList());
   }
 
-  public boolean hasOutputColumn(final String columnName)
+  /**
+   * Whether output column names from {@link #getOutputColumnNames()} are all unique.
+   */
+  public boolean hasUniqueOutputColumnNames()
   {
-    return outputToQueryColumnMap.containsKey(columnName);
+    final Set<String> encountered = new HashSet<>();
+
+    for (final ColumnMapping mapping : mappings) {
+      if (!encountered.add(mapping.getOutputColumn())) {
+        return false;
+      }
+    }
+
+    return true;
   }
 
-  public String getQueryColumnForOutputColumn(final String outputColumn)
+  /**
+   * Whether a particular output column name exists.
+   */
+  public boolean hasOutputColumn(final String outputColumnName)
   {
-    final String queryColumn = outputToQueryColumnMap.get(outputColumn);
-    if (queryColumn != null) {
-      return queryColumn;
-    } else {
-      throw new IAE("No such output column [%s]", outputColumn);
-    }
+    return outputColumnNameToPositionMap.containsKey(outputColumnName);
   }
 
-  public List<String> getOutputColumnsForQueryColumn(final String queryColumn)
+  /**
+   * Query column name for a particular output column position.
+   *
+   * @throws IllegalArgumentException if the output column position is out of range
+   */
+  public String getQueryColumnName(final int outputColumn)
   {
-    final List<String> outputColumns = queryToOutputColumnsMap.get(queryColumn);
-    if (outputColumns != null) {
-      return outputColumns;
-    } else {
-      return Collections.emptyList();
+    if (outputColumn < 0 || outputColumn >= mappings.size()) {
+      throw new IAE("Output column position[%d] out of range", outputColumn);
     }
+
+    return mappings.get(outputColumn).getQueryColumn();
+  }
+
+  /**
+   * Output column name for a particular output column position.
+   *
+   * @throws IllegalArgumentException if the output column position is out of range
+   */
+  public String getOutputColumnName(final int outputColumn)
+  {
+    if (outputColumn < 0 || outputColumn >= mappings.size()) {
+      throw new IAE("Output column position[%d] out of range", outputColumn);
+    }
+
+    return mappings.get(outputColumn).getOutputColumn();
+  }
+
+  /**
+   * Output column positions for a particular output column name.
+   */
+  public IntList getOutputColumnsByName(final String outputColumnName)
+  {
+    return outputColumnNameToPositionMap.getOrDefault(outputColumnName, IntLists.emptyList());
+  }
+
+  /**
+   * Output column positions for a particular query column name.
+   */
+  public IntList getOutputColumnsForQueryColumn(final String queryColumnName)
+  {
+    final IntList outputColumnPositions = queryColumnNameToPositionMap.get(queryColumnName);
+
+    if (outputColumnPositions == null) {
+      return IntLists.emptyList();
+    }
+
+    return outputColumnPositions;
   }
 
   @JsonValue
@@ -128,17 +198,4 @@
            "mappings=" + mappings +
            '}';
   }
-
-  private static List<ColumnMapping> validateNoDuplicateOutputColumns(final List<ColumnMapping> mappings)
-  {
-    final Set<String> encountered = new HashSet<>();
-
-    for (final ColumnMapping mapping : mappings) {
-      if (!encountered.add(mapping.getOutputColumn())) {
-        throw new ISE("Duplicate output column [%s]", mapping.getOutputColumn());
-      }
-    }
-
-    return mappings;
-  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
index 3d75e39..5891138 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
@@ -26,20 +26,25 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
-import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Objects;
 
 public class MSQResultsReport
 {
-  private final RowSignature signature;
+  /**
+   * Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
+   * with SQL (which also allows duplicate column names in query results).
+   */
+  private final List<ColumnAndType> signature;
   @Nullable
   private final List<String> sqlTypeNames;
   private final Yielder<Object[]> resultYielder;
 
   public MSQResultsReport(
-      final RowSignature signature,
+      final List<ColumnAndType> signature,
       @Nullable final List<String> sqlTypeNames,
       final Yielder<Object[]> resultYielder
   )
@@ -54,7 +59,7 @@
    */
   @JsonCreator
   static MSQResultsReport fromJson(
-      @JsonProperty("signature") final RowSignature signature,
+      @JsonProperty("signature") final List<ColumnAndType> signature,
       @JsonProperty("sqlTypeNames") @Nullable final List<String> sqlTypeNames,
       @JsonProperty("results") final List<Object[]> results
   )
@@ -63,7 +68,7 @@
   }
 
   @JsonProperty("signature")
-  public RowSignature getSignature()
+  public List<ColumnAndType> getSignature()
   {
     return signature;
   }
@@ -81,4 +86,58 @@
   {
     return resultYielder;
   }
+
+  public static class ColumnAndType
+  {
+    private final String name;
+    private final ColumnType type;
+
+    @JsonCreator
+    public ColumnAndType(
+        @JsonProperty("name") String name,
+        @JsonProperty("type") ColumnType type
+    )
+    {
+      this.name = name;
+      this.type = type;
+    }
+
+    @JsonProperty
+    public String getName()
+    {
+      return name;
+    }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public ColumnType getType()
+    {
+      return type;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ColumnAndType that = (ColumnAndType) o;
+      return Objects.equals(name, that.name) && Objects.equals(type, that.type);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(name, type);
+    }
+
+    @Override
+    public String toString()
+    {
+      return name + ":" + type;
+    }
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 35b439e..783b371 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -183,9 +183,6 @@
     final List<ColumnMapping> columnMappings = new ArrayList<>();
 
     for (final Pair<Integer, String> entry : fieldMapping) {
-      // Note: SQL generally allows output columns to be duplicates, but MSQTaskSqlEngine.validateNoDuplicateAliases
-      // will prevent duplicate output columns from appearing here. So no need to worry about it.
-
       final String queryColumn = druidQuery.getOutputRowSignature().getColumnName(entry.getKey());
       final String outputColumns = entry.getValue();
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index b2c4fab..94c2532 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -173,8 +173,6 @@
       final PlannerContext plannerContext
   ) throws ValidationException
   {
-    validateNoDuplicateAliases(fieldMappings);
-
     if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
       throw new ValidationException(
           StringUtils.format("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
@@ -248,7 +246,8 @@
   }
 
   /**
-   * SQL allows multiple output columns with the same name, but multi-stage queries doesn't.
+   * SQL allows multiple output columns with the same name. However, we don't allow this for INSERT or REPLACE
+   * queries, because we use these output names to generate columns in segments. They must be unique.
    */
   private static void validateNoDuplicateAliases(final List<Pair<Integer, String>> fieldMappings)
       throws ValidationException
@@ -257,7 +256,7 @@
 
     for (final Pair<Integer, String> field : fieldMappings) {
       if (!aliasesSeen.add(field.right)) {
-        throw new ValidationException("Duplicate field in SELECT: " + field.right);
+        throw new ValidationException("Duplicate field in SELECT: [" + field.right + "]");
       }
     }
   }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 82f3583..b20e443 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -879,6 +879,30 @@
   }
 
   @Test
+  public void testInsertDuplicateColumnNames()
+  {
+    testIngestQuery()
+        .setSql(" insert into foo1 SELECT\n"
+                + "  floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+                + " namespace,\n"
+                + " \"user\" AS namespace\n"
+                + "FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+                + "    '{\"type\": \"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}, {\"name\": \"__bucket\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + ") PARTITIONED by day")
+        .setQueryContext(context)
+        .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
+            CoreMatchers.instanceOf(SqlPlanningException.class),
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
+                "Duplicate field in SELECT: [namespace]"))
+        ))
+        .verifyPlanningErrors();
+  }
+
+  @Test
   public void testInsertQueryWithInvalidSubtaskCount()
   {
     Map<String, Object> localContext = ImmutableMap.<String, Object>builder()
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index d652446..4d5a2c0 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -35,6 +35,7 @@
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -246,6 +247,73 @@
   }
 
   @Test
+  public void testSelectOnFooDuplicateColumnNames()
+  {
+    // Duplicate column names are OK in SELECT statements.
+
+    final RowSignature expectedScanSignature =
+        RowSignature.builder()
+                    .add("cnt", ColumnType.LONG)
+                    .add("dim1", ColumnType.STRING)
+                    .build();
+
+    final ColumnMappings expectedColumnMappings = new ColumnMappings(
+        ImmutableList.of(
+            new ColumnMapping("cnt", "x"),
+            new ColumnMapping("dim1", "x")
+        )
+    );
+
+    final List<MSQResultsReport.ColumnAndType> expectedOutputSignature =
+        ImmutableList.of(
+            new MSQResultsReport.ColumnAndType("x", ColumnType.LONG),
+            new MSQResultsReport.ColumnAndType("x", ColumnType.STRING)
+        );
+
+    testSelectQuery()
+        .setSql("select cnt AS x, dim1 AS x from foo")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(
+                       newScanQueryBuilder()
+                           .dataSource(CalciteTests.DATASOURCE1)
+                           .intervals(querySegmentSpec(Filtration.eternity()))
+                           .columns("cnt", "dim1")
+                           .context(defaultScanQueryContext(context, expectedScanSignature))
+                           .build()
+                   )
+                   .columnMappings(expectedColumnMappings)
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .build()
+        )
+        .setQueryContext(context)
+        .setExpectedRowSignature(expectedOutputSignature)
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher
+                .with().totalFiles(1),
+            0, 0, "input0"
+        )
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher
+                .with().rows(6).frames(1),
+            0, 0, "output"
+        )
+        .setExpectedCountersForStageWorkerChannel(
+            CounterSnapshotMatcher
+                .with().rows(6).frames(1),
+            0, 0, "shuffle"
+        )
+        .setExpectedResultRows(ImmutableList.of(
+            new Object[]{1L, !useDefault ? "" : null},
+            new Object[]{1L, "10.1"},
+            new Object[]{1L, "2"},
+            new Object[]{1L, "1"},
+            new Object[]{1L, "def"},
+            new Object[]{1L, "abc"}
+        )).verifyResults();
+  }
+
+  @Test
   public void testSelectOnFooWhereMatchesNoSegments()
   {
     RowSignature resultSignature = RowSignature.builder()
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 366e9c5..b4baaea 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -51,6 +51,7 @@
 import java.io.File;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -101,7 +102,7 @@
             ),
             new CounterSnapshotsTree(),
             new MSQResultsReport(
-                RowSignature.builder().add("s", ColumnType.STRING).build(),
+                Collections.singletonList(new MSQResultsReport.ColumnAndType("s", ColumnType.STRING)),
                 ImmutableList.of("VARCHAR"),
                 Yielders.each(Sequences.simple(results))
             )
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/ExtractResultsFactory.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/ExtractResultsFactory.java
index 7b8079d..7b694e1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/ExtractResultsFactory.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/ExtractResultsFactory.java
@@ -20,17 +20,14 @@
 package org.apache.druid.msq.test;
 
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.QueryTestBuilder;
 import org.apache.druid.sql.calcite.QueryTestRunner;
 import org.junit.Assert;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -98,11 +95,11 @@
           if (!payload.getStatus().getStatus().isComplete()) {
             throw new ISE("Query task [%s] should have finished", taskId);
           }
-          Optional<Pair<RowSignature, List<Object[]>>> signatureListPair = MSQTestBase.getSignatureWithRows(payload.getResults());
-          if (!signatureListPair.isPresent()) {
+          final List<Object[]> resultRows = MSQTestBase.getRows(payload.getResults());
+          if (resultRows == null) {
             throw new ISE("Results report not present in the task's report payload");
           }
-          extractedResults.add(results.withResults(signatureListPair.get().rhs));
+          extractedResults.add(results.withResults(resultRows));
         }
       }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 1ee2bb0..a87f30c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -179,7 +179,6 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -191,7 +190,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -775,12 +773,12 @@
     );
   }
 
-  public static Optional<Pair<RowSignature, List<Object[]>>> getSignatureWithRows(MSQResultsReport resultsReport)
+  @Nullable
+  public static List<Object[]> getRows(@Nullable MSQResultsReport resultsReport)
   {
     if (resultsReport == null) {
-      return Optional.empty();
+      return null;
     } else {
-      RowSignature rowSignature = resultsReport.getSignature();
       Yielder<Object[]> yielder = resultsReport.getResultYielder();
       List<Object[]> rows = new ArrayList<>();
       while (!yielder.isDone()) {
@@ -794,7 +792,7 @@
         throw new ISE("Unable to get results from the report");
       }
 
-      return Optional.of(new Pair<RowSignature, List<Object[]>>(rowSignature, rows));
+      return rows;
     }
   }
 
@@ -802,7 +800,7 @@
   {
     protected String sql = null;
     protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
-    protected RowSignature expectedRowSignature = null;
+    protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
     protected MSQSpec expectedMSQSpec = null;
     protected MSQTuningConfig expectedTuningConfig = null;
     protected Set<SegmentId> expectedSegments = null;
@@ -829,10 +827,17 @@
       return asBuilder();
     }
 
+    public Builder setExpectedRowSignature(List<MSQResultsReport.ColumnAndType> expectedRowSignature)
+    {
+      Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row signature cannot be empty");
+      this.expectedRowSignature = expectedRowSignature;
+      return asBuilder();
+    }
+
     public Builder setExpectedRowSignature(RowSignature expectedRowSignature)
     {
       Preconditions.checkArgument(!expectedRowSignature.equals(RowSignature.empty()), "Row signature cannot be empty");
-      this.expectedRowSignature = expectedRowSignature;
+      this.expectedRowSignature = resultSignatureFromRowSignature(expectedRowSignature);
       return asBuilder();
     }
 
@@ -1100,7 +1105,7 @@
           final StorageAdapter storageAdapter = new QueryableIndexStorageAdapter(queryableIndex);
 
           // assert rowSignature
-          Assert.assertEquals(expectedRowSignature, storageAdapter.getRowSignature());
+          Assert.assertEquals(expectedRowSignature, resultSignatureFromRowSignature(storageAdapter.getRowSignature()));
 
           // assert rollup
           Assert.assertEquals(expectedRollUp, queryableIndex.getMetadata().isRollup());
@@ -1172,7 +1177,7 @@
             Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row)));
           }
         }
-        
+
         // Assert on the tombstone intervals
         // Tombstone segments are only published, but since they donot have any data, they are not pushed by the
         // SegmentGeneratorFrameProcessorFactory. We can get the tombstone segment ids published by taking a set
@@ -1245,7 +1250,7 @@
 
     // Made the visibility public to aid adding ut's easily with minimum parameters to set.
     @Nullable
-    public Pair<MSQSpec, Pair<RowSignature, List<Object[]>>> runQueryWithResult()
+    public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>> runQueryWithResult()
     {
       readyToRun();
       Preconditions.checkArgument(sql != null, "sql cannot be null");
@@ -1280,18 +1285,16 @@
         if (payload.getStatus().getErrorReport() != null) {
           throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString());
         } else {
-          Optional<Pair<RowSignature, List<Object[]>>> rowSignatureListPair = getSignatureWithRows(payload.getResults());
-          if (!rowSignatureListPair.isPresent()) {
+          final List<Object[]> rows = getRows(payload.getResults());
+          if (rows == null) {
             throw new ISE("Query successful but no results found");
           }
-          log.info("found row signature %s", rowSignatureListPair.get().lhs);
-          log.info(rowSignatureListPair.get().rhs.stream()
-                                                 .map(row -> Arrays.toString(row))
-                                                 .collect(Collectors.joining("\n")));
+          log.info("found row signature %s", payload.getResults().getSignature());
+          log.info(rows.stream().map(Arrays::toString).collect(Collectors.joining("\n")));
 
-          MSQSpec spec = indexingServiceClient.getQuerySpecForTask(controllerId);
+          final MSQSpec spec = indexingServiceClient.getQuerySpecForTask(controllerId);
           log.info("Found spec: %s", objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(spec));
-          return new Pair<>(spec, rowSignatureListPair.get());
+          return new Pair<>(spec, Pair.of(payload.getResults().getSignature(), rows));
         }
       }
       catch (Exception e) {
@@ -1308,7 +1311,7 @@
       Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null");
       Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null");
       Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec not ");
-      Pair<MSQSpec, Pair<RowSignature, List<Object[]>>> specAndResults = runQueryWithResult();
+      Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>> specAndResults = runQueryWithResult();
 
       if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult
         return;
@@ -1327,4 +1330,18 @@
       }
     }
   }
+
+  private static List<MSQResultsReport.ColumnAndType> resultSignatureFromRowSignature(final RowSignature signature)
+  {
+    final List<MSQResultsReport.ColumnAndType> retVal = new ArrayList<>(signature.size());
+    for (int i = 0; i < signature.size(); i++) {
+      retVal.add(
+          new MSQResultsReport.ColumnAndType(
+              signature.getColumnName(i),
+              signature.getColumnType(i).orElse(null)
+          )
+      );
+    }
+    return retVal;
+  }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 9051e1e..7525cb9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -37,7 +37,6 @@
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.sql.SqlTaskStatus;
-import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.http.SqlQuery;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.SqlResourceTestClient;
@@ -203,13 +202,13 @@
     List<Map<String, Object>> actualResults = new ArrayList<>();
 
     Yielder<Object[]> yielder = resultsReport.getResultYielder();
-    RowSignature rowSignature = resultsReport.getSignature();
+    List<MSQResultsReport.ColumnAndType> rowSignature = resultsReport.getSignature();
 
     while (!yielder.isDone()) {
       Object[] row = yielder.get();
       Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
       for (int i = 0; i < row.length; ++i) {
-        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+        rowWithFieldNames.put(rowSignature.get(i).getName(), row[i]);
       }
       actualResults.add(rowWithFieldNames);
       yielder = yielder.next(null);
diff --git a/processing/src/main/java/org/apache/druid/utils/CollectionUtils.java b/processing/src/main/java/org/apache/druid/utils/CollectionUtils.java
index d4e1adf..15c11eb 100644
--- a/processing/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -33,6 +33,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.Spliterator;
 import java.util.TreeSet;
@@ -129,6 +130,7 @@
    * can be replaced with Guava's implementation once Druid has upgraded its Guava dependency to a sufficient version.
    *
    * @param expectedSize the expected size of the LinkedHashMap
+   *
    * @return LinkedHashMap object with appropriate size based on callers expectedSize
    */
   @SuppressForbidden(reason = "java.util.LinkedHashMap#<init>(int)")
@@ -184,6 +186,27 @@
     return result;
   }
 
+  /**
+   * Like {@link Iterables#getOnlyElement(Iterable)}, but allows a customizable error message.
+   */
+  public static <T, I extends Iterable<T>, X extends Throwable> T getOnlyElement(
+      final I iterable,
+      final Function<? super I, ? extends X> exceptionSupplier
+  ) throws X
+  {
+    final Iterator<T> iterator = iterable.iterator();
+    try {
+      final T object = iterator.next();
+      if (iterator.hasNext()) {
+        throw exceptionSupplier.apply(iterable);
+      }
+      return object;
+    }
+    catch (NoSuchElementException e) {
+      throw exceptionSupplier.apply(iterable);
+    }
+  }
+
   private CollectionUtils()
   {
   }
diff --git a/processing/src/test/java/org/apache/druid/utils/CollectionUtilsTest.java b/processing/src/test/java/org/apache/druid/utils/CollectionUtilsTest.java
index 522b9e0..d84a56b 100644
--- a/processing/src/test/java/org/apache/druid/utils/CollectionUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/utils/CollectionUtilsTest.java
@@ -19,13 +19,18 @@
 
 package org.apache.druid.utils;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.ISE;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
 import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
 
+import java.util.Collections;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
-
 public class CollectionUtilsTest
 {
   // When Java 9 is allowed, use Set.of().
@@ -37,28 +42,57 @@
   @Test
   public void testSubtract()
   {
-    assertEquals(empty, CollectionUtils.subtract(empty, empty));
-    assertEquals(abc, CollectionUtils.subtract(abc, empty));
-    assertEquals(empty, CollectionUtils.subtract(abc, abc));
-    assertEquals(abc, CollectionUtils.subtract(abc, efg));
-    assertEquals(ImmutableSet.of("a"), CollectionUtils.subtract(abc, bcd));
+    Assert.assertEquals(empty, CollectionUtils.subtract(empty, empty));
+    Assert.assertEquals(abc, CollectionUtils.subtract(abc, empty));
+    Assert.assertEquals(empty, CollectionUtils.subtract(abc, abc));
+    Assert.assertEquals(abc, CollectionUtils.subtract(abc, efg));
+    Assert.assertEquals(ImmutableSet.of("a"), CollectionUtils.subtract(abc, bcd));
   }
 
   @Test
   public void testIntersect()
   {
-    assertEquals(empty, CollectionUtils.intersect(empty, empty));
-    assertEquals(abc, CollectionUtils.intersect(abc, abc));
-    assertEquals(empty, CollectionUtils.intersect(abc, efg));
-    assertEquals(ImmutableSet.of("b", "c"), CollectionUtils.intersect(abc, bcd));
+    Assert.assertEquals(empty, CollectionUtils.intersect(empty, empty));
+    Assert.assertEquals(abc, CollectionUtils.intersect(abc, abc));
+    Assert.assertEquals(empty, CollectionUtils.intersect(abc, efg));
+    Assert.assertEquals(ImmutableSet.of("b", "c"), CollectionUtils.intersect(abc, bcd));
   }
 
   @Test
   public void testUnion()
   {
-    assertEquals(empty, CollectionUtils.union(empty, empty));
-    assertEquals(abc, CollectionUtils.union(abc, abc));
-    assertEquals(ImmutableSet.of("a", "b", "c", "e", "f", "g"), CollectionUtils.union(abc, efg));
-    assertEquals(ImmutableSet.of("a", "b", "c", "d"), CollectionUtils.union(abc, bcd));
+    Assert.assertEquals(empty, CollectionUtils.union(empty, empty));
+    Assert.assertEquals(abc, CollectionUtils.union(abc, abc));
+    Assert.assertEquals(ImmutableSet.of("a", "b", "c", "e", "f", "g"), CollectionUtils.union(abc, efg));
+    Assert.assertEquals(ImmutableSet.of("a", "b", "c", "d"), CollectionUtils.union(abc, bcd));
+  }
+
+  @Test
+  public void testGetOnlyElement_empty()
+  {
+    final IllegalStateException e = Assert.assertThrows(
+        IllegalStateException.class,
+        () -> CollectionUtils.getOnlyElement(Collections.emptyList(), xs -> new ISE("oops"))
+    );
+    MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("oops")));
+  }
+
+  @Test
+  public void testGetOnlyElement_one()
+  {
+    Assert.assertEquals(
+        "a",
+        CollectionUtils.getOnlyElement(Collections.singletonList("a"), xs -> new ISE("oops"))
+    );
+  }
+
+  @Test
+  public void testGetOnlyElement_two()
+  {
+    final IllegalStateException e = Assert.assertThrows(
+        IllegalStateException.class,
+        () -> CollectionUtils.getOnlyElement(ImmutableList.of("a", "b"), xs -> new ISE("oops"))
+    );
+    MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("oops")));
   }
 }