/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.SimpleServerView;
import org.apache.druid.client.TestHttpClient;
import org.apache.druid.client.TestHttpClient.SimpleServerManager;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * This class provides useful setup for testing {@link QueryRunner}s which work on top of
 * {@link CachingClusteredClient}. In this class, each {@link DruidServer} serves one segment
 * created using {@link SegmentGenerator}. Tests extending this class can query those segments as shown below:
 *
 * <pre>
 * public void test()
 * {
 *   prepareCluster(3); // prepare a cluster of 3 servers
 *   Query<T> query = makeQuery();
 *   QueryRunner<T> baseRunner = cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals());
 *   QueryRunner<T> queryRunner = makeQueryRunner(baseRunner);
 *   queryRunner.run(QueryPlus.wrap(query), responseContext())
 *   ...
 * }
 * </pre>
 */
public abstract class QueryRunnerBasedOnClusteredClientTestBase
{
  protected static final GeneratorSchemaInfo BASE_SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");

  private static final Closer CLOSER = Closer.create();
  private static final String DATASOURCE = "datasource";
  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;

  protected final ObjectMapper objectMapper = new DefaultObjectMapper();
  protected final QueryToolChestWarehouse toolChestWarehouse;
  private final QueryRunnerFactoryConglomerate conglomerate;

  protected TestHttpClient httpClient;
  protected SimpleServerView simpleServerView;
  protected CachingClusteredClient cachingClusteredClient;
  protected List<DruidServer> servers;

  private SegmentGenerator segmentGenerator;

  protected QueryRunnerBasedOnClusteredClientTestBase()
  {
    conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
        CLOSER,
        USE_PARALLEL_MERGE_POOL_CONFIGURED,
        () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD
    );

    toolChestWarehouse = new QueryToolChestWarehouse()
    {
      @Override
      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
      {
        return conglomerate.findFactory(query).getToolchest();
      }
    };
  }
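
  /**
   * Closes all resources registered with the class-level {@link Closer} once every test in the class has run.
   */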
  @AfterClass
  public static void tearDownAbstractClass() throws IOException
  {
    CLOSER.close();
  }
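
  /**
   * Initializes a fresh {@link SegmentGenerator}, {@link TestHttpClient}, {@link SimpleServerView}, and
   * {@link CachingClusteredClient} before each test, along with an empty list of servers.
   */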
  @Before
  public void setupTestBase()
  {
    segmentGenerator = new SegmentGenerator();
    httpClient = new TestHttpClient(objectMapper);
    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, httpClient);
    cachingClusteredClient = new CachingClusteredClient(
        toolChestWarehouse,
        simpleServerView,
        MapCache.create(0),
        objectMapper,
        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
        new CacheConfig(),
        new DruidHttpClientConfig(),
        QueryStackTests.getProcessingConfig(
            USE_PARALLEL_MERGE_POOL_CONFIGURED,
            DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
        ),
        ForkJoinPool.commonPool(),
        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
        new NoopServiceEmitter()
    );
    servers = new ArrayList<>();
  }

  @After
  public void tearDownTestBase() throws IOException
  {
    segmentGenerator.close();
  }
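
  /**
   * Adds a server serving the given segment to the cluster view and registers a corresponding
   * {@link SimpleServerManager} with the test HTTP client.
   */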
  protected void addServer(DruidServer server, DataSegment dataSegment, QueryableIndex queryableIndex)
  {
    addServer(server, dataSegment, queryableIndex, false);
  }
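
  /**
   * Same as {@link #addServer(DruidServer, DataSegment, QueryableIndex)}, but, when {@code throwQueryError} is true,
   * the registered {@link SimpleServerManager} throws an error on incoming queries, allowing tests to exercise
   * error-handling paths.
   */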
  protected void addServer(
      DruidServer server,
      DataSegment dataSegment,
      QueryableIndex queryableIndex,
      boolean throwQueryError
  )
  {
    servers.add(server);
    simpleServerView.addServer(server, dataSegment);
    httpClient.addServerAndRunner(
        server,
        new SimpleServerManager(conglomerate, dataSegment, queryableIndex, throwQueryError)
    );
  }
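
  /**
   * Prepares a cluster of {@code numServers} servers (must be smaller than 25), each serving one generated segment
   * of 10 rows. Segments are assigned pairwise to hourly intervals starting at 2000-01-01T00: server {@code i}
   * serves partition {@code i % 2} of the interval for hour {@code i / 2}, with 2 core partitions per interval.
   */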
  protected void prepareCluster(int numServers)
  {
    Preconditions.checkArgument(numServers < 25, "Cannot be larger than 24");
    for (int i = 0; i < numServers; i++) {
      final int partitionId = i % 2;
      final int intervalIndex = i / 2;
      final Interval interval = Intervals.of("2000-01-01T%02d/PT1H", intervalIndex);
      final DataSegment segment = newSegment(interval, partitionId, 2);
      addServer(
          SimpleServerView.createServer(i + 1),
          segment,
          generateSegment(segment)
      );
    }
  }
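
  /**
   * Generates a 10-row {@link QueryableIndex} for the given segment, using the basic generator schema adjusted
   * to the segment's interval.
   */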
  protected QueryableIndex generateSegment(DataSegment segment)
  {
    return segmentGenerator.generate(
        segment,
        new GeneratorSchemaInfo(
            BASE_SCHEMA_INFO.getColumnSchemas(),
            BASE_SCHEMA_INFO.getAggs(),
            segment.getInterval(),
            BASE_SCHEMA_INFO.isWithRollup()
        ),
        Granularities.NONE,
        10
    );
  }
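
  /**
   * Builds an hour-granularity timeseries query that counts rows in the given interval, tagged with a random
   * query ID and a {@link DirectDruidClient#QUERY_FAIL_TIME} of 10 seconds from now.
   */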
  protected static Query<Result<TimeseriesResultValue>> timeseriesQuery(Interval interval)
  {
    return Druids.newTimeseriesQueryBuilder()
                 .dataSource(DATASOURCE)
                 .intervals(ImmutableList.of(interval))
                 .granularity(Granularities.HOUR)
                 .aggregators(new CountAggregatorFactory("rows"))
                 .context(
                     ImmutableMap.of(
                         DirectDruidClient.QUERY_FAIL_TIME,
                         System.currentTimeMillis() + 10000
                     )
                 )
                 .build()
                 .withId(UUID.randomUUID().toString());
  }
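
  /**
   * Builds the expected results for {@link #timeseriesQuery}: for each index {@code i}, a result at hour
   * {@code i / 2} of 2000-01-01 with a row count of 10, matching the 10 rows generated per segment.
   */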
  protected static List<Result<TimeseriesResultValue>> expectedTimeseriesResult(int expectedNumResultRows)
  {
    return IntStream
        .range(0, expectedNumResultRows)
        .mapToObj(
            i -> new Result<>(
                DateTimes.of(StringUtils.format("2000-01-01T%02d", i / 2)),
                new TimeseriesResultValue(ImmutableMap.of("rows", 10))
            )
        )
        .collect(Collectors.toList());
  }
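
  /**
   * Creates an empty concurrent {@link ResponseContext} pre-populated with the
   * {@link Key#REMAINING_RESPONSES_FROM_QUERY_SERVERS} entry needed when running queries through
   * {@link CachingClusteredClient}.
   */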
  protected static ResponseContext responseContext()
  {
    final ResponseContext responseContext = ConcurrentResponseContext.createEmpty();
    responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
    return responseContext;
  }
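
  /**
   * Creates a {@link DataSegment} for {@link #DATASOURCE} over the given interval, with version "1", a
   * {@link NumberedShardSpec} for the given partition ID and core partition count, and a nominal size of 10 bytes.
   */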
  protected static DataSegment newSegment(
      Interval interval,
      int partitionId,
      int numCorePartitions
  )
  {
    return DataSegment.builder()
                      .dataSource(DATASOURCE)
                      .interval(interval)
                      .version("1")
                      .shardSpec(new NumberedShardSpec(partitionId, numCorePartitions))
                      .size(10)
                      .build();
  }
}