[integration-tests]Enable messaging tests to integration tests. (#5456)
* Enable messaging tests to integration tests.
* Fix integration tests
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
index 0043dc2..7ddd082 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
@@ -32,6 +32,7 @@
import org.testng.annotations.BeforeMethod;
import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -69,8 +70,8 @@
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
- Message<T> currentReceived = null;
- Message<T> lastReceived = null;
+ Message<T> currentReceived;
+ Map<String, Message<T>> lastReceivedMap = new HashMap<>();
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
@@ -79,21 +80,21 @@
break;
}
// Make sure that messages are received in order
- if (lastReceived == null) {
- assertNotNull(currentReceived);
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
+ assertTrue(currentReceived.getMessageId().compareTo(
+ lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0,
+ "Received messages are not in order.");
+ }
} else {
- assertTrue(currentReceived != null
- && (currentReceived.getValue().compareTo(lastReceived.getValue()) > 0),
- "Received messages are not in order.");
+ break;
}
- lastReceived = currentReceived;
+ lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
}
- if (currentReceived != null) {
- consumer.acknowledgeCumulative(currentReceived);
- }
}
assertEquals(messagesReceived.size(), messagesToReceive);
}
@@ -110,12 +111,14 @@
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
- // Make sure that there are no duplicates
- assertTrue(messagesReceived.add(currentReceived.getValue()),
- "Received duplicate message " + currentReceived.getValue());
- }
- if (currentReceived != null) {
- consumer.acknowledgeCumulative(currentReceived);
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ // Make sure that there are no duplicates
+ assertTrue(messagesReceived.add(currentReceived.getValue()),
+ "Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
}
}
assertEquals(messagesReceived.size(), messagesToReceive);
@@ -126,7 +129,7 @@
Map<String, Set<String>> consumerKeys = Maps.newHashMap();
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
- Message<T> currentReceived = null;
+ Message<T> currentReceived;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
@@ -134,15 +137,17 @@
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
- assertNotNull(currentReceived.getKey());
- consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
- consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
- // Make sure that there are no duplicates
- assertTrue(messagesReceived.add(currentReceived.getValue()),
- "Received duplicate message " + currentReceived.getValue());
- }
- if (currentReceived != null) {
- consumer.acknowledgeCumulative(currentReceived);
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ assertNotNull(currentReceived.getKey());
+ consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
+ consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
+ // Make sure that there are no duplicates
+ assertTrue(messagesReceived.add(currentReceived.getValue()),
+ "Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
}
}
// Make sure key will not be distributed to multiple consumers
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
index 7fb1fda..416896f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
@@ -56,11 +56,11 @@
@Test(dataProvider = "ServiceUrls")
public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
+ nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false);
}
@Test(dataProvider = "ServiceUrls")
public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
+ partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
index 26097cf..2ed30f9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
@@ -20,13 +20,14 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -54,17 +55,15 @@
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
- Exception exception = null;
try {
client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
- } catch (PulsarClientException e) {
- exception = e;
+ fail("should be failed");
+ } catch (PulsarClientException ignore) {
}
- assertNotNull(exception);
final int messagesToSend = 10;
final String producerName = "producerForExclusive";
@Cleanup
@@ -93,31 +92,30 @@
List<Consumer<String>> consumerList = new ArrayList<>(3);
for (int i = 0; i < partitions; i++) {
Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
+ .topic(topicName + "-partition-" + i)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
consumerList.add(consumer);
}
assertEquals(partitions, consumerList.size());
- Exception exception = null;
try {
client.newConsumer(Schema.STRING)
- .topic(topicName)
+ .topic(topicName + "-partition-" + 0)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
- } catch (PulsarClientException e) {
- exception = e;
+ fail("should be failed");
+ } catch (PulsarClientException ignore) {
}
- assertNotNull(exception);
- final int messagesToSend = 10;
+ final int messagesToSend = 9;
final String producerName = "producerForExclusive";
@Cleanup
final Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
@@ -132,7 +130,7 @@
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
assertNotNull(messageId);
}
- receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend - 3);
closeConsumers(consumerList);
log.info("-- Exiting {} test --", methodName);
}
@@ -144,27 +142,21 @@
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
- List<Consumer<String>> consumerList = new ArrayList<>(3);
+ List<Consumer<String>> consumerList = new ArrayList<>(2);
final Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
consumerList.add(consumer);
- Exception exception = null;
- Consumer<String> standbyConsumer = null;
- try {
- standbyConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Failover)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
+ final Consumer<String> standbyConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
assertNotNull(standbyConsumer);
assertTrue(standbyConsumer.isConnected());
+ consumerList.add(standbyConsumer);
final int messagesToSend = 10;
final String producerName = "producerForFailover";
@Cleanup
@@ -181,11 +173,12 @@
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
// To simulate a consumer crashed
Consumer<String> crashedConsumer = consumerList.remove(0);
+ // wait ack send
+ Thread.sleep(3000);
crashedConsumer.close();
- consumerList.add(standbyConsumer);
for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -201,30 +194,22 @@
.serviceUrl(serviceUrl)
.build();
List<Consumer<String>> consumerList = new ArrayList<>(3);
- for (int i = 0; i < partitions; i++) {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Failover)
- .subscribe();
- consumerList.add(consumer);
- }
- assertEquals(partitions, consumerList.size());
- Exception exception = null;
- Consumer<String> standbyConsumer = null;
- try {
- standbyConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Failover)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ consumerList.add(consumer);
+ Consumer<String> standbyConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
assertNotNull(standbyConsumer);
assertTrue(standbyConsumer.isConnected());
- final int messagesToSend = 10;
+ consumerList.add(standbyConsumer);
+ assertEquals(consumerList.size(), 2);
+ final int messagesToSend = 9;
final String producerName = "producerForFailover";
@Cleanup
final Producer<String> producer = client.newProducer(Schema.STRING)
@@ -240,8 +225,9 @@
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
// To simulate a consumer crashed
Consumer<String> crashedConsumer = consumerList.remove(0);
+ // wait ack send
+ Thread.sleep(3000);
crashedConsumer.close();
- consumerList.add(standbyConsumer);
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
assertNotNull(messageId);
@@ -265,18 +251,11 @@
.subscriptionType(SubscriptionType.Shared)
.subscribe();
consumerList.add(consumer);
- Exception exception = null;
- Consumer<String> moreConsumer = null;
- try {
- moreConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
+ Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
assertNotNull(moreConsumer);
assertTrue(moreConsumer.isConnected());
consumerList.add(moreConsumer);
@@ -324,20 +303,6 @@
consumerList.add(consumer);
}
assertEquals(partitions, consumerList.size());
- Exception exception = null;
- Consumer<String> moreConsumer = null;
- try {
- moreConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
final int messagesToSend = 10;
final String producerName = "producerForFailover";
@Cleanup
@@ -371,25 +336,19 @@
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
- List<Consumer<String>> consumerList = new ArrayList<>(4);
- final Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ List<Consumer<String>> consumerList = new ArrayList<>(2);
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
+ assertTrue(consumer.isConnected());
consumerList.add(consumer);
- Exception exception = null;
- Consumer<String> moreConsumer = null;
- try {
- moreConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Key_Shared)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
+ Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
assertNotNull(moreConsumer);
assertTrue(moreConsumer.isConnected());
consumerList.add(moreConsumer);
@@ -408,7 +367,7 @@
.send();
assertNotNull(messageId);
}
- log.info("public messages complete.");
+ log.info("publish messages complete.");
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
// To simulate a consumer crashed
Consumer<String> crashedConsumer = consumerList.remove(0);
@@ -433,30 +392,22 @@
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
- List<Consumer<String>> consumerList = new ArrayList<>(3);
- for (int i = 0; i < partitions; i++) {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Key_Shared)
- .subscribe();
- consumerList.add(consumer);
- }
- assertEquals(partitions, consumerList.size());
- Exception exception = null;
- Consumer<String> moreConsumer = null;
- try {
- moreConsumer = client.newConsumer(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- .subscriptionType(SubscriptionType.Key_Shared)
- .subscribe();
- } catch (PulsarClientException e) {
- exception = e;
- }
- assertNull(exception);
+ List<Consumer<String>> consumerList = new ArrayList<>(2);
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ assertTrue(consumer.isConnected());
+ consumerList.add(consumer);
+ Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
assertNotNull(moreConsumer);
assertTrue(moreConsumer.isConnected());
+ consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForKeyShared";
@Cleanup
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
new file mode 100644
index 0000000..dcd9b71
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -0,0 +1,29 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Messaging) Integration Tests" verbose="2" annotations="JDK">
+ <test name="messaging-test-suite" preserve-order="true">
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest" />
+ <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
+ </classes>
+ </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index 6bf67b6..468d328 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -30,5 +30,6 @@
<suite-file path="./tiered-jcloud-storage.xml" />
<suite-file path="./tiered-filesystem-storage.xml"/>
<suite-file path="./pulsar-function-state.xml" />
+ <suite-file path="./pulsar-messaging.xml" />
</suite-files>
</suite>