[ Issue 13551 ] FIxed NPE when reset-cursor at a non-existent topic (RestException without cause) (#13573)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 81a9200..df0f380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3747,7 +3747,7 @@
             if (e.getCause() instanceof NotAllowedException) {
                 throw new RestException(Status.CONFLICT, e.getCause());
             }
-            throw new RestException(e.getCause());
+            throw new RestException(e.getCause() == null ? e : e.getCause());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index 0cec819..2c01a7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -38,10 +38,9 @@
         writer.append("\n --- An unexpected error occurred in the server ---\n\n");
         if (t != null) {
             writer.append("Message: ").append(t.getMessage()).append("\n\n");
+            writer.append("Stacktrace:\n\n");
+            t.printStackTrace(new PrintWriter(writer));
         }
-        writer.append("Stacktrace:\n\n");
-
-        t.printStackTrace(new PrintWriter(writer));
         return writer.toString();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 96076b5..14719fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
@@ -57,6 +58,7 @@
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1149,4 +1151,21 @@
             Assert.assertEquals(e.getMessage(), "Termination of a non-partitioned topic is not allowed using partitioned-terminate, please use terminate commands.");
         }
     }
+
+    @Test
+    public void testResetCursorReturnTimeoutWhenZKTimeout() {
+        String topic = "persistent://" + testTenant + "/" + testNamespace + "/" + "topic-2";
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
+        doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+        try {
+            admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            String errorMsg = ((InternalServerErrorException) e.getCause()).getResponse().readEntity(String.class);
+            Assert.assertTrue(errorMsg.contains("TimeoutException"));
+        }
+    }
+
 }