Fix the wrong multi-topic has message available behavior (#13634)
Fixes #13605
### Motivation
Currently, the multiTopicReader `hasMessageAvailable` might get the wrong result, we must check `numMessagesInQueue() > 0` again after finish all consumer `hasMessageAvaliableAsync` future, bacause some message might already in `MultiTopicsConsumerImpl#incomingMessages`.
### Modifications
* Fix the wrong multi-topic has message available behavior.
* Use `reader.readNextAsync()` instead of block method `reader.readNext()`.
* Reduce the units test running time by changing `MultiTopicsReaderTest` to use `@BeforeClass`, `@AfterClass`.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index f6230e2..6b6bf95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,17 +61,17 @@
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
@Slf4j
+@Test(groups = "flaky")
public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "reader-multi-topics-sub";
- @BeforeMethod(alwaysRun = true)
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -87,7 +87,7 @@
admin.namespaces().createNamespace("my-property/my-ns", policies);
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -169,15 +169,15 @@
private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
if (hasMessageAvailable) {
- try {
- Message<T> msg = reader.readNext();
+ reader.readNextAsync().whenComplete((msg, ex) -> {
+ if (ex != null) {
+ log.error("Read message failed.", ex);
+ latch.countDown();
+ return;
+ }
msgs.add(msg);
- } catch (PulsarClientException e) {
- log.error("Read message failed.", e);
- latch.countDown();
- return;
- }
- readMessageUseAsync(reader, msgs, latch);
+ readMessageUseAsync(reader, msgs, latch);
+ });
} else {
latch.countDown();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 44739ef..c758bdd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -782,7 +782,7 @@
if (exception != null) {
completableFuture.completeExceptionally(exception);
} else {
- completableFuture.complete(hasMessageAvailable.get());
+ completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0);
}
});
return completableFuture;