Merge pull request #6 from zjykzk/feature-pull-consumer

Pull consumer with unit test
diff --git a/core/api.go b/core/api.go
index 0f4b6f9..39240ba 100644
--- a/core/api.go
+++ b/core/api.go
@@ -77,6 +77,15 @@
 	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
 }
 
+// PullConsumer consumer pulling the message
+type PullConsumer interface {
+	baseAPI
+	// Pull returns the messages from the consume queue by specify the offset and the max number
+	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+	// FetchSubscriptionMessageQueues returns the consume queue of the topic
+	FetchSubscriptionMessageQueues(topic string) []MessageQueue
+}
+
 type SessionCredentials struct {
 	AccessKey string
 	SecretKey string
diff --git a/core/log.go b/core/log.go
new file mode 100644
index 0000000..e081e4c
--- /dev/null
+++ b/core/log.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+import "fmt"
+
+// LogLevel the log level
+type LogLevel int
+
+// predefined log level
+const (
+	LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
+	LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
+	LogLevelWarn  = LogLevel(C.E_LOG_LEVEL_WARN)
+	LogLevelInfo  = LogLevel(C.E_LOG_LEVEL_INFO)
+	LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
+	LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
+	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
+)
+
+// LogConfig the log configuration for the pull consumer
+type LogConfig struct {
+	Path     string
+	FileNum  int
+	FileSize int64
+	Level    LogLevel
+}
+
+func (lc *LogConfig) String() string {
+	return fmt.Sprintf("%+v", *lc)
+}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
new file mode 100644
index 0000000..19a033c
--- /dev/null
+++ b/core/pull_consumer.go
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+
+#include <stdio.h>
+#include <stdlib.h>
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPullConsumer.h"
+*/
+import "C"
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"unsafe"
+)
+
+// PullStatus pull status
+type PullStatus int
+
+// predefined pull status
+const (
+	PullFound         = PullStatus(C.E_FOUND)
+	PullNoNewMsg      = PullStatus(C.E_NO_NEW_MSG)
+	PullNoMatchedMsg  = PullStatus(C.E_NO_MATCHED_MSG)
+	PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
+	PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
+)
+
+func (ps PullStatus) String() string {
+	switch ps {
+	case PullFound:
+		return "Found"
+	case PullNoNewMsg:
+		return "NoNewMsg"
+	case PullNoMatchedMsg:
+		return "NoMatchedMsg"
+	case PullOffsetIllegal:
+		return "OffsetIllegal"
+	case PullBrokerTimeout:
+		return "BrokerTimeout"
+	default:
+		return "Unknown status"
+	}
+}
+
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+	GroupID     string
+	NameServer  string
+	Credentials *SessionCredentials
+	Log         *LogConfig
+}
+
+// DefaultPullConsumer default consumer pulling the message
+type DefaultPullConsumer struct {
+	PullConsumerConfig
+	cconsumer *C.struct_CPullConsumer
+	funcsMap  sync.Map
+}
+
+func (c *DefaultPullConsumer) String() string {
+	topics := ""
+	c.funcsMap.Range(func(key, value interface{}) bool {
+		topics += key.(string) + ", "
+		return true
+	})
+	return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
+}
+
+// NewPullConsumer creates one pull consumer
+func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+	cs := C.CString(conf.GroupID)
+	cconsumer := C.CreatePullConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(conf.NameServer)
+	C.SetPullConsumerNameServerAddress(cconsumer, cs)
+	C.free(unsafe.Pointer(cs))
+
+	log := conf.Log
+	if log != nil {
+		cs = C.CString(log.Path)
+		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
+			return nil, errors.New("new pull consumer error:set log path failed")
+		}
+		C.free(unsafe.Pointer(cs))
+
+		if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
+			return nil, errors.New("new pull consumer error:set log file num and size failed")
+		}
+		if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
+			return nil, errors.New("new pull consumer error:set log level failed")
+		}
+	}
+
+	if conf.Credentials != nil {
+		ak := C.CString(conf.Credentials.AccessKey)
+		sk := C.CString(conf.Credentials.SecretKey)
+		ch := C.CString(conf.Credentials.Channel)
+		C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+	}
+
+	return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
+}
+
+// Start starts the pulling conumser
+func (c *DefaultPullConsumer) Start() error {
+	r := C.StartPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("start failed, code:%d", r)
+	}
+	return nil
+}
+
+// Shutdown shutdown the pulling conumser
+func (c *DefaultPullConsumer) Shutdown() error {
+	r := C.ShutdownPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("shutdown failed, code:%d", r)
+	}
+
+	r = C.DestroyPullConsumer(c.cconsumer)
+	if r != 0 {
+		return fmt.Errorf("destory failed, code:%d", r)
+	}
+	return nil
+}
+
+// FetchSubscriptionMessageQueues returns the topic's consume queue
+func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+	var (
+		q    *C.struct__CMessageQueue_
+		size C.int
+	)
+
+	ctopic := C.CString(topic)
+	C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &q, &size)
+	C.free(unsafe.Pointer(ctopic))
+	if size == 0 {
+		return nil
+	}
+
+	qs := make([]MessageQueue, size)
+	for i := range qs {
+		cq := (*C.struct__CMessageQueue_)(
+			unsafe.Pointer(uintptr(unsafe.Pointer(q)) + uintptr(i)*unsafe.Sizeof(*q)),
+		)
+		qs[i].ID, qs[i].Broker, qs[i].Topic = int(cq.queueId), C.GoString(&cq.brokerName[0]), topic
+	}
+	C.ReleaseSubscriptionMessageQueue(q)
+
+	return qs
+}
+
+// PullResult the pull result
+type PullResult struct {
+	NextBeginOffset int64
+	MinOffset       int64
+	MaxOffset       int64
+	Status          PullStatus
+	Messages        []*MessageExt
+}
+
+func (pr *PullResult) String() string {
+	return fmt.Sprintf("%+v", *pr)
+}
+
+// Pull pulling the message from the specified message queue
+func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+	cmq := C.struct__CMessageQueue_{
+		queueId: C.int(mq.ID),
+	}
+
+	copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic)))
+	copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker)))
+
+	csubExpr := C.CString(subExpression)
+	cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
+
+	pr := PullResult{
+		NextBeginOffset: int64(cpullResult.nextBeginOffset),
+		MinOffset:       int64(cpullResult.minOffset),
+		MaxOffset:       int64(cpullResult.maxOffset),
+		Status:          PullStatus(cpullResult.pullStatus),
+	}
+	if cpullResult.size > 0 {
+		msgs := make([]*MessageExt, cpullResult.size)
+		for i := range msgs {
+			msgs[i] = cmsgExtToGo(*(**C.struct_CMessageExt)(
+				unsafe.Pointer(
+					uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList),
+				),
+			))
+		}
+		pr.Messages = msgs
+	}
+
+	C.free(unsafe.Pointer(csubExpr))
+	C.ReleasePullResult(cpullResult)
+	return pr
+}
diff --git a/core/queue.go b/core/queue.go
new file mode 100644
index 0000000..47f10ef
--- /dev/null
+++ b/core/queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+import "fmt"
+
+// MessageQueue the queue of the message
+type MessageQueue struct {
+	Topic  string
+	Broker string
+	ID     int
+}
+
+func (q *MessageQueue) String() string {
+	return fmt.Sprintf("broker:%s, topic:%s, id:%d", q.Broker, q.Topic, q.ID)
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
new file mode 100644
index 0000000..3643a22
--- /dev/null
+++ b/examples/pullconsumer/consumer.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+var (
+	namesrvAddrs string
+	topic        string
+	body         string
+	groupID      string
+	msgCount     int64
+	workerCount  int
+)
+
+func init() {
+	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+	flag.StringVar(&topic, "t", "", "topic")
+	flag.StringVar(&groupID, "g", "", "group")
+}
+
+// ./consumer -n "localhost:9876" -t test -g local_test
+func main() {
+	flag.Parse()
+
+	if namesrvAddrs == "" {
+		fmt.Println("empty namesrv")
+		return
+	}
+
+	if topic == "" {
+		fmt.Println("empty topic")
+		return
+	}
+
+	if groupID == "" {
+		fmt.Println("empty groupID")
+		return
+	}
+
+	consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+		GroupID:    groupID,
+		NameServer: namesrvAddrs,
+		Log: &rocketmq.LogConfig{
+			Path: "example",
+		},
+	})
+	if err != nil {
+		fmt.Printf("new pull consumer error:%s\n", err)
+		return
+	}
+
+	err = consumer.Start()
+	if err != nil {
+		fmt.Printf("start consumer error:%s\n", err)
+		return
+	}
+	defer consumer.Shutdown()
+
+	mqs := consumer.FetchSubscriptionMessageQueues(topic)
+	fmt.Printf("fetch subscription mqs:%+v\n", mqs)
+
+	total, offsets, now := 0, map[int]int64{}, time.Now()
+
+PULL:
+	for {
+		for _, mq := range mqs {
+			pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
+			total += len(pr.Messages)
+			fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
+
+			switch pr.Status {
+			case rocketmq.PullNoNewMsg:
+				break PULL
+			case rocketmq.PullFound:
+				fallthrough
+			case rocketmq.PullNoMatchedMsg:
+				fallthrough
+			case rocketmq.PullOffsetIllegal:
+				offsets[mq.ID] = pr.NextBeginOffset
+			case rocketmq.PullBrokerTimeout:
+				fmt.Println("broker timeout occur")
+			}
+		}
+	}
+
+	var timePerMessage time.Duration
+	if total > 0 {
+		timePerMessage = time.Since(now) / time.Duration(total)
+	}
+	fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
+}