Add closeCh to prevent goroutine leak

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d67c0c0..3d62758 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -78,6 +78,7 @@
 
 	// Channel where app is posting messages to be published
 	eventsChan      chan interface{}
+	closeCh         chan struct{}
 	connectClosedCh chan connectionClosed
 
 	publishSemaphore internal.Semaphore
@@ -117,6 +118,7 @@
 		producerID:       client.rpcClient.NewProducerID(),
 		eventsChan:       make(chan interface{}, maxPendingMessages),
 		connectClosedCh:  make(chan connectionClosed, 10),
+		closeCh:          make(chan struct{}),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -369,6 +371,16 @@
 }
 
 func (p *partitionProducer) runEventsLoop() {
+	go func() {
+		select {
+		case <-p.closeCh:
+			return
+		case <-p.connectClosedCh:
+			p.log.Info("runEventsLoop will reconnect in producer")
+			p.reconnectToBroker()
+		}
+	}()
+
 	for {
 		select {
 		case i := <-p.eventsChan:
@@ -381,8 +393,6 @@
 				p.internalClose(v)
 				return
 			}
-		case <-p.connectClosedCh:
-			p.reconnectToBroker()
 		case <-p.batchFlushTicker.C:
 			if p.batchBuilder.IsMultiBatches() {
 				p.internalFlushCurrentBatches()
@@ -864,6 +874,8 @@
 	p.setProducerState(producerClosed)
 	p.cnx.UnregisterListener(p.producerID)
 	p.batchFlushTicker.Stop()
+
+	close(p.closeCh)
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {