revert #2590 (#3110)
In PR #2590, we made a wrong change, to make redelivery happened more early.
This PR Try to revert change of commit 0ab2325fa33231f1c69782e081483012467dfca.
revert #2590
Worked as before 2590, Ut pass.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index dfeb4af..2c94966 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -217,6 +217,7 @@
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
+ Thread.sleep(10);
msg = future_msg.get();
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 5af0088..dc3f5bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -25,7 +25,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -2663,50 +2662,4 @@
assertEquals(latch.getCount(), 1);
consumer.close();
}
-
- /**
- * Ack timeout message is redelivered on time.
- * Related github issue #2584
- */
- @Test
- public void testAckTimeoutRedeliver() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- // create consumer and producer
- ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
- .topic("persistent://my-property/my-ns/ack-timeout-topic")
- .subscriptionName("subscriber-1")
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscriptionType(SubscriptionType.Shared)
- .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
- .subscribe();
-
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic("persistent://my-property/my-ns/ack-timeout-topic")
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
-
- // (1) Produced one Message
- String content = "my-message-will-ack-timeout";
- producer.send(content.getBytes());
-
- // (2) consumer to receive messages, and not ack
- Message<byte[]> message = consumer.receive();
-
- // (3) should be re-delivered once ack-timeout.
- Thread.sleep(1000);
- message = consumer.receive(200, TimeUnit.MILLISECONDS);
- assertNotNull(message);
-
- Thread.sleep(1000);
- message = consumer.receive(200, TimeUnit.MILLISECONDS);
- assertNotNull(message);
-
- assertEquals(content, new String(message.getData()));
-
- producer.close();
- consumer.close();
- log.info("-- Exiting {} test --", methodName);
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index e178feb..da53760 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -235,17 +235,16 @@
private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages) throws Exception {
int messagesReceived = 0;
- Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
+ Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
while (msg != null) {
++messagesReceived;
- log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData()));
+ log.info("Consumer received {}", new String(msg.getData()));
if (ackMessages) {
consumer.acknowledge(msg);
- log.info("Consumer {} acknowledged {}", consumer.getConsumerName(), new String(msg.getData()));
}
- msg = consumer.receive(200, TimeUnit.MILLISECONDS);
+ msg = consumer.receive(1, TimeUnit.SECONDS);
}
return messagesReceived;
@@ -284,31 +283,56 @@
}
// 4. Receive messages
+ Message<byte[]> message1 = consumer1.receive();
+ Message<byte[]> message2 = consumer2.receive();
int messageCount1 = 0;
int messageCount2 = 0;
-
- messageCount1 += receiveAllMessage(consumer1, false);
- messageCount2 += receiveAllMessage(consumer2, true);
-
+ int ackCount1 = 0;
+ int ackCount2 = 0;
+ do {
+ if (message1 != null) {
+ log.info("Consumer1 received " + new String(message1.getData()));
+ messageCount1 += 1;
+ }
+ if (message2 != null) {
+ log.info("Consumer2 received " + new String(message2.getData()));
+ messageCount2 += 1;
+ consumer2.acknowledge(message2);
+ ackCount2 += 1;
+ }
+ message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
+ message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
+ } while (message1 != null || message2 != null);
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
-
+ log.info(key + " ackCount1 = " + ackCount1);
+ log.info(key + " ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2, totalMessages);
- Thread.sleep((int) (ackTimeOutMillis * 1.1));
-
// 5. Check if Messages redelivered again
// Since receive is a blocking call hoping that timeout will kick in
log.info(key + " Timeout should be triggered now");
+ message1 = consumer1.receive();
messageCount1 = 0;
-
- messageCount1 += receiveAllMessage(consumer1, true);
- messageCount2 += receiveAllMessage(consumer2, false);
-
+ do {
+ if (message1 != null) {
+ log.info("Consumer1 received " + new String(message1.getData()));
+ messageCount1 += 1;
+ consumer1.acknowledge(message1);
+ ackCount1 += 1;
+ }
+ if (message2 != null) {
+ log.info("Consumer2 received " + new String(message2.getData()));
+ messageCount2 += 1;
+ }
+ message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
+ message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
+ } while (message1 != null || message2 != null);
log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
-
- assertEquals(messageCount1 + messageCount2, totalMessages);
+ log.info(key + " ackCount1 = " + ackCount1);
+ log.info(key + " ackCount2 = " + ackCount2);
+ assertEquals(ackCount1 + messageCount2, totalMessages);
}
@Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 504de14..266eb3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -87,7 +87,6 @@
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
- toggle();
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
@@ -95,6 +94,7 @@
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
+ toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);