blob: 6beb01f18ab9af64234d88442a450b302d95e84e [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.service;
import static com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertFalse;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.broker.namespace.NamespaceService;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.ServerCnx;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
public class PersistentDispatcherFailoverConsumerTest {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
private ConfigurationCacheService configCacheService;
final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic";
@BeforeMethod
public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
PulsarService pulsar = spy(new PulsarService(svcConfig));
doReturn(svcConfig).when(pulsar).getConfiguration();
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
brokerService = spy(new BrokerService(pulsar));
doReturn(brokerService).when(pulsar).getBrokerService();
serverCnx = spy(new ServerCnx(brokerService));
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(DestinationName.class));
setupMLAsyncCallbackMocks();
}
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null);
return null;
}
}).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), anyObject());
// call openCursorComplete on cursor asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(OpenCursorCallback.class), anyObject());
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
return null;
}
}).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
}
@Test
public void testAddRemoveConsumer() throws Exception {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, cursorMock);
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
assertEquals(1, consumers.size());
// 3. Add again, duplicate allowed
pdfc.addConsumer(consumer1);
consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
assertEquals(2, consumers.size());
// 4. Verify active consumer
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());
// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, 0, "Cons0"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
assertEquals(4, consumers.size());
// 7. Remove last consumer
pdfc.removeConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
assertEquals(3, consumers.size());
// 8. Verify if we can unsubscribe when more than one consumer is connected
assertFalse(pdfc.canUnsubscribe(consumer0));
// 9. Remove active consumer
pdfc.removeConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(2, consumers.size());
// 10. Attempt to remove already removed consumer
String cause = "";
try {
pdfc.removeConsumer(consumer0);
} catch (Exception e) {
cause = e.getMessage();
}
assertEquals(cause, "Consumer was not connected");
// 11. Remove active consumer
pdfc.removeConsumer(consumer1);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(1, consumers.size());
// 11. With only one consumer, unsubscribe is allowed
assertTrue(pdfc.canUnsubscribe(consumer1));
}
@Test
public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(1, 2, false, 6);
Consumer consumer7 = createConsumer(2, 1, false, 7);
Consumer consumer8 = createConsumer(2, 1, false, 8);
Consumer consumer9 = createConsumer(2, 1, false, 9);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer8);
// in between add upper priority consumer with more permits
Consumer consumer10 = createConsumer(0, 2, false, 10);
dispatcher.addConsumer(consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
}
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
Consumer consumer4 = createConsumer(0, 2, false, 4);
Consumer consumer5 = createConsumer(0, 1, true, 5);
Consumer consumer6 = createConsumer(0, 2, true, 6);
Consumer consumer7 = createConsumer(1, 2, false, 7);
Consumer consumer8 = createConsumer(1, 10, true, 8);
Consumer consumer9 = createConsumer(1, 2, false, 9);
Consumer consumer10 = createConsumer(2, 2, false, 10);
Consumer consumer11 = createConsumer(2, 10, true, 11);
Consumer consumer12 = createConsumer(2, 2, false, 12);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
dispatcher.addConsumer(consumer10);
dispatcher.addConsumer(consumer11);
dispatcher.addConsumer(consumer12);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
// add consumer with lower priority again
Consumer consumer13 = createConsumer(0, 2, false, 13);
Consumer consumer14 = createConsumer(0, 2, true, 14);
dispatcher.addConsumer(consumer13);
dispatcher.addConsumer(consumer14);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, true, 1);
Consumer consumer2 = createConsumer(0, 2, true, 2);
Consumer consumer3 = createConsumer(0, 2, true, 3);
Consumer consumer4 = createConsumer(1, 2, false, 4);
Consumer consumer5 = createConsumer(1, 1, false, 5);
Consumer consumer6 = createConsumer(2, 1, false, 6);
Consumer consumer7 = createConsumer(2, 2, true, 7);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), null);
}
private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Method getNextConsumerMethod = PersistentDispatcherMultipleConsumers.class.getDeclaredMethod("getNextConsumer");
getNextConsumerMethod.setAccessible(true);
Consumer consumer = (Consumer) getNextConsumerMethod.invoke(dispatcher);
if (consumer != null) {
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
}
return null;
}
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer = new Consumer(null, SubType.Shared, id, priority, ""+id, 5000, serverCnx, "appId");
try {
consumer.flowPermits(permit);
} catch (Exception e) {
}
// set consumer blocked flag
Field blockField = Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
blockField.setAccessible(true);
blockField.set(consumer, blocked);
return consumer;
}
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class);
}