blob: d1cf5ad51ba124dad46327e60da935b9f2a6a20f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "flaky")
public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// Class-wide logger for test progress/diagnostic messages.
private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
// Timeout (in seconds) applied to most blocking consumer.receive() calls in this class.
private static final int RECEIVE_TIMEOUT_SECONDS = 3;
// Shorter millisecond timeouts; not referenced in this portion of the file —
// presumably used by tests further down (verify before removing).
private static final int RECEIVE_TIMEOUT_SHORT_MILLIS = 100;
private static final int RECEIVE_TIMEOUT_MEDIUM_MILLIS = 500;
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
// Bring up the test broker/client environment and the base producer/consumer
// fixtures (tenant/namespace etc.) provided by ProducerConsumerBase.
super.internalSetup();
super.producerBaseSetup();
}
@DataProvider
public static Object[][] variationsForExpectedPos() {
    // Full cross product of batching x start-inclusive x message count,
    // enumerated in the same order as the original literal table:
    // all four boolean combinations for 10 messages, then for 100.
    List<Object[]> variations = new ArrayList<>();
    for (int numMessages : new int[] {10, 100}) {
        for (boolean batching : new boolean[] {true, false}) {
            for (boolean startInclusive : new boolean[] {true, false}) {
                variations.add(new Object[] {batching, startInclusive, numMessages});
            }
        }
    }
    return variations.toArray(new Object[0][]);
}
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
    // Run each consuming test both with and without ack receipts enabled.
    return new Object[][] {
            { Boolean.TRUE },
            { Boolean.FALSE },
    };
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
// Tear down the broker/client state created in setup(); alwaysRun guarantees
// cleanup even when the test method failed.
super.internalCleanup();
}
/**
 * With batching disabled, each published message must carry its own publish
 * timestamp taken from the client clock (here a deterministic ticker that
 * advances by one on every millis() call), while event time is whatever the
 * producer set explicitly.
 */
@Test
public void testPublishTimestampBatchDisabled() throws Exception {
    log.info("-- Starting {} test --", methodName);

    // Deterministic clock: every millis() call returns the next integer.
    AtomicLong tick = new AtomicLong(0);
    Clock manualClock = new Clock() {
        @Override
        public ZoneId getZone() {
            return ZoneId.systemDefault();
        }

        @Override
        public Clock withZone(ZoneId zone) {
            return this;
        }

        @Override
        public Instant instant() {
            return Instant.ofEpochMilli(millis());
        }

        @Override
        public long millis() {
            return tick.incrementAndGet();
        }
    };

    @Cleanup
    PulsarClient clientWithManualClock = PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .clock(manualClock)
            .build();

    final String topic = "persistent://my-property/my-ns/test-publish-timestamp";

    @Cleanup
    Consumer<byte[]> consumer = clientWithManualClock.newConsumer()
            .topic(topic)
            .subscriptionName("my-sub")
            .subscribe();

    @Cleanup
    Producer<byte[]> producer = clientWithManualClock.newProducer()
            .topic(topic)
            .enableBatching(false)
            .create();

    final int numMessages = 5;
    for (int i = 0; i < numMessages; i++) {
        producer.newMessage()
                .value(("value-" + i).getBytes(UTF_8))
                .eventTime((i + 1) * 100L)
                .sendAsync();
    }
    producer.flush();

    // Non-batched: publish times are 1, 2, 3, ... (one clock tick per message);
    // event times are the explicit 100, 200, 300, ... values set above.
    for (int i = 0; i < numMessages; i++) {
        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
        assertEquals(1L + i, msg.getPublishTime());
        assertEquals(100L * (i + 1), msg.getEventTime());
    }
}
/**
 * With batching enabled and all messages fitting in one batch, every message
 * in the batch must share the publish timestamp of the first clock reading,
 * while per-message event times are preserved.
 */
@Test
public void testPublishTimestampBatchEnabled() throws Exception {
    log.info("-- Starting {} test --", methodName);

    // Deterministic clock: every millis() call returns the next integer.
    AtomicLong tick = new AtomicLong(0);
    Clock manualClock = new Clock() {
        @Override
        public ZoneId getZone() {
            return ZoneId.systemDefault();
        }

        @Override
        public Clock withZone(ZoneId zone) {
            return this;
        }

        @Override
        public Instant instant() {
            return Instant.ofEpochMilli(millis());
        }

        @Override
        public long millis() {
            return tick.incrementAndGet();
        }
    };

    @Cleanup
    PulsarClient clientWithManualClock = PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .clock(manualClock)
            .build();

    final String topic = "persistent://my-property/my-ns/test-publish-timestamp";

    @Cleanup
    Consumer<byte[]> consumer = clientWithManualClock.newConsumer()
            .topic(topic)
            .subscriptionName("my-sub")
            .subscribe();

    final int numMessages = 5;

    // Batch limit far above numMessages so all sends land in a single batch.
    @Cleanup
    Producer<byte[]> producer = clientWithManualClock.newProducer()
            .topic(topic)
            .enableBatching(true)
            .batchingMaxMessages(10 * numMessages)
            .create();

    for (int i = 0; i < numMessages; i++) {
        producer.newMessage()
                .value(("value-" + i).getBytes(UTF_8))
                .eventTime((i + 1) * 100L)
                .sendAsync();
    }
    producer.flush();

    // Batched: one shared publish time (the first tick, 1); distinct event times.
    for (int i = 0; i < numMessages; i++) {
        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
        assertEquals(1L, msg.getPublishTime());
        assertEquals(100L * (i + 1), msg.getEventTime());
    }
}
// Pairs of {batchMessageDelayMs, ackReceiptEnabled}; 0 ms delay disables batching.
// NOTE(review): {0, true} and {1000, false} each appear twice, so the remaining
// combinations {0, false} and {1000, true} are never exercised — confirm whether
// the duplication is intentional or the second pair was meant to flip the flag.
@DataProvider(name = "batchAndAckReceipt")
public Object[][] codecProviderWithAckReceipt() {
return new Object[][] { { 0, true}, { 1000, false }, { 0, true }, { 1000, false }};
}
@DataProvider(name = "batch")
public Object[][] codecProvider() {
    // Batching max-publish-delay in milliseconds; tests treat 0 as "batching off".
    return new Object[][] {
            { 0 },
            { 1000 },
    };
}
/**
 * Publishes 10 messages synchronously (with or without batching, per data
 * provider) and verifies the consumer receives them in order with no
 * duplicates, then acknowledges them cumulatively.
 */
@Test(dataProvider = "batch")
public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
            .subscriptionName("my-subscriber-name").subscribe();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic1");
    if (batchMessageDelayMs != 0) {
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
    }
    Producer<byte[]> producer = producerBuilder.create();
    for (int i = 0; i < 10; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    Message<byte[]> msg = null;
    Set<String> messageSet = Sets.newHashSet();
    for (int i = 0; i < 10; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(msg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(msg);
    consumer.close();
    // Fix: the producer was previously leaked until test teardown; close it
    // explicitly, matching how the consumer is handled.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Publishes 10 messages asynchronously, waits for all publish futures, then
 * receives and verifies ordering/uniqueness, and finally acknowledges
 * cumulatively through the async ack API.
 */
@Test(dataProvider = "batchAndAckReceipt")
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, boolean ackReceiptEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic2")
            .isAckReceiptEnabled(ackReceiptEnabled)
            .subscriptionName("my-subscriber-name").subscribe();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic2");
    if (batchMessageDelayMs != 0) {
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
    }
    Producer<byte[]> producer = producerBuilder.create();
    List<Future<MessageId>> futures = Lists.newArrayList();

    // Asynchronously produce messages
    for (int i = 0; i < 10; i++) {
        final String message = "my-message-" + i;
        Future<MessageId> future = producer.sendAsync(message.getBytes());
        futures.add(future);
    }

    log.info("Waiting for async publish to complete");
    for (Future<MessageId> future : futures) {
        future.get();
    }

    Message<byte[]> msg = null;
    Set<String> messageSet = Sets.newHashSet();
    for (int i = 0; i < 10; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(msg.getData());
        log.info("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }

    // Asynchronously acknowledge upto and including the last message
    Future<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
    log.info("Waiting for async ack to complete");
    ackFuture.get();
    consumer.close();
    // Fix: close the producer as well; it was previously left open until teardown.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that a message listener receives and acknowledges every one of the
 * 100 asynchronously published messages, tracked via a countdown latch.
 */
@Test(dataProvider = "batch", timeOut = 100000)
public void testMessageListener(int batchMessageDelayMs) throws Exception {
    log.info("-- Starting {} test --", methodName);
    int numMessages = 100;
    final CountDownLatch latch = new CountDownLatch(numMessages);

    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic3")
            .subscriptionName("my-subscriber-name").messageListener((c1, msg) -> {
                Assert.assertNotNull(msg, "Message cannot be null");
                String receivedMessage = new String(msg.getData());
                log.debug("Received message [{}] in the listener", receivedMessage);
                c1.acknowledgeAsync(msg);
                latch.countDown();
            }).subscribe();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic3");
    if (batchMessageDelayMs != 0) {
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
    }
    Producer<byte[]> producer = producerBuilder.create();
    List<Future<MessageId>> futures = Lists.newArrayList();

    // Asynchronously produce messages
    for (int i = 0; i < numMessages; i++) {
        final String message = "my-message-" + i;
        Future<MessageId> future = producer.sendAsync(message.getBytes());
        futures.add(future);
    }
    log.info("Waiting for async publish to complete");
    for (Future<MessageId> future : futures) {
        future.get();
    }

    log.info("Waiting for message listener to ack all messages");
    assertTrue(latch.await(numMessages, TimeUnit.SECONDS), "Timed out waiting for message listener acks");
    consumer.close();
    // Fix: the producer was previously never closed; release it explicitly.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies Consumer.pause()/resume(): a paused consumer's listener processes
 * only the initial permit window (receiverQueueSize messages) and receives the
 * remainder only after resume() is called.
 */
@Test(timeOut = 100000)
public void testPauseAndResume() throws Exception {
    log.info("-- Starting {} test --", methodName);

    // number of permits broker has when consumer initially subscribes
    int receiverQueueSize = 20;
    AtomicReference<CountDownLatch> latchHolder =
            new AtomicReference<>(new CountDownLatch(receiverQueueSize));
    AtomicInteger received = new AtomicInteger();

    Consumer<byte[]> consumer = pulsarClient.newConsumer().receiverQueueSize(receiverQueueSize)
            .topic("persistent://my-property/my-ns/my-topic-pr")
            .subscriptionName("my-subscriber-name").messageListener((c1, msg) -> {
                Assert.assertNotNull(msg, "Message cannot be null");
                String receivedMessage = new String(msg.getData());
                log.debug("Received message [{}] in the listener", receivedMessage);
                c1.acknowledgeAsync(msg);
                received.incrementAndGet();
                latchHolder.get().countDown();
            }).subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic-pr").create();

    consumer.pause();

    // Publish twice the queue size; only the first window should be delivered while paused.
    for (int i = 0; i < receiverQueueSize * 2; i++) {
        producer.send(("my-message-" + i).getBytes());
    }

    log.info("Waiting for message listener to ack " + receiverQueueSize + " messages");
    assertTrue(latchHolder.get().await(receiverQueueSize, TimeUnit.SECONDS), "Timed out waiting for message listener acks");

    log.info("Giving message listener an opportunity to receive messages while paused");
    Awaitility.await().untilAsserted(
            () -> assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused"));

    // Arm a fresh latch for the second window, then resume delivery.
    latchHolder.set(new CountDownLatch(receiverQueueSize));
    consumer.resume();

    log.info("Waiting for message listener to ack all messages");
    assertTrue(latchHolder.get().await(receiverQueueSize, TimeUnit.SECONDS), "Timed out waiting for message listener acks");

    consumer.close();
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that a paused consumer remains effectively paused across a topic
 * unload/reconnect: after the broker unloads the topic and the consumer
 * reconnects, no additional messages may be delivered until resume().
 */
@Test(timeOut = 30000)
public void testPauseAndResumeWithUnloading() throws Exception {
final String topicName = "persistent://my-property/my-ns/pause-and-resume-with-unloading";
final String subName = "sub";
final int receiverQueueSize = 20;
// The latch is replaced between the paused and resumed phases, hence the AtomicReference.
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(receiverQueueSize));
AtomicInteger received = new AtomicInteger();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(receiverQueueSize).messageListener((c1, msg) -> {
assertNotNull(msg, "Message cannot be null");
c1.acknowledgeAsync(msg);
received.incrementAndGet();
latch.get().countDown();
}).subscribe();
consumer.pause();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
// Publish twice the queue size so the second half can only arrive after resume().
for (int i = 0; i < receiverQueueSize * 2; i++) {
producer.send(("my-message-" + i).getBytes());
}
// Paused consumer receives only `receiverQueueSize` messages
assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
"Timed out waiting for message listener acks");
// Make sure no flow permits are sent when the consumer reconnects to the topic
admin.topics().unload(topicName);
Awaitility.await().untilAsserted(
() -> assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused"));
// Arm a fresh latch for the post-resume window, then allow delivery again.
latch.set(new CountDownLatch(receiverQueueSize));
consumer.resume();
assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
"Timed out waiting for message listener acks");
consumer.unsubscribe();
producer.close();
}
/**
 * Publishes and consumes 10 messages, restarts the broker, and verifies the
 * reconnecting consumer (subscribed with startMessageIdInclusive and never
 * acknowledging the first pass) redelivers the same 10 messages.
 */
@Test(dataProvider = "batch")
public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
    log.info("-- Starting {} test --", methodName);
    // Create consumer and producer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/my-topic4")
            .subscriptionName("my-subscriber-name")
            .startMessageIdInclusive()
            .subscribe();
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic4");
    if (batchMessageDelayMs != 0) {
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
    } else {
        producerBuilder.enableBatching(false);
    }
    Producer<byte[]> producer = producerBuilder.create();

    // Produce messages
    for (int i = 0; i < 10; i++) {
        producer.sendAsync(("my-message-" + i).getBytes()).thenApply(msgId -> {
            log.info("Published message id: {}", msgId);
            return msgId;
        });
    }
    producer.flush();

    Message<byte[]> msg;
    for (int i = 0; i < 10; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        log.info("Received: [{}]", new String(msg.getData()));
    }

    // Restart the broker and wait for the backoff to kick in. The client library will try to reconnect, and once
    // the broker is up, the consumer should receive the duplicate messages.
    log.info("-- Restarting broker --");
    restartBroker();

    msg = null;
    log.info("Receiving duplicate messages..");
    for (int i = 0; i < 10; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        // Fix: assert non-null BEFORE dereferencing; previously msg.getData() ran
        // first, so a receive() timeout surfaced as an NPE instead of a clean
        // assertion failure.
        Assert.assertNotNull(msg, "Message cannot be null");
        log.info("Received: [{}]", new String(msg.getData()));
    }
    consumer.acknowledgeCumulative(msg);
    consumer.close();
    // Fix: close the producer explicitly instead of leaking it until teardown.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Stops the broker so a pending send exceeds its 1-second send timeout, then
 * verifies the publish future fails and the message is never delivered after
 * the broker comes back.
 */
@Test(dataProvider = "batch")
public void testSendTimeout(int batchMessageDelayMs) throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic5")
            .subscriptionName("my-subscriber-name").subscribe();
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
    if (batchMessageDelayMs != 0) {
        producerBuilder.enableBatching(true);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
    }
    Producer<byte[]> producer = producerBuilder.create();
    final String message = "my-message";

    // Trigger the send timeout
    stopBroker();
    Future<MessageId> future = producer.sendAsync(message.getBytes());
    try {
        future.get();
        Assert.fail("Send operation should have failed");
    } catch (ExecutionException e) {
        // Expected: the send timed out while the broker was down.
    }
    startBroker();

    // We should not have received any message
    Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    Assert.assertNull(msg);
    consumer.close();
    // Fix: the producer was previously leaked; close it explicitly.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that operations performed in an invalid order fail with the
 * expected exception types: using a closed client, acknowledging an unreceived
 * message, and using a closed consumer/producer.
 */
@Test
public void testInvalidSequence() throws Exception {
log.info("-- Starting {} test --", methodName);
// A client that is closed before any use: every subsequent call must fail.
PulsarClient client1 = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
client1.close();
// Subscribing through a closed client must be rejected.
try {
client1.newConsumer().topic("persistent://my-property/my-ns/my-topic6")
.subscriptionName("my-subscriber-name").subscribe();
Assert.fail("Should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
}
// Creating a producer through a closed client must be rejected too.
try {
client1.newProducer().topic("persistent://my-property/my-ns/my-topic6").create();
Assert.fail("Should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
}
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic6")
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic6")
.subscriptionName("my-subscriber-name").subscribe();
// Acknowledging a message that was built but never sent/received is invalid.
try {
TypedMessageBuilder<byte[]> builder = producer.newMessage().value("InvalidMessage".getBytes());
Message<byte[]> msg = ((TypedMessageBuilderImpl<byte[]>) builder).getMessage();
consumer.acknowledge(msg);
} catch (PulsarClientException.InvalidMessageException e) {
// ok
}
consumer.close();
// All consumer operations after close() must throw AlreadyClosedException.
try {
consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.fail("Should fail");
} catch (PulsarClientException.AlreadyClosedException e) {
// ok
}
try {
consumer.unsubscribe();
Assert.fail("Should fail");
} catch (PulsarClientException.AlreadyClosedException e) {
// ok
}
producer.close();
// Sending through a closed producer must throw AlreadyClosedException.
try {
producer.send("message".getBytes());
Assert.fail("Should fail");
} catch (PulsarClientException.AlreadyClosedException e) {
// ok
}
}
/**
 * Verifies that obviously invalid builder arguments (bad URLs/topics, negative
 * values, null/empty names) are rejected with the expected exception types.
 */
@Test
public void testSillyUser() {
// Invalid service URL scheme.
try {
PulsarClient.builder().serviceUrl("invalid://url").build();
Assert.fail("should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.InvalidServiceURL);
}
// Negative send timeout.
try {
pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
// ok
}
// Invalid topic scheme on the producer side.
try {
pulsarClient.newProducer().topic("invalid://topic").create();
Assert.fail("should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
}
// Null listener and null subscription type are rejected eagerly.
try {
pulsarClient.newConsumer().messageListener(null);
Assert.fail("should fail");
} catch (NullPointerException e) {
// ok
}
try {
pulsarClient.newConsumer().subscriptionType(null);
Assert.fail("should fail");
} catch (NullPointerException e) {
// ok
}
// Negative receiver queue size.
try {
pulsarClient.newConsumer().receiverQueueSize(-1);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
// ok
}
// Null and empty subscription names must both fail with IllegalArgumentException.
try {
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName(null)
.subscribe();
Assert.fail("Should fail");
} catch (PulsarClientException | IllegalArgumentException e) {
assertEquals(e.getClass(), IllegalArgumentException.class);
}
try {
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName("")
.subscribe();
Assert.fail("Should fail");
} catch (PulsarClientException | IllegalArgumentException e) {
Assert.assertTrue(e instanceof IllegalArgumentException);
}
// Invalid topic scheme on the consumer side.
try {
pulsarClient.newConsumer().topic("invalid://topic7").subscriptionName("my-subscriber-name").subscribe();
Assert.fail("Should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
}
}
// This is to test that the flow control counter doesn't get corrupted while concurrent receives during
// reconnections
@Test(dataProvider = "batch", groups = "quarantine")
public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
final int recvQueueSize = 100;
final int numConsumersThreads = 10;
String subName = UUID.randomUUID().toString();
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic7").subscriptionName(subName)
.startMessageIdInclusive()
.receiverQueueSize(recvQueueSize).subscribe();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
// Phase 1: park numConsumersThreads threads in receive() before the reconnect.
// The extra barrier party is this test thread, used to release them together.
final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return null;
});
}
barrier.await();
// we restart the broker to reconnect
restartBroker();
// publish 100 messages so that the consumers blocked on receive() will now get the messages
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/my-topic7");
if (batchMessageDelayMs != 0) {
producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerBuilder.batchingMaxMessages(5);
producerBuilder.enableBatching(true);
}
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < recvQueueSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
Awaitility.await().untilAsserted(() -> {
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
});
// Phase 2: ten more concurrent receives; permits must grow by exactly that many.
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return null;
});
}
barrier.await();
Awaitility.await().untilAsserted(() -> {
// The available permits should be 20 and num messages in the queue should be 80
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
});
// clear the queue
while (true) {
Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg == null) {
break;
}
}
Awaitility.await().untilAsserted(() -> {
// The available permits should be 0 and num messages in the queue should be 0
Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
});
// Phase 3: threads block on an empty queue, then the broker restarts; the
// permit counter must come back consistent after the reconnect.
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return null;
});
}
barrier.await();
restartBroker();
Awaitility.await().untilAsserted(() -> {
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
});
consumer.close();
}
/**
 * Verifies that a payload one byte over DEFAULT_MAX_MESSAGE_SIZE is rejected
 * with InvalidMessageException.
 */
