Fix the map of pendingReqs concurrent issue (#48)
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Fix the map of pendingReqs concurrent issue.
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 11337ec..d83f30b 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -105,9 +105,11 @@
incomingRequests chan *request
writeRequests chan []byte
- pendingReqs map[uint64]*request
- listeners map[uint64]ConnectionListener
- connWrapper *ConnWrapper
+
+ mapMutex sync.RWMutex
+ pendingReqs map[uint64]*request
+ listeners map[uint64]ConnectionListener
+ connWrapper *ConnWrapper
tlsOptions *TLSOptions
auth auth.Provider
@@ -255,7 +257,9 @@
if req == nil {
return
}
+ c.mapMutex.Lock()
c.pendingReqs[req.id] = req
+ c.mapMutex.Unlock()
c.writeCommand(req.cmd)
case data := <-c.writeRequests:
@@ -369,12 +373,16 @@
}
func (c *connection) internalSendRequest(req *request) {
+ c.mapMutex.Lock()
c.pendingReqs[req.id] = req
+ c.mapMutex.Unlock()
c.writeCommand(req.cmd)
}
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
+ c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
+ c.mapMutex.RUnlock()
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
return