[issue 516] Add lock for compressionProviders to fix data race (#533)

* Add lock for compressionProviders to fix data race

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Fix a code path that did not release the lock

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Change Mutex to RWMutex

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
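
Below is a minimal, self-contained sketch of the locking pattern this change applies, runnable under go run -race. The Provider struct, newProvider constructor, and the int key are illustrative stand-ins for compression.Provider, initializeCompressionProvider, and pb.CompressionType; the actual change follows in the diff.

package main

import "sync"

// Provider and newProvider are stand-ins for compression.Provider and
// initializeCompressionProvider; the real types appear in the diff below.
type Provider struct{ kind int }

func newProvider(kind int) *Provider { return &Provider{kind: kind} }

type partitionConsumer struct {
	providersMutex       sync.RWMutex
	compressionProviders map[int]*Provider
}

// provider mirrors the read-through cache in Decompress: a shared lock on
// the common hit path, an exclusive lock only to publish a new provider.
// As in the diff, two goroutines may race to initialize the same type;
// the map write itself stays safe and the last writer wins.
func (pc *partitionConsumer) provider(kind int) *Provider {
	pc.providersMutex.RLock()
	p, ok := pc.compressionProviders[kind]
	pc.providersMutex.RUnlock()
	if ok {
		return p
	}
	p = newProvider(kind)
	pc.providersMutex.Lock()
	pc.compressionProviders[kind] = p
	pc.providersMutex.Unlock()
	return p
}

func main() {
	pc := &partitionConsumer{compressionProviders: map[int]*Provider{}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = pc.provider(1) // concurrent lookups; clean under -race
		}()
	}
	wg.Wait()
}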
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index daaf759..cf92949 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -138,6 +138,7 @@
 
 	log log.Logger
 
+	providersMutex       sync.RWMutex
 	compressionProviders map[pb.CompressionType]compression.Provider
 	metrics              *internal.TopicMetrics
 }
@@ -850,9 +851,11 @@
 		pc.log.Info("Closed consumer")
 	}
 
+	pc.providersMutex.Lock()
 	for _, provider := range pc.compressionProviders {
 		provider.Close()
 	}
+	pc.providersMutex.Unlock()
 
 	pc.setConsumerState(consumerClosed)
 	pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1062,7 +1065,9 @@
 }
 
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
+	pc.providersMutex.RLock()
 	provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
+	pc.providersMutex.RUnlock()
 	if !ok {
 		var err error
 		if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
@@ -1070,7 +1075,9 @@
 			return nil, err
 		}
 
+		pc.providersMutex.Lock()
 		pc.compressionProviders[msgMeta.GetCompression()] = provider
+		pc.providersMutex.Unlock()
 	}
 
 	uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
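
The RWMutex from the third commit keeps the hot path cheap: Decompress runs for every consumed message and normally only reads the map, so concurrent readers proceed in parallel, while the exclusive lock is taken only on first use of a compression type and when closing the consumer. A race like the original one can be reproduced, and the fix verified, by running the consumer tests under Go's race detector, e.g. go test -race ./pulsar/.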