Add closeCh to prevent goroutine leak
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d67c0c0..3d62758 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -78,6 +78,7 @@
// Channel where app is posting messages to be published
eventsChan chan interface{}
+ closeCh chan struct{}
connectClosedCh chan connectionClosed
publishSemaphore internal.Semaphore
@@ -117,6 +118,7 @@
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
connectClosedCh: make(chan connectionClosed, 10),
+ closeCh: make(chan struct{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -369,6 +371,16 @@
}
func (p *partitionProducer) runEventsLoop() {
+ go func() {
+ select {
+ case <-p.closeCh:
+ return
+ case <-p.connectClosedCh:
+ p.log.Info("runEventsLoop will reconnect in producer")
+ p.reconnectToBroker()
+ }
+ }()
+
for {
select {
case i := <-p.eventsChan:
@@ -381,8 +393,6 @@
p.internalClose(v)
return
}
- case <-p.connectClosedCh:
- p.reconnectToBroker()
case <-p.batchFlushTicker.C:
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
@@ -864,6 +874,8 @@
p.setProducerState(producerClosed)
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
+
+ close(p.closeCh)
}
func (p *partitionProducer) LastSequenceID() int64 {