/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Tests designed to exercise changing column types, adding columns, removing columns, etc.
*/
@RunWith(JUnitParamsRunner.class)
public class SchemaEvolutionTest
{
private static final String DATA_SOURCE = "foo";
private static final String TIMESTAMP_COLUMN = "t";
private static final double THIRTY_ONE_POINT_ONE = 31.1d;
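
// Parameter source for @Parameters(method = "doVectorize"): each test runs once with
// vectorized query execution enabled and once with it disabled.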
public Object[] doVectorize()
{
return Lists.newArrayList(true, false).toArray();
}
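
// Wraps an expected value map as a single-row timeseries result; every query in this class
// aggregates its whole interval into one bucket timestamped 2000-01-01.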
public static List<Result<TimeseriesResultValue>> timeseriesResult(final Map<String, ?> map)
{
return ImmutableList.of(new Result<>(DateTimes.of("2000"), new TimeseriesResultValue((Map<String, Object>) map)));
}
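
// Six rows spanning 2000-2001: c1 holds numeric strings and c2 holds multi-value strings.
// When "dimensions" is empty, t/c1/c2 are listed as exclusions instead, so the parsed rows
// carry no dimensions at all (used to build index4).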
public static List<InputRow> inputRowsWithDimensions(final List<String> dimensions)
{
final MapInputRowParser parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec(TIMESTAMP_COLUMN, "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(dimensions),
dimensions.isEmpty() ? ImmutableList.of("t", "c1", "c2") : null,
null
)
)
);
return ImmutableList.of(
parser.parseBatch(ImmutableMap.of("t", "2000-01-01", "c1", "9", "c2", ImmutableList.of("a"))).get(0),
parser.parseBatch(ImmutableMap.of("t", "2000-01-02", "c1", "10.1", "c2", ImmutableList.of())).get(0),
parser.parseBatch(ImmutableMap.of("t", "2000-01-03", "c1", "2", "c2", ImmutableList.of(""))).get(0),
parser.parseBatch(ImmutableMap.of("t", "2001-01-01", "c1", "1", "c2", ImmutableList.of("a", "c"))).get(0),
parser.parseBatch(ImmutableMap.of("t", "2001-01-02", "c1", "4", "c2", ImmutableList.of("abc"))).get(0),
parser.parseBatch(ImmutableMap.of("t", "2001-01-03", "c1", "5")).get(0)
);
}
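
// Runs "query" over the given indexes: wraps each one in a QueryableIndexSegment, merges
// the per-segment runners on a direct executor, and finalizes results through the
// factory's toolchest.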
public static <T, QueryType extends Query<T>> List<T> runQuery(
final QueryType query,
final QueryRunnerFactory<T, QueryType> factory,
final List<QueryableIndex> indexes
)
{
final Sequence<T> results = new FinalizeResultsQueryRunner<>(
factory.getToolchest().mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
FunctionalIterable
.create(indexes)
.transform(
index -> factory.createRunner(new QueryableIndexSegment(index, SegmentId.dummy("xxx")))
)
)
),
(QueryToolChest<T, Query<T>>) factory.getToolchest()
).run(QueryPlus.wrap(query));
return results.toList();
}

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

// Index1: c1 is a string, c2 nonexistent, "uniques" nonexistent
private QueryableIndex index1 = null;

// Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2, "longmin" is min on c1
private QueryableIndex index2 = null;

// Index3: c1 is a float, c2 is a string, "uniques" is uniques on c2
private QueryableIndex index3 = null;

// Index4: c1 is nonexistent, c2 is uniques on c2
private QueryableIndex index4 = null;

@Before
public void setUp() throws IOException
{
NullHandling.initializeForTests();

// Index1: c1 is a string, c2 nonexistent, "uniques" nonexistent
index1 = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("cnt"))
.withRollup(false)
.build()
)
.rows(inputRowsWithDimensions(ImmutableList.of("c1")))
.buildMMappedIndex();

// Index2: c1 is a long, c2 is a string, "uniques" is uniques on c2, "longmin" is min on c1
index2 = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new LongSumAggregatorFactory("c1", "c1"),
new HyperUniquesAggregatorFactory("uniques", "c2"),
new LongMinAggregatorFactory("longmin", "c1")
)
.withRollup(false)
.build()
)
.rows(inputRowsWithDimensions(ImmutableList.of("c2")))
.buildMMappedIndex();

// Index3: c1 is a float, c2 is a string, "uniques" is uniques on c2
index3 = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("c1", "c1"),
new HyperUniquesAggregatorFactory("uniques", "c2")
)
.withRollup(false)
.build()
)
.rows(inputRowsWithDimensions(ImmutableList.of("c2")))
.buildMMappedIndex();

// Index4: c1 is nonexistent, c2 is uniques on c2
index4 = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(new HyperUniquesAggregatorFactory("c2", "c2"))
.withRollup(false)
.build()
)
.rows(inputRowsWithDimensions(ImmutableList.of()))
.buildMMappedIndex();

if (index4.getAvailableDimensions().size() != 0) {
// Just double-checking that the exclusions are working properly
throw new ISE("Expected no dimensions in index4");
}
}

@After
public void tearDown() throws IOException
{
Closeables.close(index1, false);
Closeables.close(index2, false);
Closeables.close(index3, false);
Closeables.close(index4, false);
}

@Test
@Parameters(method = "doVectorize")
public void testHyperUniqueEvolutionTimeseries(boolean doVectorize)
{
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
final TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals("1000/3000")
.aggregators(
ImmutableList.of(
new HyperUniquesAggregatorFactory("uniques", "uniques")
)
)
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
.build();

// index1 has no "uniques" column
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("uniques", 0d)),
runQuery(query, factory, ImmutableList.of(index1))
);

// index1 (no uniques) + index2 and index3 (yes uniques); we should be able to combine
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("uniques", 4.003911343725148d)),
runQuery(query, factory, ImmutableList.of(index1, index2, index3))
);
}

@Test
@Parameters(method = "doVectorize")
public void testNumericEvolutionTimeseriesAggregation(boolean doVectorize)
{
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
// "c1" changes from string(1) -> long(2) -> float(3) -> nonexistent(4)
// test behavior of longSum/doubleSum with/without expressions
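// Aggregators "a" and "b" read the column directly; "c" and "d" read it through the
// expression "c1 * 1".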
final TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals("1000/3000")
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("a", "c1"),
new DoubleSumAggregatorFactory("b", "c1"),
new LongSumAggregatorFactory("c", null, "c1 * 1", TestExprMacroTable.INSTANCE),
new DoubleSumAggregatorFactory("d", null, "c1 * 1", TestExprMacroTable.INSTANCE)
)
)
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
.build();

// Only string(1)
// Note: the numeric aggregators parse the string values and the expressions cast them, so a/b agree with c/d.
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 31L, "b", THIRTY_ONE_POINT_ONE, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
runQuery(query, factory, ImmutableList.of(index1))
);

// Only long(2)
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 31L, "b", 31.0, "c", 31L, "d", 31.0)),
runQuery(query, factory, ImmutableList.of(index2))
);

// Only float(3)
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 31L, "b", THIRTY_ONE_POINT_ONE, "c", 31L, "d", THIRTY_ONE_POINT_ONE)),
runQuery(query, factory, ImmutableList.of(index3))
);

// Only nonexistent(4)
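// In SQL-compatible mode the sum of no rows is null; in default-value mode it is 0 / 0.0,
// which is exactly what defaultLongValue()/defaultDoubleValue() return.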
Map<String, Object> result = new HashMap<>();
result.put("a", NullHandling.defaultLongValue());
result.put("b", NullHandling.defaultDoubleValue());
result.put("c", NullHandling.defaultLongValue());
result.put("d", NullHandling.defaultDoubleValue());
Assert.assertEquals(
timeseriesResult(result),
runQuery(query, factory, ImmutableList.of(index4))
);

// string(1) + long(2) + float(3) + nonexistent(4)
// Note: the string values parse to the same numbers as the long/float columns, so all four indexes combine cleanly.
Assert.assertEquals(
timeseriesResult(ImmutableMap.of(
"a", 31L * 3,
"b", THIRTY_ONE_POINT_ONE * 2 + 31,
"c", 31L * 3,
"d", THIRTY_ONE_POINT_ONE * 2 + 31
)),
runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
);

// long(2) + float(3) + nonexistent(4)
Assert.assertEquals(
timeseriesResult(ImmutableMap.of(
"a", 31L * 2,
"b", THIRTY_ONE_POINT_ONE + 31,
"c", 31L * 2,
"d", THIRTY_ONE_POINT_ONE + 31
)),
runQuery(query, factory, ImmutableList.of(index2, index3, index4))
);
}

@Test
@Parameters(method = "doVectorize")
public void testNumericEvolutionFiltering(boolean doVectorize)
{
final TimeseriesQueryRunnerFactory factory = QueryRunnerTestHelper.newTimeseriesQueryRunnerFactory();
// "c1" changes from string(1) -> long(2) -> float(3) -> nonexistent(4)
// test behavior of filtering
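// The bound filter keeps rows where 9 <= c1 <= 11 under numeric comparison, i.e. the rows
// with c1 = 9 and c1 = 10.1, so the count "c" is 2 for every index that has those rows.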
final TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals("1000/3000")
.filters(new BoundDimFilter("c1", "9", "11", false, false, null, null, StringComparators.NUMERIC))
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("a", "c1"),
new DoubleSumAggregatorFactory("b", "c1"),
new FloatSumAggregatorFactory("d", "c1"),
new LongMinAggregatorFactory("e", "c1"),
new CountAggregatorFactory("c")
)
)
.context(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, doVectorize))
.build();

// Only string(1) -- the numeric bound filter matches the string values, and the numeric aggregators parse them
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index1))
);

// Only long(2) -- which we can filter and aggregate
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.0, "c", 2L, "d", 19.0f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index2))
);

// Only float(3) -- filtering and aggregation both work here too; results match the string case
Assert.assertEquals(
timeseriesResult(ImmutableMap.of("a", 19L, "b", 19.1, "c", 2L, "d", 19.1f, "e", 9L)),
runQuery(query, factory, ImmutableList.of(index3))
);

// Only nonexistent(4)
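// The filter matches nothing, so sums are null (SQL-compatible) or 0 (default-value), and
// longMin is null or its identity value, Long.MAX_VALUE.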
Assert.assertEquals(
timeseriesResult(TestHelper.createExpectedMap(
"a",
NullHandling.defaultLongValue(),
"b",
NullHandling.defaultDoubleValue(),
"c",
0L,
"d",
NullHandling.defaultFloatValue(),
"e",
NullHandling.sqlCompatible() ? null : Long.MAX_VALUE
)),
runQuery(query, factory, ImmutableList.of(index4))
);

// string(1) + long(2) + float(3) + nonexistent(4)
Assert.assertEquals(
timeseriesResult(ImmutableMap.of(
"a", 57L,
"b", 57.2,
"c", 6L,
"d", 57.20000076293945,
"e", 9L
)),
runQuery(query, factory, ImmutableList.of(index1, index2, index3, index4))
);
}
}