Fix panic() in internal/connection when writing to a closed channel during close (#539)
The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares
to enter the select statement
T2 - calls TriggerClose(), which closes cnx and the closeCh
T3 - run() go-routine for processing incomingRequestsCh drops into
case <-closeCh: and calls failLeftRequestsWhenClose() which drains
and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and
incomingRequestsCh are closed.
When two cases of a `select` are valid, the case executed is chosen at
random; see https://tour.golang.org/concurrency/5
This commit introduces a connectionClosing state and a wait group to track
writes by the SendRequest() methods.
* TriggerClose() moves the connection into the connectionClosing state
before the closeCh is closed.
* The failLeftRequestsWhenClose() method waits on the waitgroup for
  outstanding SendRequest() methods to complete before it closes
  the incomingRequestsCh
* The SendRequest() methods first add to the waitgroup before checking the
connection state; if the state is either closing or closed, SendRequest()
returns an error.
With the above it is not possible for a thread to attempt to add a request
to the incomingRequestsCh without being tracked by the waitgroup, and the
incomingRequestsCh will not be closed until operations tracked by the
waitgroup have completed.
Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e3ce02..70c1468 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,6 +89,7 @@
const (
connectionInit = iota
connectionReady
+ connectionClosing
connectionClosed
)
@@ -98,6 +99,8 @@
return "Initializing"
case connectionReady:
return "Ready"
+ case connectionClosing:
+ return "Closing"
case connectionClosed:
return "Closed"
default:
@@ -142,6 +145,7 @@
requestIDGenerator uint64
+ incomingRequestsWG sync.WaitGroup
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
@@ -333,10 +337,15 @@
}
func (c *connection) failLeftRequestsWhenClose() {
+ // wait for outstanding incoming requests to complete before draining
+ // and closing the channel
+ c.incomingRequestsWG.Wait()
+
reqLen := len(c.incomingRequestsCh)
for i := 0; i < reqLen; i++ {
c.internalSendRequest(<-c.incomingRequestsCh)
}
+
close(c.incomingRequestsCh)
}
@@ -549,8 +558,13 @@
func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
- if c.getState() == connectionClosed {
+ c.incomingRequestsWG.Add(1)
+ defer c.incomingRequestsWG.Done()
+
+ state := c.getState()
+ if state == connectionClosed || state == connectionClosing {
callback(req, ErrConnectionClosed)
+
} else {
select {
case <-c.closeCh:
@@ -566,7 +580,11 @@
}
func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
- if c.getState() == connectionClosed {
+ c.incomingRequestsWG.Add(1)
+ defer c.incomingRequestsWG.Done()
+
+ state := c.getState()
+ if state == connectionClosed || state == connectionClosing {
return ErrConnectionClosed
}
@@ -760,6 +778,8 @@
// broadcasting the notification on the close channel
func (c *connection) TriggerClose() {
c.closeOnce.Do(func() {
+ c.setState(connectionClosing)
+
cnx := c.cnx
if cnx != nil {
cnx.Close()
@@ -780,9 +800,10 @@
}
c.log.Info("Connection closed")
+ c.TriggerClose()
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
- c.TriggerClose()
+
c.pingTicker.Stop()
c.pingCheckTicker.Stop()