make skip all messages async. (#5375)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 94412a9..a13884a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -42,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
@@ -965,28 +966,32 @@
} else {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+ if (ex != null) {
+ asyncResponse.resume(new RestException(ex));
+ log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex);
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
+ }
+ };
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
- repl.clearBacklog().get();
+ repl.clearBacklog().whenComplete(biConsumer);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
- sub.clearBacklog().get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
- TimeUnit.SECONDS);
+ sub.clearBacklog().whenComplete(biConsumer);
}
- log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- return;
- } catch (NullPointerException npe) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- return;
- } catch (Exception exception) {
- log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, exception);
- asyncResponse.resume(new RestException(exception));
- return;
+ } catch (Exception e) {
+ if (e instanceof NullPointerException) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ } else {
+ asyncResponse.resume(new RestException(e));
+ }
}
}
}