Revert "[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)" (#5733)
This reverts commit ee11e100d8a05296f1ddf0da6c4e52f63ca02294.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 00d95d1..fca1485 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -31,8 +31,6 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger;
@@ -49,7 +47,6 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
@@ -135,65 +132,5 @@
log.info("-- Exiting {} test --", methodName);
}
- @Test
- public void testGetPartitionedTopicMetaData() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- final String topicName = "persistent://my-property/my-ns/my-topic1";
- final String subscriptionName = "my-subscriber-name";
-
-
-
- try {
- String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT;
- if (isTcpLookup) {
- url = "pulsar://localhost:51000,localhost:" + BROKER_PORT;
- }
- PulsarClient client = newPulsarClient(url, 0);
-
- Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
- .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
- Producer<byte[]> producer = client.newProducer().topic(topicName).create();
-
- consumer.close();
- producer.close();
- client.close();
- } catch (PulsarClientException pce) {
- log.error("create producer or consumer error: ", pce);
- fail();
- }
-
- log.info("-- Exiting {} test --", methodName);
- }
-
- @Test (timeOut = 4000)
- public void testGetPartitionedTopicDataTimeout() {
- log.info("-- Starting {} test --", methodName);
-
- final String topicName = "persistent://my-property/my-ns/my-topic1";
-
- String url = "http://localhost:51000,localhost:51001";
- if (isTcpLookup) {
- url = "pulsar://localhost:51000,localhost:51001";
- }
-
- PulsarClient client;
- try {
- client = PulsarClient.builder()
- .serviceUrl(url)
- .statsInterval(0, TimeUnit.SECONDS)
- .operationTimeout(3, TimeUnit.SECONDS)
- .build();
-
- Producer<byte[]> producer = client.newProducer().topic(topicName).create();
-
- fail();
- } catch (PulsarClientException pce) {
- log.error("create producer error: ", pce);
- }
-
- log.info("-- Exiting {} test --", methodName);
- }
-
private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 845c741..96c62d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -21,7 +21,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
-import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
@@ -128,9 +127,8 @@
public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
- URI hostUri = serviceNameResolver.resolveHostUri();
- String requestUrl = new URL(hostUri.toURL(), path).toString();
- String remoteHostName = hostUri.getHost();
+ String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
+ String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index abad54b..53468d3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -641,45 +641,17 @@
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+ CompletableFuture<PartitionedTopicMetadata> metadataFuture;
try {
TopicName topicName = TopicName.get(topic);
- AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMax(0, TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(),
- conf.getMaxBackoffIntervalNanos())
- .create();
- getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture);
+ metadataFuture = lookup.getPartitionedTopicMetadata(topicName);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
return metadataFuture;
}
- private void getPartitionedTopicMetadata(TopicName topicName,
- Backoff backoff,
- AtomicLong remainingTime,
- CompletableFuture<PartitionedTopicMetadata> future) {
- lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
- long nextDelay = Math.min(backoff.next(), remainingTime.get());
- if (nextDelay <= 0) {
- future.completeExceptionally(new PulsarClientException
- .TimeoutException("Could not getPartitionedTopicMetadata within configured timeout."));
- return null;
- }
-
- timer.newTimeout( task -> {
- remainingTime.addAndGet(-nextDelay);
- getPartitionedTopicMetadata(topicName, backoff, remainingTime, future);
- }, nextDelay, TimeUnit.MILLISECONDS);
- return null;
- });
- }
-
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> {