[fix] [log] fix the vague response if topic not found (#20932)
### Motivation
When I did this test below and got the error "Topic not found".
```java
pulsarAdmin.topics().createNonPartitionedTopic("persistent://public/default/tp_1");
Consumer consumer = null;
Consumer consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/tp_1")
.subscriptionName("s1")
.enableRetry(true)
.subscribe();
```
I do create the topic `persistent://public/default/tp_1` first but got a response "Topic not found", it is confusing.
The root cause is the retry letter topic `persistent://public/default/tp_1-sub1-RETRY` was not created.
### Modifications
clear the vague response if the topic is not founded.
(cherry picked from commit 1a024bc6aa0999669ef8b67bdbe0dc80eeff9f8f)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 3921334..470cdc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -501,11 +501,13 @@
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(topicName.toString());
if (topicFuture == null) {
- return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND,
+ String.format("Topic not found %s", topicName.toString())));
}
return topicFuture.thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent()) {
- return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND,
+ String.format("Topic not found %s", topicName.toString())));
}
return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get());
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 0d857f2..1c1dd74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -284,11 +284,12 @@
}
}
- private Topic getTopicReference(TopicName topicName) {
+ private Topic getTopicReference(final TopicName topicName) {
try {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ String.format("Topic not found %s", topicName.toString())));
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException | TimeoutException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index c360eea..386b974 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -233,7 +233,8 @@
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ String.format("Partitioned topic not found %s", topicName.toString())));
return;
}
NonPersistentPartitionedTopicStatsImpl stats =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bd70201..9c9c522 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -56,7 +56,7 @@
private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
- protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative,
String listenerName) {
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topicName);
@@ -79,7 +79,8 @@
})
.thenCompose(exist -> {
if (!exist) {
- throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
+ throw new RestException(Response.Status.NOT_FOUND,
+ String.format("Topic not found %s", topicName.toString()));
}
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b819a99..4e61a32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1371,7 +1371,7 @@
cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
final boolean supportsPartialProducer = supportsPartialProducer();
- TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
+ final TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
return;
}
@@ -1564,7 +1564,7 @@
// Do not print stack traces for expected exceptions
if (cause instanceof NoSuchElementException) {
- cause = new TopicNotFoundException("Topic Not Found.");
+ cause = new TopicNotFoundException(String.format("Topic not found %s", topicName.toString()));
log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", remoteAddress, topicName,
producerId);
} else if (!Exceptions.areExceptionsPresentInChain(cause,
@@ -2377,7 +2377,7 @@
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
if (schemaAndMetadata == null) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
- "Topic not found or no-schema");
+ String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
} else {
commandSender.sendGetSchemaResponse(requestId,
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
@@ -2395,7 +2395,7 @@
log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
}
long requestId = commandGetOrCreateSchema.getRequestId();
- String topicName = commandGetOrCreateSchema.getTopic();
+ final String topicName = commandGetOrCreateSchema.getTopic();
SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
@@ -2415,7 +2415,7 @@
});
} else {
commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound,
- "Topic not found");
+ String.format("Topic not found %s", topicName));
}
}).exceptionally(ex -> {
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index 3d2a6b9..27d72f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -168,7 +168,7 @@
testNamespace, "my-topic", true);
} catch (Exception e) {
//System.out.println(e.getMessage());
- Assert.assertEquals("Topic not found", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Topic not found"));
}
String key = "legendtkl";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 1e5f467..049fd0f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -163,7 +163,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
@@ -173,7 +173,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
@@ -208,7 +208,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
@@ -219,7 +219,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
@@ -334,7 +334,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
@@ -344,7 +344,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
@@ -392,7 +392,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
@@ -402,7 +402,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
@@ -541,7 +541,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
@@ -551,7 +551,7 @@
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
- assertEquals(cause.getMessage(), "Topic not found");
+ assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java
new file mode 100644
index 0000000..728e556
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerDisallowAutoCreateTopicTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setAllowAutoTopicCreation(false);
+ }
+
+ @Test
+ public void testClearErrorIfRetryTopicNotExists() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String subName = "sub";
+ final String retryTopicName = topicName + "-" + subName + RETRY_GROUP_TOPIC_SUFFIX;
+ admin.topics().createNonPartitionedTopic(topicName);
+ Consumer consumer = null;
+ try {
+ consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .enableRetry(true)
+ .subscribe();
+ fail("");
+ } catch (Exception ex) {
+ log.info("got an expected error", ex);
+ assertTrue(ex.getMessage().contains("Not found:"));
+ assertTrue(ex.getMessage().contains(retryTopicName));
+ } finally {
+ // cleanup.
+ if (consumer != null) {
+ consumer.close();
+ }
+ admin.topics().delete(topicName);
+ }
+ }
+}