@Test
public void testSendBigMessageSize() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topic = "persistent://my-property/my-ns/bigMsg";
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

    // Messages are allowed up to MaxMessageSize
    // NOTE(review): this only builds the message and never sends it, so the
    // "max size is accepted" half of the test exercises nothing — confirm
    // whether a send() was intended here.
    producer.newMessage().value(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE]);

    try {
        producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
        fail("Should have thrown exception");
    } catch (PulsarClientException.InvalidMessageException e) {
        // OK: oversized payload rejected client-side.
    }
    // Fix: the producer was previously leaked; close it explicitly.
    producer.close();
}
/**
 * Verifies non-batch message size being validated after performing compression while batch-messaging validates
 * before compression of message
 *
 * <pre>
 * send msg with size > MAX_SIZE (5 MB)
 * a. non-batch with compression: pass
 * b. batch-msg with compression: pass
 * c. non-batch w/o compression: fail
 * d. non-batch with compression, consumer consume: pass
 * e. batch-msg w/o compression: fail
 * </pre>
 *
 * @throws Exception
 */
@Test
public void testSendBigMessageSizeButCompressed() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topic = "persistent://my-property/my-ns/bigMsg";
// Note: the oversized payloads below are zero-filled arrays, which presumably
// compress to far below the limit — hence the "pass" cases succeed.
// (a) non-batch msg with compression
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4)
.create();
producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
producer.close();
// (b) batch-msg with compression
producer = pulsarClient.newProducer().topic(topic)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4)
.create();
producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
producer.close();
// (c) non-batch msg without compression
producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.NONE)
.create();
try {
producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
}
producer.close();
// (d) non-batch msg with compression and try to consume message
producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.LZ4).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe();
byte[] content = new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 10];
producer.send(content);
// Round-trip check: the consumer must see the exact oversized-then-compressed payload.
assertEquals(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), content);
producer.close();
// (e) batch-msg w/o compression
producer = pulsarClient.newProducer().topic(topic)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.compressionType(CompressionType.NONE)
.create();
try {
producer.send(new byte[Commands.DEFAULT_MAX_MESSAGE_SIZE + 1]);
fail("Should have thrown exception");
} catch (PulsarClientException.InvalidMessageException e) {
// OK
}
producer.close();
consumer.close();
}
/**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
*
* Usecase 2: 2 Active Subscriptions (faster and slower) and slower gets closed - 2 subscribers - Produce Messages -
* 1 faster-subscriber consumes all messages and another slower-subscriber none - EntryCache should have cached
* messages as slower-subscriber has not consumed messages yet - close slower-subscriber - EntryCache should be
* cleared
*
* @throws Exception
*/
@Test
public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
log.info("-- Starting {} test --", methodName);
final long batchMessageDelayMs = 100;
final int receiverSize = 10;
final String topicName = "cache-topic";
final String sub1 = "faster-sub1";
final String sub2 = "slower-sub2";
/************ usecase-1: *************/
// 1. Subscriber Faster subscriber
Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
final String topic = "persistent://my-property/my-ns/" + topicName;
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerBuilder.batchingMaxMessages(5);
producerBuilder.enableBatching(true);
Producer<byte[]> producer = producerBuilder.create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
// Swap the ledger's private final "entryCache" field for a Mockito spy so the
// cache-invalidation calls can be verified below.
Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
cacheField.setAccessible(true);
// NOTE(review): clearing the FINAL flag via the Field.modifiers hack only works on
// JDK 11 and earlier - confirm against the JDK this test suite runs on.
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(cacheField, cacheField.getModifiers() & ~Modifier.FINAL);
EntryCacheImpl entryCache = spy((EntryCacheImpl) cacheField.get(ledger));
cacheField.set(ledger, entryCache);
Message<byte[]> msg;
// 2. Produce messages
for (int i = 0; i < 30; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// 3. Consume messages
for (int i = 0; i < 30; i++) {
msg = subscriber1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber1.acknowledge(msg);
}
// Verify: EntryCache has been invalidated
verify(entryCache, atLeastOnce()).invalidateEntries(any());
// sleep for a second: as ledger.updateCursorRateLimit RateLimiter will allow to invoke cursor-update after a
// second
Thread.sleep(1000);//
// produce-consume one more message to trigger : ledger.internalReadFromLedger(..) which updates cursor and
// EntryCache
producer.send("message".getBytes());
msg = subscriber1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
/************ usecase-2: *************/
// 1.b Subscriber slower-subscriber
Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2).subscribe();
// Produce messages
final int moreMessages = 10;
for (int i = 0; i < receiverSize + moreMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// Consume messages (only the faster subscriber consumes; subscriber2 builds a backlog)
for (int i = 0; i < receiverSize + moreMessages; i++) {
msg = subscriber1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber1.acknowledge(msg);
}
// sleep for a second: as ledger.updateCursorRateLimit RateLimiter will allow to invoke cursor-update after a
// second
Thread.sleep(1000);//
// produce-consume one more message to trigger : ledger.internalReadFromLedger(..) which updates cursor and
// EntryCache
producer.send("message".getBytes());
msg = subscriber1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
// Verify: as active-subscriber2 has not consumed messages: EntryCache must have those entries in cache
Awaitility.await().untilAsserted(() -> assertNotEquals(entryCache.getSize(), 0));
// 3.b Close subscriber2: which will trigger cache to clear the cache
subscriber2.close();
// retry strategically until broker clean up closed subscribers and invalidate all cache entries
retryStrategically((test) -> entryCache.getSize() == 0, 5, 100);
// Verify: EntryCache should be cleared
assertEquals(entryCache.getSize(), 0);
subscriber1.close();
log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that a subscription which builds a backlog beyond the configured threshold is deactivated
 * (removed from the ledger's active cursors) and re-activated once it catches up.
 */
@Test(dataProvider = "ackReceiptEnabled")
public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final long batchMessageDelayMs = 100;
    final int receiverSize = 10;
    final String topicName = "cache-topic";
    final String topic = "persistent://my-property/my-ns/" + topicName;
    final String sub1 = "faster-sub1";
    final String sub2 = "slower-sub2";
    // 1. Subscriber Faster subscriber: let it consume all messages immediately
    Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1)
            .isAckReceiptEnabled(ackReceiptEnabled)
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
    // 1.b. Subscriber Slow subscriber: does not consume until step 6, so it builds a backlog
    Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2)
            .isAckReceiptEnabled(ackReceiptEnabled)
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
    producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(5);
    Producer<byte[]> producer = producerBuilder.create();
    PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
    ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
    // broker-side thresholds that drive cache eviction and cursor deactivation
    final long maxMessageCacheRetentionTimeMillis = conf.getManagedLedgerCacheEvictionTimeThresholdMillis();
    final long maxActiveCursorBacklogEntries = conf.getManagedLedgerCursorBackloggedThreshold();
    Message<byte[]> msg;
    // produce just past the backlog threshold so sub2 crosses it
    final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1;
    // 2. Produce messages
    for (int i = 0; i < totalMsgs; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    // 3. Consume messages: at Faster subscriber
    for (int i = 0; i < totalMsgs; i++) {
        msg = subscriber1.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS);
        subscriber1.acknowledgeAsync(msg);
    }
    // wait so messages become eligible for eviction from the entry cache
    Thread.sleep(maxMessageCacheRetentionTimeMillis);
    // 4. deactivate subscriber which has built the backlog
    topicRef.checkBackloggedCursors();
    Thread.sleep(100);
    // 5. verify: only the caught-up subscriber remains in the active-cursor set
    Set<String> activeSubscriber = Sets.newHashSet();
    ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
    assertTrue(activeSubscriber.contains(sub1));
    assertFalse(activeSubscriber.contains(sub2));
    // 6. consume messages : at slower subscriber
    for (int i = 0; i < totalMsgs; i++) {
        msg = subscriber2.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS);
        subscriber2.acknowledgeAsync(msg);
    }
    // once caught up, sub2 must be re-activated
    topicRef.checkBackloggedCursors();
    activeSubscriber.clear();
    ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
    assertTrue(activeSubscriber.contains(sub1));
    assertTrue(activeSubscriber.contains(sub2));
    // Fix: close clients (previously leaked across tests) and log completion
    // consistently with the other tests in this file.
    producer.close();
    subscriber1.close();
    subscriber2.close();
    log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 5000)
