[ Issue 13551 ] FIxed NPE when reset-cursor at a non-existent topic (RestException without cause) (#13573)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 81a9200..df0f380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3747,7 +3747,7 @@
if (e.getCause() instanceof NotAllowedException) {
throw new RestException(Status.CONFLICT, e.getCause());
}
- throw new RestException(e.getCause());
+ throw new RestException(e.getCause() == null ? e : e.getCause());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index 0cec819..2c01a7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -38,10 +38,9 @@
writer.append("\n --- An unexpected error occurred in the server ---\n\n");
if (t != null) {
writer.append("Message: ").append(t.getMessage()).append("\n\n");
+ writer.append("Stacktrace:\n\n");
+ t.printStackTrace(new PrintWriter(writer));
}
- writer.append("Stacktrace:\n\n");
-
- t.printStackTrace(new PrintWriter(writer));
return writer.toString();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 96076b5..14719fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -44,6 +44,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
@@ -57,6 +58,7 @@
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1149,4 +1151,21 @@
Assert.assertEquals(e.getMessage(), "Termination of a non-partitioned topic is not allowed using partitioned-terminate, please use terminate commands.");
}
}
+
+ @Test
+ public void testResetCursorReturnTimeoutWhenZKTimeout() {
+ String topic = "persistent://" + testTenant + "/" + testNamespace + "/" + "topic-2";
+ BrokerService brokerService = spy(pulsar.getBrokerService());
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
+ doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+ try {
+ admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ String errorMsg = ((InternalServerErrorException) e.getCause()).getResponse().readEntity(String.class);
+ Assert.assertTrue(errorMsg.contains("TimeoutException"));
+ }
+ }
+
}