Merge pull request #6 from zjykzk/feature-pull-consumer
Pull consumer with unit test
diff --git a/core/api.go b/core/api.go
index 0f4b6f9..39240ba 100644
--- a/core/api.go
+++ b/core/api.go
@@ -77,6 +77,15 @@
Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
}
+// PullConsumer is a consumer that pulls messages on demand.
+type PullConsumer interface {
+ baseAPI
+ // Pull returns the messages of the consume queue mq, starting at the
+ // specified offset and containing at most maxNums messages. subExpression
+ // is the subscription expression used for the pull.
+ Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+ // FetchSubscriptionMessageQueues returns the consume queues of the topic.
+ FetchSubscriptionMessageQueues(topic string) []MessageQueue
+}
+
type SessionCredentials struct {
AccessKey string
SecretKey string
diff --git a/core/log.go b/core/log.go
new file mode 100644
index 0000000..e081e4c
--- /dev/null
+++ b/core/log.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+import "fmt"
+
+// LogLevel is the log level of the client. Its values are taken from the C
+// constants made available by rocketmq/CCommon.h.
+type LogLevel int
+
+// Predefined log levels, each converted from the C constant of the matching
+// name.
+const (
+ LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
+ LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
+ LogLevelWarn = LogLevel(C.E_LOG_LEVEL_WARN)
+ LogLevelInfo = LogLevel(C.E_LOG_LEVEL_INFO)
+ LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
+ LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
+ // NOTE(review): presumably the number of levels rather than a usable
+ // level - confirm against CCommon.h.
+ LogLevelNum = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
+)
+
+// LogConfig holds the logging options of a pull consumer. NewPullConsumer
+// hands them to the native client when PullConsumerConfig.Log is non-nil.
+type LogConfig struct {
+	// Path is passed to SetPullConsumerLogPath.
+	Path string
+	// FileNum and FileSize are passed together to
+	// SetPullConsumerLogFileNumAndSize.
+	FileNum  int
+	FileSize int64
+	// Level is passed to SetPullConsumerLogLevel.
+	Level LogLevel
+}
+
+// String renders the configuration with field names, using fmt's "%+v" verb.
+func (lc *LogConfig) String() string {
+	snapshot := *lc
+	return fmt.Sprintf("%+v", snapshot)
+}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
new file mode 100644
index 0000000..19a033c
--- /dev/null
+++ b/core/pull_consumer.go
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+
+#include <stdio.h>
+#include <stdlib.h>
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPullConsumer.h"
+*/
+import "C"
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "unsafe"
+)
+
+// PullStatus is the status reported by a pull. Its values are taken from the
+// C constants of the native client.
+type PullStatus int
+
+// Predefined pull statuses, each converted from the C constant of the
+// matching name.
+const (
+ PullFound = PullStatus(C.E_FOUND)
+ PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG)
+ PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG)
+ PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
+ PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
+)
+
+// String returns the human-readable name of the status, or "Unknown status"
+// for a value that is not one of the predefined statuses.
+func (ps PullStatus) String() string {
+	name := "Unknown status"
+	switch ps {
+	case PullFound:
+		name = "Found"
+	case PullNoNewMsg:
+		name = "NoNewMsg"
+	case PullNoMatchedMsg:
+		name = "NoMatchedMsg"
+	case PullOffsetIllegal:
+		name = "OffsetIllegal"
+	case PullBrokerTimeout:
+		name = "BrokerTimeout"
+	}
+	return name
+}
+
+// PullConsumerConfig is the configuration of a pull consumer, consumed by
+// NewPullConsumer.
+type PullConsumerConfig struct {
+ // GroupID is passed to CreatePullConsumer.
+ GroupID string
+ // NameServer is passed to SetPullConsumerNameServerAddress.
+ NameServer string
+ // Credentials is optional; when nil no session credentials are set.
+ Credentials *SessionCredentials
+ // Log is optional; when nil no log option is set on the native client.
+ Log *LogConfig
+}
+
+// DefaultPullConsumer is the default consumer pulling messages. It wraps a
+// native pull consumer handle created by NewPullConsumer.
+type DefaultPullConsumer struct {
+ PullConsumerConfig
+ // cconsumer is the handle of the native consumer passed to every C call.
+ cconsumer *C.struct_CPullConsumer
+ // funcsMap is read by String, which expects string keys.
+ // NOTE(review): nothing visible in this file stores into it - confirm
+ // whether it is still needed.
+ funcsMap sync.Map
+}
+
+// String describes the consumer: its configuration followed by the keys of
+// funcsMap, each key followed by ", ". Keys are expected to be strings; any
+// other key type makes the type assertion panic.
+func (c *DefaultPullConsumer) String() string {
+	var subscribed string
+	collect := func(topic, _ interface{}) bool {
+		subscribed = subscribed + topic.(string) + ", "
+		return true
+	}
+	c.funcsMap.Range(collect)
+
+	return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, subscribed)
+}
+
+// NewPullConsumer creates a pull consumer configured from conf.
+//
+// The group ID and name server address are always applied. The log options
+// and the session credentials are applied only when conf.Log and
+// conf.Credentials are non-nil.
+//
+// It returns an error when conf is nil, when the native consumer cannot be
+// created, or when one of the log options is rejected by the native client.
+// On every error path the native consumer is destroyed and all temporary C
+// strings are freed, so a failed call leaks nothing.
+func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+	if conf == nil {
+		return nil, errors.New("new pull consumer error:config is nil")
+	}
+
+	cs := C.CString(conf.GroupID)
+	cconsumer := C.CreatePullConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+	if cconsumer == nil {
+		return nil, errors.New("new pull consumer error:create consumer failed")
+	}
+
+	// fail releases the half-configured native consumer before reporting msg.
+	fail := func(msg string) (*DefaultPullConsumer, error) {
+		C.DestroyPullConsumer(cconsumer)
+		return nil, errors.New(msg)
+	}
+
+	cs = C.CString(conf.NameServer)
+	C.SetPullConsumerNameServerAddress(cconsumer, cs)
+	C.free(unsafe.Pointer(cs))
+
+	if logConf := conf.Log; logConf != nil {
+		cs = C.CString(logConf.Path)
+		r := C.SetPullConsumerLogPath(cconsumer, cs)
+		// Free before inspecting the result so the failure path does not leak.
+		C.free(unsafe.Pointer(cs))
+		if r != 0 {
+			return fail("new pull consumer error:set log path failed")
+		}
+
+		if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(logConf.FileNum), C.long(logConf.FileSize)) != 0 {
+			return fail("new pull consumer error:set log file num and size failed")
+		}
+		if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(logConf.Level)) != 0 {
+			return fail("new pull consumer error:set log level failed")
+		}
+	}
+
+	if cred := conf.Credentials; cred != nil {
+		ak := C.CString(cred.AccessKey)
+		sk := C.CString(cred.SecretKey)
+		ch := C.CString(cred.Channel)
+		C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+	}
+
+	return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
+}
+
+// Start starts the native pull consumer. A non-zero code returned by the
+// native call is reported as an error carrying that code.
+func (c *DefaultPullConsumer) Start() error {
+	if code := C.StartPullConsumer(c.cconsumer); code != 0 {
+		return fmt.Errorf("start failed, code:%d", code)
+	}
+	return nil
+}
+
+// Shutdown shuts the native pull consumer down and then destroys it.
+//
+// A non-zero code from either native call is returned as an error. If the
+// shutdown call fails, the consumer is not destroyed and the handle is kept.
+// After a successful destroy the handle is cleared, so a further Shutdown is
+// a no-op instead of handing an already destroyed handle to the native
+// client.
+func (c *DefaultPullConsumer) Shutdown() error {
+	if c.cconsumer == nil {
+		// Already shut down (or never created): nothing left to release.
+		return nil
+	}
+
+	if r := C.ShutdownPullConsumer(c.cconsumer); r != 0 {
+		return fmt.Errorf("shutdown failed, code:%d", r)
+	}
+
+	if r := C.DestroyPullConsumer(c.cconsumer); r != 0 {
+		return fmt.Errorf("destory failed, code:%d", r)
+	}
+
+	// The handle is invalid from here on; drop it to prevent reuse.
+	c.cconsumer = nil
+	return nil
+}
+
+// FetchSubscriptionMessageQueues returns the consume queues of topic as
+// reported by the native client, or nil when none is reported.
+//
+// The native array is copied into Go values and released before returning,
+// so the result holds no reference to C memory. Every returned queue carries
+// the requested topic.
+func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+	var (
+		cqueues *C.struct__CMessageQueue_
+		count   C.int
+	)
+
+	ctopic := C.CString(topic)
+	C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &cqueues, &count)
+	C.free(unsafe.Pointer(ctopic))
+
+	if cqueues == nil {
+		return nil
+	}
+	// Release the native array on every path, including the empty one.
+	defer C.ReleaseSubscriptionMessageQueue(cqueues)
+
+	// Nothing to copy; a negative count would also make make() panic.
+	if count <= 0 {
+		return nil
+	}
+
+	queues := make([]MessageQueue, int(count))
+	stride := unsafe.Sizeof(*cqueues)
+	for i := range queues {
+		// Address of the i-th element of the C array.
+		cq := (*C.struct__CMessageQueue_)(
+			unsafe.Pointer(uintptr(unsafe.Pointer(cqueues)) + uintptr(i)*stride),
+		)
+		queues[i] = MessageQueue{
+			Topic:  topic,
+			Broker: C.GoString(&cq.brokerName[0]),
+			ID:     int(cq.queueId),
+		}
+	}
+	return queues
+}
+
+// PullResult is the outcome of a single Pull call, copied from the result of
+// the native client.
+type PullResult struct {
+	NextBeginOffset int64
+	MinOffset       int64
+	MaxOffset       int64
+	Status          PullStatus
+	// Messages is set only when the native result reports a positive size.
+	Messages []*MessageExt
+}
+
+// String renders the result with field names, using fmt's "%+v" verb.
+func (pr *PullResult) String() string {
+	snapshot := *pr
+	return fmt.Sprintf("%+v", snapshot)
+}
+
+// Pull pulls at most maxNums messages from the message queue mq, starting at
+// offset, using subExpression as the subscription expression.
+//
+// The returned PullResult copies the offsets and the status of the native
+// result; Messages is filled only when the native result reports at least
+// one message. The native result and the temporary C string are released
+// before Pull returns.
+//
+// mq.Topic and mq.Broker are copied into the fixed-size fields of the native
+// queue descriptor; a name that does not fit is truncated so that the field
+// always stays NUL-terminated.
+func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+	cmq := C.struct__CMessageQueue_{queueId: C.int(mq.ID)}
+
+	// fill copies s into the C char array dst, reserving the last element for
+	// the terminating NUL.
+	fill := func(dst []C.char, s string) {
+		if len(dst) == 0 {
+			return
+		}
+		n := len(s)
+		if n > len(dst)-1 {
+			n = len(dst) - 1
+		}
+		for i := 0; i < n; i++ {
+			dst[i] = C.char(s[i])
+		}
+		dst[n] = 0
+	}
+	fill(cmq.topic[:], mq.Topic)
+	fill(cmq.brokerName[:], mq.Broker)
+
+	csubExpr := C.CString(subExpression)
+	defer C.free(unsafe.Pointer(csubExpr))
+
+	cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
+	// Messages are converted to Go values below, before the native result is
+	// released.
+	defer C.ReleasePullResult(cpullResult)
+
+	pr := PullResult{
+		NextBeginOffset: int64(cpullResult.nextBeginOffset),
+		MinOffset:       int64(cpullResult.minOffset),
+		MaxOffset:       int64(cpullResult.maxOffset),
+		Status:          PullStatus(cpullResult.pullStatus),
+	}
+
+	if n := int(cpullResult.size); n > 0 && cpullResult.msgFoundList != nil {
+		msgs := make([]*MessageExt, n)
+		stride := unsafe.Sizeof(*cpullResult.msgFoundList)
+		for i := range msgs {
+			// Load the i-th message pointer of the C array.
+			msgs[i] = cmsgExtToGo(*(**C.struct_CMessageExt)(
+				unsafe.Pointer(uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*stride),
+			))
+		}
+		pr.Messages = msgs
+	}
+
+	return pr
+}
diff --git a/core/queue.go b/core/queue.go
new file mode 100644
index 0000000..47f10ef
--- /dev/null
+++ b/core/queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+import "fmt"
+
+// MessageQueue identifies one queue of a topic on a broker.
+type MessageQueue struct {
+	Topic  string
+	Broker string
+	ID     int
+}
+
+// String formats the queue as "broker:<Broker>, topic:<Topic>, id:<ID>".
+func (q *MessageQueue) String() string {
+	const layout = "broker:%s, topic:%s, id:%d"
+	broker, topic, id := q.Broker, q.Topic, q.ID
+	return fmt.Sprintf(layout, broker, topic, id)
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
new file mode 100644
index 0000000..3643a22
--- /dev/null
+++ b/examples/pullconsumer/consumer.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+// Package-level settings of the example; init binds the first, second and
+// fourth to command-line flags.
+var (
+ namesrvAddrs string // name server address, -n flag
+ topic string // topic to pull from, -t flag
+ body string // NOTE(review): not referenced anywhere in this file
+ groupID string // consumer group, -g flag
+ msgCount int64 // NOTE(review): not referenced anywhere in this file
+ workerCount int // NOTE(review): not referenced anywhere in this file
+)
+
+// init registers the string flags of the example; every flag defaults to the
+// empty string.
+func init() {
+	options := []struct {
+		target *string
+		name   string
+		usage  string
+	}{
+		{&namesrvAddrs, "n", "name server address"},
+		{&topic, "t", "topic"},
+		{&groupID, "g", "group"},
+	}
+	for _, opt := range options {
+		flag.StringVar(opt.target, opt.name, "", opt.usage)
+	}
+}
+
+// ./consumer -n "localhost:9876" -t test -g local_test
+//
+// main creates and starts a pull consumer, fetches the message queues of the
+// topic and pulls from them round-robin until one pull reports that there is
+// no new message. It then prints the number of pulled messages and the
+// average time spent per message.
+func main() {
+	flag.Parse()
+
+	if namesrvAddrs == "" {
+		fmt.Println("empty namesrv")
+		return
+	}
+
+	if topic == "" {
+		fmt.Println("empty topic")
+		return
+	}
+
+	if groupID == "" {
+		fmt.Println("empty groupID")
+		return
+	}
+
+	consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+		GroupID:    groupID,
+		NameServer: namesrvAddrs,
+		Log: &rocketmq.LogConfig{
+			Path: "example",
+		},
+	})
+	if err != nil {
+		fmt.Printf("new pull consumer error:%s\n", err)
+		return
+	}
+
+	err = consumer.Start()
+	if err != nil {
+		fmt.Printf("start consumer error:%s\n", err)
+		return
+	}
+	defer consumer.Shutdown()
+
+	mqs := consumer.FetchSubscriptionMessageQueues(topic)
+	fmt.Printf("fetch subscription mqs:%+v\n", mqs)
+	if len(mqs) == 0 {
+		// With no queue the pull loop below would never reach a break and
+		// would spin forever.
+		fmt.Println("no message queue found")
+		return
+	}
+
+	total, now := 0, time.Now()
+	// Offsets are tracked per queue, not per queue ID: queues of different
+	// brokers may share the same ID.
+	offsets := make(map[rocketmq.MessageQueue]int64, len(mqs))
+
+PULL:
+	for {
+		for _, mq := range mqs {
+			pr := consumer.Pull(mq, "*", offsets[mq], 32)
+			total += len(pr.Messages)
+			fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
+
+			switch pr.Status {
+			case rocketmq.PullNoNewMsg:
+				// The example stops as soon as any queue is drained.
+				break PULL
+			case rocketmq.PullFound, rocketmq.PullNoMatchedMsg, rocketmq.PullOffsetIllegal:
+				offsets[mq] = pr.NextBeginOffset
+			case rocketmq.PullBrokerTimeout:
+				fmt.Println("broker timeout occur")
+			}
+		}
+	}
+
+	var timePerMessage time.Duration
+	if total > 0 {
+		timePerMessage = time.Since(now) / time.Duration(total)
+	}
+	fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
+}