Fix race condition/goroutine leak in partition discovery goroutine (#474)
* pulsar/consumer_impl: rewrite partition discovery goroutine so that it is actually stopped before closing the active consumers
* pulsar/producer_impl: rewrite partition discovery goroutine so that it is actually stopped before closing the active producers
* pulsar/producer_impl: avoid side effects from consecutive Close calls by wrapping the method body in sync.Once (similar to consumer.Close)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index d05c495..b7bc607 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -49,12 +49,12 @@
// channel used to deliver message to clients
messageCh chan ConsumerMessage
- dlq *dlqRouter
- rlq *retryRouter
- closeOnce sync.Once
- closeCh chan struct{}
- errorCh chan error
- ticker *time.Ticker
+ dlq *dlqRouter
+ rlq *retryRouter
+ closeOnce sync.Once
+ closeCh chan struct{}
+ errorCh chan error
+ stopDiscovery func()
log log.Logger
metrics *internal.TopicMetrics
@@ -210,19 +210,7 @@
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
- consumer.ticker = time.NewTicker(duration)
-
- go func() {
- for {
- select {
- case <-consumer.closeCh:
- return
- case <-consumer.ticker.C:
- consumer.log.Debug("Auto discovering new partitions")
- consumer.internalTopicSubscribeToPartitions()
- }
- }
- }()
+ consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
return consumer, nil
}
@@ -232,6 +220,32 @@
return c.consumerName
}
+func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
+ var wg sync.WaitGroup
+ stopDiscoveryCh := make(chan struct{})
+ ticker := time.NewTicker(period)
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-stopDiscoveryCh:
+ return
+ case <-ticker.C:
+ c.log.Debug("Auto discovering new partitions")
+ c.internalTopicSubscribeToPartitions()
+ }
+ }
+ }()
+
+ return func() {
+ ticker.Stop()
+ close(stopDiscoveryCh)
+ wg.Wait()
+ }
+}
+
func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
@@ -485,6 +499,8 @@
func (c *consumer) Close() {
c.closeOnce.Do(func() {
+ c.stopDiscovery()
+
c.Lock()
defer c.Unlock()
@@ -498,7 +514,6 @@
}
wg.Wait()
close(c.closeCh)
- c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index bf4d92e..e8d43e0 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -51,8 +51,8 @@
producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
- ticker *time.Ticker
- tickerStop chan struct{}
+ closeOnce sync.Once
+ stopDiscovery func()
log log.Logger
metrics *internal.TopicMetrics
}
@@ -125,24 +125,36 @@
return nil, err
}
- ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
- p.ticker = ticker
- p.tickerStop = make(chan struct{})
+ p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval)
+ p.metrics.ProducersOpened.Inc()
+ return p, nil
+}
+
+func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
+ var wg sync.WaitGroup
+ stopDiscoveryCh := make(chan struct{})
+ ticker := time.NewTicker(period)
+
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
select {
+ case <-stopDiscoveryCh:
+ return
case <-ticker.C:
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
- case <-p.tickerStop:
- return
}
}
}()
- p.metrics.ProducersOpened.Inc()
- return p, nil
+ return func() {
+ ticker.Stop()
+ close(stopDiscoveryCh)
+ wg.Wait()
+ }
}
func (p *producer) internalCreatePartitionsProducers() error {
@@ -292,18 +304,17 @@
}
func (p *producer) Close() {
- p.Lock()
- defer p.Unlock()
- if p.ticker != nil {
- p.ticker.Stop()
- close(p.tickerStop)
- p.ticker = nil
- }
+ p.closeOnce.Do(func() {
+ p.stopDiscovery()
- for _, pp := range p.producers {
- pp.Close()
- }
- p.client.handlers.Del(p)
- p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
- p.metrics.ProducersClosed.Inc()
+ p.Lock()
+ defer p.Unlock()
+
+ for _, pp := range p.producers {
+ pp.Close()
+ }
+ p.client.handlers.Del(p)
+ p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
+ p.metrics.ProducersClosed.Inc()
+ })
}