blob: e5846a9f05aca91e3e553445bfb31af73c92cbca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ManageLifecycle
public class DruidSchema extends AbstractSchema
{
  // Newest segments first, so they override older ones.
  private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
      .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart())
      .reversed()
      .thenComparing(Function.identity());

  private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);

  // Upper bound on the number of segments sent in a single SegmentMetadataQuery per refresh pass;
  // see refreshSegmentsForDataSource().
  private static final int MAX_SEGMENTS_PER_QUERY = 15000;

  // Placeholder row count for a segment until its first metadata refresh completes.
  private static final long DEFAULT_NUM_ROWS = 0;

  private final QueryLifecycleFactory queryLifecycleFactory;
  private final PlannerConfig config;

  // Escalator, so we can attach an authentication result to queries we generate.
  private final Escalator escalator;

  private final SegmentManager segmentManager;
  private final JoinableFactory joinableFactory;

  // Single-threaded executor running the periodic metadata refresh loop; see start().
  private final ExecutorService cacheExec;

  // Single-threaded executor on which timeline (segment add/remove) callbacks are delivered.
  private final ExecutorService callbackExec;

  /**
   * Map of DataSource -> DruidTable.
   * This map can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
   */
  private final ConcurrentMap<String, DruidTable> tables = new ConcurrentHashMap<>();

  /**
   * DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
   * Use SortedMap for segments so they are merged in a deterministic order. Note that {@link #SEGMENT_ORDER}
   * sorts newest segments first, which is what lets newer segments' column types win in
   * {@link #buildDruidTable} (it uses putIfAbsent, so the first — i.e. newest — type seen is kept).
   *
   * This map is updated by these two threads.
   *
   * - {@link #callbackExec} can update it in {@link #addSegment}, {@link #removeServerSegment},
   *   and {@link #removeSegment}.
   * - {@link #cacheExec} can update it in {@link #refreshSegmentsForDataSource}.
   *
   * While it is being updated, this map is read by these two types of thread.
   *
   * - {@link #cacheExec} can iterate all {@link AvailableSegmentMetadata}s per datasource.
   *   See {@link #buildDruidTable}.
   * - Query threads can create a snapshot of the entire map for processing queries on the system table.
   *   See {@link #getSegmentMetadataSnapshot()}.
   *
   * As the access pattern of this map is read-intensive, we should minimize the contention between writers and readers.
   * Since there are two threads that can update this map at the same time, those writers should lock the inner map
   * first and then lock the entry before it updates segment metadata. This can be done using
   * {@link ConcurrentMap#compute} as below. Note that, if you need to update the variables guarded by {@link #lock}
   * inside of compute(), you should get the lock before calling compute() to keep the function executed in compute()
   * not expensive.
   *
   * <pre>
   *   segmentMetadataInfo.compute(
   *     datasourceParam,
   *     (datasource, segmentsMap) -> {
   *       if (segmentsMap == null) return null;
   *       else {
   *         segmentsMap.compute(
   *           segmentIdParam,
   *           (segmentId, segmentMetadata) -> {
   *             // update segmentMetadata
   *           }
   *         );
   *         return segmentsMap;
   *       }
   *     }
   *   );
   * </pre>
   *
   * Readers can simply delegate the locking to the concurrent map and iterate map entries.
   */
  private final ConcurrentHashMap<String, ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo
      = new ConcurrentHashMap<>();

  // For awaitInitialization.
  private final CountDownLatch initialized = new CountDownLatch(1);

  /**
   * This lock coordinates the access from multiple threads to those variables guarded by this lock.
   * Currently, there are 2 threads that can access these variables.
   *
   * - {@link #callbackExec} executes the timeline callbacks whenever BrokerServerView changes.
   * - {@link #cacheExec} periodically refreshes segment metadata and {@link DruidTable} if necessary
   *   based on the information collected via timeline callbacks.
   */
  private final Object lock = new Object();

  // All mutable segments. Mutable (realtime) segments are re-refreshed every cycle, since their
  // columns can change; see the refresh loop in start().
  @GuardedBy("lock")
  private final TreeSet<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);

  // All dataSources that need tables regenerated.
  @GuardedBy("lock")
  private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();

  // All segments that need to be refreshed.
  @GuardedBy("lock")
  private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);

  // When true, the refresh loop skips the wait until the next scheduled refresh. Set when a
  // brand-new dataSource appears; see addSegment().
  @GuardedBy("lock")
  private boolean refreshImmediately = false;

  // Set once the timeline callback reports that the initial server-view snapshot has been delivered.
  @GuardedBy("lock")
  private boolean isServerViewInitialized = false;

  /**
   * Counts the total number of known segments. This variable is used only for the segments table in the system schema
   * to initialize a map with a more proper size when it creates a snapshot. As a result, it doesn't have to be exact,
   * and thus there is no concurrency control for this variable.
   */
  private int totalSegments = 0;
  /**
   * @param queryLifecycleFactory used to issue SegmentMetadataQueries during refresh
   * @param serverView            source of timeline (segment add/remove) callbacks; used only here
   * @param segmentManager        used to detect broadcast (locally cached) dataSources
   * @param joinableFactory       used to decide whether a dataSource is directly joinable
   * @param config                planner config (refresh period, await-initialization flag)
   * @param escalator             attaches an escalated authentication result to generated queries
   */
  @Inject
  public DruidSchema(
      final QueryLifecycleFactory queryLifecycleFactory,
      final TimelineServerView serverView,
      final SegmentManager segmentManager,
      final JoinableFactory joinableFactory,
      final PlannerConfig config,
      final Escalator escalator
  )
  {
    this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
    Preconditions.checkNotNull(serverView, "serverView");
    this.segmentManager = segmentManager;
    this.joinableFactory = joinableFactory;
    this.config = Preconditions.checkNotNull(config, "config");
    this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
    this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
    this.escalator = escalator;

    // NOTE(review): registering the callback from the constructor lets 'this' escape before construction
    // completes. Presumably safe because callbacks run on the single-threaded callbackExec created just
    // above, but worth confirming.
    serverView.registerTimelineCallback(
        callbackExec,
        new TimelineServerView.TimelineCallback()
        {
          @Override
          public ServerView.CallbackAction timelineInitialized()
          {
            // Initial timeline snapshot delivered; wake the refresh loop so broker initialization
            // can complete (see start()).
            synchronized (lock) {
              isServerViewInitialized = true;
              lock.notifyAll();
            }
            return ServerView.CallbackAction.CONTINUE;
          }

          @Override
          public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
          {
            addSegment(server, segment);
            return ServerView.CallbackAction.CONTINUE;
          }

          @Override
          public ServerView.CallbackAction segmentRemoved(final DataSegment segment)
          {
            removeSegment(segment);
            return ServerView.CallbackAction.CONTINUE;
          }

          @Override
          public ServerView.CallbackAction serverSegmentRemoved(
              final DruidServerMetadata server,
              final DataSegment segment
          )
          {
            removeServerSegment(server, segment);
            return ServerView.CallbackAction.CONTINUE;
          }
        }
    );
  }
  /**
   * Starts the background metadata-refresh loop on {@link #cacheExec} and, if
   * {@code config.isAwaitInitializationOnStart()}, blocks until the first refresh completes
   * (via {@link #awaitInitialization()}).
   *
   * @throws InterruptedException if interrupted while awaiting initialization
   */
  @LifecycleStart
  public void start() throws InterruptedException
  {
    cacheExec.submit(
        () -> {
          long lastRefresh = 0L;
          long lastFailure = 0L;

          try {
            while (!Thread.currentThread().isInterrupted()) {
              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
              final Set<String> dataSourcesToRebuild = new TreeSet<>();

              try {
                synchronized (lock) {
                  final long nextRefreshNoFuzz = DateTimes
                      .utc(lastRefresh)
                      .plus(config.getMetadataRefreshPeriod())
                      .getMillis();

                  // Fuzz a bit to spread load out when we have multiple brokers.
                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);

                  while (true) {
                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
                                                              .plus(config.getMetadataRefreshPeriod())
                                                              .isAfterNow();

                    // Refresh only when the server view is ready, there is actually work queued,
                    // and either an immediate refresh was requested or the refresh period elapsed.
                    if (isServerViewInitialized &&
                        !wasRecentFailure &&
                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
                      // We need to do a refresh. Break out of the waiting loop.
                      break;
                    }

                    // lastFailure != 0L means a previous refresh attempt failed and some refresh work is still
                    // outstanding, so even though the ServerView is initialized, we can't let the broker report
                    // itself as initialized yet.
                    if (isServerViewInitialized && lastFailure == 0L) {
                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
                      // no segments in the system yet. Just mark us as initialized, then.
                      initialized.countDown();
                    }

                    // Wait some more, we'll wake up when it might be time to do another refresh.
                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
                  }

                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
                  segmentsNeedingRefresh.clear();

                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
                  segmentsNeedingRefresh.addAll(mutableSegments);

                  lastFailure = 0L;
                  lastRefresh = System.currentTimeMillis();
                  refreshImmediately = false;
                }

                // Perform the actual metadata queries and table rebuilds outside the lock.
                refresh(segmentsToRefresh, dataSourcesToRebuild);

                initialized.countDown();
              }
              catch (InterruptedException e) {
                // Fall through.
                throw e;
              }
              catch (Exception e) {
                log.warn(e, "Metadata refresh failed, trying again soon.");

                synchronized (lock) {
                  // Add our segments and dataSources back to their refresh and rebuild lists.
                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
                  lastFailure = System.currentTimeMillis();
                }
              }
            }
          }
          catch (InterruptedException e) {
            // Just exit.
          }
          catch (Throwable e) {
            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
            log.makeAlert(e, "Metadata refresh failed permanently").emit();
            throw e;
          }
          finally {
            log.info("Metadata refresh stopped.");
          }
        }
    );

    if (config.isAwaitInitializationOnStart()) {
      final long startNanos = System.nanoTime();
      log.debug("%s waiting for initialization.", getClass().getSimpleName());
      awaitInitialization();
      log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), (System.nanoTime() - startNanos) / 1000000);
    }
  }
  /**
   * Attempts to refresh the given segments' metadata, then rebuilds the {@link DruidTable} for every
   * dataSource that either owns a refreshed segment or was previously marked as needing a rebuild.
   * Segments that could not be refreshed are put back onto {@link #segmentsNeedingRefresh}.
   *
   * @param segmentsToRefresh    segments whose metadata should be (re)queried
   * @param dataSourcesToRebuild mutated in place: receives the dataSources whose tables were rebuilt,
   *                             so the caller can re-queue them if a later step fails
   *
   * @throws IOException if the underlying segment-metadata query fails
   */
  @VisibleForTesting
  void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> dataSourcesToRebuild) throws IOException
  {
    // Refresh the segments.
    final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);

    synchronized (lock) {
      // Add missing segments back to the refresh list.
      segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));

      // Compute the list of dataSources to rebuild tables for.
      dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
      refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
      dataSourcesNeedingRebuild.clear();
    }

    // Rebuild the dataSources.
    for (String dataSource : dataSourcesToRebuild) {
      final DruidTable druidTable = buildDruidTable(dataSource);
      final DruidTable oldTable = tables.put(dataSource, druidTable);
      final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
      if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
        log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature());
      } else {
        log.debug("%s [%s] signature is unchanged.", description, dataSource);
      }
    }
  }
