blob: 3c79026e06d44bae99e94ff740b3ad29c507a8bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class BrokerClientIntegrationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(BrokerClientIntegrationTest.class);
@BeforeMethod
@Override
protected void setup() throws Exception {
    // Bring up the embedded broker first, then create the standard
    // tenant/namespace/producer fixtures; the order of the two calls matters.
    super.internalSetup();
    super.producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
    // Tear down broker and client state after every test method, even on failure
    // (alwaysRun = true), so one failing test cannot poison the next.
    super.internalCleanup();
}
/**
 * Supplies the subscription types exercised by the parameterized tests.
 *
 * @return one data-provider row per subscription type (Shared, Failover)
 */
@DataProvider
public Object[][] subType() {
    SubscriptionType[] types = { SubscriptionType.Shared, SubscriptionType.Failover };
    Object[][] rows = new Object[types.length][];
    for (int i = 0; i < types.length; i++) {
        rows[i] = new Object[] { types[i] };
    }
    return rows;
}
/**
 * Supplies a true/false flag for tests parameterized on a boolean
 * (e.g. batching enabled vs. disabled).
 *
 * @return two rows: {true} and {false}
 */
@DataProvider(name = "booleanFlagProvider")
public Object[][] booleanFlagProvider() {
    Object[][] rows = new Object[2][];
    rows[0] = new Object[] { Boolean.TRUE };
    rows[1] = new Object[] { Boolean.FALSE };
    return rows;
}
/**
 * Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
 *
 * <pre>
 * 1. after disabling broker from the load balancer
 * 2. unload namespace-bundle "my-ns1" which disconnects clients (producer/consumer) connected on that namespace-bundle
 * 3. but doesn't close the connection for namespace-bundle "my-ns2" and clients are still connected
 * 4. verifies unloaded "my-ns1" should not connect again with the broker as broker is disabled
 * 5. unload "my-ns2" which closes the connection as broker doesn't have any more clients connected on that connection
 * 6. all namespace-bundles are in "connecting" state and waiting for an available broker
 * </pre>
 *
 * @throws Exception
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testDisconnectClientWithoutClosingConnection() throws Exception {
    final String ns1 = "my-property/con-ns1";
    final String ns2 = "my-property/con-ns2";
    admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
    admin.namespaces().createNamespace(ns2, Sets.newHashSet("test"));
    final String topic1 = "persistent://" + ns1 + "/my-topic";
    final String topic2 = "persistent://" + ns2 + "/my-topic";
    // cons1/prod1 live on bundle 1, prod2 on bundle 2; all share one broker connection
    ConsumerImpl<byte[]> cons1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
            .subscriptionName("my-subscriber-name").subscribe();
    ProducerImpl<byte[]> prod1 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic1).create();
    ProducerImpl<byte[]> prod2 = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic2).create();
    // Spy each client and forward getState/getClientCnx/cnx/connectionClosed to the
    // real instance, so Mockito can count connectionClosed invocations while the
    // real objects keep functioning.
    ConsumerImpl<byte[]> consumer1 = spy(cons1);
    doAnswer(invocationOnMock -> cons1.getState()).when(consumer1).getState();
    doAnswer(invocationOnMock -> cons1.getClientCnx()).when(consumer1).getClientCnx();
    doAnswer(invocationOnMock -> cons1.cnx()).when(consumer1).cnx();
    doAnswer(invocationOnMock -> {
        cons1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
        return null;
    }).when(consumer1).connectionClosed(any());
    ProducerImpl<byte[]> producer1 = spy(prod1);
    doAnswer(invocationOnMock -> prod1.getState()).when(producer1).getState();
    doAnswer(invocationOnMock -> prod1.getClientCnx()).when(producer1).getClientCnx();
    doAnswer(invocationOnMock -> prod1.cnx()).when(producer1).cnx();
    doAnswer(invocationOnMock -> {
        prod1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
        return null;
    }).when(producer1).connectionClosed(any());
    ProducerImpl<byte[]> producer2 = spy(prod2);
    doAnswer(invocationOnMock -> prod2.getState()).when(producer2).getState();
    doAnswer(invocationOnMock -> prod2.getClientCnx()).when(producer2).getClientCnx();
    doAnswer(invocationOnMock -> prod2.cnx()).when(producer2).cnx();
    doAnswer(invocationOnMock -> {
        prod2.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
        return null;
    }).when(producer2).connectionClosed(any());
    // Swap the spies into the ClientCnx's internal producer/consumer registries via
    // reflection so the connection-closed signal is delivered to the spies.
    ClientCnx clientCnx = producer1.getClientCnx();
    Field pfield = ClientCnx.class.getDeclaredField("producers");
    pfield.setAccessible(true);
    Field cfield = ClientCnx.class.getDeclaredField("consumers");
    cfield.setAccessible(true);
    ConcurrentLongHashMap<ProducerImpl<byte[]>> producers = (ConcurrentLongHashMap) pfield.get(clientCnx);
    ConcurrentLongHashMap<ConsumerImpl<byte[]>> consumers = (ConcurrentLongHashMap) cfield.get(clientCnx);
    // park the originals under spare ids, then register the spies under the real ids
    producers.put(2, producers.get(0));
    producers.put(3, producers.get(1));
    consumers.put(1, consumers.get(0));
    producers.put(0, producer1);
    producers.put(1, producer2);
    consumers.put(0, consumer1);
    // disable this broker to avoid any new requests
    pulsar.getLoadManager().get().disableBroker();
    NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topic1));
    NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topic2));
    // unload ns-bundle:1
    pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle1).join();
    // let server send signal to close-connection and client close the connection
    Thread.sleep(1000);
    // [1] Verify: producer1 must get connectionClosed signal
    verify(producer1, atLeastOnce()).connectionClosed(any());
    // [2] Verify: consumer1 must get connectionClosed signal
    verify(consumer1, atLeastOnce()).connectionClosed(any());
    // [3] Verify: producer2 should have not received connectionClosed signal
    verify(producer2, never()).connectionClosed(any());
    // sleep for sometime to let other disconnected producer and consumer connect again: but they should not get
    // connected with same broker as that broker is already out from active-broker list
    Thread.sleep(200);
    // producer1 must not be able to connect again
    assertNull(prod1.getClientCnx());
    assertEquals(State.Connecting, prod1.getState());
    // consumer1 must not be able to connect again
    assertNull(cons1.getClientCnx());
    assertEquals(State.Connecting, cons1.getState());
    // producer2 must have live connection
    assertNotNull(prod2.getClientCnx());
    assertEquals(State.Ready, prod2.getState());
    // unload ns-bundle2 as well
    pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2).join();
    // give producer2 some time to get the disconnect signal and get disconnected
    Thread.sleep(200);
    verify(producer2, atLeastOnce()).connectionClosed(any());
    // producer1 must not be able to connect again
    assertNull(prod1.getClientCnx());
    assertEquals(State.Connecting, prod1.getState());
    // consumer1 must not be able to connect again
    assertNull(cons1.getClientCnx());
    assertEquals(State.Connecting, cons1.getState());
    // producer2 must not be able to connect again
    assertNull(prod2.getClientCnx());
    assertEquals(State.Connecting, prod2.getState());
    producer1.close();
    producer2.close();
    consumer1.close();
    prod1.close();
    prod2.close();
    cons1.close();
}
/**
 * Verifies that closing the broker service unloads every owned bundle gracefully —
 * the ownership cache ends up empty — and that all connected clients are dropped
 * back into the connecting state.
 *
 * @throws Exception
 */