public void testAsyncProducerAndConsumer() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int totalMsg = 100;
    final Set<String> published = Sets.newHashSet();
    final Set<String> consumed = Sets.newHashSet();
    // Subscribe before publishing so every message reaches the subscription.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
            .subscriptionName("my-subscriber-name").subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
            .create();
    // Publish totalMsg payloads, remembering each one for the set comparison below.
    int seq = 0;
    while (seq < totalMsg) {
        String payload = "my-message-" + seq;
        producer.send(payload.getBytes());
        published.add(payload);
        seq++;
    }
    log.info(" start receiving messages :");
    // Drain the topic asynchronously; the latch trips once every message was received.
    CountDownLatch latch = new CountDownLatch(totalMsg);
    @Cleanup("shutdownNow")
    ExecutorService executor = Executors.newFixedThreadPool(1);
    receiveAsync(consumer, totalMsg, 0, latch, consumed, executor);
    latch.await();
    // Everything published must have been consumed - and nothing else.
    assertEquals(published.size(), totalMsg);
    published.removeAll(consumed);
    assertTrue(published.isEmpty());
    producer.close();
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 5000)
public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int totalMsg = 100;
    final Set<String> produceMsgs = Sets.newHashSet();
    final Set<String> consumeMsgs = Sets.newHashSet();
    // Fix: the consumer must actually use a zero-sized receiver queue. Without
    // .receiverQueueSize(0) this test was an exact duplicate of
    // testAsyncProducerAndConsumer and never exercised the zero-queue consumer path
    // its name promises.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
            .receiverQueueSize(0)
            .subscriptionName("my-subscriber-name").subscribe();
    // produce message
    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
            .create();
    for (int i = 0; i < totalMsg; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
        produceMsgs.add(message);
    }
    log.info(" start receiving messages :");
    CountDownLatch latch = new CountDownLatch(totalMsg);
    // receive messages asynchronously, one at a time (zero-queue consumers prefetch nothing)
    @Cleanup("shutdownNow")
    ExecutorService executor = Executors.newFixedThreadPool(1);
    receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
    latch.await();
    // verify message produced correctly
    assertEquals(produceMsgs.size(), totalMsg);
    // verify produced and consumed messages must be exactly same
    produceMsgs.removeAll(consumeMsgs);
    assertTrue(produceMsgs.isEmpty());
    producer.close();
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
@Test
public void testSendCallBackReturnSequenceId() throws Exception {
    log.info("-- Starting {} test --", methodName);
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .enableBatching(false)
            .topic("persistent://my-property/my-ns/my-topic5")
            .sendTimeout(1, TimeUnit.SECONDS);
    Producer<byte[]> producer = producerBuilder.create();
    final String message = "my-message";
    // Trigger the send timeout
    stopBroker();
    List<CompletableFuture<MessageId>> futures = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync();
        futures.add(future);
    }
    // Fix: the old version returned true from Awaitility.until() immediately and only
    // registered exceptionally() callbacks whose assertion failures were swallowed, so
    // the test could never fail. Wait for all sends to complete, then assert
    // synchronously; since each future is already done, the exceptionally function runs
    // in this thread and any AssertionError it throws is propagated by join().
    Awaitility.await().until(() -> futures.stream().allMatch(CompletableFuture::isDone));
    for (int i = 0; i < futures.size(); i++) {
        final long expectedSequenceId = i;
        CompletableFuture<MessageId> future = futures.get(i);
        assertTrue(future.isCompletedExceptionally());
        future.exceptionally(ex -> {
            long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId();
            Assert.assertEquals(sequenceId, expectedSequenceId);
            return null;
        }).join();
    }
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that the sendAsync completion callback fires for every message: on success the
 * callback records the payload length, which is then compared against the sent payload.
 */
