| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service.persistent; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; |
| import static org.mockito.Mockito.any; |
| import static org.mockito.Mockito.anyBoolean; |
| import static org.mockito.Mockito.anyInt; |
| import static org.mockito.Mockito.anyList; |
| import static org.mockito.Mockito.anyLong; |
| import static org.mockito.Mockito.anySet; |
| import static org.mockito.Mockito.anyString; |
| import static org.mockito.Mockito.argThat; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.fail; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.channel.ChannelPromise; |
| import java.lang.reflect.Field; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.EntryImpl; |
| import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.broker.service.Consumer; |
| import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; |
| import org.apache.pulsar.broker.service.EntryBatchSizes; |
| import org.apache.pulsar.broker.service.RedeliveryTracker; |
| import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; |
| import org.apache.pulsar.common.api.proto.KeySharedMeta; |
| import org.apache.pulsar.common.api.proto.KeySharedMode; |
| import org.apache.pulsar.common.api.proto.MessageMetadata; |
| import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.protocol.Markers; |
| import org.mockito.ArgumentCaptor; |
| import org.powermock.api.mockito.PowerMockito; |
| import org.powermock.core.classloader.annotations.PowerMockIgnore; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.testng.Assert; |
| import org.testng.IObjectFactory; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.ObjectFactory; |
| import org.testng.annotations.Test; |
| |
| @PrepareForTest({ DispatchRateLimiter.class }) |
| @PowerMockIgnore({"org.apache.logging.log4j.*"}) |
| @Test(groups = "broker") |
| public class PersistentStickyKeyDispatcherMultipleConsumersTest { |
| |
| private PulsarService pulsarMock; |
| private BrokerService brokerMock; |
| private ManagedCursorImpl cursorMock; |
| private Consumer consumerMock; |
| private PersistentTopic topicMock; |
| private PersistentSubscription subscriptionMock; |
| private ServiceConfiguration configMock; |
| private ChannelPromise channelMock; |
| |
| private PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher; |
| |
| final String topicName = "persistent://public/default/testTopic"; |
| final String subscriptionName = "testSubscription"; |
| |
| @ObjectFactory |
| public IObjectFactory getObjectFactory() { |
| return new org.powermock.modules.testng.PowerMockObjectFactory(); |
| } |
| |
| @BeforeMethod |
| public void setup() throws Exception { |
| configMock = mock(ServiceConfiguration.class); |
| doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled(); |
| doReturn(100).when(configMock).getDispatcherMaxReadBatchSize(); |
| doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); |
| doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); |
| |
| pulsarMock = mock(PulsarService.class); |
| doReturn(configMock).when(pulsarMock).getConfiguration(); |
| |
| brokerMock = mock(BrokerService.class); |
| doReturn(pulsarMock).when(brokerMock).pulsar(); |
| |
| HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies(); |
| topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0); |
| |
| topicMock = mock(PersistentTopic.class); |
| doReturn(brokerMock).when(topicMock).getBrokerService(); |
| doReturn(topicName).when(topicMock).getName(); |
| doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies(); |
| |
| cursorMock = mock(ManagedCursorImpl.class); |
| doReturn(null).when(cursorMock).getLastIndividualDeletedRange(); |
| doReturn(subscriptionName).when(cursorMock).getName(); |
| |
| consumerMock = mock(Consumer.class); |
| channelMock = mock(ChannelPromise.class); |
| doReturn("consumer1").when(consumerMock).consumerName(); |
| doReturn(1000).when(consumerMock).getAvailablePermits(); |
| doReturn(true).when(consumerMock).isWritable(); |
| doReturn(channelMock).when(consumerMock).sendMessages( |
| anyList(), |
| any(EntryBatchSizes.class), |
| any(EntryBatchIndexesAcks.class), |
| anyInt(), |
| anyLong(), |
| anyLong(), |
| any(RedeliveryTracker.class) |
| ); |
| |
| subscriptionMock = mock(PersistentSubscription.class); |
| |
| PowerMockito.mockStatic(DispatchRateLimiter.class); |
| PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded( |
| any(BrokerService.class), |
| any(Optional.class), |
| anyString(), |
| any(DispatchRateLimiter.Type.class)) |
| ).thenReturn(false); |
| |
| persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( |
| topicMock, cursorMock, subscriptionMock, configMock, |
| new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); |
| } |
| |
| @Test |
| public void testSendMarkerMessage() { |
| try { |
| persistentDispatcher.addConsumer(consumerMock); |
| persistentDispatcher.consumerFlow(consumerMock, 1000); |
| } catch (Exception e) { |
| fail("Failed to add mock consumer", e); |
| } |
| |
| List<Entry> entries = new ArrayList<>(); |
| ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster"); |
| entries.add(EntryImpl.create(1, 1, markerMessage)); |
| entries.add(EntryImpl.create(1, 2, createMessage("message1", 1))); |
| entries.add(EntryImpl.create(1, 3, createMessage("message2", 2))); |
| entries.add(EntryImpl.create(1, 4, createMessage("message3", 3))); |
| entries.add(EntryImpl.create(1, 5, createMessage("message4", 4))); |
| entries.add(EntryImpl.create(1, 6, createMessage("message5", 5))); |
| |
| try { |
| persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); |
| } catch (Exception e) { |
| fail("Failed to readEntriesComplete.", e); |
| } |
| |
| ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class); |
| verify(consumerMock, times(1)).sendMessages( |
| anyList(), |
| any(EntryBatchSizes.class), |
| any(EntryBatchIndexesAcks.class), |
| totalMessagesCaptor.capture(), |
| anyLong(), |
| anyLong(), |
| any(RedeliveryTracker.class) |
| ); |
| |
| List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues(); |
| Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5); |
| } |
| |
| @Test(timeOut = 10000) |
| public void testSendMessage() { |
| KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY); |
| PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( |
| topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta); |
| try { |
| keySharedMeta.addHashRange() |
| .setStart(0) |
| .setEnd(9); |
| |
| Consumer consumerMock = mock(Consumer.class); |
| doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta(); |
| persistentDispatcher.addConsumer(consumerMock); |
| persistentDispatcher.consumerFlow(consumerMock, 1000); |
| } catch (Exception e) { |
| fail("Failed to add mock consumer", e); |
| } |
| |
| List<Entry> entries = new ArrayList<>(); |
| entries.add(EntryImpl.create(1, 1, createMessage("message1", 1))); |
| entries.add(EntryImpl.create(1, 2, createMessage("message2", 2))); |
| |
| try { |
| //Should success,see issue #8960 |
| persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); |
| } catch (Exception e) { |
| fail("Failed to readEntriesComplete.", e); |
| } |
| } |
| |
| @Test |
| public void testSkipRedeliverTemporally() { |
| final Consumer slowConsumerMock = mock(Consumer.class); |
| final ChannelPromise slowChannelMock = mock(ChannelPromise.class); |
| // add entries to redeliver and read target |
| final List<Entry> redeliverEntries = new ArrayList<>(); |
| redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1"))); |
| final List<Entry> readEntries = new ArrayList<>(); |
| readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); |
| readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2"))); |
| |
| try { |
| Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits"); |
| totalAvailablePermitsField.setAccessible(true); |
| totalAvailablePermitsField.set(persistentDispatcher, 1000); |
| |
| doAnswer(invocationOnMock -> { |
| ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) |
| .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); |
| return null; |
| }).when(cursorMock).asyncReadEntriesOrWait( |
| anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), |
| eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal), any()); |
| } catch (Exception e) { |
| fail("Failed to set to field", e); |
| } |
| |
| // Create 2Consumers |
| try { |
| doReturn("consumer2").when(slowConsumerMock).consumerName(); |
| // Change slowConsumer availablePermits to 0 and back to normal |
| when(slowConsumerMock.getAvailablePermits()) |
| .thenReturn(0) |
| .thenReturn(1); |
| doReturn(true).when(slowConsumerMock).isWritable(); |
| doReturn(slowChannelMock).when(slowConsumerMock).sendMessages( |
| anyList(), |
| any(EntryBatchSizes.class), |
| any(EntryBatchIndexesAcks.class), |
| anyInt(), |
| anyLong(), |
| anyLong(), |
| any(RedeliveryTracker.class) |
| ); |
| |
| persistentDispatcher.addConsumer(consumerMock); |
| persistentDispatcher.addConsumer(slowConsumerMock); |
| } catch (Exception e) { |
| fail("Failed to add mock consumer", e); |
| } |
| |
| // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers |
| // run readMoreEntries internally (and skip internally) |
| // Change slowConsumer availablePermits to 1 |
| // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally |
| // and then stop to dispatch to slowConsumer |
| persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries); |
| |
| verify(consumerMock, times(1)).sendMessages( |
| argThat(arg -> { |
| assertEquals(arg.size(), 1); |
| Entry entry = arg.get(0); |
| assertEquals(entry.getLedgerId(), 1); |
| assertEquals(entry.getEntryId(), 3); |
| return true; |
| }), |
| any(EntryBatchSizes.class), |
| any(EntryBatchIndexesAcks.class), |
| anyInt(), |
| anyLong(), |
| anyLong(), |
| any(RedeliveryTracker.class) |
| ); |
| verify(slowConsumerMock, times(0)).sendMessages( |
| anyList(), |
| any(EntryBatchSizes.class), |
| any(EntryBatchIndexesAcks.class), |
| anyInt(), |
| anyLong(), |
| anyLong(), |
| any(RedeliveryTracker.class) |
| ); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testMessageRedelivery() throws Exception { |
| final Queue<Position> actualEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); |
| final Queue<Position> actualEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); |
| |
| final Queue<Position> expectedEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); |
| expectedEntriesToConsumer1.add(PositionImpl.get(1, 1)); |
| final Queue<Position> expectedEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); |
| expectedEntriesToConsumer2.add(PositionImpl.get(1, 2)); |
| expectedEntriesToConsumer2.add(PositionImpl.get(1, 3)); |
| |
| final AtomicInteger remainingEntriesNum = new AtomicInteger( |
| expectedEntriesToConsumer1.size() + expectedEntriesToConsumer2.size()); |
| |
| // Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2 |
| final List<Entry> allEntries = new ArrayList<>(); |
| allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2"))); |
| allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); |
| allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1"))); |
| allEntries.forEach(entry -> ((EntryImpl) entry).retain()); |
| |
| final List<Entry> redeliverEntries = new ArrayList<>(); |
| redeliverEntries.add(allEntries.get(0)); // message1 |
| final List<Entry> readEntries = new ArrayList<>(); |
| readEntries.add(allEntries.get(2)); // message3 |
| |
| final Consumer consumer1 = mock(Consumer.class); |
| doReturn("consumer1").when(consumer1).consumerName(); |
| // Change availablePermits of consumer1 to 0 and then back to normal |
| when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10); |
| doReturn(true).when(consumer1).isWritable(); |
| doAnswer(invocationOnMock -> { |
| @SuppressWarnings("unchecked") |
| List<Entry> entries = (List<Entry>) invocationOnMock.getArgument(0); |
| for (Entry entry : entries) { |
| remainingEntriesNum.decrementAndGet(); |
| actualEntriesToConsumer1.add(entry.getPosition()); |
| } |
| return channelMock; |
| }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), |
| anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); |
| |
| final Consumer consumer2 = mock(Consumer.class); |
| doReturn("consumer2").when(consumer2).consumerName(); |
| when(consumer2.getAvailablePermits()).thenReturn(10); |
| doReturn(true).when(consumer2).isWritable(); |
| doAnswer(invocationOnMock -> { |
| @SuppressWarnings("unchecked") |
| List<Entry> entries = (List<Entry>) invocationOnMock.getArgument(0); |
| for (Entry entry : entries) { |
| remainingEntriesNum.decrementAndGet(); |
| actualEntriesToConsumer2.add(entry.getPosition()); |
| } |
| return channelMock; |
| }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), |
| anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); |
| |
| persistentDispatcher.addConsumer(consumer1); |
| persistentDispatcher.addConsumer(consumer2); |
| |
| final Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class |
| .getDeclaredField("totalAvailablePermits"); |
| totalAvailablePermitsField.setAccessible(true); |
| totalAvailablePermitsField.set(persistentDispatcher, 1000); |
| |
| final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class |
| .getDeclaredField("redeliveryMessages"); |
| redeliveryMessagesField.setAccessible(true); |
| MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField |
| .get(persistentDispatcher); |
| redeliveryMessages.add(allEntries.get(0).getLedgerId(), allEntries.get(0).getEntryId(), |
| getStickyKeyHash(allEntries.get(0))); // message1 |
| redeliveryMessages.add(allEntries.get(1).getLedgerId(), allEntries.get(1).getEntryId(), |
| getStickyKeyHash(allEntries.get(1))); // message2 |
| |
| // Mock Cursor#asyncReplayEntries |
| doAnswer(invocationOnMock -> { |
| @SuppressWarnings("unchecked") |
| Set<Position> positions = (Set<Position>) invocationOnMock.getArgument(0); |
| List<Entry> entries = allEntries.stream().filter(entry -> positions.contains(entry.getPosition())) |
| .collect(Collectors.toList()); |
| if (!entries.isEmpty()) { |
| ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(1)) |
| .readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay); |
| } |
| return Collections.emptySet(); |
| }).when(cursorMock).asyncReplayEntries(anySet(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), |
| eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); |
| |
| // Mock Cursor#asyncReadEntriesOrWait |
| doAnswer(invocationOnMock -> { |
| ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) |
| .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); |
| return null; |
| }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), |
| any(PersistentStickyKeyDispatcherMultipleConsumers.class), |
| eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal), any()); |
| |
| // (1) Run sendMessagesToConsumers |
| // (2) Attempts to send message1 to consumer1 but skipped because availablePermits is 0 |
| // (3) Change availablePermits of consumer1 to 10 |
| // (4) Run readMoreEntries internally |
| // (5) Run sendMessagesToConsumers internally |
| // (6) Attempts to send message3 to consumer2 but skipped because redeliveryMessages contains message2 |
| persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay, |
| redeliverEntries); |
| while (remainingEntriesNum.get() > 0) { |
| // (7) Run readMoreEntries and resend message1 to consumer1 and message2-3 to consumer2 |
| persistentDispatcher.readMoreEntries(); |
| } |
| |
| assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1); |
| assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2); |
| |
| allEntries.forEach(entry -> entry.release()); |
| } |
| |
| private ByteBuf createMessage(String message, int sequenceId) { |
| return createMessage(message, sequenceId, "testKey"); |
| } |
| |
| private ByteBuf createMessage(String message, int sequenceId, String key) { |
| MessageMetadata messageMetadata = new MessageMetadata() |
| .setSequenceId(sequenceId) |
| .setProducerName("testProducer") |
| .setPartitionKey(key) |
| .setPartitionKeyB64Encoded(false) |
| .setPublishTime(System.currentTimeMillis()); |
| return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8))); |
| } |
| |
| private int getStickyKeyHash(Entry entry) { |
| byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscriptionName); |
| return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); |
| } |
| } |