[Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714)

### Motivation

When the BrokerEntryMetadata is enabled, the total size in `ManagedLedgerImpl` is inaccurate. Because when the total size is updated in `OpAddEntry#safeRun`, the `dataLength` is the initial size of `data` when `OpAddEntry` is constructed, but `data` could be changed via `setData` method.

The inaccurate total size could affect the retention size validation. Because in `ManagedLedgerImpl#internalTrimLedgers`, the total size reduces by the size of `LedgerInfo`, which is assigned from the `LedgerHandle#getLength()`. Therefore, the total size will become 0 or less before all ledgers are removed.

### Modifications

- Update `dataLength` field in `setData` method.
- Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`. It produces 10 messages and trigger the rollover manually so that the first `LedgerInfo` of the managed ledger contains the correct total bytes. Then compare the `totalSize` field with it to verify this fix works.

(cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 4f36d56..0b96b59 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -313,6 +313,7 @@
     }
 
     public void setData(ByteBuf data) {
+        this.dataLength = data.readableBytes();
         this.data = data;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index d2ea2f1..49b4742 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.assertj.core.util.Sets;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -354,4 +362,41 @@
         producer.close();
         consumer.close();
     }
+
+    @Test
+    public void testManagedLedgerTotalSize() throws Exception {
+        final String topic = newTopicName();
+        final int messages = 10;
+
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.lookups().lookupTopic(topic);
+        final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger())
+                .orElse(null);
+        Assert.assertNotNull(managedLedger);
+        final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            producer.send("msg-" + i);
+        }
+
+        Assert.assertTrue(managedLedger.getTotalSize() > 0);
+
+        managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        managedLedger.getConfig().setMaxEntriesPerLedger(1);
+        managedLedger.rollCurrentLedgerIfFull();
+
+        Awaitility.await().atMost(Duration.ofSeconds(3))
+                .until(() -> managedLedger.getLedgersInfo().size() > 1);
+
+        final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
+        Assert.assertEquals(ledgerInfoList.size(), 2);
+        Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
+
+        cursor.close();
+    }
 }