fix no response to client when handleSubscribe because PendingAckHandleImpl init fail (#13655)

Fixes #13654 

### Modifications

When the initialization of `PendingAckHandleImpl` fails, `pendingAckHandleCompletableFuture` will not be exception or complete, then `org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumer` will not return any response to the client.

```
public CompletableFuture<Void> addConsumer(Consumer consumer) {
        return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> ...)
}
```
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
index 3f2cc51..fed9add 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
@@ -36,4 +36,10 @@
      * @param pendingAckMetadataEntry {@link PendingAckMetadataEntry} the metadata entry of pending ack
      */
     void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry);
+
+    /**
+     * Pending ack replay failed callback for pending ack store.
+     */
+    void replayFailed(Throwable t);
+
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index 6bcc344..b98e64e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -61,6 +61,13 @@
     }
 
     @Override
+    public void replayFailed(Throwable t) {
+        synchronized (pendingAckHandle) {
+            pendingAckHandle.exceptionHandleFuture(t);
+        }
+    }
+
+    @Override
     public void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry) {
         TxnID txnID = new TxnID(pendingAckMetadataEntry.getTxnidMostBits(),
                 pendingAckMetadataEntry.getTxnidLeastBits());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 1b63c11..d869be3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -303,6 +303,8 @@
         public void run() {
             try {
                 if (cursor.isClosed()) {
+                    pendingAckReplyCallBack.replayFailed(new ManagedLedgerException
+                            .CursorAlreadyClosedException("MLPendingAckStore cursor have been closed."));
                     log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
                             cursor.getManagedLedger().getName());
                     return;
@@ -350,6 +352,7 @@
                     }
                 }
             } catch (Exception e) {
+                pendingAckReplyCallBack.replayFailed(e);
                 log.error("[{}] Pending ack recover fail!", subManagedCursor.getManagedLedger().getName(), e);
                 return;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index d92793a..ba74d78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -141,6 +141,7 @@
                     }).exceptionally(e -> {
                         acceptQueue.clear();
                         changeToErrorState();
+                        exceptionHandleFuture(e.getCause());
                         log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
                         return null;
                     });
@@ -889,6 +890,12 @@
         }
     }
 
+    public synchronized void exceptionHandleFuture(Throwable t) {
+        if (!this.pendingAckHandleCompletableFuture.isDone()) {
+            this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+        }
+    }
+
     @Override
     public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
         TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();