| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.client; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.annotation.JsonSerialize; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Ordering; |
| import com.google.common.hash.HashFunction; |
| import com.google.common.hash.Hashing; |
| import com.google.common.util.concurrent.ForwardingListeningExecutorService; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.apache.druid.client.cache.BackgroundCachePopulator; |
| import org.apache.druid.client.cache.Cache; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.client.cache.CachePopulator; |
| import org.apache.druid.client.cache.CachePopulatorStats; |
| import org.apache.druid.client.cache.ForegroundCachePopulator; |
| import org.apache.druid.client.cache.MapCache; |
| import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; |
| import org.apache.druid.client.selector.QueryableDruidServer; |
| import org.apache.druid.client.selector.RandomServerSelectorStrategy; |
| import org.apache.druid.client.selector.ServerSelector; |
| import org.apache.druid.guice.http.DruidHttpClientConfig; |
| import org.apache.druid.hll.HyperLogLogCollector; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.granularity.PeriodGranularity; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.guava.FunctionalIterable; |
| import org.apache.druid.java.util.common.guava.MergeIterable; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.java.util.common.guava.nary.TrinaryFn; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.query.BySegmentResultValueClass; |
| import org.apache.druid.query.DruidProcessingConfig; |
| import org.apache.druid.query.Druids; |
| import org.apache.druid.query.FinalizeResultsQueryRunner; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryContexts; |
| import org.apache.druid.query.QueryPlus; |
| import org.apache.druid.query.QueryRunner; |
| import org.apache.druid.query.QueryToolChestWarehouse; |
| import org.apache.druid.query.Result; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.aggregation.PostAggregator; |
| import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; |
| import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; |
| import org.apache.druid.query.aggregation.post.ConstantPostAggregator; |
| import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; |
| import org.apache.druid.query.context.ResponseContext; |
| import org.apache.druid.query.context.ResponseContext.Key; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.filter.AndDimFilter; |
| import org.apache.druid.query.filter.BoundDimFilter; |
| import org.apache.druid.query.filter.DimFilter; |
| import org.apache.druid.query.filter.InDimFilter; |
| import org.apache.druid.query.filter.OrDimFilter; |
| import org.apache.druid.query.filter.SelectorDimFilter; |
| import org.apache.druid.query.groupby.GroupByQuery; |
| import org.apache.druid.query.groupby.ResultRow; |
| import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; |
| import org.apache.druid.query.ordering.StringComparators; |
| import org.apache.druid.query.planning.DataSourceAnalysis; |
| import org.apache.druid.query.search.SearchHit; |
| import org.apache.druid.query.search.SearchQuery; |
| import org.apache.druid.query.search.SearchQueryConfig; |
| import org.apache.druid.query.search.SearchQueryQueryToolChest; |
| import org.apache.druid.query.search.SearchResultValue; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; |
| import org.apache.druid.query.spec.QuerySegmentSpec; |
| import org.apache.druid.query.timeboundary.TimeBoundaryQuery; |
| import org.apache.druid.query.timeboundary.TimeBoundaryResultValue; |
| import org.apache.druid.query.timeseries.TimeseriesQuery; |
| import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; |
| import org.apache.druid.query.timeseries.TimeseriesResultValue; |
| import org.apache.druid.query.topn.TopNQuery; |
| import org.apache.druid.query.topn.TopNQueryBuilder; |
| import org.apache.druid.query.topn.TopNQueryConfig; |
| import org.apache.druid.query.topn.TopNQueryQueryToolChest; |
| import org.apache.druid.query.topn.TopNResultValue; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.join.MapJoinableFactory; |
| import org.apache.druid.server.QueryScheduler; |
| import org.apache.druid.server.ServerTestHelper; |
| import org.apache.druid.server.coordination.ServerType; |
| import org.apache.druid.server.initialization.ServerConfig; |
| import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; |
| import org.apache.druid.server.scheduling.NoQueryLaningStrategy; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.SegmentId; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; |
| import org.apache.druid.timeline.partition.HashPartitionFunction; |
| import org.apache.druid.timeline.partition.NoneShardSpec; |
| import org.apache.druid.timeline.partition.NumberedPartitionChunk; |
| import org.apache.druid.timeline.partition.ShardSpec; |
| import org.apache.druid.timeline.partition.SingleDimensionShardSpec; |
| import org.apache.druid.timeline.partition.SingleElementPartitionChunk; |
| import org.easymock.Capture; |
| import org.easymock.EasyMock; |
| import org.easymock.IAnswer; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Interval; |
| import org.joda.time.Period; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import javax.annotation.Nullable; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.stream.IntStream; |
| |
| /** |
| * |
| */ |
| @RunWith(Parameterized.class) |
| public class CachingClusteredClientTest |
| { |
| private static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of( |
| "finalize", false, |
| |
| // GroupBy v2 won't cache on the broker, so test with v1. |
| "groupByStrategy", GroupByStrategySelector.STRATEGY_V1 |
| ); |
| private static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); |
| private static final String DATA_SOURCE = "test"; |
| private static final ObjectMapper JSON_MAPPER = CachingClusteredClientTestUtils.createObjectMapper(); |
| |
| /** |
| * We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments |
| * across servers. Thus, we loop multiple times and each time use a deterministically created Random instance. |
| * Increase this value to increase exposure to random situations at the expense of test run time. |
| */ |
| private static final int RANDOMNESS = 10; |
| private static final List<AggregatorFactory> AGGS = Arrays.asList( |
| new CountAggregatorFactory("rows"), |
| new LongSumAggregatorFactory("imps", "imps"), |
| new LongSumAggregatorFactory("impers", "imps") |
| ); |
| private static final List<PostAggregator> POST_AGGS = Arrays.asList( |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row", |
| "/", |
| Arrays.asList( |
| new FieldAccessPostAggregator("imps", "imps"), |
| new FieldAccessPostAggregator("rows", "rows") |
| ) |
| ), |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row_double", |
| "*", |
| Arrays.asList( |
| new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), |
| new ConstantPostAggregator("constant", 2) |
| ) |
| ), |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row_half", |
| "/", |
| Arrays.asList( |
| new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), |
| new ConstantPostAggregator("constant", 2) |
| ) |
| ) |
| ); |
| private static final List<AggregatorFactory> RENAMED_AGGS = Arrays.asList( |
| new CountAggregatorFactory("rows"), |
| new LongSumAggregatorFactory("imps", "imps"), |
| new LongSumAggregatorFactory("impers2", "imps") |
| ); |
| private static final List<PostAggregator> DIFF_ORDER_POST_AGGS = Arrays.asList( |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row", |
| "/", |
| Arrays.asList( |
| new FieldAccessPostAggregator("imps", "imps"), |
| new FieldAccessPostAggregator("rows", "rows") |
| ) |
| ), |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row_half", |
| "/", |
| Arrays.asList( |
| new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), |
| new ConstantPostAggregator("constant", 2) |
| ) |
| ), |
| new ArithmeticPostAggregator( |
| "avg_imps_per_row_double", |
| "*", |
| Arrays.asList( |
| new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), |
| new ConstantPostAggregator("constant", 2) |
| ) |
| ) |
| ); |
| private static final DimFilter DIM_FILTER = null; |
| private static final List<PostAggregator> RENAMED_POST_AGGS = ImmutableList.of(); |
| private static final Granularity GRANULARITY = Granularities.DAY; |
| private static final DateTimeZone TIMEZONE = DateTimes.inferTzFromString("America/Los_Angeles"); |
| private static final Granularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); |
| private static final String TOP_DIM = "a_dim"; |
| private static final Pair<QueryToolChestWarehouse, Closer> WAREHOUSE_AND_CLOSER = CachingClusteredClientTestUtils |
| .createWarehouse(JSON_MAPPER); |
| private static final QueryToolChestWarehouse WAREHOUSE = WAREHOUSE_AND_CLOSER.lhs; |
| private static final Closer RESOURCE_CLOSER = WAREHOUSE_AND_CLOSER.rhs; |
| |
| private final Random random; |
| |
| private CachingClusteredClient client; |
| private Runnable queryCompletedCallback; |
| private TimelineServerView serverView; |
| private VersionedIntervalTimeline<String, ServerSelector> timeline; |
| private Cache cache; |
| private DruidServer[] servers; |
| |
| public CachingClusteredClientTest(int randomSeed) |
| { |
| this.random = new Random(randomSeed); |
| } |
| |
| @Parameterized.Parameters(name = "{0}") |
| public static Iterable<Object[]> constructorFeeder() |
| { |
| return Lists.transform( |
| Lists.newArrayList(new RangeIterable(RANDOMNESS)), |
| new Function<Integer, Object[]>() |
| { |
| @Override |
| public Object[] apply(Integer input) |
| { |
| return new Object[]{input}; |
| } |
| } |
| ); |
| } |
| |
| @AfterClass |
| public static void tearDownClass() throws IOException |
| { |
| RESOURCE_CLOSER.close(); |
| } |
| |
| @Before |
| public void setUp() |
| { |
| timeline = new VersionedIntervalTimeline<>(Ordering.natural()); |
| serverView = EasyMock.createNiceMock(TimelineServerView.class); |
| cache = MapCache.create(100000); |
| client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1)); |
| |
| servers = new DruidServer[]{ |
| new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, "bye", 0), |
| new DruidServer("test2", "test2", null, 10, ServerType.HISTORICAL, "bye", 0), |
| new DruidServer("test3", "test3", null, 10, ServerType.HISTORICAL, "bye", 0), |
| new DruidServer("test4", "test4", null, 10, ServerType.HISTORICAL, "bye", 0), |
| new DruidServer("test5", "test5", null, 10, ServerType.HISTORICAL, "bye", 0) |
| }; |
| } |
| |
| @Test |
| public void testOutOfOrderBackgroundCachePopulation() |
| { |
| // This test is a bit whacky, but I couldn't find a better way to do it in the current framework. |
| |
// This special executor deliberately randomizes the order in which tasks are executed.
// Since we don't know the number of tasks to be executed, a special DrainTask is used
// to trigger the actual execution when we are ready to shuffle the order.
| abstract class DrainTask implements Runnable |
| { |
| } |
| final ForwardingListeningExecutorService randomizingExecutorService = new ForwardingListeningExecutorService() |
| { |
| final ConcurrentLinkedDeque<Pair<SettableFuture, Object>> taskQueue = new ConcurrentLinkedDeque<>(); |
| final ListeningExecutorService delegate = MoreExecutors.listeningDecorator( |
| // we need to run everything in the same thread to ensure all callbacks on futures in CachingClusteredClient |
| // are complete before moving on to the next query run. |
| Execs.directExecutor() |
| ); |
| |
| @Override |
| protected ListeningExecutorService delegate() |
| { |
| return delegate; |
| } |
| |
| private <T> ListenableFuture<T> maybeSubmitTask(Object task, boolean wait) |
| { |
| if (wait) { |
| SettableFuture<T> future = SettableFuture.create(); |
| taskQueue.addFirst(Pair.of(future, task)); |
| return future; |
| } else { |
| List<Pair<SettableFuture, Object>> tasks = Lists.newArrayList(taskQueue.iterator()); |
| Collections.shuffle(tasks, new Random(0)); |
| |
| for (final Pair<SettableFuture, Object> pair : tasks) { |
| ListenableFuture future = pair.rhs instanceof Callable ? |
| delegate.submit((Callable) pair.rhs) : |
| delegate.submit((Runnable) pair.rhs); |
| Futures.addCallback( |
| future, |
| new FutureCallback() |
| { |
| @Override |
| public void onSuccess(@Nullable Object result) |
| { |
| pair.lhs.set(result); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) |
| { |
| pair.lhs.setException(t); |
| } |
| } |
| ); |
| } |
| } |
| return task instanceof Callable ? |
| delegate.submit((Callable) task) : |
| (ListenableFuture<T>) delegate.submit((Runnable) task); |
| } |
| |
| @SuppressWarnings("ParameterPackage") |
| @Override |
| public <T> ListenableFuture<T> submit(Callable<T> task) |
| { |
| return maybeSubmitTask(task, true); |
| } |
| |
| @Override |
| public ListenableFuture<?> submit(Runnable task) |
| { |
| if (task instanceof DrainTask) { |
| return maybeSubmitTask(task, false); |
| } else { |
| return maybeSubmitTask(task, true); |
| } |
| } |
| }; |
| |
| client = makeClient( |
| new BackgroundCachePopulator( |
| randomizingExecutorService, |
| JSON_MAPPER, |
| new CachePopulatorStats(), |
| -1 |
| ) |
| ); |
| |
// callback to be run every time a query run completes, to ensure all background
// caching tasks are executed and the cache is populated before we move on to the next query
| queryCompletedCallback = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try { |
| randomizingExecutorService.submit( |
| new DrainTask() |
| { |
| @Override |
| public void run() |
| { |
| // no-op |
| } |
| } |
| ).get(); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT) |
| .randomQueryId(); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| testQueryCaching( |
| runner, |
| builder.build(), |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTimeResults( |
| DateTimes.of("2011-01-05"), 85, 102, |
| DateTimes.of("2011-01-06"), 412, 521, |
| DateTimes.of("2011-01-07"), 122, 21894, |
| DateTimes.of("2011-01-08"), 5, 20, |
| DateTimes.of("2011-01-09"), 18, 521 |
| ), |
| Intervals.of("2011-01-10/2011-01-13"), |
| makeTimeResults( |
| DateTimes.of("2011-01-10"), 85, 102, |
| DateTimes.of("2011-01-11"), 412, 521, |
| DateTimes.of("2011-01-12"), 122, 21894 |
| ) |
| ); |
| } |
| |
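/**
 * Caches per-segment timeseries results for several disjoint and overlapping intervals, then runs a query
 * over the full interval with renamed aggregators and verifies the merged, time-ordered results.
 */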
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTimeseriesCaching() |
| { |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000), |
| Intervals.of("2011-01-02/2011-01-03"), makeTimeResults(DateTimes.of("2011-01-02"), 30, 6000), |
| Intervals.of("2011-01-04/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-04"), 23, 85312), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTimeResults( |
| DateTimes.of("2011-01-05"), 85, 102, |
| DateTimes.of("2011-01-06"), 412, 521, |
| DateTimes.of("2011-01-07"), 122, 21894, |
| DateTimes.of("2011-01-08"), 5, 20, |
| DateTimes.of("2011-01-09"), 18, 521 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTimeResults( |
| DateTimes.of("2011-01-05T01"), 80, 100, |
| DateTimes.of("2011-01-06T01"), 420, 520, |
| DateTimes.of("2011-01-07T01"), 12, 2194, |
| DateTimes.of("2011-01-08T01"), 59, 201, |
| DateTimes.of("2011-01-09T01"), 181, 52 |
| ) |
| ); |
| |
| |
| TimeseriesQuery query = builder.intervals("2011-01-01/2011-01-10") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTimeResults( |
| DateTimes.of("2011-01-01"), 50, 5000, |
| DateTimes.of("2011-01-02"), 30, 6000, |
| DateTimes.of("2011-01-04"), 23, 85312, |
| DateTimes.of("2011-01-05"), 85, 102, |
| DateTimes.of("2011-01-05T01"), 80, 100, |
| DateTimes.of("2011-01-06"), 412, 521, |
| DateTimes.of("2011-01-06T01"), 420, 520, |
| DateTimes.of("2011-01-07"), 122, 21894, |
| DateTimes.of("2011-01-07T01"), 12, 2194, |
| DateTimes.of("2011-01-08"), 5, 20, |
| DateTimes.of("2011-01-08T01"), 59, 201, |
| DateTimes.of("2011-01-09"), 18, 521, |
| DateTimes.of("2011-01-09T01"), 181, 52 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
| |
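/**
 * Uses a strict mock Cache to check that a single bulk lookup never receives more segment keys than the
 * configured limit, and that a limit of zero results in no keys being fetched at all.
 */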
| @Test |
| @SuppressWarnings("unchecked") |
| public void testCachingOverBulkLimitEnforcesLimit() |
| { |
| final int limit = 10; |
| final Interval interval = Intervals.of("2011-01-01/2011-01-02"); |
| final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval))) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT) |
| .randomQueryId() |
| .build(); |
| |
| final ResponseContext context = initializeResponseContext(); |
| final Cache cache = EasyMock.createStrictMock(Cache.class); |
| final Capture<Iterable<Cache.NamedKey>> cacheKeyCapture = EasyMock.newCapture(); |
| EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture))) |
| .andReturn(ImmutableMap.of()) |
| .once(); |
| EasyMock.replay(cache); |
| client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1), cache, limit); |
| final DruidServer lastServer = servers[random.nextInt(servers.length)]; |
| final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class); |
| EasyMock.expect(dataSegment.getId()).andReturn(SegmentId.dummy(DATA_SOURCE)).anyTimes(); |
| EasyMock.replay(dataSegment); |
| final ServerSelector selector = new ServerSelector( |
| dataSegment, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment); |
| timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector)); |
| |
| getDefaultQueryRunner().run(QueryPlus.wrap(query), context); |
| |
| Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); |
| Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit); |
| |
| EasyMock.verify(cache); |
| |
| EasyMock.reset(cache); |
| cacheKeyCapture.reset(); |
| EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture))) |
| .andReturn(ImmutableMap.of()) |
| .once(); |
| EasyMock.replay(cache); |
| client = makeClient(new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), -1), cache, 0); |
| getDefaultQueryRunner().run(QueryPlus.wrap(query), context); |
| EasyMock.verify(cache); |
| EasyMock.verify(dataSegment); |
| Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); |
| Assert.assertTrue("Cache Keys empty", ImmutableList.copyOf(cacheKeyCapture.getValue()).isEmpty()); |
| } |
| |
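/**
 * Caches two result sets for the same interval whose hourly timestamps interleave and verifies that the
 * merged timeseries results come back in time order.
 */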
| @Test |
| public void testTimeseriesMergingOutOfOrderPartitions() |
| { |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTimeResults( |
| DateTimes.of("2011-01-05T02"), 80, 100, |
| DateTimes.of("2011-01-06T02"), 420, 520, |
| DateTimes.of("2011-01-07T02"), 12, 2194, |
| DateTimes.of("2011-01-08T02"), 59, 201, |
| DateTimes.of("2011-01-09T02"), 181, 52 |
| ), |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTimeResults( |
| DateTimes.of("2011-01-05T00"), 85, 102, |
| DateTimes.of("2011-01-06T00"), 412, 521, |
| DateTimes.of("2011-01-07T00"), 122, 21894, |
| DateTimes.of("2011-01-08T00"), 5, 20, |
| DateTimes.of("2011-01-09T00"), 18, 521 |
| ) |
| ); |
| |
| TimeseriesQuery query = builder |
| .intervals("2011-01-05/2011-01-10") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTimeResults( |
| DateTimes.of("2011-01-05T00"), 85, 102, |
| DateTimes.of("2011-01-05T02"), 80, 100, |
| DateTimes.of("2011-01-06T00"), 412, 521, |
| DateTimes.of("2011-01-06T02"), 420, 520, |
| DateTimes.of("2011-01-07T00"), 122, 21894, |
| DateTimes.of("2011-01-07T02"), 12, 2194, |
| DateTimes.of("2011-01-08T00"), 5, 20, |
| DateTimes.of("2011-01-08T02"), 59, 201, |
| DateTimes.of("2011-01-09T00"), 18, 521, |
| DateTimes.of("2011-01-09T02"), 181, 52 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
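/**
 * Timeseries caching with a period granularity anchored to America/Los_Angeles, verifying that results
 * keyed by time-zone-aware timestamps are returned as expected.
 */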
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTimeseriesCachingTimeZone() |
| { |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(PT1H_TZ_GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-11-04/2011-11-08"), |
| makeTimeResults( |
| new DateTime("2011-11-04", TIMEZONE), 50, 5000, |
| new DateTime("2011-11-05", TIMEZONE), 30, 6000, |
| new DateTime("2011-11-06", TIMEZONE), 23, 85312, |
| new DateTime("2011-11-07", TIMEZONE), 85, 102 |
| ) |
| ); |
| TimeseriesQuery query = builder |
| .intervals("2011-11-04/2011-11-08") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTimeResults( |
| new DateTime("2011-11-04", TIMEZONE), 50, 5000, |
| new DateTime("2011-11-05", TIMEZONE), 30, 6000, |
| new DateTime("2011-11-06", TIMEZONE), 23, 85312, |
| new DateTime("2011-11-07", TIMEZONE), 85, 102 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
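/**
 * Verifies the useCache / populateCache context flags: populate-only adds an entry without reading,
 * disabling both leaves the cache untouched, and use-only records a miss without adding entries.
 */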
| @Test |
| public void testDisableUseCache() |
| { |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| testQueryCaching( |
| runner, |
| 1, |
| true, |
| builder.context( |
| ImmutableMap.of( |
| "useCache", "false", |
| "populateCache", "true" |
| ) |
| ).randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) |
| ); |
| |
| Assert.assertEquals(1, cache.getStats().getNumEntries()); |
| Assert.assertEquals(0, cache.getStats().getNumHits()); |
| Assert.assertEquals(0, cache.getStats().getNumMisses()); |
| |
| cache.close(SegmentId.dummy("0_0").toString()); |
| |
| testQueryCaching( |
| runner, |
| 1, |
| false, |
| builder.context( |
| ImmutableMap.of( |
| "useCache", "false", |
| "populateCache", "false" |
| ) |
| ).randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) |
| ); |
| |
| Assert.assertEquals(0, cache.getStats().getNumEntries()); |
| Assert.assertEquals(0, cache.getStats().getNumHits()); |
| Assert.assertEquals(0, cache.getStats().getNumMisses()); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| 1, |
| false, |
| builder.context( |
| ImmutableMap.of( |
| "useCache", "true", |
| "populateCache", "false" |
| ) |
| ).randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) |
| ); |
| |
| Assert.assertEquals(0, cache.getStats().getNumEntries()); |
| Assert.assertEquals(0, cache.getStats().getNumHits()); |
| Assert.assertEquals(1, cache.getStats().getNumMisses()); |
| } |
| |
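/**
 * TopN analogue of the timeseries caching test: caches per-segment results, then checks the merged
 * results of a full-interval query with renamed aggregators and reordered post-aggregators.
 */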
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTopNCaching() |
| { |
| final TopNQueryBuilder builder = new TopNQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .dimension(TOP_DIM) |
| .metric("imps") |
| .threshold(3) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new TopNQueryQueryToolChest(new TopNQueryConfig()) |
| ); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTopNResultsWithoutRename(DateTimes.of("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeTopNResultsWithoutRename(DateTimes.of("2011-01-02"), "a", 50, 4997, "b", 50, 4996, "c", 50, 4995), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ) |
| ); |
| TopNQuery query = builder |
| .intervals("2011-01-01/2011-01-10") |
| .metric("imps") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(DIFF_ORDER_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTopNResults( |
| DateTimes.of("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998, |
| DateTimes.of("2011-01-02"), "a", 50, 4997, "b", 50, 4996, "c", 50, 4995, |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983, |
| DateTimes.of("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
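/**
 * TopN caching with the hourly granularity anchored to America/Los_Angeles.
 */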
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTopNCachingTimeZone() |
| { |
| final TopNQueryBuilder builder = new TopNQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .dimension(TOP_DIM) |
| .metric("imps") |
| .threshold(3) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(PT1H_TZ_GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new TopNQueryQueryToolChest(new TopNQueryConfig()) |
| ); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-11-04/2011-11-08"), |
| makeTopNResultsWithoutRename( |
| new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| new DateTime("2011-11-05", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986 |
| ) |
| ); |
| TopNQuery query = builder |
| .intervals("2011-11-04/2011-11-08") |
| .metric("imps") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(DIFF_ORDER_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTopNResults( |
| |
| new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| new DateTime("2011-11-05", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
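/**
 * Verifies that flatMerge (see mergeSequences below) combines two out-of-order, overlapping TopN result
 * sequences into a single time-ordered sequence.
 */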
| @Test |
| public void testOutOfOrderSequenceMerging() |
| { |
| List<Sequence<Result<TopNResultValue>>> sequences = |
| ImmutableList.of( |
| Sequences.simple( |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ) |
| ), |
| Sequences.simple( |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ) |
| ) |
| ); |
| |
| TestHelper.assertExpectedResults( |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, |
| DateTimes.of("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| mergeSequences( |
| new TopNQueryBuilder() |
| .dataSource("test") |
| .intervals("2011-01-06/2011-01-10") |
| .dimension("a") |
| .metric("b") |
| .threshold(3) |
| .aggregators(new CountAggregatorFactory("b")) |
| .randomQueryId() |
| .build(), |
| sequences |
| ) |
| ); |
| } |
| |
| private static <T> Sequence<T> mergeSequences(Query<T> query, List<Sequence<T>> sequences) |
| { |
| return Sequences.simple(sequences).flatMerge(seq -> seq, query.getResultOrdering()); |
| } |
| |
| |
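/**
 * Caches empty TopN results for some intervals and verifies that the full-interval query returns results
 * only for the non-empty ones.
 */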
| @Test |
| @SuppressWarnings("unchecked") |
| public void testTopNCachingEmptyResults() |
| { |
| final TopNQueryBuilder builder = new TopNQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .dimension(TOP_DIM) |
| .metric("imps") |
| .threshold(3) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new TopNQueryQueryToolChest(new TopNQueryConfig()) |
| ); |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTopNResultsWithoutRename(), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeTopNResultsWithoutRename(), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ) |
| ); |
| |
| TopNQuery query = builder |
| .intervals("2011-01-01/2011-01-10") |
| .metric("imps") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(DIFF_ORDER_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeRenamedTopNResults( |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, |
| DateTimes.of("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
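/**
 * TopN caching where the ranking metric is a post-aggregator (avg_imps_per_row_double) rather than a
 * plain aggregator.
 */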
| @Test |
| public void testTopNOnPostAggMetricCaching() |
| { |
| final TopNQueryBuilder builder = new TopNQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .dimension(TOP_DIM) |
| .metric("avg_imps_per_row_double") |
| .threshold(3) |
| .intervals(SEG_SPEC) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new TopNQueryQueryToolChest(new TopNQueryConfig()) |
| ); |
| |
| testQueryCaching( |
| runner, |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTopNResultsWithoutRename(), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeTopNResultsWithoutRename(), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ) |
| ); |
| |
| TopNQuery query = builder |
| .intervals("2011-01-01/2011-01-10") |
| .metric("avg_imps_per_row_double") |
| .aggregators(AGGS) |
| .postAggregators(DIFF_ORDER_POST_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeTopNResultsWithoutRename( |
| DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, |
| DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, |
| DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, |
| DateTimes.of("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983, |
| DateTimes.of("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 |
| ), |
| runner.run(QueryPlus.wrap(query)) |
| ); |
| } |
| |
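/**
 * Search-query analogue of the caching tests: caches per-segment search hits and verifies the merged
 * results of a full-interval query.
 */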
| @Test |
| public void testSearchCaching() |
| { |
| final Druids.SearchQueryBuilder builder = Druids.newSearchQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .limit(1000) |
| .intervals(SEG_SPEC) |
| .dimensions(Collections.singletonList(TOP_DIM)) |
| .query("how") |
| .context(CONTEXT); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeSearchResults(TOP_DIM, DateTimes.of("2011-01-02"), "how1", 1, "howdy1", 2, "howwwwww1", 3, "howwy1", 4), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-05"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-05T01"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06T01"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07T01"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08T01"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ) |
| ); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new SearchQueryQueryToolChest(new SearchQueryConfig()) |
| ); |
| |
| TestHelper.assertExpectedResults( |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4, |
| DateTimes.of("2011-01-02"), "how1", 1, "howdy1", 2, "howwwwww1", 3, "howwy1", 4, |
| DateTimes.of("2011-01-05"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-05T01"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-06T01"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-07T01"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-08T01"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4, |
| DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ), |
| runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build())) |
| ); |
| } |
| |
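/**
 * Like testSearchCaching, but re-runs the query with the search dimension renamed through a
 * DefaultDimensionSpec and checks that cached results come back under the new output name.
 */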
| @Test |
| public void testSearchCachingRenamedOutput() |
| { |
| final Druids.SearchQueryBuilder builder = Druids.newSearchQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .filters(DIM_FILTER) |
| .granularity(GRANULARITY) |
| .limit(1000) |
| .intervals(SEG_SPEC) |
| .dimensions(Collections.singletonList(TOP_DIM)) |
| .query("how") |
| .context(CONTEXT); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| builder.randomQueryId().build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeSearchResults(TOP_DIM, DateTimes.of("2011-01-02"), "how1", 1, "howdy1", 2, "howwwwww1", 3, "howwy1", 4), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-05"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-05T01"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06T01"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07T01"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08T01"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ) |
| ); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| new SearchQueryQueryToolChest(new SearchQueryConfig()) |
| ); |
| |
| ResponseContext context = initializeResponseContext(); |
| TestHelper.assertExpectedResults( |
| makeSearchResults( |
| TOP_DIM, |
| DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4, |
| DateTimes.of("2011-01-02"), "how1", 1, "howdy1", 2, "howwwwww1", 3, "howwy1", 4, |
| DateTimes.of("2011-01-05"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-05T01"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-06T01"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-07T01"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-08T01"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4, |
| DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ), |
| runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build()), context) |
| ); |
| SearchQuery query = builder |
| .intervals("2011-01-01/2011-01-10") |
| .dimensions(new DefaultDimensionSpec(TOP_DIM, "new_dim")) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedResults( |
| makeSearchResults( |
| "new_dim", |
| DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4, |
| DateTimes.of("2011-01-02"), "how1", 1, "howdy1", 2, "howwwwww1", 3, "howwy1", 4, |
| DateTimes.of("2011-01-05"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-05T01"), "how2", 1, "howdy2", 2, "howwwwww2", 3, "howww2", 4, |
| DateTimes.of("2011-01-06"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-06T01"), "how3", 1, "howdy3", 2, "howwwwww3", 3, "howww3", 4, |
| DateTimes.of("2011-01-07"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-07T01"), "how4", 1, "howdy4", 2, "howwwwww4", 3, "howww4", 4, |
| DateTimes.of("2011-01-08"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-08T01"), "how5", 1, "howdy5", 2, "howwwwww5", 3, "howww5", 4, |
| DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4, |
| DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 |
| ), |
| runner.run(QueryPlus.wrap(query), context) |
| ); |
| } |
| |
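/**
 * GroupBy caching test (the shared CONTEXT forces the v1 strategy, since v2 does not cache on the broker),
 * including a HyperUniques aggregator backed by a HyperLogLogCollector.
 */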
| @Test |
| public void testGroupByCaching() |
| { |
| List<AggregatorFactory> aggsWithUniques = ImmutableList.<AggregatorFactory>builder() |
| .addAll(AGGS) |
| .add(new HyperUniquesAggregatorFactory("uniques", "uniques")) |
| .build(); |
| |
| final HashFunction hashFn = Hashing.murmur3_128(); |
| |
| GroupByQuery.Builder builder = new GroupByQuery.Builder() |
| .setDataSource(DATA_SOURCE) |
| .setQuerySegmentSpec(SEG_SPEC) |
| .setDimFilter(DIM_FILTER) |
| .setGranularity(GRANULARITY).setDimensions(new DefaultDimensionSpec("a", "a")) |
| .setAggregatorSpecs(aggsWithUniques) |
| .setPostAggregatorSpecs(POST_AGGS) |
| .setContext(CONTEXT); |
| |
| final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); |
| collector.add(hashFn.hashString("abc123", StandardCharsets.UTF_8).asBytes()); |
| collector.add(hashFn.hashString("123abc", StandardCharsets.UTF_8).asBytes()); |
| |
| final GroupByQuery query = builder.randomQueryId().build(); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| query, |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeGroupByResults( |
| query, |
| DateTimes.of("2011-01-01"), |
| ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1, "uniques", collector) |
| ), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeGroupByResults( |
| query, |
| DateTimes.of("2011-01-02"), |
| ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2, "uniques", collector) |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeGroupByResults( |
| query, |
| DateTimes.of("2011-01-05"), |
| ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector), |
| DateTimes.of("2011-01-06"), |
| ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector), |
| DateTimes.of("2011-01-07"), |
| ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector), |
| DateTimes.of("2011-01-08"), |
| ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector), |
| DateTimes.of("2011-01-09"), |
| ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector) |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeGroupByResults( |
| query, |
| DateTimes.of("2011-01-05T01"), |
| ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector), |
| DateTimes.of("2011-01-06T01"), |
| ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector), |
| DateTimes.of("2011-01-07T01"), |
| ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector), |
| DateTimes.of("2011-01-08T01"), |
| ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector), |
| DateTimes.of("2011-01-09T01"), |
| ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector) |
| ) |
| ); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| WAREHOUSE.getToolChest(query) |
| ); |
| TestHelper.assertExpectedObjects( |
| makeGroupByResults( |
| query, |
| DateTimes.of("2011-01-05T"), |
| ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector), |
| DateTimes.of("2011-01-05T01"), |
| ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3, "uniques", collector), |
| DateTimes.of("2011-01-06T"), |
| ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector), |
| DateTimes.of("2011-01-06T01"), |
| ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4, "uniques", collector), |
| DateTimes.of("2011-01-07T"), |
| ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector), |
| DateTimes.of("2011-01-07T01"), |
| ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5, "uniques", collector), |
| DateTimes.of("2011-01-08T"), |
| ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector), |
| DateTimes.of("2011-01-08T01"), |
| ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6, "uniques", collector), |
| DateTimes.of("2011-01-09T"), |
| ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector), |
| DateTimes.of("2011-01-09T01"), |
| ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector) |
| ), |
| runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build())), |
| "" |
| ); |
| } |
| |
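/**
 * Caches time-boundary results for the default bound as well as MAX_TIME and MIN_TIME.
 */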
| @Test |
| public void testTimeBoundaryCaching() |
| { |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .randomQueryId() |
| .build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), DateTimes.of("2011-01-02")), |
| |
| Intervals.of("2011-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-02"), DateTimes.of("2011-01-02"), DateTimes.of("2011-01-03")), |
| |
| Intervals.of("2011-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-05"), DateTimes.of("2011-01-05"), DateTimes.of("2011-01-10")), |
| |
| Intervals.of("2011-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-05T01"), DateTimes.of("2011-01-05T01"), DateTimes.of("2011-01-10")) |
| ); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .bound(TimeBoundaryQuery.MAX_TIME) |
| .randomQueryId() |
| .build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-02")), |
| |
| Intervals.of("2011-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-03"), null, DateTimes.of("2011-01-03")), |
| |
| Intervals.of("2011-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-10"), null, DateTimes.of("2011-01-10")) |
| ); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .bound(TimeBoundaryQuery.MIN_TIME) |
| .randomQueryId() |
| .build(), |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), null), |
| |
| Intervals.of("2011-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-02"), DateTimes.of("2011-01-02"), null), |
| |
| Intervals.of("2011-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-05"), DateTimes.of("2011-01-05"), null), |
| |
| Intervals.of("2011-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("2011-01-05T01"), DateTimes.of("2011-01-05T01"), null) |
| ); |
| } |
| |
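/**
 * Timeseries query with nested AND/OR/IN/bound filters; the expected results cover only the segments
 * whose value ranges can satisfy the filter (see the range analysis in the comment below).
 */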
| @Test |
| public void testTimeSeriesWithFilter() |
| { |
| DimFilter filter = new AndDimFilter( |
| new OrDimFilter( |
| new SelectorDimFilter("dim0", "1", null), |
| new BoundDimFilter("dim0", "222", "333", false, false, false, null, StringComparators.LEXICOGRAPHIC) |
| ), |
| new AndDimFilter( |
| new InDimFilter("dim1", Arrays.asList("0", "1", "2", "3", "4"), null), |
| new BoundDimFilter("dim1", "0", "3", false, true, false, null, StringComparators.LEXICOGRAPHIC), |
| new BoundDimFilter("dim1", "1", "9999", true, false, false, null, StringComparators.LEXICOGRAPHIC) |
| ) |
| ); |
| |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(SEG_SPEC) |
| .filters(filter) |
| .granularity(GRANULARITY) |
| .aggregators(AGGS) |
| .postAggregators(POST_AGGS) |
| .context(CONTEXT); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| /* |
    For dim0 (2011-01-01/2011-01-05), the combined range is {[1,1], [222,333]}, so segments [-inf,1], [1,2], [2,3], and
    [3,4] are needed.
    For dim1 (2011-01-06/2011-01-10), the combined range for the bound filters is {(1,3)}; combining this with the in
    filter results in {[2,2]}, so segments [1,2] and [2,3] are needed.
| */ |
| List<Iterable<Result<TimeseriesResultValue>>> expectedResult = Arrays.asList( |
| makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000, |
| DateTimes.of("2011-01-02"), 10, 1252, |
| DateTimes.of("2011-01-03"), 20, 6213, |
| DateTimes.of("2011-01-04"), 30, 743 |
| ), |
| makeTimeResults(DateTimes.of("2011-01-07"), 60, 6020, |
| DateTimes.of("2011-01-08"), 70, 250 |
| ) |
| ); |
| |
| testQueryCachingWithFilter( |
| runner, |
| 3, |
| builder.randomQueryId().build(), |
| expectedResult, |
| Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000), |
| Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-02"), 10, 1252), |
| Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-03"), 20, 6213), |
| Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-04"), 30, 743), |
| Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-05"), 40, 6000), |
| Intervals.of("2011-01-06/2011-01-10"), makeTimeResults(DateTimes.of("2011-01-06"), 50, 425), |
| Intervals.of("2011-01-06/2011-01-10"), makeTimeResults(DateTimes.of("2011-01-07"), 60, 6020), |
| Intervals.of("2011-01-06/2011-01-10"), makeTimeResults(DateTimes.of("2011-01-08"), 70, 250), |
| Intervals.of("2011-01-06/2011-01-10"), makeTimeResults(DateTimes.of("2011-01-09"), 23, 85312), |
| Intervals.of("2011-01-06/2011-01-10"), makeTimeResults(DateTimes.of("2011-01-10"), 100, 512) |
| ); |
| |
| } |
| |
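  // Verifies that single-dimension range partitions are pruned: only the chunks whose dimension
  // ranges can satisfy the filter should appear in the segment spec sent to the server.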
| @Test |
| public void testSingleDimensionPruning() |
| { |
| DimFilter filter = new AndDimFilter( |
| new OrDimFilter( |
| new SelectorDimFilter("dim1", "a", null), |
| new BoundDimFilter("dim1", "from", "to", false, false, false, null, StringComparators.LEXICOGRAPHIC) |
| ), |
| new AndDimFilter( |
| new InDimFilter("dim2", Arrays.asList("a", "c", "e", "g"), null), |
| new BoundDimFilter("dim2", "aaa", "hi", false, false, false, null, StringComparators.LEXICOGRAPHIC), |
| new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC) |
| ) |
| ); |
| |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .filters(filter) |
| .granularity(GRANULARITY) |
| .intervals(SEG_SPEC) |
| .context(CONTEXT) |
| .intervals("2011-01-05/2011-01-10") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS); |
| |
| TimeseriesQuery query = builder.randomQueryId().build(); |
| |
| final Interval interval1 = Intervals.of("2011-01-06/2011-01-07"); |
| final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); |
| final Interval interval3 = Intervals.of("2011-01-08/2011-01-09"); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| final DruidServer lastServer = servers[random.nextInt(servers.length)]; |
| ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 0); |
| ServerSelector selector2 = makeMockSingleDimensionSelector(lastServer, "dim1", "e", "f", 1); |
| ServerSelector selector3 = makeMockSingleDimensionSelector(lastServer, "dim1", "hi", "zzz", 2); |
| ServerSelector selector4 = makeMockSingleDimensionSelector(lastServer, "dim2", "a", "e", 0); |
| ServerSelector selector5 = makeMockSingleDimensionSelector(lastServer, "dim2", null, null, 1); |
| ServerSelector selector6 = makeMockSingleDimensionSelector(lastServer, "other", "b", null, 0); |
| |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 3, selector1)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 3, selector2)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(2, 3, selector3)); |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(0, 2, selector4)); |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(1, 2, selector5)); |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(0, 1, selector6)); |
| |
| final Capture<QueryPlus> capture = Capture.newInstance(); |
| final Capture<ResponseContext> contextCap = Capture.newInstance(); |
| |
| QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); |
| EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap))) |
| .andReturn(Sequences.empty()) |
| .anyTimes(); |
| EasyMock.expect(serverView.getQueryRunner(lastServer)) |
| .andReturn(mockRunner) |
| .anyTimes(); |
| EasyMock.replay(serverView); |
| EasyMock.replay(mockRunner); |
| |
| List<SegmentDescriptor> descriptors = new ArrayList<>(); |
| descriptors.add(new SegmentDescriptor(interval1, "v", 0)); |
| descriptors.add(new SegmentDescriptor(interval1, "v", 2)); |
| descriptors.add(new SegmentDescriptor(interval2, "v", 1)); |
| descriptors.add(new SegmentDescriptor(interval3, "v", 0)); |
| MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors); |
| |
| runner.run(QueryPlus.wrap(query)).toList(); |
| |
| Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec()); |
| } |
| |
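  // Verifies hash-based secondary partition pruning: with a partition function and partition
  // dimensions present in the shard specs, only the chunks whose hash buckets can match the
  // filter values should be queried.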
| @Test |
| public void testHashBasedPruningQueryContextEnabledWithPartitionFunctionAndPartitionDimensionsDoSegmentPruning() |
| { |
| DimFilter filter = new AndDimFilter( |
| new SelectorDimFilter("dim1", "a", null), |
| new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC), |
        // The dim3 filter below is equivalent to InDimFilter("dim3", Arrays.asList("c"), null)
| new AndDimFilter( |
| new InDimFilter("dim3", Arrays.asList("a", "c", "e", "g"), null), |
| new BoundDimFilter("dim3", "aaa", "ddd", false, false, false, null, StringComparators.LEXICOGRAPHIC) |
| ) |
| ); |
| |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .filters(filter) |
| .granularity(GRANULARITY) |
| .intervals(SEG_SPEC) |
| .context(CONTEXT) |
| .intervals("2011-01-05/2011-01-10") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS) |
| .randomQueryId(); |
| |
| TimeseriesQuery query = builder.build(); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| final Interval interval1 = Intervals.of("2011-01-06/2011-01-07"); |
| final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); |
| final Interval interval3 = Intervals.of("2011-01-08/2011-01-09"); |
| |
| final DruidServer lastServer = servers[random.nextInt(servers.length)]; |
| List<String> partitionDimensions1 = ImmutableList.of("dim1"); |
| ServerSelector selector1 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 0, |
| 6 |
| ); |
| ServerSelector selector2 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 1, |
| 6 |
| ); |
| ServerSelector selector3 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 2, |
| 6 |
| ); |
| ServerSelector selector4 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 3, |
| 6 |
| ); |
| ServerSelector selector5 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 4, |
| 6 |
| ); |
| ServerSelector selector6 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions1, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 5, |
| 6 |
| ); |
| |
| List<String> partitionDimensions2 = ImmutableList.of("dim2"); |
| ServerSelector selector7 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions2, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 0, |
| 3 |
| ); |
| ServerSelector selector8 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions2, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 1, |
| 3 |
| ); |
| ServerSelector selector9 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions2, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 2, |
| 3 |
| ); |
| |
| List<String> partitionDimensions3 = ImmutableList.of("dim1", "dim3"); |
| ServerSelector selector10 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions3, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 0, |
| 4 |
| ); |
| ServerSelector selector11 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions3, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 1, |
| 4 |
| ); |
| ServerSelector selector12 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions3, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 2, |
| 4 |
| ); |
| ServerSelector selector13 = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions3, |
| HashPartitionFunction.MURMUR3_32_ABS, |
| 3, |
| 4 |
| ); |
| |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 6, selector1)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 6, selector2)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(2, 6, selector3)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(3, 6, selector4)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(4, 6, selector5)); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(5, 6, selector6)); |
| |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(0, 3, selector7)); |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(1, 3, selector8)); |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(2, 3, selector9)); |
| |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(0, 3, selector10)); |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(1, 3, selector11)); |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(2, 3, selector12)); |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(2, 3, selector13)); |
| |
| final Capture<QueryPlus> capture = Capture.newInstance(); |
| final Capture<ResponseContext> contextCap = Capture.newInstance(); |
| |
| QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); |
| EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap))) |
| .andReturn(Sequences.empty()) |
| .anyTimes(); |
| EasyMock.expect(serverView.getQueryRunner(lastServer)) |
| .andReturn(mockRunner) |
| .anyTimes(); |
| EasyMock.replay(serverView); |
| EasyMock.replay(mockRunner); |
| |
    List<SegmentDescriptor> expectedDescriptors = new ArrayList<>();
    // Narrow down to 1 chunk
    expectedDescriptors.add(new SegmentDescriptor(interval1, "v", 3));
    // Can't filter out any chunks
    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 0));
    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 1));
    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 2));
    // Narrow down to 1 chunk
    expectedDescriptors.add(new SegmentDescriptor(interval3, "v", 2));

    MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(expectedDescriptors);
| |
| runner.run(QueryPlus.wrap(query)).toList(); |
| Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec()); |
| } |
| |
| @Test |
| public void testHashBasedPruningQueryContextDisabledNoSegmentPruning() |
| { |
| testNoSegmentPruningForHashPartitionedSegments(false, HashPartitionFunction.MURMUR3_32_ABS, false); |
| } |
| |
| @Test |
| public void testHashBasedPruningWithoutPartitionFunctionNoSegmentPruning() |
| { |
| testNoSegmentPruningForHashPartitionedSegments(true, null, false); |
| } |
| |
| @Test |
| public void testHashBasedPruningWithEmptyPartitionDimensionsNoSegmentPruning() |
| { |
| testNoSegmentPruningForHashPartitionedSegments(true, HashPartitionFunction.MURMUR3_32_ABS, true); |
| } |
| |
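  // Common body for the three "no pruning" cases above: pruning disabled via query context, a
  // missing partition function, or empty partition dimensions should each cause every chunk of
  // every interval to be queried.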
| private void testNoSegmentPruningForHashPartitionedSegments( |
| boolean enableSegmentPruning, |
| @Nullable HashPartitionFunction partitionFunction, |
| boolean useEmptyPartitionDimensions |
| ) |
| { |
| DimFilter filter = new AndDimFilter( |
| new SelectorDimFilter("dim1", "a", null), |
| new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC), |
        // The dim3 filter below is equivalent to InDimFilter("dim3", Arrays.asList("c"), null)
| new AndDimFilter( |
| new InDimFilter("dim3", Arrays.asList("a", "c", "e", "g"), null), |
| new BoundDimFilter("dim3", "aaa", "ddd", false, false, false, null, StringComparators.LEXICOGRAPHIC) |
| ) |
| ); |
| |
| final Map<String, Object> context = new HashMap<>(CONTEXT); |
| context.put(QueryContexts.SECONDARY_PARTITION_PRUNING_KEY, enableSegmentPruning); |
| final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .filters(filter) |
| .granularity(GRANULARITY) |
| .intervals(SEG_SPEC) |
| .intervals("2011-01-05/2011-01-10") |
| .aggregators(RENAMED_AGGS) |
| .postAggregators(RENAMED_POST_AGGS) |
| .context(context) |
| .randomQueryId(); |
| |
| TimeseriesQuery query = builder.build(); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); |
| |
| final Interval interval1 = Intervals.of("2011-01-06/2011-01-07"); |
| final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); |
| final Interval interval3 = Intervals.of("2011-01-08/2011-01-09"); |
| |
| final DruidServer lastServer = servers[random.nextInt(servers.length)]; |
| List<String> partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim1"); |
| final int numPartitions1 = 6; |
| for (int i = 0; i < numPartitions1; i++) { |
| ServerSelector selector = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions, |
| partitionFunction, |
| i, |
| numPartitions1 |
| ); |
| timeline.add(interval1, "v", new NumberedPartitionChunk<>(i, numPartitions1, selector)); |
| } |
| |
| partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim2"); |
| final int numPartitions2 = 3; |
| for (int i = 0; i < numPartitions2; i++) { |
| ServerSelector selector = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions, |
| partitionFunction, |
| i, |
| numPartitions2 |
| ); |
| timeline.add(interval2, "v", new NumberedPartitionChunk<>(i, numPartitions2, selector)); |
| } |
| |
| partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim1", "dim3"); |
| final int numPartitions3 = 4; |
| for (int i = 0; i < numPartitions3; i++) { |
| ServerSelector selector = makeMockHashBasedSelector( |
| lastServer, |
| partitionDimensions, |
| partitionFunction, |
| i, |
| numPartitions3 |
| ); |
| timeline.add(interval3, "v", new NumberedPartitionChunk<>(i, numPartitions3, selector)); |
| } |
| |
| final Capture<QueryPlus> capture = Capture.newInstance(); |
| final Capture<ResponseContext> contextCap = Capture.newInstance(); |
| |
| QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); |
| EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap))) |
| .andReturn(Sequences.empty()) |
| .anyTimes(); |
| EasyMock.expect(serverView.getQueryRunner(lastServer)) |
| .andReturn(mockRunner) |
| .anyTimes(); |
| EasyMock.replay(serverView); |
| EasyMock.replay(mockRunner); |
| |
| // Expected to read all segments |
    Set<SegmentDescriptor> expectedDescriptors = new HashSet<>();
    IntStream.range(0, numPartitions1).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval1, "v", i)));
    IntStream.range(0, numPartitions2).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval2, "v", i)));
    IntStream.range(0, numPartitions3).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval3, "v", i)));
| |
| runner.run(QueryPlus.wrap(query)).toList(); |
| QuerySegmentSpec querySegmentSpec = ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec(); |
| Assert.assertSame(MultipleSpecificSegmentSpec.class, querySegmentSpec.getClass()); |
| final Set<SegmentDescriptor> actualDescriptors = new HashSet<>( |
| ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors() |
| ); |
    Assert.assertEquals(expectedDescriptors, actualDescriptors);
| } |
| |
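  // Builds a ServerSelector over a dummy segment carrying a HashBasedNumberedShardSpec, so the
  // tests above can register hash-partitioned chunks on the timeline.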
| private ServerSelector makeMockHashBasedSelector( |
| DruidServer server, |
| List<String> partitionDimensions, |
| @Nullable HashPartitionFunction partitionFunction, |
| int partitionNum, |
| int partitions |
| ) |
| { |
| final DataSegment segment = new DataSegment( |
| SegmentId.dummy(DATA_SOURCE), |
| null, |
| null, |
| null, |
| new HashBasedNumberedShardSpec( |
| partitionNum, |
| partitions, |
| partitionNum, |
| partitions, |
| partitionDimensions, |
| partitionFunction, |
| ServerTestHelper.MAPPER |
| ), |
| null, |
| 9, |
| 0L |
| ); |
| |
| ServerSelector selector = new ServerSelector( |
| segment, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null), segment); |
| return selector; |
| } |
| |
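  // Builds a ServerSelector over a dummy segment carrying a SingleDimensionShardSpec for the
  // given dimension range.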
| private ServerSelector makeMockSingleDimensionSelector( |
| DruidServer server, |
| String dimension, |
| String start, |
| String end, |
| int partitionNum |
| ) |
| { |
| final DataSegment segment = new DataSegment( |
| SegmentId.dummy(DATA_SOURCE), |
| null, |
| null, |
| null, |
| new SingleDimensionShardSpec( |
| dimension, |
| start, |
| end, |
| partitionNum, |
| SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS |
| ), |
| null, |
| 9, |
| 0L |
| ); |
| |
| ServerSelector selector = new ServerSelector( |
| segment, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null), segment); |
| return selector; |
| } |
| |
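  // Builds a single timeBoundary result; minTime or maxTime may be null to model the
  // MAX_TIME-only and MIN_TIME-only bounds.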
| private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult( |
| DateTime timestamp, |
| DateTime minTime, |
| DateTime maxTime |
| ) |
| { |
| final Object value; |
| if (minTime != null && maxTime != null) { |
| value = ImmutableMap.of( |
| TimeBoundaryQuery.MIN_TIME, |
| minTime, |
| TimeBoundaryQuery.MAX_TIME, |
| maxTime |
| ); |
| } else if (maxTime != null) { |
| value = ImmutableMap.of( |
| TimeBoundaryQuery.MAX_TIME, |
| maxTime |
| ); |
| } else { |
| value = ImmutableMap.of( |
| TimeBoundaryQuery.MIN_TIME, |
| minTime |
| ); |
| } |
| |
| return ImmutableList.of( |
| new Result<>( |
| timestamp, |
| new TimeBoundaryResultValue(value) |
| ) |
| ); |
| } |
| |
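  // Splits the (interval, results) varargs pairs into parallel lists of query intervals and
  // expected results, appending to the previous entry when an interval repeats the last one.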
| public void parseResults( |
| final List<Interval> queryIntervals, |
| final List<List<Iterable<Result<Object>>>> expectedResults, |
| Object... args |
| ) |
| { |
| if (args.length % 2 != 0) { |
| throw new ISE("args.length must be divisible by two, was %d", args.length); |
| } |
| |
| for (int i = 0; i < args.length; i += 2) { |
| final Interval interval = (Interval) args[i]; |
| final Iterable<Result<Object>> results = (Iterable<Result<Object>>) args[i + 1]; |
| |
| if (queryIntervals.size() > 0 && interval.equals(queryIntervals.get(queryIntervals.size() - 1))) { |
| expectedResults.get(expectedResults.size() - 1).add(results); |
| } else { |
| queryIntervals.add(interval); |
| expectedResults.add(Lists.<Iterable<Result<Object>>>newArrayList(results)); |
| } |
| } |
| } |
| |
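  // For each accumulated query interval, populates the timeline, primes the mocked servers to
  // answer with filtered by-segment results, then runs the query numTimesToQuery times and
  // asserts against filteredExpected.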
| @SuppressWarnings("unchecked") |
| public void testQueryCachingWithFilter( |
| final QueryRunner runner, |
| final int numTimesToQuery, |
| final Query query, |
| final List<Iterable<Result<TimeseriesResultValue>>> filteredExpected, |
| Object... args // does this assume query intervals must be ordered? |
| ) |
| { |
| final List<Interval> queryIntervals = Lists.newArrayListWithCapacity(args.length / 2); |
| final List<List<Iterable<Result<Object>>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size()); |
| |
| parseResults(queryIntervals, expectedResults, args); |
| |
| for (int i = 0; i < queryIntervals.size(); ++i) { |
| List<Object> mocks = new ArrayList<>(); |
| mocks.add(serverView); |
| |
| final Interval actualQueryInterval = new Interval( |
| queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd() |
| ); |
| |
| final List<Map<DruidServer, ServerExpectations>> serverExpectationList = populateTimeline( |
| queryIntervals, |
| expectedResults, |
| i, |
| mocks |
| ); |
| |
| final Map<DruidServer, ServerExpectations> finalExpectation = serverExpectationList.get( |
| serverExpectationList.size() - 1 |
| ); |
| for (Map.Entry<DruidServer, ServerExpectations> entry : finalExpectation.entrySet()) { |
| DruidServer server = entry.getKey(); |
| ServerExpectations expectations = entry.getValue(); |
| |
| EasyMock.expect(serverView.getQueryRunner(server)) |
| .andReturn(expectations.getQueryRunner()) |
| .times(0, 1); |
| |
| final Capture<? extends QueryPlus> capture = Capture.newInstance(); |
| final Capture<? extends ResponseContext> context = Capture.newInstance(); |
| QueryRunner queryable = expectations.getQueryRunner(); |
| |
| if (query instanceof TimeseriesQuery) { |
| final List<SegmentId> segmentIds = new ArrayList<>(); |
| final List<Iterable<Result<TimeseriesResultValue>>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andAnswer(new IAnswer<Sequence>() |
| { |
| @Override |
| public Sequence answer() |
| { |
| return toFilteredQueryableTimeseriesResults( |
| (TimeseriesQuery) capture.getValue().getQuery(), |
| segmentIds, |
| queryIntervals, |
| results |
| ); |
| } |
| }) |
| .times(0, 1); |
| } else { |
| throw new ISE("Unknown query type[%s]", query.getClass()); |
| } |
| } |
| |
| final Iterable<Result<Object>> expected = new ArrayList<>(); |
| for (int intervalNo = 0; intervalNo < i + 1; intervalNo++) { |
| Iterables.addAll((List) expected, filteredExpected.get(intervalNo)); |
| } |
| |
| runWithMocks( |
| new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| for (int i = 0; i < numTimesToQuery; ++i) { |
| TestHelper.assertExpectedResults( |
| expected, |
| runner.run( |
| QueryPlus.wrap( |
| query.withQuerySegmentSpec( |
| new MultipleIntervalSegmentSpec( |
| ImmutableList.of( |
| actualQueryInterval |
| ) |
| ) |
| ) |
| ) |
| ) |
| ); |
| if (queryCompletedCallback != null) { |
| queryCompletedCallback.run(); |
| } |
| } |
| } |
| }, |
| mocks.toArray() |
| ); |
| } |
| } |
| |
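  // Converts the captured MultipleSpecificSegmentSpec back into by-segment timeseries results,
  // returning results only for the segments the (possibly pruned) query actually requested.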
| private Sequence<Result<TimeseriesResultValue>> toFilteredQueryableTimeseriesResults( |
| TimeseriesQuery query, |
| List<SegmentId> segmentIds, |
| List<Interval> queryIntervals, |
| List<Iterable<Result<TimeseriesResultValue>>> results |
| ) |
| { |
| MultipleSpecificSegmentSpec spec = (MultipleSpecificSegmentSpec) query.getQuerySegmentSpec(); |
| List<Result<TimeseriesResultValue>> ret = new ArrayList<>(); |
| for (SegmentDescriptor descriptor : spec.getDescriptors()) { |
| SegmentId id = SegmentId.dummy( |
| StringUtils.format("%s_%s", queryIntervals.indexOf(descriptor.getInterval()), descriptor.getPartitionNumber()) |
| ); |
| int index = segmentIds.indexOf(id); |
| if (index != -1) { |
| Result result = new Result( |
| results.get(index).iterator().next().getTimestamp(), |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results.get(index)), |
| id.toString(), |
| descriptor.getInterval() |
| ) |
| ); |
| ret.add(result); |
| } else { |
| throw new ISE("Descriptor %s not found in server", id); |
| } |
| } |
| return Sequences.simple(ret); |
| } |
| |
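  // Convenience overload: runs the query three times and expects by-segment results from the
  // mocked servers.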
| public void testQueryCaching(QueryRunner runner, final Query query, Object... args) |
| { |
| testQueryCaching(runner, 3, true, query, args); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void testQueryCaching( |
| final QueryRunner runner, |
| final int numTimesToQuery, |
| boolean expectBySegment, |
| final Query query, |
| Object... args // does this assume query intervals must be ordered? |
| ) |
| { |
| final List<Interval> queryIntervals = Lists.newArrayListWithCapacity(args.length / 2); |
| final List<List<Iterable<Result<Object>>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size()); |
| |
| parseResults(queryIntervals, expectedResults, args); |
| |
| for (int i = 0; i < queryIntervals.size(); ++i) { |
| List<Object> mocks = new ArrayList<>(); |
| mocks.add(serverView); |
| |
| final Interval actualQueryInterval = new Interval( |
| queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd() |
| ); |
| |
| final List<Map<DruidServer, ServerExpectations>> serverExpectationList = populateTimeline( |
| queryIntervals, |
| expectedResults, |
| i, |
| mocks |
| ); |
| |
| List<Capture> queryCaptures = new ArrayList<>(); |
| final Map<DruidServer, ServerExpectations> finalExpectation = serverExpectationList.get( |
| serverExpectationList.size() - 1 |
| ); |
| for (Map.Entry<DruidServer, ServerExpectations> entry : finalExpectation.entrySet()) { |
| DruidServer server = entry.getKey(); |
| ServerExpectations expectations = entry.getValue(); |
| |
| |
| EasyMock.expect(serverView.getQueryRunner(server)) |
| .andReturn(expectations.getQueryRunner()) |
| .once(); |
| |
| final Capture<? extends QueryPlus> capture = Capture.newInstance(); |
| final Capture<? extends ResponseContext> context = Capture.newInstance(); |
| queryCaptures.add(capture); |
| QueryRunner queryable = expectations.getQueryRunner(); |
| |
| if (query instanceof TimeseriesQuery) { |
| List<SegmentId> segmentIds = new ArrayList<>(); |
| List<Interval> intervals = new ArrayList<>(); |
| List<Iterable<Result<TimeseriesResultValue>>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| intervals.add(expectation.getInterval()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) |
| .once(); |
| |
| } else if (query instanceof TopNQuery) { |
| List<SegmentId> segmentIds = new ArrayList<>(); |
| List<Interval> intervals = new ArrayList<>(); |
| List<Iterable<Result<TopNResultValue>>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| intervals.add(expectation.getInterval()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) |
| .once(); |
| } else if (query instanceof SearchQuery) { |
| List<SegmentId> segmentIds = new ArrayList<>(); |
| List<Interval> intervals = new ArrayList<>(); |
| List<Iterable<Result<SearchResultValue>>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| intervals.add(expectation.getInterval()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) |
| .once(); |
| } else if (query instanceof GroupByQuery) { |
| List<SegmentId> segmentIds = new ArrayList<>(); |
| List<Interval> intervals = new ArrayList<>(); |
| List<Iterable<ResultRow>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| intervals.add(expectation.getInterval()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andReturn(toQueryableGroupByResults((GroupByQuery) query, segmentIds, intervals, results)) |
| .once(); |
| } else if (query instanceof TimeBoundaryQuery) { |
| List<SegmentId> segmentIds = new ArrayList<>(); |
| List<Interval> intervals = new ArrayList<>(); |
| List<Iterable<Result<TimeBoundaryResultValue>>> results = new ArrayList<>(); |
| for (ServerExpectation expectation : expectations) { |
| segmentIds.add(expectation.getSegmentId()); |
| intervals.add(expectation.getInterval()); |
| results.add(expectation.getResults()); |
| } |
| EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context))) |
| .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) |
| .once(); |
| } else { |
| throw new ISE("Unknown query type[%s]", query.getClass()); |
| } |
| } |
| |
| final int expectedResultsRangeStart; |
| final int expectedResultsRangeEnd; |
| if (query instanceof TimeBoundaryQuery) { |
| expectedResultsRangeStart = i; |
| expectedResultsRangeEnd = i + 1; |
| } else { |
| expectedResultsRangeStart = 0; |
| expectedResultsRangeEnd = i + 1; |
| } |
| |
| runWithMocks( |
| new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| for (int i = 0; i < numTimesToQuery; ++i) { |
| TestHelper.assertExpectedResults( |
| new MergeIterable( |
| query instanceof GroupByQuery |
| ? ((GroupByQuery) query).getResultOrdering() |
| : Comparators.naturalNullsFirst(), |
| FunctionalIterable |
| .create(new RangeIterable(expectedResultsRangeStart, expectedResultsRangeEnd)) |
| .transformCat( |
| new Function<Integer, Iterable<Iterable<Result<Object>>>>() |
| { |
| @Override |
| public Iterable<Iterable<Result<Object>>> apply(@Nullable Integer input) |
| { |
| List<Iterable<Result<Object>>> retVal = new ArrayList<>(); |
| |
| final Map<DruidServer, ServerExpectations> exps = serverExpectationList.get(input); |
| for (ServerExpectations expectations : exps.values()) { |
| for (ServerExpectation expectation : expectations) { |
| retVal.add(expectation.getResults()); |
| } |
| } |
| |
| return retVal; |
| } |
| } |
| ) |
| ), |
| runner.run( |
| QueryPlus.wrap( |
| query.withQuerySegmentSpec( |
| new MultipleIntervalSegmentSpec(ImmutableList.of(actualQueryInterval)) |
| ) |
| ), |
| initializeResponseContext() |
| ) |
| ); |
| if (queryCompletedCallback != null) { |
| queryCompletedCallback.run(); |
| } |
| } |
| } |
| }, |
| mocks.toArray() |
| ); |
| |
| // make sure all the queries were sent down as 'bySegment' |
| for (Capture queryCapture : queryCaptures) { |
| QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue(); |
| Query capturedQuery = capturedQueryPlus.getQuery(); |
| if (expectBySegment) { |
| Assert.assertEquals(true, capturedQuery.getContextValue(QueryContexts.BY_SEGMENT_KEY)); |
| } else { |
| Assert.assertTrue( |
| capturedQuery.getContextValue(QueryContexts.BY_SEGMENT_KEY) == null || |
| capturedQuery.getContextValue(QueryContexts.BY_SEGMENT_KEY).equals(false) |
| ); |
| } |
| } |
| } |
| } |
| |
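  // Rebuilds the timeline for the first numQueryIntervals + 1 intervals: each expected result
  // chunk gets a mocked segment, a ServerExpectation on a randomly chosen server, and a
  // corresponding partition chunk on the timeline.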
| private List<Map<DruidServer, ServerExpectations>> populateTimeline( |
| List<Interval> queryIntervals, |
| List<List<Iterable<Result<Object>>>> expectedResults, |
| int numQueryIntervals, |
| List<Object> mocks |
| ) |
| { |
| timeline = new VersionedIntervalTimeline<>(Ordering.natural()); |
| |
| final List<Map<DruidServer, ServerExpectations>> serverExpectationList = new ArrayList<>(); |
| |
| for (int k = 0; k < numQueryIntervals + 1; ++k) { |
| final int numChunks = expectedResults.get(k).size(); |
| final TreeMap<DruidServer, ServerExpectations> serverExpectations = new TreeMap<>(); |
| serverExpectationList.add(serverExpectations); |
| for (int j = 0; j < numChunks; ++j) { |
| DruidServer lastServer = servers[random.nextInt(servers.length)]; |
| serverExpectations |
| .computeIfAbsent(lastServer, server -> new ServerExpectations(server, makeMock(mocks, QueryRunner.class))); |
| |
| final ShardSpec shardSpec; |
| if (numChunks == 1) { |
| shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0, 1); |
| } else { |
| String start = null; |
| String end = null; |
| if (j > 0) { |
| start = String.valueOf(j); |
| } |
| if (j + 1 < numChunks) { |
| end = String.valueOf(j + 1); |
| } |
| shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j, numChunks); |
| } |
| DataSegment mockSegment = makeMock(mocks, DataSegment.class); |
| ServerExpectation<Object> expectation = new ServerExpectation<>( |
| SegmentId.dummy(StringUtils.format("%s_%s", k, j)), // interval/chunk |
| queryIntervals.get(k), |
| mockSegment, |
| shardSpec, |
| expectedResults.get(k).get(j) |
| ); |
| serverExpectations.get(lastServer).addExpectation(expectation); |
| EasyMock.expect(mockSegment.getSize()).andReturn(0L).anyTimes(); |
| EasyMock.replay(mockSegment); |
| ServerSelector selector = new ServerSelector( |
| expectation.getSegment(), |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment()); |
| EasyMock.reset(mockSegment); |
| EasyMock.expect(mockSegment.getShardSpec()) |
| .andReturn(shardSpec) |
| .anyTimes(); |
| timeline.add(queryIntervals.get(k), String.valueOf(k), shardSpec.createChunk(selector)); |
| } |
| } |
| return serverExpectationList; |
| } |
| |
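  // Wraps per-segment expected results as a by-segment sequence when bySegment is set, otherwise
  // concatenates them into a plain result sequence.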
| private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults( |
| boolean bySegment, |
| Iterable<SegmentId> segmentIds, |
| Iterable<Interval> intervals, |
| Iterable<Iterable<Result<TimeseriesResultValue>>> results |
| ) |
| { |
| if (bySegment) { |
| return Sequences.simple( |
| FunctionalIterable |
| .create(segmentIds) |
| .trinaryTransform( |
| intervals, |
| results, |
| new TrinaryFn<SegmentId, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>() |
| { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Result<TimeseriesResultValue> apply( |
| final SegmentId segmentId, |
| final Interval interval, |
| final Iterable<Result<TimeseriesResultValue>> results |
| ) |
| { |
| return new Result( |
| results.iterator().next().getTimestamp(), |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results), |
| segmentId.toString(), |
| interval |
| ) |
| ); |
| } |
| } |
| ) |
| ); |
| } else { |
| return Sequences.simple(Iterables.concat(results)); |
| } |
| } |
| |
| private Sequence<Result<TopNResultValue>> toQueryableTopNResults( |
| Iterable<SegmentId> segmentIds, |
| Iterable<Interval> intervals, |
| Iterable<Iterable<Result<TopNResultValue>>> results |
| ) |
| { |
| return Sequences.simple( |
| FunctionalIterable |
| .create(segmentIds) |
| .trinaryTransform( |
| intervals, |
| results, |
| new TrinaryFn<SegmentId, Interval, Iterable<Result<TopNResultValue>>, Result<TopNResultValue>>() |
| { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Result<TopNResultValue> apply( |
| final SegmentId segmentId, |
| final Interval interval, |
| final Iterable<Result<TopNResultValue>> results |
| ) |
| { |
| return new Result( |
| interval.getStart(), |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results), |
| segmentId.toString(), |
| interval |
| ) |
| ); |
| } |
| } |
| ) |
| ); |
| } |
| |
| private Sequence<Result<SearchResultValue>> toQueryableSearchResults( |
| Iterable<SegmentId> segmentIds, |
| Iterable<Interval> intervals, |
| Iterable<Iterable<Result<SearchResultValue>>> results |
| ) |
| { |
| return Sequences.simple( |
| FunctionalIterable |
| .create(segmentIds) |
| .trinaryTransform( |
| intervals, |
| results, |
| new TrinaryFn<SegmentId, Interval, Iterable<Result<SearchResultValue>>, Result<SearchResultValue>>() |
| { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Result<SearchResultValue> apply( |
| final SegmentId segmentId, |
| final Interval interval, |
| final Iterable<Result<SearchResultValue>> results |
| ) |
| { |
| return new Result( |
| results.iterator().next().getTimestamp(), |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results), |
| segmentId.toString(), |
| interval |
| ) |
| ); |
| } |
| } |
| ) |
| ); |
| } |
| |
| private Sequence<Result> toQueryableGroupByResults( |
| GroupByQuery query, |
| Iterable<SegmentId> segmentIds, |
| Iterable<Interval> intervals, |
| Iterable<Iterable<ResultRow>> results |
| ) |
| { |
| return Sequences.simple( |
| FunctionalIterable |
| .create(segmentIds) |
| .trinaryTransform( |
| intervals, |
| results, |
| new TrinaryFn<SegmentId, Interval, Iterable<ResultRow>, Result>() |
| { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Result apply( |
| final SegmentId segmentId, |
| final Interval interval, |
| final Iterable<ResultRow> results |
| ) |
| { |
| final DateTime timestamp; |
| |
| if (query.getUniversalTimestamp() != null) { |
| timestamp = query.getUniversalTimestamp(); |
| } else { |
| timestamp = query.getGranularity().toDateTime(results.iterator().next().getLong(0)); |
| } |
| |
| return new Result( |
| timestamp, |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results), |
| segmentId.toString(), |
| interval |
| ) |
| ); |
| } |
| } |
| ) |
| ); |
| } |
| |
| private Sequence<Result<TimeBoundaryResultValue>> toQueryableTimeBoundaryResults( |
| Iterable<SegmentId> segmentIds, |
| Iterable<Interval> intervals, |
| Iterable<Iterable<Result<TimeBoundaryResultValue>>> results |
| ) |
| { |
| return Sequences.simple( |
| FunctionalIterable |
| .create(segmentIds) |
| .trinaryTransform( |
| intervals, |
| results, |
| new TrinaryFn<SegmentId, Interval, Iterable<Result<TimeBoundaryResultValue>>, Result<TimeBoundaryResultValue>>() |
| { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Result<TimeBoundaryResultValue> apply( |
| final SegmentId segmentId, |
| final Interval interval, |
| final Iterable<Result<TimeBoundaryResultValue>> results |
| ) |
| { |
| return new Result( |
| results.iterator().next().getTimestamp(), |
| new BySegmentResultValueClass( |
| Lists.newArrayList(results), |
| segmentId.toString(), |
| interval |
| ) |
| ); |
| } |
| } |
| ) |
| ); |
| } |
| |
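  // Builds timeseries results from (timestamp, rows, imps) triples, deriving the
  // avg_imps_per_row post-aggregation columns.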
| private Iterable<Result<TimeseriesResultValue>> makeTimeResults(Object... objects) |
| { |
| if (objects.length % 3 != 0) { |
| throw new ISE("makeTimeResults must be passed arguments in groups of 3, got[%d]", objects.length); |
| } |
| |
| List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3); |
| for (int i = 0; i < objects.length; i += 3) { |
| double avg_impr = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue(); |
| retVal.add( |
| new Result<>( |
| (DateTime) objects[i], |
| new TimeseriesResultValue( |
| ImmutableMap.<String, Object>builder() |
| .put("rows", objects[i + 1]) |
| .put("imps", objects[i + 2]) |
| .put("impers", objects[i + 2]) |
| .put("avg_imps_per_row", avg_impr) |
| .put("avg_imps_per_row_half", avg_impr / 2) |
| .put("avg_imps_per_row_double", avg_impr * 2) |
| .build() |
| ) |
| ) |
| ); |
| } |
| return retVal; |
| } |
| |
| private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults(Object... objects) |
| { |
| if (objects.length % 3 != 0) { |
| throw new ISE("makeTimeResults must be passed arguments in groups of 3, got[%d]", objects.length); |
| } |
| |
| List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3); |
| for (int i = 0; i < objects.length; i += 3) { |
| retVal.add( |
| new Result<>( |
| (DateTime) objects[i], |
| new TimeseriesResultValue( |
| ImmutableMap.of( |
| "rows", objects[i + 1], |
| "imps", objects[i + 2], |
| "impers2", objects[i + 2] |
| ) |
| ) |
| ) |
| ); |
| } |
| return retVal; |
| } |
| |
| private Iterable<Result<TopNResultValue>> makeTopNResultsWithoutRename(Object... objects) |
| { |
| return makeTopNResults( |
| Lists.newArrayList( |
| TOP_DIM, |
| "rows", |
| "imps", |
| "impers", |
| "avg_imps_per_row", |
| "avg_imps_per_row_double", |
| "avg_imps_per_row_half" |
| ), |
| objects |
| ); |
| } |
| |
| private Iterable<Result<TopNResultValue>> makeTopNResults(List<String> names, Object... objects) |
| { |
| Preconditions.checkArgument(names.size() == 7); |
| List<Result<TopNResultValue>> retVal = new ArrayList<>(); |
| int index = 0; |
| while (index < objects.length) { |
| DateTime timestamp = (DateTime) objects[index++]; |
| |
| List<Map<String, Object>> values = new ArrayList<>(); |
| while (index < objects.length && !(objects[index] instanceof DateTime)) { |
| if (objects.length - index < 3) { |
| throw new ISE( |
| "expect 3 values for each entry in the top list, had %d values left.", objects.length - index |
| ); |
| } |
| final double imps = ((Number) objects[index + 2]).doubleValue(); |
| final double rows = ((Number) objects[index + 1]).doubleValue(); |
| values.add( |
| ImmutableMap.<String, Object>builder() |
| .put(names.get(0), objects[index]) |
| .put(names.get(1), rows) |
| .put(names.get(2), imps) |
| .put(names.get(3), imps) |
| .put(names.get(4), imps / rows) |
| .put(names.get(5), ((imps * 2) / rows)) |
| .put(names.get(6), (imps / (rows * 2))) |
| .build() |
| ); |
| index += 3; |
| } |
| |
| retVal.add(new Result<>(timestamp, new TopNResultValue(values))); |
| } |
| return retVal; |
| } |
| |
| private Iterable<Result<TopNResultValue>> makeRenamedTopNResults(Object... objects) |
| { |
| return makeTopNResults( |
| Lists.newArrayList( |
| TOP_DIM, |
| "rows", |
| "imps", |
| "impers2", |
| "avg_imps_per_row", |
| "avg_imps_per_row_double", |
| "avg_imps_per_row_half" |
| ), |
| objects |
| ); |
| } |
| |
| private Iterable<Result<SearchResultValue>> makeSearchResults(String dim, Object... objects) |
| { |
| List<Result<SearchResultValue>> retVal = new ArrayList<>(); |
| int index = 0; |
| while (index < objects.length) { |
| DateTime timestamp = (DateTime) objects[index++]; |
| |
| List<SearchHit> values = new ArrayList<>(); |
| while (index < objects.length && !(objects[index] instanceof DateTime)) { |
| values.add(new SearchHit(dim, objects[index++].toString(), (Integer) objects[index++])); |
| } |
| |
| retVal.add(new Result<>(timestamp, new SearchResultValue(values))); |
| } |
| return retVal; |
| } |
| |
| private Iterable<ResultRow> makeGroupByResults(GroupByQuery query, Object... objects) |
| { |
| List<ResultRow> retVal = new ArrayList<>(); |
| int index = 0; |
| while (index < objects.length) { |
| final DateTime timestamp = (DateTime) objects[index++]; |
| final Map<String, Object> rowMap = (Map<String, Object>) objects[index++]; |
| final ResultRow row = ResultRow.create(query.getResultRowSizeWithoutPostAggregators()); |
| |
| if (query.getResultRowHasTimestamp()) { |
| row.set(0, timestamp.getMillis()); |
| } |
| |
| for (Map.Entry<String, Object> entry : rowMap.entrySet()) { |
| final int position = query.getResultRowSignature().indexOf(entry.getKey()); |
| row.set(position, entry.getValue()); |
| } |
| |
| retVal.add(row); |
| } |
| return retVal; |
| } |
| |
| private <T> T makeMock(List<Object> mocks, Class<T> clazz) |
| { |
| T obj = EasyMock.createMock(clazz); |
| mocks.add(obj); |
| return obj; |
| } |
| |
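  // Replays the given mocks, runs the body, then verifies and resets the mocks for the next pass.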
| private void runWithMocks(Runnable toRun, Object... mocks) |
| { |
| EasyMock.replay(mocks); |
| |
| toRun.run(); |
| |
| EasyMock.verify(mocks); |
| EasyMock.reset(mocks); |
| } |
| |
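  // Builds the CachingClusteredClient under test, wired to the test timeline, server view, and
  // cache, with cache population and use enabled and the given bulk merge limit.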
| protected CachingClusteredClient makeClient(final CachePopulator cachePopulator) |
| { |
| return makeClient(cachePopulator, cache, 10); |
| } |
| |
| protected CachingClusteredClient makeClient( |
| final CachePopulator cachePopulator, |
| final Cache cache, |
| final int mergeLimit |
| ) |
| { |
| return new CachingClusteredClient( |
| WAREHOUSE, |
| new TimelineServerView() |
| { |
| @Override |
| public void registerSegmentCallback(Executor exec, SegmentCallback callback) |
| { |
| } |
| |
| @Override |
| public Optional<VersionedIntervalTimeline<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis) |
| { |
| return Optional.of(timeline); |
| } |
| |
| @Override |
| public List<ImmutableDruidServer> getDruidServers() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public <T> QueryRunner<T> getQueryRunner(DruidServer server) |
| { |
| return serverView.getQueryRunner(server); |
| } |
| |
| @Override |
| public void registerTimelineCallback(final Executor exec, final TimelineCallback callback) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) |
| { |
| |
| } |
| }, |
| cache, |
| JSON_MAPPER, |
| cachePopulator, |
| new CacheConfig() |
| { |
| @Override |
| public boolean isPopulateCache() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isUseCache() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean isQueryCacheable(Query query) |
| { |
| return true; |
| } |
| |
| @Override |
| public int getCacheBulkMergeLimit() |
| { |
| return mergeLimit; |
| } |
| }, |
| new DruidHttpClientConfig() |
| { |
| @Override |
| public long getMaxQueuedBytes() |
| { |
| return 0L; |
| } |
| }, |
| new DruidProcessingConfig() |
| { |
| @Override |
| public String getFormatString() |
| { |
| return null; |
| } |
| |
| @Override |
| public int getMergePoolParallelism() |
| { |
        // fixed so behavior is the same across all test environments
| return 4; |
| } |
| }, |
| ForkJoinPool.commonPool(), |
| new QueryScheduler( |
| 0, |
| ManualQueryPrioritizationStrategy.INSTANCE, |
| NoQueryLaningStrategy.INSTANCE, |
| new ServerConfig() |
| ), |
| new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) |
| ); |
| } |
| |
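  // Pairs one mocked segment (and its shard spec) with the results a mocked server is expected
  // to return for it.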
| private static class ServerExpectation<T> |
| { |
| private final SegmentId segmentId; |
| private final Interval interval; |
| private final DataSegment segment; |
| private final ShardSpec shardSpec; |
| private final Iterable<Result<T>> results; |
| |
| public ServerExpectation( |
| SegmentId segmentId, |
| Interval interval, |
| DataSegment segment, |
| ShardSpec shardSpec, |
| Iterable<Result<T>> results |
| ) |
| { |
| this.segmentId = segmentId; |
| this.interval = interval; |
| this.segment = segment; |
| this.shardSpec = shardSpec; |
| this.results = results; |
| } |
| |
| public SegmentId getSegmentId() |
| { |
| return segmentId; |
| } |
| |
| public Interval getInterval() |
| { |
| return interval; |
| } |
| |
| public DataSegment getSegment() |
| { |
| return new MyDataSegment(); |
| } |
| |
| public Iterable<Result<T>> getResults() |
| { |
| return results; |
| } |
| |
| private class MyDataSegment extends DataSegment |
| { |
| private final DataSegment baseSegment = segment; |
| |
| private MyDataSegment() |
| { |
| super( |
| "", |
| Intervals.utc(0, 1), |
| "", |
| null, |
| null, |
| null, |
| NoneShardSpec.instance(), |
| null, |
| 0 |
| ); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getDataSource() |
| { |
| return baseSegment.getDataSource(); |
| } |
| |
| @Override |
| @JsonProperty |
| public Interval getInterval() |
| { |
| return baseSegment.getInterval(); |
| } |
| |
| @Override |
| @JsonProperty |
| public Map<String, Object> getLoadSpec() |
| { |
| return baseSegment.getLoadSpec(); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getVersion() |
| { |
| return "version"; |
| } |
| |
| @Override |
| @JsonSerialize |
| @JsonProperty |
| public List<String> getDimensions() |
| { |
| return baseSegment.getDimensions(); |
| } |
| |
| @Override |
| @JsonSerialize |
| @JsonProperty |
| public List<String> getMetrics() |
| { |
| return baseSegment.getMetrics(); |
| } |
| |
| @Override |
| @JsonProperty |
| public ShardSpec getShardSpec() |
| { |
| try { |
| return baseSegment.getShardSpec(); |
| } |
| catch (IllegalStateException e) { |
| return NoneShardSpec.instance(); |
| } |
| } |
| |
| @Override |
| @JsonProperty |
| public long getSize() |
| { |
| return baseSegment.getSize(); |
| } |
| |
| @Override |
| public SegmentId getId() |
| { |
| return segmentId; |
| } |
| |
| @Override |
| public SegmentDescriptor toDescriptor() |
| { |
| return baseSegment.toDescriptor(); |
| } |
| |
| @Override |
| public int compareTo(DataSegment dataSegment) |
| { |
| return baseSegment.compareTo(dataSegment); |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (!(o instanceof DataSegment)) { |
| return false; |
| } |
| return baseSegment.equals(o); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return baseSegment.hashCode(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return baseSegment.toString(); |
| } |
| |
| @Override |
| public int getStartRootPartitionId() |
| { |
| return shardSpec.getStartRootPartitionId(); |
| } |
| |
| @Override |
| public int getEndRootPartitionId() |
| { |
| return shardSpec.getEndRootPartitionId(); |
| } |
| |
| @Override |
| public short getMinorVersion() |
| { |
| return shardSpec.getMinorVersion(); |
| } |
| |
| @Override |
| public short getAtomicUpdateGroupSize() |
| { |
| return shardSpec.getAtomicUpdateGroupSize(); |
| } |
| |
| @Override |
| public boolean overshadows(DataSegment other) |
| { |
| if (getDataSource().equals(other.getDataSource()) |
| && getInterval().overlaps(other.getInterval()) |
| && getVersion().equals(other.getVersion())) { |
| return getStartRootPartitionId() <= other.getStartRootPartitionId() |
| && getEndRootPartitionId() >= other.getEndRootPartitionId() |
| && getMinorVersion() > other.getMinorVersion(); |
| } |
| return false; |
| } |
| } |
| } |
| |
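  // Groups the per-segment expectations assigned to a single mocked server, together with that
  // server's mock QueryRunner.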
| private static class ServerExpectations implements Iterable<ServerExpectation> |
| { |
| private final DruidServer server; |
| private final QueryRunner queryRunner; |
| private final List<ServerExpectation> expectations = new ArrayList<>(); |
| |
| public ServerExpectations( |
| DruidServer server, |
| QueryRunner queryRunner |
| ) |
| { |
| this.server = server; |
| this.queryRunner = queryRunner; |
| } |
| |
| public QueryRunner getQueryRunner() |
| { |
| return queryRunner; |
| } |
| |
| public void addExpectation( |
| ServerExpectation expectation |
| ) |
| { |
| expectations.add(expectation); |
| } |
| |
| @Override |
| public Iterator<ServerExpectation> iterator() |
| { |
| return expectations.iterator(); |
| } |
| } |
| |
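  // Same scenario as testTimeBoundaryCaching, but anchored at the 1970 epoch so the cached
  // boundary timestamps may round-trip through the cache as integers rather than strings.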
| @Test |
| public void testTimeBoundaryCachingWhenTimeIsInteger() |
| { |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .randomQueryId() |
| .build(), |
| Intervals.of("1970-01-01/1970-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), DateTimes.of("1970-01-02")), |
| |
| Intervals.of("1970-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-02"), DateTimes.of("1970-01-02"), DateTimes.of("1970-01-03")), |
| |
| Intervals.of("1970-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-05"), DateTimes.of("1970-01-05"), DateTimes.of("1970-01-10")), |
| |
| Intervals.of("1970-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-05T01"), DateTimes.of("1970-01-05T01"), DateTimes.of("1970-01-10")) |
| ); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .bound(TimeBoundaryQuery.MAX_TIME) |
| .randomQueryId() |
| .build(), |
| Intervals.of("1970-01-01/2011-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-02")), |
| |
| Intervals.of("1970-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-03"), null, DateTimes.of("1970-01-03")), |
| |
| Intervals.of("1970-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-10"), null, DateTimes.of("1970-01-10")) |
| ); |
| |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(CachingClusteredClientTest.DATA_SOURCE) |
| .intervals(CachingClusteredClientTest.SEG_SPEC) |
| .context(CachingClusteredClientTest.CONTEXT) |
| .bound(TimeBoundaryQuery.MIN_TIME) |
| .randomQueryId() |
| .build(), |
| Intervals.of("1970-01-01/2011-01-02"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), null), |
| |
| Intervals.of("1970-01-01/2011-01-03"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-02"), DateTimes.of("1970-01-02"), null), |
| |
| Intervals.of("1970-01-01/1970-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-05"), DateTimes.of("1970-01-05"), null), |
| |
| Intervals.of("1970-01-01/2011-01-10"), |
| makeTimeBoundaryResult(DateTimes.of("1970-01-05T01"), DateTimes.of("1970-01-05T01"), null) |
| ); |
| } |
| |
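  // Populates the cache with groupBy results, then re-queries with renamed dimension output and
  // aggregator names to check that cached values are surfaced under the new names.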
| @Test |
| public void testGroupByCachingRenamedAggs() |
| { |
| GroupByQuery.Builder builder = new GroupByQuery.Builder() |
| .setDataSource(DATA_SOURCE) |
| .setQuerySegmentSpec(SEG_SPEC) |
| .setDimFilter(DIM_FILTER) |
| .setGranularity(GRANULARITY).setDimensions(new DefaultDimensionSpec("a", "output")) |
| .setAggregatorSpecs(AGGS) |
| .setContext(CONTEXT); |
| |
| final GroupByQuery query1 = builder.randomQueryId().build(); |
| testQueryCaching( |
| getDefaultQueryRunner(), |
| query1, |
| Intervals.of("2011-01-01/2011-01-02"), |
| makeGroupByResults( |
| query1, |
| DateTimes.of("2011-01-01"), |
| ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1) |
| ), |
| |
| Intervals.of("2011-01-02/2011-01-03"), |
| makeGroupByResults( |
| query1, |
| DateTimes.of("2011-01-02"), |
| ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2) |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeGroupByResults( |
| query1, |
| DateTimes.of("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3), |
| DateTimes.of("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4), |
| DateTimes.of("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), |
| DateTimes.of("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), |
| DateTimes.of("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) |
| ), |
| |
| Intervals.of("2011-01-05/2011-01-10"), |
| makeGroupByResults( |
| query1, |
| DateTimes.of("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3), |
| DateTimes.of("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4), |
| DateTimes.of("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), |
| DateTimes.of("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), |
| DateTimes.of("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) |
| ) |
| ); |
| |
| QueryRunner runner = new FinalizeResultsQueryRunner( |
| getDefaultQueryRunner(), |
| WAREHOUSE.getToolChest(query1) |
| ); |
| final ResponseContext context = initializeResponseContext(); |
| TestHelper.assertExpectedObjects( |
| makeGroupByResults( |
| query1, |
| DateTimes.of("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3), |
| DateTimes.of("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3), |
| DateTimes.of("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4), |
| DateTimes.of("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4), |
| DateTimes.of("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), |
| DateTimes.of("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), |
| DateTimes.of("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), |
| DateTimes.of("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), |
| DateTimes.of("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7), |
| DateTimes.of("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) |
| ), |
| runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build()), context), |
| "" |
| ); |
| |
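| // Same interval again, but with the dimension output renamed to "output2" and the renamed |
| // aggregators; the cached values should be re-mapped onto the new names. |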
| final GroupByQuery query2 = builder |
| .setInterval("2011-01-05/2011-01-10").setDimensions(new DefaultDimensionSpec("a", "output2")) |
| .setAggregatorSpecs(RENAMED_AGGS) |
| .randomQueryId() |
| .build(); |
| TestHelper.assertExpectedObjects( |
| makeGroupByResults( |
| query2, |
| DateTimes.of("2011-01-05T"), ImmutableMap.of("output2", "c", "rows", 3, "imps", 3, "impers2", 3), |
| DateTimes.of("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows", 3, "imps", 3, "impers2", 3), |
| DateTimes.of("2011-01-06T"), ImmutableMap.of("output2", "d", "rows", 4, "imps", 4, "impers2", 4), |
| DateTimes.of("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows", 4, "imps", 4, "impers2", 4), |
| DateTimes.of("2011-01-07T"), ImmutableMap.of("output2", "e", "rows", 5, "imps", 5, "impers2", 5), |
| DateTimes.of("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows", 5, "imps", 5, "impers2", 5), |
| DateTimes.of("2011-01-08T"), ImmutableMap.of("output2", "f", "rows", 6, "imps", 6, "impers2", 6), |
| DateTimes.of("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows", 6, "imps", 6, "impers2", 6), |
| DateTimes.of("2011-01-09T"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7), |
| DateTimes.of("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows", 7, "imps", 7, "impers2", 7) |
| ), |
| runner.run(QueryPlus.wrap(query2), context), |
| "renamed aggregators test" |
| ); |
| } |
| |
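| /** |
| * Runs a time boundary query carrying an "If-None-Match" ETag that does not match the served |
| * segment, and checks that the freshly computed ETag is placed in the response context. |
| */ |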
| @Test |
| public void testIfNoneMatch() |
| { |
| Interval interval = Intervals.of("2016/2017"); |
| final DataSegment dataSegment = new DataSegment( |
| "dataSource", |
| interval, |
| "ver", |
| ImmutableMap.of( |
| "type", "hdfs", |
| "path", "/tmp" |
| ), |
| ImmutableList.of("product"), |
| ImmutableList.of("visited_sum"), |
| NoneShardSpec.instance(), |
| 9, |
| 12334 |
| ); |
| final ServerSelector selector = new ServerSelector( |
| dataSegment, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); |
| timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); |
| |
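| // The "If-None-Match" value deliberately differs from the ETag the client computes for this |
| // segment, so the query is not short-circuited and the computed ETag is asserted below. |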
| TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval))) |
| .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) |
| .randomQueryId() |
| .build(); |
| |
| final ResponseContext responseContext = initializeResponseContext(); |
| |
| getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); |
| Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG)); |
| } |
| |
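| /** |
| * Issues two queries over different sub-intervals of the same segment and asserts that their |
| * ETags differ, i.e. the ETag accounts for the query interval and not just the segments scanned. |
| */ |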
| @Test |
| public void testEtagForDifferentQueryInterval() |
| { |
| final Interval interval = Intervals.of("2016-01-01/2016-01-02"); |
| final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00"); |
| final Interval queryInterval2 = Intervals.of("2016-01-01T18:00:00/2016-01-02T18:00:00"); |
| final DataSegment dataSegment = new DataSegment( |
| "dataSource", |
| interval, |
| "ver", |
| ImmutableMap.of( |
| "type", "hdfs", |
| "path", "/tmp" |
| ), |
| ImmutableList.of("product"), |
| ImmutableList.of("visited_sum"), |
| NoneShardSpec.instance(), |
| 9, |
| 12334 |
| ); |
| final ServerSelector selector = new ServerSelector( |
| dataSegment, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) |
| ); |
| selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); |
| timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); |
| |
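| // Identical queries except for the interval: queryInterval vs. queryInterval2. |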
| final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval))) |
| .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) |
| .randomQueryId() |
| .build(); |
| |
| final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder() |
| .dataSource(DATA_SOURCE) |
| .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2))) |
| .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) |
| .randomQueryId() |
| .build(); |
| |
| final ResponseContext responseContext = initializeResponseContext(); |
| |
| getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); |
| final Object etag1 = responseContext.get(ResponseContext.Key.ETAG); |
| getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); |
| final Object etag2 = responseContext.get(ResponseContext.Key.ETAG); |
| Assert.assertNotEquals(etag1, etag2); |
| } |
| |
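| /** |
| * Wraps {@code client} so that each run resolves the query runner for the query's intervals at |
| * execution time rather than when the runner is created. |
| */ |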
| @SuppressWarnings("unchecked") |
| private QueryRunner getDefaultQueryRunner() |
| { |
| return new QueryRunner() |
| { |
| @Override |
| public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) |
| { |
| return client.getQueryRunnerForIntervals(queryPlus.getQuery(), queryPlus.getQuery().getIntervals()) |
| .run(queryPlus, responseContext); |
| } |
| }; |
| } |
| |
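| /** |
| * Creates an empty response context pre-populated with the REMAINING_RESPONSES_FROM_QUERY_SERVERS |
| * key, which downstream runners update as servers respond. |
| */ |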
| private static ResponseContext initializeResponseContext() |
| { |
| final ResponseContext context = ResponseContext.createEmpty(); |
| context.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); |
| return context; |
| } |
| } |