Clean callbacks of connection after run loop stopped (#239) (#248)
Fixes #239
### Motivation
As @wolfstudy pointed out in https://github.com/apache/pulsar-client-go/issues/239#issuecomment-625238240,
there is a race on the `pendingReqs` callbacks when the connection is closed while the run loop is still running, which can cause a callback to be invoked up to twice:
https://github.com/apache/pulsar-client-go/blob/e7f1673350f208b5063823282d14906d70d66904/pulsar/internal/connection.go#L669-L671
### Modifications
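Move the cleanup of the `pendingReqs` callbacks out of `Close()` and into a deferred function in the run loop, so that every access to `pendingReqs` happens in the run loop itself and the loop has already stopped when the callbacks are cleaned up. `TriggerClose()` is now guarded by a `sync.Once` (`closeOnce`), and the write, invalid-command and auth-credential error paths call `TriggerClose()` instead of `Close()`. (Note: the `runLoopStoppedCh` named in the original description does not appear in this diff.)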
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 0e13380..8de1ad5 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -152,6 +152,7 @@
cond *sync.Cond
state int32
connectionTimeout time.Duration
+ closeOnce sync.Once
logicalAddr *url.URL
physicalAddr *url.URL
@@ -352,10 +353,19 @@
go c.reader.readFromConnection()
go c.runPingCheck()
+ defer func() {
+ // all accesses to pendingReqs should happen in this run loop thread,
+ // including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
+ for id, req := range c.pendingReqs {
+ req.callback(nil, errors.New("connection closed"))
+ delete(c.pendingReqs, id)
+ }
+ c.Close()
+ }()
+
for {
select {
case <-c.closeCh:
- c.Close()
return
case req := <-c.incomingRequestsCh:
@@ -431,7 +441,7 @@
c.log.Debug("Write data: ", data.ReadableBytes())
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
- c.Close()
+ c.TriggerClose()
}
}
@@ -520,7 +530,7 @@
default:
c.log.Errorf("Received invalid command type: %s", cmd.Type)
- c.Close()
+ c.TriggerClose()
}
}
@@ -638,7 +648,7 @@
authData, err := c.auth.GetData()
if err != nil {
c.log.WithError(err).Warn("Failed to load auth credentials")
- c.Close()
+ c.TriggerClose()
return
}
@@ -700,18 +710,14 @@
// Triggers the connection close by forcing the socket to close and
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
- cnx := c.cnx
- if cnx != nil {
- cnx.Close()
- }
+ c.closeOnce.Do(func() {
+ cnx := c.cnx
+ if cnx != nil {
+ cnx.Close()
+ }
- select {
- case <-c.closeCh:
- return
- default:
close(c.closeCh)
- }
-
+ })
}
func (c *connection) Close() {
@@ -726,9 +732,7 @@
c.log.Info("Connection closed")
c.state = connectionClosed
- if c.cnx != nil {
- c.cnx.Close()
- }
+ c.TriggerClose()
c.pingTicker.Stop()
c.pingCheckTicker.Stop()
@@ -736,10 +740,6 @@
listener.ConnectionClosed()
}
- for _, req := range c.pendingReqs {
- req.callback(nil, errors.New("connection closed"))
- }
-
consumerHandlers := make(map[uint64]ConsumerHandler)
c.consumerHandlersLock.RLock()
for id, handler := range c.consumerHandlers {