| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.collections.CloseableStupidPool; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.query.DataSource; |
| import org.apache.druid.query.DefaultGenericQueryMetricsFactory; |
| import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; |
| import org.apache.druid.query.DruidProcessingConfig; |
| import org.apache.druid.query.InlineDataSource; |
| import org.apache.druid.query.LookupDataSource; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryRunnerFactory; |
| import org.apache.druid.query.QueryRunnerFactoryConglomerate; |
| import org.apache.druid.query.QueryRunnerTestHelper; |
| import org.apache.druid.query.QuerySegmentWalker; |
| import org.apache.druid.query.QueryToolChest; |
| import org.apache.druid.query.QueryToolChestWarehouse; |
| import org.apache.druid.query.RetryQueryRunnerConfig; |
| import org.apache.druid.query.groupby.GroupByQuery; |
| import org.apache.druid.query.groupby.GroupByQueryConfig; |
| import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; |
| import org.apache.druid.query.groupby.GroupByQueryRunnerTest; |
| import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; |
| import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; |
| import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; |
| import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest; |
| import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory; |
| import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; |
| import org.apache.druid.query.scan.ScanQuery; |
| import org.apache.druid.query.scan.ScanQueryConfig; |
| import org.apache.druid.query.scan.ScanQueryEngine; |
| import org.apache.druid.query.scan.ScanQueryQueryToolChest; |
| import org.apache.druid.query.scan.ScanQueryRunnerFactory; |
| import org.apache.druid.query.timeseries.TimeseriesQuery; |
| import org.apache.druid.query.timeseries.TimeseriesQueryEngine; |
| import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; |
| import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; |
| import org.apache.druid.query.topn.TopNQuery; |
| import org.apache.druid.query.topn.TopNQueryConfig; |
| import org.apache.druid.query.topn.TopNQueryQueryToolChest; |
| import org.apache.druid.query.topn.TopNQueryRunnerFactory; |
| import org.apache.druid.segment.ReferenceCountingSegment; |
| import org.apache.druid.segment.SegmentWrangler; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.join.InlineJoinableFactory; |
| import org.apache.druid.segment.join.JoinableFactory; |
| import org.apache.druid.segment.join.LookupJoinableFactory; |
| import org.apache.druid.segment.join.MapJoinableFactory; |
| import org.apache.druid.server.initialization.ServerConfig; |
| import org.apache.druid.server.metrics.NoopServiceEmitter; |
| import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; |
| import org.apache.druid.server.scheduling.NoQueryLaningStrategy; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| |
| import javax.annotation.Nullable; |
| import java.nio.ByteBuffer; |
| import java.util.Map; |
| import java.util.Set; |
| |
| /** |
| * Utilities for creating query-stack objects for tests. |
| */ |
| public class QueryStackTests |
| { |
| public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler( |
| 0, |
| ManualQueryPrioritizationStrategy.INSTANCE, |
| NoQueryLaningStrategy.INSTANCE, |
| new ServerConfig() |
| ); |
| private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); |
| private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; |
| |
| private QueryStackTests() |
| { |
| // No instantiation. |
| } |
| |
| public static ClientQuerySegmentWalker createClientQuerySegmentWalker( |
| final QuerySegmentWalker clusterWalker, |
| final QuerySegmentWalker localWalker, |
| final QueryRunnerFactoryConglomerate conglomerate, |
| final JoinableFactory joinableFactory, |
| final ServerConfig serverConfig |
| ) |
| { |
| return new ClientQuerySegmentWalker( |
| EMITTER, |
| clusterWalker, |
| localWalker, |
| new QueryToolChestWarehouse() |
| { |
| @Override |
| public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query) |
| { |
| return conglomerate.findFactory(query).getToolchest(); |
| } |
| }, |
| joinableFactory, |
| new RetryQueryRunnerConfig(), |
| TestHelper.makeJsonMapper(), |
| serverConfig, |
| null /* Cache */, |
| new CacheConfig() |
| { |
| @Override |
| public boolean isPopulateCache() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isUseCache() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isPopulateResultLevelCache() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isUseResultLevelCache() |
| { |
| return false; |
| } |
| } |
| ); |
| } |
| |
| public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker( |
| Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines, |
| JoinableFactory joinableFactory, |
| QueryRunnerFactoryConglomerate conglomerate, |
| @Nullable QueryScheduler scheduler |
| ) |
| { |
| return new TestClusterQuerySegmentWalker(timelines, joinableFactory, conglomerate, scheduler); |
| } |
| |
| public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( |
| final QueryRunnerFactoryConglomerate conglomerate, |
| final SegmentWrangler segmentWrangler, |
| final JoinableFactory joinableFactory, |
| final QueryScheduler scheduler |
| ) |
| { |
| return new LocalQuerySegmentWalker( |
| conglomerate, |
| segmentWrangler, |
| joinableFactory, |
| scheduler, |
| EMITTER |
| ); |
| } |
| |
| public static DruidProcessingConfig getProcessingConfig( |
| boolean useParallelMergePoolConfigured, |
| final int mergeBuffers |
| ) |
| { |
| return new DruidProcessingConfig() |
| { |
| @Override |
| public String getFormatString() |
| { |
| return null; |
| } |
| |
| @Override |
| public int intermediateComputeSizeBytes() |
| { |
| return COMPUTE_BUFFER_SIZE; |
| } |
| |
| @Override |
| public int getNumThreads() |
| { |
| // Only use 1 thread for tests. |
| return 1; |
| } |
| |
| @Override |
| public int getNumMergeBuffers() |
| { |
| if (mergeBuffers == DEFAULT_NUM_MERGE_BUFFERS) { |
| return 2; |
| } |
| return mergeBuffers; |
| } |
| |
| @Override |
| public boolean useParallelMergePoolConfigured() |
| { |
| return useParallelMergePoolConfigured; |
| } |
| }; |
| } |
| |
| /** |
| * Returns a new {@link QueryRunnerFactoryConglomerate}. Adds relevant closeables to the passed-in {@link Closer}. |
| */ |
| public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer) |
| { |
| return createQueryRunnerFactoryConglomerate(closer, true); |
| } |
| |
| public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( |
| final Closer closer, |
| final boolean useParallelMergePoolConfigured |
| |
| ) |
| { |
| return createQueryRunnerFactoryConglomerate(closer, |
| getProcessingConfig( |
| useParallelMergePoolConfigured, |
| DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS |
| ) |
| ); |
| } |
| |
| public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( |
| final Closer closer, |
| final DruidProcessingConfig processingConfig |
| ) |
| { |
| final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>( |
| "TopNQueryRunnerFactory-bufferPool", |
| () -> ByteBuffer.allocate(COMPUTE_BUFFER_SIZE) |
| ); |
| |
| closer.register(stupidPool); |
| |
| final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = |
| GroupByQueryRunnerTest.makeQueryRunnerFactory( |
| GroupByQueryRunnerTest.DEFAULT_MAPPER, |
| new GroupByQueryConfig() |
| { |
| @Override |
| public String getDefaultStrategy() |
| { |
| return GroupByStrategySelector.STRATEGY_V2; |
| } |
| }, |
| processingConfig |
| ); |
| |
| final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs; |
| closer.register(factoryCloserPair.rhs); |
| |
| final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( |
| ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder() |
| .put( |
| SegmentMetadataQuery.class, |
| new SegmentMetadataQueryRunnerFactory( |
| new SegmentMetadataQueryQueryToolChest( |
| new SegmentMetadataQueryConfig("P1W") |
| ), |
| QueryRunnerTestHelper.NOOP_QUERYWATCHER |
| ) |
| ) |
| .put( |
| ScanQuery.class, |
| new ScanQueryRunnerFactory( |
| new ScanQueryQueryToolChest( |
| new ScanQueryConfig(), |
| new DefaultGenericQueryMetricsFactory() |
| ), |
| new ScanQueryEngine(), |
| new ScanQueryConfig() |
| ) |
| ) |
| .put( |
| TimeseriesQuery.class, |
| new TimeseriesQueryRunnerFactory( |
| new TimeseriesQueryQueryToolChest(), |
| new TimeseriesQueryEngine(), |
| QueryRunnerTestHelper.NOOP_QUERYWATCHER |
| ) |
| ) |
| .put( |
| TopNQuery.class, |
| new TopNQueryRunnerFactory( |
| stupidPool, |
| new TopNQueryQueryToolChest(new TopNQueryConfig()), |
| QueryRunnerTestHelper.NOOP_QUERYWATCHER |
| ) |
| ) |
| .put(GroupByQuery.class, groupByQueryRunnerFactory) |
| .build() |
| ); |
| |
| return conglomerate; |
| } |
| |
| public static JoinableFactory makeJoinableFactoryForLookup( |
| LookupExtractorFactoryContainerProvider lookupProvider |
| ) |
| { |
| return makeJoinableFactoryFromDefault(lookupProvider, null, null); |
| } |
| |
| public static JoinableFactory makeJoinableFactoryFromDefault( |
| @Nullable LookupExtractorFactoryContainerProvider lookupProvider, |
| @Nullable Set<JoinableFactory> customFactories, |
| @Nullable Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> customMappings |
| ) |
| { |
| ImmutableSet.Builder<JoinableFactory> setBuilder = ImmutableSet.builder(); |
| ImmutableMap.Builder<Class<? extends JoinableFactory>, Class<? extends DataSource>> mapBuilder = |
| ImmutableMap.builder(); |
| setBuilder.add(new InlineJoinableFactory()); |
| mapBuilder.put(InlineJoinableFactory.class, InlineDataSource.class); |
| if (lookupProvider != null) { |
| setBuilder.add(new LookupJoinableFactory(lookupProvider)); |
| mapBuilder.put(LookupJoinableFactory.class, LookupDataSource.class); |
| } |
| if (customFactories != null) { |
| setBuilder.addAll(customFactories); |
| } |
| if (customMappings != null) { |
| mapBuilder.putAll(customMappings); |
| } |
| |
| return new MapJoinableFactory(setBuilder.build(), mapBuilder.build()); |
| } |
| } |