[branch-2.9][fix][broker] Fix index generator is not rollback after entries are failed added (#19978)
Co-authored-by: gavingaozhangmin <gavingaozhangmin@didiglobal.com>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 829ad0a..0777014 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -826,6 +826,13 @@
}
}
+ protected void afterFailedAddEntry(int numOfMessages) {
+ if (managedLedgerInterceptor == null) {
+ return;
+ }
+ managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
+ }
+
private boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 46069ea..c34b989 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -148,6 +148,7 @@
public void failed(ManagedLedgerException e) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+ ml.afterFailedAddEntry(this.getNumberOfMessages());
if (cb != null) {
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index 13fc530..f420b1b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -43,6 +43,14 @@
OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);
/**
+ * Intercept When add entry failed.
+ * @param numberOfMessages
+ */
+ default void afterFailedAddEntry(int numberOfMessages){
+
+ }
+
+ /**
* Intercept when ManagedLedger is initialized.
* @param propertiesMap map of properties.
*/
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index 0f634e0..11cfaf9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -79,6 +79,15 @@
}
@Override
+ public void afterFailedAddEntry(int numberOfMessages) {
+ for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ ((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
+ }
+ }
+ }
+
+ @Override
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
if (propertiesMap == null || propertiesMap.size() == 0) {
return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 2256260..57cf446 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -305,6 +305,49 @@
}
@Test
+ public void testAddEntryFailed() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final String ledgerAndCursorName = "testAddEntryFailed";
+
+ ManagedLedgerInterceptor interceptor =
+ new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+
+ ledger.terminate();
+
+ ManagedLedgerInterceptorImpl interceptor1 =
+ (ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ countDownLatch.countDown();
+ }
+ }, null);
+
+ countDownLatch.await();
+ assertEquals(interceptor1.getIndex(), -1);
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+
+ }
+
+ @Test
public void testBeforeAddEntryWithException() throws Exception {
final int MOCK_BATCH_SIZE = 2;
final String ledgerAndCursorName = "testBeforeAddEntryWithException";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 96d4ab2..ced33ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -51,4 +51,8 @@
public long getIndex() {
return indexGenerator.get();
}
+
+ public void decreaseWithNumberOfMessages(int numberOfMessages) {
+ indexGenerator.addAndGet(-numberOfMessages);
+ }
}