blob: e33f458a83377db793584bd5d8f2fbc5cad3b5ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a
* {@link KinesisSupervisorSpec} which includes the Kinesis stream and configuration as well as an ingestion spec which will
* be used to generate the indexing tasks. The run loop periodically refreshes its view of the Kinesis stream's partitions
* and the list of running indexing tasks and ensures that all partitions are being read from and that there are enough
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kinesis sequences.
*/
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
{
private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class);
/**
 * Jackson type reference for the checkpoints map (an {@code Integer}-keyed {@link TreeMap} of
 * {@code Map<String, String>} values); used by {@code createIndexTasks} to serialize the sequence offsets
 * that are placed into the task context.
 */
public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<String, String>>>()
{
};
/**
 * Marker value returned by {@code getNotSetMarker()}.
 */
public static final String OFFSET_NOT_SET = "-1";
// Supervisor spec supplied at construction; source of the data schema, IO config and tuning config.
private final KinesisSupervisorSpec spec;
// AWS credentials configuration handed to the Kinesis client and to every created index task.
private final AWSCredentialsConfig awsCredentialsConfig;
// Latest per-partition time lag as reported by the record supplier; null until
// updatePartitionLagFromStream() has assigned it. Volatile because it is replaced wholesale.
private volatile Map<String, Long> currentPartitionTimeLag;
/**
 * Creates a supervisor for the dataSource named in {@code spec}. The supervisor id passed to the superclass
 * is {@code "KinesisSupervisor-<dataSource>"}. The partition time lag starts out unset ({@code null}).
 *
 * @param taskStorage                       forwarded to the superclass
 * @param taskMaster                        forwarded to the superclass
 * @param indexerMetadataStorageCoordinator forwarded to the superclass
 * @param taskClientFactory                 forwarded to the superclass
 * @param mapper                            forwarded to the superclass
 * @param spec                              supervisor spec; also retained by this class
 * @param rowIngestionMetersFactory         forwarded to the superclass
 * @param awsCredentialsConfig              AWS credentials configuration retained for client and task creation
 */
public KinesisSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
final KinesisIndexTaskClientFactory taskClientFactory,
final ObjectMapper mapper,
final KinesisSupervisorSpec spec,
final RowIngestionMetersFactory rowIngestionMetersFactory,
final AWSCredentialsConfig awsCredentialsConfig
)
{
super(
StringUtils.format("KinesisSupervisor-%s", spec.getDataSchema().getDataSource()),
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
mapper,
spec,
rowIngestionMetersFactory,
// NOTE(review): meaning of this flag is defined by the SeekableStreamSupervisor constructor - confirm there
true
);
this.spec = spec;
this.awsCredentialsConfig = awsCredentialsConfig;
this.currentPartitionTimeLag = null;
}
/**
 * Builds the {@link KinesisIndexTaskIOConfig} for one task group from the supervisor-level IO config.
 *
 * @param groupId                                task group id
 * @param startPartitions                        starting sequence number per partition
 * @param endPartitions                          ending sequence number per partition
 * @param baseSequenceName                       base sequence name for the task group
 * @param minimumMessageTime                     passed through to the task IO config
 * @param maximumMessageTime                     passed through to the task IO config
 * @param exclusiveStartSequenceNumberPartitions partitions whose start sequence number is exclusive
 * @param ioConfigg                              supervisor IO config; must be a {@link KinesisSupervisorIOConfig}
 * @return the task IO config
 */
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
    int groupId,
    Map<String, String> startPartitions,
    Map<String, String> endPartitions,
    String baseSequenceName,
    DateTime minimumMessageTime,
    DateTime maximumMessageTime,
    Set<String> exclusiveStartSequenceNumberPartitions,
    SeekableStreamSupervisorIOConfig ioConfigg
)
{
  final KinesisSupervisorIOConfig kinesisIoConfig = (KinesisSupervisorIOConfig) ioConfigg;
  final String stream = kinesisIoConfig.getStream();

  final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers =
      new SeekableStreamStartSequenceNumbers<>(stream, startPartitions, exclusiveStartSequenceNumberPartitions);
  final SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
      new SeekableStreamEndSequenceNumbers<>(stream, endPartitions);

  return new KinesisIndexTaskIOConfig(
      groupId,
      baseSequenceName,
      startSequenceNumbers,
      endSequenceNumbers,
      true,
      minimumMessageTime,
      maximumMessageTime,
      kinesisIoConfig.getInputFormat(),
      kinesisIoConfig.getEndpoint(),
      kinesisIoConfig.getRecordsPerFetch(),
      kinesisIoConfig.getFetchDelayMillis(),
      kinesisIoConfig.getAwsAssumedRoleArn(),
      kinesisIoConfig.getAwsExternalId(),
      kinesisIoConfig.isDeaggregate()
  );
}
/**
 * Creates {@code replicas} Kinesis index tasks for one task group. The sequence offsets are serialized with
 * {@code sortingMapper} and stored in the task context under {@code CHECKPOINTS_CTX_KEY}; every created task
 * receives the same context map instance and a random id prefixed with {@code baseSequenceName}.
 *
 * @param replicas                  number of tasks to create
 * @param baseSequenceName          id prefix and task resource availability group
 * @param sortingMapper             mapper used to serialize the checkpoints
 * @param sequenceOffsets           checkpoints to serialize into the context
 * @param taskIoConfig              must be a {@link KinesisIndexTaskIOConfig}
 * @param taskTuningConfig          must be a {@link KinesisIndexTaskTuningConfig}
 * @param rowIngestionMetersFactory not used by this implementation
 * @return the created tasks
 * @throws JsonProcessingException if the checkpoints cannot be serialized
 */
