Avoid producer deadlock on connection closing (#337)
* Avoid producer deadlock on connection closing
* Fixed constants init
* Avoid creating timer instance each time, if channel is not full
* Added debug statements
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 4be9ba2..0e13380 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -107,14 +107,14 @@
ConnectionClosed()
}
-type connectionState int
+type connectionState int32
const (
- connectionInit connectionState = iota
- connectionConnecting
- connectionTCPConnected
- connectionReady
- connectionClosed
+ connectionInit = 0
+ connectionConnecting = 1
+ connectionTCPConnected = 2
+ connectionReady = 3
+ connectionClosed = 4
)
func (s connectionState) String() string {
@@ -150,7 +150,7 @@
type connection struct {
sync.Mutex
cond *sync.Cond
- state connectionState
+ state int32
connectionTimeout time.Duration
logicalAddr *url.URL
@@ -190,7 +190,7 @@
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
connectionTimeout time.Duration, auth auth.Provider) *connection {
cnx := &connection{
- state: connectionInit,
+ state: int32(connectionInit),
connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
@@ -397,7 +397,34 @@
}
func (c *connection) WriteData(data Buffer) {
- c.writeRequestsCh <- data
+ select {
+ case c.writeRequestsCh <- data:
+ // Channel is not full
+ return
+
+ default:
+ // Channel full, fallback to probe if connection is closed
+ }
+
+ for {
+ select {
+ case c.writeRequestsCh <- data:
+ // Successfully wrote on the channel
+ return
+
+ case <-time.After(100 * time.Millisecond):
+ // The channel is either:
+ // 1. blocked, in which case we need to wait until we have space
+ // 2. the connection is already closed, then we need to bail out
+ c.log.Debug("Couldn't write on connection channel immediately")
+ state := connectionState(atomic.LoadInt32(&c.state))
+ if state != connectionReady {
+ c.log.Debug("Connection was already closed")
+ return
+ }
+ }
+ }
+
}
func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@
func (c *connection) changeState(state connectionState) {
c.Lock()
- c.state = state
+ atomic.StoreInt32(&c.state, int32(state))
c.cond.Broadcast()
c.Unlock()
}