blob: a665681528114e5f4999b4cab3c8dd467abb89c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class MessageDuplicationTest {
// Value every test passes to ServiceConfiguration#setBrokerDeduplicationEntriesInterval.
private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
// Value every test passes to ServiceConfiguration#setBrokerDeduplicationMaxNumberOfProducers.
private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
// Value every test passes to ServiceConfiguration#setReplicatorPrefix; none of the producer names
// used in this class ("producer1", "producer2", "test1".."test3") starts with it.
private static final String REPLICATOR_PREFIX = "foo";
/**
 * Calls {@code MessageDeduplication.isDuplicate} for a sequence of publishes coming from two
 * producers and checks, after every call, the returned status together with the value stored
 * for the producer in {@code highestSequencedPushed}.
 */
@Test
public void testIsDuplicate() {
    ServiceConfiguration config = new ServiceConfiguration();
    config.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
    config.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
    config.setReplicatorPrefix(REPLICATOR_PREFIX);

    PulsarService pulsar = mock(PulsarService.class);
    doReturn(config).when(pulsar).getConfiguration();

    // Real MessageDeduplication logic on top of mocked collaborators; only isEnabled() is stubbed.
    MessageDeduplication dedup = spyWithClassAndConstructorArgs(
            MessageDeduplication.class, pulsar, mock(PersistentTopic.class), mock(ManagedLedger.class));
    doReturn(true).when(dedup).isEnabled();

    final String firstProducer = "producer1";
    final String secondProducer = "producer2";
    ByteBuf firstPayload = getMessage(firstProducer, 0);
    Topic.PublishContext firstContext = getPublishContext(firstProducer, 0);
    ByteBuf secondPayload = getMessage(secondProducer, 1);
    Topic.PublishContext secondContext = getPublishContext(secondProducer, 1);

    // producer1, seq 0: expected NotDup, pushed becomes 0.
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.NotDup);
    Long pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 0);

    // producer2, seq 1: tracked independently of producer1.
    assertEquals(dedup.isDuplicate(secondContext, secondPayload), MessageDeduplication.MessageDupStatus.NotDup);
    pushed = dedup.highestSequencedPushed.get(secondProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 1);

    // producer1, seq 1: the next sequence id is accepted.
    firstPayload = getMessage(firstProducer, 1);
    firstContext = getPublishContext(firstProducer, 1);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.NotDup);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 1);

    // producer1, seq 5: a gap in sequence ids is accepted as well.
    firstPayload = getMessage(firstProducer, 5);
    firstContext = getPublishContext(firstProducer, 5);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.NotDup);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 5);

    // producer1, seq 0 again: nothing has been recorded in highestSequencedPersisted yet,
    // so the expected answer is Unknown and the pushed value must stay at 5.
    firstPayload = getMessage(firstProducer, 0);
    firstContext = getPublishContext(firstProducer, 0);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.Unknown);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 5);

    // Once seq 0 is recorded as persisted, the same publish is expected to be reported as Dup.
    dedup.highestSequencedPersisted.put(firstProducer, 0L);
    firstPayload = getMessage(firstProducer, 0);
    firstContext = getPublishContext(firstProducer, 0);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.Dup);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 5);

    // Put both maps back to 0 for producer1, then publish with sequence id 1 and
    // highest sequence id 5: expected NotDup, pushed becomes 5.
    dedup.highestSequencedPushed.put(firstProducer, 0L);
    dedup.highestSequencedPersisted.put(firstProducer, 0L);
    firstPayload = getMessage(firstProducer, 0);
    firstContext = getPublishContext(firstProducer, 1, 5);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.NotDup);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 5);

    // Same buffer, context with sequence id 4 and highest sequence id 8:
    // expected Unknown, pushed stays at 5.
    firstContext = getPublishContext(firstProducer, 4, 8);
    assertEquals(dedup.isDuplicate(firstContext, firstPayload), MessageDeduplication.MessageDupStatus.Unknown);
    pushed = dedup.highestSequencedPushed.get(firstProducer);
    assertNotNull(pushed);
    assertEquals(pushed.longValue(), 5);
}
/**
 * Registers three producers through {@code isDuplicate}, marks all of them as removed and then
 * checks which entries {@code purgeInactiveProducers} drops from {@code inactiveProducers} and
 * {@code highestSequencedPushed}, both while deduplication is reported as disabled and as enabled.
 *
 * @throws Exception if the reflective lookup of {@code inactiveProducers} fails
 */
