Fix lost message issue due to ledger rollover. (#14664)
(cherry picked from commit ad2cc2d38280b7dd0f056ee981ec8d3b157e3526)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c07b3ce..59b04fe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -762,8 +762,8 @@
}
} else if (state == State.ClosedLedger) {
// No ledger and no pending operations. Create a new ledger
- log.info("[{}] Creating a new ledger", name);
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
+ log.info("[{}] Creating a new ledger", name);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
@@ -1588,8 +1588,8 @@
}
synchronized void createLedgerAfterClosed() {
- if(isNeededCreateNewLedgerAfterCloseLedger()) {
- log.info("[{}] Creating a new ledger", name);
+ if (isNeededCreateNewLedgerAfterCloseLedger()) {
+ log.info("[{}] Creating a new ledger after closed", name);
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
@@ -1612,8 +1612,8 @@
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
- if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
- STATE_UPDATER.set(this, State.ClosingLedger);
+ if (currentLedgerEntries > 0 && currentLedgerIsFull()
+ && STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object o) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 09f11f2..074a55d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2238,6 +2238,9 @@
// roll a new ledger
int numLedgersBefore = ledger.getLedgersInfo().size();
ledger.getConfig().setMaxEntriesPerLedger(1);
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(20, TimeUnit.SECONDS)
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d837651..648d3e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1936,6 +1936,9 @@
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
// let current ledger close
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
// let retention expire
Thread.sleep(1500);
@@ -2205,6 +2208,9 @@
managedCursor.markDelete(positionMarkDelete);
//trigger ledger rollover and wait for the new ledger created
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
@@ -3063,7 +3069,7 @@
ledger.addEntry(new byte[1024 * 1024]);
}
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());
@@ -3074,9 +3080,12 @@
// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
- Thread.sleep(1000);
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
- Assert.assertEquals(ledger.getTotalSize(), 0);
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
+ ledger.rollCurrentLedgerIfFull();
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}
@Test
@@ -3095,6 +3104,26 @@
}
@Test
+ public void testLedgerNotRolloverWithoutOpenState() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
+ ml.addEntry("test1".getBytes()).getLedgerId();
+ long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ // Set state to CreatingLedger to avoid rollover
+ stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
+ ml.rollCurrentLedgerIfFull();
+ Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
+ currentLedger.setAccessible(true);
+ LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
+ Awaitility.await()
+ .until(() -> ledgerId2 == lh.getId());
+ }
+
+ @Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(1, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 77ec229..b05abf3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -98,6 +99,9 @@
});
// trigger a ledger rollover
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
// the last ledger will be closed and removed and we have one ledger for empty
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 7fa3c08..a06bf9e 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -164,8 +164,11 @@
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
Position position = managedLedger.getLastConfirmedEntry();
if (isUseManagedLedgerProperties) {
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+ managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().until(() -> {
- managedLedger.rollCurrentLedgerIfFull();
return !managedLedger.ledgerExists(position.getLedgerId());
});
}