/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Coordinator-side cache of segment metadata that combines per-segment schemas to build
 * datasource schemas. The cache provides metadata about a datasource, see
 * {@link DataSourceInformation}.
 * <p>
 * The major differences from the other implementation, {@code BrokerSegmentMetadataCache}, are:
 * <ul>
 * <li>The refresh is executed only on the leader Coordinator node.</li>
 * <li>Realtime segment schema refresh: schema updates for realtime segments are pushed
 * periodically. Each update is merged with any existing schema for the segment, the cache
 * is updated, and the corresponding datasource is marked for refresh.</li>
 * <li>The refresh mechanism differs significantly from the other implementation:
 * <ul><li>The segment metadata query is executed only for those non-realtime segments for
 * which the schema is not already cached.</li>
 * <li>Datasources marked for refresh are then rebuilt.</li></ul>
 * </li>
 * </ul>
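 * <p>
 * Leadership hand-off, as a sketch (in practice the Coordinator's leadership lifecycle
 * invokes these hooks; they are not called by user code):
 * <pre>{@code
 * cache.onLeaderStart(); // start the backfill queue and the refresh loop
 * // ... node is the leader ...
 * cache.onLeaderStop();  // cancel the refresh loop, stop leader-only components
 * }</pre>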
*/
@ManageLifecycle
public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache<DataSourceInformation>
{
private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
private final SegmentMetadataCacheConfig config;
private final ColumnTypeMergePolicy columnTypeMergePolicy;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
private @Nullable Future<?> cacheExecFuture = null;

  @Inject
public CoordinatorSegmentMetadataCache(
QueryLifecycleFactory queryLifecycleFactory,
CoordinatorServerView serverView,
SegmentMetadataCacheConfig config,
Escalator escalator,
InternalQueryConfig internalQueryConfig,
ServiceEmitter emitter,
SegmentSchemaCache segmentSchemaCache,
SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
)
{
super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);
this.config = config;
this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
this.segmentSchemaCache = segmentSchemaCache;
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
initServerViewTimelineCallback(serverView);
}
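
  /**
   * Registers a timeline callback with the Coordinator server view so that the cache is
   * kept up to date as segments are added or removed and as realtime schema updates are
   * announced.
   */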
private void initServerViewTimelineCallback(final CoordinatorServerView serverView)
{
serverView.registerTimelineCallback(
callbackExec,
new TimelineServerView.TimelineCallback()
{
@Override
public ServerView.CallbackAction timelineInitialized()
{
synchronized (lock) {
isServerViewInitialized = true;
lock.notifyAll();
}
return ServerView.CallbackAction.CONTINUE;
}

        @Override
public ServerView.CallbackAction segmentAdded(final DruidServerMetadata server, final DataSegment segment)
{
addSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}

        @Override
public ServerView.CallbackAction segmentRemoved(final DataSegment segment)
{
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}

        @Override
public ServerView.CallbackAction serverSegmentRemoved(
final DruidServerMetadata server,
final DataSegment segment
)
{
removeServerSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}

        @Override
public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
{
updateSchemaForRealtimeSegments(segmentSchemas);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}

  @LifecycleStart
  @Override
  public void start()
  {
    // No-op. The refresh loop is started only on the leader node, in onLeaderStart().
  }

  @LifecycleStop
@Override
public void stop()
{
callbackExec.shutdownNow();
cacheExec.shutdownNow();
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
}
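
  /**
   * Called when this node is elected the leader Coordinator: starts the schema backfill
   * queue and submits the cache refresh loop, optionally awaiting cache initialization
   * as per {@link SegmentMetadataCacheConfig#isAwaitInitializationOnStart()}.
   */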
public void onLeaderStart()
{
log.info("Initializing cache on leader node.");
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
if (config.isAwaitInitializationOnStart()) {
awaitInitialization();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
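
  /**
   * Called when this node loses Coordinator leadership: cancels the refresh loop and
   * stops the leader-only schema cache and backfill queue components.
   */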
public void onLeaderStop()
{
log.info("No longer leader, stopping cache.");
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
}

  /**
   * Ensures that the refresh goes through only after the {@link SegmentSchemaCache} has
   * been initialized.
   */
@Override
public synchronized void refreshWaitCondition() throws InterruptedException
{
segmentSchemaCache.awaitInitialization();
}

  @Override
protected void unmarkSegmentAsMutable(SegmentId segmentId)
{
synchronized (lock) {
log.debug("SegmentId [%s] is marked as finalized.", segmentId);
mutableSegments.remove(segmentId);
// remove it from the realtime schema cache
segmentSchemaCache.realtimeSegmentRemoved(segmentId);
}
}
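
  /**
   * Removes the cached schema for the given segment when the segment itself is removed.
   */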
@Override
protected void removeSegmentAction(SegmentId segmentId)
{
log.debug("SegmentId [%s] is removed.", segmentId);
segmentSchemaCache.segmentRemoved(segmentId);
}

  @Override
protected boolean fetchAggregatorsInSegmentMetadataQuery()
{
return true;
}
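
  /**
   * Handles the result of a segment metadata query: caches the resulting row signature,
   * aggregators and row count as a temporary metadata-query result and queues the schema
   * for backfill to the metadata database.
   *
   * @return true if the schema was cached and queued, false if the segment or its
   *         datasource is no longer present in the cache
   */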
@Override
protected boolean segmentMetadataQueryResultHandler(
String dataSource,
SegmentId segmentId,
RowSignature rowSignature,
SegmentAnalysis analysis
)
{
AtomicBoolean added = new AtomicBoolean(false);
segmentMetadataInfo.compute(
dataSource,
(datasourceKey, dataSourceSegments) -> {
if (dataSourceSegments == null) {
// Datasource may have been removed or become unavailable while this refresh was ongoing.
log.warn(
"No segment map found with datasource [%s], skipping refresh of segment [%s]",
datasourceKey,
segmentId
);
return null;
} else {
dataSourceSegments.compute(
segmentId,
(segmentIdKey, segmentMetadata) -> {
if (segmentMetadata == null) {
log.warn("No segment [%s] found, skipping refresh", segmentId);
return null;
} else {
long numRows = analysis.getNumRows();
log.debug("Publishing segment schema. SegmentId [%s], RowSignature [%s], numRows [%d]", segmentId, rowSignature, numRows);
Map<String, AggregatorFactory> aggregators = analysis.getAggregators();
// cache the signature
segmentSchemaCache.addTemporaryMetadataQueryResult(segmentId, rowSignature, aggregators, numRows);
// queue the schema for publishing to the DB
segmentSchemaBackfillQueue.add(segmentId, rowSignature, aggregators, numRows);
added.set(true);
return segmentMetadata;
}
}
);
if (dataSourceSegments.isEmpty()) {
return null;
} else {
return dataSourceSegments;
}
}
}
);
return added.get();
}
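
  /**
   * Returns a snapshot of all known segments, filling in the row signature and row count
   * from {@link SegmentSchemaCache} where a schema is available.
   */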
@Override
public Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
{
final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = Maps.newHashMapWithExpectedSize(getTotalSegments());
for (ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : val.entrySet()) {
Optional<SchemaPayloadPlus> metadata = segmentSchemaCache.getSchemaForSegment(entry.getKey());
AvailableSegmentMetadata availableSegmentMetadata = entry.getValue();
if (metadata.isPresent()) {
availableSegmentMetadata = AvailableSegmentMetadata.from(entry.getValue())
.withRowSignature(metadata.get().getSchemaPayload().getRowSignature())
.withNumRows(metadata.get().getNumRows())
.build();
} else {
          // Mark the segment for refresh, although by design this case shouldn't arise.
markSegmentAsNeedRefresh(entry.getKey());
log.debug("SchemaMetadata for segmentId [%s] is absent.", entry.getKey());
}
segmentMetadata.put(entry.getKey(), availableSegmentMetadata);
}
}
return segmentMetadata;
}

  @Nullable
@Override
public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId)
{
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(datasource);
AvailableSegmentMetadata availableSegmentMetadata = null;
if (segmentMap != null) {
availableSegmentMetadata = segmentMap.get(segmentId);
}
if (availableSegmentMetadata == null) {
return null;
}
Optional<SchemaPayloadPlus> metadata = segmentSchemaCache.getSchemaForSegment(segmentId);
if (metadata.isPresent()) {
availableSegmentMetadata = AvailableSegmentMetadata.from(availableSegmentMetadata)
.withRowSignature(metadata.get().getSchemaPayload().getRowSignature())
.withNumRows(metadata.get().getNumRows())
.build();
} else {
      // Mark the segment for refresh, although by design this case shouldn't arise.
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
}
return availableSegmentMetadata;
}

  /**
   * Executes a segment metadata query to fetch schema information for each segment in the
   * refresh list. The schema information for individual segments is then combined to
   * construct a table schema, which is cached.
   *
   * @param segmentsToRefresh    segments for which the schema might have changed
   * @param dataSourcesToRebuild datasources for which the table schema should be rebuilt
   * @throws IOException if an error occurs while querying segments on data nodes or tasks
   */
@Override
public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> dataSourcesToRebuild) throws IOException
{
log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]", segmentsToRefresh, dataSourcesToRebuild);
filterRealtimeSegments(segmentsToRefresh);
log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]", segmentsToRefresh);
final Set<SegmentId> cachedSegments = filterSegmentWithCachedSchema(segmentsToRefresh);
log.debug("SegmentsToRefreshMinusCachedSegments [%s], cachedSegments [%s]", segmentsToRefresh, cachedSegments);
// Refresh the segments.
Set<SegmentId> refreshed = Collections.emptySet();
if (!config.isDisableSegmentMetadataQueries()) {
refreshed = refreshSegments(segmentsToRefresh);
log.debug("Refreshed segments are [%s]", refreshed);
}
synchronized (lock) {
// Add missing segments back to the refresh list.
segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
// Compute the list of datasources to rebuild tables for.
dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
cachedSegments.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
dataSourcesNeedingRebuild.clear();
}
log.debug("Datasources to rebuild are [%s]", dataSourcesToRebuild);
// Rebuild the datasources.
for (String dataSource : dataSourcesToRebuild) {
final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
if (rowSignature == null) {
log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}
DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
log.info("[%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
} else {
log.debug("[%s] signature is unchanged.", dataSource);
}
}
}
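
  /**
   * Removes realtime (mutable) segments from the given set; their schema is pushed by
   * tasks rather than fetched with segment metadata queries.
   */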
private void filterRealtimeSegments(Set<SegmentId> segmentIds)
{
synchronized (lock) {
segmentIds.removeAll(mutableSegments);
}
}
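
  /**
   * Removes from the given set those segments whose schema is already cached, and returns
   * the set of removed (cached) segments.
   */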
private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
{
Set<SegmentId> cachedSegments = new HashSet<>();
for (SegmentId id : segmentIds) {
if (segmentSchemaCache.isSchemaCached(id)) {
cachedSegments.add(id);
}
}
segmentIds.removeAll(cachedSegments);
return cachedSegments;
}
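
  /**
   * Builds the datasource signature by merging the cached schema of each of its segments
   * using the configured {@link ColumnTypeMergePolicy}, preserving column order. Returns
   * null if the datasource has no segments.
   */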
@VisibleForTesting
@Nullable
@Override
public RowSignature buildDataSourceRowSignature(final String dataSource)
{
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
// Preserve order.
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (SegmentId segmentId : segmentsMap.keySet()) {
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
for (String column : rowSignature.getColumnNames()) {
final ColumnType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
}
} else {
          // Mark the segment for refresh, although by design this case shouldn't arise.
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
}
}
} else {
// table has no segments
return null;
}
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
return builder.build();
}

  /**
   * Updates the cached schema for realtime segments from a pushed {@link SegmentSchemas}
   * announcement, merging each update with any existing signature and marking the
   * corresponding datasource for rebuild.
   */
@VisibleForTesting
void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas)
{
log.debug("SchemaUpdate for realtime segments [%s].", segmentSchemas);
List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
String dataSource = segmentSchema.getDataSource();
SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
if (segmentId == null) {
log.error("Could not apply schema update. Failed parsing segmentId [%s]", segmentSchema.getSegmentId());
continue;
}
log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
segmentMetadataInfo.compute(
dataSource,
(dataSourceKey, segmentsMap) -> {
if (segmentsMap == null) {
// Datasource may have been removed or become unavailable while this refresh was ongoing.
log.warn(
"No segment map found with datasource [%s], skipping refresh of segment [%s]",
dataSourceKey,
segmentId
);
return null;
} else {
segmentsMap.compute(
segmentId,
(id, segmentMetadata) -> {
if (segmentMetadata == null) {
                  // By design this case shouldn't arise: the segment and its schema are
                  // announced in the same flow, messages shouldn't be lost in the poll,
                  // segment announcement should always precede schema announcement, and
                  // there shouldn't be any schema updates for removed segments.
log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
} else {
// We know this segment.
Optional<SchemaPayloadPlus> schemaMetadata = segmentSchemaCache.getSchemaForSegment(segmentId);
Optional<RowSignature> rowSignature =
mergeOrCreateRowSignature(
segmentId,
schemaMetadata.map(
segmentSchemaMetadata -> segmentSchemaMetadata.getSchemaPayload().getRowSignature())
.orElse(null),
segmentSchema
);
if (rowSignature.isPresent()) {
log.debug(
"Segment [%s] signature [%s] after applying schema update.",
segmentId,
rowSignature.get()
);
segmentSchemaCache.addRealtimeSegmentSchema(segmentId, rowSignature.get(), segmentSchema.getNumRows());
// mark the datasource for rebuilding
markDataSourceAsNeedRebuild(dataSource);
}
}
return segmentMetadata;
}
);
return segmentsMap;
}
}
);
}
}

  /**
   * Merges a delta schema update into the existing row signature, or creates a new
   * signature outright if the update is absolute (non-delta). Returns an empty
   * {@link Optional} if a delta update arrives with no existing signature.
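   * <p>
   * For illustration (hypothetical columns): merging an existing signature
   * {@code {__time: LONG, dim1: STRING}} with a delta update adding the new column
   * {@code m1: DOUBLE} yields {@code {__time: LONG, dim1: STRING, m1: DOUBLE}}, whereas
   * an absolute update replaces the existing signature with the announced columns.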
*/
@VisibleForTesting
Optional<RowSignature> mergeOrCreateRowSignature(
SegmentId segmentId,
@Nullable RowSignature existingSignature,
SegmentSchemas.SegmentSchema segmentSchema
)
{
if (!segmentSchema.isDelta()) {
      // Absolute schema: override the existing signature. This case can arise when the
      // server restarts or when there is a counter mismatch between client and server.
RowSignature.Builder builder = RowSignature.builder();
Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
for (String column : segmentSchema.getNewColumns()) {
builder.add(column, columnMapping.get(column));
}
return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
} else if (existingSignature != null) {
// delta update
// merge with the existing signature
RowSignature.Builder builder = RowSignature.builder();
final Map<String, ColumnType> mergedColumnTypes = new LinkedHashMap<>();
for (String column : existingSignature.getColumnNames()) {
final ColumnType columnType =
existingSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
mergedColumnTypes.put(column, columnType);
}
Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
      // true if a column reported as updated is not present in the existing schema
      boolean missingUpdateColumns = false;
      // true if a column reported as new is already present in the existing schema
      boolean existingNewColumns = false;
for (String column : segmentSchema.getUpdatedColumns()) {
if (!mergedColumnTypes.containsKey(column)) {
missingUpdateColumns = true;
mergedColumnTypes.put(column, columnMapping.get(column));
} else {
mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column)));
}
}
for (String column : segmentSchema.getNewColumns()) {
if (mergedColumnTypes.containsKey(column)) {
existingNewColumns = true;
mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column)));
} else {
mergedColumnTypes.put(column, columnMapping.get(column));
}
}
if (missingUpdateColumns || existingNewColumns) {
log.makeAlert(
"Error merging delta schema update with existing row signature. segmentId [%s], "
+ "existingSignature [%s], deltaSchema [%s], missingUpdateColumns [%s], existingNewColumns [%s].",
segmentId,
existingSignature,
segmentSchema,
missingUpdateColumns,
existingNewColumns
).emit();
}
mergedColumnTypes.forEach(builder::add);
return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
    } else {
      // Delta update with no previous signature; raise an alert since this case shouldn't
      // arise by design. It can happen if this is the very first schema update seen for a
      // newly added segment, implying that the absolute schema update was either never
      // computed or was lost in polling.
      log.makeAlert(
          "Received delta schema update [%s] for a segment [%s] with no previous schema.",
          segmentSchema, segmentId
      ).emit();
return Optional.empty();
}
}
}