@Test
public void testInactiveProducerRemove() throws Exception {
    PulsarService pulsarService = mock(PulsarService.class);
    PersistentTopic topic = mock(PersistentTopic.class);
    ManagedLedger managedLedger = mock(ManagedLedger.class);
    ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
    serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
    serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
    serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
    // One minute, i.e. 60000 ms: the backdated timestamps used below (70000 / 80000 ms ago) exceed it.
    serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
    doReturn(serviceConfiguration).when(pulsarService).getConfiguration();

    MessageDeduplication messageDeduplication =
            spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger);
    doReturn(true).when(messageDeduplication).isEnabled();
    ManagedCursor managedCursor = mock(ManagedCursor.class);
    doReturn(managedCursor).when(messageDeduplication).getManagedCursor();

    // inactiveProducers is read through reflection so the test can inspect and backdate its entries.
    Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
    field.setAccessible(true);
    // Generic type arguments are erased at runtime, so this cast cannot be checked; keep the
    // suppression on this single declaration only.
    @SuppressWarnings("unchecked")
    Map<String, Long> inactiveProducers = (ConcurrentHashMap<String, Long>) field.get(messageDeduplication);

    // A single mocked context is reused for all three producers; only the producer name changes.
    Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
    when(publishContext.getHighestSequenceId()).thenReturn(2L);
    when(publishContext.getSequenceId()).thenReturn(1L);

    String producerName1 = "test1";
    when(publishContext.getProducerName()).thenReturn(producerName1);
    messageDeduplication.isDuplicate(publishContext, null);

    String producerName2 = "test2";
    when(publishContext.getProducerName()).thenReturn(producerName2);
    messageDeduplication.isDuplicate(publishContext, null);

    String producerName3 = "test3";
    when(publishContext.getProducerName()).thenReturn(producerName3);
    messageDeduplication.isDuplicate(publishContext, null);

    // Removing the producers is what puts all three into inactiveProducers.
    messageDeduplication.producerRemoved(producerName1);
    messageDeduplication.producerRemoved(producerName2);
    messageDeduplication.producerRemoved(producerName3);

    // First purge: none of the producers has been inactive long enough, so all three entries stay.
    messageDeduplication.purgeInactiveProducers();
    assertEquals(inactiveProducers.size(), 3);

    // Second purge, with deduplication reported as disabled: the backdated entries of
    // producer2 and producer3 are still expected to be dropped.
    doReturn(false).when(messageDeduplication).isEnabled();
    inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
    inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
    messageDeduplication.purgeInactiveProducers();
    assertFalse(inactiveProducers.containsKey(producerName2));
    assertFalse(inactiveProducers.containsKey(producerName3));

    // Third purge, with deduplication enabled again: re-insert producer2 and producer3 with
    // timestamps past the timeout and expect the purge to remove them once more.
    doReturn(true).when(messageDeduplication).isEnabled();
    inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
    inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
    messageDeduplication.purgeInactiveProducers();
    assertFalse(inactiveProducers.containsKey(producerName2));
    assertFalse(inactiveProducers.containsKey(producerName3));

    // highestSequencedPushed is read directly, the same way the other tests in this class read it;
    // no reflection or unchecked cast is needed for it.
    ConcurrentOpenHashMap<String, Long> highestSequencedPushed = messageDeduplication.highestSequencedPushed;
    // producer1 was never purged, so its highest sequence id (2) must still be present.
    Long producer1Pushed = highestSequencedPushed.get(producerName1);
    assertNotNull(producer1Pushed);
    assertEquals(producer1Pushed.longValue(), 2L);
    assertFalse(highestSequencedPushed.containsKey(producerName2));
    assertFalse(highestSequencedPushed.containsKey(producerName3));
}
/**
 * Drives a spied {@code PersistentTopic} through {@code publishMessage}, {@code addComplete} and
 * {@code addFailed}, checking along the way how many times the mocked ledger's
 * {@code asyncAddEntry} has been invoked and what {@code highestSequencedPushed} and
 * {@code highestSequencedPersisted} contain for each producer.
 */
