Fixed locking between connection_reader and connection (#84)

diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 58b3ffe..83e09ad 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -600,11 +600,9 @@
 	return nil
 }
 
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
 	pbMsgID := response.GetMessageId()
-
-	reader := internal.NewMessageReader(headersAndPayload)
-
+	reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
 	msgMeta, err := reader.ReadMessageMetadata()
 	if err != nil {
 		// TODO send discardCorruptedMessage
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2e933b8..a68e34a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -66,7 +66,7 @@
 }
 
 type ConsumerHandler interface {
-	MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
+	MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
 
 	// ConnectionClosed close the TCP connection.
 	ConnectionClosed()
@@ -107,6 +107,11 @@
 	callback func(command *pb.BaseCommand, err error)
 }
 
+type incomingCmd struct {
+	cmd               *pb.BaseCommand
+	headersAndPayload Buffer
+}
+
 type connection struct {
 	sync.Mutex
 	cond  *sync.Cond
@@ -129,9 +134,9 @@
 	requestIDGenerator uint64
 
 	incomingRequestsCh chan *request
+	incomingCmdCh     chan *incomingCmd
 	writeRequestsCh    chan []byte
 
-	mapMutex    sync.RWMutex
 	pendingReqs map[uint64]*request
 	listeners   map[uint64]ConnectionListener
 
@@ -156,6 +161,7 @@
 		auth:                 auth,
 
 		incomingRequestsCh: make(chan *request),
+		incomingCmdCh:      make(chan *incomingCmd),
 		writeRequestsCh:    make(chan []byte),
 		listeners:          make(map[uint64]ConnectionListener),
 		consumerHandlers:   make(map[uint64]ConsumerHandler),
@@ -280,11 +286,12 @@
 			if req == nil {
 				return
 			}
-			c.mapMutex.Lock()
 			c.pendingReqs[req.id] = req
-			c.mapMutex.Unlock()
 			c.writeCommand(req.cmd)
 
+		case cmd := <- c.incomingCmdCh:
+			c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
+
 		case data := <-c.writeRequestsCh:
 			if data == nil {
 				return
@@ -331,7 +338,11 @@
 	c.internalWriteData(data)
 }
 
-func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
+func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
+	c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload}
+}
+
+func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
 	c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
 	c.setLastDataReceived(time.Now())
 	var err error
@@ -406,14 +417,11 @@
 }
 
 func (c *connection) internalSendRequest(req *request) {
-	c.mapMutex.Lock()
 	c.pendingReqs[req.id] = req
-	c.mapMutex.Unlock()
 	c.writeCommand(req.cmd)
 }
 
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
-	c.mapMutex.RLock()
 	request, ok := c.pendingReqs[requestID]
 	if !ok {
 		c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
@@ -421,13 +429,11 @@
 	}
 
 	delete(c.pendingReqs, requestID)
-	c.mapMutex.RUnlock()
 	request.callback(response, nil)
 }
 
 func (c *connection) handleResponseError(serverError *pb.CommandError) {
 	requestID := serverError.GetRequestId()
-	c.mapMutex.RLock()
 	request, ok := c.pendingReqs[requestID]
 	if !ok {
 		c.log.Warnf("Received unexpected error response for request %d of type %s",
@@ -436,7 +442,6 @@
 	}
 
 	delete(c.pendingReqs, requestID)
-	c.mapMutex.RUnlock()
 
 	request.callback(nil,
 		errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
@@ -451,7 +456,7 @@
 	}
 }
 
-func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
+func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
 	c.log.Debug("Got Message: ", response)
 	consumerID := response.GetConsumerId()
 	if consumer, ok := c.consumerHandler(consumerID); ok {
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 746db5c..c74a940 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -55,7 +55,7 @@
 	}
 }
 
-func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
+func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
 	// First, we need to read the frame size
 	if r.buffer.ReadableBytes() < 4 {
 		if r.buffer.ReadableBytes() == 0 {
@@ -92,8 +92,8 @@
 	// Also read the eventual payload
 	headersAndPayloadSize := frameSize - (cmdSize + 4)
 	if cmdSize+4 < frameSize {
-		headersAndPayload = make([]byte, headersAndPayloadSize)
-		copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
+		headersAndPayload = NewBuffer(int(headersAndPayloadSize))
+		headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
 	}
 	return cmd, headersAndPayload, nil
 }