@Test
public void testCloseBrokerService() throws Exception {
    final String namespace1 = "my-property/brok-ns1";
    final String namespace2 = "my-property/brok-ns2";
    admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
    admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));
    final String firstTopic = "persistent://" + namespace1 + "/my-topic";
    final String secondTopic = "persistent://" + namespace2 + "/my-topic";
    ConsumerImpl<byte[]> subscriber = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(firstTopic)
            .subscriptionName("my-subscriber-name").subscribe();
    ProducerImpl<byte[]> firstProducer =
            (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(firstTopic).create();
    ProducerImpl<byte[]> secondProducer =
            (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(secondTopic).create();
    // shutting down the broker service should unload every namespace bundle
    pulsar.getBrokerService().close();
    // [1] the ownership cache must hold no bundles once the service is closed
    OwnershipCache ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
    assertTrue(ownershipCache.getOwnedBundles().keySet().isEmpty());
    // poll until every client has dropped its broker connection
    retryStrategically((test) -> (firstProducer.getClientCnx() == null && subscriber.getClientCnx() == null
            && secondProducer.getClientCnx() == null), 5, 100);
    // [2] every client must now be disconnected and attempting to reconnect
    assertNull(firstProducer.getClientCnx());
    assertEquals(State.Connecting, firstProducer.getState());
    assertNull(subscriber.getClientCnx());
    assertEquals(State.Connecting, subscriber.getState());
    assertNull(secondProducer.getClientCnx());
    assertEquals(State.Connecting, secondProducer.getState());
    firstProducer.close();
    secondProducer.close();
    subscriber.close();
}
/**
 * It verifies that consumer which doesn't support batch-message:
 * <p>
 * 1. broker disconnects that consumer
 * <p>
 * 2. redeliver all those messages to other supported consumer under the same subscription
 *
 * @param subType subscription type under test (Shared or Failover)
 * @throws Exception
 */
@Test(dataProvider = "subType")
public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topicName = "persistent://my-property/my-ns/my-topic1";
    final String subscriptionName = "my-subscriber-name" + subType;
    ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(subType)
            .subscribe();
    final int numMessagesPerBatch = 10;
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .create();
    // batching producer with an effectively unbounded publish delay so the batch
    // only goes out when flush() is called explicitly below
    Producer<byte[]> batchProducer = pulsarClient.newProducer()
            .topic(topicName).enableBatching(true)
            .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
            .batchingMaxMessages(numMessagesPerBatch)
            .create();
    // update consumer's version to incompatible batch-message version = Version.V3
    // by poking the broker-side Consumer's connection handler via reflection
    Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get();
    org.apache.pulsar.broker.service.Consumer brokerConsumer = topic.getSubscriptions().get(subscriptionName)
            .getConsumers().get(0);
    Field cnxField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("cnx");
    cnxField.setAccessible(true);
    PulsarHandler cnx = (PulsarHandler) cnxField.get(brokerConsumer);
    Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
    versionField.setAccessible(true);
    versionField.set(cnx, 3);
    // (1) send non-batch messages: consumer should be able to consume them
    MessageId lastNonBatchedMessageId = null;
    for (int i = 0; i < numMessagesPerBatch; i++) {
        String message = "my-message-" + i;
        lastNonBatchedMessageId = producer.send(message.getBytes());
    }
    Set<String> messageSet = Sets.newHashSet();
    Message<byte[]> msg = null;
    for (int i = 0; i < numMessagesPerBatch; i++) {
        msg = consumer1.receive(1, TimeUnit.SECONDS);
        String receivedMessage = new String(msg.getData());
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        consumer1.acknowledge(msg);
    }
    // Also clear consumer1's clientCnx so it does not reconnect; this lets the
    // second (batch-capable) consumer receive the messages for verification.
    consumer1.setClientCnx(null);
    // (2) send batch-messages which consumer1 should not be able to consume: the broker will disconnect it
    for (int i = 0; i < numMessagesPerBatch; i++) {
        String message = "my-batch-message-" + i;
        batchProducer.sendAsync(message.getBytes());
    }
    batchProducer.flush();
    // consumer should have not received any message as it should have been disconnected
    msg = consumer1.receive(100, TimeUnit.MILLISECONDS);
    assertNull(msg);
    // subscribe consumer2 through a fresh client whose connection supports batch messages
    @Cleanup
    PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection
    Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(subType)
            .subscribe();
    // rewind to the last non-batched message so only the batch messages replay
    consumer2.seek(lastNonBatchedMessageId);
    messageSet.clear();
    for (int i = 0; i < numMessagesPerBatch; i++) {
        msg = consumer2.receive();
        String receivedMessage = new String(msg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-batch-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        consumer2.acknowledge(msg);
    }
    consumer2.close();
    producer.close();
    batchProducer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Resets a subscription's cursor to a mid-stream publish timestamp and verifies
 * that every message published at or after that timestamp is replayed.
 *
 * @param subType subscription type under test (Shared or Failover)
 * @throws Exception
 */
@Test(dataProvider = "subType")
public void testResetCursor(SubscriptionType subType) throws Exception {
    final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
    final TopicName topicName = TopicName.get("persistent://my-property/my-ns/unacked-topic");
    final int warmup = 20;
    final int testSize = 150;
    final List<Message<byte[]>> received = new ArrayList<>();
    final String subsId = "sub";
    // publish-time -> message count, populated by the message listener below
    final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();
    // set delay time to start dispatching messages to active consumer in order to avoid message duplication
    conf.setActiveConsumerFailoverDelayTimeMillis(500);
    restartBroker();
    admin.namespaces().setRetention(topicName.getNamespace(), policy);
    ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName.toString())
            .startMessageIdInclusive()
            .subscriptionName(subsId).subscriptionType(subType).messageListener((consumer, msg) -> {
                try {
                    synchronized (received) {
                        received.add(msg);
                    }
                    consumer.acknowledge(msg);
                    long publishTime = msg.getPublishTime();
                    log.info(" publish time is " + publishTime + "," + msg.getMessageId());
                    TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
                            (k) -> new TimestampEntryCount(publishTime));
                    timestampEntryCount.incrementAndGet();
                } catch (final PulsarClientException e) {
                    log.warn("Failed to ack!");
                }
            });
    Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
    Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
    final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()).create();
    log.info("warm up started for " + topicName.toString());
    // send warmup msgs
    byte[] msgBytes = new byte[1000];
    for (int i = 0; i < warmup; i++) {
        producer.send(msgBytes);
    }
    log.info("warm up finished.");
    // sleep to ensure receiving of msgs
    for (int n = 0; n < 10 && received.size() < warmup; n++) {
        Thread.sleep(200);
    }
    // validate received msgs
    Assert.assertEquals(received.size(), warmup);
    received.clear();
    // publish testSize num of msgs
    log.info("Sending more messages.");
    for (int n = 0; n < testSize; n++) {
        producer.send(msgBytes);
        Thread.sleep(1); // spread publish times so there are distinct timestamps to reset to
    }
    log.info("Sending more messages done.");
    Thread.sleep(3000);
    long begints = publishTimeIdMap.firstEntry().getKey();
    long endts = publishTimeIdMap.lastEntry().getKey();
    // find reset timestamp: midpoint of the observed publish-time range, snapped
    // down to an actual recorded publish time
    long timestamp = (endts - begints) / 2 + begints;
    timestamp = publishTimeIdMap.floorKey(timestamp);
    // everything from the reset timestamp (inclusive) onwards should be replayed
    NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
    expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));
    received.clear();
    log.info("reset cursor to " + timestamp + " for topic " + topicName.toString() + " for subs " + subsId);
    log.info("issuing admin operation on " + admin.getServiceUrl());
    List<String> subList = admin.topics().getSubscriptions(topicName.toString());
    for (String subs : subList) {
        log.info("got sub " + subs);
    }
    publishTimeIdMap.clear();
    // reset the cursor to this timestamp
    Assert.assertTrue(subList.contains(subsId));
    admin.topics().resetCursor(topicName.toString(), subsId, timestamp);
    Thread.sleep(3000);
    int totalExpected = 0;
    for (TimestampEntryCount tec : expectedMessages.values()) {
        totalExpected += tec.numMessages;
    }
    // validate that replay happens after the timestamp
    Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
    consumer1.close();
    consumer2.close();
    producer.close();
    // validate that expected and received counts match
    int totalReceived = 0;
    for (TimestampEntryCount tec : publishTimeIdMap.values()) {
        totalReceived += tec.numMessages;
    }
    Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
    resetConfig();
    restartBroker();
}
/**
 * It verifies that broker throttles down configured concurrent topic loading requests
 *
 * <pre>
 * 1. Start broker with N maxConcurrentTopicLoadRequest
 * 2. create concurrent producers on different topics which makes broker load topics concurrently
 * 3. Producer operation-timeout = 1 ms so producer creation will fail for throttled topics
 * 4. verify all producers should have connected
 * </pre>
 *
 * @throws Exception
 */
