Merge pull request #6 from zjykzk/feature-pull-consumer
Pull consumer with unit test
diff --git a/core/api.go b/core/api.go
index b64cc97..39240ba 100644
--- a/core/api.go
+++ b/core/api.go
@@ -44,6 +44,9 @@
// SendMessageSync send a message with sync
SendMessageSync(msg *Message) SendResult
+ // SendMessageOrderly sends msg to the queue chosen by selector; arg is handed to the selector and autoRetryTimes to the underlying client
+ SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+
// SendMessageAsync send a message with async
SendMessageAsync(msg *Message)
}
diff --git a/core/producer.go b/core/producer.go
index 3d7e022..3cccf21 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -16,12 +16,24 @@
*/
package rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CProducer.h"
-//#include "rocketmq/CSendResult.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+
+#include <stdio.h>
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CProducer.h"
+#include "rocketmq/CSendResult.h"
+
+// Implemented on the Go side (see the //export in queue_selector.go).
+extern int queueSelectorCallback(int size, void *selectorKey);
+
+// Adapter handed to SendMessageOrderly as the selector callback: it forwards
+// the queue count and the opaque key to Go; the message is not forwarded.
+int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
+	return queueSelectorCallback(size, selectorKey);
+}
+*/
import "C"
-import "fmt"
+import (
+ "fmt"
+ "unsafe"
+)
type SendStatus int
@@ -77,7 +89,7 @@
func (p *defaultProducer) Start() error {
err := int(C.StartProducer(p.cproduer))
// TODO How to process err code.
- fmt.Printf("result: %v \n", err)
+ fmt.Printf("producer start result: %v \n", err)
return nil
}
@@ -87,7 +99,7 @@
err := C.ShutdownProducer(p.cproduer)
// TODO How to process err code.
- fmt.Printf("result: %v \n", err)
+ fmt.Printf("shutdown result: %v \n", err)
return nil
}
@@ -105,6 +117,28 @@
return result
}
+// SendMessageOrderly sends msg through the C client's SendMessageOrderly call,
+// letting selector choose the target queue.
+//
+// The selector, msg and arg are parked in the package-level selectors registry
+// under a fresh key, and a pointer to that key is handed to C so that
+// queueSelectorCallback can look the selector up again. autoRetryTimes is
+// passed to the C client unchanged. The returned SendResult is built from the
+// C send-result struct.
+//
+// NOTE(review): the status code returned by C.SendMessageOrderly is still
+// discarded because SendResult has no field to carry it — confirm how a failed
+// send should be reported to the caller.
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+	cmsg := goMsgToC(msg)
+	key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
+	// The callback removes the entry when it runs, but if the C call returns
+	// without ever invoking it the entry would stay in the registry forever.
+	// Removing an already-consumed key is a harmless no-op.
+	defer selectors.getAndDelete(key)
+
+	var sr C.struct__SendResult_
+	C.SendMessageOrderly(
+		p.cproduer,
+		cmsg,
+		(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
+		// key is a plain int, so lending its address to C for the duration of
+		// the call does not expose any Go pointers.
+		unsafe.Pointer(&key),
+		C.int(autoRetryTimes),
+		&sr,
+	)
+	C.DestroyMessage(cmsg)
+
+	return SendResult{
+		Status: SendStatus(sr.sendStatus),
+		MsgId:  C.GoString(&sr.msgId[0]),
+		Offset: int64(sr.offset),
+	}
+}
+
func (p *defaultProducer) SendMessageAsync(msg *Message) {
// TODO
}
diff --git a/core/queue_selector.go b/core/queue_selector.go
new file mode 100644
index 0000000..311c378
--- /dev/null
+++ b/core/queue_selector.go
@@ -0,0 +1,62 @@
+package rocketmq
+
+import "C"
+import (
+ "strconv"
+ "sync"
+ "unsafe"
+)
+
+var selectors = selectorHolder{selectors: map[int]*messageQueueSelectorWrapper{}}
+
+// queueSelectorCallback is the Go end of the queue-selector bridge. It reads
+// the registry key that selectorKey points at, takes the matching wrapper out
+// of the selectors registry and returns the wrapper's choice for a topic with
+// size queues.
+//
+// The C shim in producer.go declares this function as int (int, void*), so the
+// exported signature uses C.int: a Go int is exported as cgo's GoInt, which is
+// not the same width as a C int on 64-bit targets.
+//
+// It panics when no wrapper is registered under the key, which indicates a
+// bookkeeping bug in this package rather than a caller error.
+//
+//export queueSelectorCallback
+func queueSelectorCallback(size C.int, selectorKey unsafe.Pointer) C.int {
+	key := *(*int)(selectorKey)
+	s, ok := selectors.getAndDelete(key)
+	if !ok {
+		panic("BUG: not register the selector with key:" + strconv.Itoa(key))
+	}
+	return C.int(s.Select(int(size)))
+}
+
+// messageQueueSelectorWrapper bundles a selector with the message and the
+// caller-supplied argument it must be invoked with, so that only the queue
+// count is still needed at call time.
+type messageQueueSelectorWrapper struct {
+	selector MessageQueueSelector
+
+	m   *Message
+	arg interface{}
+}
+
+// Select passes size, together with the stored message and argument, to the
+// wrapped selector and returns its answer unchanged.
+func (sw *messageQueueSelectorWrapper) Select(size int) int {
+	chosen := sw.selector.Select(size, sw.m, sw.arg)
+	return chosen
+}
+
+// MessageQueueSelector chooses the queue for a message: Select is given the queue count, the message and the caller's opaque arg, and its int result is handed back to the C client (presumably a queue index in [0, size) — confirm against the C API).
+type MessageQueueSelector interface {
+ Select(size int, m *Message, arg interface{}) int
+}
+
+// selectorHolder is a mutex-guarded registry that hands out integer keys for
+// selector wrappers so that a wrapper can be found again from a plain int.
+type selectorHolder struct {
+	sync.Mutex
+
+	selectors map[int]*messageQueueSelectorWrapper
+	key       int
+}
+
+// put stores selector under the next unused key, advances the key counter and
+// returns the key that was assigned.
+func (s *selectorHolder) put(selector *messageQueueSelectorWrapper) (key int) {
+	s.Lock()
+	defer s.Unlock()
+
+	key, s.key = s.key, s.key+1
+	s.selectors[key] = selector
+	return key
+}
+
+// getAndDelete removes and returns the wrapper registered under key. The
+// boolean reports whether such a wrapper existed.
+func (s *selectorHolder) getAndDelete(key int) (*messageQueueSelectorWrapper, bool) {
+	s.Lock()
+	defer s.Unlock()
+
+	found, present := s.selectors[key]
+	delete(s.selectors, key) // no-op when key is absent
+	return found, present
+}
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
new file mode 100644
index 0000000..06743b4
--- /dev/null
+++ b/core/queue_selector_test.go
@@ -0,0 +1,53 @@
+package rocketmq
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// mockMessageQueueSelector is a test double that remembers the arguments of
+// the most recent Select call and answers with a preset value.
+type mockMessageQueueSelector struct {
+	arg  interface{}
+	m    *Message
+	size int
+
+	selectRet int
+}
+
+// Select records size, msg and arg on the mock and returns selectRet.
+func (m *mockMessageQueueSelector) Select(size int, msg *Message, arg interface{}) int {
+	m.size = size
+	m.m = msg
+	m.arg = arg
+	return m.selectRet
+}
+
+// TestWraper checks that messageQueueSelectorWrapper.Select hands the queue
+// count, the stored message and the stored argument to the wrapped selector
+// and returns the selector's result.
+func TestWraper(t *testing.T) {
+	s := &mockMessageQueueSelector{selectRet: 2}
+	w := &messageQueueSelectorWrapper{selector: s, m: &Message{}, arg: 3}
+
+	assert.Equal(t, 2, w.Select(4))
+	// The queue count must reach the selector untouched.
+	assert.Equal(t, 4, s.size)
+	assert.Equal(t, w.m, s.m)
+	v, ok := s.arg.(int)
+	assert.True(t, ok)
+	assert.Equal(t, 3, v)
+}
+
+// TestSelectorHolder checks key allocation and one-shot retrieval on a
+// selectorHolder. It uses its own holder rather than the package-level
+// selectors registry so the expected keys do not depend on what other tests
+// have registered before it.
+func TestSelectorHolder(t *testing.T) {
+	holder := &selectorHolder{selectors: map[int]*messageQueueSelectorWrapper{}}
+	s := &messageQueueSelectorWrapper{}
+
+	key := holder.put(s)
+	assert.Equal(t, 0, key)
+
+	key = holder.put(s)
+	assert.Equal(t, 1, key)
+
+	assert.Equal(t, 2, len(holder.selectors))
+
+	ss, ok := holder.getAndDelete(0)
+	assert.Equal(t, s, ss)
+	assert.True(t, ok)
+
+	ss, ok = holder.getAndDelete(1)
+	assert.Equal(t, s, ss)
+	assert.True(t, ok)
+
+	assert.Equal(t, 0, len(holder.selectors))
+
+	// A key that has been consumed must not be found a second time.
+	ss, ok = holder.getAndDelete(0)
+	assert.Nil(t, ss)
+	assert.False(t, ok)
+}
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 8b13789..3f24e99 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -1 +1,84 @@
+----------
+## RocketMQ Client Go
+### 1. Go Version
+* go1.10.5 darwin/amd64
+
+
+### 2. Dependency
+* [librocketmq](https://github.com/apache/rocketmq-client-cpp)
+
+### 3. Build and Install
+#### macOS Platform (macOS Mojave 10.14)
+* Install Compile tools (homebrew package manager)
+ ```
+ 1. xcode-select --install
+ 2. brew install cmake
+ 3. brew install automake
+ ```
+* Install dependencies
+ 1. [Go official download](https://golang.org/dl/)
+
+ 2. Get go client package
+ ```
+ go get github.com/apache/rocketmq-client-go
+ ```
+ 3. [librocketmq](https://github.com/apache/rocketmq-client-cpp)
+ - `git clone https://github.com/apache/rocketmq-client-cpp`
+ - `cd rocketmq-client-cpp`
+ - `sudo sh build.sh`
+ - `cp bin/librocketmq.dylib /usr/local/lib`
+ - `sudo mkdir /usr/local/include/rocketmq`
+ - `sudo cp include/* /usr/local/include/rocketmq/`
+
+#### Linux
+
+*coming soon*
+
+#### Windows
+
+*coming soon*
+
+----------
+## How to use
+
+- import package
+ ```
+ import "github.com/apache/rocketmq-client-go/core"
+ ```
+- Send message
+ ```go
+ func SendMessage() {
+ producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
+ producer.Start()
+ defer producer.Shutdown()
+ fmt.Printf("Producer: %s started... \n", producer)
+ for i := 0; i < 100; i++ {
+ msg := fmt.Sprintf("Hello RocketMQ-%d", i)
+ result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+ fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
+ }
+ time.Sleep(10 * time.Second)
+ producer.Shutdown()
+ }
+ ```
+- Push Consumer
+ ```go
+ func PushConsumeMessage() {
+ fmt.Println("Start Receiving Messages...")
+ consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{
+ GroupID: "testGroupId",
+ NameServer: "localhost:9876",
+ ConsumerThreadCount: 2,
+ MessageBatchMaxSize: 16})
+ // MUST subscribe topic before consumer started.
+ consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt)rocketmq.ConsumeStatus {
+ fmt.Printf("A message received: \"%s\" \n", msg.Body)
+ return rocketmq.ConsumeSuccess})
+ consumer.Start()
+ defer consumer.Shutdown()
+ fmt.Printf("consumer: %s started...\n", consumer)
+ time.Sleep(10 * time.Minute)
+ }
+ ```
+- [Full example](../examples)
\ No newline at end of file
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
new file mode 100644
index 0000000..8e78fb9
--- /dev/null
+++ b/examples/orderproducer/producer.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+// queueSelectorByOrderID maps an integer order ID onto a queue so that, for a
+// given queue count, the same order ID always yields the same result.
+type queueSelectorByOrderID struct{}
+
+// Select returns arg modulo size, normalised into [0, size). arg is expected
+// to be the int order ID; the message itself is not inspected.
+//
+// It returns 0 when arg is not an int or when size is not positive, instead of
+// panicking on the type assertion or dividing by zero.
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+	orderID, ok := arg.(int)
+	if !ok || size <= 0 {
+		return 0
+	}
+
+	idx := orderID % size
+	if idx < 0 {
+		// Go's % keeps the sign of the dividend; shift into the valid range.
+		idx += size
+	}
+	return idx
+}
+
+var (
+ namesrvAddrs string
+ topic string
+ body string
+ groupID string
+ msgCount int64
+ workerCount int
+)
+
+// init registers the command-line flags that fill the package-level settings:
+// -n, -t, -g and -d are strings, -m and -w are counts. Every flag defaults to
+// its zero value.
+func init() {
+	stringFlags := []struct {
+		target      *string
+		name, usage string
+	}{
+		{&namesrvAddrs, "n", "name server address"},
+		{&topic, "t", "topic"},
+		{&groupID, "g", "group"},
+		{&body, "d", "body"},
+	}
+	for _, f := range stringFlags {
+		flag.StringVar(f.target, f.name, "", f.usage)
+	}
+
+	flag.Int64Var(&msgCount, "m", 0, "message count")
+	flag.IntVar(&workerCount, "w", 0, "worker count")
+}
+
+// worker sends messages through p for as long as the shared counter behind
+// leftMsgCount has budget left.
+type worker struct {
+	p            rocketmq.Producer
+	leftMsgCount *int64
+}
+
+// run atomically decrements the shared counter and sends one orderly message
+// per successful decrement, printing each send result. It returns once the
+// counter has dropped below zero.
+func (w *worker) run() {
+	var selector queueSelectorByOrderID
+	for {
+		if atomic.AddInt64(w.leftMsgCount, -1) < 0 {
+			return
+		}
+
+		msg := &rocketmq.Message{Topic: topic, Body: body}
+		r := w.p.SendMessageOrderly(msg, selector, 7 /*orderID*/, 3)
+		fmt.Printf("send result:%+v\n", r)
+	}
+}
+
+func main() {
+ flag.Parse()
+
+ if namesrvAddrs == "" {
+ println("empty namesrv address")
+ return
+ }
+
+ if topic == "" {
+ println("empty topic")
+ return
+ }
+
+ if body == "" {
+ println("empty body")
+ return
+ }
+
+ if groupID == "" {
+ println("empty groupID")
+ return
+ }
+
+ if msgCount == 0 {
+ println("zero message count")
+ return
+ }
+
+ if workerCount == 0 {
+ println("zero worker count")
+ return
+ }
+
+ producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{
+ GroupID: "testGroup",
+ NameServer: "10.200.20.25:9988",
+ })
+ producer.Start()
+ defer producer.Shutdown()
+
+ wg := sync.WaitGroup{}
+ wg.Add(workerCount)
+
+ workers := make([]worker, workerCount)
+ for i := range workers {
+ workers[i].p = producer
+ workers[i].leftMsgCount = &msgCount
+ }
+
+ for i := range workers {
+ go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+ }
+
+ wg.Wait()
+}