blob: 030d74a014f29316b475b83384fcb4683b26000b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Class that contains all the logic to control and perform the deduplication on the broker side.
public class MessageDeduplication {
private final PulsarService pulsar;
private final PersistentTopic topic;
private final ManagedLedger managedLedger;
private ManagedCursor managedCursor;
enum Status {
// Deduplication is initialized
// Deduplication is disabled
// Initializing deduplication state
// Deduplication is in effect
// Turning off deduplication
// Failed to enable/disable
public enum MessageDupStatus {
// whether a message is a definitely a duplicate or not cannot be determined at this time
// message is definitely NOT a duplicate
// message is definitely a duplicate
public static class MessageDupUnknownException extends RuntimeException {
public MessageDupUnknownException() {
super("Cannot determine whether the message is a duplicate at this time");
private volatile Status status;
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
// Counter of number of entries stored after last snapshot was taken
private int snapshotCounter;
// The timestamp when the snapshot was taken by the scheduled task last time
private volatile long lastSnapshotTimestamp = 0L;
// Max number of producer for which to persist the sequence id information
private final int maxNumberOfProducers;
// Map used to track the inactive producer along with the timestamp of their last activity
private final Map<String, Long> inactiveProducers = new HashMap<>();
private final String replicatorPrefix;
public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
this.pulsar = pulsar;
this.topic = topic;
this.managedLedger = managedLedger;
this.status = Status.Initialized;
this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
this.snapshotCounter = 0;
this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix();
private CompletableFuture<Void> recoverSequenceIdsMap() {
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
// Replay all the entries and apply all the sequence ids updates"[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
CompletableFuture<Void> future = new CompletableFuture<>();
return future;
* Read all the entries published from the cursor position until the most recent and update the highest sequence id
* from each producer.
* @param future future to trigger when the replay is complete
private void replayCursor(CompletableFuture<Void> future) {
managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
for (Entry entry : entries) {
ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
String producerName = md.getProducerName();
long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
if (managedCursor.hasMoreEntries()) {
// Read next batch of entries
pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}, null, PositionImpl.LATEST);
public Status getStatus() {
return status;
* Check the status of deduplication. If the configuration has changed, it will enable/disable deduplication,
* returning a future to track the completion of the task
public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = isDeduplicationEnabled();
synchronized (this) {
if (status == Status.Recovering || status == Status.Removing) {
// If there's already a transition happening, check later for status
pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES);
return CompletableFuture.completedFuture(null);
if (status == Status.Initialized && !shouldBeEnabled) {
status = Status.Removing;
new DeleteCursorCallback() {
public void deleteCursorComplete(Object ctx) {
status = Status.Disabled;"[{}] Deleted deduplication cursor", topic.getName());
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
} else {
log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception);
}, null);
if (status == Status.Enabled && !shouldBeEnabled) {
// Disabled deduping
CompletableFuture<Void> future = new CompletableFuture<>();
status = Status.Removing;
new DeleteCursorCallback() {
public void deleteCursorComplete(Object ctx) {
status = Status.Disabled;
managedCursor = null;
future.complete(null);"[{}] Disabled deduplication", topic.getName());
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
// It's ok for disable message deduplication.
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
managedCursor = null;
} else {
log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
status = Status.Failed;
}, null);
return future;
} else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
// Enable deduping
CompletableFuture<Void> future = new CompletableFuture<>();
managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
// We don't want to retain cache for this cursor
managedCursor = cursor;
recoverSequenceIdsMap().thenRun(() -> {
status = Status.Enabled;
future.complete(null);"[{}] Enabled deduplication", topic.getName());
}).exceptionally(ex -> {
status = Status.Failed;
log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage());
return null;
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to enable deduplication: {}", topic.getName(),
}, null);
return future;
} else {
// Nothing to do, we are in the correct state
return CompletableFuture.completedFuture(null);
public boolean isEnabled() {
return status == Status.Enabled;
* Assess whether the message was already stored in the topic.
* @return true if the message should be published or false if it was recognized as a duplicate
public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) {
if (!isEnabled() || publishContext.isMarkerMessage()) {
return MessageDupStatus.NotDup;
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
if (producerName.startsWith(replicatorPrefix)) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
producerName = md.getProducerName();
sequenceId = md.getSequenceId();
highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
// Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
// disconnects and re-connects very quickly. At that point the call can be coming from a different thread
synchronized (highestSequencedPushed) {
Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
if (log.isDebugEnabled()) {
log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
// Also need to check sequence ids that has been persisted.
// If current message's seq id is smaller or equals to the
// lastSequenceIdPersisted than its definitely a dup
// If current message's seq id is between lastSequenceIdPersisted and
// lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not
// we should return an error to the producer for the latter case so that it can retry at a future time
Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
highestSequencedPushed.put(producerName, highestSequenceId);
return MessageDupStatus.NotDup;
* Call this method whenever a message is persisted to get the chance to trigger a snapshot.
public void recordMessagePersisted(PublishContext publishContext, PositionImpl position) {
if (!isEnabled() || publishContext.isMarkerMessage()) {
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = publishContext.getHighestSequenceId();
if (publishContext.getOriginalProducerName() != null) {
// In case of replicated messages, this will be different from the current replicator producer name
producerName = publishContext.getOriginalProducerName();
sequenceId = publishContext.getOriginalSequenceId();
highestSequenceId = publishContext.getOriginalHighestSequenceId();
highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
if (++snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
public void resetHighestSequenceIdPushed() {
if (!isEnabled()) {
for (String producer : highestSequencedPersisted.keys()) {
highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer));
private void takeSnapshot(Position position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
snapshot.put(producerName, sequenceId);
getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
lastSnapshotTimestamp = System.currentTimeMillis();
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
}, null);
private boolean isDeduplicationEnabled() {
return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
* Topic will call this method whenever a producer connects.
public synchronized void producerAdded(String producerName) {
// Producer is no-longer inactive
* Topic will call this method whenever a producer disconnects.
public synchronized void producerRemoved(String producerName) {
// Producer is no-longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
* Remove from hash maps all the producers that were inactive for more than the configured amount of time.
public synchronized void purgeInactiveProducers() {
long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES
Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
boolean hasInactive = false;
while (mapIterator.hasNext()) {
java.util.Map.Entry<String, Long> entry =;
String producerName = entry.getKey();
long lastActiveTimestamp = entry.getValue();
if (lastActiveTimestamp < minimumActiveTimestamp) {"[{}] Purging dedup information for producer {}", topic.getName(), producerName);
hasInactive = true;
if (hasInactive && isEnabled()) {
public long getLastPublishedSequenceId(String producerName) {
Long sequenceId = highestSequencedPushed.get(producerName);
return sequenceId != null ? sequenceId : -1;
public void takeSnapshot() {
Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
long currentTimeStamp = System.currentTimeMillis();
if (interval == null || interval <= 0
|| currentTimeStamp - lastSnapshotTimestamp < TimeUnit.SECONDS.toMillis(interval)) {
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
if (position == null) {
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
ManagedCursor getManagedCursor() {
return managedCursor;
private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);