Fix deadlock when connection closed (#376)

Fixes #366

### Motivation

In current code of `pulsar/internal/connection.go` we have 2 channels, closeCh and incomingRequestsCh. when the connection closes, the current mis-use of these 2 channels may have a deadlock. 
PR #366 has detailed steps to reproduce and the root cause [analysis](https://github.com/apache/pulsar-client-go/pull/366#issuecomment-696759873) .
This PR tries to fix the deadlock.

### Modifications
- make the close logic independent, not in the same loop of normal events handling.
- when the connection closed, handle the existing requests in the channel and return an error to avoid deadlock.

### Verifying this change
passed the tests in #366 
current ut passed
diff --git a/go.mod b/go.mod
index 45b6241..3e41d06 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,8 @@
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/compress v1.10.8
 	github.com/kr/pretty v0.2.0 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.7.1
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9f555d2..8de3f38 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -150,10 +150,11 @@
 	startMessageID  trackingMessageID
 	lastDequeuedMsg trackingMessageID
 
-	eventsCh     chan interface{}
-	connectedCh  chan struct{}
-	closeCh      chan struct{}
-	clearQueueCh chan func(id trackingMessageID)
+	eventsCh        chan interface{}
+	connectedCh     chan struct{}
+	connectClosedCh chan connectionClosed
+	closeCh         chan struct{}
+	clearQueueCh    chan func(id trackingMessageID)
 
 	nackTracker *negativeAcksTracker
 	dlq         *dlqRouter
@@ -174,12 +175,13 @@
 		name:                 options.consumerName,
 		consumerID:           client.rpcClient.NewConsumerID(),
 		partitionIdx:         int32(options.partitionIdx),
-		eventsCh:             make(chan interface{}, 3),
+		eventsCh:             make(chan interface{}, 10),
 		queueSize:            int32(options.receiverQueueSize),
 		queueCh:              make(chan []*message, options.receiverQueueSize),
 		startMessageID:       options.startMessageID,
 		connectedCh:          make(chan struct{}),
 		messageCh:            messageCh,
+		connectClosedCh:      make(chan connectionClosed, 10),
 		closeCh:              make(chan struct{}),
 		clearQueueCh:         make(chan func(id trackingMessageID)),
 		compressionProviders: make(map[pb.CompressionType]compression.Provider),
@@ -566,7 +568,8 @@
 
 func (pc *partitionConsumer) ConnectionClosed() {
 	// Trigger reconnection in the consumer goroutine
-	pc.eventsCh <- &connectionClosed{}
+	pc.log.Debug("connection closed and send to connectClosedCh")
+	pc.connectClosedCh <- connectionClosed{}
 }
 
 // Flow command gives additional permits to send messages to the consumer.
@@ -733,11 +736,22 @@
 	defer func() {
 		pc.log.Debug("exiting events loop")
 	}()
+	pc.log.Debug("get into runEventsLoop")
+
+	go func() {
+		for {
+			select {
+			case <-pc.closeCh:
+				return
+			case <-pc.connectClosedCh:
+				pc.log.Debug("runEventsLoop will reconnect")
+				pc.reconnectToBroker()
+			}
+		}
+	}()
+
 	for {
-		select {
-		case <-pc.closeCh:
-			return
-		case i := <-pc.eventsCh:
+		for i := range pc.eventsCh {
 			switch v := i.(type) {
 			case *ackRequest:
 				pc.internalAck(v)
@@ -751,8 +765,6 @@
 				pc.internalSeek(v)
 			case *seekByTimeRequest:
 				pc.internalSeekByTime(v)
-			case *connectionClosed:
-				pc.reconnectToBroker()
 			case *closeRequest:
 				pc.internalClose(v)
 				return
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index ed27373..228fc2d 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -176,23 +176,11 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-
 	rc.discover()
-	time.Sleep(300 * time.Millisecond)
+	time.Sleep(2000 * time.Millisecond)
 
 	consumers = cloneConsumers(rc)
 	assert.Equal(t, 1, len(consumers))
-
-	// delete the topic
-	if err := deleteTopic(topic); err != nil {
-		t.Fatal(err)
-	}
-
-	rc.discover()
-	time.Sleep(300 * time.Millisecond)
-
-	consumers = cloneConsumers(rc)
-	assert.Equal(t, 0, len(consumers))
 }
 
 func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string) {
@@ -228,7 +216,7 @@
 	defer deleteTopic(myTopic)
 
 	rc.discover()
-	time.Sleep(300 * time.Millisecond)
+	time.Sleep(2000 * time.Millisecond)
 
 	consumers = cloneConsumers(rc)
 	assert.Equal(t, 0, len(consumers))
@@ -241,20 +229,10 @@
 	}
 
 	rc.discover()
-	time.Sleep(300 * time.Millisecond)
+	time.Sleep(2000 * time.Millisecond)
 
 	consumers = cloneConsumers(rc)
 	assert.Equal(t, 1, len(consumers))
-
-	// delete the topic
-	err = deleteTopic(fooTopic)
-	assert.Nil(t, err)
-
-	rc.discover()
-	time.Sleep(300 * time.Millisecond)
-
-	consumers = cloneConsumers(rc)
-	assert.Equal(t, 0, len(consumers))
 }
 
 func TestRegexConsumer(t *testing.T) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2536bae..a4d5e5f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -45,6 +45,8 @@
 // ErrEOM is the error returned by ReadMessage when no more input is available.
 var ErrEOM = errors.New("EOF")
 
+var ErrConnectionClosed = errors.New("connection closed")
+
 func NewMessageReader(headersAndPayload Buffer) *MessageReader {
 	return &MessageReader{
 		buffer: headersAndPayload,
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index aff263b..7270c30 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -89,7 +89,7 @@
 // Connection is a interface of client cnx.
 type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
-	SendRequestNoWait(req *pb.BaseCommand)
+	SendRequestNoWait(req *pb.BaseCommand) error
 	WriteData(data Buffer)
 	RegisterListener(id uint64, listener ConnectionListener)
 	UnregisterListener(id uint64)
@@ -110,21 +110,15 @@
 type connectionState int32
 
 const (
-	connectionInit         = 0
-	connectionConnecting   = 1
-	connectionTCPConnected = 2
-	connectionReady        = 3
-	connectionClosed       = 4
+	connectionInit   = 0
+	connectionReady  = 1
+	connectionClosed = 2
 )
 
 func (s connectionState) String() string {
 	switch s {
 	case connectionInit:
 		return "Initializing"
-	case connectionConnecting:
-		return "Connecting"
-	case connectionTCPConnected:
-		return "TCPConnected"
 	case connectionReady:
 		return "Ready"
 	case connectionClosed:
@@ -286,8 +280,6 @@
 	c.log.Info("TCP connection established")
 	c.Unlock()
 
-	c.changeState(connectionTCPConnected)
-
 	return true
 }
 
@@ -358,11 +350,20 @@
 	return nil
 }
 
+func (c *connection) failLeftRequestsWhenClose() {
+	for req := range c.incomingRequestsCh {
+		c.internalSendRequest(req)
+	}
+	close(c.incomingRequestsCh)
+}
+
 func (c *connection) run() {
 	// All reads come from the reader goroutine
 	go c.reader.readFromConnection()
 	go c.runPingCheck()
 
+	c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh))
+
 	defer func() {
 		// all the accesses to the pendingReqs should be happened in this run loop thread,
 		// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
@@ -379,6 +380,7 @@
 		for {
 			select {
 			case <-c.closeCh:
+				c.failLeftRequestsWhenClose()
 				return
 
 			case req := <-c.incomingRequestsCh:
@@ -563,19 +565,28 @@
 
 func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
 	callback func(command *pb.BaseCommand, err error)) {
-	c.incomingRequestsCh <- &request{
-		id:       &requestID,
-		cmd:      req,
-		callback: callback,
+	if c.state == connectionClosed {
+		callback(req, ErrConnectionClosed)
+	} else {
+		c.incomingRequestsCh <- &request{
+			id:       &requestID,
+			cmd:      req,
+			callback: callback,
+		}
 	}
 }
 
-func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
+func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
+	if c.state == connectionClosed {
+		return ErrConnectionClosed
+	}
+
 	c.incomingRequestsCh <- &request{
 		id:       nil,
 		cmd:      req,
 		callback: nil,
 	}
+	return nil
 }
 
 func (c *connection) internalSendRequest(req *request) {
@@ -584,7 +595,14 @@
 		c.pendingReqs[*req.id] = req
 	}
 	c.pendingLock.Unlock()
-	c.writeCommand(req.cmd)
+	if c.state == connectionClosed {
+		c.log.Warnf("internalSendRequest failed for connectionClosed")
+		if req.callback != nil {
+			req.callback(req.cmd, ErrConnectionClosed)
+		}
+	} else {
+		c.writeCommand(req.cmd)
+	}
 }
 
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index fa22f18..aea9356 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -99,8 +99,9 @@
 	return nil, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
 	assert.Fail(c.t, "Shouldn't be called")
+	return nil
 }
 
 func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index c7d810a..f53c16b 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -59,7 +59,7 @@
 	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
 		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 
-	RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
+	RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error
 
 	RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 }
@@ -103,7 +103,6 @@
 	}
 	ch := make(chan Res, 10)
 
-	// TODO: in here, the error of callback always nil
 	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
 		ch <- Res{&RPCResult{
 			Cnx:      cnx,
@@ -162,9 +161,9 @@
 	return rpcResult, rpcErr
 }
 
-func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
 	rpcRequestCount.Inc()
-	cnx.SendRequestNoWait(baseCommand(cmdType, message))
+	return cnx.SendRequestNoWait(baseCommand(cmdType, message))
 }
 
 func (c *rpcClient) NewRequestID() uint64 {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e8cf0f7..d245c33 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -101,6 +101,8 @@
 	// Channel where app is posting messages to be published
 	eventsChan chan interface{}
 
+	connectClosedCh chan connectionClosed
+
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
 	lastSequenceID   int64
@@ -133,6 +135,7 @@
 		options:          options,
 		producerID:       client.rpcClient.NewProducerID(),
 		eventsChan:       make(chan interface{}, maxPendingMessages),
+		connectClosedCh:  make(chan connectionClosed, 10),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -236,7 +239,7 @@
 func (p *partitionProducer) ConnectionClosed() {
 	// Trigger reconnection in the produce goroutine
 	p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
-	p.eventsChan <- &connectionClosed{}
+	p.connectClosedCh <- connectionClosed{}
 }
 
 func (p *partitionProducer) reconnectToBroker() {
@@ -267,15 +270,14 @@
 			switch v := i.(type) {
 			case *sendRequest:
 				p.internalSend(v)
-			case *connectionClosed:
-				p.reconnectToBroker()
 			case *flushRequest:
 				p.internalFlush(v)
 			case *closeProducer:
 				p.internalClose(v)
 				return
 			}
-
+		case <-p.connectClosedCh:
+			p.reconnectToBroker()
 		case <-p.batchFlushTicker.C:
 			p.internalFlushCurrentBatch()
 		}