INSERT/REPLACE complex target column types are validated against source input expressions (#16223)

* Validate INSERT/REPLACE target columns that the catalog declares with complex types
  (for example COMPLEX<hyperUnique>) against the source input expressions, instead of
  failing to resolve the declared storage type.
* Compute the target column's RelDataType from the catalog column spec, falling back to
  RowSignatures.makeComplexType() when the declared storage type is not a plain SQL type,
  and carry the source field's nullability through to the computed type (fixes a
  complex-type nullability issue).
* Report incompatible assignments, including those surfaced by
  ColumnType.leastRestrictiveType(), with the standard "Cannot assign to target field"
  validation error.
* Consolidate the shared INSERT/REPLACE catalog DML tests into the abstract
  CalciteCatalogIngestionDmlTest, parameterized by operation name and DML prefix, and
  simplify the catalog test fixtures accordingly.
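
For example (drawn from the new tests in CalciteCatalogIngestionDmlTest below), the
catalog schema for datasource "foo" now declares a complex column:

    new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null)

so an ingest query whose source expression cannot be assigned to that type now fails
validation instead of planning against an unresolved target type (the REPLACE variant
behaves the same, with the "REPLACE INTO ... OVERWRITE ALL" prefix):

    INSERT INTO "foo"
    SELECT
      __time AS __time,
      ARRAY[dim1] AS unique_dim1
    FROM foo
    PARTITIONED BY ALL TIME

    > Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique>
    > from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])
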
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
index d4a97e6..49ebf3f 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
@@ -31,7 +31,6 @@
 import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
 import org.apache.druid.sql.calcite.CalciteCatalogInsertTest;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.util.SqlTestFramework;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -69,8 +68,8 @@
 
   public void buildDatasources()
   {
-    resolvedTables.forEach((datasourceName, datasourceTable) -> {
-      DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+    RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+      DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
       TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
       catalogMetadata.columnFacades().forEach(
           columnFacade -> {
@@ -92,14 +91,6 @@
 
       createTableMetadata(tableBuilder.build());
     });
-    DatasourceFacade catalogMetadata =
-        ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
-    TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
-    catalogMetadata.columnFacades().forEach(
-        columnFacade -> {
-          tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
-        }
-    );
   }
 
   private void createTableMetadata(TableMetadata table)
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
index 34011fb..31e0a34 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
@@ -31,7 +31,6 @@
 import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
 import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.util.SqlTestFramework;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -68,8 +67,8 @@
 
   public void buildDatasources()
   {
-    resolvedTables.forEach((datasourceName, datasourceTable) -> {
-      DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+    RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+      DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
       TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
       catalogMetadata.columnFacades().forEach(
           columnFacade -> {
@@ -92,7 +91,7 @@
       createTableMetadata(tableBuilder.build());
     });
     DatasourceFacade catalogMetadata =
-        ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
+        RESOLVED_TABLES.get("foo").effectiveMetadata().catalogMetadata();
     TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
     catalogMetadata.columnFacades().forEach(
         columnFacade -> {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index 0d3045c..03dbbd4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -41,7 +41,6 @@
 import org.apache.calcite.sql.SqlWindow;
 import org.apache.calcite.sql.SqlWith;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.IdentifierNamespace;
@@ -54,19 +53,21 @@
 import org.apache.calcite.util.Static;
 import org.apache.calcite.util.Util;
 import org.apache.druid.catalog.model.facade.DatasourceFacade;
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.Types;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
 import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
 import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.RowSignatures;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
@@ -474,19 +475,11 @@
         fields.add(Pair.of(colName, sourceField.getType()));
         continue;
       }
-      SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
-      RelDataType relType = typeFactory.createSqlType(sqlTypeName);
-      if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) {
-        fields.add(Pair.of(
-            colName,
-            relType
-        ));
-      } else {
-        fields.add(Pair.of(
-            colName,
-            typeFactory.createTypeWithNullability(relType, true)
-        ));
-      }
+      RelDataType relType = computeTypeForDefinedCol(definedCol, sourceField);
+      fields.add(Pair.of(
+          colName,
+          typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable())
+      ));
     }
 
     // Perform the SQL-standard check: that the SELECT column can be
@@ -516,8 +509,14 @@
       RelDataType targetFieldRelDataType = targetFields.get(i).getType();
       ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType);
       ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType);
-
-      if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) {
+      try {
+        if (!Objects.equals(
+            targetFieldColumnType,
+            ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType))) {
+          throw new Types.IncompatibleTypeException(targetFieldColumnType, sourceFieldColumnType);
+        }
+      }
+      catch (Types.IncompatibleTypeException e) {
         SqlNode node = getNthExpr(query, i, sourceCount);
         String targetTypeString;
         String sourceTypeString;
@@ -534,12 +533,39 @@
             Static.RESOURCE.typeNotAssignable(
                 targetFields.get(i).getName(), targetTypeString,
                 sourceFields.get(i).getName(), sourceTypeString));
+
       }
     }
     // the call to base class definition will insert implicit casts / coercions where needed.
     super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query);
   }
 
+  protected RelDataType computeTypeForDefinedCol(
+      final DatasourceFacade.ColumnFacade definedCol,
+      final RelDataTypeField sourceField
+  )
+  {
+    SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
+    RelDataType relType;
+    if (sqlTypeName != null) {
+      relType = typeFactory.createSqlType(sqlTypeName);
+    } else {
+      ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType());
+      if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) {
+        relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable());
+      } else {
+        relType = RowSignatures.columnTypeToRelDataType(
+            typeFactory,
+            columnType,
+            // the nullability parameter is ignored for complex types, hence the explicit complex branch above.
+            sourceField.getType().isNullable()
+        );
+      }
+    }
+
+    return relType;
+  }
+
   /**
    * Locates the n'th expression in an INSERT or UPDATE query.
    *
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index 2d3fb5d..4715096 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -29,16 +29,44 @@
 import org.apache.druid.catalog.model.TableSpec;
 import org.apache.druid.catalog.model.facade.DatasourceFacade;
 import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.external.Externals;
+import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
 import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.jupiter.api.Test;
 
-public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
+public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
 {
-  public ImmutableMap<String, DruidTable> resolvedTables = ImmutableMap.of(
+  private final String operationName;
+  private final String dmlPrefixPattern;
+
+  public CalciteCatalogIngestionDmlTest()
+  {
+    this.operationName = getOperationName();
+    this.dmlPrefixPattern = getDmlPrefixPattern();
+  }
+
+  public abstract String getOperationName();
+  public abstract String getDmlPrefixPattern();
+
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  public static ImmutableMap<String, DatasourceTable> RESOLVED_TABLES = ImmutableMap.of(
       "hourDs", new DatasourceTable(
           RowSignature.builder().addTimeColumn().build(),
           new DatasourceTable.PhysicalDatasourceMetadata(
@@ -152,7 +180,8 @@
                           new ColumnSpec("dim3", Columns.STRING, null),
                           new ColumnSpec("cnt", Columns.LONG, null),
                           new ColumnSpec("m1", Columns.FLOAT, null),
-                          new ColumnSpec("m2", Columns.DOUBLE, null)
+                          new ColumnSpec("m2", Columns.DOUBLE, null),
+                          new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null)
                       )
                   ),
                   MAPPER
@@ -198,8 +227,6 @@
       )
   );
 
-  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
-
   @Override
   public CatalogResolver createCatalogResolver()
   {
@@ -210,11 +237,492 @@
           final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
       )
       {
-        if (resolvedTables.get(tableName) != null) {
-          return resolvedTables.get(tableName);
+        if (RESOLVED_TABLES.get(tableName) != null) {
+          return RESOLVED_TABLES.get(tableName);
         }
         return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
       }
     };
   }
+
+  /**
+   * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
+   * value from the catalog.
+   */
+  @Test
+  public void testInsertHourGrainPartitonedByFromCatalog()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.HOUR))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
+   * the query value is used.
+   */
+  @Test
+  public void testInsertHourGrainWithDayPartitonedByFromQuery()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+             "SELECT * FROM foo\n" +
+             "PARTITIONED BY day")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.DAY))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is absent both in the catalog and in the PARTITIONED BY clause in the query,
+   * then validation fails.
+   */
+  @Test
+  public void testInsertNoPartitonedByFromCatalog()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectValidationError(
+            DruidException.class,
+            StringUtils.format("Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", operationName)
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
+   * the query value is used.
+   */
+  @Test
+  public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+             "SELECT * FROM foo\n" +
+             "PARTITIONED BY day")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.DAY))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Adding a column that is not defined in a non-sealed table's schema during ingestion should succeed.
+   */
+  @Test
+  public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
+                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
+                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "e", "v0", "v1", "v2", "v3")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Adding a column that is not defined in a non-sealed table's schema during GROUP BY ingestion should succeed.
+   */
+  @Test
+  public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .add("extra4_complex", ColumnType.LONG)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3,\n" +
+             "  APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "GROUP BY 1,2,3,4,5,6\n" +
+             "PARTITIONED BY ALL TIME"
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            GroupByQuery.builder()
+                .setDataSource(externalDataSource)
+                .setGranularity(Granularities.ALL)
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setVirtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
+                )
+                .setDimensions(
+                    dimensions(
+                        new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
+                        new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
+                        new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
+                        new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
+                        new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
+                    )
+                )
+                .setAggregatorSpecs(
+                    new CardinalityAggregatorFactory(
+                        "a0",
+                        null,
+                        ImmutableList.of(
+                            new DefaultDimensionSpec(
+                                "c",
+                                "c",
+                                ColumnType.LONG
+                            )
+                        ),
+                        false,
+                        true
+                    )
+                )
+                .setPostAggregatorSpecs(
+                    expressionPostAgg("p0", "1", ColumnType.LONG),
+                    expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
+                )
+                // Unlike the scan tests, output columns here follow the GroupBy
+                // dimension, aggregator, and post-aggregator order.
+                .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Adding a column that is not defined in a sealed table's schema during ingestion should fail
+   * with a proper validation error.
+   */
+  @Test
+  public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectValidationError(
+            DruidException.class,
+            "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
+        )
+        .verify();
+  }
+
+
+  /**
+   * Inserting into a catalog table with a WITH source succeeds.
+   */
+  @Test
+  public void testInsertWithSourceIntoCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+             "WITH \"ext\" AS (\n" +
+             "  SELECT *\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             ")\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3\n" +
+             "FROM \"ext\"\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
+                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
+                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "e", "v0", "v1", "v2", "v3")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * GROUP BY ingestion from a WITH source that adds a column not defined in a non-sealed table should succeed.
+   */
+  @Test
+  public void testGroupByInsertWithSourceIntoCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .add("extra4_complex", ColumnType.LONG)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+             "WITH \"ext\" AS (\n" +
+             "  SELECT *\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             ")\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3,\n" +
+             "  APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
+             "FROM \"ext\"\n" +
+             "GROUP BY 1,2,3,4,5,6\n" +
+             "PARTITIONED BY ALL TIME"
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            GroupByQuery.builder()
+                .setDataSource(externalDataSource)
+                .setGranularity(Granularities.ALL)
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setVirtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
+                )
+                .setDimensions(
+                    dimensions(
+                        new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
+                        new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
+                        new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
+                        new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
+                        new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
+                    )
+                )
+                .setAggregatorSpecs(
+                    new CardinalityAggregatorFactory(
+                        "a0",
+                        null,
+                        ImmutableList.of(
+                            new DefaultDimensionSpec(
+                                "c",
+                                "c",
+                                ColumnType.LONG
+                            )
+                        ),
+                        false,
+                        true
+                    )
+                )
+                .setPostAggregatorSpecs(
+                    expressionPostAgg("p0", "1", ColumnType.LONG),
+                    expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
+                )
+                // Unlike the scan tests, output columns here follow the GroupBy
+                // dimension, aggregator, and post-aggregator order.
+                .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  @Test
+  public void testInsertIntoExistingStrictNoDefinedSchema()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "strictTableWithNoDefinedSchema") + " SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
+        .verify();
+  }
+
+  @Test
+  public void testInsertIntoExistingWithIncompatibleTypeAssignment()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+             + "SELECT\n"
+             + "  __time AS __time,\n"
+             + "  ARRAY[dim1] AS dim1\n"
+             + "FROM foo\n"
+             + "PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
+        .verify();
+  }
+
+  @Test
+  public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+             + "SELECT\n"
+             + "  __time AS __time,\n"
+             + "  ARRAY[dim1] AS unique_dim1\n"
+             + "FROM foo\n"
+             + "PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique> from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])")
+        .verify();
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
index af45896..fff5ca9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
@@ -19,302 +19,20 @@
 
 package org.apache.druid.sql.calcite;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
-import org.apache.druid.sql.calcite.external.Externals;
-import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.junit.jupiter.api.Test;
-
 /**
  * Test for INSERT DML statements for tables defined in catalog.
  */
 public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest
 {
-  /**
-   * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
-   * value from the catalog.
-   */
-  @Test
-  public void testInsertHourGrainPartitonedByFromCatalog()
+  @Override
+  public String getOperationName()
   {
-    testIngestionQuery()
-        .sql("INSERT INTO hourDs\n" +
-             "SELECT * FROM foo")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema.
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.HOUR))
-                .build()
-        )
-        .verify();
+    return "INSERT";
   }
 
-  /**
-   * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
-   * the query value is used.
-   */
-  @Test
-  public void testInsertHourGrainWithDayPartitonedByFromQuery()
+  @Override
+  public String getDmlPrefixPattern()
   {
-    testIngestionQuery()
-        .sql("INSERT INTO hourDs\n" +
-             "SELECT * FROM foo\n" +
-             "PARTITIONED BY day")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema.
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.DAY))
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
-   * validation error.
-   */
-  @Test
-  public void testInsertNoPartitonedByFromCatalog()
-  {
-    testIngestionQuery()
-        .sql("INSERT INTO noPartitonedBy\n" +
-             "SELECT * FROM foo")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectValidationError(
-            DruidException.class,
-            "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found."
-        )
-        .verify();
-  }
-
-  /**
-   * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
-   * the query value is used.
-   */
-  @Test
-  public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
-  {
-    testIngestionQuery()
-        .sql("INSERT INTO noPartitonedBy\n" +
-             "SELECT * FROM foo\n" +
-             "PARTITIONED BY day")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema.
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.DAY))
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
-   */
-  @Test
-  public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
-  {
-    ExternalDataSource externalDataSource = new ExternalDataSource(
-        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
-        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
-        RowSignature.builder()
-            .add("a", ColumnType.STRING)
-            .add("b", ColumnType.STRING)
-            .add("c", ColumnType.LONG)
-            .add("d", ColumnType.STRING)
-            .add("e", ColumnType.STRING)
-            .build()
-    );
-    final RowSignature signature = RowSignature.builder()
-        .add("__time", ColumnType.LONG)
-        .add("dim1", ColumnType.STRING)
-        .add("cnt", ColumnType.LONG)
-        .add("m2", ColumnType.DOUBLE)
-        .add("extra2", ColumnType.LONG)
-        .add("extra3", ColumnType.STRING)
-        .build();
-    testIngestionQuery()
-        .sql("INSERT INTO foo\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2,\n" +
-             "  e AS extra3\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("foo", signature)
-        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource(externalDataSource)
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .virtualColumns(
-                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
-                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
-                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
-                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
-                )
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema.
-                .columns("b", "e", "v0", "v1", "v2", "v3")
-                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * Adding a new column during ingestion that is not defined in a sealed table should fail with
-   * proper validation error.
-   */
-  @Test
-  public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
-  {
-    testIngestionQuery()
-        .sql("INSERT INTO fooSealed\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectValidationError(
-            DruidException.class,
-            "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
-        )
-        .verify();
-  }
-
-
-  /**
-   * Inserting into a catalog table with a WITH source succeeds
-   */
-  @Test
-  public void testInsertWithSourceIntoCatalogTable()
-  {
-    ExternalDataSource externalDataSource = new ExternalDataSource(
-        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
-        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
-        RowSignature.builder()
-            .add("a", ColumnType.STRING)
-            .add("b", ColumnType.STRING)
-            .add("c", ColumnType.LONG)
-            .add("d", ColumnType.STRING)
-            .add("e", ColumnType.STRING)
-            .build()
-    );
-    final RowSignature signature = RowSignature.builder()
-        .add("__time", ColumnType.LONG)
-        .add("dim1", ColumnType.STRING)
-        .add("cnt", ColumnType.LONG)
-        .add("m2", ColumnType.DOUBLE)
-        .add("extra2", ColumnType.LONG)
-        .add("extra3", ColumnType.STRING)
-        .build();
-    testIngestionQuery()
-        .sql("INSERT INTO \"foo\"\n" +
-             "WITH \"ext\" AS (\n" +
-             "  SELECT *\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             ")\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2,\n" +
-             "  e AS extra3\n" +
-             "FROM \"ext\"\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("foo", signature)
-        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource(externalDataSource)
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .virtualColumns(
-                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
-                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
-                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
-                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
-                )
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema.
-                .columns("b", "e", "v0", "v1", "v2", "v3")
-                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
-                .build()
-        )
-        .verify();
-  }
-
-  @Test
-  public void testInsertIntoExistingStrictNoDefinedSchema()
-  {
-    testIngestionQuery()
-        .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(
-            DruidException.class,
-            "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
-        .verify();
-  }
-
-  @Test
-  public void testInsertIntoExistingWithIncompatibleTypeAssignment()
-  {
-    testIngestionQuery()
-        .sql("INSERT INTO foo\n"
-             + "SELECT\n"
-             + "  __time AS __time,\n"
-             + "  ARRAY[dim1] AS dim1\n"
-             + "FROM foo\n"
-             + "PARTITIONED BY ALL TIME")
-        .expectValidationError(
-            DruidException.class,
-            "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
-        .verify();
+    return "INSERT INTO \"%s\"";
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
index f4c6a90..aad2269 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
@@ -19,298 +19,20 @@
 
 package org.apache.druid.sql.calcite;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
-import org.apache.druid.sql.calcite.external.Externals;
-import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.junit.jupiter.api.Test;
-
 /**
  * Test for REPLACE DML statements for tables defined in catalog.
  */
 public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest
 {
-  /**
-   * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
-   * value from the catalog.
-   */
-  @Test
-  public void testReplaceHourGrainPartitonedByFromCatalog()
+  @Override
+  public String getOperationName()
   {
-    testIngestionQuery()
-        .sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
-             "SELECT * FROM foo")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.HOUR))
-                .build()
-        )
-        .verify();
+    return "REPLACE";
   }
 
-  /**
-   * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
-   * the query value is used.
-   */
-  @Test
-  public void testReplaceHourGrainWithDayPartitonedByFromQuery()
+  @Override
+  public String getDmlPrefixPattern()
   {
-    testIngestionQuery()
-        .sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
-             "SELECT *FROM foo\n" +
-             "PARTITIONED BY day")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.DAY))
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
-   * validation error.
-   */
-  @Test
-  public void testInsertNoPartitonedByFromCatalog()
-  {
-    testIngestionQuery()
-        .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
-             "SELECT * FROM foo")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectValidationError(
-            DruidException.class,
-            "Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found."
-        )
-        .verify();
-  }
-
-  /**
-   * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
-   * the query value is used.
-   */
-  @Test
-  public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
-  {
-    testIngestionQuery()
-        .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
-             "SELECT * FROM foo\n" +
-             "PARTITIONED BY day")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
-        .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource("foo")
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
-                .context(queryContextWithGranularity(Granularities.DAY))
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
-   */
-  @Test
-  public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable()
-  {
-    ExternalDataSource externalDataSource = new ExternalDataSource(
-        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
-        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
-        RowSignature.builder()
-            .add("a", ColumnType.STRING)
-            .add("b", ColumnType.STRING)
-            .add("c", ColumnType.LONG)
-            .add("d", ColumnType.STRING)
-            .add("e", ColumnType.STRING)
-            .build()
-    );
-    final RowSignature signature = RowSignature.builder()
-        .add("__time", ColumnType.LONG)
-        .add("dim1", ColumnType.STRING)
-        .add("cnt", ColumnType.LONG)
-        .add("m2", ColumnType.DOUBLE)
-        .add("extra2", ColumnType.LONG)
-        .add("extra3", ColumnType.STRING)
-        .build();
-    testIngestionQuery()
-        .sql("REPLACE INTO foo OVERWRITE ALL\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2,\n" +
-             "  e AS extra3\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("foo", signature)
-        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource(externalDataSource)
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .virtualColumns(
-                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
-                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
-                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
-                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
-                )
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema. Here we just check that the
-                // set of columns is correct, but not their order.
-                .columns("b", "e", "v0", "v1", "v2", "v3")
-                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
-                .build()
-        )
-        .verify();
-  }
-
-  /**
-   * Adding a new column during ingestion that is not defined in a sealed table should fail with
-   * proper validation error.
-   */
-  @Test
-  public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable()
-  {
-    testIngestionQuery()
-        .sql("REPLACE INTO fooSealed OVERWRITE ALL\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectValidationError(
-            DruidException.class,
-            "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
-        )
-        .verify();
-  }
-
-
-  /**
-   * Replacing into a catalog table with a WITH source succeeds
-   */
-  @Test
-  public void testReplaceWithSourceIntoCatalogTable()
-  {
-    ExternalDataSource externalDataSource = new ExternalDataSource(
-        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
-        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
-        RowSignature.builder()
-            .add("a", ColumnType.STRING)
-            .add("b", ColumnType.STRING)
-            .add("c", ColumnType.LONG)
-            .add("d", ColumnType.STRING)
-            .add("e", ColumnType.STRING)
-            .build()
-    );
-    final RowSignature signature = RowSignature.builder()
-        .add("__time", ColumnType.LONG)
-        .add("dim1", ColumnType.STRING)
-        .add("cnt", ColumnType.LONG)
-        .add("m2", ColumnType.DOUBLE)
-        .add("extra2", ColumnType.LONG)
-        .add("extra3", ColumnType.STRING)
-        .build();
-    testIngestionQuery()
-        .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" +
-             "WITH \"ext\" AS (\n" +
-             "  SELECT *\n" +
-             "FROM TABLE(inline(\n" +
-             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
-             "  format => 'csv'))\n" +
-             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             ")\n" +
-             "SELECT\n" +
-             "  TIME_PARSE(a) AS __time,\n" +
-             "  b AS dim1,\n" +
-             "  1 AS cnt,\n" +
-             "  c AS m2,\n" +
-             "  CAST(d AS BIGINT) AS extra2,\n" +
-             "  e AS extra3\n" +
-             "FROM \"ext\"\n" +
-             "PARTITIONED BY ALL TIME")
-        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
-        .expectTarget("foo", signature)
-        .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
-        .expectQuery(
-            newScanQueryBuilder()
-                .dataSource(externalDataSource)
-                .intervals(querySegmentSpec(Filtration.eternity()))
-                .virtualColumns(
-                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
-                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
-                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
-                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
-                )
-                // Scan query lists columns in alphabetical order independent of the
-                // SQL project list or the defined schema. Here we just check that the
-                // set of columns is correct, but not their order.
-                .columns("b", "e", "v0", "v1", "v2", "v3")
-                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
-                .build()
-        )
-        .verify();
-  }
-
-  @Test
-  public void testReplaceIntoExistingStrictNoDefinedSchema()
-  {
-    testIngestionQuery()
-        .sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(
-            DruidException.class,
-            "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
-        .verify();
-  }
-
-  @Test
-  public void testReplaceIntoExistingWithIncompatibleTypeAssignment()
-  {
-    testIngestionQuery()
-        .sql("REPLACE INTO foo OVERWRITE ALL\n"
-             + "SELECT\n"
-             + "  __time AS __time,\n"
-             + "  ARRAY[dim1] AS dim1\n"
-             + "FROM foo\n"
-             + "PARTITIONED BY ALL TIME")
-        .expectValidationError(
-            DruidException.class,
-            "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
-        .verify();
+    return "REPLACE INTO \"%s\" OVERWRITE ALL";
   }
 }