| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordination; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.Lists; |
| import com.google.inject.Inject; |
| import org.apache.druid.client.CachingQueryRunner; |
| import org.apache.druid.client.cache.Cache; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.client.cache.CachePopulator; |
| import org.apache.druid.guice.annotations.Smile; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.guava.FunctionalIterable; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.query.BySegmentQueryRunner; |
| import org.apache.druid.query.CPUTimeMetricQueryRunner; |
| import org.apache.druid.query.FinalizeResultsQueryRunner; |
| import org.apache.druid.query.MetricsEmittingQueryRunner; |
| import org.apache.druid.query.NoopQueryRunner; |
| import org.apache.druid.query.PerSegmentOptimizingQueryRunner; |
| import org.apache.druid.query.PerSegmentQueryOptimizationContext; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryDataSource; |
| import org.apache.druid.query.QueryMetrics; |
| import org.apache.druid.query.QueryProcessingPool; |
| import org.apache.druid.query.QueryRunner; |
| import org.apache.druid.query.QueryRunnerFactory; |
| import org.apache.druid.query.QueryRunnerFactoryConglomerate; |
| import org.apache.druid.query.QuerySegmentWalker; |
| import org.apache.druid.query.QueryToolChest; |
| import org.apache.druid.query.QueryUnsupportedException; |
| import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; |
| import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.query.planning.DataSourceAnalysis; |
| import org.apache.druid.query.spec.SpecificSegmentQueryRunner; |
| import org.apache.druid.query.spec.SpecificSegmentSpec; |
| import org.apache.druid.segment.ReferenceCountingSegment; |
| import org.apache.druid.segment.SegmentReference; |
| import org.apache.druid.segment.StorageAdapter; |
| import org.apache.druid.segment.filter.Filters; |
| import org.apache.druid.segment.join.JoinableFactory; |
| import org.apache.druid.segment.join.JoinableFactoryWrapper; |
| import org.apache.druid.server.SegmentManager; |
| import org.apache.druid.server.SetAndVerifyContextQueryRunner; |
| import org.apache.druid.server.initialization.ServerConfig; |
| import org.apache.druid.timeline.SegmentId; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.apache.druid.timeline.partition.PartitionChunk; |
| import org.joda.time.Interval; |
| |
| import java.util.Collections; |
| import java.util.Optional; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Function; |
| |
/**
 * Query handler for Historical processes (see CliHistorical).
 * <p>
 * Maps incoming queries onto the segments currently loaded locally (tracked by
 * {@link SegmentManager}), wrapping each per-segment runner in a stack of decorators that add
 * reference counting, metrics emission, result caching, per-segment optimization, and CPU-time
 * accounting. See {@code buildAndDecorateQueryRunner} for the exact decorator order.
 * <p>
 * In tests, this class's behavior is partially mimicked by TestClusterQuerySegmentWalker.
 */
public class ServerManager implements QuerySegmentWalker
{
  private static final EmittingLogger log = new EmittingLogger(ServerManager.class);

  private final QueryRunnerFactoryConglomerate conglomerate;
  private final ServiceEmitter emitter;
  private final QueryProcessingPool queryProcessingPool;
  private final CachePopulator cachePopulator;
  private final Cache cache;
  private final ObjectMapper objectMapper;
  private final CacheConfig cacheConfig;
  private final SegmentManager segmentManager;
  private final JoinableFactoryWrapper joinableFactoryWrapper;
  private final ServerConfig serverConfig;

  /**
   * Injected constructor. Note that {@code objectMapper} is the Smile (binary JSON) mapper, used
   * for serializing cached results, and that the raw {@link JoinableFactory} is immediately
   * wrapped in a {@link JoinableFactoryWrapper} so join-related helpers (segment-map functions,
   * join cache keys) come from one place.
   */
  @Inject
  public ServerManager(
      QueryRunnerFactoryConglomerate conglomerate,
      ServiceEmitter emitter,
      QueryProcessingPool queryProcessingPool,
      CachePopulator cachePopulator,
      @Smile ObjectMapper objectMapper,
      Cache cache,
      CacheConfig cacheConfig,
      SegmentManager segmentManager,
      JoinableFactory joinableFactory,
      ServerConfig serverConfig
  )
  {
    this.conglomerate = conglomerate;
    this.emitter = emitter;

    this.queryProcessingPool = queryProcessingPool;
    this.cachePopulator = cachePopulator;
    this.cache = cache;
    this.objectMapper = objectMapper;

    this.cacheConfig = cacheConfig;
    this.segmentManager = segmentManager;
    this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
    this.serverConfig = serverConfig;
  }

