[Transaction] Fix transaction log recover fail problem. (#10146)
## Motivation
Now transaction log recover by compare ```lastConfirmedEntry.compareTo(currentLoadPosition) > 0```, when the ```lastConfirmedEntry``` has acked by cursor, we can't change the currentLoadPosition to ```lastConfirmedEntry``` or ```lastConfirmedEntry ``` + 1, so we should judge the ```cursor.hasMoreEntries()``` and ```entryQueue.size()```
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index ed62796..a3cfdac 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -54,6 +54,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 626a687..dfadac9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -169,8 +169,7 @@
public void start() {
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
- while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
- fillEntryQueueCallback.fillQueue();
+ while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
Entry entry = entryQueue.poll();
if (entry != null) {
try {
@@ -197,12 +196,17 @@
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
- void fillQueue() {
+ boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
readAsync(100, this);
+ return true;
+ } else {
+ return false;
}
+ } else {
+ return true;
}
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 355c5e1..b885e78 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -29,6 +29,7 @@
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,6 +38,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
@@ -270,6 +272,38 @@
}
}
+ @Test
+ public void testRecoverWhenDeleteFromCursor() throws Exception {
+ ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+ factoryConf.setMaxCacheSize(0);
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+ TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+ MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+ new ManagedLedgerConfig());
+ MLTransactionMetadataStore transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
+ new TransactionTimeoutTrackerImpl());
+
+
+ Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
+
+ // txnID1 have not deleted from cursor, we can recover from transaction log
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+ // txnID2 have deleted from cursor.
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+
+ transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN).get();
+ transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING).get();
+
+ mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+ new ManagedLedgerConfig());
+ transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
+ new TransactionTimeoutTrackerImpl());
+
+ Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
+ }
+
public class TransactionTimeoutTrackerImpl implements TransactionTimeoutTracker {
@Override