/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.kafka.KafkaSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a
* {@link KafkaSupervisorSpec} which includes the Kafka topic and configuration as well as an ingestion spec which will
* be used to generate the indexing tasks. The run loop periodically refreshes its view of the Kafka topic's partitions
* and the list of running indexing tasks and ensures that all partitions are being read from and that there are enough
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kafka offsets.
*/
public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>
{
public static final TypeReference<TreeMap<Integer, Map<KafkaTopicPartition, Long>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<KafkaTopicPartition, Long>>>()
{
};
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
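  // Sentinel offsets handed to the base SeekableStreamSupervisor: NOT_SET marks an offset that has not been
  // assigned yet, and END_OF_PARTITION (Long.MAX_VALUE) serves as the effectively unbounded end offset for
  // tasks that are still reading.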
private static final Long NOT_SET = -1L;
private static final Long END_OF_PARTITION = Long.MAX_VALUE;
private final ServiceEmitter emitter;
private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
private final KafkaSupervisorSpec spec;
public KafkaSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
final KafkaIndexTaskClientFactory taskClientFactory,
final ObjectMapper mapper,
final KafkaSupervisorSpec spec,
final RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(
StringUtils.format("KafkaSupervisor-%s", spec.getDataSchema().getDataSource()),
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
mapper,
spec,
rowIngestionMetersFactory,
false
);
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.pattern = getIoConfig().isMultiTopic() ? Pattern.compile(getIoConfig().getStream()) : null;
}
@Override
protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier()
{
return new KafkaRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic()
);
}
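  /**
   * Assigns a partition to one of {@code taskCount} task groups. For a single-topic supervisor the Kafka partition
   * number is used directly; for a multi-topic (pattern-based) supervisor the topic name is mixed into the hash so
   * that partitions with the same number from different topics can land in different groups. A rough sketch of the
   * multi-topic case, using a hypothetical topic "metrics" and partition 3:
   *
   * <pre>{@code
   * // hypothetical example, not part of the supervisor API
   * int group = Math.abs(31 * "metrics".hashCode() + 3) % taskCount;
   * }</pre>
   */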
@Override
protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId)
{
Integer taskCount = spec.getIoConfig().getTaskCount();
if (partitionId.isMultiTopicPartition()) {
return Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) % taskCount;
} else {
return partitionId.partition() % taskCount;
}
}
@Override
protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
{
return metadata instanceof KafkaDataSourceMetadata;
}
@Override
protected boolean doesTaskTypeMatchSupervisor(Task task)
{
return task instanceof KafkaIndexTask;
}
@Override
protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> createReportPayload(
int numPartitions,
boolean includeOffsets
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? partitionLag : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null,
includeOffsets ? sequenceLastUpdated : null,
spec.isSuspended(),
stateManager.isHealthy(),
stateManager.getSupervisorState().getBasicState(),
stateManager.getSupervisorState(),
stateManager.getExceptionEvents()
);
}
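  /**
   * Builds the {@link KafkaIndexTaskIOConfig} handed to each task in a task group. The exclusive start partition set
   * is always empty because this supervisor never treats Kafka start offsets as exclusive (see
   * {@link #useExclusiveStartSequenceNumberForNonFirstSequence()}).
   */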
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
int groupId,
Map<KafkaTopicPartition, Long> startPartitions,
Map<KafkaTopicPartition, Long> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<KafkaTopicPartition> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
)
{
KafkaSupervisorIOConfig kafkaIoConfig = (KafkaSupervisorIOConfig) ioConfig;
return new KafkaIndexTaskIOConfig(
groupId,
baseSequenceName,
null,
null,
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), endPartitions),
kafkaIoConfig.getConsumerProperties(),
kafkaIoConfig.getPollTimeout(),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic()
);
}
@Override
protected List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
TreeMap<Integer, Map<KafkaTopicPartition, Long>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
    // The Kafka index task has always used incremental handoff since 0.16.0.
    // The context flag below exists only for compatibility when downgrading a cluster to a version earlier than
    // 0.16.0: without it, the Kafka index task would pick up LegacyKafkaIndexTaskRunner on those versions.
context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
spec.getDataSchema(),
(KafkaIndexTaskTuningConfig) taskTuningConfig,
(KafkaIndexTaskIOConfig) taskIoConfig,
context,
sortingMapper
));
}
return taskList;
}
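  /**
   * Reports per-partition record lag, computed lazily as the latest offset observed on the stream minus the highest
   * offset reported by the tasks. Returns null until {@link #updatePartitionLagFromStream()} has populated
   * {@code latestSequenceFromStream}.
   */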
@Override
protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
{
Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
if (latestSequenceFromStream == null) {
return null;
}
if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
}
return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
}
@Nullable
@Override
protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
{
    // time lag is not currently supported with Kafka
return null;
}
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
  // Used while calculating cumulative lag for the entire stream
private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
{
if (latestSequenceFromStream == null) {
return Collections.emptyMap();
}
return latestSequenceFromStream
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() != null
? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
: 0
)
);
}
@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while generating Supervisor lag reports per task
protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
if (latestSequenceFromStream == null || currentOffsets == null) {
return Collections.emptyMap();
}
return currentOffsets
.entrySet()
.stream()
.filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() != null
? latestSequenceFromStream.get(e.getKey()) - e.getValue()
: 0
)
);
}
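  /**
   * Time lag is not supported for Kafka (see {@link #getPartitionTimeLag()}), so per-partition time lag is not
   * reported.
   */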
@Override
protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
return null;
}
@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@Override
protected OrderedSequenceNumber<Long> makeSequenceNumber(Long seq, boolean isExclusive)
{
return KafkaSequenceNumber.of(seq);
}
@Override
protected Long getNotSetMarker()
{
return NOT_SET;
}
@Override
protected Long getEndOfPartitionMarker()
{
return END_OF_PARTITION;
}
@Override
protected boolean isEndOfShard(Long seqNum)
{
return false;
}
@Override
protected boolean isShardExpirationMarker(Long seqNum)
{
return false;
}
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return false;
}
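  /**
   * Aggregates the per-partition record lag into a single {@link LagStats} value via the base class
   * {@code computeLags}, or reports zero lag while no per-partition lag is available yet.
   */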
@Override
public LagStats computeLagStats()
{
Map<KafkaTopicPartition, Long> partitionRecordLag = getPartitionRecordLag();
if (partitionRecordLag == null) {
return new LagStats(0, 0, 0);
}
return computeLags(partitionRecordLag);
}
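  /**
   * Refreshes {@code latestSequenceFromStream} with the latest offsets of all partitions of the configured
   * topic/stream, holding the record supplier lock so concurrent users of the shared record supplier do not
   * interfere with each other.
   */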
@Override
protected void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();
try {
Set<KafkaTopicPartition> partitionIds;
try {
partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
}
catch (Exception e) {
log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
throw new StreamException(e);
}
Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
.stream()
.map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
.collect(Collectors.toSet());
recordSupplier.seekToLatest(partitions);
      // This method doesn't actually compute the lag; it just fetches the latest offsets from the stream. This is
      // because we currently only have record lag for Kafka, which can be computed lazily by subtracting the highest
      // task offsets from the latest stream offsets when it is needed.
latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
}
catch (InterruptedException e) {
throw new StreamException(e);
}
finally {
getRecordSupplierLock().unlock();
}
}
@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
@Override
protected String baseTaskName()
{
return "index_kafka";
}
@Override
@VisibleForTesting
public KafkaSupervisorIOConfig getIoConfig()
{
return spec.getIoConfig();
}
@VisibleForTesting
public KafkaSupervisorTuningConfig getTuningConfig()
{
return spec.getTuningConfig();
}
protected boolean isMultiTopic()
{
return getIoConfig().isMultiTopic() && pattern != null;
}
/**
   * Gets the offsets as stored in the metadata store. The returned map only contains
   * offsets for topic partitions that match the current supervisor config stream. This
   * override is needed because, in the multi-topic case, a user could have updated the supervisor
   * config from single-topic to multi-topic, where the new multi-topic regex pattern matches the
   * old single-topic config. Without this override, the previously stored metadata for the single
   * topic would be deemed different from the currently configured stream and would not be included
   * in the returned offset map. This implementation handles these cases appropriately.
   *
   * @return the previously stored offsets from metadata storage, possibly with offsets removed
   * for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
   * updated to single-topic or multi-topic keys depending on the supervisor config, as needed.
*/
@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
.getSeekableStreamSequenceNumbers();
if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
Set<String> topicMisMatchLogged = new HashSet<>();
partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
final String matchValue;
// previous offsets are from multi-topic config
if (kafkaTopicPartition.topic().isPresent()) {
matchValue = kafkaTopicPartition.topic().get();
} else {
// previous offsets are from single topic config
matchValue = partitions.getStream();
}
KafkaTopicPartition matchingTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, matchValue);
if (matchingTopicPartition == null && !topicMisMatchLogged.contains(matchValue)) {
log.warn(
"Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
matchValue,
getIoConfig().getStream()
);
topicMisMatchLogged.add(matchValue);
}
if (matchingTopicPartition != null) {
partitionOffsets.put(matchingTopicPartition, value);
}
});
return partitionOffsets;
}
}
return Collections.emptyMap();
}
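  /**
   * Returns a {@link KafkaTopicPartition} re-keyed for the current supervisor config if the stored topic/stream
   * matches the configured one (regex match in multi-topic mode, exact equality in single-topic mode), or null if
   * it does not match.
   */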
@Nullable
private KafkaTopicPartition getMatchingKafkaTopicPartition(
final KafkaTopicPartition kafkaTopicPartition,
final String streamMatchValue
)
{
final boolean match;
match = pattern != null
? pattern.matcher(streamMatchValue).matches()
: getIoConfig().getStream().equals(streamMatchValue);
return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null;
}
}