[integration-tests]Enable messaging tests to integration tests. (#5456)

* Enable messaging tests to integration tests.

* Fix integration tests
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
index 0043dc2..7ddd082 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
@@ -32,6 +32,7 @@
 import org.testng.annotations.BeforeMethod;
 
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -69,8 +70,8 @@
             (List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
         Set<T> messagesReceived = Sets.newHashSet();
         for (Consumer<T> consumer : consumerList) {
-            Message<T> currentReceived = null;
-            Message<T> lastReceived = null;
+            Message<T> currentReceived;
+            Map<String, Message<T>> lastReceivedMap = new HashMap<>();
             while (true) {
                 try {
                     currentReceived = consumer.receive(3, TimeUnit.SECONDS);
@@ -79,21 +80,21 @@
                     break;
                 }
                 // Make sure that messages are received in order
-                if (lastReceived == null) {
-                    assertNotNull(currentReceived);
+                if (currentReceived != null) {
+                    consumer.acknowledge(currentReceived);
+                    if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
+                        assertTrue(currentReceived.getMessageId().compareTo(
+                                lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0,
+                                "Received messages are not in order.");
+                    }
                 } else {
-                    assertTrue(currentReceived != null
-                                    && (currentReceived.getValue().compareTo(lastReceived.getValue()) > 0),
-                            "Received messages are not in order.");
+                    break;
                 }
-                lastReceived = currentReceived;
+                lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
                 // Make sure that there are no duplicates
                 assertTrue(messagesReceived.add(currentReceived.getValue()),
                         "Received duplicate message " + currentReceived.getValue());
             }
-            if (currentReceived != null) {
-                consumer.acknowledgeCumulative(currentReceived);
-            }
         }
         assertEquals(messagesReceived.size(), messagesToReceive);
     }
@@ -110,12 +111,14 @@
                     log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
                     break;
                 }
-                // Make sure that there are no duplicates
-                assertTrue(messagesReceived.add(currentReceived.getValue()),
-                        "Received duplicate message " + currentReceived.getValue());
-            }
-            if (currentReceived != null) {
-                consumer.acknowledgeCumulative(currentReceived);
+                if (currentReceived != null) {
+                    consumer.acknowledge(currentReceived);
+                    // Make sure that there are no duplicates
+                    assertTrue(messagesReceived.add(currentReceived.getValue()),
+                            "Received duplicate message " + currentReceived.getValue());
+                } else {
+                    break;
+                }
             }
         }
         assertEquals(messagesReceived.size(), messagesToReceive);
@@ -126,7 +129,7 @@
         Map<String, Set<String>> consumerKeys = Maps.newHashMap();
         Set<T> messagesReceived = Sets.newHashSet();
         for (Consumer<T> consumer : consumerList) {
-            Message<T> currentReceived = null;
+            Message<T> currentReceived;
             while (true) {
                 try {
                     currentReceived = consumer.receive(3, TimeUnit.SECONDS);
@@ -134,15 +137,17 @@
                     log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
                     break;
                 }
-                assertNotNull(currentReceived.getKey());
-                consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
-                consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
-                // Make sure that there are no duplicates
-                assertTrue(messagesReceived.add(currentReceived.getValue()),
-                        "Received duplicate message " + currentReceived.getValue());
-            }
-            if (currentReceived != null) {
-                consumer.acknowledgeCumulative(currentReceived);
+                if (currentReceived != null) {
+                    consumer.acknowledge(currentReceived);
+                    assertNotNull(currentReceived.getKey());
+                    consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
+                    consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
+                    // Make sure that there are no duplicates
+                    assertTrue(messagesReceived.add(currentReceived.getValue()),
+                            "Received duplicate message " + currentReceived.getValue());
+                } else {
+                    break;
+                }
             }
         }
         // Make sure key will not be distributed to multiple consumers
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
index 7fb1fda..416896f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
@@ -56,11 +56,11 @@
 
     @Test(dataProvider = "ServiceUrls")
     public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
-        nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
+        nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false);
     }
 
     @Test(dataProvider = "ServiceUrls")
     public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
