[ISSUE #293] Optimize Close of rmqClient (#303)

* optimize shutdown rmqClient
diff --git a/internal/client.go b/internal/client.go
index c8cce74..f66e8b0 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -164,15 +164,18 @@
 	hbMutex      sync.Mutex
 	close        bool
 	namesrvs     *namesrvs
+	done         chan struct{}
+	shutdownOnce sync.Once
 }
 
 var clientMap sync.Map
 
-func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *rmqClient {
+func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
 	client := &rmqClient{
 		option:       option,
 		remoteClient: remote.NewRemotingClient(),
 		namesrvs:     option.Namesrv,
+		done:         make(chan struct{}),
 	}
 	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
 	if !loaded {
@@ -228,63 +231,100 @@
 func (c *rmqClient) Start() {
 	//ctx, cancel := context.WithCancel(context.Background())
 	//c.cancel = cancel
-	c.close = false
 	c.once.Do(func() {
-		// TODO fetchNameServerAddr
 		if !c.option.Credentials.IsEmpty() {
 			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
 		}
+		// TODO fetchNameServerAddr
 		go func() {}()
 
 		// schedule update route info
 		go func() {
 			// delay
+			ticker := time.NewTicker(_PullNameServerInterval)
+			defer ticker.Stop()
 			time.Sleep(50 * time.Millisecond)
-			for !c.close {
-				c.UpdateTopicRouteInfo()
-				time.Sleep(_PullNameServerInterval)
+			for {
+				select {
+				case <-ticker.C:
+					c.UpdateTopicRouteInfo()
+				case <-c.done:
+					rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
+						"clientID": c.ClientID(),
+					})
+					return
+				}
 			}
 		}()
 
 		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
 		go func() {
-			for !c.close {
-				c.namesrvs.cleanOfflineBroker()
-				c.SendHeartbeatToAllBrokerWithLock()
-				time.Sleep(_HeartbeatBrokerInterval)
+			ticker := time.NewTicker(_HeartbeatBrokerInterval)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ticker.C:
+					c.namesrvs.cleanOfflineBroker()
+					c.SendHeartbeatToAllBrokerWithLock()
+				case <-c.done:
+					rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
+						"clientID": c.ClientID(),
+					})
+					return
+				}
 			}
 		}()
 
 		// schedule persist offset
 		go func() {
-			//time.Sleep(10 * time.Second)
-			for !c.close {
-				c.consumerMap.Range(func(key, value interface{}) bool {
-					consumer := value.(InnerConsumer)
-					err := consumer.PersistConsumerOffset()
-					if err != nil {
-						rlog.Error("persist offset failed", map[string]interface{}{
-							rlog.LogKeyUnderlayError: err,
-						})
-					}
-					return true
-				})
-				time.Sleep(_PersistOffset)
+			ticker := time.NewTicker(_PersistOffset)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ticker.C:
+					c.consumerMap.Range(func(key, value interface{}) bool {
+						consumer := value.(InnerConsumer)
+						err := consumer.PersistConsumerOffset()
+						if err != nil {
+							rlog.Error("persist offset failed", map[string]interface{}{
+								rlog.LogKeyUnderlayError: err,
+							})
+						}
+						return true
+					})
+				case <-c.done:
+					rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
+						"clientID": c.ClientID(),
+					})
+					return
+				}
 			}
 		}()
 
 		go func() {
-			for !c.close {
-				c.RebalanceImmediately()
-				time.Sleep(_RebalanceInterval)
+			ticker := time.NewTicker(_RebalanceInterval)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ticker.C:
+					c.RebalanceImmediately()
+				case <-c.done:
+					rlog.Info("The RMQClient stopping do rebalance", map[string]interface{}{
+						"clientID": c.ClientID(),
+					})
+					return
+				}
 			}
 		}()
 	})
 }
 
 func (c *rmqClient) Shutdown() {
-	c.remoteClient.ShutDown()
-	c.close = true
+	c.shutdownOnce.Do(func() {
+		close(c.done)
+		c.close = true
+		c.remoteClient.ShutDown()
+	})
 }
 
 func (c *rmqClient) ClientID() string {