/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Query handler for indexing tasks.
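 * <p>
 * An instance serves a single dataSource: queries are resolved against {@link #sinkTimeline}, whose entries are the
 * {@link Sink}s (segments still being built) of that dataSource, and each Sink is in turn a sequence of
 * {@link FireHydrant} subsegments whose results are merged.
 * <p>
 * A minimal usage sketch; the {@code query} and {@code intervals} values here are illustrative, not part of this
 * class:
 *
 * <pre>{@code
 * QueryRunner<T> runner = walker.getQueryRunnerForIntervals(query, intervals);
 * Sequence<T> results = runner.run(QueryPlus.wrap(query));
 * }</pre>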
*/
public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
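
  /**
   * Query context flag passed through to {@link Sink#acquireSegmentReferences} as {@code skipIncrementalSegment},
   * skipping hydrants that are still backed by the in-memory incremental index.
   */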
private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
private final String dataSource;
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
private final ObjectMapper objectMapper;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryProcessingPool queryProcessingPool;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
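
  /**
   * Maps the {@link SegmentDescriptor} of an upgraded pending segment to the descriptor of the base pending segment
   * whose data it shares; populated by {@link #registerUpgradedPendingSegment}. Lookups in
   * {@link #getQueryRunnerForSegments} fall back to the requested descriptor when no mapping exists.
   */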
private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment
      = new ConcurrentHashMap<>();

public SinkQuerySegmentWalker(
String dataSource,
VersionedIntervalTimeline<String, Sink> sinkTimeline,
ObjectMapper objectMapper,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
QueryProcessingPool queryProcessingPool,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
if (!cache.isLocal()) {
log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
}
}
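
  /**
   * Resolves {@code intervals} against {@link #sinkTimeline} into the set of {@link SegmentDescriptor}s currently
   * served for those intervals, then delegates to {@link #getQueryRunnerForSegments}.
   */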
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
final Iterable<SegmentDescriptor> specs = FunctionalIterable
.create(intervals)
.transformCat(sinkTimeline::lookup)
.transformCat(
holder -> FunctionalIterable
.create(holder.getObject())
.transform(
chunk -> new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
)
)
);
return getQueryRunnerForSegments(query, specs);
  }

@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
final DataSource dataSourceFromQuery = query.getDataSource();
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
// Sanity check: make sure the query is based on the table we're meant to handle.
if (!analysis.getBaseTableDataSource().filter(ds -> dataSource.equals(ds.getName())).isPresent()) {
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
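    // Find the factory that knows how to run this query type; its toolchest handles merging and finalization.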
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final boolean skipIncrementalSegment = query.context().getBoolean(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
// Make sure this query type can handle the subquery, if present.
if ((dataSourceFromQuery instanceof QueryDataSource)
&& !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery.createSegmentMapFunction(
query,
cpuTimeAccumulator
);
    // Compute the cache key prefix (from the query's data source) once, up front, so it does not need to be
    // recomputed for every segment.
final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
    // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
    // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
    // query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main
    // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
    // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
try {
for (final SegmentDescriptor newDescriptor : specs) {
final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
descriptor.getPartitionNumber()
);
if (chunk == null) {
allRunners.put(
descriptor,
Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
);
continue;
}
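        // The chunk holds the Sink serving this descriptor; references to all of its hydrants are acquired at once.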
final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
segmentIdMap.put(descriptor, sinkSegmentId);
final List<SinkSegmentReference> sinkSegmentReferences =
theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
if (sinkSegmentReferences == null) {
// We failed to acquire references for all subsegments. Bail and report the entire sink missing.
allRunners.put(
descriptor,
Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
);
} else if (sinkSegmentReferences.isEmpty()) {
allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
} else {
allSegmentReferences.addAll(sinkSegmentReferences);
allRunners.put(
descriptor,
sinkSegmentReferences.stream().map(
segmentReference -> {
QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
factory.createRunner(segmentReference.getSegment()),
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
);
                    // 1) Only use caching if the data is immutable.
                    // 2) Hydrants are not the same across replicas, so the cache must be local.
if (segmentReference.isImmutable() && cache.isLocal()) {
StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
long segmentMinTime = storageAdapter.getMinTime().getMillis();
long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
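                      // getMaxTime() is inclusive, so add 1ms to produce a half-open interval covering the data.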
Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
cacheKeyPrefix,
descriptor,
actualDataInterval,
objectMapper,
cache,
toolChest,
runner,
// Always populate in foreground regardless of config
new ForegroundCachePopulator(
objectMapper,
cachePopulatorStats,
cacheConfig.getMaxEntrySize()
),
cacheConfig
);
}
// Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
// *possible* caching.
runner = new MetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
runner,
QueryMetrics::reportSegmentAndCacheTime,
queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
).withWaitMeasuredFromNow();
// Emit CPU time metrics.
runner = CPUTimeMetricQueryRunner.safeBuild(
runner,
toolChest,
emitter,
cpuTimeAccumulator,
false
);
// Run with specific segment descriptor.
runner = new SpecificSegmentQueryRunner<>(
runner,
new SpecificSegmentSpec(descriptor)
);
return runner;
}
).collect(Collectors.toList())
);
}
}
final QueryRunner<T> mergedRunner;
if (query.context().isBySegment()) {
// bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
// same segment together, but causes additional memory usage due to the extra layer of materialization,
// so we only do this if we need to.
mergedRunner = factory.mergeRunners(
queryProcessingPool,
allRunners.entrySet().stream().map(
entry -> new BySegmentQueryRunner<>(
segmentIdMap.get(entry.getKey()),
entry.getKey().getInterval().getStart(),
factory.mergeRunners(
DirectQueryProcessingPool.INSTANCE,
entry.getValue()
)
)
).collect(Collectors.toList())
);
} else {
// Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
mergedRunner = factory.mergeRunners(
queryProcessingPool,
new SinkQueryRunners<>(
allRunners.entrySet().stream().flatMap(
entry ->
entry.getValue().stream().map(
runner ->
Pair.of(entry.getKey().getInterval(), runner)
)
).collect(Collectors.toList()))
);
}
// 1) Populate resource id to the query
// 2) Merge results using the toolChest, finalize if necessary.
// 3) Measure CPU time of that operation.
// 4) Release all sink segment references.
return new ResourceIdPopulatingQueryRunner<>(
QueryRunnerHelper.makeClosingQueryRunner(
CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner, true), toolChest),
toolChest,
emitter,
cpuTimeAccumulator,
true
),
() -> CloseableUtils.closeAll(allSegmentReferences)
)
);
}
catch (Throwable e) {
throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
}
}
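
  /**
   * Registers the id of an upgraded pending segment so that queries referencing {@code upgradedPendingSegment}
   * are routed, via {@link #newIdToBasePendingSegment}, to the {@link Sink} registered under
   * {@code basePendingSegment}, which holds the same data.
   */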
public void registerUpgradedPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec upgradedPendingSegment
)
{
newIdToBasePendingSegment.put(
upgradedPendingSegment.asSegmentId().toDescriptor(),
basePendingSegment.asSegmentId().toDescriptor()
);
  }

@VisibleForTesting
String getDataSource()
{
return dataSource;
}
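
  /**
   * Returns the timeline of {@link Sink}s served by this walker.
   */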
public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
{
return sinkTimeline;
}
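
  /**
   * Builds the cache identifier for a single {@link FireHydrant}: the Sink's segment id plus an
   * {@code _H<hydrantNumber>} suffix, e.g. {@code segmentId_H0}.
   */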
public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
{
return makeHydrantCacheIdentifier(hydrant.getSegmentId(), hydrant.getCount());
  }

public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final int hydrantNumber)
{
// Cache ID like segmentId_H0, etc. The 'H' disambiguates subsegment [foo_x_y_z partition 0 hydrant 1]
// from full segment [foo_x_y_z partition 1], and is therefore useful if we ever want the cache to mix full segments
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
}