[issue 516]Add lock for compressionProviders to fix data race problem (#533)
* Add lock for compressionProviders to fix data race problem
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
* Fix the code without releasing the lock
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
* Change Mutex to RWMutex
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index daaf759..cf92949 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -138,6 +138,7 @@
log log.Logger
+ providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
}
@@ -850,9 +851,11 @@
pc.log.Info("Closed consumer")
}
+ pc.providersMutex.Lock()
for _, provider := range pc.compressionProviders {
provider.Close()
}
+ pc.providersMutex.Unlock()
pc.setConsumerState(consumerClosed)
pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1062,7 +1065,9 @@
}
func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
+ pc.providersMutex.RLock()
provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
+ pc.providersMutex.RUnlock()
if !ok {
var err error
if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
@@ -1070,7 +1075,9 @@
return nil, err
}
+ pc.providersMutex.Lock()
pc.compressionProviders[msgMeta.GetCompression()] = provider
+ pc.providersMutex.Unlock()
}
uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))