blob: d8c2f6ce3797ccc0eb3898f3760e5ff717d1e4c9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class BatchMessageTest extends BrokerTestBase {
/**
 * Class-level fixture: runs once before the tests of this class and delegates
 * entirely to {@code super.baseSetup()}.
 *
 * @throws Exception if the base setup fails
 */
@BeforeClass
@Override
protected void setup() throws Exception {
    super.baseSetup();
}
/**
 * Class-level teardown: delegates to {@code super.internalCleanup()}.
 * {@code alwaysRun = true} makes TestNG invoke it even when earlier
 * configuration or test methods failed.
 *
 * @throws Exception if the base cleanup fails
 */
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
    super.internalCleanup();
}
/**
 * Supplies every (compression codec, batch container) pair exercised by the
 * parameterized tests: NONE/LZ4/ZLIB for the default container first, then the
 * same three codecs for the key-based container.
 *
 * @return rows of {@code { CompressionType, BatcherBuilder }}
 */
@DataProvider(name = "codecAndContainerBuilder")
public Object[][] codecAndContainerBuilderProvider() {
    final CompressionType[] codecs = { CompressionType.NONE, CompressionType.LZ4, CompressionType.ZLIB };
    final BatcherBuilder[] containers = { BatcherBuilder.DEFAULT, BatcherBuilder.KEY_BASED };

    final Object[][] rows = new Object[containers.length * codecs.length][];
    int next = 0;
    // Container is the outer dimension so that all DEFAULT rows precede the KEY_BASED rows.
    for (BatcherBuilder container : containers) {
        for (CompressionType codec : codecs) {
            rows[next++] = new Object[] { codec, container };
        }
    }
    return rows;
}
/**
 * Supplies the two batch container implementations under test, one per row.
 *
 * @return rows of {@code { BatcherBuilder }}
 */
