/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that contains all the logic to control and perform the deduplication on the broker side.
*/
public class MessageDeduplication {
private final PulsarService pulsar;
private final PersistentTopic topic;
private final ManagedLedger managedLedger;
private ManagedCursor managedCursor;
enum Status {
// Deduplication is initialized
Initialized,
// Deduplication is disabled
Disabled,
// Initializing deduplication state
Recovering,
// Deduplication is in effect
Enabled,
// Turning off deduplication
Removing,
// Failed to enable/disable
Failed,
}
@VisibleForTesting
public enum MessageDupStatus {
// whether the message is a duplicate cannot be determined at this time
Unknown,
// message is definitely NOT a duplicate
NotDup,
// message is definitely a duplicate
Dup,
}
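/**
 * Signals that it cannot (yet) be determined whether a message is a duplicate, i.e. the
 * {@link MessageDupStatus#Unknown} case; the producer is expected to retry the publish at a later time.
 */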
public static class MessageDupUnknownException extends RuntimeException {
public MessageDupUnknownException() {
super("Cannot determine whether the message is a duplicate at this time");
}
}
private volatile Status status;
// Map that contains the highest sequenceId that has been sent by each producer. The map will be updated before
// the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
// Map that contains the highest sequenceId that has been persisted by each producer. The map will be updated
// after the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
// Number of entries stored since the last snapshot was taken
private int snapshotCounter;
// Timestamp of the last snapshot taken by the scheduled task
private volatile long lastSnapshotTimestamp = 0L;
// Max number of producers for which to persist the sequence id information
private final int maxNumberOfProducers;
// Map used to track inactive producers along with the timestamp of their last activity
private final Map<String, Long> inactiveProducers = new ConcurrentHashMap<>();
private final String replicatorPrefix;
public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
this.pulsar = pulsar;
this.topic = topic;
this.managedLedger = managedLedger;
this.status = Status.Initialized;
this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
this.snapshotCounter = 0;
this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix();
}
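/**
 * Rebuild the in-memory sequence id maps from the deduplication cursor: first load the last snapshot stored in
 * the cursor properties, then replay the entries published after that snapshot.
 */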
private CompletableFuture<Void> recoverSequenceIdsMap() {
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
producerRemoved(k);
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
});
// Replay all the entries and apply all the sequence ids updates
log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
CompletableFuture<Void> future = new CompletableFuture<>();
replayCursor(future);
return future;
}
/**
* Read all the entries published from the cursor position up to the most recent one, and update the highest
* sequence id for each producer.
*
* @param future future to trigger when the replay is complete
*/
private void replayCursor(CompletableFuture<Void> future) {
managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
for (Entry entry : entries) {
ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
String producerName = md.getProducerName();
long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
producerRemoved(producerName);
entry.release();
}
if (managedCursor.hasMoreEntries()) {
// Read next batch of entries
pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
future.complete(null);
}
}
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null, PositionImpl.LATEST);
}
public Status getStatus() {
return status;
}
/**
* Check the status of deduplication. If the configuration has changed, it will enable/disable deduplication,
* returning a future to track the completion of the task
*/
public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = isDeduplicationEnabled();
synchronized (this) {
if (status == Status.Recovering || status == Status.Removing) {
// If there's already a transition happening, check the status again later
pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES);
return CompletableFuture.completedFuture(null);
}
if (status == Status.Initialized && !shouldBeEnabled) {
status = Status.Removing;
managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME,
new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
status = Status.Disabled;
log.info("[{}] Deleted deduplication cursor", topic.getName());
}
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
} else {
log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception);
}
}
}, null);
}
if (status == Status.Enabled && !shouldBeEnabled) {
// Disable deduplication
CompletableFuture<Void> future = new CompletableFuture<>();
status = Status.Removing;
managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME,
new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
status = Status.Disabled;
managedCursor = null;
highestSequencedPushed.clear();
highestSequencedPersisted.clear();
future.complete(null);
log.info("[{}] Disabled deduplication", topic.getName());
}
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
// A missing cursor is acceptable when disabling message deduplication.
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
status = Status.Disabled;
managedCursor = null;
highestSequencedPushed.clear();
highestSequencedPersisted.clear();
future.complete(null);
} else {
log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
exception.getMessage());
status = Status.Failed;
future.completeExceptionally(exception);
}
}
}, null);
return future;
} else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
// Enable deduplication
CompletableFuture<Void> future = new CompletableFuture<>();
managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
// We don't want to retain cache for this cursor
cursor.setAlwaysInactive();
managedCursor = cursor;
recoverSequenceIdsMap().thenRun(() -> {
status = Status.Enabled;
future.complete(null);
log.info("[{}] Enabled deduplication", topic.getName());
}).exceptionally(ex -> {
status = Status.Failed;
log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage());
future.completeExceptionally(ex);
return null;
});
}
@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to enable deduplication: {}", topic.getName(),
exception.getMessage());
future.completeExceptionally(exception);
}
}, null);
return future;
} else {
// Nothing to do, we are in the correct state
return CompletableFuture.completedFuture(null);
}
}
}
public boolean isEnabled() {
return status == Status.Enabled;
}
/**
* Assess whether the message was already stored in the topic.
*
* @return {@link MessageDupStatus#NotDup} if the message should be published, {@link MessageDupStatus#Dup} if it
* was recognized as a duplicate, or {@link MessageDupStatus#Unknown} if that cannot be determined yet
*/
public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) {
if (!isEnabled() || publishContext.isMarkerMessage()) {
return MessageDupStatus.NotDup;
}
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
if (producerName.startsWith(replicatorPrefix)) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
producerName = md.getProducerName();
sequenceId = md.getSequenceId();
highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
publishContext.setOriginalProducerName(producerName);
publishContext.setOriginalSequenceId(sequenceId);
publishContext.setOriginalHighestSequenceId(highestSequenceId);
headersAndPayload.readerIndex(readerIndex);
}
// Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
// disconnects and re-connects very quickly. At that point the call can be coming from a different thread
synchronized (highestSequencedPushed) {
Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
if (log.isDebugEnabled()) {
log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
}
// Also need to check the sequence ids that have been persisted.
// If the current message's seq id is smaller than or equal to
// lastSequenceIdPersisted, then it is definitely a dup.
// If the current message's seq id is between lastSequenceIdPersisted and
// lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not;
// in the latter case we return an error to the producer so that it can retry at a future time.
Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
}
}
highestSequencedPushed.put(producerName, highestSequenceId);
}
return MessageDupStatus.NotDup;
}
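// Illustrative sketch only (not part of this class): how a caller might act on the returned status.
// The names dedup, persistMessage(), ackAsDuplicate() and failPublish() are hypothetical helpers.
//
//     switch (dedup.isDuplicate(publishContext, headersAndPayload)) {
//         case NotDup:
//             persistMessage(headersAndPayload);                 // not seen before: safe to store
//             break;
//         case Dup:
//             ackAsDuplicate(publishContext);                    // already persisted: do not store again
//             break;
//         case Unknown:
//             failPublish(new MessageDupUnknownException());     // still in flight: let the producer retry later
//             break;
//     }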
/**
* Call this method whenever a message is persisted to get the chance to trigger a snapshot.
*/
public void recordMessagePersisted(PublishContext publishContext, PositionImpl position) {
if (!isEnabled() || publishContext.isMarkerMessage()) {
return;
}
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = publishContext.getHighestSequenceId();
if (publishContext.getOriginalProducerName() != null) {
// In case of replicated messages, this will be different from the current replicator producer name
producerName = publishContext.getOriginalProducerName();
sequenceId = publishContext.getOriginalSequenceId();
highestSequenceId = publishContext.getOriginalHighestSequenceId();
}
highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
if (++snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
takeSnapshot(position);
}
}
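/**
 * Reset the highest-pushed sequence ids back to the highest-persisted ones, discarding sequence ids that were
 * pushed but never persisted (e.g. after a failed write), so that re-published messages are not wrongly
 * rejected as duplicates.
 */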
public void resetHighestSequenceIdPushed() {
if (!isEnabled()) {
return;
}
highestSequencedPushed.clear();
for (String producer : highestSequencedPersisted.keys()) {
highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer));
}
}
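/**
 * Store a snapshot of the highest persisted sequence ids (up to maxNumberOfProducers entries) as the cursor
 * properties, by mark-deleting the deduplication cursor at the given position.
 */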
private void takeSnapshot(Position position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
}
Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
snapshot.put(producerName, sequenceId);
}
});
getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
}
lastSnapshotTimestamp = System.currentTimeMillis();
}
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
}
}, null);
}
private boolean isDeduplicationEnabled() {
return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
}
/**
* Topic will call this method whenever a producer connects.
*/
public void producerAdded(String producerName) {
// Producer is no longer inactive
inactiveProducers.remove(producerName);
}
/**
* Topic will call this method whenever a producer disconnects.
*/
public void producerRemoved(String producerName) {
// Producer is no longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
}
/**
* Remove from hash maps all the producers that were inactive for more than the configured amount of time.
*/
public synchronized void purgeInactiveProducers() {
long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
Iterator<Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
boolean hasInactive = false;
while (mapIterator.hasNext()) {
Map.Entry<String, Long> entry = mapIterator.next();
String producerName = entry.getKey();
long lastActiveTimestamp = entry.getValue();
if (lastActiveTimestamp < minimumActiveTimestamp) {
log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
mapIterator.remove();
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
hasInactive = true;
}
}
if (hasInactive && isEnabled()) {
takeSnapshot(getManagedCursor().getMarkDeletedPosition());
}
}
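/**
 * Return the highest sequence id pushed by the given producer, or -1 if no sequence id is known for it.
 */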
public long getLastPublishedSequenceId(String producerName) {
Long sequenceId = highestSequencedPushed.get(producerName);
return sequenceId != null ? sequenceId : -1;
}
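/**
 * Take a time-based snapshot at the last confirmed entry, if the topic-level snapshot interval has elapsed and
 * new entries were added since the last snapshot position. Meant to be invoked by the periodic snapshot task.
 */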
public void takeSnapshot() {
if (!isEnabled()) {
return;
}
Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
long currentTimeStamp = System.currentTimeMillis();
if (interval == null || interval <= 0
|| currentTimeStamp - lastSnapshotTimestamp < TimeUnit.SECONDS.toMillis(interval)) {
return;
}
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
if (position == null) {
return;
}
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
return;
}
takeSnapshot(position);
}
@VisibleForTesting
ManagedCursor getManagedCursor() {
return managedCursor;
}
private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);
}