MSQ: Fix validation of time position in collations. (#16961)

* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in #16849.

* Fix test. Better warning message.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 24fbd37..01d8780 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -479,22 +479,29 @@
         );
       }
     } else if (!rootCollation.getFieldCollations().isEmpty()) {
-      int timePosition = -1;
-      for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
-        final String fieldCollationName =
-            fieldMappings.get(rootCollation.getFieldCollations().get(i).getFieldIndex()).getValue();
-        if (ColumnHolder.TIME_COLUMN_NAME.equals(fieldCollationName)) {
-          timePosition = i;
+      int timePositionInRow = -1;
+      for (int i = 0; i < fieldMappings.size(); i++) {
+        Entry<Integer, String> entry = fieldMappings.get(i);
+        if (ColumnHolder.TIME_COLUMN_NAME.equals(entry.getValue())) {
+          timePositionInRow = i;
           break;
         }
       }
 
-      if (timePosition > 0) {
+      int timePositionInCollation = -1;
+      for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
+        if (rootCollation.getFieldCollations().get(i).getFieldIndex() == timePositionInRow) {
+          timePositionInCollation = i;
+          break;
+        }
+      }
+
+      if (timePositionInCollation > 0) {
         throw InvalidSqlInput.exception(
             "Sort order (CLUSTERED BY) cannot include[%s] in position[%d] unless context parameter[%s] "
             + "is set to[false]. %s",
             ColumnHolder.TIME_COLUMN_NAME,
-            timePosition,
+            timePositionInCollation,
             MultiStageQueryContext.CTX_FORCE_TIME_SORT,
             DimensionsSpec.WARNING_NON_TIME_SORT_ORDER
         );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 72541046..798a27a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -278,6 +278,86 @@
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map<String, Object> context)
   {
+    // Tests [CLUSTERED BY LOWER(dim1)], i.e. an expression that is not actually stored.
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("m1", ColumnType.FLOAT)
+                                            .build();
+
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  .interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  .version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  .interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  .version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doCallRealMethod()
+           .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction("foo", ImmutableList.of(Intervals.ETERNITY)));
+
+    testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+                             + "SELECT __time, dim1, m1 "
+                             + "FROM foo "
+                             + "PARTITIONED BY ALL "
+                             + "CLUSTERED BY LOWER(dim1)")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegments(
+                         ImmutableSet.of(
+                             SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
+                         )
+                     )
+                     .setExpectedShardSpec(NumberedShardSpec.class)
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f},
+                             new Object[]{946771200000L, "10.1", 2.0f},
+                             new Object[]{946857600000L, "2", 3.0f},
+                             new Object[]{978307200000L, "1", 4.0f},
+                             new Object[]{978393600000L, "def", 5.0f},
+                             new Object[]{978480000000L, "abc", 6.0f}
+                         )
+                     )
+                     .setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(6),
+                         1, 0
+                     )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             DimensionsSpec.builder()
+                                           .setDimensions(
+                                               ImmutableList.of(
+                                                   new StringDimensionSchema("dim1"),
+                                                   new FloatDimensionSchema("m1")
+                                               )
+                                           )
+                                           .setDimensionExclusions(Collections.singletonList("__time"))
+                                           .build(),
+                             GranularityType.ALL,
+                             Intervals.ETERNITY
+                         )
+                     )
+                     .verifyResults();
+  }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map<String, Object> context)
+  {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("dim1", ColumnType.STRING)
                                             .add("__time", ColumnType.LONG)
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
index e529aa2..efc3718 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
@@ -60,8 +60,10 @@
   public static final String WARNING_NON_TIME_SORT_ORDER = StringUtils.format(
       "Warning: support for segments not sorted by[%s] is experimental. Such segments are not readable by older "
       + "version of Druid, and certain queries cannot run on them. See "
-      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before using this option.",
-      ColumnHolder.TIME_COLUMN_NAME
+      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before setting "
+      + "%s to[false].",
+      ColumnHolder.TIME_COLUMN_NAME,
+      PARAMETER_FORCE_TIME_SORT
   );
 
   public static final boolean DEFAULT_FORCE_TIME_SORT = true;