@DataProvider(name = "containerBuilder")
public Object[][] containerBuilderProvider() {
    final Object[][] rows = new Object[2][];
    rows[0] = new Object[] { BatcherBuilder.DEFAULT };
    rows[1] = new Object[] { BatcherBuilder.KEY_BASED };
    return rows;
}
/**
 * Publishes 50 messages asynchronously with batching capped at 25 messages per batch,
 * checks that the subscription backlog holds exactly two entries, then consumes
 * everything and verifies the payloads come back in publish order.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    final int total = 50;
    final int perBatch = total / 2;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-" + UUID.randomUUID();
    final String subscriptionName = "sub-1" + compressionType.toString();

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .compressionType(compressionType)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(perBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        pending.add(producer.sendAsync(("my-message-" + seq).getBytes()));
    }
    FutureUtil.waitForAll(pending).get();

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // 50 messages at 25 per batch should yield two backlog entries; the 5 second publish
    // delay is long enough that the timer does not cut a batch short.
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);

    final Consumer<byte[]> reader = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();
    for (int seq = 0; seq < total; seq++) {
        final Message<byte[]> msg = reader.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        final String actual = new String(msg.getData());
        final String expected = "my-message-" + seq;
        assertEquals(actual, expected,
                "Received message " + actual + " did not match the expected message " + expected);
    }
    reader.close();
    producer.close();
}
/**
 * Publishes 50 small messages asynchronously with batching limited by size
 * ({@code batchingMaxBytes(600)}) rather than by message count, checks that the
 * subscription backlog holds exactly two entries, then consumes everything and
 * verifies the payloads come back in publish order.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchBytes(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    int numMsgs = 50;
    int numBytesInBatch = 600;
    // Topic prefix matches this test's name (it was previously copy-pasted from the FixedBatchSize test).
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchBytes-" + UUID.randomUUID();
    final String subscriptionName = "sub-1" + compressionType.toString();
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .compressionType(compressionType)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            // NOTE(review): 0 presumably disables the message-count limit so only the byte limit applies - confirm
            .batchingMaxMessages(0)
            .batchingMaxBytes(numBytesInBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        byte[] message = ("my-message-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
    }
    FutureUtil.waitForAll(sendFutureList).get();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // we expect 2 entries in the backlog: batches are bounded by numBytesInBatch (600 bytes), not by
    // message count, and the 50 small messages are expected to fill exactly two such batches. The
    // batch time is set high enough for it to not affect the number of messages in a batch.
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
    consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
    for (int i = 0; i < numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        String receivedMessage = new String(msg.getData());
        String expectedMessage = "my-message-" + i;
        Assert.assertEquals(receivedMessage, expectedMessage,
                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
    }
    consumer.close();
    producer.close();
}
/**
 * Publishes 100 messages with small random pauses between sends while batching is
 * driven by a 10 ms publish delay, then checks that the number of backlog entries is
 * strictly smaller than the number of messages, i.e. at least some batching happened.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    final int total = 100;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-" + UUID.randomUUID();
    final String subscriptionName = "time-sub-1" + compressionType.toString();

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .compressionType(compressionType)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    final Random jitter = new Random();
    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        // pause for a random 0..3 ms before each send
        Thread.sleep(jitter.nextInt(4));
        pending.add(producer.sendAsync(("msg-" + seq).getBytes()));
    }
    FutureUtil.waitForAll(pending).get();

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    LOG.info("Sent {} messages, backlog is {} messages", total,
            topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
    assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < total);
    producer.close();
}
/**
 * Same scenario as the time-only variant, but the producer is additionally capped at
 * 5 messages per batch. After 100 jittered sends, the number of backlog entries must
 * be strictly smaller than the number of messages.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    final int total = 100;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-" + UUID.randomUUID();
    final String subscriptionName = "time-size-sub-1" + compressionType.toString();

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(5)
            .batcherBuilder(builder)
            .compressionType(compressionType)
            .enableBatching(true)
            .create();

    final Random jitter = new Random();
    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        // pause for a random 0..3 ms before each send
        Thread.sleep(jitter.nextInt(4));
        pending.add(producer.sendAsync(("msg-" + seq).getBytes()));
    }
    FutureUtil.waitForAll(pending).get();

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    LOG.info("Sent {} messages, backlog is {} messages", total,
            topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
    assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < total);
    producer.close();
}
/**
 * Publishes 50 messages with a 25-message batch limit where the message at index 25 is a
 * large (128 KiB + 4 byte) payload, followed by one trailing message. The test expects
 * three backlog entries, then consumes and individually acknowledges all 51 messages and
 * expects the backlog to drain to zero.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testBatchProducerWithLargeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    int numMsgs = 50;
    int numMsgsInBatch = numMsgs / 2;
    final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" + UUID.randomUUID();
    final String subscriptionName = "large-message-sub-1" + compressionType.toString();
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
            .batcherBuilder(builder)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        if (i == 25) {
            // send a large message
            byte[] largeMessage = new byte[128 * 1024 + 4];
            sendFutureList.add(producer.sendAsync(largeMessage));
        } else {
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync(message));
        }
    }
    byte[] lastMsg = ("msg-" + "last").getBytes();
    sendFutureList.add(producer.sendAsync(lastMsg));
    FutureUtil.waitForAll(sendFutureList).get();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // we expect 3 messages in the backlog since the large message in the middle should
    // close out the batch and be sent in a batch of its own
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3);
    consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();
    // "<=" on purpose: numMsgs messages from the loop above plus the trailing "msg-last"
    for (int i = 0; i <= numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        // Log id and size: byte[].toString() only prints the array identity, and dumping
        // the 128 KiB payload itself would flood the log.
        LOG.info("received msg - {} ({} bytes)", msg.getMessageId(), msg.getData().length);
        consumer.acknowledge(msg);
    }
    Thread.sleep(100);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
    consumer.close();
    producer.close();
}
/**
 * Publishes 500 messages with time- and count-based batching effectively switched off and
 * flushes explicitly after every 25 sends, expecting 20 backlog entries. The consumer then
 * cumulatively acknowledges every even-indexed message and finally the last odd-indexed
 * one, after which the backlog must be empty.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerConsumer(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    final int total = 500;
    final int perFlush = total / 20;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
    final String subscriptionName = "pc-sub-1" + compressionType.toString();

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .compressionType(compressionType)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            // a very large delay effectively disables time-based batching
            .batchingMaxPublishDelay(60, TimeUnit.HOURS)
            // a limit above the total message count effectively disables count-based batching
            .batchingMaxMessages(2 * total)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        pending.add(producer.sendAsync(("msg-" + seq).getBytes()));
        final int sentSoFar = seq + 1;
        if (sentSoFar % perFlush == 0) {
            producer.flush();
            LOG.info("Flush {} messages", sentSoFar);
        }
    }
    FutureUtil.waitForAll(pending).get();

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), total / perFlush);

    final Consumer<byte[]> reader = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();
    Message<byte[]> lastSkipped = null;
    for (int seq = 0; seq < total; seq++) {
        final Message<byte[]> msg = reader.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        final boolean evenIndex = (seq % 2 == 0);
        if (!evenIndex) {
            lastSkipped = msg;
        } else {
            reader.acknowledgeCumulative(msg);
        }
    }
    if (lastSkipped != null) {
        reader.acknowledgeCumulative(lastSkipped);
    }
    Thread.sleep(100);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
    reader.close();
    producer.close();
}
/**
 * Publishes 10 messages with the blocking {@code send} on a producer whose batch limit is
 * 5 messages, expects 10 separate backlog entries, and verifies the payloads are
 * consumed in publish order.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder) throws Exception {
    final int total = 10;
    final int perBatch = total / 2;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-" + UUID.randomUUID();
    final String subscriptionName = "syncsub-1";

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
            .batchingMaxMessages(perBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    for (int seq = 0; seq < total; seq++) {
        producer.send(("my-message-" + seq).getBytes());
    }

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // Ten backlog entries are expected even though the batch limit is 5: every send is
    // synchronous, so each message goes out as an individual message.
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 10);

    final Consumer<byte[]> reader = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();
    for (int seq = 0; seq < total; seq++) {
        final Message<byte[]> msg = reader.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        final String actual = new String(msg.getData());
        final String expected = "my-message-" + seq;
        assertEquals(actual, expected,
                "Received message " + actual + " did not match the expected message " + expected);
    }
    reader.close();
    producer.close();
}
/**
 * Publishes a large number of messages (2000, despite the "1k" in the method name) in
 * batches of 4, expects {@code numMsgs / 4} backlog entries, consumes all of them and
 * issues one cumulative acknowledgement on the last message, after which the backlog
 * must be empty.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) throws Exception {
    int numMsgs = 2000;
    int numMsgsInBatch = 4;
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-" + UUID.randomUUID();
    final String subscriptionName = "pc1k-sub-1";
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();
    // maxPendingMessages is raised above numMsgs so that all sends can be outstanding at once
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1)
            .batchingMaxPublishDelay(30, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
            .batcherBuilder(builder)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        byte[] message = ("msg-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
    }
    FutureUtil.waitForAll(sendFutureList).get();
    // Count sends that completed exceptionally and exclude them from the expected totals.
    int sendError = 0;
    for (CompletableFuture<MessageId> sendFuture : sendFutureList) {
        if (sendFuture.isCompletedExceptionally()) {
            ++sendError;
        }
    }
    if (sendError != 0) {
        LOG.warn("[{}] Error sending {} messages", subscriptionName, sendError);
        numMsgs = numMsgs - sendError;
    }
    LOG.info("[{}] sent {} messages", subscriptionName, numMsgs);
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    // allow stats to be updated..
    // The placeholder previously had no argument and was logged as a literal "{}".
    LOG.info("[{}] checking backlog stats..", subscriptionName);
    rolloverPerIntervalStats();
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
    consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
    Message<byte[]> lastunackedMsg = null;
    for (int i = 0; i < numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
        assertNotNull(msg);
        lastunackedMsg = msg;
    }
    if (lastunackedMsg != null) {
        consumer.acknowledgeCumulative(lastunackedMsg);
    }
    consumer.close();
    producer.close();
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
}
// test for ack holes
/*
 * Entries on the topic and the acks applied to them:
 *
 *   lid  eid  bid
 *   0    0    1-10   ack type cumul till id 9
 *   0    1    1-10   ack type cumul on batch id 5 (should remove 0,1, 10 also on broker);
 *                    individual ack on 6-10 (if ack type individual on bid 5, then a hole
 *                    remains, which is ok)
 *   0    2    1-10
 *   0    3    1-10
 */
