/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
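/**
* Spout that consumes records from Kafka and emits them as Storm tuples, handling partition assignment, offset
* management, commits and retries according to the configured {@link KafkaSpoutConfig.ProcessingGuarantee}.
*
* <p>A minimal usage sketch (the broker address, topic name and processing guarantee below are illustrative
* values, not taken from this class):
* <pre>{@code
* KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
*     .builder("localhost:9092", "my-topic")
*     .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
*     .build();
* topologyBuilder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
* }</pre>
*/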
public class KafkaSpout<K, V> extends BaseRichSpout {
private static final long serialVersionUID = 4151921085047987154L;
//Initial delay for the commit and assignment refresh timers
public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
// Storm
protected SpoutOutputCollector collector;
// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
private final ConsumerFactory<K, V> kafkaConsumerFactory;
private final TopicAssigner topicAssigner;
private transient Consumer<K, V> consumer;
// Bookkeeping
// Strategy to determine the fetch offset for the first poll performed by the spout upon activation
private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
// Class that has the logic to handle tuple failure.
private transient KafkaSpoutRetryService retryService;
// Handles tuple events (emit, ack etc.)
private transient KafkaTupleListener tupleListener;
// timer == null only if the processing guarantee is at-most-once
private transient Timer commitTimer;
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
// or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
private transient Map<TopicPartition, OffsetManager> offsetManagers;
// Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
// Always empty if processing guarantee is none or at-most-once
private transient Set<KafkaSpoutMessageId> emitted;
// Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
// Triggers when an assignment should be refreshed
private transient Timer refreshAssignmentTimer;
private transient TopologyContext context;
private transient CommitMetadataManager commitMetadataManager;
private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new ConsumerFactoryDefault<>(), new TopicAssigner());
}
@VisibleForTesting
KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.topicAssigner = topicAssigner;
this.kafkaSpoutConfig = kafkaSpoutConfig;
}
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
// Spout internals
this.collector = collector;
// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
// Retries management
retryService = kafkaSpoutConfig.getRetryService();
tupleListener = kafkaSpoutConfig.getTupleListener();
if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
// In at-most-once mode offsets are committed after every poll rather than periodically, so no commit timer is needed
commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
tupleListener.open(conf, context);
if (canRegisterMetrics()) {
registerMetric();
}
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
private void registerMetric() {
LOG.info("Registering Spout Metrics");
kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer);
context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
}
private boolean canRegisterMetrics() {
try {
KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
} catch (NoSuchMethodException e) {
LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
return false;
}
return true;
}
private boolean isAtLeastOnceProcessing() {
return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
}
// =========== Consumer Rebalance Listener - On the same thread as the caller ===========
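/**
* Commits acked offsets before partitions are revoked (for at-least-once processing) and, on assignment, prunes
* state for partitions no longer owned by this spout and seeks newly assigned partitions according to the
* committed offset and the configured FirstPollOffsetStrategy.
*/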
private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
private Collection<TopicPartition> previousAssignment = new HashSet<>();
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
previousAssignment = partitions;
LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
initialize(partitions);
tupleListener.onPartitionsReassigned(partitions);
}
private void initialize(Collection<TopicPartition> partitions) {
if (isAtLeastOnceProcessing()) {
// remove offsetManagers for all partitions that are no longer assigned to this spout
offsetManagers.keySet().retainAll(partitions);
retryService.retainAll(partitions);
/*
* Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
* remove them from emitted collection.
*/
emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
}
waitingToEmit.keySet().retainAll(partitions);
Set<TopicPartition> newPartitions = new HashSet<>(partitions);
// If a partition was previously assigned to this spout,
// leave its acked offsets and consumer position as they were so it resumes where it left off
newPartitions.removeAll(previousAssignment);
for (TopicPartition newTp : newPartitions) {
final OffsetAndMetadata committedOffset = consumer.committed(newTp);
final long fetchOffset = doSeek(newTp, committedOffset);
LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
}
}
LOG.info("Initialization complete");
}
/**
* Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
*/
private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
newTp, firstPollOffsetStrategy, committedOffset);
if (committedOffset != null) {
// offset was previously committed for this consumer group and topic-partition, either by this or another topology.
if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp,
committedOffset,
Collections.unmodifiableMap(offsetManagers))) {
// Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
consumer.seek(newTp, committedOffset.offset());
} else {
// offset was not committed by this topology, therefore FirstPollOffsetStrategy applies
// (only when the topology is first deployed).
if (firstPollOffsetStrategy.equals(EARLIEST)) {
consumer.seekToBeginning(Collections.singleton(newTp));
} else if (firstPollOffsetStrategy.equals(LATEST)) {
consumer.seekToEnd(Collections.singleton(newTp));
} else {
// Resume polling at the last committed offset, i.e. the first offset that is not marked as processed.
consumer.seek(newTp, committedOffset.offset());
}
}
} else {
// no offset commits have ever been done for this consumer group and topic-partition,
// so start at the beginning or end depending on FirstPollOffsetStrategy
if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
consumer.seekToBeginning(Collections.singleton(newTp));
} else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
consumer.seekToEnd(Collections.singleton(newTp));
}
}
return consumer.position(newTp);
}
}
// ======== Next Tuple =======
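/**
* Per call: refreshes the partition assignment and commits offsets when the respective timers expire, polls the
* partitions that are currently pollable, and emits at most one waiting tuple.
*/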
@Override
public void nextTuple() {
try {
if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
refreshAssignment();
}
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}
PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
if (pollablePartitionsInfo.shouldPoll()) {
try {
setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}
emitIfWaitingNotEmitted();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
private void throwKafkaConsumerInterruptedException() {
//Kafka throws their own type of exception when interrupted.
//Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
}
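/**
* Determines which assigned partitions may be polled. Polling is skipped entirely while records are still waiting
* to be emitted. For at-least-once processing a partition is pollable only if it is below the maxUncommittedOffsets
* limit, or if it has retriable tuples within that limit.
*/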
private PollablePartitionsInfo getPollablePartitionsInfo() {
if (isWaitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap());
}
Set<TopicPartition> assignment = consumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.emptyMap());
}
Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
for (TopicPartition tp : assignment) {
OffsetManager offsetManager = offsetManagers.get(tp);
int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
if (numUncommittedOffsets < maxUncommittedOffsets) {
//Allow poll if the partition is not at the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
//Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
numUncommittedOffsets, maxUncommittedOffsets);
}
}
}
return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
}
private boolean isWaitingToEmit() {
return waitingToEmit.values().stream()
.anyMatch(list -> !list.isEmpty());
}
private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
for (TopicPartition tp : consumerRecords.partitions()) {
waitingToEmit.put(tp, new LinkedList<>(consumerRecords.records(tp)));
}
}
// ======== poll =========
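/**
* Seeks to the earliest retriable offset of each pollable partition, pauses the non-pollable partitions so the
* poll only fetches from pollable ones, and resumes them afterwards. For at-most-once processing the fetched
* offsets are committed synchronously before the records are handed over for emission.
*/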
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
try {
consumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
consumer.resume(pausedPartitions);
}
}
private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
//Seek directly to the earliest retriable message for each retriable topic partition
consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}
private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
ConsumerRecords<K, V> consumerRecords) {
for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
if (!records.isEmpty()) {
ConsumerRecord<K, V> record = records.get(0);
long seekOffset = entry.getValue();
long earliestReceivedOffset = record.offset();
if (seekOffset < earliestReceivedOffset) {
//Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
//Ack up to the first offset received if the record is not already acked or currently in the topology
for (long i = seekOffset; i < earliestReceivedOffset; i++) {
KafkaSpoutMessageId msgId = retryService.getMessageId(tp, i);
if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
retryService.remove(msgId);
emitted.add(msgId);
ack(msgId);
}
}
}
}
}
}
// ======== emit =========
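/**
* Emits at most one tuple per call from the records waiting to be emitted, removing per-partition lists once
* they are exhausted.
*/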
private void emitIfWaitingNotEmitted() {
Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
outerLoop:
while (waitingToEmitIter.hasNext()) {
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
while (!waitingToEmitForTp.isEmpty()) {
final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
if (emittedTuple) {
break outerLoop;
}
}
waitingToEmitIter.remove();
}
}
/**
* Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
*
* @param record to be emitted
* @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
*/
private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
if (isEmitTuple(tuple)) {
final boolean isScheduled = retryService.isScheduled(msgId);
// not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
if (!isScheduled || retryService.isReady(msgId)) {
final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
if (!isAtLeastOnceProcessing()) {
if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
collector.emit(stream, tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
} else {
collector.emit(stream, tuple);
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
} else {
emitted.add(msgId);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
}
collector.emit(stream, tuple, msgId);
tupleListener.onEmit(tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
}
return true;
}
} else {
/*
* if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately to allow its offset
* to be committed to Kafka
*/
LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
if (isAtLeastOnceProcessing()) {
msgId.setNullTuple(true);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
ack(msgId);
}
}
}
return false;
}
/**
* Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples.
*/
private boolean isEmitTuple(List<Object> tuple) {
return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
}
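/**
* Builds the offsets-to-commit map from the consumer's current fetch position for each assigned partition,
* used by the at-most-once and no-guarantee commit paths.
*/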
private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignedPartitions) {
offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp), commitMetadataManager.getCommitMetadata()));
}
return offsetsToCommit;
}
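/**
* Commits the next ready offset for each partition with acked tuples, then realigns the consumer position with
* the committed offset where needed and drops already-committed records from the waitingToEmit lists.
*/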
private void commitOffsetsForAckedTuples() {
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
}
// Commit offsets that are ready to be committed for every topic partition
if (!nextCommitOffsets.isEmpty()) {
consumer.commitSync(nextCommitOffsets);
LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
// Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
// in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
long position = consumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {
/*
* The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
* than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
* part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
* to the committed offset.
*/
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
position, committedOffset);
consumer.seek(tp, committedOffset);
}
/*
* In some cases the waitingToEmit list may contain tuples that have just been committed. Drop these.
*/
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
if (waitingToEmitForTp != null) {
//Discard the pending records that are already committed
waitingToEmit.put(tp, waitingToEmitForTp.stream()
.filter(record -> record.offset() >= committedOffset)
.collect(Collectors.toCollection(LinkedList::new)));
}
final OffsetManager offsetManager = offsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
}
} else {
LOG.trace("No offsets to commit. {}", this);
}
}
// ======== Ack =======
@Override
public void ack(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}
// Only need to keep track of acked tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (msgId.isNullTuple()) {
//a null tuple should be added to the ack list since by definition it is a direct ack
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
tupleListener.onAck(msgId);
return;
}
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+ "came from a topic-partition that this consumer group instance is no longer tracking "
+ "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
tupleListener.onAck(msgId);
}
// ======== Fail =======
@Override
public void fail(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}
// Only need to keep track of failed tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
LOG.debug("Received fail for tuple this spout is no longer tracking."
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
msgId.incrementNumFails();
if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
// this tuple should be removed from emitted only inside the ack() method. This is to ensure
// that the OffsetManager for that TopicPartition is updated and allows commit progression
tupleListener.onMaxRetryReached(msgId);
ack(msgId);
} else {
tupleListener.onRetry(msgId);
emitted.remove(msgId);
}
}
// ======== Activate / Deactivate / Close / Declare Outputs =======
@Override
public void activate() {
try {
refreshAssignment();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
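/**
* Computes all subscribed partitions, sorts them deterministically, selects this task's share via the configured
* partitioner and hands the result to the TopicAssigner together with the rebalance listener.
*/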
private void refreshAssignment() {
Set<TopicPartition> allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer);
List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
.getPartitionsForThisTask(allPartitionsSorted, context);
topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
}
@Override
public void deactivate() {
try {
commitIfNecessary();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
@Override
public void close() {
try {
shutdown();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
private void commitIfNecessary() {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
}
}
private void shutdown() {
try {
commitIfNecessary();
} finally {
//remove resources
consumer.close();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
for (String stream : translator.streams()) {
declarer.declareStream(stream, translator.getFieldsFor(stream));
}
}
@Override
public String toString() {
return "KafkaSpout{"
+ "offsetManagers =" + offsetManagers
+ ", emitted=" + emitted
+ "}";
}
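/**
* Exposes the subscribed topics, the consumer group id and any primitive or String-valued Kafka properties
* through the component configuration; other property values are dropped.
*/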
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> configuration = super.getComponentConfiguration();
if (configuration == null) {
configuration = new HashMap<>();
}
String configKeyPrefix = "config.";
configuration.put(configKeyPrefix + "topics", getTopicsString());
configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
for (Entry<String, Object> conf : kafkaSpoutConfig.getKafkaProps().entrySet()) {
if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
} else {
LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey());
}
}
return configuration;
}
private boolean isPrimitiveOrWrapper(Class<?> type) {
if (type == null) {
return false;
}
return type.isPrimitive() || isWrapper(type);
}
private boolean isWrapper(Class<?> type) {
return type == Double.class || type == Float.class || type == Long.class
|| type == Integer.class || type == Short.class || type == Character.class
|| type == Byte.class || type == Boolean.class || type == String.class;
}
private String getTopicsString() {
return kafkaSpoutConfig.getTopicFilter().getTopicsString();
}
private static class PollablePartitionsInfo {
private final Set<TopicPartition> pollablePartitions;
//The subset of earliest retriable offsets that are on pollable partitions
private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;
PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
this.pollablePartitions = pollablePartitions;
this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
.filter(entry -> pollablePartitions.contains(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}
public boolean shouldPoll() {
return !this.pollablePartitions.isEmpty();
}
}
@VisibleForTesting
KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
return kafkaOffsetMetric;
}
}