@Override
protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
    int replicas,
    String baseSequenceName,
    ObjectMapper sortingMapper,
    TreeMap<Integer, Map<String, String>> sequenceOffsets,
    SeekableStreamIndexTaskIOConfig taskIoConfig,
    SeekableStreamIndexTaskTuningConfig taskTuningConfig,
    RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
  final String serializedCheckpoints =
      sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
  final Map<String, Object> taskContext = createBaseTaskContexts();
  taskContext.put(CHECKPOINTS_CTX_KEY, serializedCheckpoints);

  final List<SeekableStreamIndexTask<String, String, ByteEntity>> tasks = new ArrayList<>();
  for (int remaining = replicas; remaining > 0; remaining--) {
    final KinesisIndexTask task = new KinesisIndexTask(
        IdUtils.getRandomIdWithPrefix(baseSequenceName),
        new TaskResource(baseSequenceName, 1),
        spec.getDataSchema(),
        (KinesisIndexTaskTuningConfig) taskTuningConfig,
        (KinesisIndexTaskIOConfig) taskIoConfig,
        taskContext,
        awsCredentialsConfig
    );
    tasks.add(task);
  }
  return tasks;
}
/**
 * Creates the {@link KinesisRecordSupplier} used by the supervisor, configured from the spec's IO and
 * tuning configs and backed by a Kinesis client built with this supervisor's AWS credentials config.
 *
 * @return a new record supplier
 */
@Override
protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() throws RuntimeException
{
  final KinesisSupervisorIOConfig supervisorIoConfig = spec.getIoConfig();
  final KinesisIndexTaskTuningConfig tuning = spec.getTuningConfig();

  return new KinesisRecordSupplier(
      KinesisRecordSupplier.getAmazonKinesisClient(
          supervisorIoConfig.getEndpoint(),
          awsCredentialsConfig,
          supervisorIoConfig.getAwsAssumedRoleArn(),
          supervisorIoConfig.getAwsExternalId()
      ),
      supervisorIoConfig.getRecordsPerFetch(),
      supervisorIoConfig.getFetchDelayMillis(),
      0, // skip starting background fetch, it is not used
      supervisorIoConfig.isDeaggregate(),
      tuning.getRecordBufferSize(),
      tuning.getRecordBufferOfferTimeout(),
      tuning.getRecordBufferFullWait(),
      tuning.getFetchSequenceNumberTimeout(),
      tuning.getMaxRecordsPerPoll(),
      supervisorIoConfig.isUseEarliestSequenceNumber()
  );
}
/**
 * Returns the task group for a partition based on its position in {@code partitionIds}: the partition's
 * index in that list modulo the configured task count (see
 * {@code getTaskGroupIdForPartitionWithProvidedList}). If the partition is not in the list, the negative
 * {@code indexOf} result is returned unchanged.
 */
