[fix] [ml] Fix add-entry failure caused by a race between add-entry failure/timeout handling and ledger switching (#22221)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e3dbe23..ba70fcd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,6 +59,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -241,6 +242,9 @@
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;
+ /** A signal that, once set, may cause all subsequent OpAddEntry operations on the current ledger to fail due to timeout. **/
+ protected volatile AtomicBoolean currentLedgerTimeoutTriggered;
+
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";
@@ -533,6 +537,7 @@
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
@@ -775,7 +780,8 @@
// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx,
+ currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
@@ -791,7 +797,8 @@
// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
+ currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
@@ -843,6 +850,7 @@
// Write into lastLedger
addOperation.setLedger(currentLedger);
+ addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);
++currentLedgerEntries;
currentLedgerSize += addOperation.data.readableBytes();
@@ -1586,6 +1594,7 @@
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
@@ -1669,9 +1678,11 @@
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
- existsOp.close();
- existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
- existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+ existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
+ } else {
+ // This scenario should not happen.
+ log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+ existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
@@ -4156,13 +4167,14 @@
}
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
- final long finalAddOpCount = opAddEntry.addOpCount;
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
if (isTimedOut) {
- log.error("Failed to add entry for ledger {} in time-out {} sec",
- (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
- opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, finalAddOpCount);
+ log.warn("[{}] Failed to add entry {}:{} in time-out {} sec", this.name,
+ opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1,
+ opAddEntry.entryId, timeoutSec);
+ currentLedgerTimeoutTriggered.set(true);
+ opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ae2beaf..acbb0da 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,8 +24,10 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -45,7 +47,7 @@
public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
- private long entryId;
+ long entryId;
private int numberOfMessages;
@SuppressWarnings("unused")
@@ -68,6 +70,9 @@
AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
volatile State state;
+ @Setter
+ private AtomicBoolean timeoutTriggered;
+
enum State {
OPEN,
INITIATED,
@@ -76,8 +81,8 @@
}
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback,
- Object ctx) {
- OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
+ Object ctx, AtomicBoolean timeoutTriggered) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
@@ -85,8 +90,9 @@
}
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages,
- AddEntryCallback callback, Object ctx) {
- OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
+ AddEntryCallback callback, Object ctx,
+ AtomicBoolean timeoutTriggered) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -95,7 +101,8 @@
}
private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data,
- AddEntryCallback callback, Object ctx) {
+ AddEntryCallback callback, Object ctx,
+ AtomicBoolean timeoutTriggered) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
@@ -109,6 +116,7 @@
op.startTime = System.nanoTime();
op.state = State.OPEN;
op.payloadProcessorHandle = null;
+ op.timeoutTriggered = timeoutTriggered;
ml.mbean.addAddEntrySample(op.dataLength);
return op;
}
@@ -176,7 +184,9 @@
if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(),
entryId);
- OpAddEntry.this.recycle();
+ // Another thread may be copying this object right now, so do not recycle it here, to avoid other problems.
+ // For example, if we recycled this object, the other thread could read a null "opAddEntry.{variable_name}".
+ // Recycling is not mandatory; the JVM GC will collect the object.
return;
}
@@ -200,7 +210,7 @@
lh == null ? -1 : lh.getId(), entryId, dataLength, rc);
}
- if (rc != BKException.Code.OK) {
+ if (rc != BKException.Code.OK || timeoutTriggered.get()) {
handleAddFailure(lh);
} else {
// Trigger addComplete callback in a thread hashed on the managed ledger name
@@ -307,13 +317,6 @@
return false;
}
- void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
- if (checkAndCompleteOp(ctx)) {
- this.close();
- this.handleAddFailure(ledger);
- }
- }
-
/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
*
@@ -333,8 +336,11 @@
});
}
- void close() {
+ OpAddEntry duplicateAndClose(AtomicBoolean timeoutTriggered) { // marks this op CLOSED and returns a copy
STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
+ OpAddEntry duplicate =
+ OpAddEntry.createNoRetainBuffer(ml, data, getNumberOfMessages(), callback, ctx, timeoutTriggered);
+ return duplicate; // new op built from this op's ml, data, message count, callback and ctx
}
public State getState() {
@@ -389,6 +395,7 @@
startTime = -1;
lastInitTime = -1;
payloadProcessorHandle = null;
+ timeoutTriggered = null;
recyclerHandle.recycle(this);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 8b2742d..ec5b006 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,6 +55,8 @@
String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
+ // ShadowManagedLedgerImpl does not implement add entry timeout yet, so this variable will always be false.
+ this.currentLedgerTimeoutTriggered = new AtomicBoolean(false);
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7a34947..eca9c28 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -146,6 +146,7 @@
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -3184,6 +3185,55 @@
ledger.close();
}
+ @Test
+ public void testAddEntryResponseTimeout() throws Exception {
+ // Open a managed ledger whose add-entry timeout is 2 seconds.
+ final ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(2);
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("ml1", config);
+ final ManagedCursor cursor = ledger.openCursor("c1");
+ final CollectCtxAddEntryCallback collectCtxAddEntryCallback = new CollectCtxAddEntryCallback();
+
+ // Queue an 8-second add-entry response delay on bkc, longer than the 2-second timeout above.
+ bkc.addEntryResponseDelay(8, TimeUnit.SECONDS);
+
+ // Add two entries asynchronously, tagging them with ctx 1 and 2.
+ final byte[] msg1 = new byte[]{1};
+ final byte[] msg2 = new byte[]{2};
+ int ctx1 = 1;
+ int ctx2 = 2;
+ ledger.asyncAddEntry(msg1, collectCtxAddEntryCallback, ctx1);
+ ledger.asyncAddEntry(msg2, collectCtxAddEntryCallback, ctx2);
+ // Wait until both writes have been reported to addComplete, in ctx order 1 then 2.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(collectCtxAddEntryCallback.addCompleteCtxList, Arrays.asList(1, 2));
+ });
+ Entry entry1 = cursor.readEntries(1).get(0); // read the entries back one at a time and check their payloads
+ assertEquals(entry1.getData(), msg1);
+ entry1.release();
+ Entry entry2 = cursor.readEntries(1).get(0);
+ assertEquals(entry2.getData(), msg2);
+ entry2.release();
+
+ // Clean up: delete the managed ledger opened by this test.
+ factory.delete(ledger.name);
+ }
+
+ private static class CollectCtxAddEntryCallback implements AddEntryCallback {
+
+ public List<Object> addCompleteCtxList = new BlockingArrayQueue<>(); // ctx of each addComplete call
+ public List<Object> addFailedCtxList = new BlockingArrayQueue<>(); // ctx of each addFailed call
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ addCompleteCtxList.add(ctx);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ addFailedCtxList.add(ctx);
+ }
+ }
+
/**
* It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enough
* to create new ledger and add entry successfully.
@@ -3259,7 +3309,8 @@
List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+ OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
+ ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null, new AtomicBoolean());
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f0d279e..4516cfe 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -89,6 +90,7 @@
}
final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
+ final Queue<Long> addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
@@ -367,6 +369,11 @@
addEntryDelaysMillis.add(unit.toMillis(delay));
}
+ public synchronized void addEntryResponseDelay(long delay, TimeUnit unit) {
+ checkArgument(delay >= 0, "The delay time must not be negative.");
+ addEntryResponseDelaysMillis.add(unit.toMillis(delay)); // stored in millis; one delay is queued per call
+ }
+
static int getExceptionCode(Throwable t) {
if (t instanceof BKException) {
return ((BKException) t).getCode();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index dea33a0..aa61e54 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -197,6 +197,13 @@
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx);
} else {
+ Long responseDelayMillis = bk.addEntryResponseDelaysMillis.poll();
+ if (responseDelayMillis != null) {
+ try {
+ Thread.sleep(responseDelayMillis);
+ } catch (InterruptedException e) {
+ }
+ }
cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx);
}
}, bk.executor);