/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Query handler for indexing tasks.
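 * <p>
 * An indexing task writes incoming rows into {@link Sink}s, one per segment under construction; each Sink is in
 * turn a sequence of {@link FireHydrant}s (in-memory incremental indexes that are periodically persisted to disk).
 * This walker maps query intervals or segment descriptors onto those Sinks and builds a runner over every hydrant,
 * so queries can see data that has not yet been published.
 * <p>
 * A rough usage sketch (the query and result types are illustrative, not part of this class):
 * <pre>{@code
 * QuerySegmentWalker walker = ...; // a SinkQuerySegmentWalker owned by the task
 * QueryRunner<Result<TimeseriesResultValue>> runner =
 *     walker.getQueryRunnerForIntervals(query, Intervals.ONLY_ETERNITY);
 * Sequence<Result<TimeseriesResultValue>> results =
 *     runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
 * }</pre>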
 */
public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
  private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
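
  // Query context flag: when set, hydrants that have not yet been persisted (swapped to disk) are skipped, and an
  // empty result is returned for them instead of querying the in-memory incremental index.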
  private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";

  private final String dataSource;
  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
  private final ObjectMapper objectMapper;
  private final ServiceEmitter emitter;
  private final QueryRunnerFactoryConglomerate conglomerate;
  private final ExecutorService queryExecutorService;
  private final JoinableFactoryWrapper joinableFactoryWrapper;
  private final Cache cache;
  private final CacheConfig cacheConfig;
  private final CachePopulatorStats cachePopulatorStats;

  public SinkQuerySegmentWalker(
      String dataSource,
      VersionedIntervalTimeline<String, Sink> sinkTimeline,
      ObjectMapper objectMapper,
      ServiceEmitter emitter,
      QueryRunnerFactoryConglomerate conglomerate,
      ExecutorService queryExecutorService,
      JoinableFactory joinableFactory,
      Cache cache,
      CacheConfig cacheConfig,
      CachePopulatorStats cachePopulatorStats
  )
  {
    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
    this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
    this.emitter = Preconditions.checkNotNull(emitter, "emitter");
    this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
    this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
    this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
    this.cache = Preconditions.checkNotNull(cache, "cache");
    this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");

    if (!cache.isLocal()) {
      log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
    }
  }
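
  /**
   * Resolves the given intervals against the Sink timeline, expands each overlapping timeline entry into one
   * {@link SegmentDescriptor} per partition chunk, and delegates to
   * {@link #getQueryRunnerForSegments(Query, Iterable)}.
   */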
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
  {
    final Iterable<SegmentDescriptor> specs = FunctionalIterable
        .create(intervals)
        .transformCat(sinkTimeline::lookup)
        .transformCat(
            holder -> FunctionalIterable
                .create(holder.getObject())
                .transform(
                    chunk -> new SegmentDescriptor(
                        holder.getInterval(),
                        holder.getVersion(),
                        chunk.getChunkNumber()
                    )
                )
        );

    return getQueryRunnerForSegments(query, specs);
  }
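
  /**
   * Returns a runner for the requested segment descriptors. Each descriptor is resolved to a Sink, and each Sink
   * contributes one runner per {@link FireHydrant}, wrapped in a {@link CachingQueryRunner} for hydrants that have
   * already been persisted. Descriptors that cannot be resolved are reported as missing segments.
   */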
  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());

    // Sanity check: make sure the query is based on the table we're meant to handle.
    if (!analysis.getBaseTableDataSource().filter(ds -> dataSource.equals(ds.getName())).isPresent()) {
      throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
    }

    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
      throw new ISE("Unknown query type[%s].", query.getClass());
    }

    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
    final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);

    // Make sure this query type can handle the subquery, if present.
    if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
    }

    // segmentMapFn maps each base Segment into a joined Segment if necessary.
    final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
        analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
        analysis.getPreJoinableClauses(),
        cpuTimeAccumulator,
        analysis.getBaseQuery().orElse(query)
    );

    // We compute the join cache key up front so it doesn't need to be recomputed for every segment.
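    // An absent Optional means no cache key could be computed for the join clauses, which disables caching for
    // these segments; non-join queries use an empty prefix.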
    final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
                                            ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
                                            : Optional.of(StringUtils.EMPTY_BYTES);
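
    // Build one runner per requested descriptor. Descriptors that no longer resolve to a Sink in the timeline are
    // reported as missing rather than failing the whole query.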
    Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
        specs,
        descriptor -> {
          final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
              descriptor.getInterval(),
              descriptor.getVersion(),
              descriptor.getPartitionNumber()
          );

          if (chunk == null) {
            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
          }

          final Sink theSink = chunk.getObject();
          final SegmentId sinkSegmentId = theSink.getSegment().getId();

          Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
              Iterables.transform(
                  theSink,
                  hydrant -> {
                    // Hydrant might swap at any point, but if it's swapped at the start
                    // then we know it's *definitely* swapped.
                    final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();

                    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
                      return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>());
                    }

                    // Prevent the underlying segment from swapping while it's being iterated.
                    final Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable =
                        hydrant.getSegmentForQuery(segmentMapFn);

                    // If the Optional is absent, we failed to acquire a reference to the segment or to the joinables.
                    if (!maybeSegmentAndCloseable.isPresent()) {
                      return new Pair<>(
                          hydrant.getSegmentDataInterval(),
                          new ReportTimelineMissingSegmentQueryRunner<>(descriptor)
                      );
                    }

                    final Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
                    try {
                      QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);

                      // 1) Only use caching if data is immutable.
                      // 2) Hydrants are not the same between replicas, so make sure the cache is local.
                      if (hydrantDefinitelySwapped && cache.isLocal()) {
                        StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
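                        // The interval end is exclusive, so add 1 ms to ensure rows at the max timestamp fall inside
                        // the cached data interval.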
                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
                        runner = new CachingQueryRunner<>(
                            makeHydrantCacheIdentifier(hydrant),
                            cacheKeyPrefix,
                            descriptor,
                            actualDataInterval,
                            objectMapper,
                            cache,
                            toolChest,
                            runner,
                            // Always populate in foreground, regardless of config.
                            new ForegroundCachePopulator(
                                objectMapper,
                                cachePopulatorStats,
                                cacheConfig.getMaxEntrySize()
                            ),
                            cacheConfig
                        );
                      }

                      // Close via the Closeable so the segment's reference count is decremented once the runner
                      // finishes.
                      runner = QueryRunnerHelper.makeClosingQueryRunner(
                          runner,
                          segmentAndCloseable.rhs
                      );

                      return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
                    }
                    catch (RuntimeException e) {
                      CloseQuietly.close(segmentAndCloseable.rhs);
                      throw e;
                    }
                  }
              )
          );
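
          // Decorate the merged per-hydrant runner: BySegmentQueryRunner handles bySegment results,
          // withPerSinkMetrics emits per-Sink timing metrics and CPU time, and SpecificSegmentQueryRunner reports
          // the segment as missing if it disappears mid-query.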
          return new SpecificSegmentQueryRunner<>(
              withPerSinkMetrics(
                  new BySegmentQueryRunner<>(
                      sinkSegmentId,
                      descriptor.getInterval().getStart(),
                      factory.mergeRunners(
                          Execs.directExecutor(),
                          perHydrantRunners
                      )
                  ),
                  toolChest,
                  sinkSegmentId,
                  cpuTimeAccumulator
              ),
              new SpecificSegmentSpec(descriptor)
          );
        }
    );

    final QueryRunner<T> mergedRunner =
        toolChest.mergeResults(
            factory.mergeRunners(
                queryExecutorService,
                perSegmentRunners
            )
        );

    return CPUTimeMetricQueryRunner.safeBuild(
        new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
        toolChest,
        emitter,
        cpuTimeAccumulator,
        true
    );
  }
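
  /**
   * Returns the datasource this walker serves. Visible for testing.
   */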
  @VisibleForTesting
  String getDataSource()
  {
    return dataSource;
  }

  /**
   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, and query/wait/time
   * once each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
   */
  private <T> QueryRunner<T> withPerSinkMetrics(
      final QueryRunner<T> sinkRunner,
      final QueryToolChest<T, ? extends Query<T>> queryToolChest,
      final SegmentId sinkSegmentId,
      final AtomicLong cpuTimeAccumulator
  )
  {
    // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
    // cache vs. non-cache because Sinks may be partially cached and partially uncached. Making this better would
    // require another accumulator, like the cpuTimeAccumulator, that we could share with the sinkRunner.
    String sinkSegmentIdString = sinkSegmentId.toString();

    return CPUTimeMetricQueryRunner.safeBuild(
        new MetricsEmittingQueryRunner<>(
            emitter,
            queryToolChest,
            new MetricsEmittingQueryRunner<>(
                emitter,
                queryToolChest,
                sinkRunner,
                QueryMetrics::reportSegmentTime,
                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
            ),
            QueryMetrics::reportSegmentAndCacheTime,
            queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
        ).withWaitMeasuredFromNow(),
        queryToolChest,
        emitter,
        cpuTimeAccumulator,
        false
    );
  }
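
  /**
   * Returns the timeline of Sinks that this walker queries.
   */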
  public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
  {
    return sinkTimeline;
  }
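
  /**
   * Builds the cache identifier for a single hydrant: the segment id plus the hydrant's position within its Sink,
   * so each hydrant's results are cached independently.
   */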
  public static String makeHydrantCacheIdentifier(FireHydrant input)
  {
    return input.getSegmentId() + "_" + input.getCount();
  }
}