Implement send orderly
diff --git a/core/api.go b/core/api.go
index 92619f3..0f4b6f9 100644
--- a/core/api.go
+++ b/core/api.go
@@ -29,7 +29,7 @@
// ProducerConfig define a producer
type ProducerConfig struct {
- GroupID string
+ GroupID string
NameServer string
Credentials *SessionCredentials
}
@@ -44,6 +44,9 @@
// SendMessageSync send a message with sync
SendMessageSync(msg *Message) SendResult
+ // SendMessageOrderly send the message orderly
+ SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+
// SendMessageAsync send a message with async
SendMessageAsync(msg *Message)
}
diff --git a/core/producer.go b/core/producer.go
index 3d7e022..3cccf21 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -16,12 +16,24 @@
*/
package rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CProducer.h"
-//#include "rocketmq/CSendResult.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+
+#include <stdio.h>
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CProducer.h"
+#include "rocketmq/CSendResult.h"
+
+int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
+ int queueSelectorCallback(int, void*);
+ return queueSelectorCallback(size, selectorKey);
+}
+*/
import "C"
-import "fmt"
+import (
+ "fmt"
+ "unsafe"
+)
type SendStatus int
@@ -77,7 +89,7 @@
func (p *defaultProducer) Start() error {
err := int(C.StartProducer(p.cproduer))
// TODO How to process err code.
- fmt.Printf("result: %v \n", err)
+ fmt.Printf("producer start result: %v \n", err)
return nil
}
@@ -87,7 +99,7 @@
err := C.ShutdownProducer(p.cproduer)
// TODO How to process err code.
- fmt.Printf("result: %v \n", err)
+ fmt.Printf("shutdown result: %v \n", err)
return nil
}
@@ -105,6 +117,28 @@
return result
}
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+ cmsg := goMsgToC(msg)
+ key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
+
+ var sr C.struct__SendResult_
+ C.SendMessageOrderly(
+ p.cproduer,
+ cmsg,
+ (C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
+ unsafe.Pointer(&key),
+ C.int(autoRetryTimes),
+ &sr,
+ )
+ C.DestroyMessage(cmsg)
+
+ return SendResult{
+ Status: SendStatus(sr.sendStatus),
+ MsgId: C.GoString(&sr.msgId[0]),
+ Offset: int64(sr.offset),
+ }
+}
+
func (p *defaultProducer) SendMessageAsync(msg *Message) {
// TODO
}
diff --git a/core/queue_selector.go b/core/queue_selector.go
new file mode 100644
index 0000000..311c378
--- /dev/null
+++ b/core/queue_selector.go
@@ -0,0 +1,62 @@
+package rocketmq
+
+import "C"
+import (
+ "strconv"
+ "sync"
+ "unsafe"
+)
+
+var selectors = selectorHolder{selectors: map[int]*messageQueueSelectorWrapper{}}
+
+//export queueSelectorCallback
+func queueSelectorCallback(size int, selectorKey unsafe.Pointer) int {
+ s, ok := selectors.getAndDelete(*(*int)(selectorKey))
+ if !ok {
+ panic("BUG: not register the selector with key:" + strconv.Itoa(*(*int)(selectorKey)))
+ }
+ return s.Select(size)
+}
+
+type messageQueueSelectorWrapper struct {
+ selector MessageQueueSelector
+
+ m *Message
+ arg interface{}
+}
+
+func (w *messageQueueSelectorWrapper) Select(size int) int {
+ return w.selector.Select(size, w.m, w.arg)
+}
+
+// MessageQueueSelector select one message queue
+type MessageQueueSelector interface {
+ Select(size int, m *Message, arg interface{}) int
+}
+
+type selectorHolder struct {
+ sync.Mutex
+
+ selectors map[int]*messageQueueSelectorWrapper
+ key int
+}
+
+func (s *selectorHolder) put(selector *messageQueueSelectorWrapper) (key int) {
+ s.Lock()
+ key = s.key
+ s.selectors[key] = selector
+ s.key++
+ s.Unlock()
+ return
+}
+
+func (s *selectorHolder) getAndDelete(key int) (*messageQueueSelectorWrapper, bool) {
+ s.Lock()
+ selector, ok := s.selectors[key]
+ if ok {
+ delete(s.selectors, key)
+ }
+ s.Unlock()
+
+ return selector, ok
+}
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
new file mode 100644
index 0000000..06743b4
--- /dev/null
+++ b/core/queue_selector_test.go
@@ -0,0 +1,53 @@
+package rocketmq
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type mockMessageQueueSelector struct {
+ arg interface{}
+ m *Message
+ size int
+
+ selectRet int
+}
+
+func (m *mockMessageQueueSelector) Select(size int, msg *Message, arg interface{}) int {
+ m.arg, m.m, m.size = arg, msg, size
+ return m.selectRet
+}
+
+func TestWraper(t *testing.T) {
+ s := &mockMessageQueueSelector{selectRet: 2}
+ w := &messageQueueSelectorWrapper{selector: s, m: &Message{}, arg: 3}
+
+ assert.Equal(t, 2, w.Select(4))
+ assert.Equal(t, w.m, s.m)
+ v, ok := s.arg.(int)
+ assert.True(t, ok)
+ assert.Equal(t, 3, v)
+}
+
+func TestSelectorHolder(t *testing.T) {
+ s := &messageQueueSelectorWrapper{}
+
+ key := selectors.put(s)
+ assert.Equal(t, 0, key)
+
+ key = selectors.put(s)
+ assert.Equal(t, 1, key)
+
+ assert.Equal(t, 2, len(selectors.selectors))
+
+ ss, ok := selectors.getAndDelete(0)
+ assert.Equal(t, s, ss)
+ assert.True(t, ok)
+
+ ss, ok = selectors.getAndDelete(1)
+ assert.Equal(t, s, ss)
+ assert.True(t, ok)
+
+ assert.Equal(t, 0, len(selectors.selectors))
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
new file mode 100644
index 0000000..8e78fb9
--- /dev/null
+++ b/examples/orderproducer/producer.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type queueSelectorByOrderID struct{}
+
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+ return arg.(int) % size
+}
+
+var (
+ namesrvAddrs string
+ topic string
+ body string
+ groupID string
+ msgCount int64
+ workerCount int
+)
+
+func init() {
+ flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+ flag.StringVar(&topic, "t", "", "topic")
+ flag.StringVar(&groupID, "g", "", "group")
+ flag.StringVar(&body, "d", "", "body")
+ flag.Int64Var(&msgCount, "m", 0, "message count")
+ flag.IntVar(&workerCount, "w", 0, "worker count")
+}
+
+type worker struct {
+ p rocketmq.Producer
+ leftMsgCount *int64
+}
+
+func (w *worker) run() {
+ selector := queueSelectorByOrderID{}
+ for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
+ r := w.p.SendMessageOrderly(
+ &rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
+ )
+ fmt.Printf("send result:%+v\n", r)
+ }
+}
+
+func main() {
+ flag.Parse()
+
+ if namesrvAddrs == "" {
+ println("empty namesrv address")
+ return
+ }
+
+ if topic == "" {
+ println("empty topic")
+ return
+ }
+
+ if body == "" {
+ println("empty body")
+ return
+ }
+
+ if groupID == "" {
+ println("empty groupID")
+ return
+ }
+
+ if msgCount == 0 {
+ println("zero message count")
+ return
+ }
+
+ if workerCount == 0 {
+ println("zero worker count")
+ return
+ }
+
+ producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{
+ GroupID: "testGroup",
+ NameServer: "10.200.20.25:9988",
+ })
+ producer.Start()
+ defer producer.Shutdown()
+
+ wg := sync.WaitGroup{}
+ wg.Add(workerCount)
+
+ workers := make([]worker, workerCount)
+ for i := range workers {
+ workers[i].p = producer
+ workers[i].leftMsgCount = &msgCount
+ }
+
+ for i := range workers {
+ go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+ }
+
+ wg.Wait()
+}