blob: a631db5549148839fb3e64c29d4fba70ba2057ba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertSame;
import static org.testng.AssertJUnit.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.function.Supplier;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class PersistentDispatcherFailoverConsumerTest {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private ServerCnx serverCnxWithOldVersion;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
private ConfigurationCacheService configCacheService;
private ChannelHandlerContext channelCtx;
private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
private ZooKeeper mockZk;
protected PulsarService pulsar;
final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
@BeforeMethod
public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
pulsar = spy(new PulsarService(svcConfig));
doReturn(svcConfig).when(pulsar).getConfiguration();
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(mockZk, ForkJoinPool.commonPool()))
.when(pulsar).getBookKeeperClient();
ZooKeeperCache cache = mock(ZooKeeperCache.class);
doReturn(30).when(cache).getZkOperationTimeoutSeconds();
doReturn(cache).when(pulsar).getLocalZkCache();
configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
doReturn(zkDataCache).when(zkCache).policiesCache();
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
doReturn(zkCache).when(pulsar).getLocalZkCacheService();
brokerService = spy(new BrokerService(pulsar));
doReturn(brokerService).when(pulsar).getBrokerService();
consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
doAnswer(invocationOnMock -> {
ByteBuf buf = invocationOnMock.getArgument(0);
ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
try {
int cmdSize = (int) cmdBuf.readUnsignedInt();
int writerIndex = cmdBuf.writerIndex();
BaseCommand cmd = new BaseCommand();
cmd.parseFrom(cmdBuf, cmdSize);
if (cmd.hasActiveConsumerChange()) {
consumerChanges.put(cmd.getActiveConsumerChange());
}
} finally {
cmdBuf.release();
}
return null;
}).when(channelCtx).writeAndFlush(any(), any());
serverCnx = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
when(serverCnx.ctx()).thenReturn(channelCtx);
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnxWithOldVersion).isActive();
doReturn(true).when(serverCnxWithOldVersion).isWritable();
doReturn(new InetSocketAddress("localhost", 1234))
.when(serverCnxWithOldVersion).clientAddress();
when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
.thenReturn(ProtocolVersion.v11.getValue());
when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);
doReturn(new PulsarCommandSenderImpl(null, serverCnxWithOldVersion))
.when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
setupMLAsyncCallbackMocks();
}
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
if (brokerService != null) {
brokerService.close();
brokerService = null;
}
if (pulsar != null) {
pulsar.close();
pulsar = null;
}
if (mockZk != null) {
mockZk.close();
}
}
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursorImpl.class);
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class), any());
// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class), any());
// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(
new PositionImpl(1, 1), null, null);
return null;
}
}).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
return null;
}
}).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
}
private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
long consumerId,
boolean isActive) {
assertEquals(consumerId, change.getConsumerId());
assertEquals(isActive, change.isIsActive());
}
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
assertEquals(1, consumers.size());
assertNull(consumerChanges.poll());
verify(channelCtx, times(0)).write(any());
// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
assertEquals(2, consumers.size());
CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
verify(channelCtx, times(1)).writeAndFlush(any(), any());
}
@Test
public void testAddRemoveConsumer() throws Exception {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
int partitionIndex = 4;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
assertEquals(1, consumers.size());
CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(1)).notifyActiveConsumerChange(same(consumer1));
// 3. Add again, duplicate allowed
pdfc.addConsumer(consumer1);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
assertEquals(2, consumers.size());
// 4. Verify active consumer
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
// get the notified with who is the leader
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(3, consumers.size());
// get notified with who is the leader
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName());
assertEquals(4, consumers.size());
// all consumers will receive notifications
change = consumerChanges.take();
verifyActiveConsumerChange(change, 0, true);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer0));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer0));
// 7. Remove last consumer
pdfc.removeConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(3, consumers.size());
// not consumer group changes
assertNull(consumerChanges.poll());
// 8. Verify if we cannot unsubscribe when more than one consumer is connected
assertFalse(pdfc.canUnsubscribe(consumer0));
// 9. Remove active consumer
pdfc.removeConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(2, consumers.size());
// the remaining consumers will receive notifications
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
change = consumerChanges.take();
verifyActiveConsumerChange(change, 1, true);
// 10. Attempt to remove already removed consumer
String cause = "";
try {
pdfc.removeConsumer(consumer0);
} catch (Exception e) {
cause = e.getMessage();
}
assertEquals(cause, "Consumer was not connected");
// 11. Remove active consumer
pdfc.removeConsumer(consumer1);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(1, consumers.size());
// not consumer group changes
assertNull(consumerChanges.poll());
// 11. With only one consumer, unsubscribe is allowed
assertTrue(pdfc.canUnsubscribe(consumer1));
}
@Test
public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
// Non partitioned topic.
int partitionIndex = -1;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer2);
// 4. Verify active consumer doesn't change
consumers = pdfc.getConsumers();
assertEquals(2, consumers.size());
CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
change = consumerChanges.take();
verifyActiveConsumerChange(change, 3, false);
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer1));
// 7. Remove first consumer and active consumer should change to consumer2 since it's added before consumer3
// though consumer 3 has higher priority level
pdfc.removeConsumer(consumer1);
consumers = pdfc.getConsumers();
assertEquals(2, consumers.size());
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, true);
assertSame(pdfc.getActiveConsumer().consumerName(), consumer2.consumerName());
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer2));
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer2));
}
@Test
public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(1, 2, false, 6);
Consumer consumer7 = createConsumer(2, 1, false, 7);
Consumer consumer8 = createConsumer(2, 1, false, 8);
Consumer consumer9 = createConsumer(2, 1, false, 9);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer8);
// in between add upper priority consumer with more permits
Consumer consumer10 = createConsumer(0, 2, false, 10);
dispatcher.addConsumer(consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
}
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
assertNull(getNextConsumer(dispatcher));
}
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
Consumer consumer7 = createConsumer(1, 2, false, 7);
Consumer consumer8 = createConsumer(1, 10, true, 8);
Consumer consumer9 = createConsumer(1, 2, false, 9);
Consumer consumer10 = createConsumer(2, 2, false, 10);
Consumer consumer11 = createConsumer(2, 10, true, 11);
Consumer consumer12 = createConsumer(2, 2, false, 12);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
dispatcher.addConsumer(consumer10);
dispatcher.addConsumer(consumer11);
dispatcher.addConsumer(consumer12);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
// add consumer with lower priority again
Consumer consumer13 = createConsumer(0, 2, false, 13);
Consumer consumer14 = createConsumer(0, 2, true, 14);
dispatcher.addConsumer(consumer13);
dispatcher.addConsumer(consumer14);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
assertNull(getNextConsumer(dispatcher));
}
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, true, 1);
Consumer consumer2 = createConsumer(0, 2, true, 2);
Consumer consumer3 = createConsumer(0, 2, true, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(2, 1, false, 6);
Consumer consumer7 = createConsumer(2, 2, true, 7);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
assertNull(getNextConsumer(dispatcher));
}
@SuppressWarnings("unchecked")
private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Consumer consumer = dispatcher.getNextConsumer();
if (consumer != null) {
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater<Consumer>) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
}
return null;
}
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer =
new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null);
try {
consumer.flowPermits(permit);
} catch (Exception e) {
}
// set consumer blocked flag
Field blockField = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
blockField.setAccessible(true);
blockField.set(consumer, blocked);
return consumer;
}
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class);
}