Fix: failed producer creation leak (#927)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 1d76aa3..d48bd01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.atLeastOnce;
@@ -33,11 +34,11 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
-import static java.util.UUID.randomUUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
@@ -60,16 +61,12 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.HandlerBase.State;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -737,4 +734,37 @@
}
}
+    /**
+     * Regression test for the failed-producer-creation leak: once an asynchronous producer creation
+     * attempt has completed, the client's internal {@code producers} registry (read reflectively from
+     * {@link PulsarClientImpl}) must be empty.
+     * <p>
+     * NOTE(review): the creation against the freshly created {@code global} namespace is presumably
+     * expected to fail; the completion callback does not inspect the outcome -- confirm.
+     */
+    @Test
+    public void testCleanProducer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        admin.clusters().createCluster("global", new ClusterData());
+        admin.namespaces().createNamespace("my-property/global/lookup");
+
+        // Shortened operation timeout, presumably so the creation attempt gives up quickly.
+        final int operationTimeOut = 500;
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setOperationTimeout(operationTimeOut, TimeUnit.MILLISECONDS);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        PulsarClient pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
+        try {
+            CountDownLatch latch = new CountDownLatch(1);
+            pulsarClient.createProducerAsync("persistent://my-property/global/lookup/my-topic1", producerConf)
+                    .handle((producer, e) -> {
+                        latch.countDown();
+                        return null;
+                    });
+
+            // await() returns false on timeout; fail here so a creation future that never completed is
+            // reported as such instead of surfacing as a confusing "registry not empty" failure below.
+            assertTrue(latch.await(operationTimeOut + 1000, TimeUnit.MILLISECONDS));
+
+            Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
+            prodField.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            IdentityHashMap<ProducerBase, Boolean> producers = (IdentityHashMap<ProducerBase, Boolean>) prodField
+                    .get(pulsarClient);
+            assertTrue(producers.isEmpty());
+        } finally {
+            // Release the client even when an assertion above throws, so it cannot leak into later tests.
+            pulsarClient.close();
+        }
+        log.info("-- Exiting {} test --", methodName);
+    }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c950b3..19f922a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -654,6 +654,7 @@
+ /**
+  * Handles a failed connection attempt for this consumer.
+  * <p>
+  * Acts only when the current time is past {@code subscribeTimeout} and
+  * {@code subscribeFuture.completeExceptionally(exception)} returns {@code true}. In that case the
+  * consumer is moved to {@code State.Failed}, the failure is logged with the topic and consumer id,
+  * and the owning client is asked to clean this consumer up. Otherwise the call does nothing.
+  *
+  * @param exception the failure used to complete {@code subscribeFuture} exceptionally
+  */
void connectionFailed(PulsarClientException exception) {
if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) {
setState(State.Failed);
+ log.info("[{}] Consumer creation failed for consumer {}", topic, consumerId);
client.cleanupConsumer(this);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5f647ad..f8befce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -899,6 +899,7 @@
&& producerCreatedFuture.completeExceptionally(exception)) {
log.info("[{}] Producer creation failed for producer {}", topic, producerId);
setState(State.Failed);
+ client.cleanupProducer(this);
}
}