Fix the wrong multi-topic has message available behavior (#13634)

Fixes #13605

### Motivation

Currently, the multiTopicReader `hasMessageAvailable` might get the wrong result, we must check `numMessagesInQueue() > 0` again after finish all consumer `hasMessageAvaliableAsync` future, bacause some message might already in `MultiTopicsConsumerImpl#incomingMessages`. 

### Modifications

* Fix the wrong multi-topic has message available behavior.
* Use `reader.readNextAsync()` instead of block method `reader.readNext()`.
* Reduce the units test running time by changing `MultiTopicsReaderTest` to use `@BeforeClass`, `@AfterClass`.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index f6230e2..6b6bf95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,17 +61,17 @@
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
 @Slf4j
+@Test(groups = "flaky")
 public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "reader-multi-topics-sub";
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
@@ -87,7 +87,7 @@
         admin.namespaces().createNamespace("my-property/my-ns", policies);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -169,15 +169,15 @@
     private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
         reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
             if (hasMessageAvailable) {
-                try {
-                    Message<T> msg = reader.readNext();
+                reader.readNextAsync().whenComplete((msg, ex) -> {
+                    if (ex != null) {
+                        log.error("Read message failed.", ex);
+                        latch.countDown();
+                        return;
+                    }
                     msgs.add(msg);
-                } catch (PulsarClientException e) {
-                    log.error("Read message failed.", e);
-                    latch.countDown();
-                    return;
-                }
-                readMessageUseAsync(reader, msgs, latch);
+                    readMessageUseAsync(reader, msgs, latch);
+                });
             } else {
                 latch.countDown();
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 44739ef..c758bdd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -782,7 +782,7 @@
             if (exception != null) {
                 completableFuture.completeExceptionally(exception);
             } else {
-                completableFuture.complete(hasMessageAvailable.get());
+                completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0);
             }
         });
         return completableFuture;