| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.client; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Ordering; |
| import com.google.common.collect.RangeSet; |
| import com.google.common.collect.Sets; |
| import com.google.common.hash.Hasher; |
| import com.google.common.hash.Hashing; |
| import com.google.common.primitives.Bytes; |
| import com.google.inject.Inject; |
| import org.apache.druid.client.cache.Cache; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.client.cache.CachePopulator; |
| import org.apache.druid.client.selector.QueryableDruidServer; |
| import org.apache.druid.client.selector.ServerSelector; |
| import org.apache.druid.guice.annotations.Client; |
| import org.apache.druid.guice.annotations.Merging; |
| import org.apache.druid.guice.annotations.Smile; |
| import org.apache.druid.guice.http.DruidHttpClientConfig; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.NonnullPair; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.guava.BaseSequence; |
| import org.apache.druid.java.util.common.guava.LazySequence; |
| import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.query.BySegmentResultValueClass; |
| import org.apache.druid.query.CacheStrategy; |
| import org.apache.druid.query.DruidProcessingConfig; |
| import org.apache.druid.query.Queries; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryContexts; |
| import org.apache.druid.query.QueryMetrics; |
| import org.apache.druid.query.QueryPlus; |
| import org.apache.druid.query.QueryRunner; |
| import org.apache.druid.query.QuerySegmentWalker; |
| import org.apache.druid.query.QueryToolChest; |
| import org.apache.druid.query.QueryToolChestWarehouse; |
| import org.apache.druid.query.Result; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.query.aggregation.MetricManipulatorFns; |
| import org.apache.druid.query.context.ResponseContext; |
| import org.apache.druid.query.context.ResponseContext.Key; |
| import org.apache.druid.query.filter.DimFilterUtils; |
| import org.apache.druid.query.planning.DataSourceAnalysis; |
| import org.apache.druid.query.spec.QuerySegmentSpec; |
| import org.apache.druid.segment.join.JoinableFactory; |
| import org.apache.druid.segment.join.JoinableFactoryWrapper; |
| import org.apache.druid.server.QueryResource; |
| import org.apache.druid.server.QueryScheduler; |
| import org.apache.druid.server.coordination.DruidServerMetadata; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.SegmentId; |
| import org.apache.druid.timeline.TimelineLookup; |
| import org.apache.druid.timeline.TimelineObjectHolder; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.apache.druid.timeline.VersionedIntervalTimeline.PartitionChunkEntry; |
| import org.apache.druid.timeline.partition.PartitionChunk; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nullable; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.function.BinaryOperator; |
| import java.util.function.UnaryOperator; |
| import java.util.stream.Collectors; |
| |
| /** |
| * This is the class on the Broker that is responsible for making native Druid queries to a cluster of data servers. |
| * |
| * The main user of this class is {@link org.apache.druid.server.ClientQuerySegmentWalker}. In tests, its behavior |
| * is partially mimicked by TestClusterQuerySegmentWalker. |
| */ |
| public class CachingClusteredClient implements QuerySegmentWalker |
| { |
| private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class); |
| private final QueryToolChestWarehouse warehouse; |
| private final TimelineServerView serverView; |
| private final Cache cache; |
| private final ObjectMapper objectMapper; |
| private final CachePopulator cachePopulator; |
| private final CacheConfig cacheConfig; |
| private final DruidHttpClientConfig httpClientConfig; |
| private final DruidProcessingConfig processingConfig; |
| private final ForkJoinPool pool; |
| private final QueryScheduler scheduler; |
| private final JoinableFactoryWrapper joinableFactoryWrapper; |
| |
| @Inject |
| public CachingClusteredClient( |
| QueryToolChestWarehouse warehouse, |
| TimelineServerView serverView, |
| Cache cache, |
| @Smile ObjectMapper objectMapper, |
| CachePopulator cachePopulator, |
| CacheConfig cacheConfig, |
| @Client DruidHttpClientConfig httpClientConfig, |
| DruidProcessingConfig processingConfig, |
| @Merging ForkJoinPool pool, |
| QueryScheduler scheduler, |
| JoinableFactory joinableFactory |
| ) |
| { |
| this.warehouse = warehouse; |
| this.serverView = serverView; |
| this.cache = cache; |
| this.objectMapper = objectMapper; |
| this.cachePopulator = cachePopulator; |
| this.cacheConfig = cacheConfig; |
| this.httpClientConfig = httpClientConfig; |
| this.processingConfig = processingConfig; |
| this.pool = pool; |
| this.scheduler = scheduler; |
| this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); |
| |
| if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { |
| log.warn( |
| "Even though groupBy caching is enabled in your configuration, v2 groupBys will not be cached on the broker. " |
| + "Consider enabling caching on your data nodes if it is not already enabled." |
| ); |
| } |
| |
| serverView.registerSegmentCallback( |
| Execs.singleThreaded("CCClient-ServerView-CB-%d"), |
| new ServerView.BaseSegmentCallback() |
| { |
| @Override |
| public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) |
| { |
| CachingClusteredClient.this.cache.close(segment.getId().toString()); |
| return ServerView.CallbackAction.CONTINUE; |
| } |
| } |
| ); |
| } |
| |
| @Override |
| public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals) |
| { |
| return new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext) |
| { |
| return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline, false); |
| } |
| }; |
| } |
| |
| /** |
| * Run a query. The timelineConverter will be given the "master" timeline and can be used to return a different |
| * timeline, if desired. This is used by getQueryRunnerForSegments. |
| */ |
| private <T> Sequence<T> run( |
| final QueryPlus<T> queryPlus, |
| final ResponseContext responseContext, |
| final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter, |
| final boolean specificSegments |
| ) |
| { |
| final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext) |
| .run(timelineConverter, specificSegments); |
| initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers); |
| return result.sequence; |
| } |
| |
| private static <T> void initializeNumRemainingResponsesInResponseContext( |
| final Query<T> query, |
| final ResponseContext responseContext, |
| final int numQueryServers |
| ) |
| { |
| responseContext.add( |
| Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, |
| new NonnullPair<>(query.getMostSpecificId(), numQueryServers) |
| ); |
| } |
| |
| @Override |
| public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs) |
| { |
| return new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext) |
| { |
| return CachingClusteredClient.this.run( |
| queryPlus, |
| responseContext, |
| new TimelineConverter(specs), |
| true |
| ); |
| } |
| }; |
| } |
| |
| private static class ClusterQueryResult<T> |
| { |
| private final Sequence<T> sequence; |
| private final int numQueryServers; |
| |
| private ClusterQueryResult(Sequence<T> sequence, int numQueryServers) |
| { |
| this.sequence = sequence; |
| this.numQueryServers = numQueryServers; |
| } |
| } |
| |
| /** |
| * This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. It's state and |
| * methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object |
| * being run, but {@link QuerySegmentWalker} API is designed so that implementations should be able to accept |
| * arbitrary queries. |
| */ |
| private class SpecificQueryRunnable<T> |
| { |
| private final ResponseContext responseContext; |
| private QueryPlus<T> queryPlus; |
| private Query<T> query; |
| private final QueryToolChest<T, Query<T>> toolChest; |
| @Nullable |
| private final CacheStrategy<T, Object, Query<T>> strategy; |
| private final boolean useCache; |
| private final boolean populateCache; |
| private final boolean isBySegment; |
| private final int uncoveredIntervalsLimit; |
| private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = new HashMap<>(); |
| private final DataSourceAnalysis dataSourceAnalysis; |
| private final List<Interval> intervals; |
| private final CacheKeyManager<T> cacheKeyManager; |
| |
| SpecificQueryRunnable(final QueryPlus<T> queryPlus, final ResponseContext responseContext) |
| { |
| this.queryPlus = queryPlus; |
| this.responseContext = responseContext; |
| this.query = queryPlus.getQuery(); |
| this.toolChest = warehouse.getToolChest(query); |
| this.strategy = toolChest.getCacheStrategy(query); |
| this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource()); |
| |
| this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); |
| this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); |
| this.isBySegment = QueryContexts.isBySegment(query); |
| // Note that enabling this leads to putting uncovered intervals information in the response headers |
| // and might blow up in some cases https://github.com/apache/druid/issues/2108 |
| this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); |
| // For nested queries, we need to look at the intervals of the inner most query. |
| this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() |
| .map(QuerySegmentSpec::getIntervals) |
| .orElseGet(() -> query.getIntervals()); |
| this.cacheKeyManager = new CacheKeyManager<>( |
| query, |
| strategy, |
| useCache, |
| populateCache, |
| dataSourceAnalysis, |
| joinableFactoryWrapper |
| ); |
| } |
| |
| private ImmutableMap<String, Object> makeDownstreamQueryContext() |
| { |
| final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>(); |
| |
| final int priority = QueryContexts.getPriority(query); |
| contextBuilder.put(QueryContexts.PRIORITY_KEY, priority); |
| final String lane = QueryContexts.getLane(query); |
| if (lane != null) { |
| contextBuilder.put(QueryContexts.LANE_KEY, lane); |
| } |
| |
| if (populateCache) { |
| // prevent down-stream nodes from caching results as well if we are populating the cache |
| contextBuilder.put(CacheConfig.POPULATE_CACHE, false); |
| contextBuilder.put(QueryContexts.BY_SEGMENT_KEY, true); |
| } |
| return contextBuilder.build(); |
| } |
| |
| /** |
| * Builds a query distribution and merge plan. |
| * |
| * This method returns an empty sequence if the query datasource is unknown or there is matching result-level cache. |
| * Otherwise, it creates a sequence merging sequences from the regular broker cache and remote servers. If parallel |
| * merge is enabled, it can merge and *combine* the underlying sequences in parallel. |
| * |
| * @return a pair of a sequence merging results from remote query servers and the number of remote servers |
| * participating in query processing. |
| */ |
| ClusterQueryResult<T> run( |
| final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter, |
| final boolean specificSegments |
| ) |
| { |
| final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline( |
| dataSourceAnalysis |
| ); |
| if (!maybeTimeline.isPresent()) { |
| return new ClusterQueryResult<>(Sequences.empty(), 0); |
| } |
| |
| final TimelineLookup<String, ServerSelector> timeline = timelineConverter.apply(maybeTimeline.get()); |
| if (uncoveredIntervalsLimit > 0) { |
| computeUncoveredIntervals(timeline); |
| } |
| |
| final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline, specificSegments); |
| @Nullable |
| final byte[] queryCacheKey = cacheKeyManager.computeSegmentLevelQueryCacheKey(); |
| if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { |
| @Nullable |
| final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); |
| @Nullable |
| final String currentEtag = cacheKeyManager.computeResultLevelCachingEtag(segmentServers, queryCacheKey); |
| if (null != currentEtag) { |
| responseContext.put(Key.ETAG, currentEtag); |
| } |
| if (currentEtag != null && currentEtag.equals(prevEtag)) { |
| return new ClusterQueryResult<>(Sequences.empty(), 0); |
| } |
| } |
| |
| final List<Pair<Interval, byte[]>> alreadyCachedResults = |
| pruneSegmentsWithCachedResults(queryCacheKey, segmentServers); |
| |
| query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers); |
| queryPlus = queryPlus.withQuery(query); |
| |
| final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segmentServers); |
| LazySequence<T> mergedResultSequence = new LazySequence<>(() -> { |
| List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size()); |
| addSequencesFromCache(sequencesByInterval, alreadyCachedResults); |
| addSequencesFromServer(sequencesByInterval, segmentsByServer); |
| return merge(sequencesByInterval); |
| }); |
| |
| return new ClusterQueryResult<>(scheduler.run(query, mergedResultSequence), segmentsByServer.size()); |
| } |
| |
| private Sequence<T> merge(List<Sequence<T>> sequencesByInterval) |
| { |
| BinaryOperator<T> mergeFn = toolChest.createMergeFn(query); |
| if (processingConfig.useParallelMergePool() && QueryContexts.getEnableParallelMerges(query) && mergeFn != null) { |
| return new ParallelMergeCombiningSequence<>( |
| pool, |
| sequencesByInterval, |
| query.getResultOrdering(), |
| mergeFn, |
| QueryContexts.hasTimeout(query), |
| QueryContexts.getTimeout(query), |
| QueryContexts.getPriority(query), |
| QueryContexts.getParallelMergeParallelism(query, processingConfig.getMergePoolDefaultMaxQueryParallelism()), |
| QueryContexts.getParallelMergeInitialYieldRows(query, processingConfig.getMergePoolTaskInitialYieldRows()), |
| QueryContexts.getParallelMergeSmallBatchRows(query, processingConfig.getMergePoolSmallBatchRows()), |
| processingConfig.getMergePoolTargetTaskRunTimeMillis(), |
| reportMetrics -> { |
| QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics(); |
| if (queryMetrics != null) { |
| queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism()); |
| queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()); |
| queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()); |
| queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()); |
| queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()); |
| queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()); |
| queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()); |
| } |
| } |
| ); |
| } else { |
| return Sequences |
| .simple(sequencesByInterval) |
| .flatMerge(seq -> seq, query.getResultOrdering()); |
| } |
| } |
| |
| private Set<SegmentServerSelector> computeSegmentsToQuery( |
| TimelineLookup<String, ServerSelector> timeline, |
| boolean specificSegments |
| ) |
| { |
| final java.util.function.Function<Interval, List<TimelineObjectHolder<String, ServerSelector>>> lookupFn |
| = specificSegments ? timeline::lookupWithIncompletePartitions : timeline::lookup; |
| final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments( |
| query, |
| intervals.stream().flatMap(i -> lookupFn.apply(i).stream()).collect(Collectors.toList()) |
| ); |
| |
| final Set<SegmentServerSelector> segments = new LinkedHashSet<>(); |
| final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new HashMap<>(); |
| // Filter unneeded chunks based on partition dimension |
| for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) { |
| final Set<PartitionChunk<ServerSelector>> filteredChunks; |
| if (QueryContexts.isSecondaryPartitionPruningEnabled(query)) { |
| filteredChunks = DimFilterUtils.filterShards( |
| query.getFilter(), |
| holder.getObject(), |
| partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(), |
| dimensionRangeCache |
| ); |
| } else { |
| filteredChunks = Sets.newHashSet(holder.getObject()); |
| } |
| for (PartitionChunk<ServerSelector> chunk : filteredChunks) { |
| ServerSelector server = chunk.getObject(); |
| final SegmentDescriptor segment = new SegmentDescriptor( |
| holder.getInterval(), |
| holder.getVersion(), |
| chunk.getChunkNumber() |
| ); |
| segments.add(new SegmentServerSelector(server, segment)); |
| } |
| } |
| return segments; |
| } |
| |
| private void computeUncoveredIntervals(TimelineLookup<String, ServerSelector> timeline) |
| { |
| final List<Interval> uncoveredIntervals = new ArrayList<>(uncoveredIntervalsLimit); |
| boolean uncoveredIntervalsOverflowed = false; |
| |
| for (Interval interval : intervals) { |
| Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval); |
| long startMillis = interval.getStartMillis(); |
| long endMillis = interval.getEndMillis(); |
| for (TimelineObjectHolder<String, ServerSelector> holder : lookup) { |
| Interval holderInterval = holder.getInterval(); |
| long intervalStart = holderInterval.getStartMillis(); |
| if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) { |
| if (uncoveredIntervalsLimit > uncoveredIntervals.size()) { |
| uncoveredIntervals.add(Intervals.utc(startMillis, intervalStart)); |
| } else { |
| uncoveredIntervalsOverflowed = true; |
| } |
| } |
| startMillis = holderInterval.getEndMillis(); |
| } |
| |
| if (!uncoveredIntervalsOverflowed && startMillis < endMillis) { |
| if (uncoveredIntervalsLimit > uncoveredIntervals.size()) { |
| uncoveredIntervals.add(Intervals.utc(startMillis, endMillis)); |
| } else { |
| uncoveredIntervalsOverflowed = true; |
| } |
| } |
| } |
| |
| if (!uncoveredIntervals.isEmpty()) { |
| // Record in the response context the interval for which NO segment is present. |
| // Which is not necessarily an indication that the data doesn't exist or is |
| // incomplete. The data could exist and just not be loaded yet. In either |
| // case, though, this query will not include any data from the identified intervals. |
| responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals); |
| responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); |
| } |
| } |
| |
| private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults( |
| final byte[] queryCacheKey, |
| final Set<SegmentServerSelector> segments |
| ) |
| { |
| if (queryCacheKey == null) { |
| return Collections.emptyList(); |
| } |
| final List<Pair<Interval, byte[]>> alreadyCachedResults = new ArrayList<>(); |
| Map<SegmentServerSelector, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys( |
| segments, |
| queryCacheKey |
| ); |
| // Pull cached segments from cache and remove from set of segments to query |
| final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys); |
| |
| perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> { |
| final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval(); |
| |
| final byte[] cachedValue = cachedValues.get(segmentCacheKey); |
| if (cachedValue != null) { |
| // remove cached segment from set of segments to query |
| segments.remove(segment); |
| alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue)); |
| } else if (populateCache) { |
| // otherwise, if populating cache, add segment to list of segments to cache |
| final SegmentId segmentId = segment.getServer().getSegment().getId(); |
| addCachePopulatorKey(segmentCacheKey, segmentId, segmentQueryInterval); |
| } |
| }); |
| return alreadyCachedResults; |
| } |
| |
| private Map<SegmentServerSelector, Cache.NamedKey> computePerSegmentCacheKeys( |
| Set<SegmentServerSelector> segments, |
| byte[] queryCacheKey |
| ) |
| { |
| // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order |
| Map<SegmentServerSelector, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap(); |
| for (SegmentServerSelector segmentServer : segments) { |
| final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey( |
| segmentServer.getServer().getSegment().getId().toString(), |
| segmentServer.getSegmentDescriptor(), |
| queryCacheKey |
| ); |
| cacheKeys.put(segmentServer, segmentCacheKey); |
| } |
| return cacheKeys; |
| } |
| |
| private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<SegmentServerSelector, Cache.NamedKey> cacheKeys) |
| { |
| if (useCache) { |
| return cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit())); |
| } else { |
| return ImmutableMap.of(); |
| } |
| } |
| |
| private void addCachePopulatorKey( |
| Cache.NamedKey segmentCacheKey, |
| SegmentId segmentId, |
| Interval segmentQueryInterval |
| ) |
| { |
| cachePopulatorKeyMap.put(StringUtils.format("%s_%s", segmentId, segmentQueryInterval), segmentCacheKey); |
| } |
| |
| @Nullable |
| private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentInterval) |
| { |
| return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval)); |
| } |
| |
| private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<SegmentServerSelector> segments) |
| { |
| final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>(); |
| for (SegmentServerSelector segmentServer : segments) { |
| final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(query); |
| |
| if (queryableDruidServer == null) { |
| log.makeAlert( |
| "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", |
| segmentServer.getSegmentDescriptor(), |
| query.getDataSource() |
| ).emit(); |
| } else { |
| final DruidServer server = queryableDruidServer.getServer(); |
| serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(segmentServer.getSegmentDescriptor()); |
| } |
| } |
| return serverSegments; |
| } |
| |
| private void addSequencesFromCache( |
| final List<Sequence<T>> listOfSequences, |
| final List<Pair<Interval, byte[]>> cachedResults |
| ) |
| { |
| if (strategy == null) { |
| return; |
| } |
| |
| final Function<Object, T> pullFromCacheFunction = strategy.pullFromSegmentLevelCache(); |
| final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz(); |
| for (Pair<Interval, byte[]> cachedResultPair : cachedResults) { |
| final byte[] cachedResult = cachedResultPair.rhs; |
| Sequence<Object> cachedSequence = new BaseSequence<>( |
| new BaseSequence.IteratorMaker<Object, Iterator<Object>>() |
| { |
| @Override |
| public Iterator<Object> make() |
| { |
| try { |
| if (cachedResult.length == 0) { |
| return Collections.emptyIterator(); |
| } |
| |
| return objectMapper.readValues( |
| objectMapper.getFactory().createParser(cachedResult), |
| cacheObjectClazz |
| ); |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void cleanup(Iterator<Object> iterFromMake) |
| { |
| } |
| } |
| ); |
| listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction)); |
| } |
| } |
| |
| /** |
| * Create sequences that reads from remote query servers (historicals and tasks). Note that the broker will |
| * hold an HTTP connection per server after this method is called. |
| */ |
| private void addSequencesFromServer( |
| final List<Sequence<T>> listOfSequences, |
| final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer |
| ) |
| { |
| segmentsByServer.forEach((server, segmentsOfServer) -> { |
| final QueryRunner serverRunner = serverView.getQueryRunner(server); |
| |
| if (serverRunner == null) { |
| log.error("Server[%s] doesn't have a query runner", server.getName()); |
| return; |
| } |
| |
| // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much. |
| final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, httpClientConfig.getMaxQueuedBytes()); |
| final long maxQueuedBytesPerServer = maxQueuedBytes / segmentsByServer.size(); |
| final Sequence<T> serverResults; |
| |
| if (isBySegment) { |
| serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); |
| } else if (!server.isSegmentReplicationTarget() || !populateCache) { |
| serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); |
| } else { |
| serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); |
| } |
| listOfSequences.add(serverResults); |
| }); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Sequence<T> getBySegmentServerResults( |
| final QueryRunner serverRunner, |
| final List<SegmentDescriptor> segmentsOfServer, |
| long maxQueuedBytesPerServer |
| ) |
| { |
| Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner |
| .run( |
| queryPlus.withQuery( |
| Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) |
| ).withMaxQueuedBytes(maxQueuedBytesPerServer), |
| responseContext |
| ); |
| // bySegment results need to be de-serialized, see DirectDruidClient.run() |
| return (Sequence<T>) resultsBySegments |
| .map(result -> result.map( |
| resultsOfSegment -> resultsOfSegment.mapResults( |
| toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing())::apply |
| ) |
| )); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Sequence<T> getSimpleServerResults( |
| final QueryRunner serverRunner, |
| final List<SegmentDescriptor> segmentsOfServer, |
| long maxQueuedBytesPerServer |
| ) |
| { |
| return serverRunner.run( |
| queryPlus.withQuery( |
| Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) |
| ).withMaxQueuedBytes(maxQueuedBytesPerServer), |
| responseContext |
| ); |
| } |
| |
| private Sequence<T> getAndCacheServerResults( |
| final QueryRunner serverRunner, |
| final List<SegmentDescriptor> segmentsOfServer, |
| long maxQueuedBytesPerServer |
| ) |
| { |
| @SuppressWarnings("unchecked") |
| final Query<T> downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); |
| final Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner.run( |
| queryPlus |
| .withQuery( |
| Queries.withSpecificSegments( |
| downstreamQuery, |
| segmentsOfServer |
| ) |
| ) |
| .withMaxQueuedBytes(maxQueuedBytesPerServer), |
| responseContext |
| ); |
| final Function<T, Object> cacheFn = strategy.prepareForSegmentLevelCache(); |
| |
| return resultsBySegments |
| .map(result -> { |
| final BySegmentResultValueClass<T> resultsOfSegment = result.getValue(); |
| final Cache.NamedKey cachePopulatorKey = |
| getCachePopulatorKey(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval()); |
| Sequence<T> res = Sequences.simple(resultsOfSegment.getResults()); |
| if (cachePopulatorKey != null) { |
| res = cachePopulator.wrap(res, cacheFn::apply, cache, cachePopulatorKey); |
| } |
| return res.map( |
| toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply |
| ); |
| }) |
| .flatMerge(seq -> seq, query.getResultOrdering()); |
| } |
| } |
| |
| /** |
| * An inner class that is used solely for computing cache keys. Its a separate class to allow extensive unit testing |
| * of cache key generation. |
| */ |
| @VisibleForTesting |
| static class CacheKeyManager<T> |
| { |
| private final Query<T> query; |
| private final CacheStrategy<T, Object, Query<T>> strategy; |
| private final DataSourceAnalysis dataSourceAnalysis; |
| private final JoinableFactoryWrapper joinableFactoryWrapper; |
| private final boolean isSegmentLevelCachingEnable; |
| |
| CacheKeyManager( |
| final Query<T> query, |
| final CacheStrategy<T, Object, Query<T>> strategy, |
| final boolean useCache, |
| final boolean populateCache, |
| final DataSourceAnalysis dataSourceAnalysis, |
| final JoinableFactoryWrapper joinableFactoryWrapper |
| ) |
| { |
| |
| this.query = query; |
| this.strategy = strategy; |
| this.dataSourceAnalysis = dataSourceAnalysis; |
| this.joinableFactoryWrapper = joinableFactoryWrapper; |
| this.isSegmentLevelCachingEnable = ((populateCache || useCache) |
| && !QueryContexts.isBySegment(query)); // explicit bySegment queries are never cached |
| |
| } |
| |
| @Nullable |
| byte[] computeSegmentLevelQueryCacheKey() |
| { |
| if (isSegmentLevelCachingEnable) { |
| return computeQueryCacheKeyWithJoin(); |
| } |
| return null; |
| } |
| |
| /** |
| * It computes the ETAG which is used by {@link org.apache.druid.query.ResultLevelCachingQueryRunner} for |
| * result level caches. queryCacheKey can be null if segment level cache is not being used. However, ETAG |
| * is still computed since result level cache may still be on. |
| */ |
| @Nullable |
| String computeResultLevelCachingEtag( |
| final Set<SegmentServerSelector> segments, |
| @Nullable byte[] queryCacheKey |
| ) |
| { |
| Hasher hasher = Hashing.sha1().newHasher(); |
| boolean hasOnlyHistoricalSegments = true; |
| for (SegmentServerSelector p : segments) { |
| QueryableDruidServer queryableServer = p.getServer().pick(query); |
| if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) { |
| hasOnlyHistoricalSegments = false; |
| break; |
| } |
| hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8); |
| // it is important to add the "query interval" as part ETag calculation |
| // to have result level cache work correctly for queries with different |
| // intervals covering the same set of segments |
| hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8); |
| } |
| |
| if (!hasOnlyHistoricalSegments) { |
| return null; |
| } |
| |
| // query cache key can be null if segment level caching is disabled |
| final byte[] queryCacheKeyFinal = (queryCacheKey == null) ? computeQueryCacheKeyWithJoin() : queryCacheKey; |
| if (queryCacheKeyFinal == null) { |
| return null; |
| } |
| hasher.putBytes(queryCacheKeyFinal); |
| String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes()); |
| return currEtag; |
| } |
| |
| /** |
| * Adds the cache key prefix for join data sources. Return null if its a join but caching is not supported |
| */ |
| @Nullable |
| private byte[] computeQueryCacheKeyWithJoin() |
| { |
| Preconditions.checkNotNull(strategy, "strategy cannot be null"); |
| if (dataSourceAnalysis.isJoin()) { |
| byte[] joinDataSourceCacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(dataSourceAnalysis) |
| .orElse(null); |
| if (null == joinDataSourceCacheKey) { |
| return null; // A join operation that does not support caching |
| } |
| return Bytes.concat(joinDataSourceCacheKey, strategy.computeCacheKey(query)); |
| } |
| return strategy.computeCacheKey(query); |
| } |
| } |
| |
| private static class TimelineConverter implements UnaryOperator<TimelineLookup<String, ServerSelector>> |
| { |
| private final Iterable<SegmentDescriptor> specs; |
| |
| TimelineConverter(final Iterable<SegmentDescriptor> specs) |
| { |
| this.specs = specs; |
| } |
| |
| @Override |
| public TimelineLookup<String, ServerSelector> apply(TimelineLookup<String, ServerSelector> timeline) |
| { |
| final VersionedIntervalTimeline<String, ServerSelector> timeline2 = |
| new VersionedIntervalTimeline<>(Ordering.natural()); |
| Iterator<PartitionChunkEntry<String, ServerSelector>> unfilteredIterator = |
| Iterators.transform(specs.iterator(), spec -> toChunkEntry(timeline, spec)); |
| Iterator<PartitionChunkEntry<String, ServerSelector>> iterator = Iterators.filter( |
| unfilteredIterator, |
| Objects::nonNull |
| ); |
| // VersionedIntervalTimeline#addAll implementation is much more efficient than calling VersionedIntervalTimeline#add |
| // in a loop when there are lot of segments to be added for same interval and version. |
| timeline2.addAll(iterator); |
| return timeline2; |
| } |
| |
| @Nullable |
| private PartitionChunkEntry<String, ServerSelector> toChunkEntry( |
| TimelineLookup<String, ServerSelector> timeline, |
| SegmentDescriptor spec |
| ) |
| { |
| PartitionChunk<ServerSelector> chunk = timeline.findChunk( |
| spec.getInterval(), |
| spec.getVersion(), |
| spec.getPartitionNumber() |
| ); |
| if (null == chunk) { |
| return null; |
| } |
| return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk); |
| |
| } |
| } |
| } |