Fix: failed producer creation leak (#927)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 1d76aa3..d48bd01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.util.UUID.randomUUID;
 import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.atLeastOnce;
@@ -33,11 +34,11 @@
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
-import static java.util.UUID.randomUUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
@@ -60,16 +61,12 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.HandlerBase.State;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -737,4 +734,37 @@
         }
     }
 
+    @Test
+    public void testCleanProducer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        admin.clusters().createCluster("global", new ClusterData());
+        admin.namespaces().createNamespace("my-property/global/lookup");
+
+        ClientConfiguration clientConf = new ClientConfiguration();
+        final int operationTimeOut = 500;
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setOperationTimeout(operationTimeOut, TimeUnit.MILLISECONDS);
+        PulsarClient pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
+        CountDownLatch latch = new CountDownLatch(1);
+        pulsarClient.createProducerAsync("persistent://my-property/global/lookup/my-topic1", producerConf)
+                .handle((producer, e) -> {
+                    latch.countDown();
+                    return null;
+                });
+
+        latch.await(operationTimeOut+1000, TimeUnit.MILLISECONDS);
+        Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
+        prodField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        IdentityHashMap<ProducerBase, Boolean> producers = (IdentityHashMap<ProducerBase, Boolean>) prodField
+                .get(pulsarClient);
+        assertTrue(producers.isEmpty());
+        pulsarClient.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c950b3..19f922a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -654,6 +654,7 @@
     void connectionFailed(PulsarClientException exception) {
         if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) {
             setState(State.Failed);
+            log.info("[{}] Consumer creation failed for consumer {}", topic, consumerId);
             client.cleanupConsumer(this);
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5f647ad..f8befce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -899,6 +899,7 @@
                 && producerCreatedFuture.completeExceptionally(exception)) {
             log.info("[{}] Producer creation failed for producer {}", topic, producerId);
             setState(State.Failed);
+            client.cleanupProducer(this);
         }
     }