/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.jupiter.api.Test;
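
/**
* Tests catalog-driven validation of ingestion DML: defaulting the segment granularity from the
* catalog, enforcing sealed and strict (no defined schema) tables, and checking assignments of
* source columns to defined target columns. Concrete subclasses supply the DML operation under
* test via {@link #getOperationName()} and {@link #getDmlPrefixPattern()}.
*/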
public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
{
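/**
* The DML operation name (e.g., "INSERT") and the statement prefix, supplied by the concrete subclass.
*/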
private final String operationName;
private final String dmlPrefixPattern;

public CalciteCatalogIngestionDmlTest()
{
this.operationName = getOperationName();
this.dmlPrefixPattern = getDmlPrefixPattern();
}

public abstract String getOperationName();

public abstract String getDmlPrefixPattern();

private static final ObjectMapper MAPPER = new DefaultObjectMapper();
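
/**
* Catalog tables, keyed by name, which {@link #createCatalogResolver()} consults before falling
* back to the physical datasource metadata.
*/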
public static final ImmutableMap<String, DatasourceTable> RESOLVED_TABLES = ImmutableMap.of(
"hourDs", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("hourDs"),
RowSignature.builder().addTimeColumn().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H"),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(
RowSignature.builder()
.addTimeColumn()
.build()),
false
)
),
"noPartitonedBy", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("hourDs"),
RowSignature.builder().addTimeColumn().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(
RowSignature.builder()
.addTimeColumn()
.build()),
false
)
),
"strictTableWithNoDefinedSchema", new DatasourceTable(
RowSignature.builder().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("strictTableWithNoDefinedSchema"),
RowSignature.builder().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"strictTableWithNoDefinedSchema",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(DatasourceDefn.TABLE_TYPE, ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), null),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder().build()),
false
)
),
"foo", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("foo"),
FOO_TABLE_SIGNATURE,
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null),
new ColumnSpec("dim1", Columns.STRING, null),
new ColumnSpec("dim2", Columns.STRING, null),
new ColumnSpec("dim3", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null),
new ColumnSpec("m1", Columns.FLOAT, null),
new ColumnSpec("m2", Columns.DOUBLE, null),
new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
false
)
),
"fooSealed", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("foo"),
FOO_TABLE_SIGNATURE,
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null),
new ColumnSpec("dim1", Columns.STRING, null),
new ColumnSpec("dim2", Columns.STRING, null),
new ColumnSpec("dim3", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null),
new ColumnSpec("m1", Columns.FLOAT, null),
new ColumnSpec("m2", Columns.DOUBLE, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
false
)
)
);
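
/**
* Returns a resolver that looks up table names in {@link #RESOLVED_TABLES} first and otherwise
* falls back to a plain {@link DatasourceTable} built from the physical metadata.
*/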
@Override
public CatalogResolver createCatalogResolver()
{
return new CatalogResolver.NullCatalogResolver() {
@Override
public DruidTable resolveDatasource(
final String tableName,
final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
)
{
final DatasourceTable resolved = RESOLVED_TABLES.get(tableName);
if (resolved != null) {
return resolved;
}
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
};
}

/**
* If the segment grain is given in the catalog and absent from the query's PARTITIONED BY
* clause, the catalog value is used.
*/
@Test
public void testInsertHourGrainPartitionedByFromCatalog()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}

/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertHourGrainWithDayPartitionedByFromQuery()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}

/**
* If the segment grain is absent from both the catalog and the PARTITIONED BY clause in the
* query, validation fails.
*/
@Test
public void testInsertNoPartitionedByFromCatalog()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
StringUtils.format("Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", operationName)
)
.verify();
}

/**
* If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertNoPartitionedByWithDayPartitionedByFromQuery()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}

/**
* Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
*/
@Test
public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}

/**
* Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed.
*/
@Test
public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.add("extra4_complex", ColumnType.LONG)
.build();
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3,\n" +
" APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"GROUP BY 1,2,3,4,5,6\n" +
"PARTITIONED BY ALL TIME"
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
GroupByQuery.builder()
.setDataSource(externalDataSource)
.setGranularity(Granularities.ALL)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setVirtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
)
.setDimensions(
dimensions(
new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
)
)
.setAggregatorSpecs(
new CardinalityAggregatorFactory(
"a0",
null,
ImmutableList.of(
new DefaultDimensionSpec(
"c",
"c",
ColumnType.LONG
)
),
false,
true
)
)
.setPostAggregatorSpecs(
expressionPostAgg("p0", "1", ColumnType.LONG),
expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
)
.setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}

/**
* Adding a new column during ingestion that is not defined in a sealed table should fail with
* a proper validation error.
*/
@Test
public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
"Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
)
.verify();
}

/**
* Inserting into a catalog table with a WITH source succeeds.
*/
@Test
public void testInsertWithSourceIntoCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
"WITH \"ext\" AS (\n" +
" SELECT *\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
")\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM \"ext\"\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}

/**
* Group-by ingestion into a catalog table from a WITH source succeeds, including new columns
* that are not defined in the non-sealed target table.
*/
@Test
public void testGroupByInsertWithSourceIntoCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.add("extra4_complex", ColumnType.LONG)
.build();
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
"WITH \"ext\" AS (\n" +
" SELECT *\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
")\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3,\n" +
" APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" +
"FROM \"ext\"\n" +
"GROUP BY 1,2,3,4,5,6\n" +
"PARTITIONED BY ALL TIME"
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
GroupByQuery.builder()
.setDataSource(externalDataSource)
.setGranularity(Granularities.ALL)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setVirtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG)
)
.setDimensions(
dimensions(
new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
new DefaultDimensionSpec("b", "d1", ColumnType.STRING),
new DefaultDimensionSpec("c", "d3", ColumnType.LONG),
new DefaultDimensionSpec("d", "d4", ColumnType.LONG),
new DefaultDimensionSpec("e", "d5", ColumnType.STRING)
)
)
.setAggregatorSpecs(
new CardinalityAggregatorFactory(
"a0",
null,
ImmutableList.of(
new DefaultDimensionSpec(
"c",
"c",
ColumnType.LONG
)
),
false,
true
)
)
.setPostAggregatorSpecs(
expressionPostAgg("p0", "1", ColumnType.LONG),
expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE)
)
.setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
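
/**
* Inserting into a sealed table that has no defined schema rejects every column, including
* {@code __time}.
*/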
@Test
public void testInsertIntoExistingStrictNoDefinedSchema()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "strictTableWithNoDefinedSchema") + " SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
.verify();
}
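
/**
* Assigning an ARRAY expression to the VARCHAR column {@code dim1} fails with a type-mismatch
* validation error.
*/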
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignment()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}
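
/**
* Assigning an ARRAY expression to the {@code COMPLEX<hyperUnique>} column {@code unique_dim1}
* fails with a type-mismatch validation error.
*/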
@Test
public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS unique_dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique> from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}
}