Fixed detection and handling of stale connections (#131)
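
Stale-connection detection now runs in a dedicated goroutine (runPingCheck) with its own ticker instead of being piggybacked on sendPing: if no data has been received for two keep-alive intervals, the previous PING went unanswered and the connection is treated as stale. Teardown is centralized in the new TriggerClose(), which closes the socket and broadcasts on a closeCh channel; the run() loop observes closeCh and performs the single Close(), which now fails pending requests with an error instead of closing the internal channels. In addition, the initial handshake is bounded by a socket deadline, the internal channels are buffered, handlePong no longer replies with a PONG, and the producer stores its name as a plain string, setting the optional ProducerName field only when a name was configured.
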
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 88ee673..35835ab 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -62,6 +62,7 @@
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
+ ID() string
Close()
}
@@ -128,6 +129,7 @@
lastDataReceivedLock sync.Mutex
lastDataReceivedTime time.Time
pingTicker *time.Ticker
+ pingCheckTicker *time.Ticker
log *log.Entry
@@ -135,6 +137,7 @@
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
+ closeCh chan interface{}
writeRequestsCh chan []byte
pendingReqs map[uint64]*request
@@ -157,12 +160,14 @@
pendingReqs: make(map[uint64]*request),
lastDataReceivedTime: time.Now(),
pingTicker: time.NewTicker(keepAliveInterval),
+ pingCheckTicker: time.NewTicker(keepAliveInterval),
tlsOptions: tlsOptions,
auth: auth,
- incomingRequestsCh: make(chan *request),
- incomingCmdCh: make(chan *incomingCmd),
- writeRequestsCh: make(chan []byte),
+ closeCh: make(chan interface{}),
+ incomingRequestsCh: make(chan *request, 10),
+ incomingCmdCh: make(chan *incomingCmd, 10),
+ writeRequestsCh: make(chan []byte, 10),
listeners: make(map[uint64]ConnectionListener),
consumerHandlers: make(map[uint64]ConsumerHandler),
}
@@ -236,6 +241,10 @@
return false
}
+ // During the initial handshake, the internal keep-alive is not
+ // active yet, so we need to time out read and write requests
+ c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
+
c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
ProtocolVersion: &version,
ClientVersion: proto.String("Pulsar Go 0.1"),
@@ -249,6 +258,9 @@
return false
}
+ // Reset the deadline so that we don't use read timeouts
+ c.cnx.SetDeadline(time.Time{})
+
if cmd.Connected == nil {
c.log.Warnf("Failed to perform initial handshake - Expecting 'Connected' cmd, got '%s'",
cmd.Type)
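
For context, the two SetDeadline calls above follow the standard net.Conn pattern: set an absolute deadline before the blocking handshake I/O, then clear it with the zero time once the keep-alive mechanism takes over. A minimal sketch of that pattern (illustrative only, not part of the patch; it assumes the net and time imports, the package's keepAliveInterval constant, and a handshake callback standing in for the CONNECT/CONNECTED exchange):

	// Sketch: bounding a handshake with a connection deadline.
	func handshakeWithDeadline(conn net.Conn, handshake func(net.Conn) error) error {
		// Both the CONNECT write and the CONNECTED read must finish within
		// one keep-alive interval, otherwise they fail with a timeout error.
		if err := conn.SetDeadline(time.Now().Add(keepAliveInterval)); err != nil {
			return err
		}
		if err := handshake(conn); err != nil {
			return err
		}
		// The zero time.Time clears the deadline: from here on, staleness is
		// detected by the ping check rather than by per-read timeouts.
		return conn.SetDeadline(time.Time{})
	}
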
@@ -279,9 +291,14 @@
func (c *connection) run() {
// All reads come from the reader goroutine
go c.reader.readFromConnection()
+ go c.runPingCheck()
for {
select {
+ case <-c.closeCh:
+ c.Close()
+ return
+
case req := <-c.incomingRequestsCh:
if req == nil {
return
@@ -303,6 +320,23 @@
}
}
+func (c *connection) runPingCheck() {
+ for {
+ select {
+ case <-c.closeCh:
+ return
+ case <-c.pingCheckTicker.C:
+ if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
+ // We have not received a response to the previous Ping request, so the
+ // connection to the broker is stale
+ c.log.Warn("Detected stale connection to broker")
+ c.TriggerClose()
+ return
+ }
+ }
+ }
+}
+
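
Because the check runs in its own goroutine on a separate ticker, it does not depend on the run() loop draining its channels, and two keep-alive intervals without inbound data means the previous PING went unanswered. Note that runPingCheck never tears the connection down itself; it only calls TriggerClose, and the actual Close() happens once, in run(), when closeCh is observed. A generalized sketch of this watchdog shape, with illustrative names (lastActivity, trigger and done are stand-ins for c.lastDataReceived, c.TriggerClose and c.closeCh; not part of the patch):

	// Sketch: periodically compare the last-activity timestamp against a deadline
	// and signal shutdown instead of closing the connection directly.
	func watchStale(lastActivity func() time.Time, interval time.Duration, trigger func(), done <-chan struct{}) {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				// Two intervals without any inbound data means the last PING
				// sent by the keep-alive loop went unanswered.
				if time.Since(lastActivity()) > 2*interval {
					trigger()
					return
				}
			}
		}
	}
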
func (c *connection) WriteData(data []byte) {
c.writeRequestsCh <- data
}
@@ -489,23 +523,16 @@
}
func (c *connection) sendPing() {
- if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
- // We have not received a response to the previous Ping request, the
- // connection to broker is stale
- c.log.Info("Detected stale connection to broker")
- c.Close()
- return
- }
-
c.log.Debug("Sending PING")
c.writeCommand(baseCommand(pb.BaseCommand_PING, &pb.CommandPing{}))
}
func (c *connection) handlePong() {
- c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
+ c.log.Debug("Received PONG response")
}
func (c *connection) handlePing() {
+ c.log.Debug("Responding to PING request")
c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
}
@@ -543,6 +570,23 @@
delete(c.listeners, id)
}
+// TriggerClose triggers the connection close by forcing the socket to close
+// and broadcasting the shutdown notification on the close channel
+func (c *connection) TriggerClose() {
+ cnx := c.cnx
+ if cnx != nil {
+ cnx.Close()
+ }
+
+ select {
+ case <-c.closeCh:
+ return
+ default:
+ close(c.closeCh)
+ }
+
+}
+
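
TriggerClose relies on the fact that closing a channel is a broadcast: every goroutine selecting on closeCh (both run() and runPingCheck) observes the close, while the select/default guard avoids closing a channel that is already closed. A small self-contained illustration of the broadcast behaviour (uses only the fmt and sync imports; names are illustrative, not part of the patch):

	// Sketch: one close(done) wakes every receiver.
	func demoBroadcast() {
		done := make(chan interface{})
		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				<-done // a receive on a closed channel returns immediately
				fmt.Printf("goroutine %d: shutting down\n", id)
			}(i)
		}
		// Broadcast once; closing an already-closed channel would panic,
		// hence the select/default guard in TriggerClose above.
		close(done)
		wg.Wait()
	}
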
func (c *connection) Close() {
c.Lock()
defer c.Unlock()
@@ -559,13 +603,16 @@
c.cnx.Close()
}
c.pingTicker.Stop()
- close(c.incomingRequestsCh)
- close(c.writeRequestsCh)
+ c.pingCheckTicker.Stop()
for _, listener := range c.listeners {
listener.ConnectionClosed()
}
+ for _, req := range c.pendingReqs {
+ req.callback(nil, errors.New("connection closed"))
+ }
+
consumerHandlers := make(map[uint64]ConsumerHandler)
c.consumerHandlersLock.RLock()
for id, handler := range c.consumerHandlers {
@@ -641,3 +688,8 @@
h, ok := c.consumerHandlers[id]
return h, ok
}
+
+func (c *connection) ID() string {
+ return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
+}
+
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index c74a940..ffca7a7 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -45,7 +45,7 @@
cmd, headersAndPayload, err := r.readSingleCommand()
if err != nil {
r.cnx.log.WithError(err).Info("Error reading from connection")
- r.cnx.Close()
+ r.cnx.TriggerClose()
break
}
@@ -71,7 +71,7 @@
frameSize := r.buffer.ReadUint32()
if frameSize > MaxFrameSize {
r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
- r.cnx.Close()
+ r.cnx.TriggerClose()
return nil, nil, errors.New("Frame size too big")
}
@@ -114,7 +114,7 @@
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
- r.cnx.Close()
+ r.cnx.TriggerClose()
return false
}
@@ -127,7 +127,7 @@
err := proto.Unmarshal(data, cmd)
if err != nil {
r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
- r.cnx.Close()
+ r.cnx.TriggerClose()
return nil, err
}
return cmd, nil
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b3b8937..b9b50b0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -48,7 +48,7 @@
cnx internal.Connection
options *ProducerOptions
- producerName *string
+ producerName string
producerID uint64
batchBuilder *internal.BatchBuilder
sequenceIDGenerator *uint64
@@ -89,7 +89,7 @@
topic: topic,
options: options,
producerID: client.rpcClient.NewProducerID(),
- eventsChan: make(chan interface{}, 1),
+ eventsChan: make(chan interface{}, 10),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: make(internal.Semaphore, maxPendingMessages),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -98,7 +98,7 @@
}
if options.Name != "" {
- p.producerName = &options.Name
+ p.producerName = options.Name
}
err := p.grabCnx()
@@ -107,8 +107,8 @@
return nil, err
}
- p.log = p.log.WithField("name", p.producerName)
- p.log.Info("Created producer")
+ p.log = p.log.WithField("producer_name", p.producerName)
+ p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.state = producerReady
go p.runEventsLoop()
@@ -130,9 +130,13 @@
Topic: proto.String(p.topic),
Encrypted: nil,
ProducerId: proto.Uint64(p.producerID),
- ProducerName: p.producerName,
Schema: nil,
}
+
+ if p.producerName != "" {
+ cmdProducer.ProducerName = proto.String(p.producerName)
+ }
+
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
@@ -143,21 +147,22 @@
return err
}
- p.producerName = res.Response.ProducerSuccess.ProducerName
+ p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
- p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+ p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
p.producerID, pb.CompressionType(p.options.CompressionType))
if err != nil {
return err
}
}
+
if p.sequenceIDGenerator == nil {
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
p.cnx = res.Cnx
p.cnx.RegisterListener(p.producerID, p)
- p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
+ p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
if p.pendingQueue.Size() > 0 {
p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
@@ -172,6 +177,7 @@
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
+ p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
p.eventsChan <- &connectionClosed{}
}
@@ -190,7 +196,7 @@
err := p.grabCnx()
if err == nil {
// Successfully reconnected
- p.log.Info("Reconnected producer to broker")
+ p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
return
}
}
@@ -223,7 +229,7 @@
}
func (p *partitionProducer) Name() string {
- return *p.producerName
+ return p.producerName
}
func (p *partitionProducer) internalSend(request *sendRequest) {
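
On the producer side, producerName is now a plain string rather than a *string: the protobuf ProducerName field stays unset unless the user configured a name, and the broker-assigned name is read back through the generated GetProducerName() accessor. A minimal sketch of that optional-field handling, assuming the pb protobuf package and the github.com/golang/protobuf/proto helpers already used in this file (illustrative only, not part of the patch):

	// Sketch: set the optional ProducerName only when the user provided one.
	func buildCmdProducer(topic, producerName string, producerID uint64) *pb.CommandProducer {
		cmd := &pb.CommandProducer{
			Topic:      proto.String(topic),
			ProducerId: proto.Uint64(producerID),
		}
		if producerName != "" {
			// Leave ProducerName nil otherwise; the broker assigns a name and
			// returns it via ProducerSuccess.GetProducerName().
			cmd.ProducerName = proto.String(producerName)
		}
		return cmd
	}
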