@Test
public void testMaxConcurrentTopicLoading() throws Exception {
    final String topicName = "persistent://prop/usw/my-ns/cocurrentLoadingTopic";
    // remember the configured limit so it can be restored in the finally block
    int concurrentTopic = pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest();
    final int concurrentLookupRequests = 20;
    @Cleanup("shutdownNow")
    ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
    try {
        pulsar.getConfiguration().setAuthorizationEnabled(false);
        stopBroker();
        startBroker();
        // throttle topic loading down to a single concurrent request
        pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(1);
        String lookupUrl = pulsar.getBrokerServiceUrl();
        try (PulsarClientImpl pulsarClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(lookupUrl)
                .statsInterval(0, TimeUnit.SECONDS).maxNumberOfRejectedRequestPerConnection(0).build();
                PulsarClientImpl pulsarClient2 = (PulsarClientImpl) PulsarClient.builder().serviceUrl(lookupUrl)
                        .statsInterval(0, TimeUnit.SECONDS).ioThreads(concurrentLookupRequests)
                        .connectionsPerBroker(20).build()) {
            ProducerImpl<byte[]> producer =
                    (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create();
            ClientCnx cnx = producer.cnx();
            assertTrue(cnx.channel().isActive());
            final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
            final int totalProducers = 10;
            CountDownLatch latch = new CountDownLatch(totalProducers);
            // fire off concurrent producer creations on random topics from both clients
            for (int i = 0; i < totalProducers; i++) {
                executor.submit(() -> {
                    final String randomTopicName1 = topicName + randomUUID().toString();
                    final String randomTopicName2 = topicName + randomUUID().toString();
                    // pass producer-name to avoid exception: producer is already connected to topic
                    synchronized (futures) {
                        futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
                        futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
                    }
                    latch.countDown();
                });
            }
            latch.await();
            // every producer must eventually connect despite the load throttle
            synchronized (futures) {
                FutureUtil.waitForAll(futures).get();
            }
        }
    } finally {
        // revert back to original value
        pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(concurrentTopic);
    }
}
/**
 * It verifies that the client closes the connection on an internal server error,
 * which is "ServiceNotReady" from the broker side.
 *
 * @throws Exception
 */
@Test
public void testCloseConnectionOnInternalServerError() throws Exception {
    final String topicName = "persistent://prop/usw/my-ns/newTopic";
    @Cleanup
    final PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl(pulsar.getBrokerServiceUrl())
            .statsInterval(0, TimeUnit.SECONDS)
            .operationTimeout(1000, TimeUnit.MILLISECONDS)
            .build();
    ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create();
    ClientCnx cnx = producer.cnx();
    assertTrue(cnx.channel().isActive());
    // Null out the partitioned-topic resources' internal cache via reflection so
    // the next producer creation fails on the broker with an internal server error.
    Field cacheField = BaseResources.class.getDeclaredField("cache");
    cacheField.setAccessible(true);
    cacheField.set(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(), null);
    try {
        pulsarClient.newProducer().topic(topicName).create();
        fail("it should have fail with lookup-exception:");
    } catch (Exception e) {
        // expected: the broker-side failure surfaces as a lookup exception
    }
    // connection must be closed by the client after the internal server error
    assertFalse(cnx.channel().isActive());
}
/**
 * Exercises dynamic-configuration validation: an unknown load-manager class must be
 * rejected by the admin API, a known one accepted, and an invalid value can still be
 * written directly into the dynamic-config store (bypassing admin validation).
 *
 * @throws Exception
 */
@Test
public void testInvalidDynamicConfiguration() throws Exception {
    // (1) an unknown load-manager implementation must be rejected
    try {
        admin.brokers().updateDynamicConfiguration("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
        fail("it should have failed due to invalid argument");
    } catch (PulsarAdminException expected) {
        // expected: invalid config value is rejected
    }
    // (2) a known load-manager implementation must be accepted
    try {
        admin.brokers().updateDynamicConfiguration("loadManagerClassName",
                "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
    } catch (PulsarAdminException unexpected) {
        fail("it should have failed due to invalid argument", unexpected);
    }
    // (3) write the invalid value straight into the dynamic-config store
    pulsar.getPulsarResources().getDynamicConfigResources()
            .setDynamicConfiguration(config -> {
                config.put("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
                return config;
            });
}
/**
 * Mutable counter pairing one publish timestamp with the number of messages
 * observed for that timestamp. Not thread-safe on its own; callers rely on the
 * enclosing concurrent map for safe publication.
 */
static class TimestampEntryCount {
    // the publish timestamp this counter tracks
    private final long timestamp;
    // running count of messages seen at this timestamp
    private int numMessages;

    public TimestampEntryCount(long ts) {
        this.timestamp = ts;
        this.numMessages = 0;
    }

    /** Bumps the count by one and returns the new value. */
    public int incrementAndGet() {
        numMessages += 1;
        return numMessages;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
/**
 * Verifies that a producer whose async creation does not succeed is removed from the
 * client's internal producers registry rather than leaking.
 *
 * @throws Exception
 */
@Test
public void testCleanProducer() throws Exception {
    log.info("-- Starting {} test --", methodName);
    admin.clusters().createCluster("global", ClusterData.builder().build());
    admin.namespaces().createNamespace("my-property/global/lookup");
    final int operationTimeOut = 500;
    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
            .statsInterval(0, TimeUnit.SECONDS).operationTimeout(operationTimeOut, TimeUnit.MILLISECONDS).build();
    CountDownLatch latch = new CountDownLatch(1);
    // kick off producer creation on the "global" namespace; we only wait for the
    // future to complete (success or failure) before inspecting the registry
    pulsarClient.newProducer().topic("persistent://my-property/global/lookup/my-topic1").createAsync()
            .handle((producer, e) -> {
                latch.countDown();
                return null;
            });
    latch.await(operationTimeOut + 1000, TimeUnit.MILLISECONDS);
    // peek into PulsarClientImpl's private producer registry via reflection
    Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
    prodField.setAccessible(true);
    @SuppressWarnings("unchecked")
    Set<ProducerBase<byte[]>> producers = (Set<ProducerBase<byte[]>>) prodField
            .get(pulsarClient);
    // the producer must have been cleaned out of the registry
    assertTrue(producers.isEmpty());
    log.info("-- Exiting {} test --", methodName);
}
/**
 * It verifies that if broker fails to complete producer/consumer operation then client times out rather waiting
 * forever.
 *
 * @throws PulsarClientException the expected {@link PulsarClientException.TimeoutException}
 */
@Test(expectedExceptions = PulsarClientException.TimeoutException.class)
public void testOperationTimeout() throws PulsarClientException {
    final String topicName = "persistent://my-property/my-ns/my-topic1";
    ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = pulsar.getBrokerService()
            .getTopics();
    // Seed an incomplete future so the broker never finishes loading the topic,
    // forcing producer creation to hit the client-side operation timeout.
    topics.put(topicName, new CompletableFuture<>());
    try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
            .operationTimeout(2, TimeUnit.SECONDS).statsInterval(0, TimeUnit.SECONDS).build()) {
        // create() must throw TimeoutException; the returned producer was never
        // used, so the previous dead local assignment has been dropped.
        pulsarClient.newProducer().topic(topicName).create();
    } finally {
        // remove the poisoned future so later tests can load the topic normally
        topics.clear();
    }
}
/**
 * Verifies the broker is resilient to a bookkeeper add-entry timeout: the current
 * ledger is replaced by a handle whose asyncAddEntry never completes, the managed
 * ledger's 1-second add-entry timeout fires, and the retried add must still succeed.
 *
 * @throws Exception
 */
@Test
public void testAddEntryOperationTimeout() throws Exception {
    log.info("-- Starting {} test --", methodName);
    conf.setManagedLedgerAddEntryTimeoutSeconds(1);
    final String topicName = "persistent://my-property/my-ns/addEntryTimeoutTopic";
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
            .subscriptionName("my-subscriber-name").subscribe();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
    ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
    // Ledger handle whose asyncAddEntry never invokes its callback (simulating a
    // write that hangs); asyncClose still completes so ledger rollover can proceed.
    class MockLedgerHandle extends PulsarMockLedgerHandle {
        public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
                throws GeneralSecurityException {
            super(bk, id, digest, passwd);
        }

        @Override
        public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
            // do nothing: the add never completes, which triggers the timeout
        }

        @Override
        public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) {
            cb.closeComplete(BKException.Code.OK, this, ctx);
        }
    }
    MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
    final byte[] data = "data".getBytes();
    // this will make first entry to be timed out but then managed-ledger will create a new ledger and next time add
    // entry should be successful.
    doNothing().when(ledgerHandle).asyncAddEntry(data, null, null);
    // splice the stalling ledger handle in as the managed ledger's current ledger
    MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, "currentLedger", ledgerHandle);
    CountDownLatch latch = new CountDownLatch(1);
    AtomicBoolean addedSuccessfully = new AtomicBoolean(false);
    producer.sendAsync(data).handle((res, ex) -> {
        if (ex == null) {
            addedSuccessfully.set(true);
        } else {
            log.error("add-entry failed for {}", methodName, ex);
        }
        latch.countDown();
        return null;
    });
    latch.await();
    // broker should be resilient enough to add-entry timeout and add entry successfully.
    assertTrue(addedSuccessfully.get());
    byte[] receivedData = consumer.receive().getData();
    assertEquals(receivedData, data);
    producer.close();
    consumer.close();
}
/**
 * Verifies that an AVRO schema built with an explicitly supplied reader and writer
 * delegates serialization and deserialization to them exactly once per message.
 *
 * @throws PulsarClientException on client failure
 */
@Test
public void testAvroSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
    final String topicName = "persistent://my-property/my-ns/my-topic1";
    TestMessageObject payload = new TestMessageObject();
    // Stub the reader/writer so the schema's delegation can be verified.
    SchemaReader<TestMessageObject> mockedReader = Mockito.mock(SchemaReader.class);
    SchemaWriter<TestMessageObject> mockedWriter = Mockito.mock(SchemaWriter.class);
    Mockito.when(mockedReader.read(Mockito.any(byte[].class), Mockito.any(byte[].class))).thenReturn(payload);
    Mockito.when(mockedWriter.write(Mockito.any(TestMessageObject.class)))
            .thenReturn("fake data".getBytes(StandardCharsets.UTF_8));
    SchemaDefinition<TestMessageObject> definition = new SchemaDefinitionBuilderImpl<TestMessageObject>()
            .withPojo(TestMessageObject.class)
            .withSchemaReader(mockedReader)
            .withSchemaWriter(mockedWriter)
            .build();
    Schema<TestMessageObject> avroSchema = Schema.AVRO(definition);
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .build();
            Producer<TestMessageObject> producer = client.newProducer(avroSchema).topic(topicName).create();
            Consumer<TestMessageObject> consumer = client.newConsumer(avroSchema).topic(topicName)
                    .subscriptionName("my-subscriber-name").subscribe()) {
        assertNotNull(producer);
        assertNotNull(consumer);
        // round-trip the object through the broker
        producer.newMessage().value(payload).send();
        TestMessageObject roundTripped = consumer.receive().getValue();
        Assert.assertEquals(payload.getValue(), roundTripped.getValue());
        // exactly one write on produce and one read on consume
        Mockito.verify(mockedWriter, Mockito.times(1)).write(Mockito.any());
        Mockito.verify(mockedReader, Mockito.times(1)).read(Mockito.any(byte[].class), Mockito.any(byte[].class));
    }
}
/**
 * Verifies that a JSON schema built with an explicitly supplied reader and writer
 * delegates serialization and deserialization to them exactly once per message.
 *
 * @throws PulsarClientException on client failure
 */
