Fixed detection and handling of stale connections (#131)
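
Previously, the stale-connection check ran inside sendPing() on the run()
loop, and the reader goroutine called connection.Close() directly on read
errors. This change moves the check into a dedicated runPingCheck()
goroutine, gives the internal channels small buffers, applies a read/write
deadline during the initial handshake (before the keep-alive machinery is
active), and introduces TriggerClose(), which force-closes the socket and
broadcasts on a new closeCh channel so that the run() loop performs the
actual cleanup and fails pending requests with a "connection closed" error.
On the producer side, producerName becomes a plain string and log lines now
include the connection ID.

The snippet below is a minimal, self-contained sketch of the detection
pattern, for illustration only: the conn type, its fields, and the shortened
keepAliveInterval loosely mirror pulsar/internal/connection.go but are not
the real struct or values.

    package main

    import (
        "log"
        "net"
        "sync"
        "time"
    )

    // Shortened so the sketch terminates quickly; the client uses a much
    // longer keep-alive interval.
    const keepAliveInterval = 2 * time.Second

    type conn struct {
        mu                   sync.Mutex
        cnx                  net.Conn // nil in this sketch
        lastDataReceivedTime time.Time
        pingCheckTicker      *time.Ticker
        closeCh              chan interface{}
    }

    func (c *conn) lastDataReceived() time.Time {
        c.mu.Lock()
        defer c.mu.Unlock()
        return c.lastDataReceivedTime
    }

    // runPingCheck flags the connection as stale when no data has been
    // received for two keep-alive intervals.
    func (c *conn) runPingCheck() {
        for {
            select {
            case <-c.closeCh:
                return
            case <-c.pingCheckTicker.C:
                if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
                    log.Println("detected stale connection to broker")
                    c.triggerClose()
                    return
                }
            }
        }
    }

    // triggerClose force-closes the socket and closes closeCh exactly once,
    // so that the full cleanup can run on the main event loop rather than
    // on the reader or ping-check goroutines.
    func (c *conn) triggerClose() {
        if c.cnx != nil {
            c.cnx.Close()
        }
        select {
        case <-c.closeCh:
            // already closed
        default:
            close(c.closeCh)
        }
    }

    func main() {
        c := &conn{
            lastDataReceivedTime: time.Now(),
            pingCheckTicker:      time.NewTicker(keepAliveInterval),
            closeCh:              make(chan interface{}),
        }
        go c.runPingCheck()

        // The real run() loop selects on closeCh alongside the request and
        // command channels and calls Close() when it fires.
        <-c.closeCh
        log.Println("close channel fired, running cleanup")
    }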

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 88ee673..35835ab 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -62,6 +62,7 @@
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler)
 	DeleteConsumeHandler(id uint64)
+	ID() string
 	Close()
 }
 
@@ -128,6 +129,7 @@
 	lastDataReceivedLock sync.Mutex
 	lastDataReceivedTime time.Time
 	pingTicker           *time.Ticker
+	pingCheckTicker      *time.Ticker
 
 	log *log.Entry
 
@@ -135,6 +137,7 @@
 
 	incomingRequestsCh chan *request
 	incomingCmdCh      chan *incomingCmd
+	closeCh            chan interface{}
 	writeRequestsCh    chan []byte
 
 	pendingReqs map[uint64]*request
@@ -157,12 +160,14 @@
 		pendingReqs:          make(map[uint64]*request),
 		lastDataReceivedTime: time.Now(),
 		pingTicker:           time.NewTicker(keepAliveInterval),
+		pingCheckTicker:      time.NewTicker(keepAliveInterval),
 		tlsOptions:           tlsOptions,
 		auth:                 auth,
 
-		incomingRequestsCh: make(chan *request),
-		incomingCmdCh:      make(chan *incomingCmd),
-		writeRequestsCh:    make(chan []byte),
+		closeCh:            make(chan interface{}),
+		incomingRequestsCh: make(chan *request, 10),
+		incomingCmdCh:      make(chan *incomingCmd, 10),
+		writeRequestsCh:    make(chan []byte, 10),
 		listeners:          make(map[uint64]ConnectionListener),
 		consumerHandlers:   make(map[uint64]ConsumerHandler),
 	}
@@ -236,6 +241,10 @@
 		return false
 	}
 
+	// During the initial handshake, the internal keep-alive is not
+	// active yet, so we need to time out write and read requests
+	c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
+
 	c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
 		ProtocolVersion: &version,
 		ClientVersion:   proto.String("Pulsar Go 0.1"),
@@ -249,6 +258,9 @@
 		return false
 	}
 
+	// Reset the deadline so that we don't use read timeouts
+	c.cnx.SetDeadline(time.Time{})
+
 	if cmd.Connected == nil {
 		c.log.Warnf("Failed to perform initial handshake - Expecting 'Connected' cmd, got '%s'",
 			cmd.Type)
@@ -279,9 +291,14 @@
 func (c *connection) run() {
 	// All reads come from the reader goroutine
 	go c.reader.readFromConnection()
+	go c.runPingCheck()
 
 	for {
 		select {
+		case <-c.closeCh:
+			c.Close()
+			return
+
 		case req := <-c.incomingRequestsCh:
 			if req == nil {
 				return
@@ -303,6 +320,23 @@
 	}
 }
 
+func (c *connection) runPingCheck() {
+	for {
+		select {
+		case <-c.closeCh:
+			return
+		case <-c.pingCheckTicker.C:
+			if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
+				// We have not received a response to the previous Ping request, so the
+				// connection to the broker is stale
+				c.log.Warn("Detected stale connection to broker")
+				c.TriggerClose()
+				return
+			}
+		}
+	}
+}
+
 func (c *connection) WriteData(data []byte) {
 	c.writeRequestsCh <- data
 }
@@ -489,23 +523,16 @@
 }
 
 func (c *connection) sendPing() {
-	if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
-		// We have not received a response to the previous Ping request, the
-		// connection to broker is stale
-		c.log.Info("Detected stale connection to broker")
-		c.Close()
-		return
-	}
-
 	c.log.Debug("Sending PING")
 	c.writeCommand(baseCommand(pb.BaseCommand_PING, &pb.CommandPing{}))
 }
 
 func (c *connection) handlePong() {
-	c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
+	c.log.Debug("Received PONG response")
 }
 
 func (c *connection) handlePing() {
+	c.log.Debug("Responding to PING request")
 	c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
 }
 
@@ -543,6 +570,23 @@
 	delete(c.listeners, id)
 }
 
+// TriggerClose triggers the connection close by forcing the socket to close
+// and broadcasting the notification on the close channel
+func (c *connection) TriggerClose() {
+	cnx := c.cnx
+	if cnx != nil {
+		cnx.Close()
+	}
+
+	select {
+	case <-c.closeCh:
+		return
+	default:
+		close(c.closeCh)
+	}
+
+}
+
 func (c *connection) Close() {
 	c.Lock()
 	defer c.Unlock()
@@ -559,13 +603,16 @@
 		c.cnx.Close()
 	}
 	c.pingTicker.Stop()
-	close(c.incomingRequestsCh)
-	close(c.writeRequestsCh)
+	c.pingCheckTicker.Stop()
 
 	for _, listener := range c.listeners {
 		listener.ConnectionClosed()
 	}
 
+	for _, req := range c.pendingReqs {
+		req.callback(nil, errors.New("connection closed"))
+	}
+
 	consumerHandlers := make(map[uint64]ConsumerHandler)
 	c.consumerHandlersLock.RLock()
 	for id, handler := range c.consumerHandlers {
@@ -641,3 +688,8 @@
 	h, ok := c.consumerHandlers[id]
 	return h, ok
 }
+
+func (c *connection) ID() string {
+	return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
+}
+
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index c74a940..ffca7a7 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -45,7 +45,7 @@
 		cmd, headersAndPayload, err := r.readSingleCommand()
 		if err != nil {
 			r.cnx.log.WithError(err).Info("Error reading from connection")
-			r.cnx.Close()
+			r.cnx.TriggerClose()
 			break
 		}
 
@@ -71,7 +71,7 @@
 	frameSize := r.buffer.ReadUint32()
 	if frameSize > MaxFrameSize {
 		r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
-		r.cnx.Close()
+		r.cnx.TriggerClose()
 		return nil, nil, errors.New("Frame size too big")
 	}
 
@@ -114,7 +114,7 @@
 
 	n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
 	if err != nil {
-		r.cnx.Close()
+		r.cnx.TriggerClose()
 		return false
 	}
 
@@ -127,7 +127,7 @@
 	err := proto.Unmarshal(data, cmd)
 	if err != nil {
 		r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
-		r.cnx.Close()
+		r.cnx.TriggerClose()
 		return nil, err
 	}
 	return cmd, nil
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b3b8937..b9b50b0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -48,7 +48,7 @@
 	cnx    internal.Connection
 
 	options             *ProducerOptions
-	producerName        *string
+	producerName        string
 	producerID          uint64
 	batchBuilder        *internal.BatchBuilder
 	sequenceIDGenerator *uint64
@@ -89,7 +89,7 @@
 		topic:            topic,
 		options:          options,
 		producerID:       client.rpcClient.NewProducerID(),
-		eventsChan:       make(chan interface{}, 1),
+		eventsChan:       make(chan interface{}, 10),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: make(internal.Semaphore, maxPendingMessages),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -98,7 +98,7 @@
 	}
 
 	if options.Name != "" {
-		p.producerName = &options.Name
+		p.producerName = options.Name
 	}
 
 	err := p.grabCnx()
@@ -107,8 +107,8 @@
 		return nil, err
 	}
 
-	p.log = p.log.WithField("name", p.producerName)
-	p.log.Info("Created producer")
+	p.log = p.log.WithField("producer_name", p.producerName)
+	p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
 	p.state = producerReady
 
 	go p.runEventsLoop()
@@ -130,9 +130,13 @@
 		Topic:        proto.String(p.topic),
 		Encrypted:    nil,
 		ProducerId:   proto.Uint64(p.producerID),
-		ProducerName: p.producerName,
 		Schema:       nil,
 	}
+
+	if p.producerName != "" {
+		cmdProducer.ProducerName = proto.String(p.producerName)
+	}
+
 	if len(p.options.Properties) > 0 {
 		cmdProducer.Metadata = toKeyValues(p.options.Properties)
 	}
@@ -143,21 +147,22 @@
 		return err
 	}
 
-	p.producerName = res.Response.ProducerSuccess.ProducerName
+	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 	if p.batchBuilder == nil {
-		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
 			p.producerID, pb.CompressionType(p.options.CompressionType))
 		if err != nil {
 			return err
 		}
 	}
+
 	if p.sequenceIDGenerator == nil {
 		nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
 		p.sequenceIDGenerator = &nextSequenceID
 	}
 	p.cnx = res.Cnx
 	p.cnx.RegisterListener(p.producerID, p)
-	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
+	p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
 
 	if p.pendingQueue.Size() > 0 {
 		p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
@@ -172,6 +177,7 @@
 
 func (p *partitionProducer) ConnectionClosed() {
 	// Trigger reconnection in the produce goroutine
+	p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
 	p.eventsChan <- &connectionClosed{}
 }
 
@@ -190,7 +196,7 @@
 		err := p.grabCnx()
 		if err == nil {
 			// Successfully reconnected
-			p.log.Info("Reconnected producer to broker")
+			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
 			return
 		}
 	}
@@ -223,7 +229,7 @@
 }
 
 func (p *partitionProducer) Name() string {
-	return *p.producerName
+	return p.producerName
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {