Fix race condition/goroutine leak in partition discovery goroutine (#474)

* pulsar/consumer_impl: rewrite partition discovery goroutine so that it actually gets closed before close active consumers

* pulsar/producer_impl: rewrite partition discovery goroutine so that it actually gets closed before close active producers

* pulsar/producer_impl: avoid consequtive Close calls side-effects by wrapping method body with sync.Once (similar to consumer.Close)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index d05c495..b7bc607 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -49,12 +49,12 @@
 	// channel used to deliver message to clients
 	messageCh chan ConsumerMessage
 
-	dlq       *dlqRouter
-	rlq       *retryRouter
-	closeOnce sync.Once
-	closeCh   chan struct{}
-	errorCh   chan error
-	ticker    *time.Ticker
+	dlq           *dlqRouter
+	rlq           *retryRouter
+	closeOnce     sync.Once
+	closeCh       chan struct{}
+	errorCh       chan error
+	stopDiscovery func()
 
 	log     log.Logger
 	metrics *internal.TopicMetrics
@@ -210,19 +210,7 @@
 	if duration <= 0 {
 		duration = defaultAutoDiscoveryDuration
 	}
-	consumer.ticker = time.NewTicker(duration)
-
-	go func() {
-		for {
-			select {
-			case <-consumer.closeCh:
-				return
-			case <-consumer.ticker.C:
-				consumer.log.Debug("Auto discovering new partitions")
-				consumer.internalTopicSubscribeToPartitions()
-			}
-		}
-	}()
+	consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
 
 	return consumer, nil
 }
@@ -232,6 +220,32 @@
 	return c.consumerName
 }
 
+func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
+	var wg sync.WaitGroup
+	stopDiscoveryCh := make(chan struct{})
+	ticker := time.NewTicker(period)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-stopDiscoveryCh:
+				return
+			case <-ticker.C:
+				c.log.Debug("Auto discovering new partitions")
+				c.internalTopicSubscribeToPartitions()
+			}
+		}
+	}()
+
+	return func() {
+		ticker.Stop()
+		close(stopDiscoveryCh)
+		wg.Wait()
+	}
+}
+
 func (c *consumer) internalTopicSubscribeToPartitions() error {
 	partitions, err := c.client.TopicPartitions(c.topic)
 	if err != nil {
@@ -485,6 +499,8 @@
 
 func (c *consumer) Close() {
 	c.closeOnce.Do(func() {
+		c.stopDiscovery()
+
 		c.Lock()
 		defer c.Unlock()
 
@@ -498,7 +514,6 @@
 		}
 		wg.Wait()
 		close(c.closeCh)
-		c.ticker.Stop()
 		c.client.handlers.Del(c)
 		c.dlq.close()
 		c.rlq.close()
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index bf4d92e..e8d43e0 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -51,8 +51,8 @@
 	producersPtr  unsafe.Pointer
 	numPartitions uint32
 	messageRouter func(*ProducerMessage, TopicMetadata) int
-	ticker        *time.Ticker
-	tickerStop    chan struct{}
+	closeOnce     sync.Once
+	stopDiscovery func()
 	log           log.Logger
 	metrics       *internal.TopicMetrics
 }
@@ -125,24 +125,36 @@
 		return nil, err
 	}
 
-	ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
-	p.ticker = ticker
-	p.tickerStop = make(chan struct{})
+	p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval)
 
+	p.metrics.ProducersOpened.Inc()
+	return p, nil
+}
+
+func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
+	var wg sync.WaitGroup
+	stopDiscoveryCh := make(chan struct{})
+	ticker := time.NewTicker(period)
+
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for {
 			select {
+			case <-stopDiscoveryCh:
+				return
 			case <-ticker.C:
 				p.log.Debug("Auto discovering new partitions")
 				p.internalCreatePartitionsProducers()
-			case <-p.tickerStop:
-				return
 			}
 		}
 	}()
 
-	p.metrics.ProducersOpened.Inc()
-	return p, nil
+	return func() {
+		ticker.Stop()
+		close(stopDiscoveryCh)
+		wg.Wait()
+	}
 }
 
 func (p *producer) internalCreatePartitionsProducers() error {
@@ -292,18 +304,17 @@
 }
 
 func (p *producer) Close() {
-	p.Lock()
-	defer p.Unlock()
-	if p.ticker != nil {
-		p.ticker.Stop()
-		close(p.tickerStop)
-		p.ticker = nil
-	}
+	p.closeOnce.Do(func() {
+		p.stopDiscovery()
 
-	for _, pp := range p.producers {
-		pp.Close()
-	}
-	p.client.handlers.Del(p)
-	p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
-	p.metrics.ProducersClosed.Inc()
+		p.Lock()
+		defer p.Unlock()
+
+		for _, pp := range p.producers {
+			pp.Close()
+		}
+		p.client.handlers.Del(p)
+		p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
+		p.metrics.ProducersClosed.Inc()
+	})
 }