Fix panic() in internal/connection when writing to a closed channel during close (#539)

The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares
     to enter the select statement
T2 - calls TriggerClose(), which closes cnx and the closeCh
T3 - the run() goroutine that processes incomingRequestsCh drops into
     case <-closeCh: and calls failLeftRequestsWhenClose(), which drains
     and closes incomingRequestsCh
T1 - resumes and enters the select, where both closeCh and
     incomingRequestsCh are now closed

When more than one case of a `select` is ready, the case executed is
chosen at random (see https://tour.golang.org/concurrency/5), so T1 may
pick the send on the now-closed incomingRequestsCh and panic with
"send on closed channel".
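
The failure mode can be reproduced in isolation. Below is a standalone
sketch (illustrative names only, not the Pulsar code) in which both
channels are already closed before the select runs; roughly half the
time the runtime picks the send case and panics with "send on closed
channel":

    package main

    import "fmt"

    func main() {
        requests := make(chan int, 1) // stands in for incomingRequestsCh
        closeCh := make(chan struct{})

        // Simulate TriggerClose() and failLeftRequestsWhenClose() having
        // already run: both channels are closed.
        close(closeCh)
        close(requests)

        defer func() {
            if r := recover(); r != nil {
                fmt.Println("recovered:", r) // send on closed channel
            }
        }()

        // Both cases are ready, so one is chosen at random; choosing
        // the send panics.
        select {
        case <-closeCh:
            fmt.Println("observed close")
        case requests <- 1:
            fmt.Println("enqueued request")
        }
    }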

This commit introduces a connectionClosing state and a wait group to track
in-flight writes by the SendRequest() methods:
* TriggerClose() moves the connection into the connectionClosing state
  before closeCh is closed.
* failLeftRequestsWhenClose() waits on the wait group for outstanding
  SendRequest() calls to complete before it closes incomingRequestsCh.
* The SendRequest() methods add to the wait group before checking the
  connection state; if the state is closing or closed, the request fails
  with ErrConnectionClosed.

With the above, no goroutine can attempt to add a request to
incomingRequestsCh without being tracked by the wait group, and
incomingRequestsCh is not closed until every operation tracked by the
wait group has completed.
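
For reference, here is a minimal sketch of the resulting close pattern
with hypothetical names rather than the actual connection type (the
real failLeftRequestsWhenClose() additionally drains the channel before
closing it):

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
    )

    var errClosed = errors.New("connection closed")

    type conn struct {
        closing  int32 // set before closeCh is closed
        wg       sync.WaitGroup
        requests chan int
        closeCh  chan struct{}
    }

    func (c *conn) send(v int) error {
        c.wg.Add(1) // register before the state check
        defer c.wg.Done()
        if atomic.LoadInt32(&c.closing) == 1 {
            return errClosed
        }
        select {
        case <-c.closeCh:
            return errClosed
        case c.requests <- v:
            return nil
        }
    }

    func (c *conn) triggerClose() {
        atomic.StoreInt32(&c.closing, 1) // enter closing state first ...
        close(c.closeCh)                 // ... then broadcast the close
        c.wg.Wait()                      // all senders have left the select
        close(c.requests)                // safe: new senders see closing == 1
    }

    func main() {
        c := &conn{requests: make(chan int, 8), closeCh: make(chan struct{})}
        fmt.Println(c.send(1)) // <nil>
        c.triggerClose()
        fmt.Println(c.send(2)) // connection closed
    }

A sender that registers with the wait group before the state flips may
still enter the select, but requests cannot be closed until that
sender's Done() runs, so the send on a closed channel is no longer
reachable.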

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>

Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
Co-authored-by: xiaolongran <xiaolongran@tencent.com>
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e3ce02..70c1468 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,6 +89,7 @@
 const (
 	connectionInit = iota
 	connectionReady
+	connectionClosing
 	connectionClosed
 )
 
@@ -98,6 +99,8 @@
 		return "Initializing"
 	case connectionReady:
 		return "Ready"
+	case connectionClosing:
+		return "Closing"
 	case connectionClosed:
 		return "Closed"
 	default:
@@ -142,6 +145,7 @@
 
 	requestIDGenerator uint64
 
+	incomingRequestsWG sync.WaitGroup
 	incomingRequestsCh chan *request
 	incomingCmdCh      chan *incomingCmd
 	closeCh            chan interface{}
@@ -333,10 +337,15 @@
 }
 
 func (c *connection) failLeftRequestsWhenClose() {
+	// wait for outstanding incoming requests to complete before draining
+	// and closing the channel
+	c.incomingRequestsWG.Wait()
+
 	reqLen := len(c.incomingRequestsCh)
 	for i := 0; i < reqLen; i++ {
 		c.internalSendRequest(<-c.incomingRequestsCh)
 	}
+
 	close(c.incomingRequestsCh)
 }
 
@@ -549,8 +558,12 @@
 
 func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
 	callback func(command *pb.BaseCommand, err error)) {
-	if c.getState() == connectionClosed {
+	c.incomingRequestsWG.Add(1)
+	defer c.incomingRequestsWG.Done()
+
+	state := c.getState()
+	if state == connectionClosed || state == connectionClosing {
 		callback(req, ErrConnectionClosed)
 	} else {
 		select {
 		case <-c.closeCh:
@@ -566,7 +580,11 @@
 }
 
 func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
-	if c.getState() == connectionClosed {
+	c.incomingRequestsWG.Add(1)
+	defer c.incomingRequestsWG.Done()
+
+	state := c.getState()
+	if state == connectionClosed || state == connectionClosing {
 		return ErrConnectionClosed
 	}
 
@@ -760,6 +778,8 @@
 // broadcasting the notification on the close channel
 func (c *connection) TriggerClose() {
 	c.closeOnce.Do(func() {
+		c.setState(connectionClosing)
+
 		cnx := c.cnx
 		if cnx != nil {
 			cnx.Close()
@@ -780,9 +800,10 @@
 	}
 
 	c.log.Info("Connection closed")
+	c.TriggerClose()
 	// do not use changeState() since they share the same lock
 	c.setState(connectionClosed)
-	c.TriggerClose()
+
 	c.pingTicker.Stop()
 	c.pingCheckTicker.Stop()