[fix] [log] fix the vague response if topic not found (#20932)

### Motivation

When I did this test below and got the error "Topic not found".

```java
pulsarAdmin.topics().createNonPartitionedTopic("persistent://public/default/tp_1");
Consumer consumer = null;
Consumer consumer = pulsarClient.newConsumer()
          .topic("persistent://public/default/tp_1")
          .subscriptionName("s1")
          .enableRetry(true)
          .subscribe();
```

I do create the topic `persistent://public/default/tp_1` first but got a response "Topic not found", it is confusing.

The root cause is the retry letter topic `persistent://public/default/tp_1-sub1-RETRY` was not created.

### Modifications

clear the vague response if the topic is not founded.

(cherry picked from commit 1a024bc6aa0999669ef8b67bdbe0dc80eeff9f8f)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 3921334..470cdc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -501,11 +501,13 @@
             CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
                     .getTopics().get(topicName.toString());
             if (topicFuture == null) {
-                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND,
+                        String.format("Topic not found %s", topicName.toString())));
             }
             return topicFuture.thenCompose(optionalTopic -> {
                 if (!optionalTopic.isPresent()) {
-                    return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                    return FutureUtil.failedFuture(new RestException(NOT_FOUND,
+                            String.format("Topic not found %s", topicName.toString())));
                 }
                 return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get());
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 0d857f2..1c1dd74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -284,11 +284,12 @@
         }
     }
 
-    private Topic getTopicReference(TopicName topicName) {
+    private Topic getTopicReference(final TopicName topicName) {
         try {
             return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
                     .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                            String.format("Topic not found %s", topicName.toString())));
         } catch (ExecutionException e) {
             throw new RuntimeException(e.getCause());
         } catch (InterruptedException | TimeoutException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index c360eea..386b974 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -233,7 +233,8 @@
             getPartitionedTopicMetadataAsync(topicName,
                     authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions == 0) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                            String.format("Partitioned topic not found %s", topicName.toString())));
                     return;
                 }
                 NonPersistentPartitionedTopicStatsImpl stats =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bd70201..9c9c522 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -56,7 +56,7 @@
     private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
     private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
 
-    protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+    protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative,
                                                                      String listenerName) {
         if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
             log.warn("No broker was found available for topic {}", topicName);
@@ -79,7 +79,8 @@
                 })
                 .thenCompose(exist -> {
                     if (!exist) {
-                        throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
+                        throw new RestException(Response.Status.NOT_FOUND,
+                                String.format("Topic not found %s", topicName.toString()));
                     }
                     CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
                             .getBrokerServiceUrlAsync(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b819a99..4e61a32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1371,7 +1371,7 @@
                 cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
         final boolean supportsPartialProducer = supportsPartialProducer();
 
-        TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
+        final TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
         if (topicName == null) {
             return;
         }
@@ -1564,7 +1564,7 @@
 
                 // Do not print stack traces for expected exceptions
                 if (cause instanceof NoSuchElementException) {
-                    cause = new TopicNotFoundException("Topic Not Found.");
+                    cause = new TopicNotFoundException(String.format("Topic not found %s", topicName.toString()));
                     log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", remoteAddress, topicName,
                             producerId);
                 } else if (!Exceptions.areExceptionsPresentInChain(cause,
@@ -2377,7 +2377,7 @@
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        "Topic not found or no-schema");
+                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
@@ -2395,7 +2395,7 @@
             log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
         }
         long requestId = commandGetOrCreateSchema.getRequestId();
-        String topicName = commandGetOrCreateSchema.getTopic();
+        final String topicName = commandGetOrCreateSchema.getTopic();
         SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
         SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
         service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
@@ -2415,7 +2415,7 @@
                 });
             } else {
                 commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        "Topic not found");
+                        String.format("Topic not found %s", topicName));
             }
         }).exceptionally(ex -> {
             ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index 3d2a6b9..27d72f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -168,7 +168,7 @@
                     testNamespace, "my-topic", true);
         } catch (Exception e) {
             //System.out.println(e.getMessage());
-            Assert.assertEquals("Topic not found", e.getMessage());
+            Assert.assertTrue(e.getMessage().contains("Topic not found"));
         }
 
         String key = "legendtkl";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 1e5f467..049fd0f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -163,7 +163,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         try {
             pulsar.getBrokerService().getTopic(topic, false);
@@ -173,7 +173,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
@@ -208,7 +208,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         try {
             pulsar.getBrokerService().getTopic(topic, false);
@@ -219,7 +219,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
@@ -334,7 +334,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         try {
             pulsar.getBrokerService().getTopic(topic, false);
@@ -344,7 +344,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
@@ -392,7 +392,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         try {
             pulsar.getBrokerService().getTopic(topic, false);
@@ -402,7 +402,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         admin.topics().createNonPartitionedTopic(topic);
 
@@ -541,7 +541,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         try {
             pulsar.getBrokerService().getTopic(topic, false);
@@ -551,7 +551,7 @@
         } catch (ExecutionException ex) {
             assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
             PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
-            assertEquals(cause.getMessage(), "Topic not found");
+            assertTrue(cause.getMessage().contains("Topic not found"));
         }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java
new file mode 100644
index 0000000..728e556
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerDisallowAutoCreateTopicTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setAllowAutoTopicCreation(false);
+    }
+
+    @Test
+    public void testClearErrorIfRetryTopicNotExists() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String subName = "sub";
+        final String retryTopicName = topicName + "-" + subName + RETRY_GROUP_TOPIC_SUFFIX;
+        admin.topics().createNonPartitionedTopic(topicName);
+        Consumer consumer = null;
+        try {
+            consumer = pulsarClient.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .enableRetry(true)
+                    .subscribe();
+            fail("");
+        } catch (Exception ex) {
+            log.info("got an expected error", ex);
+            assertTrue(ex.getMessage().contains("Not found:"));
+            assertTrue(ex.getMessage().contains(retryTopicName));
+        } finally {
+            // cleanup.
+            if (consumer != null) {
+                consumer.close();
+            }
+            admin.topics().delete(topicName);
+        }
+    }
+}