blob: 2c94966aa3f0f16ab1bf36146bd3363af9782ffe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);
@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetupForStatsTest();
super.producerBaseSetup();
}
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@DataProvider(name = "batch")
public Object[][] batchMessageDelayMsProvider() {
return new Object[][] { { 0 }, { 1000 } };
}
@DataProvider(name = "batch_with_timeout")
public Object[][] ackTimeoutSecProvider() {
return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
}
@Test(dataProvider = "batch_with_timeout")
public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/tp1/my-ns/my-topic1").subscriptionName("my-subscriber-name");
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
if (ackTimeoutSec > 0) {
consumerBuilder.ackTimeout(ackTimeoutSec, TimeUnit.SECONDS);
}
Consumer<byte[]> consumer = consumerBuilder.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic1");
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
}
Producer<byte[]> producer = producerBuilder.create();
int numMessages = 11;
for (int i = 0; i < numMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "batch_with_timeout")
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/tp1/my-ns/my-topic2").subscriptionName("my-subscriber-name");
if (ackTimeoutSec > 0) {
consumerBuilder.ackTimeout(ackTimeoutSec, TimeUnit.SECONDS);
}
Consumer<byte[]> consumer = consumerBuilder.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic2")
.messageRoutingMode(MessageRoutingMode.SinglePartition);
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
} else {
producerBuilder.enableBatching(false);
}
Producer<byte[]> producer = producerBuilder.create();
List<Future<MessageId>> futures = Lists.newArrayList();
int numMessages = 50;
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}
log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}
Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Asynchronously acknowledge upto and including the last message
Future<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
log.info("Waiting for async ack to complete");
ackFuture.get();
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "batch_with_timeout")
public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec)
throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/tp1/my-ns/my-topic2").subscriptionName("my-subscriber-name");
if (ackTimeoutSec > 0) {
consumerBuilder.ackTimeout(ackTimeoutSec, TimeUnit.SECONDS);
}
Consumer<byte[]> consumer = consumerBuilder.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic2")
.messageRoutingMode(MessageRoutingMode.SinglePartition);
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
} else {
producerBuilder.enableBatching(false);
}
Producer<byte[]> producer = producerBuilder.create();
List<Future<MessageId>> futures = Lists.newArrayList();
int numMessages = 101;
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}
log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}
Message<byte[]> msg = null;
CompletableFuture<Message<byte[]>> future_msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
Thread.sleep(10);
msg = future_msg.get();
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Asynchronously acknowledge upto and including the last message
Future<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
log.info("Waiting for async ack to complete");
ackFuture.get();
Thread.sleep(5000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "batch", timeOut = 100000)
public void testMessageListener(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
int numMessages = 100;
final CountDownLatch latch = new CountDownLatch(numMessages);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic3")
.subscriptionName("my-subscriber-name").ackTimeout(100, TimeUnit.SECONDS)
.messageListener((consumer1, msg) -> {
assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
consumer1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic3");
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
}
Producer<byte[]> producer = producerBuilder.create();
List<Future<MessageId>> futures = Lists.newArrayList();
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}
log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}
Thread.sleep(5000);
log.info("Waiting for message listener to ack all messages");
assertEquals(latch.await(numMessages, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks");
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, true);
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "batch")
public void testSendTimeout(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/tp1/my-ns/my-topic5")
.subscriptionName("my-subscriber-name").subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
}
Producer<byte[]> producer = producerBuilder.create();
final String message = "my-message";
// Trigger the send timeout
stopBroker();
Future<MessageId> future = producer.sendAsync(message.getBytes());
try {
future.get();
fail("Send operation should have failed");
} catch (ExecutionException e) {
// Expected
}
startBroker();
// We should not have received any message
Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
assertNull(msg);
consumer.close();
producer.close();
Thread.sleep(1000);
ConsumerStats cStat = consumer.getStats();
ProducerStats pStat = producer.getStats();
assertEquals(pStat.getTotalMsgsSent(), 0);
assertEquals(pStat.getTotalSendFailed(), 1);
assertEquals(cStat.getTotalMsgsReceived(), 0);
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
log.info("-- Exiting {} test --", methodName);
}
public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException {
log.info("-- Starting {} test --", methodName);
String topicName = "persistent://my-property/cluster/my-ns/testBatchMessagesRateOut";
double produceRate = 17;
int batchSize = 5;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).batchingMaxMessages(batchSize)
.enableBatching(true).batchingMaxPublishDelay(2, TimeUnit.SECONDS).create();
AtomicBoolean runTest = new AtomicBoolean(true);
Thread t1 = new Thread(() -> {
RateLimiter r = RateLimiter.create(produceRate);
while (runTest.get()) {
r.acquire();
producer.sendAsync("Hello World".getBytes());
consumer.receiveAsync().thenAccept(consumer::acknowledgeAsync);
}
});
t1.start();
Thread.sleep(2000); // Two seconds sleep
runTest.set(false);
pulsar.getBrokerService().updateRates();
double actualRate = admin.topics().getStats(topicName).msgRateOut;
assertTrue(actualRate > (produceRate / batchSize));
consumer.unsubscribe();
log.info("-- Exiting {} test --", methodName);
}
public void validatingLogInfo(Consumer<?> consumer, Producer<?> producer, boolean verifyAckCount)
throws InterruptedException {
// Waiting for recording last stat info
Thread.sleep(1000);
ConsumerStats cStat = consumer.getStats();
ProducerStats pStat = producer.getStats();
assertEquals(pStat.getTotalMsgsSent(), cStat.getTotalMsgsReceived());
assertEquals(pStat.getTotalBytesSent(), cStat.getTotalBytesReceived());
assertEquals(pStat.getTotalMsgsSent(), pStat.getTotalAcksReceived());
if (verifyAckCount) {
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
}
}
}