fix no response to client when handleSubscribe because PendingAckHandleImpl init fail (#13655)
Fixes #13654
### Modifications
When the initialization of `PendingAckHandleImpl` fails, `pendingAckHandleCompletableFuture` will not be exception or complete, then `org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumer` will not return any response to the client.
```
public CompletableFuture<Void> addConsumer(Consumer consumer) {
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> ...)
}
```
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
index 3f2cc51..fed9add 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
@@ -36,4 +36,10 @@
* @param pendingAckMetadataEntry {@link PendingAckMetadataEntry} the metadata entry of pending ack
*/
void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry);
+
+ /**
+ * Pending ack replay failed callback for pending ack store.
+ */
+ void replayFailed(Throwable t);
+
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index 6bcc344..b98e64e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -61,6 +61,13 @@
}
@Override
+ public void replayFailed(Throwable t) {
+ synchronized (pendingAckHandle) {
+ pendingAckHandle.exceptionHandleFuture(t);
+ }
+ }
+
+ @Override
public void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry) {
TxnID txnID = new TxnID(pendingAckMetadataEntry.getTxnidMostBits(),
pendingAckMetadataEntry.getTxnidLeastBits());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 1b63c11..d869be3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -303,6 +303,8 @@
public void run() {
try {
if (cursor.isClosed()) {
+ pendingAckReplyCallBack.replayFailed(new ManagedLedgerException
+ .CursorAlreadyClosedException("MLPendingAckStore cursor have been closed."));
log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
cursor.getManagedLedger().getName());
return;
@@ -350,6 +352,7 @@
}
}
} catch (Exception e) {
+ pendingAckReplyCallBack.replayFailed(e);
log.error("[{}] Pending ack recover fail!", subManagedCursor.getManagedLedger().getName(), e);
return;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index d92793a..ba74d78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -141,6 +141,7 @@
}).exceptionally(e -> {
acceptQueue.clear();
changeToErrorState();
+ exceptionHandleFuture(e.getCause());
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
return null;
});
@@ -889,6 +890,12 @@
}
}
+ public synchronized void exceptionHandleFuture(Throwable t) {
+ if (!this.pendingAckHandleCompletableFuture.isDone()) {
+ this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+ }
+ }
+
@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();