@Test
public void testSendCallBack() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int totalMsg = 100;
    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    for (int i = 0; i < totalMsg; i++) {
        final String message = "my-message-" + i;
        int len = message.getBytes().length;
        final AtomicInteger msgLength = new AtomicInteger();
        CompletableFuture<MessageId> future = producer.sendAsync(message.getBytes()).handle((r, ex) -> {
            if (ex != null) {
                log.error("Message send failed:", ex);
            } else {
                // success path: record the length so the assert below can observe it
                msgLength.set(len);
            }
            return null;
        });
        // block until the callback has run, then verify it saw the send succeed
        future.get();
        assertEquals(message.getBytes().length, msgLength.get());
    }
    // Fix: close the producer (was leaked) and log completion like the other tests.
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
* Consumes messages from consumer1 and sends acknowledgements from a different consumer subscribed under the same
* subscription name.
*
* @throws Exception
*/
@Test(dataProvider = "ackReceiptEnabled", timeOut = 30000)
public void testSharedConsumerAckDifferentConsumer(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
// Two consumers share one Shared subscription; acknowledgmentGroupTime(0) sends acks immediately.
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Message<byte[]> msg;
Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
// Receive 5 messages on each consumer without acknowledging.
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
consumerMsgSet1.add(msg);
msg = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
consumerMsgSet2.add(msg);
}
// Cross-acknowledge: each consumer acks the messages the OTHER consumer received.
consumerMsgSet1.forEach(m -> {
try {
consumer2.acknowledge(m);
} catch (PulsarClientException e) {
fail();
}
});
consumerMsgSet2.forEach(m -> {
try {
consumer1.acknowledge(m);
} catch (PulsarClientException e) {
fail();
}
});
// After cross-acking everything, redelivery requests must yield nothing.
consumer1.redeliverUnacknowledgedMessages();
consumer2.redeliverUnacknowledgedMessages();
Thread.sleep(1000L);
try {
if (consumer1.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS) != null
|| consumer2.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS) != null) {
fail();
}
} finally {
consumer1.close();
consumer2.close();
}
log.info("-- Exiting {} test --", methodName);
}
/**
* Recursively receives messages one at a time until {@code totalMessage} messages have been consumed.
* Each successfully received payload is recorded in {@code consumeMsg}, acknowledged, and counted
* down on {@code latch}; the next receive is scheduled on {@code executor} to avoid unbounded
* recursion on the client callback thread.
*
* NOTE(review): when receiveAsync completes exceptionally the latch is never counted down, so a
* test awaiting it only fails via its timeout - confirm this is intended.
*/
private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch,
final Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
if (currentMessage < totalMessage) {
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
future.handle((msg, exception) -> {
if (exception == null) {
// add message to consumer-queue to verify with produced messages
consumeMsg.add(new String(msg.getData()));
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e1) {
fail("message acknowledge failed", e1);
}
// consume next message
executor.execute(() -> {
try {
receiveAsync(consumer, totalMessage, currentMessage + 1, latch, consumeMsg, executor);
} catch (PulsarClientException e) {
fail("message receive failed", e);
}
});
latch.countDown();
}
return null;
});
}
}
/**
* Verify: the consumer stops receiving messages when it reaches the unacked-message limit and resumes once it
* acknowledges messages.
*
* 1. Produce X (600) messages.
* 2. Consumer has receiver-queue size (10) and receives messages without acknowledging.
* 3. Consumer stops receiving messages after unAckThreshold = 500.
* 4. Consumer acks messages and starts consuming the remaining messages.
*
* This test case enables checksum sending while producing messages, and the broker verifies the checksum for each
* message.
*/
@Test(dataProvider = "ackReceiptEnabled")
public void testConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) {
    log.info("-- Starting {} test --", methodName);
    // Remember the broker-wide limit so the finally block can restore it.
    int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
    try {
        final int unAckedMessagesBufferSize = 500;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 600;
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                .isAckReceiptEnabled(ackReceiptEnabled)
                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/unacked-topic").create();
        // (1) Produced Messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
        Message<byte[]> msg;
        List<Message<byte[]>> messages = Lists.newArrayList();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            } else {
                // broker stopped dispatching: unacked-message limit reached
                break;
            }
        }
        // client must receive number of messages = unAckedMessagesBufferSize rather than all produced messages
        assertEquals(messages.size(), unAckedMessagesBufferSize);
        // start acknowledging messages, which unblocks dispatching
        messages.forEach(consumer::acknowledgeAsync);
        // try to consume remaining messages
        int remainingMessages = totalProducedMsgs - messages.size();
        for (int i = 0; i < remainingMessages; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
        }
        // total received-messages should match produced messages
        // (fix: TestNG argument order is (actual, expected) - was reversed)
        assertEquals(messages.size(), totalProducedMsgs);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    } catch (Exception e) {
        // Fix: keep the cause instead of a bare fail() that hid the failure reason.
        fail(e.getMessage(), e);
    } finally {
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
    }
}
/**
* Verify the iteration of: a. receive messages without acking, b. stop receiving, c. ack the messages,
* d. start receiving again.
*
* 1. Produce a total of X (1500) messages.
* 2. Consumer consumes messages without acking until the broker stops dispatching because the
*    ack-threshold (500) is reached.
* 3. Consumer acks the messages once it stops getting new ones.
* 4. Consumer tries to consume messages again.
* 5. Consumer should be able to finish consuming all 1500 messages in 3 iterations (1500/500).
*/
@Test(dataProvider = "ackReceiptEnabled")
public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ackReceiptEnabled) {
log.info("-- Starting {} test --", methodName);
// remember the broker-wide limit so the finally block can restore it
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
try {
final int unAckedMessagesBufferSize = 500;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 1500;
// receiver consumes messages in iteration after acknowledging broker
final int totalReceiveIteration = totalProducedMsgs / unAckedMessagesBufferSize;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
// acknowledgmentGroupTime(0) makes every ack immediate, so the broker sees the
// unacked count drop as soon as acknowledge() returns
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
int totalReceivedMessages = 0;
// (2) Receive Messages: each iteration drains up to the unacked limit, then acks
for (int j = 0; j < totalReceiveIteration; j++) {
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
// broker stopped dispatching: unacked-message limit reached
break;
}
}
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertEquals(messages.size(), unAckedMessagesBufferSize);
// start acknowledging messages, which unblocks the next iteration
messages.forEach(m -> {
try {
consumer.acknowledge(m);
} catch (PulsarClientException e) {
fail("ack failed", e);
}
});
totalReceivedMessages += messages.size();
}
// total received-messages should match to produced messages
assertEquals(totalReceivedMessages, totalProducedMsgs);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
// NOTE(review): bare fail() drops the cause; consider fail(e.getMessage(), e)
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
/**
* Verify: Consumer1, which doesn't send acks, will not impact Consumer2, which acknowledges the messages
* it consumes.
*/
@Test(dataProvider = "ackReceiptEnabled")
// NOTE(review): "UnActed" in the method name presumably means "Unacked"; renaming would
// change the test's external identifier, so it is kept as-is.
public void testMultipleSharedConsumerBlockingWithUnActedMessages(boolean ackReceiptEnabled) {
log.info("-- Starting {} test --", methodName);
// remember the broker-wide limit so the finally block can restore it
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
try {
final int maxUnackedMessages = 20;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 100;
int totalReceiveMessages = 0;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
// consumer2 uses a separate client connection so the per-consumer unacked limit is
// enforced independently of consumer1's connection
@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertEquals(messages.size(), maxUnackedMessages);
// (3.1) Consumer2 will start consuming messages without ack: it should stop after maxUnackedMessages
messages.clear();
for (int i = 0; i < totalProducedMsgs - maxUnackedMessages; i++) {
msg = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
assertEquals(messages.size(), maxUnackedMessages);
// (3.2) ack for all maxUnackedMessages
messages.forEach(m -> {
try {
consumer2.acknowledge(m);
} catch (PulsarClientException e) {
fail("shouldn't have failed ", e);
}
});
// (4) Consumer2 consumer and ack: so it should consume all remaining messages
// (consumer1 remains blocked on its 20 unacked messages throughout: 20 + 20 + 60 = 100)
messages.clear();
for (int i = 0; i < totalProducedMsgs - (2 * maxUnackedMessages); i++) {
msg = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
consumer2.acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// verify total-consumer messages = total-produce messages
assertEquals(totalProducedMsgs, totalReceiveMessages);
producer.close();
consumer1.close();
consumer2.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
// NOTE(review): bare fail() drops the cause; consider fail(e.getMessage(), e)
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() {
log.info("-- Starting {} test --", methodName);
// remember the broker-wide limit so the finally block can restore it
// (this test never changes it, so the finally is a no-op safeguard)
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
int totalReceiveMsg = 0;
try {
final int receiverQueueSize = 20;
final int totalProducedMsgs = 100;
// ackTimeout of 1s makes unacked messages eligible for redelivery quickly;
// acknowledgmentGroupTime(0) sends acks immediately
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// (2) wait for consumer to receive messages
Awaitility.await().untilAsserted(() -> assertEquals(consumer.numMessagesInQueue(), receiverQueueSize));
// (3) wait for messages to expire, we should've received more
// NOTE(review): this assertion is byte-identical to step (2); presumably it is meant
// to show the incoming queue is still full after the 1s ack-timeout redelivery -
// confirm the intended condition.
Awaitility.await().untilAsserted(() -> assertEquals(consumer.numMessagesInQueue(), receiverQueueSize));
// drain and ack everything; redelivery before receive must not block the consumer
for (int i = 0; i < totalProducedMsgs; i++) {
Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
consumer.acknowledge(msg);
totalReceiveMsg++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, totalReceiveMsg);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
// NOTE(review): bare fail() drops the cause; consider fail(e.getMessage(), e)
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test(dataProvider = "ackReceiptEnabled")
public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled) {
log.info("-- Starting {} test --", methodName);
// remember the broker-wide limit so the finally block can restore it
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
int totalReceiveMsg = 0;
try {
final int unAckedMessagesBufferSize = 20;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 100;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
totalReceiveMsg++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// request redelivery of the unacked batch, then consume everything with acks
consumer.redeliverUnacknowledgedMessages();
Thread.sleep(1000);
int alreadyConsumedMessages = messages.size();
messages.clear();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
consumer.acknowledge(msg);
totalReceiveMsg++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// total received-messages should match to produced messages
// (the first, redelivered batch is counted twice, hence the "+ alreadyConsumedMessages")
assertEquals(totalProducedMsgs + alreadyConsumedMessages, totalReceiveMsg);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
// NOTE(review): bare fail() drops the cause; consider fail(e.getMessage(), e)
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test(dataProvider = "batchAndAckReceipt")
public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean ackReceiptEnabled) {
    log.info("-- Starting {} test --", methodName);
    // Remember the broker-wide limit so the finally block can restore it.
    int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
    try {
        final int maxUnackedMessages = 20;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 100;
        int totalReceiveMessages = 0;
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                .isAckReceiptEnabled(ackReceiptEnabled)
                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
        // Fix: renamed the misspelled local "producerBuidler" -> "producerBuilder" for
        // consistency with the rest of the file.
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/unacked-topic");
        // batchMessageDelayMs == 0 means "batching disabled" for this data provider
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();
        List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
        // (1) Produced Messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes()));
        }
        FutureUtil.waitForAll(futures).get();
        // (2) Consumer1: consume without ack:
        // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
        Message<byte[]> msg;
        List<Message<byte[]>> messages = Lists.newArrayList();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages.add(msg);
                totalReceiveMessages++;
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        // should be blocked due to unack-msgs and should not consume all msgs
        assertNotEquals(messages.size(), totalProducedMsgs);
        // ack for all maxUnackedMessages
        messages.forEach(m -> {
            try {
                consumer1.acknowledge(m);
            } catch (PulsarClientException e) {
                fail("shouldn't have failed ", e);
            }
        });
        // (3) Consumer consumes and acks: so it should consume all remaining messages
        messages.clear();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages.add(msg);
                totalReceiveMessages++;
                consumer1.acknowledgeAsync(msg);
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        // verify total-consumed messages = total-produced messages
        // (fix: TestNG argument order is (actual, expected) - was reversed)
        assertEquals(totalReceiveMessages, totalProducedMsgs);
        producer.close();
        consumer1.close();
        log.info("-- Exiting {} test --", methodName);
    } catch (Exception e) {
        // Fix: keep the cause instead of a bare fail() that hid the failure reason.
        fail(e.getMessage(), e);
    } finally {
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
    }
}
/**
* Verify: Consumer2 acknowledges messages received by Consumer1, and Consumer1 should be unblocked if it
* was blocked due to unacked messages.
*/
@Test
public void testBlockUnackConsumerAckByDifferentConsumer() {
log.info("-- Starting {} test --", methodName);
// remember the broker-wide limit so the finally block can restore it
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
try {
final int maxUnackedMessages = 20;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 100;
int totalReceiveMessages = 0;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
// both consumers share the same Shared subscription
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg;
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
assertEquals(messages.size(), maxUnackedMessages); // consumer1
// (3) ack for all UnackedMessages from consumer2; this should unblock consumer1
messages.forEach(m -> {
try {
consumer2.acknowledge(m);
} catch (PulsarClientException e) {
fail("shouldn't have failed ", e);
}
});
// (4) consumer1 will consumer remaining msgs and consumer2 will ack those messages
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
totalReceiveMessages++;
consumer2.acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// drain anything the broker dispatched to consumer2's share of the subscription
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
totalReceiveMessages++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
// verify total-consumer messages = total-produce messages
assertEquals(totalProducedMsgs, totalReceiveMessages);
producer.close();
consumer1.close();
consumer2.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
// NOTE(review): bare fail() drops the cause; consider fail(e.getMessage(), e)
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testEnabledChecksumClient() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int totalMsg = 10;
    // Consumer first, so the subscription exists before any message is published.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
            .subscriptionName("my-subscriber-name").subscribe();
    // Batching producer: a batch is flushed after 5 messages or 300ms, whichever comes first.
    final int batchMessageDelayMs = 300;
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/my-topic1");
    producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(5);
    Producer<byte[]> producer = producerBuilder.create();
    int idx = 0;
    while (idx < totalMsg) {
        producer.send(("my-message-" + idx).getBytes());
        idx++;
    }
    // Receive in order, checking for gaps and duplicates as we go.
    Set<String> seenMessages = Sets.newHashSet();
    Message<byte[]> lastReceived = null;
    for (int i = 0; i < totalMsg; i++) {
        lastReceived = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(lastReceived.getData());
        log.debug("Received message: [{}]", receivedMessage);
        testMessageOrderAndDuplicates(seenMessages, receivedMessage, "my-message-" + i);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(lastReceived);
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * It verifies redelivery-of-specific-messages: all of those messages are redelivered even when the consumer
 * gets blocked due to reaching the unacked-message limit.
 *
 * Usecase: produce messages with a 10ms interval, so the consumer can consume only
 * {@code unAckedMessagesBufferSize} messages without acking.
 */
@Test
public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() {
    log.info("-- Starting {} test --", methodName);
    // Remember the broker-wide limit so it can be restored in the finally block.
    int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
    try {
        final int unAckedMessagesBufferSize = 10;
        final int receiverQueueSize = 20;
        final int totalProducedMsgs = 20;
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/unacked-topic").create();
        // (1) Produced Messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
            Thread.sleep(10);
        }
        // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
        Message<byte[]> msg;
        List<Message<byte[]>> messages1 = Lists.newArrayList();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        // client should not receive all produced messages and should be blocked due to unack-messages
        assertEquals(messages1.size(), unAckedMessagesBufferSize);
        Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m ->
                (MessageIdImpl) m.getMessageId()).collect(Collectors.toSet());
        // (3) redeliver all consumed messages
        consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
        Thread.sleep(1000);
        Set<MessageIdImpl> messages2 = Sets.newHashSet();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages2.add((MessageIdImpl) msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        assertEquals(messages1.size(), messages2.size());
        // (4) Verify: redelivered all previous unacked-consumed messages
        messages2.removeAll(redeliveryMessages);
        assertEquals(messages2.size(), 0);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    } catch (Exception e) {
        // Bug fix: a bare fail() swallowed the exception and hid the root cause from test reports.
        fail(e.getMessage(), e);
    } finally {
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
    }
}
/**
 * It verifies redelivery-of-specific-messages: all of those messages are redelivered even when the consumer
 * gets blocked due to reaching the unacked-message limit.
 *
 * Usecase: the consumer starts consuming only after all messages have been produced. So, the consumer consumes
 * up to receiver-queue-size messages, asks for redelivery, and receives all those messages again.
 */
@Test
public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() {
    log.info("-- Starting {} test --", methodName);
    // Remember the broker-wide limit so it can be restored in the finally block.
    int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
    try {
        final int unAckedMessagesBufferSize = 10;
        final int receiverQueueSize = 20;
        final int totalProducedMsgs = 50;
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
        // Only subscribe consumer (creates the subscription, then disconnect while producing)
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
        consumer.close();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/unacked-topic").create();
        // (1) Produced Messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();
        // (1.a) start consumer again
        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
        // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
        Message<byte[]> msg;
        List<Message<byte[]>> messages1 = Lists.newArrayList();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        // client should not receive all produced messages and should be blocked due to unack-messages
        Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m ->
                (MessageIdImpl) m.getMessageId()).collect(Collectors.toSet());
        // (3) redeliver all consumed messages
        consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
        Thread.sleep(1000);
        Set<MessageIdImpl> messages2 = Sets.newHashSet();
        for (int i = 0; i < totalProducedMsgs; i++) {
            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (msg != null) {
                messages2.add((MessageIdImpl) msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            } else {
                break;
            }
        }
        assertEquals(messages1.size(), messages2.size());
        // (4) Verify: redelivered all previous unacked-consumed messages
        messages2.removeAll(redeliveryMessages);
        assertEquals(messages2.size(), 0);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    } catch (Exception e) {
        // Bug fix: a bare fail() swallowed the exception and hid the root cause from test reports.
        fail(e.getMessage(), e);
    } finally {
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
    }
}
/**
 * Verifies dispatch with consumer priority levels on a shared subscription: consumer1-3 subscribe with
 * priorityLevel(1) and consumer4 with priorityLevel(2) (lower value = higher priority). After the
 * higher-priority consumers have consumed and replenished permits, additional messages must still be
 * dispatched to them, so consumer4's receive must return null.
 */
@Test
public void testPriorityConsumer() throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
    @Cleanup
    PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
    Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
    @Cleanup
    PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
    Consumer<byte[]> consumer3 = newPulsarClient1.newConsumer()
            .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1).subscribe();
    @Cleanup
    PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
    // consumer4 is the only lower-priority (priorityLevel 2) subscriber; it should starve while 1-3 have permits.
    Consumer<byte[]> consumer4 = newPulsarClient2.newConsumer()
            .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
            .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(2).subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
            .create();
    List<Future<MessageId>> futures = Lists.newArrayList();
    // Asynchronously produce messages
    for (int i = 0; i < 15; i++) {
        final String message = "my-message-" + i;
        Future<MessageId> future = producer.sendAsync(message.getBytes());
        futures.add(future);
    }
    log.info("Waiting for async publish to complete");
    for (Future<MessageId> future : futures) {
        future.get();
    }
    // Drain via consumer1/consumer2 (more iterations than messages, so extra receives time out and
    // simply replenish permits on those consumers).
    for (int i = 0; i < 20; i++) {
        consumer1.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS);
        consumer2.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS);
    }
    /**
     * a. consumer1 and consumer2 now has more permits (as received and sent more permits) b. try to produce more
     * messages: which will again distribute among consumer1 and consumer2 and should not dispatch to consumer4
     *
     */
    for (int i = 0; i < 5; i++) {
        final String message = "my-message-" + i;
        Future<MessageId> future = producer.sendAsync(message.getBytes());
        futures.add(future);
    }
    // Lower-priority consumer4 must receive nothing while higher-priority consumers have permits.
    Assert.assertNull(consumer4.receive(RECEIVE_TIMEOUT_SHORT_MILLIS, TimeUnit.MILLISECONDS));
    // No acknowledgment performed: this test only verifies dispatch distribution, not acking.
    producer.close();
    consumer1.close();
    consumer2.close();
    consumer3.close();
    consumer4.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * <pre>
 * Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked
 * and unblocked consumers.
 *
 * 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
 *    Out of which : c1,c2,c4,c5 will be blocked due to MaxUnackedMessages limit.
 * 2. So, dispatcher should move round-robin and make sure it delivers to the unblocked consumer : c3
 * </pre>
 *
 * @throws Exception
 */
@Test(timeOut = 30000)
public void testSharedSamePriorityConsumer() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int queueSize = 5;
    // Bug fix: this previously saved getMaxConcurrentLookupRequest(), so the restore at the end wrote the
    // lookup limit into maxUnackedMessagesPerConsumer and corrupted the broker config for later tests.
    int maxUnAckMsgs = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
    pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
    try {
        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
        Consumer<byte[]> c1 = newPulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
        @Cleanup
        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
        Consumer<byte[]> c2 = newPulsarClient1.newConsumer()
                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
        List<Future<MessageId>> futures = Lists.newArrayList();
        // Asynchronously produce messages
        final int totalPublishMessages = 500;
        for (int i = 0; i < totalPublishMessages; i++) {
            final String message = "my-message-" + i;
            Future<MessageId> future = producer.sendAsync(message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future<MessageId> future : futures) {
            future.get();
        }
        List<Message<byte[]>> messages = Lists.newArrayList();
        // let consumer1 and consumer2 consume messages up to the queue will be full
        for (int i = 0; i < totalPublishMessages; i++) {
            Message<byte[]> msg = c1.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messages.add(msg);
            } else {
                break;
            }
        }
        for (int i = 0; i < totalPublishMessages; i++) {
            Message<byte[]> msg = c2.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messages.add(msg);
            } else {
                break;
            }
        }
        // TestNG's assertEquals is (actual, expected); the previous call had the arguments reversed.
        Assert.assertEquals(messages.size(), queueSize * 2);
        // create new consumers with the same priority
        @Cleanup
        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
        Consumer<byte[]> c3 = newPulsarClient2.newConsumer()
                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
        @Cleanup
        PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
        Consumer<byte[]> c4 = newPulsarClient3.newConsumer()
                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
        @Cleanup
        PulsarClient newPulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
        Consumer<byte[]> c5 = newPulsarClient4.newConsumer()
                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
        // c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages
        for (int i = 0; i < totalPublishMessages; i++) {
            Message<byte[]> msg = c4.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messages.add(msg);
            } else {
                break;
            }
        }
        for (int i = 0; i < totalPublishMessages; i++) {
            Message<byte[]> msg = c5.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messages.add(msg);
            } else {
                break;
            }
        }
        // c3 acks as it goes, so it never blocks and drains whatever remains
        for (int i = 0; i < totalPublishMessages; i++) {
            Message<byte[]> msg = c3.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messages.add(msg);
                c3.acknowledge(msg);
            } else {
                break;
            }
        }
        // total messages must be consumed by all consumers
        Assert.assertEquals(messages.size(), totalPublishMessages);
        producer.close();
        c1.close();
        c2.close();
        c3.close();
        c4.close();
        c5.close();
    } finally {
        // Always restore the broker-wide limit, even when an assertion fails mid-test.
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
    }
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies redelivery on a Failover subscription: messages are consumed and acked in parts, with a full
 * redeliverUnacknowledgedMessages() triggered between parts; after a second produce round, the consumer must
 * still receive every remaining (unacked) message exactly once per the count assertion.
 *
 * @param ackReceiptEnabled whether the consumer waits for broker ack-receipts (from the dataProvider)
 */
@Test(dataProvider = "ackReceiptEnabled", groups = "quarantine")
public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final int receiverQueueSize = 10;
    // Only subscribe consumer
    // acknowledgmentGroupTime(0) makes every ack flush immediately, so redelivery sees up-to-date ack state.
    ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
            .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
            .isAckReceiptEnabled(ackReceiptEnabled)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
            .create();
    // (1) First round to produce-consume messages
    int consumeMsgInParts = 4;
    for (int i = 0; i < receiverQueueSize; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    producer.flush();
    // (1.a) consume first consumeMsgInParts msgs and trigger redeliver
    Message<byte[]> msg;
    List<Message<byte[]>> messages1 = Lists.newArrayList();
    for (int i = 0; i < consumeMsgInParts; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (msg != null) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        } else {
            break;
        }
    }
    assertEquals(messages1.size(), consumeMsgInParts);
    // Redeliver everything still unacked; sleep gives the broker time to re-dispatch.
    consumer.redeliverUnacknowledgedMessages();
    Thread.sleep(1000L);
    // (1.b) consume second consumeMsgInParts msgs and trigger redeliver
    messages1.clear();
    for (int i = 0; i < consumeMsgInParts; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (msg != null) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        } else {
            break;
        }
    }
    assertEquals(messages1.size(), consumeMsgInParts);
    consumer.redeliverUnacknowledgedMessages();
    Thread.sleep(1000L);
    // (2) Second round to produce-consume messages
    for (int i = 0; i < receiverQueueSize; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    producer.flush();
    // Everything produced in both rounds minus what was already acked in parts (1.a) and (1.b).
    int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts);
    messages1.clear();
    for (int i = 0; i < remainingMsgs; i++) {
        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (msg != null) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        } else {
            break;
        }
    }
    assertEquals(messages1.size(), remainingMsgs);
    producer.close();
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that receiveAsync() fails with AlreadyClosedException once the consumer has been closed, for both
 * a simple (non-partitioned) consumer and a partitioned-topic consumer.
 */
@Test(timeOut = 10000)
public void testFailReceiveAsyncOnConsumerClose() throws Exception {
    log.info("-- Starting {} test --", methodName);
    // (1) simple consumers
    Consumer<byte[]> simpleConsumer = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/failAsyncReceive-1").subscriptionName("my-subscriber-name")
            .subscribe();
    simpleConsumer.close();
    expectReceiveAsyncAlreadyClosed(simpleConsumer);
    // (2) Partitioned-consumer
    int numPartitions = 4;
    TopicName partitionedTopic = TopicName.get("persistent://my-property/my-ns/failAsyncReceive-2");
    admin.topics().createPartitionedTopic(partitionedTopic.toString(), numPartitions);
    Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(partitionedTopic.toString())
            .subscriptionName("my-partitioned-subscriber").subscribe();
    partitionedConsumer.close();
    expectReceiveAsyncAlreadyClosed(partitionedConsumer);
    log.info("-- Exiting {} test --", methodName);
}

