Fix data race issue in ServiceNameResolver (#581)
* fix: fix data race at service_name_resolver
* fix: remove atomic.StoreInt32 from service_name_resolver, move rand.Seed to init function at service_name_resolver
* fix: move mutex at service_name_resolver
diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go
index 3b1209c..419f8be 100644
--- a/pulsar/internal/service_name_resolver.go
+++ b/pulsar/internal/service_name_resolver.go
@@ -22,7 +22,7 @@
"fmt"
"math/rand"
"net/url"
- "sync/atomic"
+ "sync"
"time"
log "github.com/sirupsen/logrus"
@@ -42,6 +42,12 @@
ServiceURL *url.URL
CurrentIndex int32
AddressList []*url.URL
+
+ mutex sync.Mutex
+}
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
}
func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
@@ -54,6 +60,9 @@
}
func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
if r.AddressList == nil {
return nil, errors.New("no service url is provided yet")
}
@@ -64,7 +73,7 @@
return r.AddressList[0], nil
}
idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
- atomic.StoreInt32(&r.CurrentIndex, idx)
+ r.CurrentIndex = idx
return r.AddressList[idx], nil
}
@@ -95,11 +104,14 @@
}
addresses = append(addresses, u)
}
+
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
r.AddressList = addresses
r.ServiceURL = u
r.ServiceURI = uri
- rand.Seed(time.Now().Unix()) // initialize global pseudo random generator
- atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses))))
+ r.CurrentIndex = int32(rand.Intn(len(addresses)))
return nil
}