INSERT/REPLACE complex target column types are validated against source input expressions (#16223)
* * fix
* * fix
* * address review comments
* * fix
* * simplify tests
* * fix complex type nullability issue
* * address review comments
* * address test review comments
* * fix checkstyle
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
index d4a97e6..49ebf3f 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java
@@ -31,7 +31,6 @@
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogInsertTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -69,8 +68,8 @@
public void buildDatasources()
{
- resolvedTables.forEach((datasourceName, datasourceTable) -> {
- DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+ RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+ DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -92,14 +91,6 @@
createTableMetadata(tableBuilder.build());
});
- DatasourceFacade catalogMetadata =
- ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
- TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
- catalogMetadata.columnFacades().forEach(
- columnFacade -> {
- tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
- }
- );
}
private void createTableMetadata(TableMetadata table)
diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
index 34011fb..31e0a34 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java
@@ -31,7 +31,6 @@
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -68,8 +67,8 @@
public void buildDatasources()
{
- resolvedTables.forEach((datasourceName, datasourceTable) -> {
- DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+ RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+ DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -92,7 +91,7 @@
createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
- ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
+ RESOLVED_TABLES.get("foo").effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index 0d3045c..03dbbd4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -41,7 +41,6 @@
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.IdentifierNamespace;
@@ -54,19 +53,21 @@
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.Types;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.RowSignatures;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
@@ -474,19 +475,11 @@
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
- SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
- RelDataType relType = typeFactory.createSqlType(sqlTypeName);
- if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) {
- fields.add(Pair.of(
- colName,
- relType
- ));
- } else {
- fields.add(Pair.of(
- colName,
- typeFactory.createTypeWithNullability(relType, true)
- ));
- }
+ RelDataType relType = computeTypeForDefinedCol(definedCol, sourceField);
+ fields.add(Pair.of(
+ colName,
+ typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable())
+ ));
}
// Perform the SQL-standard check: that the SELECT column can be
@@ -516,8 +509,14 @@
RelDataType targetFieldRelDataType = targetFields.get(i).getType();
ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType);
ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType);
-
- if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) {
+ try {
+ if (!Objects.equals(
+ targetFieldColumnType,
+ ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType))) {
+ throw new Types.IncompatibleTypeException(targetFieldColumnType, sourceFieldColumnType);
+ }
+ }
+ catch (Types.IncompatibleTypeException e) {
SqlNode node = getNthExpr(query, i, sourceCount);
String targetTypeString;
String sourceTypeString;
@@ -534,12 +533,39 @@
Static.RESOURCE.typeNotAssignable(
targetFields.get(i).getName(), targetTypeString,
sourceFields.get(i).getName(), sourceTypeString));
+
}
}
// the call to base class definition will insert implicit casts / coercions where needed.
super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query);
}
+ protected RelDataType computeTypeForDefinedCol(
+ final DatasourceFacade.ColumnFacade definedCol,
+ final RelDataTypeField sourceField
+ )
+ {
+ SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
+ RelDataType relType;
+ if (sqlTypeName != null) {
+ relType = typeFactory.createSqlType(sqlTypeName);
+ } else {
+ ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType());
+ if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) {
+ relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable());
+ } else {
+ relType = RowSignatures.columnTypeToRelDataType(
+ typeFactory,
+ columnType,
+ // this nullability is ignored for complex types for some reason, hence the check for complex above.
+ sourceField.getType().isNullable()
+ );
+ }
+ }
+
+ return relType;
+ }
+
/**
* Locates the n'th expression in an INSERT or UPDATE query.
*
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index 2d3fb5d..4715096 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -29,16 +29,44 @@
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.external.Externals;
+import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.jupiter.api.Test;
-public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
+public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
{
- public ImmutableMap<String, DruidTable> resolvedTables = ImmutableMap.of(
+ private final String operationName;
+ private final String dmlPrefixPattern;
+
+ public CalciteCatalogIngestionDmlTest()
+ {
+ this.operationName = getOperationName();
+ this.dmlPrefixPattern = getDmlPrefixPattern();
+ }
+
+ public abstract String getOperationName();
+ public abstract String getDmlPrefixPattern();
+
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+ public static ImmutableMap<String, DatasourceTable> RESOLVED_TABLES = ImmutableMap.of(
"hourDs", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
@@ -152,7 +180,8 @@
new ColumnSpec("dim3", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null),
new ColumnSpec("m1", Columns.FLOAT, null),
- new ColumnSpec("m2", Columns.DOUBLE, null)
+ new ColumnSpec("m2", Columns.DOUBLE, null),
+ new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null)
)
),
MAPPER
@@ -198,8 +227,6 @@
)
);
- private static final ObjectMapper MAPPER = new DefaultObjectMapper();
-
@Override
public CatalogResolver createCatalogResolver()
{
@@ -210,11 +237,492 @@
final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
)
{
- if (resolvedTables.get(tableName) != null) {
- return resolvedTables.get(tableName);
+ if (RESOLVED_TABLES.get(tableName) != null) {
+ return RESOLVED_TABLES.get(tableName);
}
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
};
}
+
+ /**
+ * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
+ * value from the catalog.
+ */
+ @Test
+ public void testInsertHourGrainPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+ .context(queryContextWithGranularity(Granularities.HOUR))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertHourGrainWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
+ * validation error.
+ */
+ @Test
+ public void testInsertNoPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectValidationError(
+ DruidException.class,
+ StringUtils.format("Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", operationName)
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
+ */
+ @Test
+ public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG),
+ expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
+ expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
+ )
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .columns("b", "e", "v0", "v1", "v2", "v3")
+ .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed.
+ */
+ @Test
+ public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .add("extra4_complex", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3,\n" +
+ " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "GROUP BY 1,2,3,4,5,6\n" +
+ "PARTITIONED BY ALL TIME"
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ GroupByQuery.builder()
+ .setDataSource(externalDataSource)
+ .setGranularity(Granularities.ALL)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setVirtualColumns(
+ expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
+ )
+ .setDimensions(
+ dimensions(
+ new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
+ new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
+ new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
+ new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
+ new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
+ )
+ )
+ .setAggregatorSpecs(
+ new CardinalityAggregatorFactory(
+ "a0",
+ null,
+ ImmutableList.of(
+ new DefaultDimensionSpec(
+ "c",
+ "c",
+ ColumnType.LONG
+ )
+ ),
+ false,
+ true
+ )
+ )
+ .setPostAggregatorSpecs(
+ expressionPostAgg("p0", "1", ColumnType.LONG),
+ expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
+ )
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during ingestion that is not defined in a sealed table should fail with
+ * proper validation error.
+ */
+ @Test
+ public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectValidationError(
+ DruidException.class,
+ "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
+ )
+ .verify();
+ }
+
+
+ /**
+ * Inserting into a catalog table with a WITH source succeeds
+ */
+ @Test
+ public void testInsertWithSourceIntoCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "WITH \"ext\" AS (\n" +
+ " SELECT *\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ ")\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3\n" +
+ "FROM \"ext\"\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG),
+ expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
+ expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
+ )
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .columns("b", "e", "v0", "v1", "v2", "v3")
+ .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed.
+ */
+ @Test
+ public void testGroupByInsertWithSourceIntoCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .add("extra4_complex", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "WITH \"ext\" AS (\n" +
+ " SELECT *\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ ")\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3,\n" +
+ " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
+ "FROM \"ext\"\n" +
+ "GROUP BY 1,2,3,4,5,6\n" +
+ "PARTITIONED BY ALL TIME"
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ GroupByQuery.builder()
+ .setDataSource(externalDataSource)
+ .setGranularity(Granularities.ALL)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setVirtualColumns(
+ expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
+ )
+ .setDimensions(
+ dimensions(
+ new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
+ new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
+ new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
+ new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
+ new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
+ )
+ )
+ .setAggregatorSpecs(
+ new CardinalityAggregatorFactory(
+ "a0",
+ null,
+ ImmutableList.of(
+ new DefaultDimensionSpec(
+ "c",
+ "c",
+ ColumnType.LONG
+ )
+ ),
+ false,
+ true
+ )
+ )
+ .setPostAggregatorSpecs(
+ expressionPostAgg("p0", "1", ColumnType.LONG),
+ expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
+ )
+ // Scan query lists columns in alphabetical order independent of the
+ // SQL project list or the defined schema.
+ .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testInsertIntoExistingStrictNoDefinedSchema()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "strictTableWithNoDefinedSchema") + " SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ DruidException.class,
+ "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
+ .verify();
+ }
+
+ @Test
+ public void testInsertIntoExistingWithIncompatibleTypeAssignment()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ + "SELECT\n"
+ + " __time AS __time,\n"
+ + " ARRAY[dim1] AS dim1\n"
+ + "FROM foo\n"
+ + "PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ DruidException.class,
+ "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
+ .verify();
+ }
+
+ @Test
+ public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ + "SELECT\n"
+ + " __time AS __time,\n"
+ + " ARRAY[dim1] AS unique_dim1\n"
+ + "FROM foo\n"
+ + "PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ DruidException.class,
+ "Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique> from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])")
+ .verify();
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
index af45896..fff5ca9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java
@@ -19,302 +19,20 @@
package org.apache.druid.sql.calcite;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
-import org.apache.druid.sql.calcite.external.Externals;
-import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.junit.jupiter.api.Test;
-
/**
* Test for INSERT DML statements for tables defined in catalog.
*/
public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest
{
- /**
- * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
- * value from the catalog.
- */
- @Test
- public void testInsertHourGrainPartitonedByFromCatalog()
+ @Override
+ public String getOperationName()
{
- testIngestionQuery()
- .sql("INSERT INTO hourDs\n" +
- "SELECT * FROM foo")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema.
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.HOUR))
- .build()
- )
- .verify();
+ return "INSERT";
}
- /**
- * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
- * the query value is used.
- */
- @Test
- public void testInsertHourGrainWithDayPartitonedByFromQuery()
+ @Override
+ public String getDmlPrefixPattern()
{
- testIngestionQuery()
- .sql("INSERT INTO hourDs\n" +
- "SELECT * FROM foo\n" +
- "PARTITIONED BY day")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema.
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.DAY))
- .build()
- )
- .verify();
- }
-
- /**
- * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
- * validation error.
- */
- @Test
- public void testInsertNoPartitonedByFromCatalog()
- {
- testIngestionQuery()
- .sql("INSERT INTO noPartitonedBy\n" +
- "SELECT * FROM foo")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectValidationError(
- DruidException.class,
- "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found."
- )
- .verify();
- }
-
- /**
- * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
- * the query value is used.
- */
- @Test
- public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
- {
- testIngestionQuery()
- .sql("INSERT INTO noPartitonedBy\n" +
- "SELECT * FROM foo\n" +
- "PARTITIONED BY day")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema.
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.DAY))
- .build()
- )
- .verify();
- }
-
- /**
- * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
- */
- @Test
- public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
- {
- ExternalDataSource externalDataSource = new ExternalDataSource(
- new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
- new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
- RowSignature.builder()
- .add("a", ColumnType.STRING)
- .add("b", ColumnType.STRING)
- .add("c", ColumnType.LONG)
- .add("d", ColumnType.STRING)
- .add("e", ColumnType.STRING)
- .build()
- );
- final RowSignature signature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt", ColumnType.LONG)
- .add("m2", ColumnType.DOUBLE)
- .add("extra2", ColumnType.LONG)
- .add("extra3", ColumnType.STRING)
- .build();
- testIngestionQuery()
- .sql("INSERT INTO foo\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2,\n" +
- " e AS extra3\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("foo", signature)
- .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource(externalDataSource)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .virtualColumns(
- expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
- expressionVirtualColumn("v1", "1", ColumnType.LONG),
- expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
- expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
- )
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema.
- .columns("b", "e", "v0", "v1", "v2", "v3")
- .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
- .build()
- )
- .verify();
- }
-
- /**
- * Adding a new column during ingestion that is not defined in a sealed table should fail with
- * proper validation error.
- */
- @Test
- public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
- {
- testIngestionQuery()
- .sql("INSERT INTO fooSealed\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectValidationError(
- DruidException.class,
- "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
- )
- .verify();
- }
-
-
- /**
- * Inserting into a catalog table with a WITH source succeeds
- */
- @Test
- public void testInsertWithSourceIntoCatalogTable()
- {
- ExternalDataSource externalDataSource = new ExternalDataSource(
- new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
- new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
- RowSignature.builder()
- .add("a", ColumnType.STRING)
- .add("b", ColumnType.STRING)
- .add("c", ColumnType.LONG)
- .add("d", ColumnType.STRING)
- .add("e", ColumnType.STRING)
- .build()
- );
- final RowSignature signature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt", ColumnType.LONG)
- .add("m2", ColumnType.DOUBLE)
- .add("extra2", ColumnType.LONG)
- .add("extra3", ColumnType.STRING)
- .build();
- testIngestionQuery()
- .sql("INSERT INTO \"foo\"\n" +
- "WITH \"ext\" AS (\n" +
- " SELECT *\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- ")\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2,\n" +
- " e AS extra3\n" +
- "FROM \"ext\"\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("foo", signature)
- .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource(externalDataSource)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .virtualColumns(
- expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
- expressionVirtualColumn("v1", "1", ColumnType.LONG),
- expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
- expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
- )
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema.
- .columns("b", "e", "v0", "v1", "v2", "v3")
- .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
- .build()
- )
- .verify();
- }
-
- @Test
- public void testInsertIntoExistingStrictNoDefinedSchema()
- {
- testIngestionQuery()
- .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(
- DruidException.class,
- "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
- .verify();
- }
-
- @Test
- public void testInsertIntoExistingWithIncompatibleTypeAssignment()
- {
- testIngestionQuery()
- .sql("INSERT INTO foo\n"
- + "SELECT\n"
- + " __time AS __time,\n"
- + " ARRAY[dim1] AS dim1\n"
- + "FROM foo\n"
- + "PARTITIONED BY ALL TIME")
- .expectValidationError(
- DruidException.class,
- "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
- .verify();
+ return "INSERT INTO \"%s\"";
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
index f4c6a90..aad2269 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java
@@ -19,298 +19,20 @@
package org.apache.druid.sql.calcite;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
-import org.apache.druid.sql.calcite.external.Externals;
-import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.junit.jupiter.api.Test;
-
/**
* Test for REPLACE DML statements for tables defined in catalog.
*/
public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest
{
- /**
- * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
- * value from the catalog.
- */
- @Test
- public void testReplaceHourGrainPartitonedByFromCatalog()
+ @Override
+ public String getOperationName()
{
- testIngestionQuery()
- .sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
- "SELECT * FROM foo")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.HOUR))
- .build()
- )
- .verify();
+ return "REPLACE";
}
- /**
- * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
- * the query value is used.
- */
- @Test
- public void testReplaceHourGrainWithDayPartitonedByFromQuery()
+ @Override
+ public String getDmlPrefixPattern()
{
- testIngestionQuery()
- .sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
- "SELECT *FROM foo\n" +
- "PARTITIONED BY day")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.DAY))
- .build()
- )
- .verify();
- }
-
- /**
- * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
- * validation error.
- */
- @Test
- public void testInsertNoPartitonedByFromCatalog()
- {
- testIngestionQuery()
- .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
- "SELECT * FROM foo")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectValidationError(
- DruidException.class,
- "Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found."
- )
- .verify();
- }
-
- /**
- * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
- * the query value is used.
- */
- @Test
- public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
- {
- testIngestionQuery()
- .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
- "SELECT * FROM foo\n" +
- "PARTITIONED BY day")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
- .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource("foo")
- .intervals(querySegmentSpec(Filtration.eternity()))
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
- .context(queryContextWithGranularity(Granularities.DAY))
- .build()
- )
- .verify();
- }
-
- /**
- * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
- */
- @Test
- public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable()
- {
- ExternalDataSource externalDataSource = new ExternalDataSource(
- new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
- new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
- RowSignature.builder()
- .add("a", ColumnType.STRING)
- .add("b", ColumnType.STRING)
- .add("c", ColumnType.LONG)
- .add("d", ColumnType.STRING)
- .add("e", ColumnType.STRING)
- .build()
- );
- final RowSignature signature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt", ColumnType.LONG)
- .add("m2", ColumnType.DOUBLE)
- .add("extra2", ColumnType.LONG)
- .add("extra3", ColumnType.STRING)
- .build();
- testIngestionQuery()
- .sql("REPLACE INTO foo OVERWRITE ALL\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2,\n" +
- " e AS extra3\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("foo", signature)
- .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource(externalDataSource)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .virtualColumns(
- expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
- expressionVirtualColumn("v1", "1", ColumnType.LONG),
- expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
- expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
- )
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema. Here we just check that the
- // set of columns is correct, but not their order.
- .columns("b", "e", "v0", "v1", "v2", "v3")
- .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
- .build()
- )
- .verify();
- }
-
- /**
- * Adding a new column during ingestion that is not defined in a sealed table should fail with
- * proper validation error.
- */
- @Test
- public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable()
- {
- testIngestionQuery()
- .sql("REPLACE INTO fooSealed OVERWRITE ALL\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectValidationError(
- DruidException.class,
- "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
- )
- .verify();
- }
-
-
- /**
- * Replacing into a catalog table with a WITH source succeeds
- */
- @Test
- public void testReplaceWithSourceIntoCatalogTable()
- {
- ExternalDataSource externalDataSource = new ExternalDataSource(
- new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
- new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
- RowSignature.builder()
- .add("a", ColumnType.STRING)
- .add("b", ColumnType.STRING)
- .add("c", ColumnType.LONG)
- .add("d", ColumnType.STRING)
- .add("e", ColumnType.STRING)
- .build()
- );
- final RowSignature signature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt", ColumnType.LONG)
- .add("m2", ColumnType.DOUBLE)
- .add("extra2", ColumnType.LONG)
- .add("extra3", ColumnType.STRING)
- .build();
- testIngestionQuery()
- .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" +
- "WITH \"ext\" AS (\n" +
- " SELECT *\n" +
- "FROM TABLE(inline(\n" +
- " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
- " format => 'csv'))\n" +
- " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- ")\n" +
- "SELECT\n" +
- " TIME_PARSE(a) AS __time,\n" +
- " b AS dim1,\n" +
- " 1 AS cnt,\n" +
- " c AS m2,\n" +
- " CAST(d AS BIGINT) AS extra2,\n" +
- " e AS extra3\n" +
- "FROM \"ext\"\n" +
- "PARTITIONED BY ALL TIME")
- .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
- .expectTarget("foo", signature)
- .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
- .expectQuery(
- newScanQueryBuilder()
- .dataSource(externalDataSource)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .virtualColumns(
- expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
- expressionVirtualColumn("v1", "1", ColumnType.LONG),
- expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
- expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
- )
- // Scan query lists columns in alphabetical order independent of the
- // SQL project list or the defined schema. Here we just check that the
- // set of columns is correct, but not their order.
- .columns("b", "e", "v0", "v1", "v2", "v3")
- .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
- .build()
- )
- .verify();
- }
-
- @Test
- public void testReplaceIntoExistingStrictNoDefinedSchema()
- {
- testIngestionQuery()
- .sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(
- DruidException.class,
- "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
- .verify();
- }
-
- @Test
- public void testReplaceIntoExistingWithIncompatibleTypeAssignment()
- {
- testIngestionQuery()
- .sql("REPLACE INTO foo OVERWRITE ALL\n"
- + "SELECT\n"
- + " __time AS __time,\n"
- + " ARRAY[dim1] AS dim1\n"
- + "FROM foo\n"
- + "PARTITIONED BY ALL TIME")
- .expectValidationError(
- DruidException.class,
- "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
- .verify();
+ return "REPLACE INTO \"%s\" OVERWRITE ALL";
}
}