/**
 * Asserts that receiveAsync() on an already-closed consumer completes exceptionally with
 * PulsarClientException.AlreadyClosedException.
 */
private static void expectReceiveAsyncAlreadyClosed(Consumer<byte[]> closedConsumer) throws Exception {
    // receive messages
    try {
        closedConsumer.receiveAsync().get(1, TimeUnit.SECONDS);
        fail("it should have failed because consumer is already closed");
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
    }
}
/**
 * Verifies end-to-end ECDSA encryption: a producer encrypting with the client-ecdsa key publishes messages;
 * a consumer configured with the matching key reader receives them in order, while a consumer without any
 * crypto key reader receives nothing.
 */
@Test
public void testECDSAEncryption() throws Exception {
    log.info("-- Starting {} test --", methodName);
    String topicName = "persistent://my-property/my-ns/myecdsa-topic1-" + System.currentTimeMillis();
    // Loads the test key pair from the resources directory by key name.
    class EncKeyReader implements CryptoKeyReader {
        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
        @Override
        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }
        @Override
        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }
    }
    final int totalMsg = 10;
    Set<String> messageSet = Sets.newHashSet();
    Consumer<byte[]> cryptoConsumer = pulsarClient.newConsumer()
            .topic(topicName).subscriptionName("my-subscriber-name")
            .cryptoKeyReader(new EncKeyReader()).subscribe();
    Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
            .topic(topicName).subscriptionName("my-subscriber-name-normal")
            .subscribe();
    Producer<byte[]> cryptoProducer = pulsarClient.newProducer()
            .topic(topicName).addEncryptionKey("client-ecdsa.pem")
            .cryptoKeyReader(new EncKeyReader()).create();
    for (int i = 0; i < totalMsg; i++) {
        String message = "my-message-" + i;
        cryptoProducer.send(message.getBytes());
    }
    Message<byte[]> msg;
    msg = normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
    // should not able to read message using normal message.
    assertNull(msg);
    for (int i = 0; i < totalMsg; i++) {
        msg = cryptoConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(msg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }
    // Acknowledge the consumption of all messages at once
    cryptoConsumer.acknowledgeCumulative(msg);
    // Resource-leak fix: the normal consumer and the producer were previously never closed.
    normalConsumer.close();
    cryptoProducer.close();
    cryptoConsumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies end-to-end RSA encryption: two producers encrypting with the client-rsa key publish messages; a
 * consumer configured with the matching key reader receives all of them (with encryption-context attached),
 * while a consumer without any crypto key reader receives nothing.
 */
@Test
public void testRSAEncryption() throws Exception {
    log.info("-- Starting {} test --", methodName);
    String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
    // Loads the test key pair from the resources directory by key name.
    class EncKeyReader implements CryptoKeyReader {
        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
        @Override
        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }
        @Override
        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }
    }
    final int totalMsg = 10;
    Set<String> messageSet = Sets.newHashSet();
    // Bug fix: the crypto consumer and both producers previously used the hard-coded topic
    // "persistent://my-property/my-ns/myrsa-topic1" while normalConsumer used the timestamped topicName,
    // so the assertNull below was vacuous (normalConsumer watched an empty topic). All parties now share
    // the same unique topic, as in testECDSAEncryption.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
            .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
    Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
            .topic(topicName).subscriptionName("my-subscriber-name-normal")
            .subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
    Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName)
            .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
    for (int i = 0; i < totalMsg; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    for (int i = totalMsg; i < totalMsg * 2; i++) {
        String message = "my-message-" + i;
        producer2.send(message.getBytes());
    }
    MessageImpl<byte[]> msg;
    msg = (MessageImpl<byte[]>) normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
    // should not able to read message using normal message.
    assertNull(msg);
    for (int i = 0; i < totalMsg * 2; i++) {
        msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        // verify that encrypted message contains encryption-context
        msg.getEncryptionCtx()
                .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
        String receivedMessage = new String(msg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(msg);
    // Resource-leak fix: the normal consumer and both producers were previously never closed.
    normalConsumer.close();
    producer.close();
    producer2.close();
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that chunked messages (payload larger than maxMessageSize, batching disabled) round-trip through
 * encryption: a 5100-byte payload is split into chunks under a 1000-byte limit, then reassembled and
 * decrypted on the consumer side with its encryption-context intact.
 */
@Test
public void testCryptoWithChunking() throws Exception {
    final String topic = "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis();
    final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
    final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
    // Bug fix: the 1000-byte cap was previously never restored and leaked into subsequent tests.
    final int originalMaxMessageSize = this.conf.getMaxMessageSize();
    this.conf.setMaxMessageSize(1000);
    try {
        @Cleanup
        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
        @Cleanup
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic)
                .enableChunking(true)
                .enableBatching(false)
                .addEncryptionKey("client-ecdsa.pem")
                .defaultCryptoKeyReader(ecdsaPublicKeyFile)
                .create();
        // 5100 bytes > 5 * 1000-byte chunks, so the payload is guaranteed to be chunked.
        byte[] data = RandomUtils.nextBytes(5100);
        MessageId id = producer1.send(data);
        log.info("Message Id={}", id);
        MessageImpl<byte[]> message;
        message = (MessageImpl<byte[]>) consumer1.receive();
        Assert.assertEquals(message.getData(), data);
        Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1);
    } finally {
        this.conf.setMaxMessageSize(originalMaxMessageSize);
    }
}
/**
 * Verifies the defaultCryptoKeyReader configuration in all four supported forms: a single file: URI, a single
 * data: URI, a map of key-name to file: URI, and a map of key-name to data: URI. consumer1/consumer2 (ECDSA
 * key only) read the first 2*numMsg messages; consumer3/consumer4 (map with both ECDSA and RSA keys) read all
 * 4*numMsg messages, including the RSA-encrypted ones published after the first pair unsubscribes.
 */
@Test
public void testDefaultCryptoKeyReader() throws Exception {
    final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis();
    final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
    final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
    // data: URIs carry the PEM content inline, base64-encoded.
    final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K";
    final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K";
    final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem";
    final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem";
    final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
    final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
    final int numMsg = 10;
    // Maps let one key reader serve multiple encryption keys (consumer3/consumer4 below need both).
    Map<String, String> privateKeyFileMap = Maps.newHashMap();
    privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
    privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
    Map<String, String> privateKeyDataMap = Maps.newHashMap();
    privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
    privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
    // Each consumer exercises a different defaultCryptoKeyReader overload: file URI, data URI, file map, data map.
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
            .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
    Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub2")
            .defaultCryptoKeyReader(ecdsaPrivateKeyData).subscribe();
    Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub3")
            .defaultCryptoKeyReader(privateKeyFileMap).subscribe();
    Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub4")
            .defaultCryptoKeyReader(privateKeyDataMap).subscribe();
    // ECDSA producers: one configured via file URI, one via data URI.
    Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
            .defaultCryptoKeyReader(ecdsaPublicKeyFile).create();
    Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
            .defaultCryptoKeyReader(ecdsaPublicKeyData).create();
    for (int i = 0; i < numMsg; i++) {
        producer1.send(("my-message-" + i).getBytes());
    }
    for (int i = numMsg; i < numMsg * 2; i++) {
        producer2.send(("my-message-" + i).getBytes());
    }
    producer1.close();
    producer2.close();
    // consumer1/consumer2 hold only the ECDSA key; they read the 2*numMsg ECDSA-encrypted messages in order.
    for (Consumer<byte[]> consumer : Lists.newArrayList(consumer1, consumer2)) {
        MessageImpl<byte[]> msg = null;
        for (int i = 0; i < numMsg * 2; i++) {
            msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // verify that encrypted message contains encryption-context
            msg.getEncryptionCtx().orElseThrow(
                    () -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            assertEquals(new String(msg.getData()), "my-message-" + i);
        }
        // Acknowledge the consumption of all messages at once
        consumer.acknowledgeCumulative(msg);
    }
    consumer1.unsubscribe();
    consumer2.unsubscribe();
    // RSA producers publish the next 2*numMsg messages: one via file URI, one via data URI.
    Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
            .defaultCryptoKeyReader(rsaPublicKeyFile).create();
    Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
            .defaultCryptoKeyReader(rsaPublicKeyData).create();
    for (int i = numMsg * 2; i < numMsg * 3; i++) {
        producer3.send(("my-message-" + i).getBytes());
    }
    for (int i = numMsg * 3; i < numMsg * 4; i++) {
        producer4.send(("my-message-" + i).getBytes());
    }
    producer3.close();
    producer4.close();
    // consumer3/consumer4 were subscribed from the start with both keys, so they read all 4*numMsg messages.
    for (Consumer<byte[]> consumer : Lists.newArrayList(consumer3, consumer4)) {
        MessageImpl<byte[]> msg = null;
        for (int i = 0; i < numMsg * 4; i++) {
            msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // verify that encrypted message contains encryption-context
            msg.getEncryptionCtx().orElseThrow(
                    () -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            assertEquals(new String(msg.getData()), "my-message-" + i);
        }
        // Acknowledge the consumption of all messages at once
        consumer.acknowledgeCumulative(msg);
    }
    consumer3.unsubscribe();
    consumer4.unsubscribe();
}
// Verifies redelivery across a shared subscription when some consumers cannot decrypt:
// messages left unacked by consumers 2 and 3 (redelivered via the 1s ack-timeout) must
// eventually all be delivered to and acked by consumer 1, which has a working key reader.
@Test(groups = "quarantine")
public void testRedeliveryOfFailedMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
final String encryptionKeyName = "client-rsa.pem";
final String encryptionKeyVersion = "1.0";
Map<String, String> metadata = Maps.newHashMap();
metadata.put("version", encryptionKeyVersion);
// Key reader that loads the test key pair from src/test/resources/certificate and fails
// the test outright if a key file is missing or unreadable.
class EncKeyReader implements CryptoKeyReader {
final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
keyInfo.setMetadata(metadata);
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
keyInfo.setMetadata(metadata);
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}
}
// Reader that can never supply a key: simulates a mis-configured consumer.
class InvalidKeyReader implements CryptoKeyReader {
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
return null;
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
return null;
}
}
/*
* Redelivery functionality guarantees that customer will get a chance to process the message again.
* In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it.
*
* For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers
* - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
*
* In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message.
*
* Consumer 1 - Can decrypt message
* Consumer 2 - Has invalid Reader configured.
* Consumer 3 - Has no reader configured.
*
*/
String topicName = "persistent://my-property/my-ns/myrsa-topic1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
.enableBatching(false)
.cryptoKeyReader(new EncKeyReader()).create();
// Creates new client connection
// NOTE(review): topicsPattern() is given a literal topic name here (and for the other two
// consumers); as a regex it matches only that topic — confirm topic() wasn't intended.
// The 1s ackTimeout is what drives redelivery of the messages these consumers can't handle.
@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
.subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
.subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
// Creates new client connection
@Cleanup
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
.subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
.subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
// Creates new client connection
@Cleanup
PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
.subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
int numberOfMessages = 100;
String message = "my-message";
Set<String> messages = new HashSet<>(); // Since messages are in random order
for (int i = 0; i<numberOfMessages; i++) {
producer.send((message + i).getBytes());
}
// Consuming from consumer 2 and 3
// no message should be returned since they can't decrypt the message
Message m = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNull(m);
m = consumer3.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNull(m);
// delay to reduce flakiness
Thread.sleep(1000L);
for (int i = 0; i<numberOfMessages; i++) {
// All messages would be received by consumer 1
m = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotNull(m, "reading message index #" + i + " failed");
messages.add(new String(m.getData()));
consumer1.acknowledge(m);
}
// Consuming from consumer 2 and 3 again just to be sure
// no message should be returned since they can't decrypt the message
m = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNull(m);
m = consumer3.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNull(m);
// checking if all messages were received
for (int i = 0; i<numberOfMessages; i++) {
assertTrue(messages.contains((message + i)));
}
consumer1.close();
consumer2.close();
consumer3.close();
log.info("-- Exiting {} test --", methodName);
}
/**
 * Exercises the producer/consumer failure paths around end-to-end encryption:
 * producer creation fails for an unknown key; a consumer without a key reader gets
 * nothing by default, the raw encrypted payload with action CONSUME, decrypted
 * messages once a reader is set, and silent drops with action DISCARD.
 */
@Test
public void testEncryptionFailure() throws Exception {
    log.info("-- Starting {} test --", methodName);
    // Key reader that loads keys from the test resources directory. Unlike the readers in
    // sibling tests it only logs (and returns null) when a key cannot be read, which is what
    // makes the "invalid key name" producer-creation failure in step 1 possible.
    class EncKeyReader implements CryptoKeyReader {
        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

        @Override
        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    log.error("Failed to read certificate from {}", CERT_FILE_PATH);
                }
            }
            return null;
        }

        @Override
        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    return keyInfo;
                } catch (IOException e) {
                    log.error("Failed to read certificate from {}", CERT_FILE_PATH);
                }
            }
            return null;
        }
    }

    final int totalMsg = 10;

    MessageImpl<byte[]> msg;
    Set<String> messageSet = Sets.newHashSet();
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

    // 1. Invalid key name: producer creation must fail since the key cannot be loaded
    try {
        pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
                .addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
        Assert.fail("Producer creation should not succeed if failing to read key");
    } catch (Exception e) {
        // expected: key could not be read
    }

    // 2. Producer with valid key name
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
            .addEncryptionKey("client-rsa.pem")
            .cryptoKeyReader(new EncKeyReader())
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    for (int i = 0; i < totalMsg; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    // 3. KeyReader is not set by consumer
    // Receive should fail since key reader is not setup
    msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    Assert.assertNull(msg, "Receive should have failed with no keyreader");

    // 4. Set consumer config to consume even if decryption fails
    consumer.close();
    consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
            .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

    int msgNum = 0;
    try {
        // Receive should proceed and deliver the still-encrypted payload, which therefore
        // must NOT equal the plaintext we sent
        msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(msg.getData());
        String expectedMessage = "my-message-" + msgNum++;
        Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
                + " should not match the expected message " + expectedMessage);
        consumer.acknowledgeCumulative(msg);
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
    }

    // 5. Set keyreader and failure action FAIL: remaining messages decrypt normally
    consumer.close();
    consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
            .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
            .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

    for (int i = msgNum; i < totalMsg - 1; i++) {
        msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        // verify that encrypted message contains encryption-context
        msg.getEncryptionCtx()
                .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
        String receivedMessage = new String(msg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        String expectedMessage = "my-message-" + i;
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(msg);
    consumer.close();

    // 6. Set consumer config to discard if decryption fails (no key reader configured)
    consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
            .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

    // Receive should proceed and discard encrypted messages
    msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    Assert.assertNull(msg, "Message received even after ConsumerCryptoFailureAction.DISCARD is set.");

    consumer.close();
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that a consumer without a CryptoKeyReader but with failure action CONSUME
 * receives the raw encrypted payload, and that the payload can then be decrypted and
 * decompressed manually (see {@code decryptMessage}).
 */
@Test
public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String encryptionKeyName = "client-rsa.pem";
    final String encryptionKeyVersion = "1.0";
    Map<String, String> metadata = Maps.newHashMap();
    metadata.put("version", encryptionKeyVersion);
    // Key reader that loads the test key pair from src/test/resources/certificate and fails
    // the test outright if a key file is missing or unreadable.
    class EncKeyReader implements CryptoKeyReader {
        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

        @Override
        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    keyInfo.setMetadata(metadata);
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }

        @Override
        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
                try {
                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                    keyInfo.setMetadata(metadata);
                    return keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
                }
            } else {
                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
            }
            return null;
        }
    }

    Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
            .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
            .cryptoKeyReader(new EncKeyReader()).create();

    // No CryptoKeyReader on the consumer: CONSUME delivers the still-encrypted payload
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1")
            .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
            .subscribe();

    String message = "my-message";
    producer.send(message.getBytes());

    TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
    // TestNG asserts take (actual, expected); the arguments were previously swapped,
    // which would produce a misleading failure message.
    assertEquals(receivedMessage, message);

    consumer.close();
    producer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Manually decrypts and decompresses an encrypted message payload using the
 * EncryptionContext attached to the received message, returning the original
 * message body as a String.
 *
 * Steps: (1) pull the encrypted data key, IV (param), algorithm and compression info
 * from the context; (2) rebuild a MessageMetadata carrying that encryption info and
 * decrypt via the default MessageCryptoBc; (3) decompress the decrypted payload;
 * (4) if the message was part of a batch, unwrap the single-message envelope.
 *
 * Asserts that exactly one encryption key is present and that its "version"
 * metadata is "1.0" (the value the producing tests attach).
 */
