Implements the pull consumer
diff --git a/core/log.go b/core/log.go
new file mode 100644
index 0000000..d9b87e8
--- /dev/null
+++ b/core/log.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+
+// LogLevel the log level
+type LogLevel int
+
+// predefined log level
+const (
+ LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
+ LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
+ LogLevelWarn = LogLevel(C.E_LOG_LEVEL_WARN)
+ LogLevelInfo = LogLevel(C.E_LOG_LEVEL_INFO)
+ LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
+ LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
+ LogLevelNum = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
+)
+
+// LogConfig the log configuration for the pull consumer
+type LogConfig struct {
+ Path string
+ FileNum int
+ FileSize int64
+ Level LogLevel
+}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index b9c7338..e236ee6 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package rocketmq
/*
@@ -27,18 +28,37 @@
import "C"
import (
+ "errors"
"fmt"
"sync"
"unsafe"
)
+// PullStatus pull status
+type PullStatus int
+
+// predefined pull status
+const (
+ PullFound = PullStatus(C.E_FOUND)
+ PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG)
+ PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG)
+ PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
+ PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
+)
+
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+ GroupID string
+ NameServer string
+ Credentials *SessionCredentials
+ Log *LogConfig
+}
+
// DefaultPullConsumer default consumer pulling the message
type DefaultPullConsumer struct {
- config *ConsumerConfig
+ PullConsumerConfig
cconsumer *C.struct_CPullConsumer
funcsMap sync.Map
-
- resources []*C.char
}
func (c *DefaultPullConsumer) String() string {
@@ -47,48 +67,141 @@
topics += key.(string) + ", "
return true
})
- return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
+ return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
+}
+
+// NewPullConsumer creates one pull consumer
+func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+ cs := C.CString(conf.GroupID)
+ cconsumer := C.CreatePullConsumer(cs)
+ C.free(unsafe.Pointer(cs))
+
+ cs = C.CString(conf.NameServer)
+ C.SetPullConsumerNameServerAddress(cconsumer, cs)
+ C.free(unsafe.Pointer(cs))
+
+ log := conf.Log
+ if log != nil {
+ cs = C.CString(log.Path)
+ if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
+ return nil, errors.New("new pull consumer error:set log path failed")
+ }
+ C.free(unsafe.Pointer(cs))
+
+ if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
+ return nil, errors.New("new pull consumer error:set log file num and size failed")
+ }
+ if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
+ return nil, errors.New("new pull consumer error:set log level failed")
+ }
+ }
+
+ if conf.Credentials != nil {
+ ak := C.CString(conf.Credentials.AccessKey)
+ sk := C.CString(conf.Credentials.SecretKey)
+ ch := C.CString(conf.Credentials.Channel)
+ C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+
+ C.free(unsafe.Pointer(ak))
+ C.free(unsafe.Pointer(sk))
+ C.free(unsafe.Pointer(ch))
+ }
+
+ return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
}
// Start starts the pulling conumser
func (c *DefaultPullConsumer) Start() error {
- C.StartPullConsumer(c.cconsumer)
+ r := C.StartPullConsumer(c.cconsumer)
+ if r != 0 {
+ return fmt.Errorf("start failed, code:%d", r)
+ }
return nil
}
// Shutdown shutdown the pulling conumser
func (c *DefaultPullConsumer) Shutdown() error {
- C.ShutdownPullConsumer(c.cconsumer)
- C.DestroyPullConsumer(c.cconsumer)
- for _, r := range c.resources {
- C.free(unsafe.Pointer(r))
+ r := C.ShutdownPullConsumer(c.cconsumer)
+ if r != 0 {
+ return fmt.Errorf("shutdown failed, code:%d", r)
+ }
+
+ r = C.DestroyPullConsumer(c.cconsumer)
+ if r != 0 {
+ return fmt.Errorf("destory failed, code:%d", r)
}
return nil
}
+// FetchSubscriptionMessageQueues fetchs the topic's subcripted message queues
func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+ var (
+ q *C.struct__CMessageQueue_
+ size C.int
+ )
+ ctopic := C.CString(topic)
+ C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &q, &size)
+ C.free(unsafe.Pointer(ctopic))
+ if size == 0 {
+ return nil
+ }
+
+ qs := make([]MessageQueue, size)
+ for i := range qs {
+ cq := (*C.struct__CMessageQueue_)(
+ unsafe.Pointer(uintptr(unsafe.Pointer(q)) + uintptr(i)*unsafe.Sizeof(*q)),
+ )
+ qs[i].ID, qs[i].Broker, qs[i].Topic = int(cq.queueId), C.GoString(&cq.brokerName[0]), topic
+ }
+ C.ReleaseSubscriptionMessageQueue(q)
+
+ return qs
}
-/*
+// PullResult the pull result
+type PullResult struct {
+ NextBeginOffset int64
+ MinOffset int64
+ MaxOffset int64
+ Status PullStatus
+ Messages []*MessageExt
+}
-CPullConsumer *CreatePullConsumer(const char *groupId);
-int DestroyPullConsumer(CPullConsumer *consumer);
-int StartPullConsumer(CPullConsumer *consumer);
-int ShutdownPullConsumer(CPullConsumer *consumer);
-int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
-const char *GetPullConsumerGroupID(CPullConsumer *consumer);
-int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
-int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
- const char *channel);
-int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
-int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
-int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
+// Pull pulling the message from the specified message queue
+func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+ cmq := C.struct__CMessageQueue_{
+ queueId: C.int(mq.ID),
+ }
-int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs , int* size);
-int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
+ copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic)))
+ copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker)))
+ fmt.Printf("%+v\n", []byte(mq.Topic))
+ fmt.Printf("%+v\n", cmq.topic)
+ fmt.Printf("%+v\n", []byte(mq.Broker))
+ fmt.Printf("%+v\n", cmq.brokerName)
-CPullResult Pull(CPullConsumer *consumer,const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
-int ReleasePullResult(CPullResult pullResult);
+ csubExpr := C.CString(subExpression)
+ cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
-*/
+ pullResult := PullResult{
+ NextBeginOffset: int64(cpullResult.nextBeginOffset),
+ MinOffset: int64(cpullResult.minOffset),
+ MaxOffset: int64(cpullResult.maxOffset),
+ Status: PullStatus(cpullResult.pullStatus),
+ }
+ if cpullResult.size > 0 {
+ msgs := make([]*MessageExt, cpullResult.size)
+ for i := range msgs {
+ msgs[i] = cmsgExtToGo((*C.struct_CMessageExt)(
+ unsafe.Pointer(
+ uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList),
+ ),
+ ))
+ }
+ }
+
+ C.free(unsafe.Pointer(csubExpr))
+ C.ReleasePullResult(cpullResult)
+ return pullResult
+}
diff --git a/core/queue.go b/core/queue.go
new file mode 100644
index 0000000..47f10ef
--- /dev/null
+++ b/core/queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+import "fmt"
+
+// MessageQueue the queue of the message
+type MessageQueue struct {
+ Topic string
+ Broker string
+ ID int
+}
+
+func (q *MessageQueue) String() string {
+ return fmt.Sprintf("broker:%s, topic:%s, id:%d", q.Broker, q.Topic, q.ID)
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
new file mode 100644
index 0000000..875d5eb
--- /dev/null
+++ b/examples/pullconsumer/consumer.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+var (
+ namesrvAddrs string
+ topic string
+ body string
+ groupID string
+ msgCount int64
+ workerCount int
+)
+
+func init() {
+ flag.StringVar(&namesrvAddrs, "n", "", "name server address")
+ flag.StringVar(&topic, "t", "", "topic")
+ flag.StringVar(&groupID, "g", "", "group")
+}
+
+// ./consumer -n "localhost:9988" -t test -g local_test
+func main() {
+ flag.Parse()
+
+ if namesrvAddrs == "" {
+ fmt.Println("empty namesrv")
+ return
+ }
+
+ if topic == "" {
+ fmt.Println("empty topic")
+ return
+ }
+
+ if groupID == "" {
+ fmt.Println("empty groupID")
+ return
+ }
+
+ consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+ GroupID: groupID,
+ NameServer: namesrvAddrs,
+ Log: &rocketmq.LogConfig{
+ Path: "example",
+ },
+ })
+ if err != nil {
+ fmt.Printf("new pull consumer error:%s\n", err)
+ return
+ }
+
+ err = consumer.Start()
+ if err != nil {
+ fmt.Printf("start consumer error:%s\n", err)
+ return
+ }
+ defer consumer.Shutdown()
+
+ mqs := consumer.FetchSubscriptionMessageQueues(topic)
+ fmt.Printf("fetch subscription mqs:%+v\n", mqs)
+
+ total, offsets, now := 0, map[int]int64{}, time.Now()
+
+PULL:
+ for {
+ for _, mq := range mqs {
+ pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
+ total += len(pr.Messages)
+ fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
+
+ switch pr.Status {
+ case rocketmq.PullNoNewMsg:
+ break PULL
+ case rocketmq.PullFound:
+ fallthrough
+ case rocketmq.PullNoMatchedMsg:
+ fallthrough
+ case rocketmq.PullOffsetIllegal:
+ offsets[mq.ID] = pr.NextBeginOffset
+ case rocketmq.PullBrokerTimeout:
+ fmt.Println("broker timeout occur")
+ }
+ }
+ }
+
+ var timePerMessage time.Duration
+ if total > 0 {
+ timePerMessage = time.Since(now) / time.Duration(total)
+ }
+ fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
+}