/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
| |
| public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { |
| private static final long testTimeout = 90000; // 1.5 min |
| private static final Logger log = LoggerFactory.getLogger(UnAcknowledgedMessagesTimeoutTest.class); |
| private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2); |
| |
| @Override |
| @BeforeMethod |
| public void setup() throws Exception { |
| super.baseSetup(); |
| } |
| |
| @Override |
| @AfterMethod |
| public void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| @Test(timeOut = testTimeout) |
| public void testExclusiveSingleAckedNormalTopic() throws Exception { |
| String key = "testExclusiveSingleAckedNormalTopic"; |
| final String topicName = "persistent://prop/ns-abc/topic-" + key; |
| final String subscriptionName = "my-ex-subscription-" + key; |
| final String messagePredicate = "my-message-" + key + "-"; |
| final int totalMessages = 10; |
| |
| // 1. producer connect |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // 2. Create consumer |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(7).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe(); |
| |
| // 3. producer publish messages |
| for (int i = 0; i < totalMessages / 2; i++) { |
| String message = messagePredicate + i; |
| log.info("Producer produced: " + message); |
| producer.send(message.getBytes()); |
| } |
| |
| // 4. Receiver receives the message |
| Message<byte[]> message = consumer.receive(); |
| while (message != null) { |
| log.info("Consumer received : " + new String(message.getData())); |
| message = consumer.receive(500, TimeUnit.MILLISECONDS); |
| } |
| long size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size(); |
| log.info(key + " Unacked Message Tracker size is " + size); |
| assertEquals(size, totalMessages / 2); |
| |
| // Blocking call, redeliver should kick in |
| message = consumer.receive(); |
| log.info("Consumer received : " + new String(message.getData())); |
| |
| HashSet<String> hSet = new HashSet<>(); |
| for (int i = totalMessages / 2; i < totalMessages; i++) { |
| String messageString = messagePredicate + i; |
| producer.send(messageString.getBytes()); |
| } |
| |
| do { |
| hSet.add(new String(message.getData())); |
| consumer.acknowledge(message); |
| log.info("Consumer acknowledged : " + new String(message.getData())); |
| message = consumer.receive(500, TimeUnit.MILLISECONDS); |
| } while (message != null); |
| |
| size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size(); |
| log.info(key + " Unacked Message Tracker size is " + size); |
| assertEquals(size, 0); |
| assertEquals(hSet.size(), totalMessages); |
| } |
| |
| @Test(timeOut = testTimeout) |
| public void testExclusiveCumulativeAckedNormalTopic() throws Exception { |
| String key = "testExclusiveCumulativeAckedNormalTopic"; |
| final String topicName = "persistent://prop/use/ns-abc/topic-" + key; |
| final String subscriptionName = "my-ex-subscription-" + key; |
| final String messagePredicate = "my-message-" + key + "-"; |
| final int totalMessages = 10; |
| |
| // 1. producer connect |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // 2. Create consumer |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(7).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe(); |
| |
| // 3. producer publish messages |
| for (int i = 0; i < totalMessages; i++) { |
| String message = messagePredicate + i; |
| producer.send(message.getBytes()); |
| } |
| |
| // 4. Receiver receives the message |
| HashSet<String> hSet = new HashSet<>(); |
| Message<byte[]> message = consumer.receive(); |
| Message<byte[]> lastMessage = message; |
| while (message != null) { |
| lastMessage = message; |
| hSet.add(new String(message.getData())); |
| log.info("Consumer received " + new String(message.getData())); |
| log.info("Message ID details " + ((MessageIdImpl) message.getMessageId()).toString()); |
| message = consumer.receive(500, TimeUnit.MILLISECONDS); |
| } |
| long size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size(); |
| assertEquals(size, totalMessages); |
| log.info("Comulative Ack sent for " + new String(lastMessage.getData())); |
| log.info("Message ID details " + ((MessageIdImpl) lastMessage.getMessageId()).toString()); |
| consumer.acknowledgeCumulative(lastMessage); |
| size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size(); |
| assertEquals(size, 0); |
| message = consumer.receive((int) (2 * ackTimeOutMillis), TimeUnit.MILLISECONDS); |
| assertEquals(message, null); |
| } |
| |
| @Test(timeOut = testTimeout) |
| public void testSharedSingleAckedPartitionedTopic() throws Exception { |
| String key = "testSharedSingleAckedPartitionedTopic"; |
| final String topicName = "persistent://prop/ns-abc/topic-" + key; |
| final String subscriptionName = "my-shared-subscription-" + key; |
| final String messagePredicate = "my-message-" + key + "-"; |
| final int totalMessages = 20; |
| final int numberOfPartitions = 3; |
| admin.topics().createPartitionedTopic(topicName, numberOfPartitions); |
| // Special step to create partitioned topic |
| |
| // 1. producer connect |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) |
| .create(); |
| |
| // 2. Create consumer |
| Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(100).subscriptionType(SubscriptionType.Shared) |
| .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-1").subscribe(); |
| Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(100).subscriptionType(SubscriptionType.Shared) |
| .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-2").subscribe(); |
| |
| // 3. producer publish messages |
| for (int i = 0; i < totalMessages; i++) { |
| String message = messagePredicate + i; |
| MessageId msgId = producer.send(message.getBytes()); |
| log.info("Message produced: {} -- msgId: {}", message, msgId); |
| } |
| |
| // 4. Receive messages |
| int messageCount1 = receiveAllMessage(consumer1, false /* no-ack */); |
| int messageCount2 = receiveAllMessage(consumer2, true /* yes-ack */); |
| int ackCount1 = 0; |
| int ackCount2 = messageCount2; |
| |
| log.info(key + " messageCount1 = " + messageCount1); |
| log.info(key + " messageCount2 = " + messageCount2); |
| log.info(key + " ackCount1 = " + ackCount1); |
| log.info(key + " ackCount2 = " + ackCount2); |
| assertEquals(messageCount1 + messageCount2, totalMessages); |
| |
| // 5. Check if Messages redelivered again |
| // Since receive is a blocking call hoping that timeout will kick in |
| Thread.sleep((int) (ackTimeOutMillis * 1.1)); |
| log.info(key + " Timeout should be triggered now"); |
| messageCount1 = receiveAllMessage(consumer1, true); |
| messageCount2 += receiveAllMessage(consumer2, false); |
| |
| ackCount1 = messageCount1; |
| |
| log.info(key + " messageCount1 = " + messageCount1); |
| log.info(key + " messageCount2 = " + messageCount2); |
| log.info(key + " ackCount1 = " + ackCount1); |
| log.info(key + " ackCount2 = " + ackCount2); |
| assertEquals(messageCount1 + messageCount2, totalMessages); |
| assertEquals(ackCount1 + messageCount2, totalMessages); |
| |
| Thread.sleep((int) (ackTimeOutMillis * 1.1)); |
| |
| // Since receive is a blocking call hoping that timeout will kick in |
| log.info(key + " Timeout should be triggered again"); |
| ackCount1 += receiveAllMessage(consumer1, true); |
| ackCount2 += receiveAllMessage(consumer2, true); |
| log.info(key + " ackCount1 = " + ackCount1); |
| log.info(key + " ackCount2 = " + ackCount2); |
| assertEquals(ackCount1 + ackCount2, totalMessages); |
| } |
| |
| private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages) throws Exception { |
| int messagesReceived = 0; |
| Message<?> msg = consumer.receive(1, TimeUnit.SECONDS); |
| while (msg != null) { |
| ++messagesReceived; |
| log.info("Consumer received {}", new String(msg.getData())); |
| |
| if (ackMessages) { |
| consumer.acknowledge(msg); |
| } |
| |
| msg = consumer.receive(1, TimeUnit.SECONDS); |
| } |
| |
| return messagesReceived; |
| } |
| |
| @Test(timeOut = testTimeout) |
| public void testFailoverSingleAckedPartitionedTopic() throws Exception { |
| String key = "testFailoverSingleAckedPartitionedTopic"; |
| final String topicName = "persistent://prop/ns-abc/topic-" + key; |
| final String subscriptionName = "my-failover-subscription-" + key; |
| final String messagePredicate = "my-message-" + key + "-"; |
| final int totalMessages = 10; |
| final int numberOfPartitions = 3; |
| admin.topics().createPartitionedTopic(topicName, numberOfPartitions); |
| // Special step to create partitioned topic |
| |
| // 1. producer connect |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) |
| .create(); |
| |
| // 2. Create consumer |
| Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared) |
| .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-1").subscribe(); |
| Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) |
| .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared) |
| .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).consumerName("Consumer-2").subscribe(); |
| |
| // 3. producer publish messages |
| for (int i = 0; i < totalMessages; i++) { |
| String message = messagePredicate + i; |
| log.info("Message produced: " + message); |
| producer.send(message.getBytes()); |
| } |
| |
| // 4. Receive messages |
| Message<byte[]> message1 = consumer1.receive(); |
| Message<byte[]> message2 = consumer2.receive(); |
| int messageCount1 = 0; |
| int messageCount2 = 0; |
| int ackCount1 = 0; |
| int ackCount2 = 0; |
| do { |
| if (message1 != null) { |
| log.info("Consumer1 received " + new String(message1.getData())); |
| messageCount1 += 1; |
| } |
| if (message2 != null) { |
| log.info("Consumer2 received " + new String(message2.getData())); |
| messageCount2 += 1; |
| consumer2.acknowledge(message2); |
| ackCount2 += 1; |
| } |
| message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); |
| message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); |
| } while (message1 != null || message2 != null); |
| log.info(key + " messageCount1 = " + messageCount1); |
| log.info(key + " messageCount2 = " + messageCount2); |
| log.info(key + " ackCount1 = " + ackCount1); |
| log.info(key + " ackCount2 = " + ackCount2); |
| assertEquals(messageCount1 + messageCount2, totalMessages); |
| |
| // 5. Check if Messages redelivered again |
| // Since receive is a blocking call hoping that timeout will kick in |
| log.info(key + " Timeout should be triggered now"); |
| message1 = consumer1.receive(); |
| messageCount1 = 0; |
| do { |
| if (message1 != null) { |
| log.info("Consumer1 received " + new String(message1.getData())); |
| messageCount1 += 1; |
| consumer1.acknowledge(message1); |
| ackCount1 += 1; |
| } |
| if (message2 != null) { |
| log.info("Consumer2 received " + new String(message2.getData())); |
| messageCount2 += 1; |
| } |
| message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); |
| message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); |
| } while (message1 != null || message2 != null); |
| log.info(key + " messageCount1 = " + messageCount1); |
| log.info(key + " messageCount2 = " + messageCount2); |
| log.info(key + " ackCount1 = " + ackCount1); |
| log.info(key + " ackCount2 = " + ackCount2); |
| assertEquals(ackCount1 + messageCount2, totalMessages); |
| } |
| |
| @Test |
| public void testAckTimeoutMinValue() throws PulsarClientException { |
| try { |
| pulsarClient.newConsumer().ackTimeout(999, TimeUnit.MILLISECONDS); |
| Assert.fail("Exception should have been thrown since the set timeout is less than min timeout."); |
| } catch (Exception ex) { |
| // Ok |
| } |
| } |
| |
| @Test(timeOut = testTimeout) |
| public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, InterruptedException { |
| String key = "testCheckUnAcknowledgedMessageTimer"; |
| final String topicName = "persistent://prop/ns-abc/topic-" + key; |
| final String subscriptionName = "my-ex-subscription-" + key; |
| final String messagePredicate = "my-message-" + key + "-"; |
| final int totalMessages = 3; |
| |
| // 1. producer connect |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // 2. Create consumer |
| ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) |
| .subscriptionName(subscriptionName).receiverQueueSize(7).subscriptionType(SubscriptionType.Shared) |
| .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS).subscribe(); |
| |
| // 3. producer publish messages |
| for (int i = 0; i < totalMessages; i++) { |
| String message = messagePredicate + i; |
| log.info("Producer produced: " + message); |
| producer.send(message.getBytes()); |
| } |
| |
| Thread.sleep((long) (ackTimeOutMillis * 1.1)); |
| |
| for (int i = 0; i < totalMessages - 1; i++) { |
| Message<byte[]> msg = consumer.receive(); |
| consumer.acknowledge(msg); |
| } |
| |
| assertEquals(consumer.getUnAckedMessageTracker().size(), 1); |
| |
| Message<byte[]> msg = consumer.receive(); |
| consumer.acknowledge(msg); |
| assertEquals(consumer.getUnAckedMessageTracker().size(), 0); |
| |
| Thread.sleep((long) (ackTimeOutMillis * 1.1)); |
| |
| assertEquals(consumer.getUnAckedMessageTracker().size(), 0); |
| } |
| |
| |
| @Test |
| public void testSingleMessageBatch() throws Exception { |
| String topicName = "prop/ns-abc/topic-estSingleMessageBatch"; |
| |
| Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topicName) |
| .enableBatching(true) |
| .batchingMaxPublishDelay(10, TimeUnit.SECONDS) |
| .create(); |
| |
| Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) |
| .topic(topicName) |
| .subscriptionName("subscription") |
| .ackTimeout(1, TimeUnit.HOURS) |
| .subscribe(); |
| |
| // Force the creation of a batch with a single message |
| producer.sendAsync("hello"); |
| producer.flush(); |
| |
| Message<String> message = consumer.receive(); |
| |
| assertFalse(((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().isEmpty()); |
| |
| consumer.acknowledge(message); |
| |
| assertTrue(((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().isEmpty()); |
| } |
| } |