private String decryptMessage(TopicMessageImpl<byte[]> msg,
String encryptionKeyName,
CryptoKeyReader reader) throws Exception {
Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
Assert.assertTrue(ctx.isPresent());
EncryptionContext encryptionCtx = ctx
.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
// Exactly one key is expected: the one the producer added via addEncryptionKey().
Map<String, EncryptionKey> keys = encryptionCtx.getKeys();
assertEquals(keys.size(), 1);
EncryptionKey encryptionKey = keys.get(encryptionKeyName);
byte[] dataKey = encryptionKey.getKeyValue();
Map<String, String> metadata = encryptionKey.getMetadata();
String version = metadata.get("version");
assertEquals(version, "1.0");
CompressionType compressionType = encryptionCtx.getCompressionType();
int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
byte[] encrParam = encryptionCtx.getParam();
String encAlgo = encryptionCtx.getAlgorithm();
// batchSize is 0 for non-batched messages (empty Optional)
int batchSize = encryptionCtx.getBatchSize().orElse(0);
ByteBuffer payloadBuf = ByteBuffer.wrap(msg.getData());
// try to decrypt use default MessageCryptoBc
MessageCrypto<MessageMetadata, MessageMetadata> crypto =
new MessageCryptoBc("test", false);
// Rebuild just enough metadata for MessageCryptoBc to locate the data key and IV;
// producer name / sequence id / publish time are placeholders not used by decrypt().
MessageMetadata messageMetadata = new MessageMetadata()
.setEncryptionParam(encrParam)
.setProducerName("test")
.setSequenceId(123)
.setPublishTime(12333453454L)
.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType))
.setUncompressedSize(uncompressedSize);
messageMetadata.addEncryptionKey()
.setKey(encryptionKeyName)
.setValue(dataKey);
if (encAlgo != null) {
messageMetadata.setEncryptionAlgo(encAlgo);
}
ByteBuffer decryptedPayload = ByteBuffer.allocate(crypto.getMaxOutputSize(payloadBuf.remaining()));
crypto.decrypt(() -> messageMetadata, payloadBuf, decryptedPayload, reader);
// try to uncompress
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
ByteBuf uncompressedPayload = codec.decode(Unpooled.wrappedBuffer(decryptedPayload), uncompressedSize);
if (batchSize > 0) {
// Batched message: strip the single-message-in-batch envelope
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadata, 0, batchSize);
}
byte[] data = new byte[uncompressedPayload.readableBytes()];
uncompressedPayload.readBytes(data);
uncompressedPayload.release();
return new String(data);
}
/**
 * Verifies SubscriptionInitialPosition semantics: with 5 messages already published,
 * a default consumer and an explicit-Latest consumer only see messages produced after
 * they subscribe, while an Earliest consumer replays from the start of the topic.
 */
@Test
public void testConsumerSubscriptionInitialize() throws Exception {
    log.info("-- Starting {} test --", methodName);
    String topicName = "persistent://my-property/my-ns/test-subscription-initialize-topic";
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .create();

    // 1, produce 5 messages before any subscription exists
    for (int i = 0; i < 5; i++) {
        final String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    // 2, create consumers: default (implicitly Latest), explicit Latest, explicit Earliest
    Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe();
    Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
    Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

    // 3, produce 5 messages more
    for (int i = 5; i < 10; i++) {
        final String message = "my-message-" + i;
        producer.send(message.getBytes());
    }

    // 4, verify each consumer starts from the expected position
    assertEquals(defaultConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), "my-message-5".getBytes());
    assertEquals(latestConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), "my-message-5".getBytes());
    assertEquals(earliestConsumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), "my-message-0".getBytes());

    // Close the producer too (previously leaked)
    producer.close();
    defaultConsumer.close();
    latestConsumer.close();
    earliestConsumer.close();
    log.info("-- Exiting {} test --", methodName);
}
// Verifies that pause() on a multi-topics consumer keeps holding back delivery even after
// the partition count of the topic is increased, and that resume() drains everything.
@Test
public void testMultiTopicsConsumerImplPauseForPartitionNumberChange() throws Exception {
log.info("-- Starting {} test --", methodName);
String topicName = "persistent://my-property/my-ns/partition-topic";
admin.topics().createPartitionedTopic(topicName, 1);
// autoUpdatePartitionsInterval(2s) on both sides so partition changes propagate quickly
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
.create();
// 1. produce 5 messages
for (int i = 0; i < 5; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes(UTF_8));
}
// receiverQueueSize 1: at most one message is prefetched per partition while paused
final int receiverQueueSize = 1;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(receiverQueueSize)
.autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
.subscriptionName("test-multi-topic-consumer").subscribe();
int counter = 0;
for (; counter < 5 - receiverQueueSize; counter ++) {
assertEquals(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), ("my-message-" + counter).getBytes());
}
// 2. pause multi-topic consumer
consumer.pause();
// 3. update partition
admin.topics().updatePartitionedTopic(topicName, 3);
// 4. wait for client to update partitions
// NOTE(review): this condition (size() <= 1) already holds before the update — the consumer
// starts with a single partition — so the await returns immediately; confirm whether the
// intent was to wait for the consumer count to GROW after the partition update.
Awaitility.await().until(() -> ((MultiTopicsConsumerImpl) consumer).getConsumers().size() <= 1);
// 5. produce 5 more messages
for (int i = 5; i < 10; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
// 6. empty receiver queue
for (int i = 0; i < receiverQueueSize; i++) {
assertEquals(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(),
("my-message-" + counter++).getBytes());
}
// 7. should not consume any messages
assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
// 8. resume multi-topic consumer
consumer.resume();
// 9. continue to consume
while(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) != null) {
counter++;
}
assertEquals(counter, 10);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that a paused multi-topics consumer stays paused when a topic is added to it
 * manually via subscribeAsync(), and that resume() then drains the full backlog
 * (10 messages on each of 3 topics = 30 in total).
 */
@Test
public void testMultiTopicsConsumerImplPauseForManualSubscription() throws Exception {
    log.info("-- Starting {} test --", methodName);
    String topicNameBase = "persistent://my-property/my-ns/my-topic-";

    // One non-batching producer per topic.
    Producer<byte[]> producer1 = pulsarClient.newProducer()
            .topic(topicNameBase + "1")
            .enableBatching(false)
            .create();
    Producer<byte[]> producer2 = pulsarClient.newProducer()
            .topic(topicNameBase + "2")
            .enableBatching(false)
            .create();
    Producer<byte[]> producer3 = pulsarClient.newProducer()
            .topic(topicNameBase + "3")
            .enableBatching(false)
            .create();

    // 1. produce 5 messages per topic
    for (int i = 0; i < 5; i++) {
        byte[] payload = ("my-message-" + i).getBytes(UTF_8);
        producer1.send(payload);
        producer2.send(payload);
        producer3.send(payload);
    }

    // receiverQueueSize 1: at most one message per topic sits prefetched while paused
    int receiverQueueSize = 1;
    Consumer<byte[]> consumer = pulsarClient
            .newConsumer()
            .topics(Lists.newArrayList(topicNameBase + "1", topicNameBase + "2"))
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .receiverQueueSize(receiverQueueSize)
            .subscriptionName("test-multi-topic-consumer")
            .subscribe();

    int received = 0;
    while (received < 2 * (5 - receiverQueueSize)) {
        String body = new String(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), UTF_8);
        assertThat(body).startsWith("my-message-");
        received++;
    }

    // 2. pause multi-topic consumer
    consumer.pause();

    // 3. manually add the third topic's consumer
    ((MultiTopicsConsumerImpl) consumer).subscribeAsync(topicNameBase + "3", true).join();

    // 4. produce 5 more messages per topic
    for (int i = 5; i < 10; i++) {
        byte[] payload = ("my-message-" + i).getBytes(UTF_8);
        producer1.send(payload);
        producer2.send(payload);
        producer3.send(payload);
    }

    // 5. empty the receiver queues of the two original topics
    for (int drained = 0; drained < 2 * receiverQueueSize; drained++) {
        String body = new String(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getData(), UTF_8);
        assertThat(body).startsWith("my-message-");
        received++;
    }

    // 6. should not consume any messages while paused
    Awaitility.await().untilAsserted(() -> assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)));

    // 7. resume multi-topic consumer
    consumer.resume();

    // 8. continue to consume until the backlog is drained
    while (consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) != null) {
        received++;
    }
    assertEquals(received, 30);

    producer1.close();
    producer2.close();
    producer3.close();
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that Producer.flush() publishes a pending batch immediately, even though the
 * batching thresholds (1 hour delay, 10000 messages) would otherwise never trigger.
 */
@Test
public void testFlushBatchEnabled() throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/test-flush-enabled")
            .subscriptionName("my-subscriber-name").subscribe();

    // Batching thresholds are deliberately huge so nothing is published until flush().
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/test-flush-enabled")
            .enableBatching(true)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .batchingMaxMessages(10000);
    try (Producer<byte[]> producer = producerBuilder.create()) {
        for (int i = 0; i < 10; i++) {
            producer.sendAsync(("my-message-" + i).getBytes());
        }
        // Force the pending batch out before the producer closes
        producer.flush();
    }

    Message<byte[]> lastMsg = null;
    Set<String> seen = Sets.newHashSet();
    for (int i = 0; i < 10; i++) {
        lastMsg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(lastMsg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        testMessageOrderAndDuplicates(seen, receivedMessage, "my-message-" + i);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(lastMsg);
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that Producer.flush() also works (as a no-op barrier) when batching is
 * disabled: all async sends are completed and the messages arrive in order.
 */
@Test
public void testFlushBatchDisabled() throws Exception {
    log.info("-- Starting {} test --", methodName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic("persistent://my-property/my-ns/test-flush-disabled")
            .startMessageIdInclusive()
            .subscriptionName("my-subscriber-name").subscribe();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic("persistent://my-property/my-ns/test-flush-disabled")
            .enableBatching(false);
    try (Producer<byte[]> producer = producerBuilder.create()) {
        for (int i = 0; i < 10; i++) {
            producer.sendAsync(("my-message-" + i).getBytes());
        }
        // Wait for the outstanding async sends before the producer closes
        producer.flush();
    }

    Message<byte[]> lastMsg = null;
    Set<String> seen = Sets.newHashSet();
    for (int i = 0; i < 10; i++) {
        lastMsg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(lastMsg.getData());
        log.debug("Received message: [{}]", receivedMessage);
        testMessageOrderAndDuplicates(seen, receivedMessage, "my-message-" + i);
    }
    // Acknowledge the consumption of all messages at once
    consumer.acknowledgeCumulative(lastMsg);
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
// Issue 1452: https://github.com/apache/pulsar/issues/1452
// reachedEndOfTopic should be called only once if a topic has been terminated before subscription
@Test
public void testReachedEndOfTopic() throws Exception {
    String topicName = "persistent://my-property/my-ns/testReachedEndOfTopic";
    // Terminate the topic before anyone subscribes, so the end-of-topic signal is
    // already pending when the consumer connects.
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false).create();
    producer.close();
    admin.topics().terminateTopicAsync(topicName).get();

    // Latch with count 2: if reachedEndOfTopic were (incorrectly) invoked twice, the latch
    // would hit zero and the await below would return true.
    CountDownLatch latch = new CountDownLatch(2);
    // Parameterized types replace the previous raw Producer/Consumer/MessageListener usage,
    // so the @SuppressWarnings annotations are no longer needed.
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName("my-subscriber-name")
            .messageListener(new MessageListener<byte[]>() {
                @Override
                public void reachedEndOfTopic(Consumer<byte[]> consumer) {
                    log.info("called reachedEndOfTopic {}", methodName);
                    latch.countDown();
                }

                @Override
                public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                    // do nothing
                }
            })
            .subscribe();

    // The callback must fire exactly once: latch goes from 2 to 1 and no further.
    assertFalse(latch.await(1, TimeUnit.SECONDS));
    assertEquals(latch.getCount(), 1);
    consumer.close();
}
/**
 * This test verifies that broker activates fail-over consumer by considering priority-level as well.
 *
 * <pre>
 * 1. Start two failover consumer with same priority level, broker selects consumer based on name-sorting (consumer1).
 * 2. Switch non-active consumer to active (consumer2): by giving it higher priority
 * Partitioned-topic with 9 partitions:
 * 1. C1 (priority=1)
 * 2. C2,C3,C4 (priority=0)
 * So, broker should evenly distribute C2,C3,C4 active consumers among 9 partitions.
 * </pre>
 *
 * @throws Exception
 */
@Test
public void testFailOverConsumerPriority() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topicName = "persistent://my-property/my-ns/priority-topic";
    final String subscriptionName = "my-sub";
    final int noOfPartitions = 9;

    // create partitioned topic
    admin.topics().createPartitionedTopic(topicName, noOfPartitions);

    // Only subscribe consumer
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .consumerName("aaa").subscriptionType(SubscriptionType.Failover)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1).subscribe();
    ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
            .subscriptionName(subscriptionName).consumerName("bbb1").subscriptionType(SubscriptionType.Failover)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS).priorityLevel(1);
    Consumer<byte[]> consumer2 = consumerBuilder.subscribe();

    // Count partitions for which "aaa" is the active consumer
    AtomicInteger consumer1Count = new AtomicInteger(0);
    admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
        String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next().getValue().getActiveConsumerName();
        if (activeConsumerName.equals("aaa")) {
            consumer1Count.incrementAndGet();
        }
    });
    // validate even distribution among two consumers.
    // Compare the int value: the previous assertNotEquals(consumer1Count, noOfPartitions)
    // compared an AtomicInteger against an Integer, which can never be equal (AtomicInteger
    // uses identity equality), so the assertion was vacuous.
    assertNotEquals(consumer1Count.get(), noOfPartitions);

    // Re-subscribe bbb1 with higher priority (0) and add three more consumers
    consumer2.close();
    consumer2 = consumerBuilder.priorityLevel(0).subscribe();
    Consumer<byte[]> consumer3 = consumerBuilder.consumerName("bbb2").priorityLevel(0).subscribe();
    Consumer<byte[]> consumer4 = consumerBuilder.consumerName("bbb3").priorityLevel(0).subscribe();
    Consumer<byte[]> consumer5 = consumerBuilder.consumerName("bbb4").priorityLevel(1).subscribe();

    // With three priority-0 consumers over 9 partitions, each should be active on 3
    Integer evenDistributionCount = noOfPartitions / 3;
    retryStrategically((test) -> {
        try {
            Map<String, Integer> subsCount = Maps.newHashMap();
            admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
                String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next()
                        .getValue().getActiveConsumerName();
                subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
            });
            return subsCount.size() == 3 && subsCount.get("bbb1").equals(evenDistributionCount)
                    && subsCount.get("bbb2").equals(evenDistributionCount)
                    && subsCount.get("bbb3").equals(evenDistributionCount);
        } catch (PulsarAdminException e) {
            // Ok: stats may be transiently unavailable, retry
        }
        return false;
    }, 5, 100);

    Map<String, Integer> subsCount = Maps.newHashMap();
    admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
        String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next().getValue().getActiveConsumerName();
        subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
    });
    assertEquals(subsCount.size(), 3);
    assertEquals(subsCount.get("bbb1"), evenDistributionCount);
    assertEquals(subsCount.get("bbb2"), evenDistributionCount);
    assertEquals(subsCount.get("bbb3"), evenDistributionCount);

    consumer1.close();
    consumer2.close();
    consumer3.close();
    consumer4.close();
    consumer5.close();
    log.info("-- Exiting {} test --", methodName);
}
/**
 * This test verifies Producer and Consumer of PartitionedTopic with 1 partition works well.
 *
 * <pre>
 * 1. create producer/consumer with both original name and PARTITIONED_TOPIC_SUFFIX.
 * 2. verify producer/consumer could produce/consume messages from same underline persistent topic.
 * </pre>
 *
 * @throws Exception
 */
