Avoid producer deadlock on connection closing (#337)

* Avoid producer deadlock on connection closing

* Fixed constants init

* Avoid creating timer instance each time, if channel is not full

* Added debug statements
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 4be9ba2..0e13380 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -107,14 +107,14 @@
 	ConnectionClosed()
 }
 
-type connectionState int
+type connectionState int32
 
 const (
-	connectionInit connectionState = iota
-	connectionConnecting
-	connectionTCPConnected
-	connectionReady
-	connectionClosed
+	connectionInit         = 0
+	connectionConnecting   = 1
+	connectionTCPConnected = 2
+	connectionReady        = 3
+	connectionClosed       = 4
 )
 
 func (s connectionState) String() string {
@@ -150,7 +150,7 @@
 type connection struct {
 	sync.Mutex
 	cond              *sync.Cond
-	state             connectionState
+	state             int32
 	connectionTimeout time.Duration
 
 	logicalAddr  *url.URL
@@ -190,7 +190,7 @@
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
 	connectionTimeout time.Duration, auth auth.Provider) *connection {
 	cnx := &connection{
-		state:                connectionInit,
+		state:                int32(connectionInit),
 		connectionTimeout:    connectionTimeout,
 		logicalAddr:          logicalAddr,
 		physicalAddr:         physicalAddr,
@@ -397,7 +397,34 @@
 }
 
 func (c *connection) WriteData(data Buffer) {
-	c.writeRequestsCh <- data
+	select {
+	case c.writeRequestsCh <- data:
+		// Channel is not full
+		return
+
+	default:
+		// Channel full, fallback to probe if connection is closed
+	}
+
+	for {
+		select {
+		case c.writeRequestsCh <- data:
+			// Successfully wrote on the channel
+			return
+
+		case <-time.After(100 * time.Millisecond):
+			// The channel is either:
+			// 1. blocked, in which case we need to wait until we have space
+			// 2. the connection is already closed, then we need to bail out
+			c.log.Debug("Couldn't write on connection channel immediately")
+			state := connectionState(atomic.LoadInt32(&c.state))
+			if state != connectionReady {
+				c.log.Debug("Connection was already closed")
+				return
+			}
+		}
+	}
+
 }
 
 func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@
 
 func (c *connection) changeState(state connectionState) {
 	c.Lock()
-	c.state = state
+	atomic.StoreInt32(&c.state, int32(state))
 	c.cond.Broadcast()
 	c.Unlock()
 }