-        partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
+        partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, false);
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
index 26097cf..2ed30f9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
@@ -20,13 +20,14 @@
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -54,17 +55,15 @@
                 .subscriptionName("test-sub")
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscribe();
-        Exception exception = null;
         try {
             client.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionName("test-sub")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
+            fail("should be failed");
+        } catch (PulsarClientException ignore) {
         }
-        assertNotNull(exception);
         final int messagesToSend = 10;
         final String producerName = "producerForExclusive";
         @Cleanup
@@ -93,31 +92,30 @@
         List<Consumer<String>> consumerList = new ArrayList<>(3);
         for (int i = 0; i < partitions; i++) {
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
+                    .topic(topicName + "-partition-" + i)
                     .subscriptionName("test-sub")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscribe();
             consumerList.add(consumer);
         }
         assertEquals(partitions, consumerList.size());
-        Exception exception = null;
         try {
             client.newConsumer(Schema.STRING)
-                    .topic(topicName)
+                    .topic(topicName + "-partition-" + 0)
                     .subscriptionName("test-sub")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
+            fail("should be failed");
+        } catch (PulsarClientException ignore) {
         }
-        assertNotNull(exception);
-        final int messagesToSend = 10;
+        final int messagesToSend = 9;
         final String producerName = "producerForExclusive";
         @Cleanup
         final Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topicName)
                 .enableBatching(false)
                 .producerName(producerName)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                 .create();
         for (int i = 0; i < messagesToSend; i++) {
             MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
@@ -132,7 +130,7 @@
             MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
             assertNotNull(messageId);
         }
-        receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+        receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend - 3);
         closeConsumers(consumerList);
         log.info("-- Exiting {} test --", methodName);
     }
@@ -144,27 +142,21 @@
         final PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl)
                 .build();
-        List<Consumer<String>> consumerList = new ArrayList<>(3);
+        List<Consumer<String>> consumerList = new ArrayList<>(2);
         final Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("test-sub")
                 .subscriptionType(SubscriptionType.Failover)
                 .subscribe();
         consumerList.add(consumer);
-        Exception exception = null;
-        Consumer<String> standbyConsumer = null;
-        try {
-            standbyConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Failover)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
+        final Consumer<String> standbyConsumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
         assertNotNull(standbyConsumer);
         assertTrue(standbyConsumer.isConnected());
+        consumerList.add(standbyConsumer);
         final int messagesToSend = 10;
         final String producerName = "producerForFailover";
         @Cleanup
@@ -181,11 +173,12 @@
         receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
         // To simulate a consumer crashed
         Consumer<String> crashedConsumer = consumerList.remove(0);
+        // wait ack send
+        Thread.sleep(3000);
         crashedConsumer.close();
-        consumerList.add(standbyConsumer);
         for (int i = 0; i < messagesToSend; i++) {
-            MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
-            assertNotNull(messageId);
+                MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+                assertNotNull(messageId);
         }
         receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
         closeConsumers(consumerList);
@@ -201,30 +194,22 @@
                 .serviceUrl(serviceUrl)
                 .build();
         List<Consumer<String>> consumerList = new ArrayList<>(3);
-        for (int i = 0; i < partitions; i++) {
-            Consumer<String> consumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Failover)
-                    .subscribe();
-            consumerList.add(consumer);
-        }
-        assertEquals(partitions, consumerList.size());
-        Exception exception = null;
-        Consumer<String> standbyConsumer = null;
-        try {
-            standbyConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Failover)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+        consumerList.add(consumer);
+        Consumer<String> standbyConsumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
         assertNotNull(standbyConsumer);
         assertTrue(standbyConsumer.isConnected());
-        final int messagesToSend = 10;
+        consumerList.add(standbyConsumer);
+        assertEquals(consumerList.size(), 2);
+        final int messagesToSend = 9;
         final String producerName = "producerForFailover";
         @Cleanup
         final Producer<String> producer = client.newProducer(Schema.STRING)
