[fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#22221)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e3dbe23..ba70fcd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,6 +59,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -241,6 +242,9 @@
     protected volatile long lastAddEntryTimeMs = 0;
     private long inactiveLedgerRollOverTimeMs = 0;
 
+    /** A signal that may trigger all the subsequent OpAddEntry of current ledger to be failed due to timeout. **/
+    protected volatile AtomicBoolean currentLedgerTimeoutTriggered;
+
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
     private static final String MIGRATION_STATE_PROPERTY = "migrated";
@@ -533,6 +537,7 @@
                 STATE_UPDATER.set(this, State.LedgerOpened);
                 updateLastLedgerCreatedTimeAndScheduleRolloverTask();
                 currentLedger = lh;
+                currentLedgerTimeoutTriggered = new AtomicBoolean();
 
                 lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
                 // bypass empty ledgers, find last ledger with Message if possible.
@@ -775,7 +780,8 @@
 
         // Jump to specific thread to avoid contention from writers writing from different threads
         executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx,
+                    currentLedgerTimeoutTriggered);
             internalAsyncAddEntry(addOperation);
         });
     }
@@ -791,7 +797,8 @@
 
         // Jump to specific thread to avoid contention from writers writing from different threads
         executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
+                    currentLedgerTimeoutTriggered);
             internalAsyncAddEntry(addOperation);
         });
     }
@@ -843,6 +850,7 @@
 
             // Write into lastLedger
             addOperation.setLedger(currentLedger);
+            addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);
 
             ++currentLedgerEntries;
             currentLedgerSize += addOperation.data.readableBytes();
@@ -1586,6 +1594,7 @@
                         LedgerHandle originalCurrentLedger = currentLedger;
                         ledgers.put(lh.getId(), newLedger);
                         currentLedger = lh;
+                        currentLedgerTimeoutTriggered = new AtomicBoolean();
                         currentLedgerEntries = 0;
                         currentLedgerSize = 0;
                         updateLedgersIdsComplete(originalCurrentLedger);
@@ -1669,9 +1678,11 @@
             if (existsOp != null) {
                 // If op is used by another ledger handle, we need to close it and create a new one
                 if (existsOp.ledger != null) {
-                    existsOp.close();
-                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
-                            existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+                    existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
+                } else {
+                    // This scenario should not happen.
+                    log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+                    existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
@@ -4156,13 +4167,14 @@
         }
         OpAddEntry opAddEntry = pendingAddEntries.peek();
         if (opAddEntry != null) {
-            final long finalAddOpCount = opAddEntry.addOpCount;
             boolean isTimedOut = opAddEntry.lastInitTime != -1
                     && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
             if (isTimedOut) {
-                log.error("Failed to add entry for ledger {} in time-out {} sec",
-                        (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
-                opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, finalAddOpCount);
+                log.warn("[{}] Failed to add entry {}:{} in time-out {} sec", this.name,
+                        opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1,
+                        opAddEntry.entryId, timeoutSec);
+                currentLedgerTimeoutTriggered.set(true);
+                opAddEntry.handleAddFailure(opAddEntry.ledger);
             }
         }
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ae2beaf..acbb0da 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,8 +24,10 @@
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -45,7 +47,7 @@
 public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
     protected ManagedLedgerImpl ml;
     LedgerHandle ledger;
-    private long entryId;
+    long entryId;
     private int numberOfMessages;
 
     @SuppressWarnings("unused")
@@ -68,6 +70,9 @@
             AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
     volatile State state;
 
+    @Setter
+    private AtomicBoolean timeoutTriggered;
+
     enum State {
         OPEN,
         INITIATED,
@@ -76,8 +81,8 @@
     }
 
     public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback,
-                                                  Object ctx) {
-        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
+                                                  Object ctx, AtomicBoolean timeoutTriggered) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
         }
@@ -85,8 +90,9 @@
     }
 
     public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages,
-                                                  AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
+                                                  AddEntryCallback callback, Object ctx,
+                                                  AtomicBoolean timeoutTriggered) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -95,7 +101,8 @@
     }
 
     private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data,
-                                                             AddEntryCallback callback, Object ctx) {
+                                                             AddEntryCallback callback, Object ctx,
+                                                             AtomicBoolean timeoutTriggered) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
@@ -109,6 +116,7 @@
         op.startTime = System.nanoTime();
         op.state = State.OPEN;
         op.payloadProcessorHandle = null;
+        op.timeoutTriggered = timeoutTriggered;
         ml.mbean.addAddEntrySample(op.dataLength);
         return op;
     }
@@ -176,7 +184,9 @@
         if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
             log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(),
                     entryId);
-            OpAddEntry.this.recycle();
+            // Since there is a thread is coping this object, do not recycle this object to avoid other problems.
+            // For example: we recycled this object, other thread get a null "opAddEntry.{variable_name}".
+            // Recycling is not mandatory, JVM GC will collect it.
             return;
         }
 
