[ISSUE #293] Optimize Close of rmqClient (#303)
* optimize shutdown rmqClient
diff --git a/internal/client.go b/internal/client.go
index c8cce74..f66e8b0 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -164,15 +164,18 @@
hbMutex sync.Mutex
close bool
namesrvs *namesrvs
+ done chan struct{}
+ shutdownOnce sync.Once
}
var clientMap sync.Map
-func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *rmqClient {
+func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
client := &rmqClient{
option: option,
remoteClient: remote.NewRemotingClient(),
namesrvs: option.Namesrv,
+ done: make(chan struct{}),
}
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
if !loaded {
@@ -228,63 +231,100 @@
func (c *rmqClient) Start() {
//ctx, cancel := context.WithCancel(context.Background())
//c.cancel = cancel
- c.close = false
c.once.Do(func() {
- // TODO fetchNameServerAddr
if !c.option.Credentials.IsEmpty() {
c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
}
+ // TODO fetchNameServerAddr
go func() {}()
// schedule update route info
go func() {
// delay
+ ticker := time.NewTicker(_PullNameServerInterval)
+ defer ticker.Stop()
time.Sleep(50 * time.Millisecond)
- for !c.close {
- c.UpdateTopicRouteInfo()
- time.Sleep(_PullNameServerInterval)
+ for {
+ select {
+ case <-ticker.C:
+ c.UpdateTopicRouteInfo()
+ case <-c.done:
+ rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
+ "clientID": c.ClientID(),
+ })
+ return
+ }
}
}()
// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
go func() {
- for !c.close {
- c.namesrvs.cleanOfflineBroker()
- c.SendHeartbeatToAllBrokerWithLock()
- time.Sleep(_HeartbeatBrokerInterval)
+ ticker := time.NewTicker(_HeartbeatBrokerInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ c.namesrvs.cleanOfflineBroker()
+ c.SendHeartbeatToAllBrokerWithLock()
+ case <-c.done:
+ rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
+ "clientID": c.ClientID(),
+ })
+ return
+ }
}
}()
// schedule persist offset
go func() {
- //time.Sleep(10 * time.Second)
- for !c.close {
- c.consumerMap.Range(func(key, value interface{}) bool {
- consumer := value.(InnerConsumer)
- err := consumer.PersistConsumerOffset()
- if err != nil {
- rlog.Error("persist offset failed", map[string]interface{}{
- rlog.LogKeyUnderlayError: err,
- })
- }
- return true
- })
- time.Sleep(_PersistOffset)
+ ticker := time.NewTicker(_PersistOffset)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ err := consumer.PersistConsumerOffset()
+ if err != nil {
+ rlog.Error("persist offset failed", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
+ }
+ return true
+ })
+ case <-c.done:
+ rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
+ "clientID": c.ClientID(),
+ })
+ return
+ }
}
}()
go func() {
- for !c.close {
- c.RebalanceImmediately()
- time.Sleep(_RebalanceInterval)
+ ticker := time.NewTicker(_RebalanceInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ c.RebalanceImmediately()
+ case <-c.done:
+ rlog.Info("The RMQClient stopping do rebalance", map[string]interface{}{
+ "clientID": c.ClientID(),
+ })
+ return
+ }
}
}()
})
}
func (c *rmqClient) Shutdown() {
- c.remoteClient.ShutDown()
- c.close = true
+ c.shutdownOnce.Do(func() {
+ close(c.done)
+ c.close = true
+ c.remoteClient.ShutDown()
+ })
}
func (c *rmqClient) ClientID() string {