blob: 0b99a40f1b89daca2f97a6f5f645c0e012e06ffe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.Validate;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
 * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
 * nextRetry = Min(nextRetry, currentTime + maxDelay)
 */
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
    private final TimeInterval initialDelay;
    private final TimeInterval delayPeriod;
    private final TimeInterval maxDelay;
    private final int maxRetries;
    //This class assumes that there is at most one retry schedule per message id in this set at a time.
    //Kept sorted by next retry time so that iteration can stop at the first entry that is not yet due.
    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups

    /**
     * Comparator ordering by timestamp.
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            //Long.compare avoids the boxing of Long.valueOf(...).compareTo(...)
            int result = Long.compare(entry1.nextRetryTimeNanos(), entry2.nextRetryTimeNanos());
            if (result == 0) {
                //TreeSet uses compareTo instead of equals() for the Set contract
                //Ensure that we can save two retry schedules with the same timestamp
                //Integer.compare is used instead of subtracting the hash codes: subtraction
                //can overflow when the hash codes have opposite signs, which would violate
                //the Comparator contract (anti-symmetry) and corrupt the TreeSet.
                result = Integer.compare(entry1.hashCode(), entry2.hashCode());
            }
            return result;
        }
    }

    /**
     * Pairs a message id with the (mutable) time at which it next becomes eligible for retry.
     * Non-static: {@link #setNextRetryTimeNanos()} delegates to the enclosing instance's
     * {@link #nextTime(KafkaSpoutMessageId)} to recompute the backoff.
     */
    private class RetrySchedule {
        private final KafkaSpoutMessageId msgId;
        private long nextRetryTimeNanos;

        RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
            this.msgId = msgId;
            this.nextRetryTimeNanos = nextRetryTimeNanos;
            LOG.debug("Created {}", this);
        }

        /**
         * Recomputes the next retry time from the message's current failure count.
         */
        public void setNextRetryTimeNanos() {
            nextRetryTimeNanos = nextTime(msgId);
            LOG.debug("Updated {}", this);
        }

        /**
         * Returns true if this schedule is due at the given time.
         *
         * @param currentTimeNanos current time in nanoseconds
         */
        public boolean retry(long currentTimeNanos) {
            return nextRetryTimeNanos <= currentTimeNanos;
        }

        @Override
        public String toString() {
            return "RetrySchedule{"
                + "msgId=" + msgId
                + ", nextRetryTimeNanos=" + nextRetryTimeNanos
                + '}';
        }

        public KafkaSpoutMessageId msgId() {
            return msgId;
        }

        public long nextRetryTimeNanos() {
            return nextRetryTimeNanos;
        }
    }

    /**
     * Immutable length of time expressed in a particular {@link TimeUnit}.
     */
    public static class TimeInterval implements Serializable {
        private final long lengthNanos;
        private final TimeUnit timeUnit;
        private final long length;

        /**
         * Creates a new TimeInterval.
         * @param length length of the time interval in the units specified by {@link TimeUnit}
         * @param timeUnit the unit of time in which {@code length} is expressed
         */
        public TimeInterval(long length, TimeUnit timeUnit) {
            this.lengthNanos = timeUnit.toNanos(length);
            this.timeUnit = timeUnit;
            this.length = length;
        }

        public static TimeInterval seconds(long length) {
            return new TimeInterval(length, TimeUnit.SECONDS);
        }

        public static TimeInterval milliSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MILLISECONDS);
        }

        public static TimeInterval microSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MICROSECONDS);
        }

        public long lengthNanos() {
            return lengthNanos;
        }

        public TimeUnit timeUnit() {
            return timeUnit;
        }

        @Override
        public String toString() {
            return "TimeInterval{"
                + "length=" + length
                + ", timeUnit=" + timeUnit
                + '}';
        }
    }

    /**
     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1),
     * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
     * <p/>
     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
     * previous polled records in favor of processing more records.
     *
     * @param initialDelay initial delay of the first retry
     * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
     * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
     * @param maxDelay maximum amount of time waiting before retrying
     *
     */
    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
        this.initialDelay = initialDelay;
        this.delayPeriod = delayPeriod;
        this.maxRetries = maxRetries;
        this.maxDelay = maxDelay;
        //toStringImpl avoids an overridable toString() call from the constructor
        LOG.debug("Instantiated {}", this.toStringImpl());
    }

    @Override
    public Map<TopicPartition, Long> earliestRetriableOffsets() {
        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
        final long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : retrySchedules) {
            if (retrySchedule.retry(currentTimeNanos)) {
                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
                //Keep the smallest due offset seen so far for each partition
                tpToEarliestRetriableOffset.merge(tpForMessage, msgId.offset(), Math::min);
            } else {
                break; // Stop searching as soon as passed current time
            }
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
        return tpToEarliestRetriableOffset;
    }

    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        boolean retry = false;
        if (isScheduled(msgId)) {
            final long currentTimeNanos = Time.nanoTime();
            for (RetrySchedule retrySchedule : retrySchedules) {
                if (retrySchedule.retry(currentTimeNanos)) {
                    if (retrySchedule.msgId.equals(msgId)) {
                        retry = true;
                        LOG.debug("Found entry to retry {}", retrySchedule);
                        break; //Stop searching if the message is known to be ready for retry
                    }
                } else {
                    LOG.debug("Entry to retry not found {}", retrySchedule);
                    break; // Stop searching as soon as passed current time
                }
            }
        }
        return retry;
    }

    @Override
    public boolean isScheduled(KafkaSpoutMessageId msgId) {
        return toRetryMsgs.contains(msgId);
    }

    @Override
    public boolean remove(KafkaSpoutMessageId msgId) {
        boolean removed = false;
        if (isScheduled(msgId)) {
            toRetryMsgs.remove(msgId);
            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
                final RetrySchedule retrySchedule = iterator.next();
                if (retrySchedule.msgId().equals(msgId)) {
                    iterator.remove();
                    removed = true;
                    break; //There is at most one schedule per message id
                }
            }
        }
        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
        LOG.trace("Current state {}", retrySchedules);
        return removed;
    }

    @Override
    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
        boolean result = false;
        for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
            final RetrySchedule retrySchedule = rsIterator.next();
            final KafkaSpoutMessageId msgId = retrySchedule.msgId;
            final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
            if (!topicPartitions.contains(tpRetry)) {
                rsIterator.remove();
                toRetryMsgs.remove(msgId);
                LOG.debug("Removed {}", retrySchedule);
                LOG.trace("Current state {}", retrySchedules);
                result = true;
            }
        }
        return result;
    }

    @Override
    public boolean schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
            return false;
        } else {
            //Remove existing schedule for the message id
            remove(msgId);
            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
            retrySchedules.add(retrySchedule);
            toRetryMsgs.add(msgId);
            LOG.debug("Scheduled. {}", retrySchedule);
            LOG.trace("Current state {}", retrySchedules);
            return true;
        }
    }

    @Override
    public int readyMessageCount() {
        int count = 0;
        final long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : retrySchedules) {
            if (retrySchedule.retry(currentTimeNanos)) {
                ++count;
            } else {
                break; //Stop counting when past current time
            }
        }
        return count;
    }

    @Override
    public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) {
        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset);
        if (isScheduled(msgId)) {
            //Return the stored instance so callers see its accumulated failure count
            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
                if (originalMsgId.equals(msgId)) {
                    return originalMsgId;
                }
            }
        }
        return msgId;
    }

    /**
     * Computes the next retry time for the message using the exponential backoff formula,
     * capped at currentTime + maxDelay.
     * // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     *
     * @param msgId message whose numFails() drives the backoff; must have failed at least once
     * @return the next retry time in nanoseconds
     */
    private long nextTime(KafkaSpoutMessageId msgId) {
        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
        final long currentTimeNanos = Time.nanoTime();
        final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
            ? currentTimeNanos + initialDelay.lengthNanos
            : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails() - 1));
        return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
    }

    @Override
    public String toString() {
        return toStringImpl();
    }

    private String toStringImpl() {
        //This is here to avoid an overridable call in the constructor
        return "KafkaSpoutRetryExponentialBackoff{"
            + "delay=" + initialDelay
            + ", ratio=" + delayPeriod
            + ", maxRetries=" + maxRetries
            + ", maxRetryDelay=" + maxDelay
            + '}';
    }
}