@@ -200,7 +210,7 @@
                     lh == null ? -1 : lh.getId(), entryId, dataLength, rc);
         }
 
-        if (rc != BKException.Code.OK) {
+        if (rc != BKException.Code.OK || timeoutTriggered.get()) {
             handleAddFailure(lh);
         } else {
             // Trigger addComplete callback in a thread hashed on the managed ledger name
@@ -307,13 +317,6 @@
         return false;
     }
 
-    void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
-        if (checkAndCompleteOp(ctx)) {
-            this.close();
-            this.handleAddFailure(ledger);
-        }
-    }
-
     /**
      * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
      *
@@ -333,8 +336,11 @@
         });
     }
 
-    void close() {
+    OpAddEntry duplicateAndClose(AtomicBoolean timeoutTriggered) {
         STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
+        OpAddEntry duplicate =
+                OpAddEntry.createNoRetainBuffer(ml, data, getNumberOfMessages(), callback, ctx, timeoutTriggered);
+        return duplicate;
     }
 
     public State getState() {
@@ -389,6 +395,7 @@
         startTime = -1;
         lastInitTime = -1;
         payloadProcessorHandle = null;
+        timeoutTriggered = null;
         recyclerHandle.recycle(this);
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 8b2742d..ec5b006 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,6 +55,8 @@
                                    String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
         this.sourceMLName = config.getShadowSourceName();
+        // ShadowManagedLedgerImpl does not implement add entry timeout yet, so this variable will always be false.
+        this.currentLedgerTimeoutTriggered = new AtomicBoolean(false);
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7a34947..eca9c28 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -146,6 +146,7 @@
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -3184,6 +3185,55 @@
         ledger.close();
     }
 
+    @Test
+    public void testAddEntryResponseTimeout() throws Exception {
+        // Create ML with feature Add Entry Timeout Check.
+        final ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(2);
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("ml1", config);
+        final ManagedCursor cursor = ledger.openCursor("c1");
+        final CollectCtxAddEntryCallback collectCtxAddEntryCallback = new CollectCtxAddEntryCallback();
+
+        // Insert a response delay.
+        bkc.addEntryResponseDelay(8, TimeUnit.SECONDS);
+
+        // Add two entries.
+        final byte[] msg1 = new byte[]{1};
+        final byte[] msg2 = new byte[]{2};
+        int ctx1 = 1;
+        int ctx2 = 2;
+        ledger.asyncAddEntry(msg1, collectCtxAddEntryCallback, ctx1);
+        ledger.asyncAddEntry(msg2, collectCtxAddEntryCallback, ctx2);
+        // Verify all write requests are completed.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(collectCtxAddEntryCallback.addCompleteCtxList, Arrays.asList(1, 2));
+        });
+        Entry entry1 = cursor.readEntries(1).get(0);
+        assertEquals(entry1.getData(), msg1);
+        entry1.release();
+        Entry entry2 = cursor.readEntries(1).get(0);
+        assertEquals(entry2.getData(), msg2);
+        entry2.release();
+
+        // cleanup.
+        factory.delete(ledger.name);
+    }
+
+    private static class CollectCtxAddEntryCallback implements AddEntryCallback {
+
+        public List<Object> addCompleteCtxList = new BlockingArrayQueue<>();
+        public List<Object> addFailedCtxList = new BlockingArrayQueue<>();
+
+        @Override
+        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+            addCompleteCtxList.add(ctx);
+        }
+
+        @Override
+        public void addFailed(ManagedLedgerException exception, Object ctx) {
+            addFailedCtxList.add(ctx);
+        }
+    }
+
     /**
      * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enough
      * to create new ledger and add entry successfully.
@@ -3259,7 +3309,8 @@
 
         List<OpAddEntry> oldOps = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
+                    ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null, new AtomicBoolean());
             if (i > 4) {
                 op.setLedger(mock(LedgerHandle.class));
             }
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f0d279e..4516cfe 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -89,6 +90,7 @@
     }
 
     final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
+    final Queue<Long> addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>();
     final List<CompletableFuture<Void>> failures = new ArrayList<>();
     final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
 
@@ -367,6 +369,11 @@
         addEntryDelaysMillis.add(unit.toMillis(delay));
     }
 
+    public synchronized void addEntryResponseDelay(long delay, TimeUnit unit) {
+        checkArgument(delay >= 0, "The delay time must not be negative.");
+        addEntryResponseDelaysMillis.add(unit.toMillis(delay));
+    }
+
     static int getExceptionCode(Throwable t) {
         if (t instanceof BKException) {
             return ((BKException) t).getCode();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index dea33a0..aa61e54 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -197,6 +197,13 @@
                         cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
                                        PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx);
                     } else {
+                        Long responseDelayMillis = bk.addEntryResponseDelaysMillis.poll();
+                        if (responseDelayMillis != null) {
+                            try {
+                                Thread.sleep(responseDelayMillis);
+                            } catch (InterruptedException e) {
+                            }
+                        }
                         cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx);
                     }
                 }, bk.executor);