/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.druid.server;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
* Mimics the behavior of {@link org.apache.druid.client.CachingClusteredClient} when it queries data servers (like
* Historicals, which use {@link org.apache.druid.server.coordination.ServerManager}). Used by {@link QueryStackTests}.
*
* This class's logic is like a mashup of those two classes. With the right abstractions, it may be possible to get rid
* of this class and replace it with the production classes.
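 *
 * A hypothetical usage sketch (the timelines, joinableFactory, conglomerate, and query here are assumed to be
 * built elsewhere, e.g. by {@link QueryStackTests}; this is illustrative, not part of the production wiring):
 *
 * <pre>{@code
 * QuerySegmentWalker walker =
 *     new TestClusterQuerySegmentWalker(timelines, joinableFactory, conglomerate, null);
 * QueryRunner<T> runner = walker.getQueryRunnerForIntervals(query, Intervals.ONLY_ETERNITY);
 * Sequence<T> results = runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
 * }</pre>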
*/
public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
{
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines;
private final JoinableFactoryWrapper joinableFactoryWrapper;
private final QueryRunnerFactoryConglomerate conglomerate;
@Nullable
  private final QueryScheduler scheduler;

  TestClusterQuerySegmentWalker(
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
JoinableFactory joinableFactory,
QueryRunnerFactoryConglomerate conglomerate,
@Nullable QueryScheduler scheduler
)
{
this.timelines = timelines;
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.conglomerate = conglomerate;
this.scheduler = scheduler;
  }

  @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
// Just like CachingClusteredClient, ignore "query" and defer action until the QueryRunner is called.
// Strange, but true. Required to get authentic behavior with UnionDataSources. (Although, it would be great if
// this wasn't required.)
return (queryPlus, responseContext) -> {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryPlus.getQuery().getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", queryPlus.getQuery().getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
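      // Turn the requested intervals into concrete segment descriptors using the datasource's timeline.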
FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
.create(intervals)
.transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
.transform(WindowedSegment::getDescriptor);
return getQueryRunnerForSegments(queryPlus.getQuery(), segmentDescriptors).run(queryPlus, responseContext);
};
  }

  @Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", query.getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery()
&& !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
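    // Build the function that wraps each base segment with the query's join clauses (if any), mirroring
    // what data servers do before running a join query.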
final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn(
analysis.getPreJoinableClauses(),
new AtomicLong(),
analysis.getBaseQuery().orElse(query)
);
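    // Decorate the per-table runner the way a data server would: pre-merge decoration, result merging,
    // post-merge decoration, and finalization of aggregator values.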
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
)
)
),
toolChest
);
    // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
    // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
    // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient; it uses the
    // LocalQuerySegmentWalker constructor instead, since this walker does not mimic the remote DruidServer objects
    // that would actually serve the queries.
return (theQuery, responseContext) -> {
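      // Mimic the Broker: seed the remaining-responses counter for this query's most specific id so that
      // downstream merging logic sees the response context it expects.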
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
responseContext.add(
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
new NonnullPair<>(theQuery.getQuery().getMostSpecificId(), 0)
);
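      // If a QueryScheduler is available, let it prioritize and lane the query before running it.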
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(
() -> baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(
theQuery.getQuery(),
ImmutableList.copyOf(specs)
)),
responseContext
)
)
);
} else {
return baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
responseContext
);
}
};
  }

  private <T> QueryRunner<T> makeTableRunner(
final QueryToolChest<T, Query<T>> toolChest,
final QueryRunnerFactory<T, Query<T>> factory,
final Iterable<WindowedSegment> segments,
final Function<SegmentReference, SegmentReference> segmentMapFn
)
{
final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
if (segmentsList.isEmpty()) {
// Note: this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
return new NoopQueryRunner<>();
}
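    // Run each segment through a SpecificSegmentQueryRunner (which records missing segments in the response
    // context) and merge the per-segment results, much as ServerManager does on a Historical.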
return new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
FunctionalIterable
.create(segmentsList)
.transform(
segment ->
new SpecificSegmentQueryRunner<>(
factory.createRunner(segmentMapFn.apply(ReferenceCountingSegment.wrapRootGenerationSegment(
segment.getSegment()))),
new SpecificSegmentSpec(segment.getDescriptor())
)
)
)
),
toolChest
);
  }

  private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
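      // Collect every partition chunk of every timeline holder that overlaps the interval, windowed to the
      // holder's interval.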
for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
}
}
return retVal;
}
  }

  private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
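      // Resolve each requested descriptor directly against the timeline.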
for (SegmentDescriptor spec : specs) {
        final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
            spec.getInterval(),
            spec.getVersion()
        );
        if (entry == null) {
          throw new ISE("Could not find segment for interval[%s], version[%s]", spec.getInterval(), spec.getVersion());
        }
        retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
}
return retVal;
}
}
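
  /**
   * Pairs a {@link Segment} with the interval (window) over which the timeline exposes it. The window must be
   * contained within the segment's own interval.
   */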
private static class WindowedSegment
{
private final Segment segment;
    private final Interval interval;

    public WindowedSegment(Segment segment, Interval interval)
{
this.segment = segment;
this.interval = interval;
Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
    }

    public Segment getSegment()
{
return segment;
    }

    public Interval getInterval()
{
return interval;
    }

    public SegmentDescriptor getDescriptor()
{
return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
}
}
}