[Transaction] Fix transaction log recover fail problem. (#10146)

## Motivation
Now transaction log recover by compare ```lastConfirmedEntry.compareTo(currentLoadPosition) > 0```, when the ```lastConfirmedEntry``` has acked by cursor, we can't change the currentLoadPosition to ```lastConfirmedEntry``` or  ```lastConfirmedEntry ``` + 1, so we should judge the ```cursor.hasMoreEntries()``` and ```entryQueue.size()```
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index ed62796..a3cfdac 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -54,6 +54,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
     
     <build>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 626a687..dfadac9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -169,8 +169,7 @@
         public void start() {
             TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
 
-            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
-                fillEntryQueueCallback.fillQueue();
+            while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
@@ -197,12 +196,17 @@
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
-        void fillQueue() {
+        boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     readAsync(100, this);
+                    return true;
+                } else {
+                    return false;
                 }
+            } else {
+                return true;
             }
         }
 
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 355c5e1..b885e78 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -29,6 +29,7 @@
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,6 +38,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
@@ -270,6 +272,38 @@
         }
     }
 
+    @Test
+    public void testRecoverWhenDeleteFromCursor() throws Exception {
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setMaxCacheSize(0);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+        TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
+        MLTransactionMetadataStore transactionMetadataStore =
+                new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
+                        new TransactionTimeoutTrackerImpl());
+
+
+        Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
+
+        // txnID1 have not deleted from cursor, we can recover from transaction log
+        TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+        // txnID2 have deleted from cursor.
+        TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+
+        transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN).get();
+        transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING).get();
+
+        mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
+        transactionMetadataStore =
+                new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
+                        new TransactionTimeoutTrackerImpl());
+
+        Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
+    }
+
     public class TransactionTimeoutTrackerImpl implements TransactionTimeoutTracker {
 
         @Override