blob: 1da0fbabf9c947f6e33a3d426bda84339fee870c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.util;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.LookupSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* A self-contained class that executes queries similarly to the normal Druid query stack.
*
* {@link ClientQuerySegmentWalker}, the same class that Brokers use as the entry point for their query stack, is
* used directly. It, and the sub-walkers it needs, are created by {@link QueryStackTests}.
*/
public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
{
  /** Underlying walker that actually executes queries; assembled in the constructor via {@link QueryStackTests}. */
  private final QuerySegmentWalker walker;

  /** Per-datasource timelines holding the segments registered through {@link #add}. */
  private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();

  /** Indexes owned by this walker, released in {@link #close()}. */
  private final List<Closeable> closeables = new ArrayList<>();

  /** Descriptors of all segments registered through {@link #add}, in registration order. */
  private final List<DataSegment> segments = new ArrayList<>();

  /**
   * Create an instance using the provided query runner factory conglomerate and lookup provider.
   * If a JoinableFactory is provided, it will be used instead of the default. If a scheduler is included,
   * the runner will schedule queries according to the scheduling config.
   *
   * @param conglomerate    query runner factories for the supported query types
   * @param lookupProvider  provider backing lookup datasources
   * @param joinableFactory custom joinable factory, or null to use a default that handles only inline and
   *                        lookup datasources
   * @param scheduler       scheduler passed to both the cluster and local sub-walkers
   */
  public SpecificSegmentsQuerySegmentWalker(
      final QueryRunnerFactoryConglomerate conglomerate,
      final LookupExtractorFactoryContainerProvider lookupProvider,
      @Nullable final JoinableFactory joinableFactory,
      final QueryScheduler scheduler
  )
  {
    final JoinableFactory joinableFactoryToUse;
    if (joinableFactory == null) {
      // Default joinable factory: only inline and lookup datasources are joinable.
      joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
          ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
                      .put(InlineDataSource.class, new InlineJoinableFactory())
                      .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
                      .build()
      );
    } else {
      joinableFactoryToUse = joinableFactory;
    }

    // Mirror the Broker's stack: a client walker delegating to a cluster sub-walker (table datasources
    // backed by "timelines") and a local sub-walker (inline and lookup datasources).
    this.walker = QueryStackTests.createClientQuerySegmentWalker(
        QueryStackTests.createClusterQuerySegmentWalker(
            timelines,
            joinableFactoryToUse,
            conglomerate,
            scheduler
        ),
        QueryStackTests.createLocalQuerySegmentWalker(
            conglomerate,
            new MapSegmentWrangler(
                ImmutableMap.<Class<? extends DataSource>, SegmentWrangler>builder()
                            .put(InlineDataSource.class, new InlineSegmentWrangler())
                            .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
                            .build()
            ),
            joinableFactoryToUse,
            scheduler
        ),
        conglomerate,
        new ServerConfig()
    );
  }

  /**
   * Create an instance without any lookups and with a default {@link JoinableFactory} that handles only inline
   * datasources.
   */
  public SpecificSegmentsQuerySegmentWalker(final QueryRunnerFactoryConglomerate conglomerate)
  {
    this(
        conglomerate,
        // Empty lookup provider: no lookup names, and every get() resolves to absent.
        new LookupExtractorFactoryContainerProvider()
        {
          @Override
          public Set<String> getAllLookupNames()
          {
            return Collections.emptySet();
          }

          @Override
          public Optional<LookupExtractorFactoryContainer> get(String lookupName)
          {
            return Optional.empty();
          }
        },
        null,
        QueryStackTests.DEFAULT_NOOP_SCHEDULER
    );
  }

  /**
   * Register a segment with this walker. The segment becomes queryable through its datasource's timeline, and
   * ownership of {@code index} transfers to this walker: it will be closed by {@link #close()}.
   *
   * @param descriptor segment metadata (datasource, interval, version, shard spec)
   * @param index      queryable index backing the segment
   * @return this, for chaining
   */
  public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, final QueryableIndex index)
  {
    final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.computeIfAbsent(
        descriptor.getDataSource(),
        datasource -> new VersionedIntervalTimeline<>(Ordering.natural())
    );
    timeline.add(
        descriptor.getInterval(),
        descriptor.getVersion(),
        descriptor.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(segment, descriptor.getShardSpec()))
    );
    segments.add(descriptor);
    closeables.add(index);
    return this;
  }

  /**
   * Returns the segments registered through {@link #add}, in registration order, as an unmodifiable view.
   */
  public List<DataSegment> getSegments()
  {
    // Unmodifiable view: callers must not be able to mutate this walker's internal bookkeeping.
    return Collections.unmodifiableList(segments);
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
  {
    return walker.getQueryRunnerForIntervals(query, intervals);
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    return walker.getQueryRunnerForSegments(query, specs);
  }

  @Override
  public void close() throws IOException
  {
    // swallowIOException = true: per Guava's Closeables.close contract the IOException is logged and
    // swallowed, so one failing index does not prevent the remaining ones from being released.
    for (Closeable closeable : closeables) {
      Closeables.close(closeable, true);
    }
  }
}