[fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673)

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 0888298..b8287dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -149,9 +149,6 @@
                 }
 
                 dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
-
-                msgOut.recordEvent(headersAndPayload.readableBytes());
-
                 msg.setReplicatedFrom(localCluster);
 
                 headersAndPayload.retain();
@@ -181,6 +178,7 @@
                     msg.setSchemaInfoForReplicator(schemaFuture.get());
                     msg.getMessageBuilder().clearTxnidMostBits();
                     msg.getMessageBuilder().clearTxnidLeastBits();
+                    msgOut.recordEvent(headersAndPayload.readableBytes());
                     // Increment pending messages for messages produced locally
                     PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));