@@ -240,8 +225,9 @@
         receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
         // To simulate a consumer crashed
         Consumer<String> crashedConsumer = consumerList.remove(0);
+        // wait ack send
+        Thread.sleep(3000);
         crashedConsumer.close();
-        consumerList.add(standbyConsumer);
         for (int i = 0; i < messagesToSend; i++) {
             MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
             assertNotNull(messageId);
@@ -265,18 +251,11 @@
                 .subscriptionType(SubscriptionType.Shared)
                 .subscribe();
         consumerList.add(consumer);
-        Exception exception = null;
-        Consumer<String> moreConsumer = null;
-        try {
-            moreConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Shared)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
+        Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
         assertNotNull(moreConsumer);
         assertTrue(moreConsumer.isConnected());
         consumerList.add(moreConsumer);
@@ -324,20 +303,6 @@
             consumerList.add(consumer);
         }
         assertEquals(partitions, consumerList.size());
-        Exception exception = null;
-        Consumer<String> moreConsumer = null;
-        try {
-            moreConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Shared)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
-        assertNotNull(moreConsumer);
-        assertTrue(moreConsumer.isConnected());
         final int messagesToSend = 10;
         final String producerName = "producerForFailover";
         @Cleanup
@@ -371,25 +336,19 @@
         final PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl)
                 .build();
-        List<Consumer<String>> consumerList = new ArrayList<>(4);
-        final Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        List<Consumer<String>> consumerList = new ArrayList<>(2);
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("test-sub")
                 .subscriptionType(SubscriptionType.Key_Shared)
                 .subscribe();
+        assertTrue(consumer.isConnected());
         consumerList.add(consumer);
-        Exception exception = null;
-        Consumer<String> moreConsumer = null;
-        try {
-            moreConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Key_Shared)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
+        Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
         assertNotNull(moreConsumer);
         assertTrue(moreConsumer.isConnected());
         consumerList.add(moreConsumer);
@@ -408,7 +367,7 @@
                     .send();
             assertNotNull(messageId);
         }
-        log.info("public messages complete.");
+        log.info("publish messages complete.");
         receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
         // To simulate a consumer crashed
         Consumer<String> crashedConsumer = consumerList.remove(0);
@@ -433,30 +392,22 @@
         final PulsarClient client = PulsarClient.builder()
                 .serviceUrl(serviceUrl)
                 .build();
-        List<Consumer<String>> consumerList = new ArrayList<>(3);
-        for (int i = 0; i < partitions; i++) {
-            Consumer<String> consumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Key_Shared)
-                    .subscribe();
-            consumerList.add(consumer);
-        }
-        assertEquals(partitions, consumerList.size());
-        Exception exception = null;
-        Consumer<String> moreConsumer = null;
-        try {
-            moreConsumer = client.newConsumer(Schema.STRING)
-                    .topic(topicName)
-                    .subscriptionName("test-sub")
-                    .subscriptionType(SubscriptionType.Key_Shared)
-                    .subscribe();
-        } catch (PulsarClientException e) {
-            exception = e;
-        }
-        assertNull(exception);
+        List<Consumer<String>> consumerList = new ArrayList<>(2);
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        assertTrue(consumer.isConnected());
+        consumerList.add(consumer);
+        Consumer<String> moreConsumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
         assertNotNull(moreConsumer);
         assertTrue(moreConsumer.isConnected());
+        consumerList.add(moreConsumer);
         final int messagesToSend = 10;
         final String producerName = "producerForKeyShared";
         @Cleanup
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
new file mode 100644
index 0000000..dcd9b71
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -0,0 +1,29 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Messaging) Integration Tests" verbose="2" annotations="JDK">
+    <test name="messaging-test-suite" preserve-order="true">
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest" />
+            <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index 6bf67b6..468d328 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -30,5 +30,6 @@
         <suite-file path="./tiered-jcloud-storage.xml" />
         <suite-file path="./tiered-filesystem-storage.xml"/>
         <suite-file path="./pulsar-function-state.xml" />
+        <suite-file path="./pulsar-messaging.xml" />
     </suite-files>
 </suite>