@Test
public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws PulsarClientException {
    final String topicName = "persistent://my-property/my-ns/my-topic1";
    ObjectMapper mapper = new ObjectMapper();
    // spy real Jackson reader/writer instances so delegation can be verified
    SchemaReader<TestMessageObject> reader =
            spyWithClassAndConstructorArgs(JacksonJsonReader.class, mapper, TestMessageObject.class);
    SchemaWriter<TestMessageObject> writer = spyWithClassAndConstructorArgs(JacksonJsonWriter.class, mapper);
    SchemaDefinition<TestMessageObject> schemaDefinition = new SchemaDefinitionBuilderImpl<TestMessageObject>()
            .withPojo(TestMessageObject.class)
            .withSchemaReader(reader)
            .withSchemaWriter(writer)
            .build();
    Schema<TestMessageObject> schema = Schema.JSON(schemaDefinition);
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .build(); Producer<TestMessageObject> producer = client.newProducer(schema).topic(topicName).create();
            Consumer<TestMessageObject> consumer = client.newConsumer(schema).topic(topicName).subscriptionName("my-subscriber-name").subscribe()) {
        assertNotNull(producer);
        assertNotNull(consumer);
        TestMessageObject object = new TestMessageObject();
        object.setValue("fooooo");
        // round-trip the object through the broker
        producer.newMessage().value(object).send();
        TestMessageObject testObject = consumer.receive().getValue();
        Assert.assertEquals(object.getValue(), testObject.getValue());
        // exactly one write on produce and one read on consume
        Mockito.verify(writer, Mockito.times(1)).write(Mockito.any());
        Mockito.verify(reader, Mockito.times(1)).read(Mockito.any(byte[].class));
    }
}
// Minimal POJO payload for the schema round-trip tests above.
// Lombok generates getValue()/setValue(String) plus equals()/hashCode() over the single field.
@Getter
@Setter
@EqualsAndHashCode
private static final class TestMessageObject{
    private String value;
}
/**
 * It validates pooled message consumption for batch and non-batch messages.
 *
 * @throws Exception
 */