@Override
protected int getTaskGroupIdForPartition(String partitionId)
{
return getTaskGroupIdForPartitionWithProvidedList(partitionId, partitionIds);
}
/**
 * Maps a partition to a task group by its position in the supplied list: the group id is the partition's
 * index modulo the task count from the spec's IO config.
 *
 * @param partitionId         partition (shard) id to look up
 * @param availablePartitions ordered list of partitions to search
 * @return the task group id, or the negative {@code indexOf} result if the partition is not in the list
 */
private int getTaskGroupIdForPartitionWithProvidedList(String partitionId, List<String> availablePartitions)
{
  final int index = availablePartitions.indexOf(partitionId);
  if (index < 0) {
    return index;
  }
  // Reuse the index found above instead of scanning the list a second time.
  return index % spec.getIoConfig().getTaskCount();
}
/**
 * Recomputes the partition-to-task-group assignment for the given set of available partitions. The set is
 * copied into a list and each partition is assigned to group {@code listIndex % taskCount}.
 *
 * @param availablePartitions partitions that are still available
 * @return map from task group id to the partitions assigned to that group
 */
@Override
protected Map<Integer, Set<String>> recomputePartitionGroupsForExpiration(Set<String> availablePartitions)
{
  final List<String> availablePartitionsList = new ArrayList<>(availablePartitions);
  final Map<Integer, Set<String>> newPartitionGroups = new HashMap<>();
  // Set elements are unique, so an element's position in the copied list equals its indexOf(); walking the
  // list by index yields the same grouping without a linear search per partition.
  for (int index = 0; index < availablePartitionsList.size(); index++) {
    final int newTaskGroupId = index % spec.getIoConfig().getTaskCount();
    newPartitionGroups
        .computeIfAbsent(newTaskGroupId, k -> new HashSet<>())
        .add(availablePartitionsList.get(index));
  }
  return newPartitionGroups;
}
/**
 * Returns true when the given metadata is a {@link KinesisDataSourceMetadata}.
 */
@Override
protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
{
return metadata instanceof KinesisDataSourceMetadata;
}
/**
 * Returns true when the given task is a {@link KinesisIndexTask}.
 */
@Override
protected boolean doesTaskTypeMatchSupervisor(Task task)
{
return task instanceof KinesisIndexTask;
}
/**
 * Builds the supervisor report payload. Per-partition time lag and its aggregate (negative lags counted as
 * zero) are included only when {@code includeOffsets} is true; otherwise both are reported as null.
 *
 * @param numPartitions  number of partitions to report
 * @param includeOffsets whether lag information should be included
 * @return the report payload
 */
@Override
protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
    int numPartitions,
    boolean includeOffsets
)
{
  final KinesisSupervisorIOConfig supervisorIoConfig = spec.getIoConfig();
  final Map<String, Long> lagByPartition = getTimeLagPerPartition(getHighestCurrentOffsets());

  final Map<String, Long> reportedLag;
  final Long aggregateLag;
  if (includeOffsets) {
    long total = 0L;
    for (long lag : lagByPartition.values()) {
      total += Math.max(lag, 0);
    }
    reportedLag = lagByPartition;
    aggregateLag = total;
  } else {
    reportedLag = null;
    aggregateLag = null;
  }

  return new KinesisSupervisorReportPayload(
      spec.getDataSchema().getDataSource(),
      supervisorIoConfig.getStream(),
      numPartitions,
      supervisorIoConfig.getReplicas(),
      supervisorIoConfig.getTaskDuration().getMillis() / 1000,
      spec.isSuspended(),
      stateManager.isHealthy(),
      stateManager.getSupervisorState().getBasicState(),
      stateManager.getSupervisorState(),
      stateManager.getExceptionEvents(),
      reportedLag,
      aggregateLag
  );
}
// Record-count lag is not supported for Kinesis: computing it would need a way to get the number of
// records between the current sequence number and the latest one. This always returns an empty map.
@Override
protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
{
return ImmutableMap.of();
}
/**
 * Returns the most recently recorded time lag for each partition in {@code currentOffsets} that has a
 * non-null offset and a known lag value. The result is empty if no lag has been recorded yet.
 *
 * @param currentOffsets current sequence number per partition
 * @return time lag per partition, restricted to partitions with a known lag
 */