/**
 * Publishes 40 messages in 4 batches of 10 and acknowledges them out of order:
 * cumulative ack at index 8, nothing at index 9, cumulative ack at index 14, individual
 * acks for indices 15-19, and a final cumulative ack on the last remaining message.
 * The backlog entry count is asserted after each stage (3, then 2, then 0).
 *
 * @throws Exception on any client or broker failure
 */
@Test
public void testOutOfOrderAcksForBatchMessage() throws Exception {
    int numMsgs = 40;
    int numMsgsInBatch = numMsgs / 4;
    final String topicName = "persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage";
    final String subscriptionName = "oooack-sub-1";
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        byte[] message = ("msg-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
    }
    FutureUtil.waitForAll(sendFutureList).get();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
    consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
    Set<Integer> individualAcks = new HashSet<>();
    for (int i = 15; i < 20; i++) {
        individualAcks.add(i);
    }
    Message<byte[]> lastunackedMsg = null;
    for (int i = 0; i < numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        // Check for a receive timeout before touching the message, so a missing message is
        // reported as an assertion failure rather than a NullPointerException.
        assertNotNull(msg);
        LOG.info("received message {}", new String(msg.getData(), UTF_8));
        if (i == 8) {
            consumer.acknowledgeCumulative(msg);
        } else if (i == 9) {
            // do not ack
        } else if (i == 14) {
            // should ack lid =0 eid = 1 on broker
            consumer.acknowledgeCumulative(msg);
            Thread.sleep(1000);
            rolloverPerIntervalStats();
            Thread.sleep(1000);
            assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3);
        } else if (individualAcks.contains(i)) {
            consumer.acknowledge(msg);
        } else {
            lastunackedMsg = msg;
        }
    }
    Thread.sleep(1000);
    rolloverPerIntervalStats();
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
    if (lastunackedMsg != null) {
        consumer.acknowledgeCumulative(lastunackedMsg);
    }
    Thread.sleep(100);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
    consumer.close();
    producer.close();
}
/**
 * Publishes one full batch of 10 messages followed by a single message from a second
 * producer created without batching options, expects two backlog entries, consumes all 11
 * messages and cumulatively acknowledges the last one, after which the backlog must be
 * empty.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) throws Exception {
    final int total = 10;
    final int perBatch = total;
    final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-" + UUID.randomUUID();
    final String subscriptionName = "nbcaabp-sub-1";

    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe()
            .close();

    final Producer<byte[]> batchProducer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(perBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();
    // second producer, used for the single trailing message
    final Producer<byte[]> plainProducer = pulsarClient.newProducer()
            .topic(topicName)
            .create();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        pending.add(batchProducer.sendAsync(("msg-" + seq).getBytes()));
    }
    FutureUtil.waitForAll(pending).get();
    pending.clear();

    final byte[] trailing = ("nobatch").getBytes();
    plainProducer.sendAsync(trailing).get();

    final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);

    final Consumer<byte[]> reader = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscribe();
    Message<byte[]> newest = null;
    // "<=" on purpose: the batch of `total` messages plus the trailing one
    for (int seq = 0; seq <= total; seq++) {
        final Message<byte[]> msg = reader.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        newest = msg;
    }
    if (newest != null) {
        reader.acknowledgeCumulative(newest);
    }
    Thread.sleep(100);
    rolloverPerIntervalStats();
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
    reader.close();
    batchProducer.close();
    plainProducer.close();
}
/**
 * Interleaves 25 messages from a batching producer (5 per batch) with 25 messages from a
 * non-batching producer on the same topic, checks the resulting backlog entry count,
 * then consumes all 50 messages while issuing cumulative acks on every odd-indexed
 * message and finally on the last even-indexed one. The backlog must drain to zero.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Exception {
    int numMsgs = 50;
    int numMsgsInBatch = numMsgs / 10;
    final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" + UUID.randomUUID();
    final String subscriptionName = "bnb-sub-1";
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(numMsgsInBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    // create producer to publish non batch messages
    Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs / 2; i++) {
        byte[] message = ("msg-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
        byte[] nobatchmsg = ("nobatch-" + i).getBytes();
        sendFutureList.add(noBatchProducer.sendAsync(nobatchmsg));
    }
    FutureUtil.waitForAll(sendFutureList).get();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // batched half: (numMsgs / 2) / numMsgsInBatch entries; non-batched half: one entry per message
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
            (numMsgs / 2) / numMsgsInBatch + numMsgs / 2);
    consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscribe();
    Message<byte[]> lastunackedMsg = null;
    for (int i = 0; i < numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        // Decode the payload explicitly: String.valueOf(byte[]) resolves to the Object overload
        // and would only print the array identity.
        LOG.info("[{}] got message position{} data {}", subscriptionName, msg.getMessageId(),
                new String(msg.getData(), UTF_8));
        if (i % 2 == 0) {
            lastunackedMsg = msg;
        } else {
            consumer.acknowledgeCumulative(msg);
            LOG.info("[{}] did cumulative ack on position{} ", subscriptionName, msg.getMessageId());
        }
    }
    if (lastunackedMsg != null) {
        consumer.acknowledgeCumulative(lastunackedMsg);
    }
    // Poll until the acks have been applied, then assert the outcome; the retry call alone
    // does not fail the test if the backlog never drains.
    retryStrategically(t -> topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) == 0, 100, 100);
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
    consumer.close();
    producer.close();
    noBatchProducer.close();
}
/**
 * Verifies batch-message acking is thread-safe: one batch of 10 messages is published,
 * then 10 pool threads each receive and acknowledge one message concurrently on a shared
 * subscription. The test fails if any worker hits an error, and otherwise expects the
 * broker-side unacked-message count for the consumer to reach zero.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder", timeOut = 3000)
public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Exception {
    int numMsgs = 10;
    final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" + UUID.randomUUID();
    final String subscriptionName = "sub-1";
    // Subscribe and close right away; presumably this registers the subscription so the
    // messages published below accumulate in its backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared).subscribe();
    consumer.close();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
            .batcherBuilder(builder)
            .create();
    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        byte[] message = ("my-message-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
    }
    FutureUtil.waitForAll(sendFutureList).get();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    final Consumer<byte[]> myConsumer = pulsarClient.newConsumer().topic(topicName)
            .subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
    ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
        final CountDownLatch latch = new CountDownLatch(numMsgs);
        final AtomicBoolean failed = new AtomicBoolean(false);
        for (int i = 0; i < numMsgs; i++) {
            executor.submit(() -> {
                try {
                    Message<byte[]> msg = myConsumer.receive(1, TimeUnit.SECONDS);
                    myConsumer.acknowledge(msg);
                } catch (Exception e) {
                    // Record the failure (the flag was previously reset to false here, hiding errors).
                    LOG.error("Concurrent receive/acknowledge failed", e);
                    failed.set(true);
                } finally {
                    // Always count down so the main thread cannot block forever on a failed worker.
                    latch.countDown();
                }
            });
        }
        latch.await();
        Assert.assertFalse(failed.get(), "At least one concurrent receive/acknowledge failed");
        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) topic
                .getSubscription(subscriptionName).getDispatcher();
        // check strategically to let ack-message receive by broker
        retryStrategically((test) -> dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150);
        assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
    } finally {
        // Release the pool threads even when an assertion above fails.
        executor.shutdown();
    }
    myConsumer.close();
    producer.close();
}
/**
 * Checks per-key ordering with the key-based batch container on a Key_Shared
 * subscription. Thirty messages (10 payloads x 3 keys) are published twice: first
 * distinguished by message key, then by ordering key with a random message key. In both
 * rounds, each time the observed key changes the expected payload index restarts at 0 and
 * must then increase by one per message.
 *
 * @throws PulsarClientException on client failures
 * @throws ExecutionException if waiting on the send futures fails
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientException, ExecutionException, InterruptedException {
    final String topicName = "persistent://prop/ns-abc/testKeyBased";
    final String subscriptionName = "sub-1";
    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(30)
            .enableBatching(true)
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .create();
    final Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    final String[] keys = new String[]{"key-1", "key-2", "key-3"};

    // Round 1: messages are distinguished by their message key.
    for (int round = 0; round < 10; round++) {
        final byte[] payload = ("my-message-" + round).getBytes();
        for (String key : keys) {
            pending.add(producer.newMessage().key(key).value(payload).sendAsync());
        }
    }
    FutureUtil.waitForAll(pending).get();

    String currentKey = "";
    int indexWithinKey = 0;
    for (int n = 0; n < 30; n++) {
        final Message<byte[]> received = consumer.receive();
        final String msgKey = received.getKey();
        if (!msgKey.equals(currentKey)) {
            // a new key started: its payload sequence is expected to restart at 0
            currentKey = msgKey;
            indexWithinKey = 0;
        }
        assertEquals(new String(received.getValue()), "my-message-" + indexWithinKey % 10);
        consumer.acknowledge(received);
        indexWithinKey++;
    }

    // Round 2: random message key, messages are distinguished by their ordering key.
    // The future list is not cleared, so the wait below also covers the completed round-1 futures.
    for (int round = 0; round < 10; round++) {
        final byte[] payload = ("my-message-" + round).getBytes();
        for (String key : keys) {
            pending.add(producer.newMessage()
                    .key(UUID.randomUUID().toString())
                    .orderingKey(key.getBytes())
                    .value(payload)
                    .sendAsync());
        }
    }
    FutureUtil.waitForAll(pending).get();

    currentKey = "";
    indexWithinKey = 0;
    for (int n = 0; n < 30; n++) {
        final Message<byte[]> received = consumer.receive();
        final String orderingKey = new String(received.getOrderingKey());
        if (!orderingKey.equals(currentKey)) {
            currentKey = orderingKey;
            indexWithinKey = 0;
        }
        assertEquals(new String(received.getValue()), "my-message-" + indexWithinKey % 10);
        consumer.acknowledge(received);
        indexWithinKey++;
    }

    consumer.close();
    producer.close();
}
/**
 * Publishes 10 batched messages without setting a sequence id and verifies that the
 * consumer observes sequence ids 0..9 in order.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {
    final int total = 10;
    final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
    final String subscriptionName = "sub-1";

    final Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(total)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        pending.add(producer.sendAsync(("my-message-" + seq).getBytes()));
    }
    FutureUtil.waitForAll(pending).get();

    for (int seq = 0; seq < total; seq++) {
        final Message<byte[]> received = consumer.receive();
        assertEquals(received.getSequenceId(), seq);
        consumer.acknowledge(received);
    }

    producer.close();
    consumer.close();
}
/**
 * Publishes 10 batched messages with explicitly assigned sequence ids 100..109 and
 * verifies that the consumer observes exactly those ids in order.
 *
 * @param builder batch container implementation under test
 * @throws Exception on any client or broker failure
 */
