Merge pull request #6 from zjykzk/feature-pull-consumer

Pull consumer with unit test
diff --git a/core/api.go b/core/api.go
index b64cc97..39240ba 100644
--- a/core/api.go
+++ b/core/api.go
@@ -44,6 +44,9 @@
 	// SendMessageSync send a message with sync
 	SendMessageSync(msg *Message) SendResult
 
+	// SendMessageOrderly send the message orderly
+	SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+
 	// SendMessageAsync send a message with async
 	SendMessageAsync(msg *Message)
 }
diff --git a/core/producer.go b/core/producer.go
index 3d7e022..3cccf21 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -16,12 +16,24 @@
  */
 package rocketmq
 
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CProducer.h"
-//#include "rocketmq/CSendResult.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+
+#include <stdio.h>
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CProducer.h"
+#include "rocketmq/CSendResult.h"
+
+int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
+	int queueSelectorCallback(int, void*);
+	return queueSelectorCallback(size, selectorKey);
+}
+*/
 import "C"
-import "fmt"
+import (
+	"fmt"
+	"unsafe"
+)
 
 type SendStatus int
 
@@ -77,7 +89,7 @@
 func (p *defaultProducer) Start() error {
 	err := int(C.StartProducer(p.cproduer))
 	// TODO How to process err code.
-	fmt.Printf("result: %v \n", err)
+	fmt.Printf("producer start result: %v \n", err)
 	return nil
 }
 
@@ -87,7 +99,7 @@
 	err := C.ShutdownProducer(p.cproduer)
 
 	// TODO How to process err code.
-	fmt.Printf("result: %v \n", err)
+	fmt.Printf("shutdown result: %v \n", err)
 	return nil
 }
 
@@ -105,6 +117,28 @@
 	return result
 }
 
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+	cmsg := goMsgToC(msg)
+	key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
+
+	var sr C.struct__SendResult_
+	C.SendMessageOrderly(
+		p.cproduer,
+		cmsg,
+		(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
+		unsafe.Pointer(&key),
+		C.int(autoRetryTimes),
+		&sr,
+	)
+	C.DestroyMessage(cmsg)
+
+	return SendResult{
+		Status: SendStatus(sr.sendStatus),
+		MsgId:  C.GoString(&sr.msgId[0]),
+		Offset: int64(sr.offset),
+	}
+}
+
 func (p *defaultProducer) SendMessageAsync(msg *Message) {
 	// TODO
 }
diff --git a/core/queue_selector.go b/core/queue_selector.go
new file mode 100644
index 0000000..311c378
--- /dev/null
+++ b/core/queue_selector.go
@@ -0,0 +1,62 @@
+package rocketmq
+
+import "C"
+import (
+	"strconv"
+	"sync"
+	"unsafe"
+)
+
+var selectors = selectorHolder{selectors: map[int]*messageQueueSelectorWrapper{}}
+
+//export queueSelectorCallback
+func queueSelectorCallback(size int, selectorKey unsafe.Pointer) int {
+	s, ok := selectors.getAndDelete(*(*int)(selectorKey))
+	if !ok {
+		panic("BUG: not register the selector with key:" + strconv.Itoa(*(*int)(selectorKey)))
+	}
+	return s.Select(size)
+}
+
+type messageQueueSelectorWrapper struct {
+	selector MessageQueueSelector
+
+	m   *Message
+	arg interface{}
+}
+
+func (w *messageQueueSelectorWrapper) Select(size int) int {
+	return w.selector.Select(size, w.m, w.arg)
+}
+
+// MessageQueueSelector select one message queue
+type MessageQueueSelector interface {
+	Select(size int, m *Message, arg interface{}) int
+}
+
+type selectorHolder struct {
+	sync.Mutex
+
+	selectors map[int]*messageQueueSelectorWrapper
+	key       int
+}
+
+func (s *selectorHolder) put(selector *messageQueueSelectorWrapper) (key int) {
+	s.Lock()
+	key = s.key
+	s.selectors[key] = selector
+	s.key++
+	s.Unlock()
+	return
+}
+
+func (s *selectorHolder) getAndDelete(key int) (*messageQueueSelectorWrapper, bool) {
+	s.Lock()
+	selector, ok := s.selectors[key]
+	if ok {
+		delete(s.selectors, key)
+	}
+	s.Unlock()
+
+	return selector, ok
+}
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
new file mode 100644
index 0000000..06743b4
--- /dev/null
+++ b/core/queue_selector_test.go
@@ -0,0 +1,53 @@
+package rocketmq
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type mockMessageQueueSelector struct {
+	arg  interface{}
+	m    *Message
+	size int
+
+	selectRet int
+}
+
+func (m *mockMessageQueueSelector) Select(size int, msg *Message, arg interface{}) int {
+	m.arg, m.m, m.size = arg, msg, size
+	return m.selectRet
+}
+
+func TestWraper(t *testing.T) {
+	s := &mockMessageQueueSelector{selectRet: 2}
+	w := &messageQueueSelectorWrapper{selector: s, m: &Message{}, arg: 3}
+
+	assert.Equal(t, 2, w.Select(4))
+	assert.Equal(t, w.m, s.m)
+	v, ok := s.arg.(int)
+	assert.True(t, ok)
+	assert.Equal(t, 3, v)
+}
+
+func TestSelectorHolder(t *testing.T) {
+	s := &messageQueueSelectorWrapper{}
+
+	key := selectors.put(s)
+	assert.Equal(t, 0, key)
+
+	key = selectors.put(s)
+	assert.Equal(t, 1, key)
+
+	assert.Equal(t, 2, len(selectors.selectors))
+
+	ss, ok := selectors.getAndDelete(0)
+	assert.Equal(t, s, ss)
+	assert.True(t, ok)
+
+	ss, ok = selectors.getAndDelete(1)
+	assert.Equal(t, s, ss)
+	assert.True(t, ok)
+
+	assert.Equal(t, 0, len(selectors.selectors))
+}
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 8b13789..3f24e99 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -1 +1,84 @@
+----------
+## RocketMQ Client Go
 