@LifecycleStop
public void stop()
{
cacheExec.shutdownNow();
callbackExec.shutdownNow();
}
  /**
   * Blocks until the first metadata refresh completes (or until the refresh loop determines that
   * there is nothing to refresh yet). See the {@link #initialized} latch and {@link #start()}.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  public void awaitInitialization() throws InterruptedException
  {
    initialized.await();
  }
@Override
protected Map<String, Table> getTableMap()
{
return ImmutableMap.copyOf(tables);
}
  /**
   * Timeline callback: {@code server} has announced that it serves {@code segment}. Records the server
   * as a replica, marks brand-new segments for metadata refresh, and tracks whether the segment is
   * mutable (served only by realtime servers) or immutable (present on a historical).
   *
   * Runs on {@link #callbackExec}; see the TimelineCallback registered in the constructor.
   */
  @VisibleForTesting
  void addSegment(final DruidServerMetadata server, final DataSegment segment)
  {
    // Get lock first so that we won't wait in ConcurrentMap.compute().
    synchronized (lock) {
      // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
      // broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately
      if (server.getType().equals(ServerType.BROKER)) {
        // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
        // historical, however mark the datasource for refresh because it needs to be globalized
        markDataSourceAsNeedRebuild(segment.getDataSource());
      } else {
        segmentMetadataInfo.compute(
            segment.getDataSource(),
            (datasource, segmentsMap) -> {
              if (segmentsMap == null) {
                // First segment seen for this dataSource; create its inner (sorted) segment map.
                segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
              }
              segmentsMap.compute(
                  segment.getId(),
                  (segmentId, segmentMetadata) -> {
                    if (segmentMetadata == null) {
                      // Unknown segment.
                      totalSegments++;
                      // segmentReplicatable is used to determine if segments are served by historical or realtime servers
                      long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
                      // Row signature is null until the first metadata refresh fills it in.
                      segmentMetadata = AvailableSegmentMetadata
                          .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS)
                          .build();
                      markSegmentAsNeedRefresh(segment.getId());
                      if (!server.isSegmentReplicationTarget()) {
                        log.debug("Added new mutable segment[%s].", segment.getId());
                        markSegmentAsMutable(segment.getId());
                      } else {
                        log.debug("Added new immutable segment[%s].", segment.getId());
                      }
                    } else {
                      // We know this segment. Add the new server to the replica set.
                      final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
                      final ImmutableSet<DruidServerMetadata> servers = new ImmutableSet.Builder<DruidServerMetadata>()
                          .addAll(segmentServers)
                          .add(server)
                          .build();
                      segmentMetadata = AvailableSegmentMetadata
                          .from(segmentMetadata)
                          .withReplicas(servers)
                          .withRealtime(recomputeIsRealtime(servers))
                          .build();
                      if (server.isSegmentReplicationTarget()) {
                        // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
                        // even if it's also available on non-replicatable (realtime) servers.
                        unmarkSegmentAsMutable(segment.getId());
                        log.debug("Segment[%s] has become immutable.", segment.getId());
                      }
                    }
                    assert segmentMetadata != null;
                    return segmentMetadata;
                  }
              );
              return segmentsMap;
            }
        );
      }
      if (!tables.containsKey(segment.getDataSource())) {
        // A whole new dataSource appeared: ask the refresh loop to run as soon as possible
        // so the table shows up promptly.
        refreshImmediately = true;
      }

      // Wake the refresh loop, which may be waiting on this lock.
      lock.notifyAll();
    }
  }
  /**
   * Timeline callback: {@code segment} has disappeared from the cluster entirely. Drops its metadata
   * and refresh/mutable bookkeeping; if it was the dataSource's last segment, the whole dataSource
   * entry and its table are removed, otherwise the dataSource is marked for a table rebuild.
   *
   * Runs on {@link #callbackExec}; see the TimelineCallback registered in the constructor.
   */
  @VisibleForTesting
  void removeSegment(final DataSegment segment)
  {
    // Get lock first so that we won't wait in ConcurrentMap.compute().
    synchronized (lock) {
      log.debug("Segment[%s] is gone.", segment.getId());

      segmentsNeedingRefresh.remove(segment.getId());
      unmarkSegmentAsMutable(segment.getId());

      segmentMetadataInfo.compute(
          segment.getDataSource(),
          (dataSource, segmentsMap) -> {
            if (segmentsMap == null) {
              log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId());
              // Returning null keeps the dataSource absent from the outer map.
              return null;
            } else {
              if (segmentsMap.remove(segment.getId()) == null) {
                log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId());
              } else {
                totalSegments--;
              }
              if (segmentsMap.isEmpty()) {
                // Last segment of the dataSource: drop the table and the (now empty) inner map.
                tables.remove(segment.getDataSource());
                log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource());
                return null;
              } else {
                markDataSourceAsNeedRebuild(segment.getDataSource());
                return segmentsMap;
              }
            }
          }
      );

      // Wake the refresh loop, which may be waiting on this lock.
      lock.notifyAll();
    }
  }
  /**
   * Timeline callback: {@code server} stopped serving {@code segment}, but the segment may still exist
   * on other servers. Removes the server from the segment's replica set and recomputes the realtime flag.
   *
   * Runs on {@link #callbackExec}; see the TimelineCallback registered in the constructor.
   */
  @VisibleForTesting
  void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
  {
    // Get lock first so that we won't wait in ConcurrentMap.compute().
    synchronized (lock) {
      log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
      segmentMetadataInfo.compute(
          segment.getDataSource(),
          (datasource, knownSegments) -> {
            if (knownSegments == null) {
              log.warn(
                  "Unknown segment[%s] is removed from server[%s]. Ignoring this event",
                  segment.getId(),
                  server.getHost()
              );
              return null;
            }

            if (server.getType().equals(ServerType.BROKER)) {
              // for brokers, if the segment drops from all historicals before the broker this could be null.
              if (!knownSegments.isEmpty()) {
                // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
                // historical, however mark the datasource for refresh because it might no longer be broadcast or something
                markDataSourceAsNeedRebuild(segment.getDataSource());
              }
            } else {
              knownSegments.compute(
                  segment.getId(),
                  (segmentId, segmentMetadata) -> {
                    if (segmentMetadata == null) {
                      log.warn(
                          "Unknown segment[%s] is removed from server[%s]. Ignoring this event",
                          segment.getId(),
                          server.getHost()
                      );
                      return null;
                    } else {
                      // Drop this server from the replica set and recompute the realtime flag.
                      final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
                      final ImmutableSet<DruidServerMetadata> servers = FluentIterable
                          .from(segmentServers)
                          .filter(Predicates.not(Predicates.equalTo(server)))
                          .toSet();
                      return AvailableSegmentMetadata
                          .from(segmentMetadata)
                          .withReplicas(servers)
                          .withRealtime(recomputeIsRealtime(servers))
                          .build();
                    }
                  }
              );
            }

            // NOTE(review): when the inner map is empty, the dataSource entry is dropped here but the
            // corresponding 'tables' entry is not — removeSegment() handles that path; confirm intended.
            if (knownSegments.isEmpty()) {
              return null;
            } else {
              return knownSegments;
            }
          }
      );

      // Wake the refresh loop, which may be waiting on this lock.
      lock.notifyAll();
    }
  }
  // Queues a segment for metadata refresh on the next refresh cycle; see start().
  private void markSegmentAsNeedRefresh(SegmentId segmentId)
  {
    synchronized (lock) {
      segmentsNeedingRefresh.add(segmentId);
    }
  }
  // Marks a segment as mutable (realtime). Mutable segments are re-refreshed every cycle; see start().
  private void markSegmentAsMutable(SegmentId segmentId)
  {
    synchronized (lock) {
      mutableSegments.add(segmentId);
    }
  }
  // Clears a segment's mutable flag, e.g. once it appears on a historical; see addSegment().
  private void unmarkSegmentAsMutable(SegmentId segmentId)
  {
    synchronized (lock) {
      mutableSegments.remove(segmentId);
    }
  }
  // Queues a dataSource for a DruidTable rebuild on the next refresh cycle; see refresh().
  @VisibleForTesting
  void markDataSourceAsNeedRebuild(String datasource)
  {
    synchronized (lock) {
      dataSourcesNeedingRebuild.add(datasource);
    }
  }
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
{
final Set<SegmentId> retVal = new HashSet<>();
// Organize segments by dataSource.
final Map<String, TreeSet<SegmentId>> segmentMap = new TreeMap<>();
for (SegmentId segmentId : segments) {
segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
.add(segmentId);
}
for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
final String dataSource = entry.getKey();
retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
}
return retVal;
}
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
{
if (servers.isEmpty()) {
return 0;
}
final Optional<DruidServerMetadata> historicalServer = servers
.stream()
// Ideally, this filter should have checked whether it's a broadcast segment loaded in brokers.
// However, we don't current track of the broadcast segments loaded in brokers, so this filter is still valid.
// See addSegment(), removeServerSegment(), and removeSegment()
.filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
.findAny();
// if there is any historical server in the replicas, isRealtime flag should be unset
return historicalServer.isPresent() ? 0 : 1;
}
  /**
   * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
   * segments actually refreshed, which may be a subset of the asked-for set.
   *
   * At most {@link #MAX_SEGMENTS_PER_QUERY} segments are queried per call; anything beyond that is left
   * unrefreshed here ({@link #refresh} re-queues the difference for a later cycle).
   *
   * @throws IOException if closing the result yielder fails
   */
  private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
      throws IOException
  {
    if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
      // Sanity check. We definitely expect this to pass.
      throw new ISE("'segments' must all match 'dataSource'!");
    }

    log.debug("Refreshing metadata for dataSource[%s].", dataSource);

    final long startTime = System.currentTimeMillis();

    // Segment id string -> SegmentId object, so query results (keyed by string id) can be matched back.
    final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);

    final Set<SegmentId> retVal = new HashSet<>();
    final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
        queryLifecycleFactory,
        Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
        escalator.createEscalatedAuthenticationResult()
    );

    Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);

    try {
      while (!yielder.isDone()) {
        final SegmentAnalysis analysis = yielder.get();
        final SegmentId segmentId = segmentIdMap.get(analysis.getId());

        if (segmentId == null) {
          log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
        } else {
          final RowSignature rowSignature = analysisToRowSignature(analysis);
          log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
          // Nested compute() follows the locking protocol documented on segmentMetadataInfo.
          segmentMetadataInfo.compute(
              dataSource,
              (datasourceKey, dataSourceSegments) -> {
                if (dataSourceSegments == null) {
                  // Datasource may have been removed or become unavailable while this refresh was ongoing.
                  log.warn(
                      "No segment map found with datasource[%s], skipping refresh of segment[%s]",
                      datasourceKey,
                      segmentId
                  );
                  return null;
                } else {
                  dataSourceSegments.compute(
                      segmentId,
                      (segmentIdKey, segmentMetadata) -> {
                        if (segmentMetadata == null) {
                          // Segment was removed while this refresh was ongoing.
                          log.warn("No segment[%s] found, skipping refresh", segmentId);
                          return null;
                        } else {
                          final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
                              .from(segmentMetadata)
                              .withRowSignature(rowSignature)
                              .withNumRows(analysis.getNumRows())
                              .build();
                          retVal.add(segmentId);
                          return updatedSegmentMetadata;
                        }
                      }
                  );

                  if (dataSourceSegments.isEmpty()) {
                    return null;
                  } else {
                    return dataSourceSegments;
                  }
                }
              }
          );
        }

        // Advance to the next analysis result.
        yielder = yielder.next(null);
      }
    }
    finally {
      yielder.close();
    }

    log.debug(
        "Refreshed metadata for dataSource[%s] in %,d ms (%d segments queried, %d segments left).",
        dataSource,
        System.currentTimeMillis() - startTime,
        retVal.size(),
        segments.size() - retVal.size()
    );

    return retVal;
  }
  /**
   * Builds a {@link DruidTable} for the given dataSource by merging the {@link RowSignature}s of all of
   * its known segments. The per-dataSource segment map iterates newest-first ({@link #SEGMENT_ORDER})
   * and {@code putIfAbsent} keeps the first type seen, so a newer segment's column type wins.
   */
  @VisibleForTesting
  DruidTable buildDruidTable(final String dataSource)
  {
    ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
    final Map<String, ValueType> columnTypes = new TreeMap<>();

    if (segmentsMap != null) {
      for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) {
        final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
        // A null signature means this segment has not been refreshed yet (see addSegment()); skip it.
        if (rowSignature != null) {
          for (String column : rowSignature.getColumnNames()) {
            // Newer column types should override older ones.
            final ValueType columnType =
                rowSignature.getColumnType(column)
                            .orElseThrow(() -> new ISE("Encountered null type for column[%s]", column));

            columnTypes.putIfAbsent(column, columnType);
          }
        }
      }
    }

    final RowSignature.Builder builder = RowSignature.builder();
    columnTypes.forEach(builder::add);

    final TableDataSource tableDataSource;

    // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing
    // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here)
    // to be joinable, it must be possibly joinable according to the factory. we only consider broadcast datasources
    // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource
    // if also joinable
    final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource);
    final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal);
    final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource);
    if (isBroadcast && isJoinable) {
      tableDataSource = maybeGlobal;
    } else {
      tableDataSource = new TableDataSource(dataSource);
    }
    return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast);
  }
