blob: aaf3040f89c61fdd1ab4780ef3c5de0ac22f1bfe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
*/
public class PersistentTopicE2ETest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testSimpleProducerEvents() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic0";
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
// 2. producer publish messages
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
rolloverPerIntervalStats();
assertTrue(topicRef.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// 3. producer disconnect
producer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(topicRef.getProducers().size(), 0);
}
@Test
public void testSimpleConsumerEvents() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic1";
final String subName = "sub1";
final int numMsgs = 10;
// 1. client connect
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertNotNull(topicRef);
assertNotNull(subRef);
assertTrue(subRef.getDispatcher().isConsumerConnected());
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), 1000 /* default */);
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs * 2; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
assertTrue(subRef.getDispatcher().isConsumerConnected());
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs * 2);
// 2. messages pushed before client receive
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), 1000 - numMsgs * 2);
Message<byte[]> msg = null;
for (int i = 0; i < numMsgs; i++) {
msg = consumer.receive();
// 3. in-order message delivery
assertEquals(new String(msg.getData()), "my-message-" + i);
consumer.acknowledge(msg);
}
rolloverPerIntervalStats();
// 4. messages deleted on individual acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
for (int i = 0; i < numMsgs; i++) {
msg = consumer.receive();
if (i == numMsgs - 1) {
consumer.acknowledgeCumulative(msg);
}
}
rolloverPerIntervalStats();
// 5. messages deleted on cumulative acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 6. consumer unsubscribe
consumer.unsubscribe();
// 6. consumer graceful close
consumer.close();
// 7. consumer unsubscribe
try {
consumer.unsubscribe();
fail("Should have failed");
} catch (PulsarClientException.AlreadyClosedException e) {
// ok
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
subRef = topicRef.getSubscription(subName);
assertNull(subRef);
producer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
}
@Test
public void testConsumerFlowControl() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic2";
final String subName = "sub2";
Message<byte[]> msg;
int recvQueueSize = 4;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(recvQueueSize).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertNotNull(subRef);
// 1. initial receive queue size recorded
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), recvQueueSize);
for (int i = 0; i < recvQueueSize / 2; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
msg = consumer.receive();
consumer.acknowledge(msg);
}
// 2. queue size re-adjusted after successful receive of half of window size
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), recvQueueSize);
consumer.close();
assertFalse(subRef.getDispatcher().isConsumerConnected());
}
/**
* Validation: 1. validates active-cursor after active subscription 2. validate active-cursor with subscription 3.
* unconsumed messages should be present into cache 4. cache and active-cursor should be empty once subscription is
* closed
*
* @throws Exception
*/
@Test
public void testActiveSubscriptionWithCache() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic2";
final String subName = "sub2";
Message<byte[]> msg;
int recvQueueSize = 4;
// (1) Create subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(recvQueueSize).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// (2) Produce Messages
for (int i = 0; i < recvQueueSize / 2; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
msg = consumer.receive();
consumer.acknowledge(msg);
}
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
// (3) Get Entry cache
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
cacheField.setAccessible(true);
EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
/************* Validation on non-empty active-cursor **************/
// (4) Get ActiveCursor : which is list of active subscription
Iterable<ManagedCursor> activeCursors = ledger.getActiveCursors();
ManagedCursor curosr = activeCursors.iterator().next();
// (4.1) Validate: active Cursor must be non-empty
assertNotNull(curosr);
// (4.2) Validate: validate cursor name
assertEquals(subName, curosr.getName());
/************* Validation on empty active-cursor **************/
// (5) Close consumer: which (1)removes activeConsumer and (2)clears the entry-cache
consumer.close();
Thread.sleep(1000);
// (5.1) Validate: active-consumer must be empty
assertFalse(ledger.getActiveCursors().iterator().hasNext());
// (5.2) Validate: Entry-cache must be cleared
assertEquals(entryCache.getSize(), 0);
}
// some race conditions needs to be handled
// disabling the test for now to not block commit jobs
@Test(enabled = false)
public void testConcurrentConsumerThreads() throws Exception {
// test concurrent consumer threads on same consumerId
final String topicName = "persistent://prop/ns-abc/topic3";
final String subName = "sub3";
final int recvQueueSize = 100;
final int numConsumersThreads = 10;
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
barrier.await();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(recvQueueSize).subscribe();
for (int i = 0; i < recvQueueSize / numConsumersThreads; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}
return null;
}
});
}
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
barrier.await();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
// 1. cumulatively all threads drain the backlog
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 2. flow control works the same as single consumer single thread
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), recvQueueSize);
executor.shutdown();
}
@Test(enabled = false)
// TODO: enable this after java client supports graceful close
public void testGracefulClose() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic4";
final String subName = "sub4";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(1);
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
latch.countDown();
return null;
});
producer.close();
// 1. verify there are no pending publish acks once the producer close
// is completed on client
assertEquals(topicRef.getProducers().values().iterator().next().getPendingPublishAcks(), 0);
// safety latch in case of failure,
// wait for the spawned thread to complete
latch.await();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertNotNull(subRef);
Message<byte[]> msg = null;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
}
// 2. verify consumer close fails when there are outstanding
// message acks
try {
consumer.close();
fail("should have failed");
} catch (IllegalStateException e) {
// Expected - messages not acked
}
consumer.acknowledgeCumulative(msg);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
// 3. verify consumer close succeeds once all messages are ack'ed
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertTrue(subRef.getDispatcher().isConsumerConnected());
executor.shutdown();
}
@Test
public void testSimpleCloseTopic() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic5";
final String subName = "sub5";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertNotNull(subRef);
Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
msg = consumer.receive();
consumer.acknowledge(msg);
}
producer.close();
consumer.close();
topicRef.close().get();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
@Test
public void testSingleClientMultipleSubscriptions() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic6";
final String subName = "sub6";
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
fail("Should have thrown an exception since one consumer is already connected");
} catch (PulsarClientException cce) {
Assert.assertTrue(cce.getMessage().contains("Exclusive consumer is already connected"));
}
}
@Test
public void testMultipleClientsMultipleSubscriptions() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic7";
final String subName = "sub7";
PulsarClient client1 = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
PulsarClient client2 = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
try {
client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
client1.newProducer().topic(topicName).create();
client2.newProducer().topic(topicName).create();
client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
fail("Should have thrown an exception since one consumer is already connected");
} catch (PulsarClientException cce) {
Assert.assertTrue(cce.getMessage().contains("Exclusive consumer is already connected"));
} finally {
client2.shutdown();
client1.shutdown();
}
}
@Test
public void testTopicDeleteWithDisconnectedSubscription() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic8";
final String subName = "sub1";
// 1. client connect
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertNotNull(topicRef);
assertNotNull(subRef);
assertTrue(subRef.getDispatcher().isConsumerConnected());
// 2. client disconnect
consumer.close();
assertFalse(subRef.getDispatcher().isConsumerConnected());
// 3. delete topic
admin.topics().delete(topicName);
try {
admin.topics().getStats(topicName);
} catch (PulsarAdminException e) {
// ok
}
}
int getAvailablePermits(PersistentSubscription sub) {
return sub.getDispatcher().getConsumers().get(0).getAvailablePermits();
}
@Test(enabled = false)
public void testUnloadNamespace() throws Exception {
String topic = "persistent://prop/ns-abc/topic-9";
TopicName topicName = TopicName.get(topic);
pulsarClient.newProducer().topic(topic).create();
pulsarClient.close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topic));
assertTrue(((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getManagedLedgers()
.containsKey(topicName.getPersistenceNamingEncoding()));
admin.namespaces().unload("prop/ns-abc");
int i = 0;
for (i = 0; i < 30; i++) {
if (pulsar.getBrokerService().getTopicReference(topic) == null) {
break;
}
Thread.sleep(1000);
}
if (i == 30) {
fail("The topic reference should be null");
}
// ML should have been closed as well
assertFalse(((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getManagedLedgers()
.containsKey(topicName.getPersistenceNamingEncoding()));
}
@Test
public void testGC() throws Exception {
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
@Data
@ToString
@EqualsAndHashCode
private static class Foo {
private String field1;
private String field2;
private int field3;
}
private Optional<Topic> getTopic(String topicName) {
return pulsar.getBrokerService().getTopicReference(topicName);
}
private boolean topicHasSchema(String topicName) {
String base = TopicName.get(topicName).getPartitionedTopicName();
String schemaName = TopicName.get(base).getSchemaName();
SchemaRegistry.SchemaAndMetadata result = pulsar.getSchemaRegistryService().getSchema(schemaName).join();
return result != null && !result.schema.isDeleted();
}
@Test
public void testGCWillDeleteSchema() throws Exception {
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
Optional<Topic> topic = getTopic(topicName);
assertTrue(topic.isPresent());
byte[] data = JSONSchema.of(SchemaDefinition.builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema();
SchemaData schemaData = SchemaData.builder()
.data(data)
.type(SchemaType.BYTES)
.user("foo").build();
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
// 2. Topic is not GCed with live connection
topicName = "persistent://prop/ns-abc/topic-2";
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
assertTrue(topicHasSchema(topicName));
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
}
@Test
public void testDeleteSchema() throws Exception {
PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
LookupService binaryLookupService = binaryProtocolClient.getLookup();
LookupService httpLookupService = httpProtocolClient.getLookup();
String topicName = "persistent://prop/ns-abc/topic-1";
//Topic is not GCed with live connection
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Optional<Topic> topic = getTopic(topicName);
assertTrue(topic.isPresent());
byte[] data = JSONSchema.of(SchemaDefinition.builder()
.withPojo(Foo.class).build()).getSchemaInfo().getSchema();
SchemaData schemaData = SchemaData.builder()
.data(data)
.type(SchemaType.BYTES)
.user("foo").build();
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 1);
assertTrue(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
assertTrue(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
topic.get().deleteSchema().join();
Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 0);
assertFalse(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
assertFalse(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
assertFalse(topicHasSchema(topicName));
}
/**
* A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the
* retention time.
*/
@Test
public void testGcAndRetentionPolicy() throws Exception {
// Retain data for at-least 10min
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(10, 10));
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
runGC();
// Should not have been deleted, since we have retention
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// Remove retention
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
Thread.sleep(300);
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
/**
* A topic that has retention policy set to -1, should not be GCed until it has been inactive for at least the
* retention time and the data should never be deleted
*/
@Test
public void testInfiniteRetentionPolicy() throws Exception {
// Retain data forever
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(-1, -1));
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
runGC();
// Should not have been deleted, since we have retention
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// Remove retention
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
Thread.sleep(300);
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
/**
* Set retention policy in default configuration.
* It should be effective.
*/
@Test
public void testServiceConfigurationRetentionPolicy() throws Exception {
// set retention policy in service configuration
pulsar.getConfiguration().setDefaultRetentionSizeInMB(-1);
pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(-1);
String namespaceName = "prop/ns-default-retention-policy";
admin.namespaces().createNamespace(namespaceName);
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
runGC();
// Should not have been deleted, since we have retention
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// Remove retention
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies());
Thread.sleep(300);
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
@Test
public void testMessageExpiry() throws Exception {
int messageTTLSecs = 1;
String namespaceName = "prop/expiry-check";
admin.namespaces().createNamespace(namespaceName);
admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
final String topicName = "persistent://prop/expiry-check/topic1";
final String subName = "sub1";
final int numMsgs = 10;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
consumer.close();
assertFalse(subRef.getDispatcher().isConsumerConnected());
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
runMessageExpiryCheck();
// 1. check all messages expired for this unconnected subscription
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// clean-up
producer.close();
consumer.close();
admin.topics().deleteSubscription(topicName, subName);
admin.topics().delete(topicName);
admin.namespaces().deleteNamespace(namespaceName);
}
@Test
public void testMessageExpiryWithTopicMessageTTL() throws Exception {
int namespaceMessageTTLSecs = 10;
int topicMessageTTLSecs = 2;
String namespaceName = "prop/expiry-check-2";
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
cleanup();
setup();
admin.namespaces().createNamespace(namespaceName);
admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
admin.namespaces().setNamespaceMessageTTL(namespaceName, namespaceMessageTTLSecs);
final String topicName = "persistent://prop/expiry-check-2/topic2";
final String subName = "sub1";
final int numMsgs = 10;
admin.topics().createNonPartitionedTopic(topicName);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
// Set topic level message ttl.
Thread.sleep(3000);
admin.topics().setMessageTTL(topicName, topicMessageTTLSecs);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
consumer.close();
assertFalse(subRef.getDispatcher().isConsumerConnected());
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
runMessageExpiryCheck();
// 1. check all messages expired for this unconnected subscription
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
producer.close();
// Set topic level message ttl.
Thread.sleep(3000);
admin.topics().removeMessageTTL(topicName);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
subRef = topicRef.getSubscription(subName);
consumer.close();
assertFalse(subRef.getDispatcher().isConsumerConnected());
producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(TimeUnit.SECONDS.toMillis(topicMessageTTLSecs));
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(namespaceMessageTTLSecs - topicMessageTTLSecs));
runMessageExpiryCheck();
// 1. check all messages expired for this unconnected subscription
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// clean-up
try {
producer.close();
consumer.close();
admin.topics().deleteSubscription(topicName, subName);
admin.topics().delete(topicName);
admin.namespaces().deleteNamespace(namespaceName);
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 500);
}
}
@Test
public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
int messageTTLSecs = 10;
String namespaceName = "prop/expiry-check-1";
admin.namespaces().createNamespace(namespaceName);
admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
final String topicName = "persistent://prop/expiry-check-1/topic1";
final String subName = "sub1";
final int numMsgs = 10;
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
assertTrue(subRef.getDispatcher().isConsumerConnected());
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
runMessageExpiryCheck();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
runMessageExpiryCheck();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
}
@Test
public void testSubscriptionTypeTransitions() throws Exception {
final String topicName = "persistent://prop/ns-abc/shared-topic2";
final String subName = "sub2";
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
Consumer<byte[]> consumer2 = null;
Consumer<byte[]> consumer3 = null;
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
// 1. shared consumer on an exclusive sub fails
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
fail("should have failed");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Subscription is of different type"));
}
// 2. failover consumer on an exclusive sub fails
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover).subscribe();
fail("should have failed");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Subscription is of different type"));
}
// 3. disconnected sub can be converted in shared
consumer1.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Shared);
} catch (PulsarClientException e) {
fail("should not fail");
}
// 4. exclusive fails on shared sub
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
fail("should have failed");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Subscription is of different type"));
}
// 5. disconnected sub can be converted in failover
consumer2.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Failover);
} catch (PulsarClientException e) {
fail("should not fail");
}
// 5. exclusive consumer can connect after failover disconnects
consumer3.close();
try {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(subRef.getDispatcher().getType(), SubType.Exclusive);
} catch (PulsarClientException e) {
fail("should not fail");
}
consumer1.close();
admin.topics().delete(topicName);
}
@Test
public void testReceiveWithTimeout() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-receive-timeout";
final String subName = "sub";
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subName).receiverQueueSize(1000).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
assertEquals(consumer.getAvailablePermits(), 0);
Message<byte[]> msg = consumer.receive(10, TimeUnit.MILLISECONDS);
assertNull(msg);
assertEquals(consumer.getAvailablePermits(), 0);
producer.send("test".getBytes());
Thread.sleep(100);
assertEquals(consumer.getAvailablePermits(), 0);
msg = consumer.receive(10, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(consumer.getAvailablePermits(), 1);
msg = consumer.receive(10, TimeUnit.MILLISECONDS);
assertNull(msg);
assertEquals(consumer.getAvailablePermits(), 1);
}
@Test
public void testProducerReturnedMessageId() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyz";
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topicRef.getManagedLedger();
long ledgerId = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
// 2. producer publish messages
final int SyncMessages = 10;
for (int i = 0; i < SyncMessages; i++) {
String message = "my-message-" + i;
MessageId receivedMessageId = producer.send(message.getBytes());
assertEquals(receivedMessageId, new MessageIdImpl(ledgerId, i, -1));
}
// 3. producer publish messages async
final int AsyncMessages = 10;
final CountDownLatch counter = new CountDownLatch(AsyncMessages);
for (int i = SyncMessages; i < (SyncMessages + AsyncMessages); i++) {
String content = "my-message-" + i;
final int index = i;
producer.sendAsync(content.getBytes()).thenAccept((msgId) -> {
assertEquals(msgId, new MessageIdImpl(ledgerId, index, -1));
counter.countDown();
}).exceptionally((ex) -> {
return null;
});
}
counter.await();
// 4. producer disconnect
producer.close();
}
@Test
public void testProducerQueueFullBlocking() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx";
final int messages = 10;
PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
// 1. Producer connect
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
.topic(topicName)
.maxPendingMessages(messages)
.blockIfQueueFull(true)
.sendTimeout(1, TimeUnit.SECONDS)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// 2. Stop broker
super.internalCleanup();
// 2. producer publish messages
long startTime = System.nanoTime();
for (int i = 0; i < messages; i++) {
// Should never block
producer.sendAsync("msg".getBytes());
}
// Verify thread was not blocked
long delayNs = System.nanoTime() - startTime;
assertTrue(delayNs < TimeUnit.SECONDS.toNanos(1));
assertEquals(producer.getPendingQueueSize(), messages);
// Next send operation must block, until all the messages in the queue expire
startTime = System.nanoTime();
producer.sendAsync("msg".getBytes());
delayNs = System.nanoTime() - startTime;
assertTrue(delayNs > TimeUnit.MILLISECONDS.toNanos(500));
assertTrue(delayNs < TimeUnit.MILLISECONDS.toNanos(1500));
assertEquals(producer.getPendingQueueSize(), 1);
// 4. producer disconnect
producer.close();
client.close();
// 5. Restart broker
setup();
}
@Test
public void testProducerQueueFullNonBlocking() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx";
final int messages = 10;
// 1. Producer connect
PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer()
.topic(topicName)
.maxPendingMessages(messages)
.blockIfQueueFull(false)
.sendTimeout(1, TimeUnit.SECONDS)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// 2. Stop broker
super.internalCleanup();
// 2. producer publish messages
long startTime = System.nanoTime();
for (int i = 0; i < messages; i++) {
// Should never block
producer.sendAsync("msg".getBytes());
}
// Verify thread was not blocked
long delayNs = System.nanoTime() - startTime;
assertTrue(delayNs < TimeUnit.SECONDS.toNanos(1));
assertEquals(producer.getPendingQueueSize(), messages);
// Next send operation must fail and not block
startTime = System.nanoTime();
try {
producer.send("msg".getBytes());
fail("Send should have failed");
} catch (PulsarClientException.ProducerQueueIsFullError e) {
// Expected
}
delayNs = System.nanoTime() - startTime;
assertTrue(delayNs < TimeUnit.SECONDS.toNanos(1));
assertEquals(producer.getPendingQueueSize(), messages);
// 4. producer disconnect
producer.close();
client.close();
// 5. Restart broker
setup();
}
@Test
public void testDeleteTopics() throws Exception {
BrokerService brokerService = pulsar.getBrokerService();
// 1. producers connect
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic("persistent://prop/ns-abc/topic-1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
/* Producer<byte[]> producer2 = */ pulsarClient.newProducer()
.topic("persistent://prop/ns-abc/topic-2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
brokerService.updateRates();
Map<String, NamespaceBundleStats> bundleStatsMap = brokerService.getBundleStats();
assertEquals(bundleStatsMap.size(), 1);
NamespaceBundleStats bundleStats = bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
assertNotNull(bundleStats);
producer1.close();
admin.topics().delete("persistent://prop/ns-abc/topic-1");
brokerService.updateRates();
bundleStatsMap = brokerService.getBundleStats();
assertEquals(bundleStatsMap.size(), 1);
bundleStats = bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
assertNotNull(bundleStats);
// // Delete 2nd topic as well
// producer2.close();
// admin.topics().delete("persistent://prop/ns-abc/topic-2");
//
// brokerService.updateRates();
//
// bundleStatsMap = brokerService.getBundleStats();
// assertEquals(bundleStatsMap.size(), 0);
}
@DataProvider(name = "codec")
public Object[][] codecProvider() {
return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, };
}
@Test(dataProvider = "codec")
public void testCompression(CompressionType compressionType) throws Exception {
final String topicName = "persistent://prop/ns-abc/topic0" + compressionType;
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(compressionType)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
// 2. producer publish messages
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getData(), ("my-message-" + i).getBytes());
}
// 3. producer disconnect
producer.close();
consumer.close();
}
@Test
public void testBrokerTopicStats() throws Exception {
BrokerService brokerService = this.pulsar.getBrokerService();
Field field = BrokerService.class.getDeclaredField("statsUpdater");
field.setAccessible(true);
ScheduledExecutorService statsUpdater = (ScheduledExecutorService) field.get(brokerService);
// disable statsUpdate to calculate rates explicitly
statsUpdater.shutdown();
final String namespace = "prop/ns-abc";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://" + namespace + "/topic0")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// 1. producer publish messages
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Metrics metric = null;
// sleep 1 sec to caclulate metrics per second
Thread.sleep(1000);
brokerService.updateRates();
List<Metrics> metrics = brokerService.getTopicMetrics();
for (int i = 0; i < metrics.size(); i++) {
if (metrics.get(i).getDimension("namespace").equalsIgnoreCase(namespace)) {
metric = metrics.get(i);
break;
}
}
assertNotNull(metric);
double msgInRate = (double) metrics.get(0).getMetrics().get("brk_in_rate");
// rate should be calculated and no must be > 0 as we have produced 10 msgs so far
assertTrue(msgInRate > 0);
}
@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic1";
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
CompletableFuture<MessageId> future1 = producer.newMessage().value("message-1".getBytes()).sendAsync();
// Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
stopBroker();
byte[] a2 = "message-2".getBytes();
TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);
CompletableFuture<MessageId> future2 = msg2.sendAsync();
// corrupt the message, new content would be 'message-3'
((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length - 1, (byte) '3');
// Restart the broker to have the messages published
startBroker();
future1.get();
try {
future2.get();
fail("since we corrupted the message, it should be rejected by the broker");
} catch (Exception e) {
// ok
}
// We should only receive msg1
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");
while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
assertEquals(new String(msg.getData()), "message-1");
}
}
/**
* Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay
* bucket
*
* 1. produce messages 2. consume messages and ack all except 1 msg 3. Verification: should replay only 1 unacked
* message
*/
@Test
public void testMessageRedelivery() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic2";
final String subName = "sub2";
Message<String> msg;
List<Message<String>> unackedMessages = new ArrayList<>();
int totalMessages = 20;
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
producer.send("my-message-" + i);
}
// (2) Consume and only ack last 10 messages
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
if (i >= 10) {
unackedMessages.add(msg);
} else {
consumer.acknowledge(msg);
}
}
consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < 10; i++) {
// Verify: msg [L:0] must be redelivered
try {
final Message<String> redeliveredMsg = consumer.receive(1, TimeUnit.SECONDS);
unackedMessages.removeIf(unackedMessage -> unackedMessage.getValue().equals(redeliveredMsg.getValue()));
} catch (Exception e) {
fail("msg should be redelivered ", e);
}
}
// Make sure that first 10 messages that we didn't acknowledge get redelivered.
assertEquals(unackedMessages.size(), 0);
// Verify no other messages are redelivered
msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);
consumer.close();
producer.close();
}
/**
* Verify: 1. Broker should not replay already acknowledged messages 2. Dispatcher should not stuck while
* dispatching new messages due to previous-replay of invalid/already-acked messages
*
* @throws Exception
*/
@Test
public void testMessageReplay() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic2";
final String subName = "sub2";
Message<byte[]> msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver");
replayMap.setAccessible(true);
ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1);
assertNotNull(subRef);
// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
MessageIdImpl firstAckedMsg = null;
// (2) Consume and ack messages except first message
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i == 0) {
firstAckedMsg = msgId;
}
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId());
}
}
// (4) redelivery : should redeliver only unacked messages
Thread.sleep(1000);
replayMap.set(dispatcher, messagesToReplay);
// (a) redelivery with all acked-message should clear messageReply bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
assertEquals(messagesToReplay.size(), 0);
// (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
messagesToReplay.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId());
replayMap.set(dispatcher, messagesToReplay);
// send new message
final String testMsg = "testMsg";
producer.send(testMsg.getBytes());
// consumer should be able to receive only new message and not the
dispatcher.consumerFlow(dispatcher.getConsumers().get(0), 1);
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getData(), testMsg.getBytes());
consumer.close();
producer.close();
}
@Test
public void testCreateProducerWithSameName() throws Exception {
String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(topic)
.producerName("test-producer-a")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition);
Producer<byte[]> p1 = producerBuilder.create();
try {
producerBuilder.create();
fail("Should have thrown ProducerBusyException");
} catch (ProducerBusyException e) {
// Expected
}
p1.close();
// Now p2 should succeed
Producer<byte[]> p2 = producerBuilder.create();
p2.close();
}
@Test
public void testGetOrCreateTopic() throws Exception {
String topicName = "persistent://prop/ns-abc/testGetOrCreateTopic";
admin.lookups().lookupTopic(topicName);
Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get();
assertNotNull(topic);
Optional<Topic> t = pulsar.getBrokerService().getTopicReference(topicName);
assertTrue(t.isPresent());
}
@Test
public void testGetTopicIfExists() throws Exception {
String topicName = "persistent://prop/ns-abc/testGetTopicIfExists";
admin.lookups().lookupTopic(topicName);
Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
assertFalse(topic.isPresent());
Optional<Topic> t = pulsar.getBrokerService().getTopicReference(topicName);
assertFalse(t.isPresent());
}
@Test
public void testWithEventTime() throws Exception {
final String topicName = "prop/ns-abc/topic-event-time";
final String subName = "sub";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName)
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
producer.newMessage().value("test").eventTime(5).send();
Message<String> msg = consumer.receive();
assertNotNull(msg);
assertEquals(msg.getValue(), "test");
assertEquals(msg.getEventTime(), 5);
}
}