[branch-2.9][fix][broker] Fix index generator is not rollback after entries are failed added (#19978)

Co-authored-by: gavingaozhangmin <gavingaozhangmin@didiglobal.com>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 829ad0a..0777014 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -826,6 +826,13 @@
         }
     }
 
+    protected void afterFailedAddEntry(int numOfMessages) {
+        if (managedLedgerInterceptor == null) {
+            return;
+        }
+        managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
+    }
+
     private boolean beforeAddEntry(OpAddEntry addOperation) {
         // if no interceptor, just return true to make sure addOperation will be initiate()
         if (managedLedgerInterceptor == null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 46069ea..c34b989 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -148,6 +148,7 @@
 
     public void failed(ManagedLedgerException e) {
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+        ml.afterFailedAddEntry(this.getNumberOfMessages());
         if (cb != null) {
             ReferenceCountUtil.release(data);
             cb.addFailed(e, ctx);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index 13fc530..f420b1b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -43,6 +43,14 @@
     OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);
 
     /**
+     * Intercept When add entry failed.
+     * @param numberOfMessages
+     */
+    default void afterFailedAddEntry(int numberOfMessages){
+
+    }
+
+    /**
      * Intercept when ManagedLedger is initialized.
      * @param propertiesMap map of properties.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index 0f634e0..11cfaf9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -79,6 +79,15 @@
     }
 
     @Override
+    public void afterFailedAddEntry(int numberOfMessages) {
+        for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+            if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                ((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
+            }
+        }
+    }
+
+    @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
         if (propertiesMap == null || propertiesMap.size() == 0) {
             return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 2256260..57cf446 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -305,6 +305,49 @@
     }
 
     @Test
+    public void testAddEntryFailed() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final String ledgerAndCursorName = "testAddEntryFailed";
+
+        ManagedLedgerInterceptor interceptor =
+                new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+
+        ledger.terminate();
+
+        ManagedLedgerInterceptorImpl interceptor1 =
+                (ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    countDownLatch.countDown();
+                }
+            }, null);
+
+            countDownLatch.await();
+            assertEquals(interceptor1.getIndex(), -1);
+        } finally {
+            ledger.close();
+            factory.shutdown();
+        }
+
+    }
+
+    @Test
     public void testBeforeAddEntryWithException() throws Exception {
         final int MOCK_BATCH_SIZE = 2;
         final String ledgerAndCursorName = "testBeforeAddEntryWithException";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 96d4ab2..ced33ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -51,4 +51,8 @@
     public long getIndex() {
         return indexGenerator.get();
     }
+
+    public void decreaseWithNumberOfMessages(int numberOfMessages) {
+        indexGenerator.addAndGet(-numberOfMessages);
+    }
 }