Merge pull request #677 from wenfengwang/fix-init
[ISSUE #678] optimizing goroutine of Stat creation
diff --git a/producer/producer.go b/producer/producer.go
index 910bb23..8ebb660 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -143,6 +143,8 @@
return nil, err
}
+ p.messagesWithNamespace(msgs...)
+
msg := p.encodeBatch(msgs...)
resp := primitive.NewSendResult()
@@ -179,10 +181,6 @@
err error
)
- if p.options.Namespace != "" {
- msg.Topic = p.options.Namespace + "%" + msg.Topic
- }
-
var producerCtx *primitive.ProducerCtx
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
@@ -217,6 +215,8 @@
return err
}
+ p.messagesWithNamespace(msgs...)
+
msg := p.encodeBatch(msgs...)
if p.interceptor != nil {
@@ -230,9 +230,7 @@
}
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
- if p.options.Namespace != "" {
- msg.Topic = p.options.Namespace + "%" + msg.Topic
- }
+
mq := p.selectMessageQueue(msg)
if mq == nil {
return errors.Errorf("the topic=%s route info not found", msg.Topic)
@@ -260,6 +258,8 @@
return err
}
+ p.messagesWithNamespace(msgs...)
+
msg := p.encodeBatch(msgs...)
if p.interceptor != nil {
@@ -275,10 +275,6 @@
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
retryTime := 1 + p.options.RetryTimes
- if p.options.Namespace != "" {
- msg.Topic = p.options.Namespace + "%" + msg.Topic
- }
-
var err error
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
@@ -302,6 +298,17 @@
return err
}
+func (p *defaultProducer) messagesWithNamespace(msgs ...*primitive.Message) {
+
+ if p.options.Namespace == "" {
+ return
+ }
+
+ for _, msg := range msgs {
+ msg.Topic = p.options.Namespace + "%" + msg.Topic
+ }
+}
+
func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
if msg.Compress {
return true
diff --git a/producer/producer_test.go b/producer/producer_test.go
index d1b88c3..12cefb8 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -30,7 +30,8 @@
)
const (
- topic = "TopicTest"
+ topic = "TopicTest"
+ namespaceTopic = "Test%TopicTest"
)
func TestShutdown(t *testing.T) {
@@ -83,6 +84,16 @@
},
},
})
+ p.publishInfo.Store(namespaceTopic, &internal.TopicPublishInfo{
+ HaveTopicRouterInfo: true,
+ MqList: []*primitive.MessageQueue{
+ {
+ Topic: namespaceTopic,
+ BrokerName: "aa",
+ QueueId: 0,
+ },
+ },
+ })
p.options.Namesrv.AddBroker(&internal.TopicRouteData{
BrokerDataList: []*internal.BrokerData{
{
@@ -245,3 +256,56 @@
err = p.SendOneWay(ctx, msg)
assert.Nil(t, err)
}
+
+func TestSyncWithNamespace(t *testing.T) {
+ p, _ := NewDefaultProducer(
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ WithNamespace("Test"),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("this is a message body"),
+ Queue: &primitive.MessageQueue{
+ Topic: namespaceTopic,
+ BrokerName: "aa",
+ QueueId: 0,
+ },
+ }
+ msg.WithProperty("key", "value")
+
+ expectedResp := &primitive.SendResult{
+ Status: primitive.SendOK,
+ MsgID: "111",
+ QueueOffset: 0,
+ OffsetMsgID: "0",
+ }
+
+ mockB4Send(p)
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+ client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
+ func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
+ resp.Status = expectedResp.Status
+ resp.MsgID = expectedResp.MsgID
+ resp.QueueOffset = expectedResp.QueueOffset
+ resp.OffsetMsgID = expectedResp.OffsetMsgID
+ })
+ resp, err := p.SendSync(ctx, msg)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedResp, resp)
+ assert.Equal(t, namespaceTopic, msg.Topic)
+}
diff --git a/producer/selector.go b/producer/selector.go
index 69d216f..74f5bad 100644
--- a/producer/selector.go
+++ b/producer/selector.go
@@ -21,7 +21,6 @@
"hash/fnv"
"math/rand"
"sync"
- "sync/atomic"
"time"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -64,35 +63,32 @@
// roundRobinQueueSelector choose the queue by roundRobin.
type roundRobinQueueSelector struct {
sync.Locker
- indexer map[string]*int32
+ indexer map[string]*uint32
}
func NewRoundRobinQueueSelector() QueueSelector {
s := &roundRobinQueueSelector{
Locker: new(sync.Mutex),
- indexer: map[string]*int32{},
+ indexer: map[string]*uint32{},
}
return s
}
func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
t := message.Topic
- if _, exist := r.indexer[t]; !exist {
- r.Lock()
- if _, exist := r.indexer[t]; !exist {
- var v = int32(0)
- r.indexer[t] = &v
- }
- r.Unlock()
- }
- index := r.indexer[t]
+ var idx *uint32
- i := atomic.AddInt32(index, 1)
- if i < 0 {
- i = -i
- atomic.StoreInt32(index, 0)
+ r.Lock()
+ idx, exist := r.indexer[t]
+ if !exist {
+ var v uint32 = 0
+ idx = &v
+ r.indexer[t] = idx
}
- qIndex := int(i) % len(queues)
+ *idx++
+ r.Unlock()
+
+ qIndex := *idx % uint32(len(queues))
return queues[qIndex]
}