@VisibleForTesting
Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(totalSegments);
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
segmentMetadata.putAll(val);
}
return segmentMetadata;
}
  /**
   * Returns total number of segments. This method doesn't use the lock intentionally to avoid expensive contention.
   * As a result, the returned value might be inexact.
   */
  int getTotalSegments()
  {
    return totalSegments;
  }
  /**
   * Returns the live {@link #segmentsNeedingRefresh} set (not a copy). Synchronizing here only makes the
   * read of the reference safe; NOTE(review): iterating the returned set without holding {@link #lock}
   * is not thread-safe — acceptable only because this is test-only.
   */
  @VisibleForTesting
  TreeSet<SegmentId> getSegmentsNeedingRefresh()
  {
    synchronized (lock) {
      return segmentsNeedingRefresh;
    }
  }
  /**
   * Returns the live {@link #mutableSegments} set (not a copy); see the thread-safety caveat on
   * {@link #getSegmentsNeedingRefresh()}. Test-only.
   */
  @VisibleForTesting
  TreeSet<SegmentId> getMutableSegments()
  {
    synchronized (lock) {
      return mutableSegments;
    }
  }
  /**
   * Returns the live {@link #dataSourcesNeedingRebuild} set (not a copy); see the thread-safety caveat
   * on {@link #getSegmentsNeedingRefresh()}. Test-only.
   */
  @VisibleForTesting
  Set<String> getDataSourcesNeedingRebuild()
  {
    synchronized (lock) {
      return dataSourcesNeedingRebuild;
    }
  }