@Test(dataProvider = "containerBuilder")
public void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws Exception {
    final int total = 10;
    final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
    final String subscriptionName = "sub-1";

    final Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(total)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    final List<CompletableFuture<MessageId>> pending = Lists.newArrayList();
    for (int seq = 0; seq < total; seq++) {
        final byte[] payload = ("my-message-" + seq).getBytes();
        pending.add(producer.newMessage().sequenceId(seq + 100).value(payload).sendAsync());
    }
    FutureUtil.waitForAll(pending).get();

    for (int seq = 0; seq < total; seq++) {
        final Message<byte[]> received = consumer.receive();
        assertEquals(received.getSequenceId(), seq + 100);
        consumer.acknowledge(received);
    }

    producer.close();
    consumer.close();
}
/**
 * Sends a 10 MiB payload through a batching producer and then verifies that the producer
 * is still usable by sending ten 1 KiB messages. If the oversized send is rejected, the
 * exception must be an {@code InvalidMessageException}.
 *
 * @param compressionType compression configured on the producer
 * @param builder batch container implementation under test
 * @throws Exception on any unexpected client or broker failure
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSendOverSizeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    final int numMsgs = 10;
    final String topicName = "persistent://prop/ns-abc/testSendOverSizeMessage-" + UUID.randomUUID();
    final Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(2)
            .enableBatching(true)
            .compressionType(compressionType)
            .batcherBuilder(builder)
            .create();

    final byte[] oversized = new byte[1024 * 1024 * 10];
    try {
        producer.send(oversized);
        // NOTE(review): no fail() here - a successful send is tolerated. Presumably the
        // compressed variants can shrink this all-zero payload below the size limit; confirm
        // before tightening this check.
    } catch (PulsarClientException e) {
        assertTrue(e instanceof PulsarClientException.InvalidMessageException);
    }

    // The producer must keep working after the oversized attempt.
    int remaining = numMsgs;
    while (remaining > 0) {
        producer.send(new byte[1024]);
        remaining--;
    }
    producer.close();
}
// SLF4J logger shared by all tests in this class.
private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
}