@Test
public void testPartitionedTopicWithOnePartition() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
    final String subscriptionName = "my-sub-";

    // create partitioned topic
    admin.topics().createPartitionedTopic(topicName, 1);
    assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 1);

    // One consumer on the base topic name, one on the explicit partition-0 name:
    // both must attach to the same underlying persistent topic.
    @Cleanup
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName + 1)
            .consumerName("aaa")
            .subscribe();
    log.info("Consumer1 created. topic: {}", consumer1.getTopic());

    @Cleanup
    Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
            .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
            .subscriptionName(subscriptionName + 2)
            .consumerName("bbb")
            .subscribe();
    log.info("Consumer2 created. topic: {}", consumer2.getTopic());

    // Likewise one producer per naming scheme.
    @Cleanup
    Producer<byte[]> producer1 = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .create();
    log.info("Producer1 created. topic: {}", producer1.getTopic());

    @Cleanup
    Producer<byte[]> producer2 = pulsarClient.newProducer()
            .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
            .enableBatching(false)
            .create();
    log.info("Producer2 created. topic: {}", producer2.getTopic());

    final int numMessages = 10;
    for (int seq = 0; seq < numMessages; seq++) {
        producer1.newMessage()
                .value(("one-partitioned-topic-value-producer1-" + seq).getBytes(UTF_8))
                .send();
        producer2.newMessage()
                .value(("one-partitioned-topic-value-producer2-" + seq).getBytes(UTF_8))
                .send();
    }

    // Each subscription must see every message from both producers (2 * numMessages).
    for (int n = 0; n < numMessages * 2; n++) {
        Message<byte[]> received = consumer1.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
        assertNotNull(received);
        log.info("Consumer1 Received message '{}'.", new String(received.getValue(), UTF_8));

        received = consumer2.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS);
        assertNotNull(received);
        log.info("Consumer2 Received message '{}'.", new String(received.getValue(), UTF_8));
    }

    // No further messages should be pending on either subscription.
    assertNull(consumer1.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS));
    assertNull(consumer2.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, TimeUnit.MILLISECONDS));
    log.info("-- Exiting {} test --", methodName);
}
/**
 * Verifies that after seeking to a mid-stream message id, consumption restarts exactly where
 * expected: at the reset message itself when {@code startMessageIdInclusive()} is set, or at the
 * following message otherwise, with no duplicates and an empty receiver queue at the end.
 */
@Test(dataProvider = "variationsForExpectedPos")
public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
        throws Exception {
    final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
    final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
    final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(batching)
            .create();
    CountDownLatch latch = new CountDownLatch(numOfMessages);
    // Message id of the message published at resetIndex, captured from the async send callback.
    final AtomicReference<MessageId> resetPos = new AtomicReference<>();
    for (int i = 0; i < numOfMessages; i++) {
        final int j = i;
        producer.sendAsync(String.format("msg num %d", i).getBytes())
                .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
                .whenComplete((p, e) -> {
                    if (e != null) {
                        fail("send msg failed due to " + e.getMessage());
                    } else {
                        log.info("send msg with id {}", p.getRight());
                        if (p.getLeft() == resetIndex) {
                            resetPos.set(p.getRight());
                        }
                    }
                    latch.countDown();
                });
    }
    latch.await();
    ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
            .topic(topicName);
    if (startInclusive) {
        consumerBuilder.startMessageIdInclusive();
    }
    Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
    consumer.seek(resetPos.get());
    log.info("reset cursor to {}", resetPos.get());
    Set<String> messageSet = Sets.newHashSet();
    for (int i = firstMessage; i < numOfMessages; i++) {
        Message<byte[]> message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        String receivedMessage = new String(message.getData());
        String expectedMessage = String.format("msg num %d", i);
        testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
    }
    // Typed cast instead of the raw ConsumerImpl to avoid an unchecked raw-type usage.
    assertEquals(((ConsumerImpl<byte[]>) consumer).numMessagesInQueue(), 0);
    // Processed messages should be the number of messages in the range: [FirstResetMessage..TotalNumOfMessages]
    assertEquals(messageSet.size(), numOfMessages - firstMessage);
    consumer.close();
    producer.close();
}
/**
 * It verifies that message failure successfully releases semaphore and client successfully receives
 * InvalidMessageException.
 *
 * @throws Exception
 */
@Test
public void testReleaseSemaphoreOnFailMessages() throws Exception {
    log.info("-- Starting {} test --", methodName);
    int maxPendingMessages = 10;
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().enableBatching(false)
            .blockIfQueueFull(true).maxPendingMessages(maxPendingMessages)
            .topic("persistent://my-property/my-ns/my-topic2");
    // @Cleanup closes the producer even if an assertion below fails (was previously leaked).
    @Cleanup
    Producer<byte[]> producer = producerBuilder.create();
    // Payload one byte over the broker's max message size: every send must fail. If a failed send
    // did not release the pending-messages semaphore, this producer (blockIfQueueFull=true,
    // maxPendingMessages=10) would block forever after the first maxPendingMessages attempts --
    // sending maxPendingMessages + 10 times proves the semaphore is released on failure.
    byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
    for (int i = 0; i < maxPendingMessages + 10; i++) {
        Future<MessageId> future = producer.sendAsync(message);
        try {
            future.get();
            fail("should fail with InvalidMessageException");
        } catch (Exception e) {
            assertTrue(e.getCause() instanceof PulsarClientException.InvalidMessageException);
        }
    }
    log.info("-- Exiting {} test --", methodName);
}
// Verifies that a pending receiveAsync()/batchReceiveAsync() future is completed exceptionally
// when the consumer is closed from another thread: the failure message must contain
// "cleaning and closing the consumers". Checked for a plain topic (single receive and batch
// receive) and for a partitioned topic (batch receive).
@Test(timeOut = 10000)
public void testReceiveAsyncCompletedWhenClosing() throws Exception {
final String topic = "persistent://my-property/my-ns/testCompletedWhenClosing";
final String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
// Substring expected in the message of the exception that fails the pending receive future.
final String errorMsg = "cleaning and closing the consumers";
// timeout(-1): batch receive only completes on the size/byte limits, so with no messages
// published the future stays pending until close() fails it.
BatchReceivePolicy batchReceivePolicy
= BatchReceivePolicy.builder().maxNumBytes(10 * 1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName("my-subscriber-name")
.batchReceivePolicy(batchReceivePolicy).subscribe();
// 1) Test receiveAsync is interrupted
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
// Close the consumer concurrently while this thread blocks on the receive future.
new Thread(() -> {
try {
consumer.close();
} catch (PulsarClientException ignore) {
}
}).start();
consumer.receiveAsync().get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));
countDownLatch.countDown();
}
}).start();
// Block the test until the expected exception was observed (test timeOut guards against hangs).
countDownLatch.await();
// 2) Test batchReceiveAsync is interrupted
CountDownLatch countDownLatch2 = new CountDownLatch(1);
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName("my-subscriber-name")
.batchReceivePolicy(batchReceivePolicy).subscribe();
new Thread(() -> {
try {
new Thread(() -> {
try {
consumer2.close();
} catch (PulsarClientException ignore) {
}
}).start();
consumer2.batchReceiveAsync().get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));
countDownLatch2.countDown();
}
}).start();
countDownLatch2.await();
// 3) Test partitioned topic batchReceiveAsync is interrupted
CountDownLatch countDownLatch3 = new CountDownLatch(1);
admin.topics().createPartitionedTopic(partitionedTopic, 3);
Consumer<String> partitionedTopicConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic")
.batchReceivePolicy(batchReceivePolicy).subscribe();
new Thread(() -> {
try {
new Thread(() -> {
try {
partitionedTopicConsumer.close();
} catch (PulsarClientException ignore) {
}
}).start();
partitionedTopicConsumer.batchReceiveAsync().get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));
countDownLatch3.countDown();
}
}).start();
countDownLatch3.await();
}
// Verifies admin resetCursor/resetCursorAsync semantics: a plain reset replays from the given
// message id (that message is received again), while passing the extra boolean `true` restarts
// consumption at the entry immediately after the given id (checked via entryId arithmetic below).
@Test(timeOut = 20000)
public void testResetPosition() throws Exception {
final String topicName = "persistent://my-property/my-ns/testResetPosition";
final String subName = "my-sub";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName).subscriptionName(subName).subscribe();
// Publish 50 messages, but only consume and ack the first 10; remember the 10th message.
for (int i = 0; i < 50; i++) {
producer.send("msg" + i);
}
Message<String> lastMsg = null;
for (int i = 0; i < 10; i++) {
lastMsg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotNull(lastMsg);
consumer.acknowledge(lastMsg);
}
MessageIdImpl lastMessageId = (MessageIdImpl)lastMsg.getMessageId();
consumer.close();
producer.close();
// Sync reset without the flag: the first message received afterwards is lastMsg itself.
admin.topics().resetCursor(topicName, subName, lastMsg.getMessageId());
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
Message<String> message = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(message.getMessageId(), lastMsg.getMessageId());
consumer2.close();
// Sync reset with the flag set: lastMsg is skipped; the next entry (entryId + 1) is received.
admin.topics().resetCursor(topicName, subName, lastMsg.getMessageId(), true);
Consumer<String> consumer3 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer3.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotEquals(message.getMessageId(), lastMsg.getMessageId());
MessageIdImpl messageId = (MessageIdImpl)message.getMessageId();
assertEquals(messageId.getEntryId() - 1, lastMessageId.getEntryId());
consumer3.close();
// Async variant with the flag: same skip-the-given-id behavior as the sync call above.
admin.topics().resetCursorAsync(topicName, subName, lastMsg.getMessageId(), true).get(3, TimeUnit.SECONDS);
Consumer<String> consumer4 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer4.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotEquals(message.getMessageId(), lastMsg.getMessageId());
messageId = (MessageIdImpl)message.getMessageId();
assertEquals(messageId.getEntryId() - 1, lastMessageId.getEntryId());
consumer4.close();
// Async variant without the flag: replays starting from lastMsg again.
admin.topics().resetCursorAsync(topicName, subName, lastMsg.getMessageId()).get(3, TimeUnit.SECONDS);
Consumer<String> consumer5 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).subscribe();
message = consumer5.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(message.getMessageId(), lastMsg.getMessageId());
consumer5.close();
}
/**
 * Verifies that getLastDisconnectedTimestamp() reports zero before any disconnect and a positive
 * timestamp after the broker shuts down, for both producer and consumer.
 */
@Test
public void testGetLastDisconnectedTimestamp() throws Exception {
    final String topic = "persistent://my-property/my-ns/testGetLastDisconnectedTimestamp";
    final String subscription = "my-sub";
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .create();
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName(subscription)
            .subscribe();
    // No disconnect has happened yet, so both sides report the default value of zero.
    assertEquals(producer.getLastDisconnectedTimestamp(), 0L);
    assertEquals(consumer.getLastDisconnectedTimestamp(), 0L);
    // Shutting the broker down disconnects both client objects and stamps the time.
    pulsar.close();
    assertTrue(producer.getLastDisconnectedTimestamp() > 0);
    assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
}
/**
 * Same as {@code testGetLastDisconnectedTimestamp} but against a 3-partition topic: the
 * disconnect timestamp must also be populated for partitioned producers and consumers.
 */
@Test
public void testGetLastDisconnectedTimestampForPartitionedTopic() throws Exception {
    final String topic = "persistent://my-property/my-ns/testGetLastDisconnectedTimestampForPartitionedTopic";
    final String subscription = "my-sub";
    admin.topics().createPartitionedTopic(topic, 3);
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .create();
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName(subscription)
            .subscribe();
    // No disconnect has happened yet, so both sides report the default value of zero.
    assertEquals(producer.getLastDisconnectedTimestamp(), 0L);
    assertEquals(consumer.getLastDisconnectedTimestamp(), 0L);
    // Shutting the broker down disconnects both client objects and stamps the time.
    pulsar.close();
    assertTrue(producer.getLastDisconnectedTimestamp() > 0);
    assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
}
/**
 * Verifies consumer stats on a non-partitioned topic: no per-partition queue map is exposed, the
 * receiver queue starts empty, and after publishing receiverQueueSize messages the stats report
 * that many prefetched messages.
 */