private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
final QueryLifecycleFactory queryLifecycleFactory,
final Iterable<SegmentId> segments,
final AuthenticationResult authenticationResult
)
{
// Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
final String dataSource = Iterables.getOnlyElement(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::getDataSource).collect(Collectors.toSet())
);
final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
StreamSupport.stream(segments.spliterator(), false)
.map(SegmentId::toDescriptor).collect(Collectors.toList())
);
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
ImmutableMap.of(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
);
return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, authenticationResult, Access.OK);
}
private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
{
final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
if (entry.getValue().isError()) {
// Skip columns with analysis errors.
continue;
}
ValueType valueType;
try {
valueType = ValueType.valueOf(StringUtils.toUpperCase(entry.getValue().getType()));
}
catch (IllegalArgumentException e) {
// Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
// what kind of complex column it is, which we may want to preserve some day.
valueType = ValueType.COMPLEX;
}
rowSignatureBuilder.add(entry.getKey(), valueType);
}
return rowSignatureBuilder.build();
}
/**
* This method is not thread-safe and must be used only in unit tests.
*/
@VisibleForTesting
void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata)
{
final ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo
.computeIfAbsent(
segmentId.getDataSource(),
k -> new ConcurrentSkipListMap<>(SEGMENT_ORDER)
);
if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) {
totalSegments++;
}
}
  /**
   * This is a helper method for unit tests to emulate heavy work done with {@link #lock}.
   * It must be used only in unit tests.
   *
   * @param runnable work to run while holding {@link #lock}
   */
  @VisibleForTesting
  void doInLock(Runnable runnable)
  {
    synchronized (lock) {
      runnable.run();
    }
  }
}