[fix][client] Fix messages in the batch container timed out unexpectedly (#21889)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index f05f735..240d8d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -20,22 +20,32 @@
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
  * Different with {@link org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit the variables
- * of {@link ConsumerImpl} which are modified `protected`.
+ * of {@link ConsumerImpl} or {@link ProducerImpl} which have protected or default access modifiers.
  */
-@Test(groups = "broker-api")
+@Slf4j
+@Test(groups = "broker-impl")
 public class ProducerConsumerInternalTest extends ProducerConsumerBase {
 
     @BeforeClass(alwaysRun = true)
@@ -144,4 +154,36 @@
         consumer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @DataProvider(name = "containerBuilder")
+    public Object[][] containerBuilderProvider() {
+        return new Object[][] {
+                { BatcherBuilder.DEFAULT },
+                { BatcherBuilder.KEY_BASED }
+        };
+    }
+
+    @Test(timeOut = 30000, dataProvider = "containerBuilder")
+    public void testSendTimerCheckForBatchContainer(BatcherBuilder batcherBuilder) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batcherBuilder(batcherBuilder)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .batchingMaxMessages(1000)
+                .create();
+
+        log.info("Before sendAsync msg-0: {}", System.nanoTime());
+        CompletableFuture<MessageId> future = producer.sendAsync("msg-0".getBytes());
+        future.thenAccept(msgId -> log.info("msg-0 done: {} (msgId: {})", System.nanoTime(), msgId));
+        future.get(); // t: the current time point
+
+        ((ProducerImpl<byte[]>) producer).triggerSendTimer(); // t+1000ms && t+2000ms: run() will be called again
+
+        Thread.sleep(1950); // t+2050ms: the batch timer is expired, which happens after run() is called
+        log.info("Before sendAsync msg-1: {}", System.nanoTime());
+        future = producer.sendAsync("msg-1".getBytes());
+        future.thenAccept(msgId -> log.info("msg-1 done: {} (msgId: {})", System.nanoTime(), msgId));
+        future.get();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index e81365d..8c17d8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -53,6 +53,7 @@
     // allocate a new buffer that can hold the entire batch without needing costly reallocations
     protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
     protected int maxMessagesNum = INITIAL_MESSAGES_NUM;
+    private volatile long firstAddedTimestamp = 0L;
 
     @Override
     public boolean haveEnoughSpace(MessageImpl<?> msg) {
@@ -127,4 +128,19 @@
         return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits()
                 && currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits();
     }
+
+    @Override
+    public long getFirstAddedTimestamp() {
+        return firstAddedTimestamp;
+    }
+
+    protected void tryUpdateTimestamp() {
+        if (numMessagesInBatch == 1) {
+            firstAddedTimestamp = System.nanoTime();
+        }
+    }
+
+    protected void clearTimestamp() {
+        firstAddedTimestamp = 0L;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
index 8fb4e9f..ddbe1bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -82,4 +82,11 @@
      * @return belong to the same txn or not
      */
     boolean hasSameTxn(MessageImpl<?> msg);
+
+    /**
+     * Get the timestamp in nanoseconds when the 1st message is added into the batch container.
+     *
+     * @return the timestamp in nanoseconds or 0L if the batch container is empty
+     */
+    long getFirstAddedTimestamp();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index dfcbc42..bf8c1f9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -127,6 +127,7 @@
         previousCallback = callback;
         currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         messages.add(msg);
+        tryUpdateTimestamp();
 
         if (lowestSequenceId == -1L) {
             lowestSequenceId = msg.getSequenceId();
@@ -203,6 +204,7 @@
 
     @Override
     public void clear() {
+        clearTimestamp();
         messages = new ArrayList<>(maxMessagesNum);
         firstCallback = null;
         previousCallback = null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index e2ce9e2..1592d3c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -57,11 +57,13 @@
             numMessagesInBatch++;
             currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         }
+        tryUpdateTimestamp();
         return isBatchFull();
     }
 
     @Override
     public void clear() {
+        clearTimestamp();
         numMessagesInBatch = 0;
         currentBatchSizeBytes = 0;
         batches.clear();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2763da5..4908d10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1983,6 +1983,11 @@
         return producerName;
     }
 
+    @VisibleForTesting
+    void triggerSendTimer() throws Exception {
+        run(sendTimeout);
+    }
+
     /**
      * Process sendTimeout events.
      */
@@ -2001,7 +2006,8 @@
             }
 
             OpSendMsg firstMsg = pendingMessages.peek();
-            if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty())) {
+            if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty()
+                    || batchMessageContainer.getFirstAddedTimestamp() == 0L)) {
                 // If there are no pending messages, reset the timeout to the configured value.
                 timeToWaitMs = conf.getSendTimeoutMs();
             } else {
@@ -2011,7 +2017,7 @@
                 } else {
                     // Because we don't flush batch messages while disconnected, we consider them "createdAt" when
                     // they would have otherwise been flushed.
-                    createdAt = lastBatchSendNanoTime
+                    createdAt = batchMessageContainer.getFirstAddedTimestamp()
                             + TimeUnit.MICROSECONDS.toNanos(conf.getBatchingMaxPublishDelayMicros());
                 }
                 // If there is at least one message, calculate the diff between the message timeout and the elapsed