Merge pull request #677 from wenfengwang/fix-init

[ISSUE #678] optimizing goroutine of Stat creation
diff --git a/producer/producer.go b/producer/producer.go
index 910bb23..8ebb660 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -143,6 +143,8 @@
 		return nil, err
 	}
 
+	p.messagesWithNamespace(msgs...)
+
 	msg := p.encodeBatch(msgs...)
 
 	resp := primitive.NewSendResult()
@@ -179,10 +181,6 @@
 		err error
 	)
 
-	if p.options.Namespace != "" {
-		msg.Topic = p.options.Namespace + "%" + msg.Topic
-	}
-
 	var producerCtx *primitive.ProducerCtx
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
@@ -217,6 +215,8 @@
 		return err
 	}
 
+	p.messagesWithNamespace(msgs...)
+
 	msg := p.encodeBatch(msgs...)
 
 	if p.interceptor != nil {
@@ -230,9 +230,7 @@
 }
 
 func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
-	if p.options.Namespace != "" {
-		msg.Topic = p.options.Namespace + "%" + msg.Topic
-	}
+
 	mq := p.selectMessageQueue(msg)
 	if mq == nil {
 		return errors.Errorf("the topic=%s route info not found", msg.Topic)
@@ -260,6 +258,8 @@
 		return err
 	}
 
+	p.messagesWithNamespace(msgs...)
+
 	msg := p.encodeBatch(msgs...)
 
 	if p.interceptor != nil {
@@ -275,10 +275,6 @@
 func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
 	retryTime := 1 + p.options.RetryTimes
 
-	if p.options.Namespace != "" {
-		msg.Topic = p.options.Namespace + "%" + msg.Topic
-	}
-
 	var err error
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
@@ -302,6 +298,17 @@
 	return err
 }
 
+func (p *defaultProducer) messagesWithNamespace(msgs ...*primitive.Message) {
+	// Rewrites every topic in place to "<namespace>%<topic>"; a no-op when no namespace is set.
+	ns := p.options.Namespace
+	if ns == "" {
+		return
+	}
+	for i := range msgs {
+		msgs[i].Topic = ns + "%" + msgs[i].Topic
+	}
+}
+
 func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
 	if msg.Compress {
 		return true
diff --git a/producer/producer_test.go b/producer/producer_test.go
index d1b88c3..12cefb8 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -30,7 +30,8 @@
 )
 
 const (
-	topic = "TopicTest"
+	topic          = "TopicTest"
+	namespaceTopic = "Test%TopicTest" // topic once the "Test" namespace prefix is applied: "<namespace>%<topic>"
 )
 
 func TestShutdown(t *testing.T) {
@@ -83,6 +84,16 @@
 			},
 		},
 	})
+	p.publishInfo.Store(namespaceTopic, &internal.TopicPublishInfo{
+		HaveTopicRouterInfo: true,
+		MqList: []*primitive.MessageQueue{
+			{
+				Topic:      namespaceTopic,
+				BrokerName: "aa",
+				QueueId:    0,
+			},
+		},
+	})
 	p.options.Namesrv.AddBroker(&internal.TopicRouteData{
 		BrokerDataList: []*internal.BrokerData{
 			{
@@ -245,3 +256,56 @@
 	err = p.SendOneWay(ctx, msg)
 	assert.Nil(t, err)
 }
+
+func TestSyncWithNamespace(t *testing.T) { // SendSync on a producer with namespace "Test" must leave msg.Topic equal to namespaceTopic
+	p, _ := NewDefaultProducer(
+		WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+		WithNamespace("Test"),
+	)
+	// Replace the producer's client with a gomock mock; the EXPECT calls below script its behavior.
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+	// The message carries the bare topic while its target queue already uses the namespaced topic.
+	ctx := context.Background()
+	msg := &primitive.Message{
+		Topic: topic,
+		Body:  []byte("this is a message body"),
+		Queue: &primitive.MessageQueue{
+			Topic:      namespaceTopic,
+			BrokerName: "aa",
+			QueueId:    0,
+		},
+	}
+	msg.WithProperty("key", "value")
+
+	expectedResp := &primitive.SendResult{
+		Status:      primitive.SendOK,
+		MsgID:       "111",
+		QueueOffset: 0,
+		OffsetMsgID: "0",
+	}
+
+	mockB4Send(p)
+	// The mocked ProcessSendResponse copies expectedResp's fields into the SendResult it receives.
+	client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+	client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
+		func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
+			resp.Status = expectedResp.Status
+			resp.MsgID = expectedResp.MsgID
+			resp.QueueOffset = expectedResp.QueueOffset
+			resp.OffsetMsgID = expectedResp.OffsetMsgID
+		})
+	resp, err := p.SendSync(ctx, msg)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedResp, resp)
+	assert.Equal(t, namespaceTopic, msg.Topic) // the caller's msg itself must now carry the namespaced topic
+}
diff --git a/producer/selector.go b/producer/selector.go
index 69d216f..74f5bad 100644
--- a/producer/selector.go
+++ b/producer/selector.go
@@ -21,7 +21,6 @@
 	"hash/fnv"
 	"math/rand"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -64,35 +63,32 @@
 // roundRobinQueueSelector choose the queue by roundRobin.
 type roundRobinQueueSelector struct {
 	sync.Locker
-	indexer map[string]*int32
+	indexer map[string]*uint32 // per-topic counter keyed by message topic; Select advances it while holding the Locker
 }
 
 func NewRoundRobinQueueSelector() QueueSelector {
 	s := &roundRobinQueueSelector{
 		Locker:  new(sync.Mutex),
-		indexer: map[string]*int32{},
+		indexer: map[string]*uint32{}, // starts empty; Select adds a counter the first time it sees a topic
 	}
 	return s
 }
 
 func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
 	t := message.Topic
-	if _, exist := r.indexer[t]; !exist {
-		r.Lock()
-		if _, exist := r.indexer[t]; !exist {
-			var v = int32(0)
-			r.indexer[t] = &v
-		}
-		r.Unlock()
-	}
-	index := r.indexer[t]
+	var next uint32 // this call's counter value, captured while the lock is held
 
-	i := atomic.AddInt32(index, 1)
-	if i < 0 {
-		i = -i
-		atomic.StoreInt32(index, 0)
+	r.Lock() // the lock covers the map lookup/insert and the counter increment
+	idx, exist := r.indexer[t]
+	if !exist {
+		// First message seen for this topic: start its counter at zero.
+		idx = new(uint32)
+		r.indexer[t] = idx
 	}
-	qIndex := int(i) % len(queues)
+	*idx++ // uint32 wraps back to 0 on overflow, so no sign handling is needed
+	next = *idx // read before Unlock: reading *idx afterwards would race with concurrent Select calls
+	r.Unlock()
+	qIndex := next % uint32(len(queues)) // assumes len(queues) > 0 — TODO confirm callers never pass an empty slice
 	return queues[qIndex]
 }