[Managed Ledger] Fix the incorrect total size when BrokerEntryMetadata is enabled (#12714)
### Motivation
When the BrokerEntryMetadata is enabled, the total size in `ManagedLedgerImpl` is inaccurate. Because when the total size is updated in `OpAddEntry#safeRun`, the `dataLength` is the initial size of `data` when `OpAddEntry` is constructed, but `data` could be changed via `setData` method.
The inaccurate total size could affect the retention size validation. Because in `ManagedLedgerImpl#internalTrimLedgers`, the total size reduces by the size of `LedgerInfo`, which is assigned from the `LedgerHandle#getLength()`. Therefore, the total size will become 0 or less before all ledgers are removed.
### Modifications
- Update `dataLength` field in `setData` method.
- Add a `testManagedLedgerTotalSize` test to `BrokerEntryMetadataE2ETest`. It produces 10 messages and trigger the rollover manually so that the first `LedgerInfo` of the managed ledger contains the correct total bytes. Then compare the `totalSize` field with it to verify this fix works.
(cherry picked from commit 5dbb7d25849f3a037aa522b5d0767801aa0a5096)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 4f36d56..0b96b59 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -313,6 +313,7 @@
}
public void setData(ByteBuf data) {
+ this.dataLength = data.readableBytes();
this.data = data;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index d2ea2f1..49b4742 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,18 +18,26 @@
*/
package org.apache.pulsar.broker.service;
+import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.assertj.core.util.Sets;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -354,4 +362,41 @@
producer.close();
consumer.close();
}
+
+ @Test
+ public void testManagedLedgerTotalSize() throws Exception {
+ final String topic = newTopicName();
+ final int messages = 10;
+
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.lookups().lookupTopic(topic);
+ final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get()
+ .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger())
+ .orElse(null);
+ Assert.assertNotNull(managedLedger);
+ final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed
+
+ @Cleanup
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < messages; i++) {
+ producer.send("msg-" + i);
+ }
+
+ Assert.assertTrue(managedLedger.getTotalSize() > 0);
+
+ managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ managedLedger.getConfig().setMaxEntriesPerLedger(1);
+ managedLedger.rollCurrentLedgerIfFull();
+
+ Awaitility.await().atMost(Duration.ofSeconds(3))
+ .until(() -> managedLedger.getLedgersInfo().size() > 1);
+
+ final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
+ Assert.assertEquals(ledgerInfoList.size(), 2);
+ Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
+
+ cursor.close();
+ }
}