  /**
   * Returns a runner over every locally-served segment whose timeline entry overlaps any of the
   * given intervals. Resolves the intervals to concrete {@link SegmentDescriptor}s via the
   * datasource's {@link VersionedIntervalTimeline} and delegates to
   * {@link #getQueryRunnerForSegments}.
   */
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
  {
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
    final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
        segmentManager.getTimeline(analysis);

    if (maybeTimeline.isPresent()) {
      timeline = maybeTimeline.get();
    } else {
      // Even though we didn't find a timeline for the query datasource, we simply return a
      // NoopQueryRunner instead of reporting missing intervals, because the query intervals are a
      // filter rather than something we must find.
      return new NoopQueryRunner<>();
    }

    FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
        .create(intervals)
        .transformCat(timeline::lookup)
        .transformCat(
            holder -> {
              // NOTE(review): returning null here relies on FunctionalIterable.transformCat
              // tolerating a null sub-iterable — confirm against FunctionalIterable's contract.
              if (holder == null) {
                return null;
              }

              // Expand the timeline entry into one descriptor per partition chunk.
              return FunctionalIterable
                  .create(holder.getObject())
                  .transform(
                      partitionChunk ->
                          new SegmentDescriptor(
                              holder.getInterval(),
                              holder.getVersion(),
                              partitionChunk.getChunkNumber()
                          )
                  );
            }
        );

    return getQueryRunnerForSegments(query, segmentDescriptors);
  }

  /**
   * Returns a runner over the exact segments named by {@code specs}. Unlike
   * {@link #getQueryRunnerForIntervals}, segments that cannot be found are reported as missing
   * (via {@link ReportTimelineMissingSegmentQueryRunner}) rather than silently skipped, so the
   * broker can retry them elsewhere.
   *
   * @throws QueryUnsupportedException if no factory is registered for the query type
   * @throws ISE                       if the query has a subquery its toolchest cannot handle
   */
  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
  {
    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
      // Unknown query type: alert (so operators notice) and push the error back to the caller.
      final QueryUnsupportedException e = new QueryUnsupportedException(
          StringUtils.format("Unknown query type, [%s]", query.getClass())
      );
      log.makeAlert(e, "Error while executing a query[%s]", query.getId())
         .addData("dataSource", query.getDataSource())
         .emit();
      throw e;
    }

    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
    // Accumulates CPU time across all per-segment runners; reported once by the outermost
    // CPUTimeMetricQueryRunner below.
    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
    final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
        segmentManager.getTimeline(analysis);