@Test(dataProvider = "booleanFlagProvider")
public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    @Cleanup
    PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
    final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
    @Cleanup
    Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
            .subscriptionName("my-sub").poolMessages(true).subscribe();
    @Cleanup
    Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
    final int numMessages = 100;
    for (int i = 0; i < numMessages; i++) {
        producer.newMessage().value(("value-" + i).getBytes(UTF_8))
                .eventTime((i + 1) * 100L).sendAsync();
    }
    producer.flush();
    // Reuse one pre-allocated scratch buffer to copy every pooled payload out.
    byte[] val = null;
    int size = 0;
    for (int i = 0; i < numMessages; i++) {
        Message<ByteBuffer> msg = consumer.receive();
        ByteBuffer value;
        try {
            value = msg.getValue();
            int capacity = value.remaining();
            // Grow the scratch buffer only when a payload exceeds the largest seen so far.
            if (capacity > size) {
                val = new byte[capacity];
                size = capacity;
            }
            // Copy the pooled payload out before the message is released.
            value.get(val, 0, capacity);
            // Decode with UTF_8 to match the producer-side encoding (the no-charset String
            // constructor would use the platform default); TestNG order is (actual, expected).
            assertEquals(new String(val, 0, capacity, UTF_8), "value-" + i);
        } finally {
            // Return the pooled buffer; the payload must not be touched after this.
            msg.release();
        }
    }
    consumer.close();
    producer.close();
}
/**
 * It verifies that expiry/redelivery of messages releases the messages without leak.
 *
 * @param isBatchingEnabled
 * @throws Exception
 */