@Test
public void testGetStats() throws Exception {
    final String topic = "persistent://my-property/my-ns/testGetStats" + UUID.randomUUID();
    final String subscription = "my-sub";
    final int queueSize = 100;
    @Cleanup
    PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(false)
            .topic(topic)
            .create();
    ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
            .topic(topic)
            .receiverQueueSize(queueSize)
            .subscriptionName(subscription)
            .subscribe();
    // A non-partitioned consumer has no per-partition queues, and its receiver queue starts empty.
    assertNull(consumer.getStats().getMsgNumInSubReceiverQueue());
    assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 0);
    for (int i = 0; i < queueSize; i++) {
        producer.sendAsync("msg" + i);
    }
    // Wait until every published message has been prefetched into the receiver queue.
    Awaitility.await().untilAsserted(
            () -> assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), queueSize));
    consumer.close();
    producer.close();
}
/**
 * Verifies consumer stats on a 3-partition topic: the per-partition queue map has one (empty)
 * entry per partition, and after publishing receiverQueueSize messages the aggregate receiver
 * queue count reflects all of them.
 */
@Test
public void testGetStatsForPartitionedTopic() throws Exception {
    final String topic = "persistent://my-property/my-ns/testGetStatsForPartitionedTopic";
    final String subscription = "my-sub";
    final int queueSize = 100;
    admin.topics().createPartitionedTopic(topic, 3);
    @Cleanup
    PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(false)
            .topic(topic)
            .create();
    MultiTopicsConsumerImpl<String> consumer = (MultiTopicsConsumerImpl<String>) client.newConsumer(Schema.STRING)
            .topic(topic)
            .receiverQueueSize(queueSize)
            .subscriptionName(subscription)
            .subscribe();
    // One per-partition queue entry per partition, each initially empty.
    assertEquals(consumer.getStats().getMsgNumInSubReceiverQueue().size(), 3);
    assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), 0);
    consumer.getStats().getMsgNumInSubReceiverQueue()
            .forEach((partition, queued) -> assertEquals((int) queued, 0));
    for (int i = 0; i < queueSize; i++) {
        producer.sendAsync("msg" + i);
    }
    // Wait until every published message has been prefetched into the receiver queues.
    Awaitility.await().untilAsserted(
            () -> assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), queueSize));
    consumer.close();
    producer.close();
}
/** Data provider: run the annotated test against a non-partitioned and a partitioned topic. */
@DataProvider(name = "partitioned")
public static Object[] isPartitioned() {
    return new Object[] {Boolean.FALSE, Boolean.TRUE};
}
/**
 * Verifies that the consumer's incoming-message-size counter grows above zero once messages are
 * prefetched and drops back to exactly zero after every message is received and acknowledged.
 * Runs against both a non-partitioned and a 3-partition topic.
 */
@Test(dataProvider = "partitioned")
public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
    final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
            UUID.randomUUID().toString();
    final String subName = "my-sub";
    if (isPartitioned) {
        admin.topics().createPartitionedTopic(topicName, 3);
    }
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subName)
            .subscribe();
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .create();
    final int numMessages = 100;
    List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(numMessages);
    for (int i = 0; i < numMessages; i++) {
        sendFutures.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
    }
    FutureUtil.waitForAll(sendFutures).get();
    // Prefetched-but-unconsumed bytes are tracked by the consumer; they should be non-zero now.
    Awaitility.await().untilAsserted(() -> {
        long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
        log.info("Check the incoming message size should greater that 0, current size is {}", size);
        assertTrue(size > 0);
    });
    for (int i = 0; i < numMessages; i++) {
        consumer.acknowledge(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }
    // Once everything is received and acknowledged, the tracked size must return to zero.
    Awaitility.await().untilAsserted(() -> {
        long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
        log.info("Check the incoming message size should be 0, current size is {}", size);
        assertEquals(size, 0);
    });
}
// Simple one-field POJO used as the payload for the Avro/JSON schema tests below.
// Lombok @Data generates getter/setter/toString; @EqualsAndHashCode makes value equality explicit.
@Data
@EqualsAndHashCode
public static class MyBean {
// Single nullable string field; the schema tests assert on its generated Avro UNION type.
private String field;
}
/** Data provider: exercise both the non-batched and the batched producer code paths. */
@DataProvider(name = "enableBatching")
public static Object[] isEnableBatching() {
    return new Object[] {Boolean.FALSE, Boolean.TRUE};
}
/**
 * Verifies that a producer created without a schema can later send with an explicit AVRO schema
 * while compression (and optionally batching) is enabled: the deferred schema registration must
 * complete and the AUTO_CONSUME consumer must decode the record correctly.
 */
@Test(dataProvider = "enableBatching")
public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
    Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(topic)
            .subscriptionName("testsub")
            .subscribe();
    // initially we are not setting a Schema in the producer
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
            .enableBatching(enableBatching)
            .compressionType(CompressionType.LZ4)
            .create();
    MyBean payload = new MyBean();
    payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");
    // now we send with a schema, but we have enabled compression and batching
    // the producer will have to setup the schema and resume the send
    producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
    producer.close();
    GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
    consumer.close();
    // TestNG's assertEquals takes (actual, expected); the received values go first so failure
    // messages label the operands correctly. (Previously the arguments were reversed, and an
    // unused local avro Schema was declared here.)
    assertEquals(res.getSchemaType(), SchemaType.AVRO);
    org.apache.avro.generic.GenericRecord nativeRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
    for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
        log.info("field {} {}", f.getName(), res.getField(f));
        assertEquals(f.getName(), "field");
        assertEquals(res.getField(f), "aaaaaaaaaaaaaaaaaaaaaaaaa");
        assertEquals(nativeRecord.get(f.getName()).toString(), "aaaaaaaaaaaaaaaaaaaaaaaaa");
    }
    assertEquals(res.getFields().size(), 1);
}
/**
 * Same deferred-schema scenario as above, but the payload is pre-encoded with a native Avro
 * writer and sent via {@code Schema.NATIVE_AVRO}: the AUTO_CONSUME consumer must still decode
 * the record as an AVRO GenericRecord.
 */
@Test(dataProvider = "enableBatching")
public void testNativeAvroSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
    Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(topic)
            .subscriptionName("testsub")
            .subscribe();
    // initially we are not setting a Schema in the producer
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
            .enableBatching(enableBatching)
            .compressionType(CompressionType.LZ4)
            .create();
    MyBean payload = new MyBean();
    payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");
    // now we send with a schema, but we have enabled compression and batching
    // the producer will have to setup the schema and resume the send
    Schema<MyBean> myBeanSchema = Schema.AVRO(MyBean.class);
    byte[] schemaBytes = myBeanSchema.getSchemaInfo().getSchema();
    org.apache.avro.Schema schemaAvroNative = new Parser().parse(new ByteArrayInputStream(schemaBytes));
    AvroWriter<MyBean> writer = new AvroWriter<>(schemaAvroNative);
    byte[] content = writer.write(payload);
    producer.newMessage(Schema.NATIVE_AVRO(schemaAvroNative)).value(content).send();
    producer.close();
    GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
    consumer.close();
    // TestNG's assertEquals takes (actual, expected); the received values go first so failure
    // messages label the operands correctly. (Previously the arguments were reversed, and an
    // unused local avro Schema was declared here.)
    assertEquals(res.getSchemaType(), SchemaType.AVRO);
    org.apache.avro.generic.GenericRecord nativeRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
    for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
        log.info("field {} {}", f.getName(), res.getField(f));
        assertEquals(f.getName(), "field");
        assertEquals(res.getField(f), "aaaaaaaaaaaaaaaaaaaaaaaaa");
        assertEquals(nativeRecord.get(f.getName()).toString(), "aaaaaaaaaaaaaaaaaaaaaaaaa");
    }
    assertEquals(res.getFields().size(), 1);
}
/** Data provider: run the schema-metadata test against both struct schema flavours. */
@DataProvider(name = "avroSchemaProvider")
public static Object[] avroSchemaProvider() {
    Schema<MyBean> avro = Schema.AVRO(MyBean.class);
    Schema<MyBean> json = Schema.JSON(MyBean.class);
    return new Object[] {avro, json};
}
/**
 * Verifies that the native representation (Avro GenericRecord or Jackson JsonNode) is reachable
 * through {@code GenericRecord#getNativeObject()} for both AVRO and JSON struct schemas, and that
 * schema metadata (e.g. the nullable-string UNION field type in Avro) can be inspected.
 */
@Test(dataProvider = "avroSchemaProvider")
public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topic = "persistent://my-property/my-ns/accessSchema";
    Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(topic)
            .subscriptionName("testsub")
            .subscribe();
    Producer<MyBean> producer = pulsarClient
            .newProducer(schema)
            .topic(topic)
            .create();
    MyBean payload = new MyBean();
    payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");
    producer.send(payload);
    producer.close();
    GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
    consumer.close();
    // TestNG's assertEquals takes (actual, expected); received values go first so failure
    // messages label the operands correctly (the arguments were previously reversed).
    assertEquals(res.getSchemaType(), schema.getSchemaInfo().getType());
    org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
    JsonNode nativeJsonRecord = null;
    if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
        nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
        assertNotNull(nativeAvroRecord);
    } else {
        nativeJsonRecord = (JsonNode) res.getNativeObject();
        assertNotNull(nativeJsonRecord);
    }
    for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
        log.info("field {} {}", f.getName(), res.getField(f));
        assertEquals(f.getName(), "field");
        assertEquals(res.getField(f), "aaaaaaaaaaaaaaaaaaaaaaaaa");
        if (nativeAvroRecord != null) {
            // test that the native schema is accessible
            org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
            // a nullable string is an UNION
            assertEquals(fieldDetails.schema().getType(), org.apache.avro.Schema.Type.UNION);
            assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
            assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
        } else {
            assertEquals(nativeJsonRecord.get("field").getNodeType(), JsonNodeType.STRING);
        }
    }
    assertEquals(res.getFields().size(), 1);
}
/**
 * Verifies that with auto topic creation disabled, subscribing with retry enabled to a topic
 * whose companion topics cannot be auto-created fails with NotFoundException.
 */
@Test
public void testTopicDoesNotExists() throws Exception {
    // Rebuild the broker with auto topic creation disabled so the subscribe below must fail.
    cleanup();
    conf.setAllowAutoTopicCreation(false);
    setup();
    String topicName = "persistent://my-property/my-ns/none" + UUID.randomUUID();
    admin.topics().createPartitionedTopic(topicName, 3);
    try {
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .enableRetry(true)
                .topic(topicName)
                .subscriptionName("sub")
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        // NOTE(review): the partitioned topic itself exists -- presumably a retry-related
        // companion topic cannot be auto-created, which surfaces as NotFoundException.
        assertTrue(e instanceof PulsarClientException.NotFoundException);
    } finally {
        // Restore the flag so later tests that rely on auto creation are unaffected.
        conf.setAllowAutoTopicCreation(true);
    }
}
/**
 * Test validates that consumer of partitioned-topic utilizes threads of all partitioned-consumers and slow-listener
 * of one of the partition doesn't impact listener-processing of other partition.
 * <p>
 * Test starts consumer with 10 partitions where one of the partition listener gets blocked but that will not impact
 * processing of other 9 partitions and they will be processed successfully.
 * As of involved #11455(Fix Consumer listener does not respect receiver queue size),
 * This test has changed the purpose that different thread run the messageListener. Because messageListener has to
 * be called one by one, it's possible to run by the same one thread.
 *
 * @throws Exception
 */
@Test(timeOut = 20000)
public void testPartitionTopicsOnSeparateListener() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
final String subscriptionName = "my-sub-";
// Dedicated client with 10 listener threads so partition listeners may be dispatched in parallel.
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().listenerThreads(10).serviceUrl(lookupUrl.toString()).build();
// create partitioned topic
int partitions = 10;
admin.topics().createPartitionedTopic(topicName, partitions);
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, partitions);
// each partition
int totalMessages = partitions * 2;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();
// Records which threads ran the listener. Per the Javadoc above, since #11455 listener calls
// are serialized, so a single thread handling all partitions is acceptable (>= 1 assert below).
Set<String> listenerThreads = Sets.newConcurrentHashSet();
MessageListener<byte[]> messageListener = (c, m) -> {
if (count.incrementAndGet() == totalMessages) {
latch.countDown();
}
listenerThreads.add(Thread.currentThread().getName());
};
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).messageListener(messageListener)
.subscriptionName(subscriptionName + 1).consumerName("aaa").subscribe();
log.info("Consumer1 created. topic: {}", consumer1.getTopic());
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).enableBatching(false).create();
log.info("Producer1 created. topic: {}", producer1.getTopic());
for (int i = 0; i < totalMessages; i++) {
producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8)).send();
}
// Wait until the listener has observed every published message (test timeOut guards hangs).
latch.await();
assertTrue(listenerThreads.size() >= 1);
log.info("-- Exiting {} test --", methodName);
}
// Verifies shared-subscription dispatch with message listeners: a second consumer joining after
// the whole backlog was published must still receive a share of it. Both listeners must make
// progress (r1, r2 >= 1) and, together, consume all `total` distinct values with none lost.
@Test(timeOut = 30000)
public void testShareConsumerWithMessageListener() throws Exception {
String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
int total = 200;
// Union of the values seen by both listeners; must end up containing all `total` values.
Set<Integer> resultSet = Sets.newConcurrentHashSet();
AtomicInteger r1 = new AtomicInteger(0);
AtomicInteger r2 = new AtomicInteger(0);
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.maxPendingMessages(500)
.enableBatching(false)
.create();
@Cleanup
Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("shared")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(10)
.consumerName("c1")
.messageListener((MessageListener<Integer>) (consumer, msg) -> {
log.info("c1 received : {}", msg.getValue());
try {
resultSet.add(msg.getValue());
r1.incrementAndGet();
consumer.acknowledge(msg);
// Pace the listener so the backlog is not fully drained before c2 joins below.
Thread.sleep(10);
} catch (InterruptedException ignore) {
// NOTE(review): interrupt status is swallowed here; tolerated in a test listener.
} catch (PulsarClientException ex) {
log.error("c1 acknowledge error", ex);
}
})
.subscribe();
// Publish the entire backlog while only c1 is subscribed.
for (int i = 0; i < total; i++) {
producer.newMessage()
.value(i)
.send();
}
// Second shared consumer joins after publishing; it must still be handed messages.
@Cleanup
Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("shared")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(10)
.consumerName("c2")
.messageListener((MessageListener<Integer>) (consumer, msg) -> {
log.info("c2 received : {}", msg.getValue());
try {
resultSet.add(msg.getValue());
r2.incrementAndGet();
consumer.acknowledge(msg);
Thread.sleep(10);
} catch (InterruptedException ignore) {
//
} catch (PulsarClientException ex) {
log.error("c2 acknowledge error", ex);
}
})
.subscribe();
// Both consumers must receive at least one message, and no message may be lost overall.
Awaitility.await().untilAsserted(() -> {
assertTrue(r1.get() >= 1);
assertTrue(r2.get() >= 1);
assertEquals(resultSet.size(), total);
});
}
}