[fix][client] Fix messages in the batch container timed out unexpectedly (#21889)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index f05f735..240d8d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -20,22 +20,32 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
* Different with {@link org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit the variables
- * of {@link ConsumerImpl} which are modified `protected`.
+ * of {@link ConsumerImpl} or {@link ProducerImpl} that are declared protected or package-private.
*/
-@Test(groups = "broker-api")
+@Slf4j
+@Test(groups = "broker-impl")
public class ProducerConsumerInternalTest extends ProducerConsumerBase {
@BeforeClass(alwaysRun = true)
@@ -144,4 +154,36 @@
consumer.close();
admin.topics().delete(topicName, false);
}
+
+ /**
+ * Supplies the batcher builders that the "containerBuilder" data-driven tests are run against.
+ *
+ * @return two single-argument rows: one holding {@link BatcherBuilder#DEFAULT} and one holding
+ * {@link BatcherBuilder#KEY_BASED}, in that order
+ */
+ @DataProvider(name = "containerBuilder")
+ public Object[][] containerBuilderProvider() {
+ final Object[] defaultBatcherRow = { BatcherBuilder.DEFAULT };
+ final Object[] keyBasedBatcherRow = { BatcherBuilder.KEY_BASED };
+ return new Object[][] { defaultBatcherRow, keyBasedBatcherRow };
+ }
+
+ /**
+ * Sends one message and waits for it, runs the producer's send-timeout check by hand, sleeps 1950 ms and
+ * then sends a second message. The test passes only if both send futures complete normally, i.e. the second
+ * message is not failed while it is still waiting in the batch container.
+ *
+ * @param batcherBuilder the batcher builder to configure on the producer, supplied by the
+ * "containerBuilder" data provider
+ * @throws Exception if either send future completes exceptionally or the test thread is interrupted
+ */
+ @Test(timeOut = 30000, dataProvider = "containerBuilder")
+ public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .batcherBuilder(batcherBuilder)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .batchingMaxMessages(1000)
+ .create();
+
+ log.info("Before sendAsync msg-0: {}", System.nanoTime());
+ final CompletableFuture<MessageId> firstSend = producer.sendAsync("msg-0".getBytes());
+ firstSend.thenAccept(id -> log.info("msg-0 done: {} (msgId: {})", System.nanoTime(), id));
+ firstSend.get(); // t: the time point at which the send future of msg-0 has completed
+
+ // Run the send-timeout check manually at about t. Per the original author's timeline, run() is then
+ // expected to be called again at t+1000ms and t+2000ms.
+ final ProducerImpl<byte[]> producerImpl = (ProducerImpl<byte[]>) producer;
+ producerImpl.triggerSendTimer();
+
+ // Sleep until shortly before t+2000ms so that msg-1 is added just ahead of that check; the batch timer
+ // (100 ms publish delay) only expires at about t+2050ms, i.e. after run() has been called.
+ Thread.sleep(1950);
+ log.info("Before sendAsync msg-1: {}", System.nanoTime());
+ final CompletableFuture<MessageId> secondSend = producer.sendAsync("msg-1".getBytes());
+ secondSend.thenAccept(id -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), id));
+ secondSend.get();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index e81365d..8c17d8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -53,6 +53,7 @@
// allocate a new buffer that can hold the entire batch without needing costly reallocations
protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
protected int maxMessagesNum = INITIAL_MESSAGES_NUM;
+ private volatile long firstAddedTimestamp = 0L;
@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
@@ -127,4 +128,19 @@
return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits()
&& currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits();
}
+
+ /**
+ * Returns the current value of the {@code firstAddedTimestamp} field without modifying it.
+ *
+ * @return the stored timestamp in nanoseconds, or 0L when no timestamp is currently recorded
+ */
+ @Override
+ public long getFirstAddedTimestamp() {
+ return this.firstAddedTimestamp;
+ }
+
+ /**
+ * Stores {@link System#nanoTime()} as the first-added timestamp, but only while
+ * {@code numMessagesInBatch} is exactly 1; for any other count the stored value is left untouched.
+ */
+ protected void tryUpdateTimestamp() {
+ if (numMessagesInBatch != 1) {
+ // Not the first message of the batch: keep whatever timestamp is already stored.
+ return;
+ }
+ firstAddedTimestamp = System.nanoTime();
+ }
+
+ /**
+ * Resets the first-added timestamp to 0L.
+ */
+ protected void clearTimestamp() {
+ this.firstAddedTimestamp = 0L;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
index 8fb4e9f..ddbe1bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -82,4 +82,11 @@
* @return belong to the same txn or not
*/
boolean hasSameTxn(MessageImpl<?> msg);
+
+ /**
+ * Get the timestamp in nanoseconds when the 1st message is added into the batch container.
+ *
+ * <p>NOTE(review): {@code AbstractBatchMessageContainer} records this value with
+ * {@code System.nanoTime()}, so it is presumably only comparable with other {@code System.nanoTime()}
+ * readings and is not a wall-clock time; confirm for any other implementation.
+ *
+ * @return the timestamp in nanoseconds or 0L if the batch container is empty
+ */
+ long getFirstAddedTimestamp();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index dfcbc42..bf8c1f9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -127,6 +127,7 @@
previousCallback = callback;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
messages.add(msg);
+ tryUpdateTimestamp();
if (lowestSequenceId == -1L) {
lowestSequenceId = msg.getSequenceId();
@@ -203,6 +204,7 @@
@Override
public void clear() {
+ clearTimestamp();
messages = new ArrayList<>(maxMessagesNum);
firstCallback = null;
previousCallback = null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index e2ce9e2..1592d3c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -57,11 +57,13 @@
numMessagesInBatch++;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
}
+ tryUpdateTimestamp();
return isBatchFull();
}
@Override
public void clear() {
+ clearTimestamp();
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
batches.clear();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2763da5..4908d10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1983,6 +1983,11 @@
return producerName;
}
+ /**
+ * Test hook that invokes {@code run} directly with this producer's {@code sendTimeout}, instead of waiting
+ * for the timer to fire.
+ *
+ * @throws Exception if {@code run} throws
+ */
+ @VisibleForTesting
+ void triggerSendTimer() throws Exception {
+ this.run(this.sendTimeout);
+ }
+
/**
* Process sendTimeout events.
*/
@@ -2001,7 +2006,8 @@
}
OpSendMsg firstMsg = pendingMessages.peek();
- if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty())) {
+ if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty()
+ || batchMessageContainer.getFirstAddedTimestamp() == 0L)) {
// If there are no pending messages, reset the timeout to the configured value.
timeToWaitMs = conf.getSendTimeoutMs();
} else {
@@ -2011,7 +2017,7 @@
} else {
// Because we don't flush batch messages while disconnected, we consider them "createdAt" when
// they would have otherwise been flushed.
- createdAt = lastBatchSendNanoTime
+ createdAt = batchMessageContainer.getFirstAddedTimestamp()
+ TimeUnit.MICROSECONDS.toNanos(conf.getBatchingMaxPublishDelayMicros());
}
// If there is at least one message, calculate the diff between the message timeout and the elapsed