@Test(dataProvider = "booleanFlagProvider")
public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    @Cleanup
    PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
    final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;
    @Cleanup
    ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
            .topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();
    @Cleanup
    Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
            .create();
    final int numMessages = 100;
    for (int i = 0; i < numMessages; i++) {
        producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
    }
    producer.flush();
    // Wait until at least one message has been prefetched into the consumer's incoming queue.
    retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
    // Parameterized cast; the original used a raw MessageImpl cast plus a second redundant cast below.
    MessageImpl<ByteBuffer> msg = (MessageImpl<ByteBuffer>) consumer.incomingMessages.peek();
    assertNotNull(msg);
    ByteBuf payload = msg.getPayload();
    // While the message sits in the queue its pooled payload must still hold a reference.
    assertNotEquals(payload.refCnt(), 0);
    consumer.redeliverUnacknowledgedMessages();
    // Redelivery discards the queued copy, so the pooled payload must be fully released (no leak).
    assertEquals(payload.refCnt(), 0);
    consumer.close();
    producer.close();
}
/**
 * It validates pooled message consumption through the Reader API for batch and non-batch messages.
 *
 * @throws Exception
 */
@Test(dataProvider = "booleanFlagProvider")
public void testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    @Cleanup
    PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
    // Use a topic name distinct from testConsumerWithPooledMessages (the original reused that
    // test's topic, risking cross-test interference when both run against the same broker).
    final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessagesWithReader" + isBatchingEnabled;
    @Cleanup
    Reader<ByteBuffer> reader = newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true)
            .startMessageId(MessageId.latest).create();
    @Cleanup
    Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
    final int numMessages = 100;
    for (int i = 0; i < numMessages; i++) {
        producer.newMessage().value(("value-" + i).getBytes(UTF_8))
                .eventTime((i + 1) * 100L).sendAsync();
    }
    producer.flush();
    // Reuse one pre-allocated scratch buffer to copy every pooled payload out.
    byte[] val = null;
    int size = 0;
    for (int i = 0; i < numMessages; i++) {
        Message<ByteBuffer> msg = reader.readNext();
        ByteBuffer value;
        try {
            value = msg.getValue();
            int capacity = value.remaining();
            // Grow the scratch buffer only when a payload exceeds the largest seen so far.
            if (capacity > size) {
                val = new byte[capacity];
                size = capacity;
            }
            // Copy the pooled payload out before the message is released.
            value.get(val, 0, capacity);
            // Decode with UTF_8 to match the producer-side encoding (the no-charset String
            // constructor would use the platform default); TestNG order is (actual, expected).
            assertEquals(new String(val, 0, capacity, UTF_8), "value-" + i);
            // Pooled messages are backed by direct (off-heap) buffers.
            assertTrue(value.isDirect());
        } finally {
            // Return the pooled buffer; the payload must not be touched after this.
            msg.release();
        }
    }
    reader.close();
    producer.close();
}
}