make skip all messages async. (#5375)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 94412a9..a13884a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
@@ -965,28 +966,32 @@
         } else {
             validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+                if (ex != null) {
+                    asyncResponse.resume(new RestException(ex));
+                    log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
+                } else {
+                    asyncResponse.resume(Response.noContent().build());
+                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
+                }
+            };
             try {
                 if (subName.startsWith(topic.getReplicatorPrefix())) {
                     String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                     PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                     checkNotNull(repl);
-                    repl.clearBacklog().get();
+                    repl.clearBacklog().whenComplete(biConsumer);
                 } else {
                     PersistentSubscription sub = topic.getSubscription(subName);
                     checkNotNull(sub);
-                    sub.clearBacklog().get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
-                            TimeUnit.SECONDS);
+                    sub.clearBacklog().whenComplete(biConsumer);
                 }
-                log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            } catch (NullPointerException npe) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-                return;
-            } catch (Exception exception) {
-                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, exception);
-                asyncResponse.resume(new RestException(exception));
-                return;
+            } catch (Exception e) {
+                if (e instanceof NullPointerException) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                } else {
+                    asyncResponse.resume(new RestException(e));
+                }
             }
         }
     }