    // Make sure this query type can handle the subquery, if present.
    if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
    }

    if (maybeTimeline.isPresent()) {
      timeline = maybeTimeline.get();
    } else {
      // No timeline for this datasource on this server: report every requested segment missing.
      return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
    }

    // segmentMapFn maps each base Segment into a joined Segment if necessary.
    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
        analysis.getPreJoinableClauses(),
        cpuTimeAccumulator,
        analysis.getBaseQuery().orElse(query)
    );
    // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
                                            ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
                                            : Optional.of(StringUtils.EMPTY_BYTES);

    final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
        .create(specs)
        .transformCat(
            descriptor -> Collections.singletonList(
                buildQueryRunnerForSegment(
                    query,
                    descriptor,
                    factory,
                    toolChest,
                    timeline,
                    segmentMapFn,
                    cpuTimeAccumulator,
                    cacheKeyPrefix
                )
            )
        );

    // Merge the per-segment runners, finalize aggregations, and report accumulated CPU time.
    return CPUTimeMetricQueryRunner.safeBuild(
        new FinalizeResultsQueryRunner<>(
            toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners)),
            toolChest
        ),
        toolChest,
        emitter,
        cpuTimeAccumulator,
        true
    );
  }

  /**
   * Builds the runner for a single segment descriptor: looks the descriptor up in the timeline,
   * applies {@code segmentMapFn} (join mapping, if any), and hands the result to
   * {@code buildAndDecorateQueryRunner}. Returns a
   * {@link ReportTimelineMissingSegmentQueryRunner} if the chunk is not in the timeline.
   * Package-private for testing.
   */
  <T> QueryRunner<T> buildQueryRunnerForSegment(
      final Query<T> query,
      final SegmentDescriptor descriptor,
      final QueryRunnerFactory<T, Query<T>> factory,
      final QueryToolChest<T, Query<T>> toolChest,
      final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
      final Function<SegmentReference, SegmentReference> segmentMapFn,
      final AtomicLong cpuTimeAccumulator,
      Optional<byte[]> cacheKeyPrefix
  )
  {
    final PartitionChunk<ReferenceCountingSegment> chunk = timeline.findChunk(
        descriptor.getInterval(),
        descriptor.getVersion(),
        descriptor.getPartitionNumber()
    );

    if (chunk == null) {
      return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
    }

    final ReferenceCountingSegment segment = chunk.getObject();
    return buildAndDecorateQueryRunner(
        factory,
        toolChest,
        segmentMapFn.apply(segment),
        cacheKeyPrefix,
        descriptor,
        cpuTimeAccumulator
    );
  }

  /**
   * Wraps the factory-created runner for one segment in the full decorator stack. From innermost
   * to outermost:
   * <ol>
   *   <li>{@link ReferenceCountingSegmentQueryRunner} — holds a reference so the segment isn't
   *       closed mid-query</li>
   *   <li>{@link MetricsEmittingQueryRunner} — reports segment time</li>
   *   <li>{@link CachingQueryRunner} — per-segment result cache</li>
   *   <li>{@link BySegmentQueryRunner} — by-segment result wrapping when requested</li>
   *   <li>{@link MetricsEmittingQueryRunner} — reports segment-and-cache time, including wait
   *       time from now</li>
   *   <li>{@link SpecificSegmentQueryRunner} — pins the query to this descriptor</li>
   *   <li>{@link PerSegmentOptimizingQueryRunner} — per-segment query optimization</li>
   *   <li>{@link CPUTimeMetricQueryRunner} — accumulates CPU time (not reported here; the
   *       outermost runner in getQueryRunnerForSegments reports it)</li>
   *   <li>{@link SetAndVerifyContextQueryRunner} — enforces server-configured context limits</li>
   * </ol>
   */
  private <T> QueryRunner<T> buildAndDecorateQueryRunner(
      final QueryRunnerFactory<T, Query<T>> factory,
      final QueryToolChest<T, Query<T>> toolChest,
      final SegmentReference segment,
      final Optional<byte[]> cacheKeyPrefix,
      final SegmentDescriptor segmentDescriptor,
      final AtomicLong cpuTimeAccumulator
  )
  {
    final SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
    final SegmentId segmentId = segment.getId();
    final Interval segmentInterval = segment.getDataInterval();
    // ReferenceCountingSegment can return null for ID or interval if it's already closed.
    // Here, we check one more time if the segment is closed.
    // If the segment is closed after this line, ReferenceCountingSegmentQueryRunner will handle and do the right thing.
    if (segmentId == null || segmentInterval == null) {
      return new ReportTimelineMissingSegmentQueryRunner<>(segmentDescriptor);
    }
    String segmentIdString = segmentId.toString();

    MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>(
        emitter,
        toolChest,
        new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentDescriptor),
        QueryMetrics::reportSegmentTime,
        queryMetrics -> queryMetrics.segment(segmentIdString)
    );

    // The interval actually covered by rows in the segment (end-exclusive, hence +1ms on the max
    // row timestamp), which can be narrower than the declared segment interval. Used by the
    // caching layer below.
    StorageAdapter storageAdapter = segment.asStorageAdapter();
    long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
    long segmentMinTime = storageAdapter.getMinTime().getMillis();
    Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
    CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
        segmentIdString,
        cacheKeyPrefix,
        segmentDescriptor,
        actualDataInterval,
        objectMapper,
        cache,
        toolChest,
        metricsEmittingQueryRunnerInner,
        cachePopulator,
        cacheConfig
    );

    BySegmentQueryRunner<T> bySegmentQueryRunner = new BySegmentQueryRunner<>(
        segmentId,
        segmentInterval.getStart(),
        cachingQueryRunner
    );

    MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerOuter = new MetricsEmittingQueryRunner<>(
        emitter,
        toolChest,
        bySegmentQueryRunner,
        QueryMetrics::reportSegmentAndCacheTime,
        queryMetrics -> queryMetrics.segment(segmentIdString)
    ).withWaitMeasuredFromNow();

    SpecificSegmentQueryRunner<T> specificSegmentQueryRunner = new SpecificSegmentQueryRunner<>(
        metricsEmittingQueryRunnerOuter,
        segmentSpec
    );

    PerSegmentOptimizingQueryRunner<T> perSegmentOptimizingQueryRunner = new PerSegmentOptimizingQueryRunner<>(
        specificSegmentQueryRunner,
        new PerSegmentQueryOptimizationContext(segmentDescriptor)
    );

    // "false" because CPU time is only accumulated here; the runner built in
    // getQueryRunnerForSegments emits the final total.
    return new SetAndVerifyContextQueryRunner<>(
        serverConfig,
        CPUTimeMetricQueryRunner.safeBuild(
            perSegmentOptimizingQueryRunner,
            toolChest,
            emitter,
            cpuTimeAccumulator,
            false
        )
    );
  }
}