/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.movingaverage;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.guava.Accumulators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.movingaverage.test.TestConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

/**
 * Runs each MovingAverageQuery test case defined by a YAML file under the
 * /queryTests resource directory.
 */
@RunWith(Parameterized.class)
public class MovingAverageQueryTest extends InitializedNullHandlingTest
{
private final ObjectMapper jsonMapper;
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ServerConfig serverConfig;
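  // Canned results served by the stubbed QuerySegmentWalker to the nested
  // groupby and timeseries queries issued by the moving average query.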
private final List<ResultRow> groupByResults = new ArrayList<>();
private final List<Result<TimeseriesResultValue>> timeseriesResults = new ArrayList<>();
  private final TestConfig config;

  @Parameters(name = "{0}")
public static Iterable<String[]> data() throws IOException
{
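    // "/queryTests" is a directory on the test classpath; reading it as a
    // stream yields its listing, one YAML test file name per line.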
BufferedReader testReader = new BufferedReader(
new InputStreamReader(MovingAverageQueryTest.class.getResourceAsStream("/queryTests"), StandardCharsets.UTF_8));
List<String[]> tests = new ArrayList<>();
for (String line = testReader.readLine(); line != null; line = testReader.readLine()) {
tests.add(new String[]{line});
}
return tests;
  }

  public MovingAverageQueryTest(String yamlFile) throws IOException
{
List<Module> modules = getRequiredModules();
modules.add(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("queryTest");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(1);
binder.bind(QuerySegmentWalker.class).toProvider(Providers.of(new QuerySegmentWalker()
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return (queryPlus, responseContext) -> {
if (query instanceof GroupByQuery) {
return (Sequence<T>) Sequences.simple(groupByResults);
} else if (query instanceof TimeseriesQuery) {
return (Sequence<T>) Sequences.simple(timeseriesResults);
}
throw new UnsupportedOperationException("unexpected query type " + query.getType());
};
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
return getQueryRunnerForIntervals(query, null);
}
}));
}
);
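    // Set these before building the injectors so modules that read runtime
    // properties during configuration pick them up.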
System.setProperty("druid.generic.useDefaultValueForNull", "true");
System.setProperty("druid.processing.buffer.sizeBytes", "655360");
Injector baseInjector = GuiceInjectors.makeStartupInjector();
Injector injector = Initialization.makeInjectorWithModules(baseInjector, modules);
jsonMapper = injector.getInstance(ObjectMapper.class);
warehouse = injector.getInstance(QueryToolChestWarehouse.class);
retryConfig = injector.getInstance(RetryQueryRunnerConfig.class);
serverConfig = injector.getInstance(ServerConfig.class);
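    // Each YAML file under /queryTests describes one test case: the query to
    // run, the canned intermediate results, and the expected output.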
InputStream is = getClass().getResourceAsStream("/queryTests/" + yamlFile);
ObjectMapper reader = new ObjectMapper(new YAMLFactory());
config = reader.readValue(is, TestConfig.class);
  }

  /**
* Returns the JSON query that should be used in the test.
*
* @return The JSON query
*/
private String getQueryString()
{
return config.query.toString();
  }

  /**
* Returns the JSON result that should be expected from the query.
*
* @return The JSON result
*/
private String getExpectedResultString()
{
return config.expectedOutput.toString();
  }

  /**
   * Returns the JSON result that the nested groupby query should produce, or
   * null if the test config does not define one. A test config must define
   * this, {@link #getTimeseriesResultJson()}, or both, depending on which
   * query types the moving average query nests.
   *
   * @return The JSON result from the groupby query, or null
   */
private String getGroupByResultJson()
{
ArrayNode node = config.intermediateResults.get("groupBy");
return node == null ? null : node.toString();
  }

  /**
   * Returns the JSON result that the nested timeseries query should produce,
   * or null if the test config does not define one. A test config must define
   * this, {@link #getGroupByResultJson()}, or both, depending on which query
   * types the moving average query nests.
   *
   * @return The JSON result from the timeseries query, or null
   */
private String getTimeseriesResultJson()
{
ArrayNode node = config.intermediateResults.get("timeseries");
return node == null ? null : node.toString();
  }

  /**
* Returns the expected query type.
*
* @return The Query type
*/
private Class<?> getExpectedQueryType()
{
return MovingAverageQuery.class;
  }

  private TypeReference<List<MapBasedRow>> getExpectedResultType()
{
return new TypeReference<List<MapBasedRow>>()
{
};
  }

  /**
   * Returns the Druid modules required to run the test.
   */
private List<Module> getRequiredModules()
{
List<Module> list = new ArrayList<>();
list.add(new QueryRunnerFactoryModule());
list.add(new QueryableModule());
list.add(new DruidProcessingModule());
return list;
  }

  /**
* Set up any needed mocks to stub out backend query behavior.
*/
private void defineMocks() throws IOException
{
groupByResults.clear();
timeseriesResults.clear();
if (getGroupByResultJson() != null) {
groupByResults.addAll(jsonMapper.readValue(getGroupByResultJson(), new TypeReference<List<ResultRow>>() {}));
}
if (getTimeseriesResultJson() != null) {
timeseriesResults.addAll(
jsonMapper.readValue(
getTimeseriesResultJson(),
new TypeReference<List<Result<TimeseriesResultValue>>>() {}
)
);
}
  }

  /**
   * Widens Integer values to Long and Float values to Double in the actual and
   * expected results, so that comparisons are not defeated by how Jackson
   * happened to deserialize each number.
   */
private List<MapBasedRow> consistentTypeCasting(List<MapBasedRow> result)
{
List<MapBasedRow> newResult = new ArrayList<>();
for (MapBasedRow row : result) {
      final Map<String, Object> event = Maps.newLinkedHashMap(row.getEvent());
event.forEach((key, value) -> {
if (value instanceof Integer) {
event.put(key, ((Integer) value).longValue());
}
if (value instanceof Float) {
event.put(key, ((Float) value).doubleValue());
}
});
newResult.add(new MapBasedRow(row.getTimestamp(), event));
}
return newResult;
  }

  /**
* Validate that the specified query behaves correctly.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testQuery() throws IOException
{
Query<?> query = jsonMapper.readValue(getQueryString(), Query.class);
Assert.assertThat(query, IsInstanceOf.instanceOf(getExpectedQueryType()));
List<MapBasedRow> expectedResults = jsonMapper.readValue(getExpectedResultString(), getExpectedResultType());
Assert.assertNotNull(expectedResults);
Assert.assertThat(expectedResults, IsInstanceOf.instanceOf(List.class));
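    // Build the cluster-level client over a no-op TimelineServerView; its
    // timeline is always empty, so no data servers are ever contacted.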
CachingClusteredClient baseClient = new CachingClusteredClient(
warehouse,
new TimelineServerView()
{
@Override
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
{
return Optional.empty();
}
@Override
public List<ImmutableDruidServer> getDruidServers()
{
return null;
}
@Override
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{
return null;
}
@Override
public void registerTimelineCallback(Executor exec, TimelineCallback callback)
{
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
}
},
MapCache.create(100000),
jsonMapper,
new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1),
new CacheConfig(),
new DruidHttpClientConfig()
{
@Override
public long getMaxQueuedBytes()
{
return 0L;
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
},
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
);
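    // Wire the base client into a ClientQuerySegmentWalker with a no-op
    // emitter. The nested queries themselves are answered by the stub walker
    // bound in the constructor.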
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
new ServiceEmitter("", "", null)
{
@Override
public void emit(Event event)
{
}
},
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
retryConfig,
jsonMapper,
serverConfig,
null,
new CacheConfig()
);
defineMocks();
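    // Run the query and accumulate the resulting sequence into a list.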
QueryPlus queryPlus = QueryPlus.wrap(query);
final Sequence<?> res = query.getRunner(walker).run(queryPlus);
List actualResults = new ArrayList();
actualResults = (List<MapBasedRow>) res.accumulate(actualResults, Accumulators.list());
expectedResults = consistentTypeCasting(expectedResults);
actualResults = consistentTypeCasting(actualResults);
Assert.assertEquals(expectedResults, actualResults);
}
}