/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka;

import org.apache.storm.utils.Time;

import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
    private long retryInitialDelayMs;
    private double retryDelayMultiplier;
    private long retryDelayMaxMs;
    private int retryLimit;

    private Queue<MessageRetryRecord> waiting;
    private Map<Long, MessageRetryRecord> records;

    public ExponentialBackoffMsgRetryManager() {
    }
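
    /**
     * Reads the retry settings (initial delay, back-off multiplier, maximum delay and
     * retry limit) from the SpoutConfig and initializes the internal bookkeeping
     * structures used to track failed offsets.
     */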
    public void prepare(SpoutConfig spoutConfig, Map stormConf) {
        this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
        this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
        this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
        this.retryLimit = spoutConfig.retryLimit;
        this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
        this.records = new ConcurrentHashMap<Long, MessageRetryRecord>();
    }
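
    /**
     * Records a failure for the given offset: creates a retry record on the first failure,
     * or advances the existing record to its next back-off interval, and schedules it in
     * the waiting queue.
     */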
    @Override
    public void failed(Long offset) {
        MessageRetryRecord oldRecord = this.records.get(offset);
        MessageRetryRecord newRecord = oldRecord == null ?
                                       new MessageRetryRecord(offset) :
                                       oldRecord.createNextRetryRecord();
        this.records.put(offset, newRecord);
        this.waiting.add(newRecord);
    }
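
    /**
     * Clears all retry state for an offset once the message has been acked.
     */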
    @Override
    public void acked(Long offset) {
        MessageRetryRecord record = this.records.remove(offset);
        if (record != null) {
            this.waiting.remove(record);
        }
    }
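
    /**
     * Marks a retry as in flight by removing the record from the waiting queue while
     * keeping it in the records map, so a subsequent failure can schedule the next attempt.
     * Throws IllegalStateException if the offset has not failed or is not currently waiting.
     */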
    @Override
    public void retryStarted(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        if (record == null || !this.waiting.contains(record)) {
            throw new IllegalStateException("cannot retry a message that has not failed");
        } else {
            this.waiting.remove(record);
        }
    }
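
    /**
     * Returns the offset of the failed message whose back-off delay has expired and whose
     * scheduled retry time is earliest, or null if no failed message is ready to retry yet.
     */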
    @Override
    public Long nextFailedMessageToRetry() {
        if (this.waiting.size() > 0) {
            MessageRetryRecord first = this.waiting.peek();
            if (Time.currentTimeMillis() >= first.retryTimeUTC) {
                if (this.records.containsKey(first.offset)) {
                    return first.offset;
                } else {
                    // defensive programming - should be impossible
                    this.waiting.remove(first);
                    return nextFailedMessageToRetry();
                }
            }
        }
        return null;
    }
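
    /**
     * Returns true if the offset has a failed record that is still waiting and whose
     * back-off delay has expired, i.e. the message should be emitted again now.
     */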
    @Override
    public boolean shouldReEmitMsg(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        return record != null &&
               this.waiting.contains(record) &&
               Time.currentTimeMillis() >= record.retryTimeUTC;
    }
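
    /**
     * Returns false only when a retry limit is configured (retryLimit > 0) and the record
     * for this offset has already reached it; offsets with no retry record, or with retries
     * remaining, return true.
     */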
    @Override
    public boolean retryFurther(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        return !(record != null &&
                 this.retryLimit > 0 &&
                 this.retryLimit <= record.retryNum);
    }
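
    /**
     * Drops retry state for every offset lower than the given Kafka offset, e.g. when
     * earlier offsets are no longer valid, and returns the set of offsets that were removed.
     */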
    @Override
    public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
        Set<Long> invalidOffsets = new HashSet<Long>();
        for (Long offset : records.keySet()) {
            if (offset < kafkaOffset) {
                MessageRetryRecord record = this.records.remove(offset);
                if (record != null) {
                    this.waiting.remove(record);
                    invalidOffsets.add(offset);
                }
            }
        }
        return invalidOffsets;
    }

    /**
     * A MessageRetryRecord holds the data of how many times a message has
     * failed and been retried, and when the last failure occurred. It can
     * determine whether it is ready to be retried by employing an exponential
     * back-off calculation using config values stored in SpoutConfig:
     * <ul>
     *     <li>retryInitialDelayMs - time to delay before the first retry</li>
     *     <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
     *     <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
     *         delay for this amount of time every time)</li>
     * </ul>
     */
    private class MessageRetryRecord {
        private final long offset;
        private final int retryNum;
        private final long retryTimeUTC;

        public MessageRetryRecord(long offset) {
            this(offset, 1);
        }

        private MessageRetryRecord(long offset, int retryNum) {
            this.offset = offset;
            this.retryNum = retryNum;
            this.retryTimeUTC = Time.currentTimeMillis() + calculateRetryDelay();
        }

        /**
         * Create a MessageRetryRecord for the next retry that should occur after this one.
         * The returned record always has an incremented retry count and a later retry time;
         * whether that retry should actually be attempted is decided separately via
         * retryFurther(Long).
         * @return MessageRetryRecord with the next retry time.
         */
        public MessageRetryRecord createNextRetryRecord() {
            return new MessageRetryRecord(this.offset, this.retryNum + 1);
        }
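
        /**
         * Computes the back-off delay for this retry attempt as
         * retryInitialDelayMs * retryDelayMultiplier^(retryNum - 1), guarded against
         * double overflow and capped at retryDelayMaxMs. For example, assuming
         * retryInitialDelayMs = 100, retryDelayMultiplier = 2.0 and retryDelayMaxMs = 1000,
         * successive retries are delayed by 100, 200, 400, 800, 1000, 1000, ... ms.
         */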
        private long calculateRetryDelay() {
            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
            double delay = retryInitialDelayMs * delayMultiplier;
            Long maxLong = Long.MAX_VALUE;
            long delayThisRetryMs = delay >= maxLong.doubleValue()
                                    ? maxLong
                                    : (long) delay;
            return Math.min(delayThisRetryMs, retryDelayMaxMs);
        }
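
        // Equality and hash code are based on the offset alone, so a record for an offset
        // can still be located in (and removed from) the waiting queue after its retry
        // count and retry time have advanced.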
        @Override
        public boolean equals(Object other) {
            return (other instanceof MessageRetryRecord
                    && this.offset == ((MessageRetryRecord) other).offset);
        }

        @Override
        public int hashCode() {
            return Long.valueOf(this.offset).hashCode();
        }
    }
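
    /**
     * Orders retry records by their scheduled retry time so the earliest retry sits at the
     * head of the priority queue.
     */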
    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
        @Override
        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
            return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC));
        }

        @Override
        public boolean equals(Object obj) {
            return false;
        }
    }
}