[fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 0888298..b8287dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -149,9 +149,6 @@
}
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
-
- msgOut.recordEvent(headersAndPayload.readableBytes());
-
msg.setReplicatedFrom(localCluster);
headersAndPayload.retain();
@@ -181,6 +178,7 @@
msg.setSchemaInfoForReplicator(schemaFuture.get());
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
+ msgOut.recordEvent(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));