@Override
protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
{
  // Read the volatile field exactly once. It can be replaced concurrently by updatePartitionLagFromStream();
  // re-reading it between the filter and the collector could hand Collectors.toMap a null value (NPE).
  final Map<String, Long> timeLagSnapshot = currentPartitionTimeLag;
  return currentOffsets
      .entrySet()
      .stream()
      .filter(e -> e.getValue() != null &&
                   timeLagSnapshot != null &&
                   timeLagSnapshot.get(e.getKey()) != null
      )
      .collect(
          Collectors.toMap(
              Map.Entry::getKey,
              e -> timeLagSnapshot.get(e.getKey())
          )
      );
}
/**
 * Wraps the given partition-to-sequence-number map in a {@link KinesisDataSourceMetadata} holding end
 * sequence numbers for {@code stream}.
 *
 * @param stream stream name
 * @param map    sequence number per partition
 * @return metadata for a reset
 */
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
    String stream,
    Map<String, String> map
)
{
  final SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
      new SeekableStreamEndSequenceNumbers<>(stream, map);
  return new KinesisDataSourceMetadata(endSequenceNumbers);
}
/**
 * Creates a {@link KinesisSequenceNumber} for the given sequence string and exclusivity flag.
 */
@Override
protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
{
return KinesisSequenceNumber.of(seq, isExclusive);
}
/**
 * Refreshes {@code currentPartitionTimeLag} by asking the record supplier for the time lag of the
 * configured stream at the highest current offsets.
 */
@Override
protected void updatePartitionLagFromStream()
{
  // this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
  final KinesisRecordSupplier kinesisSupplier = (KinesisRecordSupplier) recordSupplier;
  final String stream = getIoConfig().getStream();
  final Map<String, String> highestOffsets = getHighestCurrentOffsets();
  currentPartitionTimeLag = kinesisSupplier.getPartitionsTimeLag(stream, highestOffsets);
}
/**
 * Always returns null: this supervisor does not track record-count lag.
 */
@Override
protected Map<String, Long> getPartitionRecordLag()
{
return null;
}
/**
 * Returns the per-partition time lag last stored by {@code updatePartitionLagFromStream()}, or null if
 * none has been stored yet.
 */
@Override
protected Map<String, Long> getPartitionTimeLag()
{
return currentPartitionTimeLag;
}
/**
 * Returns the base task name for this supervisor, {@code "index_kinesis"}.
 */
@Override
protected String baseTaskName()
{
return "index_kinesis";
}
/**
 * Returns the marker used for a sequence number that has not been set ({@link #OFFSET_NOT_SET}).
 */
@Override
protected String getNotSetMarker()
{
return OFFSET_NOT_SET;
}
/**
 * Returns {@code KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER} as the end-of-partition marker.
 */
@Override
protected String getEndOfPartitionMarker()
{
return KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER;
}
/**
 * Returns true when {@code seqNum} equals {@code KinesisSequenceNumber.END_OF_SHARD_MARKER}.
 * A null {@code seqNum} yields false.
 */
@Override
protected boolean isEndOfShard(String seqNum)
{
return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
}
/**
 * Returns true when {@code seqNum} equals {@code KinesisSequenceNumber.EXPIRED_MARKER}.
 * A null {@code seqNum} yields false.
 */
@Override
protected boolean isShardExpirationMarker(String seqNum)
{
return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
}
/**
 * Always returns true for this supervisor.
 */
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return true;
}
/**
 * Computes lag statistics from the per-partition time lag. Unlike the Kafka Indexing Service, Kinesis
 * reports lag as the time difference in milliseconds between the current sequence number and the latest
 * sequence number, rather than as a message count. When no time lag is available yet, all-zero stats are
 * returned.
 */
@Override
public LagStats computeLagStats()
{
  final Map<String, Long> timeLagByPartition = getPartitionTimeLag();
  return timeLagByPartition == null
         ? new LagStats(0, 0, 0)
         : computeLags(timeLagByPartition);
}
/**
 * Returns a copy of {@code startingOffsets} without the partitions whose sequence number is the
 * end-of-shard marker. Excluded shards are logged at debug level.
 *
 * @param startingOffsets starting sequence number per partition
 * @return a new map containing only partitions that have not reached end of shard
 */