+### 1. Go Version
+* go1.10.5 darwin/amd64
+
+
+### 2. Dependency
+* [librocketmq](https://github.com/apache/rocketmq-client-cpp)	
+
+### 3. Build and Install
+#### macOS Platform (macOS Mojave 10.14)
+* Install Compile tools (homebrew package manager)
+    ```
+    1. xcode-select --install
+    2. brew install cmake
+    3. brew install automake
+    ```
+* Install dependencies
+    1. [Go official download](https://golang.org/dl/)
+
+    2. Get go client package
+    ```
+    go get github.com/apache/rocketmq-client-go
+    ```
+    3. [librocketmq](https://github.com/apache/rocketmq-client-cpp)
+        - `git clone https://github.com/apache/rocketmq-client-cpp`
+        - `cd rocketmq-client-cpp`
+        - `sudo sh build.sh` 
+        - `cp bin/librocketmq.dylib /usr/local/lib`
+        - `sudo mkdir /usr/local/include/rocketmq`
+        - `sudo cp incldue/* /usr/local/incldue/rocketmq/`
+
+#### Linux
+
+*coming soon*
+
+#### Windows
+
+*coming soon*
+
+----------
+## How to use
+
+- import package
+    ```
+    import "github.com/apache/rocketmq-client-go/core"
+    ```
+- Send message
+    ```go
+    func SendMessagge(){
+        producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
+        producer.Start()
+        defer producer.Shutdown()
+        fmt.Printf("Producer: %s started... \n", producer)
+	    for i := 0; i < 100; i++ {
+		    msg := fmt.Sprintf("Hello RocketMQ-%d", i)
+		    result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		    fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
+        }
+        time.Sleep(10 * time.Second)
+	    producer.Shutdown()
+    }
+    ```
+- Push Consumer
+    ```go
+    func PushConsumeMessage() {
+	    fmt.Println("Start Receiving Messages...")
+	    consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{
+            GroupID: "testGroupId", 
+            NameServer: "localhost:9876",
+            ConsumerThreadCount: 2, 
+            MessageBatchMaxSize: 16})
+	    // MUST subscribe topic before consumer started.
+	    consumer.Subscribe("test", "*", func(msg    *rocketmq.MessageExt)rocketmq.ConsumeStatus {
+		    fmt.Printf("A message received: \"%s\" \n", msg.Body)
+		    return rocketmq.ConsumeSuccess})
+	    consumer.Start()
+	    defer consumer.Shutdown()
+	    fmt.Printf("consumer: %s started...\n", consumer)
+	    time.Sleep(10 * time.Minute)
+    }
+    ```
+- [Full example](../examples)
\ No newline at end of file
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
new file mode 100644
index 0000000..8e78fb9
--- /dev/null
+++ b/examples/orderproducer/producer.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type queueSelectorByOrderID struct{}
+
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+	return arg.(int) % size
+}
+
+var (
+	namesrvAddrs string
+	topic        string
+	body         string
+	groupID      string
+	msgCount     int64
+	workerCount  int
+)
+
+func init() {
+	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+	flag.StringVar(&topic, "t", "", "topic")
+	flag.StringVar(&groupID, "g", "", "group")
+	flag.StringVar(&body, "d", "", "body")
+	flag.Int64Var(&msgCount, "m", 0, "message count")
+	flag.IntVar(&workerCount, "w", 0, "worker count")
+}
+
+type worker struct {
+	p            rocketmq.Producer
+	leftMsgCount *int64
+}
+
+func (w *worker) run() {
+	selector := queueSelectorByOrderID{}
+	for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
+		r := w.p.SendMessageOrderly(
+			&rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
+		)
+		fmt.Printf("send result:%+v\n", r)
+	}
+}
+
+func main() {
+	flag.Parse()
+
+	if namesrvAddrs == "" {
+		println("empty namesrv address")
+		return
+	}
+
+	if topic == "" {
+		println("empty topic")
+		return
+	}
+
+	if body == "" {
+		println("empty body")
+		return
+	}
+
+	if groupID == "" {
+		println("empty groupID")
+		return
+	}
+
+	if msgCount == 0 {
+		println("zero message count")
+		return
+	}
+
+	if workerCount == 0 {
+		println("zero worker count")
+		return
+	}
+
+	producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{
+		GroupID:    "testGroup",
+		NameServer: "10.200.20.25:9988",
+	})
+	producer.Start()
+	defer producer.Shutdown()
+
+	wg := sync.WaitGroup{}
+	wg.Add(workerCount)
+
+	workers := make([]worker, workerCount)
+	for i := range workers {
+		workers[i].p = producer
+		workers[i].leftMsgCount = &msgCount
+	}
+
+	for i := range workers {
+		go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+	}
+
+	wg.Wait()
+}