@Test
public void testIsDuplicateWithFailure() {
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
doReturn(mock(CompactionServiceFactory.class)).when(pulsarService).getCompactionServiceFactory();
ManagedLedger managedLedger = mock(ManagedLedger.class);
// A plain Mockito spy is used here because the test later verifies a call on it
// (resetHighestSequenceIdPushed).
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
doReturn(true).when(messageDeduplication).isEnabled();
// Make the mocked event loop run every submitted Runnable inline, on the calling thread.
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
doAnswer(invocationOnMock -> {
Object[] args = invocationOnMock.getArguments();
Runnable test = (Runnable) args[0];
test.run();
return null;
}).when(eventLoopGroup).submit(any(Runnable.class));
BrokerService brokerService = mock(BrokerService.class);
doReturn(eventLoopGroup).when(brokerService).executor();
doReturn(pulsarService).when(brokerService).pulsar();
doReturn(new BacklogQuotaManager(pulsarService)).when(brokerService).getBacklogQuotaManager();
// The topic under test: real PersistentTopic code wired to the mocked ledger and the spied deduplication.
PersistentTopic persistentTopic = spyWithClassAndConstructorArgs(PersistentTopic.class, "topic-1", brokerService, managedLedger, messageDeduplication);
String producerName1 = "producer1";
ByteBuf byteBuf1 = getMessage(producerName1, 0);
Topic.PublishContext publishContext1 = getPublishContext(producerName1, 0);
String producerName2 = "producer2";
ByteBuf byteBuf2 = getMessage(producerName2, 1);
Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1);
// producer1, seq 0: published and completed, so both maps are expected to hold 0.
// The ledger is a mock, so the test calls addComplete itself to simulate the write finishing.
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(new PositionImpl(0, 1), null, publishContext1);
verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any());
Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 0);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 0);
// producer2, seq 1: published and completed.
persistentTopic.publishMessage(byteBuf2, publishContext2);
persistentTopic.addComplete(new PositionImpl(0, 2), null, publishContext2);
verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
// producer1, seq 1: published and completed.
byteBuf1 = getMessage(producerName1, 1);
publishContext1 = getPublishContext(producerName1, 1);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(new PositionImpl(0, 3), null, publishContext1);
verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
// producer1, seq 5 (skipping 2..4): published and completed.
byteBuf1 = getMessage(producerName1, 5);
publishContext1 = getPublishContext(producerName1, 5);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(new PositionImpl(0, 4), null, publishContext1);
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
// publish dup
// The asyncAddEntry count stays at 4 (nothing new is written) and the context is completed
// with a null exception and -1 / -1 as ledger and entry id.
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
// publish message unknown dup status
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
// don't complete message
// Pushed therefore moves to 6 while persisted is expected to remain at 5.
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 6);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
// publish same message again
// Nothing new is written (count stays at 5) and the context is completed with a
// MessageDupUnknownException.
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(publishContext1, times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L));
// complete seq 6 message eventually
persistentTopic.addComplete(new PositionImpl(0, 5), null, publishContext1);
// simulate failure
byteBuf1 = getMessage(producerName1, 7);
publishContext1 = getPublishContext(producerName1, 7);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext1);
// check highestSequencedPushed is reset
// After the failed add, pushed for producer1 is expected to equal the persisted value (6, not 7),
// and producer2's entries are expected to be unchanged.
assertEquals(messageDeduplication.highestSequencedPushed.size(), 2);
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 2);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertEquals(lastSequenceIdPushed.longValue(), 6);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertEquals(lastSequenceIdPushed.longValue(), 6);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
assertEquals(lastSequenceIdPushed.longValue(), 1);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName2);
assertEquals(lastSequenceIdPushed.longValue(), 1);
verify(messageDeduplication, times(1)).resetHighestSequenceIdPushed();
// try dup
// Seq 6 is already persisted, so no new write happens (count stays at 6).
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 6);
// try new message
// Seq 8 is written and completed; both maps are expected to end at 8.
byteBuf1 = getMessage(producerName1, 8);
publishContext1 = getPublishContext(producerName1, 8);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any());
persistentTopic.addComplete(new PositionImpl(0, 5), null, publishContext1);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 8);
lastSequenceIdPushed = messageDeduplication.highestSequencedPersisted.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 8);
}
/**
 * Serializes a message that carries only metadata (producer name, sequence id and the current
 * time as publish time) followed by a zero-length payload, using the Crc32c checksum type.
 *
 * @param producerName producer name written into the metadata
 * @param seqId sequence id written into the metadata
 * @return the buffer produced by {@code Commands.serializeMetadataAndPayload}
 */
public ByteBuf getMessage(String producerName, long seqId) {
    MessageMetadata metadata = new MessageMetadata();
    metadata.setProducerName(producerName);
    metadata.setSequenceId(seqId);
    metadata.setPublishTime(System.currentTimeMillis());

    ByteBuf emptyPayload = io.netty.buffer.Unpooled.copiedBuffer(new byte[0]);
    return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, emptyPayload);
}
/**
 * Creates a Mockito spy around a minimal {@code Topic.PublishContext} so that tests can verify
 * calls to {@code completed(..)} on it.
 *
 * <p>Only the producer name and the sequence id are supplied; {@code getHighestSequenceId()} is
 * not overridden and is therefore left to whatever {@code Topic.PublishContext} itself provides.
 *
 * @param producerName value returned by {@code getProducerName()}
 * @param seqId value returned by {@code getSequenceId()}
 * @return a spied publish context
 */
public Topic.PublishContext getPublishContext(String producerName, long seqId) {
    return spy(new Topic.PublishContext() {
        @Override
        public String getProducerName() {
            return producerName;
        }

        @Override
        public long getSequenceId() {
            return seqId;
        }

        @Override
        public void completed(Exception e, long ledgerId, long entryId) {
            // Intentionally empty: tests only verify, through the spy, that this was invoked.
        }
    });
}
/**
 * Creates a Mockito spy around a {@code Topic.PublishContext} that reports both a sequence id
 * and a highest sequence id.
 *
 * @param producerName value returned by {@code getProducerName()}
 * @param seqId value returned by {@code getSequenceId()}
 * @param lastSequenceId value returned by {@code getHighestSequenceId()}
 * @return a spied publish context
 */
public Topic.PublishContext getPublishContext(String producerName, long seqId, long lastSequenceId) {
    Topic.PublishContext context = new Topic.PublishContext() {
        @Override
        public long getSequenceId() {
            return seqId;
        }

        @Override
        public long getHighestSequenceId() {
            return lastSequenceId;
        }

        @Override
        public String getProducerName() {
            return producerName;
        }

        @Override
        public void completed(Exception e, long ledgerId, long entryId) {
            // Nothing to do on completion.
        }
    };
    return spy(context);
}
}