@Override
protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(
    Map<String, OrderedSequenceNumber<String>> startingOffsets
)
{
  final Map<String, OrderedSequenceNumber<String>> filteredOffsets = new HashMap<>();
  for (Map.Entry<String, OrderedSequenceNumber<String>> entry : startingOffsets.entrySet()) {
    // Use the shared helper so the end-of-shard test is defined in one place; it compares marker-first and
    // therefore tolerates a null underlying sequence number.
    if (isEndOfShard(entry.getValue().get())) {
      log.debug("Excluding shard[%s] because it has reached EOS.", entry.getKey());
    } else {
      filteredOffsets.put(entry.getKey(), entry.getValue());
    }
  }
  return filteredOffsets;
}
/**
 * Always returns true: this supervisor handles expired partitions (see
 * {@code createDataSourceMetadataWithExpiredPartitions}).
 */
@Override
protected boolean supportsPartitionExpiration()
{
return true;
}
/**
 * Returns new metadata in which every partition of {@code currentMetadata} listed in
 * {@code expiredPartitionIds} has its sequence number replaced by
 * {@code KinesisSequenceNumber.EXPIRED_MARKER}.
 *
 * @param currentMetadata     metadata to derive from; must be a {@link KinesisDataSourceMetadata}
 * @param expiredPartitionIds ids of the expired partitions
 * @return metadata with the expired partitions marked
 */
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
    SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
)
{
  // Pass the ids as a format argument (as the other log call in this class does) rather than concatenating
  // them into the format string, where a '%' in the data would be treated as a format specifier.
  log.info("Marking expired shards in metadata: %s", expiredPartitionIds);
  return createDataSourceMetadataWithClosedOrExpiredPartitions(
      currentMetadata,
      expiredPartitionIds,
      KinesisSequenceNumber.EXPIRED_MARKER
  );
}
/**
 * Produces a copy of {@code currentMetadata} in which each partition contained in
 * {@code terminatedPartitionIds} has its sequence number replaced by {@code terminationMarker}. The result
 * keeps the start/end flavour of the original sequence numbers; for start sequence numbers, terminated
 * partitions are also dropped from the exclusive-start set.
 *
 * @param currentMetadata        metadata to derive from; must be a {@link KinesisDataSourceMetadata}
 * @param terminatedPartitionIds partitions to mark
 * @param terminationMarker      sequence number value to store for marked partitions
 * @return new metadata with the marked partitions
 */
private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
    SeekableStreamDataSourceMetadata<String, String> currentMetadata,
    Set<String> terminatedPartitionIds,
    String terminationMarker
)
{
  final SeekableStreamSequenceNumbers<String, String> previous =
      ((KinesisDataSourceMetadata) currentMetadata).getSeekableStreamSequenceNumbers();

  // Carry every partition over, swapping in the marker for the terminated ones.
  final Map<String, String> updatedSequenceNumbers = new HashMap<>();
  for (Map.Entry<String, String> partition : previous.getPartitionSequenceNumberMap().entrySet()) {
    final String partitionId = partition.getKey();
    updatedSequenceNumbers.put(
        partitionId,
        terminatedPartitionIds.contains(partitionId) ? terminationMarker : partition.getValue()
    );
  }

  final SeekableStreamSequenceNumbers<String, String> updated;
  if (previous instanceof SeekableStreamStartSequenceNumbers) {
    // Terminated partitions no longer belong in the exclusive-start set.
    final Set<String> survivingExclusivePartitions = new HashSet<>(
        ((SeekableStreamStartSequenceNumbers<String, String>) previous).getExclusivePartitions()
    );
    survivingExclusivePartitions.removeIf(partitionId -> terminatedPartitionIds.contains(partitionId));

    updated = new SeekableStreamStartSequenceNumbers<>(
        previous.getStream(),
        null,
        updatedSequenceNumbers,
        null,
        survivingExclusivePartitions
    );
  } else {
    updated = new SeekableStreamEndSequenceNumbers<>(
        previous.getStream(),
        null,
        updatedSequenceNumbers,
        null
    );
  }
  return new KinesisDataSourceMetadata(updated);
}
}