/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CollectionUtils;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * This class is responsible for managing data sources and their states, such as the timeline, total segment size, and
 * number of segments. All public methods of this class must be thread-safe.
 */
public class SegmentManager
{
  private static final EmittingLogger log = new EmittingLogger(SegmentManager.class);

  private final SegmentLoader segmentLoader;
  private final ConcurrentHashMap<String, DataSourceState> dataSources = new ConcurrentHashMap<>();

  /**
   * Represents the state of a data source, including its timeline, total segment size, and number of segments.
   */
  public static class DataSourceState
  {
    private final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline =
        new VersionedIntervalTimeline<>(Ordering.natural());
    private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tablesLookup = new ConcurrentHashMap<>();
    private long totalSegmentSize;
    private long numSegments;

    private void addSegment(DataSegment segment)
    {
      totalSegmentSize += segment.getSize();
      numSegments++;
    }

    private void removeSegment(DataSegment segment)
    {
      totalSegmentSize -= segment.getSize();
      numSegments--;
    }

    public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline()
    {
      return timeline;
    }

    public ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> getTablesLookup()
    {
      return tablesLookup;
    }

    public long getTotalSegmentSize()
    {
      return totalSegmentSize;
    }

    public long getNumSegments()
    {
      return numSegments;
    }

    public boolean isEmpty()
    {
      return numSegments == 0;
    }
  }

  @Inject
  public SegmentManager(SegmentLoader segmentLoader)
  {
    this.segmentLoader = segmentLoader;
  }

  @VisibleForTesting
  Map<String, DataSourceState> getDataSources()
  {
    return dataSources;
  }
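
  // Illustrative sketch (an assumption, not part of the original file): in a Druid server this class is normally
  // obtained through Guice via the @Inject constructor above rather than constructed by hand, e.g.
  //
  //   @Inject
  //   public MyQueryResource(SegmentManager segmentManager) { ... }   // "MyQueryResource" is hypothetical
  //
  // Direct construction only needs a SegmentLoader:
  //
  //   SegmentManager segmentManager = new SegmentManager(segmentLoader);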

  /**
   * Returns a map of dataSource to the total byte size of segments managed by this segmentManager. This method should
   * be used carefully because the returned map is only a snapshot: it may no longer match the actual data source
   * states by the time the caller reads it.
   *
   * @return a map of dataSources and their total byte sizes
   */
  public Map<String, Long> getDataSourceSizes()
  {
    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
  }
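
  // Illustrative sketch: consuming the size snapshot, e.g. for logging or metrics. "segmentManager" is a
  // hypothetical instance; entries may already be stale when read (see the javadoc above).
  //
  //   for (Map.Entry<String, Long> entry : segmentManager.getDataSourceSizes().entrySet()) {
  //     log.info("dataSource[%s] serves [%,d] bytes of segments", entry.getKey(), entry.getValue());
  //   }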

  public Set<String> getDataSourceNames()
  {
    return dataSources.keySet();
  }

  /**
   * Returns a map of dataSource to the number of segments managed by this segmentManager. This method should be used
   * carefully because the returned map is only a snapshot: it may no longer match the actual data source states by
   * the time the caller reads it.
   *
   * @return a map of dataSources and the number of segments they hold
   */
  public Map<String, Long> getDataSourceCounts()
  {
    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
  }

  /**
   * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based
   * datasource of a single table.
   *
   * @param analysis data source analysis information
   *
   * @return timeline, if it exists
   *
   * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table
   */
  public Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> getTimeline(DataSourceAnalysis analysis)
  {
    final TableDataSource tableDataSource = getTableDataSource(analysis);
    return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline);
  }
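
  // Illustrative sketch, assuming a datasource named "wikipedia" and that DataSourceAnalysis.forDataSource() is
  // available as the analysis factory: listing the timeline entries overlapping an interval. TimelineObjectHolder
  // is not imported in this file and appears here for illustration only.
  //
  //   DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(new TableDataSource("wikipedia"));
  //   segmentManager.getTimeline(analysis).ifPresent(timeline -> {
  //     for (TimelineObjectHolder<String, ReferenceCountingSegment> holder
  //         : timeline.lookup(Intervals.of("2024-01-01/2024-02-01"))) {
  //       log.info("serving interval[%s] at version[%s]", holder.getInterval(), holder.getVersion());
  //     }
  //   });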

  /**
   * Returns the collection of {@link IndexedTable} for the entire timeline (since join conditions do not currently
   * consider the query's intervals), if the timeline exists. Only segments that are joinable, i.e. that have an
   * associated {@link IndexedTable}, contribute to the returned stream.
   */
  public Optional<Stream<ReferenceCountingIndexedTable>> getIndexedTables(DataSourceAnalysis analysis)
  {
    return getTimeline(analysis).map(timeline -> {
      // join doesn't currently consider intervals, so just consider all segments
      final Stream<ReferenceCountingSegment> segments =
          timeline.lookup(Intervals.ETERNITY)
                  .stream()
                  .flatMap(x -> StreamSupport.stream(x.getObject().payloads().spliterator(), false));
      final TableDataSource tableDataSource = getTableDataSource(analysis);
      ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tables =
          Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTablesLookup)
                  .orElseThrow(() -> new ISE("Datasource %s does not have IndexedTables", tableDataSource.getName()));
      return segments.map(segment -> tables.get(segment.getId())).filter(Objects::nonNull);
    });
  }
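
  // Illustrative sketch: counting joinable segments through getIndexedTables(). Each element of the stream is the
  // ReferenceCountingIndexedTable of one joinable segment; actual readers must acquire a reference before using a
  // table (reference handling is omitted here).
  //
  //   long joinableSegments = segmentManager.getIndexedTables(analysis).map(Stream::count).orElse(0L);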

  public boolean hasIndexedTables(String dataSourceName)
  {
    if (dataSources.containsKey(dataSourceName)) {
      return dataSources.get(dataSourceName).tablesLookup.size() > 0;
    }
    return false;
  }

  private TableDataSource getTableDataSource(DataSourceAnalysis analysis)
  {
    return analysis.getBaseTableDataSource()
                   .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
  }

  /**
   * Load a single segment.
   *
   * @param segment    segment to load
   * @param lazy       whether to lazily load column metadata
   * @param loadFailed callback to execute if a lazily-loaded segment fails to load
   *
   * @return true if the segment was newly loaded, false if it was already loaded
   *
   * @throws SegmentLoadingException if the segment cannot be loaded
   */
  public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
      throws SegmentLoadingException
  {
    final Segment adapter = getAdapter(segment, lazy, loadFailed);

    final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();

    // compute() is used to ensure that the operation for a data source is executed atomically
    dataSources.compute(
        segment.getDataSource(),
        (k, v) -> {
          final DataSourceState dataSourceState = v == null ? new DataSourceState() : v;
          final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals =
              dataSourceState.getTimeline();
          final PartitionChunk<ReferenceCountingSegment> entry = loadedIntervals.findChunk(
              segment.getInterval(),
              segment.getVersion(),
              segment.getShardSpec().getPartitionNum()
          );

          if (entry != null) {
            log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
            resultSupplier.set(false);
          } else {
            IndexedTable table = adapter.as(IndexedTable.class);
            if (table != null) {
              // A datasource may hold IndexedTables only if every one of its segments is joinable.
              if (dataSourceState.isEmpty() || dataSourceState.numSegments == dataSourceState.tablesLookup.size()) {
                dataSourceState.tablesLookup.put(segment.getId(), new ReferenceCountingIndexedTable(table));
              } else {
                log.error("Cannot load segment[%s] with IndexedTable, no existing segments are joinable", segment.getId());
              }
            } else if (dataSourceState.tablesLookup.size() > 0) {
              log.error("Cannot load segment[%s] without IndexedTable, all existing segments are joinable", segment.getId());
            }
            loadedIntervals.add(
                segment.getInterval(),
                segment.getVersion(),
                segment.getShardSpec().createChunk(
                    ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
                )
            );
            dataSourceState.addSegment(segment);
            resultSupplier.set(true);
          }

          return dataSourceState;
        }
    );

    return resultSupplier.get();
  }
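
  // Illustrative usage sketch, assuming "someSegment" is a hypothetical DataSegment assigned to this server.
  // With lazy loading disabled, the SegmentLazyLoadFailCallback.NOOP callback never fires.
  //
  //   boolean newlyLoaded = segmentManager.loadSegment(someSegment, false, SegmentLazyLoadFailCallback.NOOP);
  //   if (!newlyLoaded) {
  //     // the timeline already had a chunk for this interval, version, and partition number
  //   }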

  private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
      throws SegmentLoadingException
  {
    final Segment adapter;
    try {
      adapter = segmentLoader.getSegment(segment, lazy, loadFailed);
    }
    catch (SegmentLoadingException e) {
      // Clean up any partially downloaded segment files before propagating the failure.
      segmentLoader.cleanup(segment);
      throw e;
    }

    if (adapter == null) {
      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
    }
    return adapter;
  }

  public void dropSegment(final DataSegment segment)
  {
    final String dataSource = segment.getDataSource();

    // compute() is used to ensure that the operation for a data source is executed atomically
    dataSources.compute(
        dataSource,
        (dataSourceName, dataSourceState) -> {
          if (dataSourceState == null) {
            log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSourceName);
            return null;
          } else {
            final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals =
                dataSourceState.getTimeline();

            final ShardSpec shardSpec = segment.getShardSpec();
            final PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
                segment.getInterval(),
                segment.getVersion(),
                // remove() internally searches for a partitionChunk to remove which is *equal* to the given
                // partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
                segment.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(null, shardSpec))
            );
            final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();

            if (oldQueryable != null) {
              try (final Closer closer = Closer.create()) {
                dataSourceState.removeSegment(segment);
                closer.register(oldQueryable);
                log.info("Attempting to close segment %s", segment.getId());
                final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId());
                if (oldTable != null) {
                  closer.register(oldTable);
                }
              }
              catch (IOException e) {
                throw new RuntimeException(e);
              }
            } else {
              log.info(
                  "Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.",
                  dataSourceName,
                  segment.getInterval(),
                  segment.getVersion()
              );
            }

            // Returning null removes the entry of dataSource from the map
            return dataSourceState.isEmpty() ? null : dataSourceState;
          }
        }
    );

    segmentLoader.cleanup(segment);
  }
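
  // Illustrative lifecycle sketch: dropping the segment loaded above. Closing the ReferenceCountingSegment (and
  // its IndexedTable, if any) is deferred until in-flight queries release their references; the local cache files
  // are then cleaned up via segmentLoader.cleanup().
  //
  //   segmentManager.dropSegment(someSegment);
  //   // afterwards, getTimeline(...) no longer reports the segment's interval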
}