Merge pull request #12 from wenfengwang/master
More configurable fields and more
diff --git a/cmd/producer.go b/cmd/producer.go
new file mode 100644
index 0000000..b67f50d
--- /dev/null
+++ b/cmd/producer.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/core"
+)
+
+var ( // values of the command-line flags registered in init
+ namesrvAddrs string // -addr: name server address
+ topic string // -t: topic name
+ body string // -body: message body
+ groupID string // -group: producer group
+ keys string // -keys: message keys
+)
+
+// init registers the command-line flags that main reads after flag.Parse.
+func init() {
+	stringFlags := []struct {
+		target      *string
+		name, usage string
+	}{
+		{&namesrvAddrs, "addr", "name server address"},
+		{&topic, "t", "topic name"},
+		{&groupID, "group", "producer group"},
+		{&body, "body", "message body"},
+		{&keys, "keys", "message keys"},
+	}
+	// Every flag defaults to the empty string.
+	for _, f := range stringFlags {
+		flag.StringVar(f.target, f.name, "", f.usage)
+	}
+}
+
+// main sends one message synchronously using the values given on the command
+// line and prints the send result. It stops early, after printing a
+// diagnostic, when a mandatory flag is missing or the producer cannot be
+// created or started.
+func main() {
+	flag.Parse()
+
+	// -addr, -t, -body and -group are mandatory; -keys may stay empty.
+	switch {
+	case namesrvAddrs == "":
+		println("empty nameServer address")
+		return
+	case topic == "":
+		println("empty topic")
+		return
+	case body == "":
+		println("empty body")
+		return
+	case groupID == "":
+		println("empty groupID")
+		return
+	}
+
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+
+	// NewProducer reports failures through its error result; the returned
+	// producer must not be used when that error is non-nil.
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		println(fmt.Sprintf("create producer failed: %s", err))
+		return
+	}
+
+	if err := producer.Start(); err != nil {
+		println(fmt.Sprintf("start producer failed: %s", err))
+		return
+	}
+	defer producer.Shutdown()
+
+	result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
+	println(fmt.Sprintf("send message result: %s", result))
+}
diff --git a/core/api.go b/core/api.go
index bf38a78..8ad6af0 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,21 +22,35 @@
return GetVersion()
}
+type clientConfig struct { // settings embedded by ProducerConfig, PushConsumerConfig and PullConsumerConfig
+ GroupID string // group identifier handed to the native Create* call
+ NameServer string // name server address; the producer and push consumer constructors apply it only when non-empty
+ NameServerDomain string // name server domain; the producer and push consumer constructors apply it only when non-empty
+ GroupName string // NOTE(review): only read by ProducerConfig.String in the visible code — confirm intended use
+ InstanceName string // optional client instance name; skipped when empty
+ Credentials *SessionCredentials // optional; nil skips the session-credentials call
+ LogC *LogConfig // optional; nil skips the native log path/size/level calls
+}
+
// NewProducer create a new producer with config
-func NewProducer(config *ProducerConfig) Producer {
+func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
}
// ProducerConfig define a producer
type ProducerConfig struct {
- GroupID string
- NameServer string
- Credentials *SessionCredentials
+ clientConfig
+ SendMsgTimeout int
+ CompressLevel int
+ MaxMessageSize int
}
func (config *ProducerConfig) String() string {
// For security, don't print Credentials default.
- return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.NameServer, config.GroupID)
+ return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, NameServer: %s, "+
+ "SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.NameServer, config.GroupID,
+ config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
+ config.MaxMessageSize)
}
type Producer interface {
@@ -52,27 +66,27 @@
}
// NewPushConsumer create a new consumer with config.
-func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
return newPushConsumer(config)
}
-// ConsumerConfig define a new consumer.
-type ConsumerConfig struct {
- GroupID string
- NameServer string
- ConsumerThreadCount int
+// PushConsumerConfig define a new consumer.
+type PushConsumerConfig struct {
+ clientConfig
+ ThreadCount int
MessageBatchMaxSize int
- //ConsumerInstanceName int
- Credentials *SessionCredentials
+ Model MessageModel
}
-func (config *ConsumerConfig) String() string {
- return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]",
- config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+// String renders the push consumer configuration for logging. Every label is
+// followed by the value of the field with the same name; Credentials and LogC
+// are left out.
+func (config *PushConsumerConfig) String() string {
+	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
+		"ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v]", config.GroupID, config.NameServer,
+		config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
}
type PushConsumer interface {
baseAPI
+
// Subscribe a new topic with specify filter expression and consume function.
Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
}
@@ -80,8 +94,10 @@
// PullConsumer consumer pulling the message
type PullConsumer interface {
baseAPI
+
// Pull returns the messages from the consume queue by specify the offset and the max number
Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+
// FetchSubscriptionMessageQueues returns the consume queue of the topic
FetchSubscriptionMessageQueues(topic string) []MessageQueue
}
@@ -105,7 +121,6 @@
func (result SendResult) String() string {
return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
-
}
type baseAPI interface {
diff --git a/core/cfuns.go b/core/cfuns.go
index 7fe4ffe..5b4cba9 100644
--- a/core/cfuns.go
+++ b/core/cfuns.go
@@ -36,6 +36,7 @@
}
msgExt := cmsgExtToGo(msg)
+ //C.DestroyMessageExt(msg)
cfunc, exist := consumer.(*defaultPushConsumer).funcsMap.Load(msgExt.Topic)
if !exist {
return C.int(ReConsumeLater)
diff --git a/core/log.go b/core/log.go
index e081e4c..5ff244e 100644
--- a/core/log.go
+++ b/core/log.go
@@ -37,6 +37,27 @@
LogLevelNum = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
)
+// String returns the name of the log level, or "Unknown" for a value that is
+// not one of the declared LogLevel constants.
+func (l LogLevel) String() string {
+	switch l {
+	case LogLevelFatal:
+		return "Fatal"
+	case LogLevelError:
+		return "Error"
+	case LogLevelWarn:
+		return "Warn"
+	case LogLevelInfo:
+		return "Info"
+	case LogLevelDebug:
+		return "Debug"
+	case LogLevelTrace:
+		return "Trace"
+	case LogLevelNum:
+		return "Num"
+	default:
+		return "Unknown"
+	}
+}
+
// LogConfig the log configuration for the pull consumer
type LogConfig struct {
Path string
diff --git a/core/log_test.go b/core/log_test.go
new file mode 100644
index 0000000..8c4a449
--- /dev/null
+++ b/core/log_test.go
@@ -0,0 +1,22 @@
+package rocketmq
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// TestLogConfig_String checks the text produced by LogConfig.String for a
+// fixed path, file count and file size, once per named log level.
+func TestLogConfig_String(t *testing.T) {
+	logc := LogConfig{Path: "/log/path1", FileNum: 3, FileSize: 1 << 20}
+	levels := []struct {
+		level LogLevel
+		name  string
+	}{
+		{LogLevelDebug, "Debug"},
+		{LogLevelFatal, "Fatal"},
+		{LogLevelError, "Error"},
+		{LogLevelWarn, "Warn"},
+		{LogLevelInfo, "Info"},
+		{LogLevelTrace, "Trace"},
+	}
+	for _, tc := range levels {
+		logc.Level = tc.level
+		assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:"+tc.name+"}", logc.String())
+	}
+}
diff --git a/core/message.go b/core/message.go
index 98dc6cb..8e75d18 100644
--- a/core/message.go
+++ b/core/message.go
@@ -16,80 +16,105 @@
*/
package rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CMessageExt.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CMessageExt.h"
+#include <stdlib.h>
+*/
import "C"
-import "fmt"
+import (
+ "fmt"
+ "unsafe"
+)
type Message struct {
- Topic string
- Keys string
- // improve: maybe []byte is better.
- Body string
+ Topic string
+ Tags string
+ Keys string
+ Body string
+ DelayTimeLevel int
+ Property map[string]string
}
func (msg *Message) String() string {
- return fmt.Sprintf("[topic: %s, keys: %s, body: %s]", msg.Topic, msg.Keys, msg.Body)
+ return fmt.Sprintf("[Topic: %s, Tags: %s, Keys: %s, Body: %s, DelayTimeLevel: %d, Property: %v]",
+ msg.Topic, msg.Tags, msg.Keys, msg.Body, msg.DelayTimeLevel, msg.Property)
+}
+
+// goMsgToC creates a native CMessage for gomsg's topic and copies the tags,
+// keys, body, delay level and every property onto it. Each temporary C string
+// is freed as soon as the native call that used it has returned.
+func goMsgToC(gomsg *Message) *C.struct_CMessage {
+	// withCString lends a C copy of s to use and frees the copy afterwards.
+	withCString := func(s string, use func(*C.char)) {
+		p := C.CString(s)
+		defer C.free(unsafe.Pointer(p))
+		use(p)
+	}
+
+	var cmsg *C.struct_CMessage
+	withCString(gomsg.Topic, func(p *C.char) { cmsg = C.CreateMessage(p) })
+	withCString(gomsg.Tags, func(p *C.char) { C.SetMessageTags(cmsg, p) })
+	withCString(gomsg.Keys, func(p *C.char) { C.SetMessageKeys(cmsg, p) })
+	withCString(gomsg.Body, func(p *C.char) { C.SetMessageBody(cmsg, p) })
+
+	C.SetDelayTimeLevel(cmsg, C.int(gomsg.DelayTimeLevel))
+
+	for name, val := range gomsg.Property {
+		withCString(name, func(ck *C.char) {
+			withCString(val, func(cv *C.char) { C.SetMessageProperty(cmsg, ck, cv) })
+		})
+	}
+	return cmsg
+}
type MessageExt struct {
Message
- MessageID string
- Tags string
+ MessageID string
+ QueueId int
+ ReconsumeTimes int
+ StoreSize int
+ BornTimestamp int64
+ StoreTimestamp int64
+ QueueOffset int64
+ CommitLogOffset int64
+ PreparedTransactionOffset int64
+
// improve: is there is a method convert c++ map to go variable?
cmsgExt *C.struct_CMessageExt
- //Properties string
}
func (msgExt *MessageExt) String() string {
- return fmt.Sprintf("[messageId: %s, %s, Tags: %s]", msgExt.MessageID, msgExt.Message, msgExt.Tags)
+ return fmt.Sprintf("[MessageId: %s, %s, QueueId: %d, ReconsumeTimes: %d, StoreSize: %d, BornTimestamp: %d, "+
+ "StoreTimestamp: %d, QueueOffset: %d, CommitLogOffset: %d, PreparedTransactionOffset: %d]", msgExt.MessageID,
+ msgExt.Message.String(), msgExt.QueueId, msgExt.ReconsumeTimes, msgExt.StoreSize, msgExt.BornTimestamp,
+ msgExt.StoreTimestamp, msgExt.QueueOffset, msgExt.CommitLogOffset, msgExt.PreparedTransactionOffset)
}
func (msgExt *MessageExt) GetProperty(key string) string {
return C.GoString(C.GetMessageProperty(msgExt.cmsgExt, C.CString(key)))
}
-func cmsgToGo(cmsg *C.struct_CMessage) *Message {
- defer C.DestroyMessage(cmsg)
- gomsg := &Message{}
-
- return gomsg
-}
-
-func goMsgToC(gomsg *Message) *C.struct_CMessage {
- var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-
- // int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
- C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-
- // int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
- C.SetMessageBody(cmsg, C.CString(gomsg.Body))
- return cmsg
-}
-
-//
func cmsgExtToGo(cmsg *C.struct_CMessageExt) *MessageExt {
- //defer C.DestroyMessageExt(cmsg)
- gomsg := &MessageExt{}
+ gomsg := &MessageExt{cmsgExt: cmsg}
gomsg.Topic = C.GoString(C.GetMessageTopic(cmsg))
- gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
- gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
gomsg.Tags = C.GoString(C.GetMessageTags(cmsg))
+ gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
+ gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
gomsg.MessageID = C.GoString(C.GetMessageId(cmsg))
+ gomsg.DelayTimeLevel = int(C.GetMessageDelayTimeLevel(cmsg))
+ gomsg.QueueId = int(C.GetMessageQueueId(cmsg))
+ gomsg.ReconsumeTimes = int(C.GetMessageReconsumeTimes(cmsg))
+ gomsg.StoreSize = int(C.GetMessageStoreSize(cmsg))
+ gomsg.BornTimestamp = int64(C.GetMessageBornTimestamp(cmsg))
+ gomsg.StoreTimestamp = int64(C.GetMessageStoreTimestamp(cmsg))
+ gomsg.QueueOffset = int64(C.GetMessageQueueOffset(cmsg))
+ gomsg.CommitLogOffset = int64(C.GetMessageCommitLogOffset(cmsg))
+ gomsg.PreparedTransactionOffset = int64(C.GetMessagePreparedTransactionOffset(cmsg))
return gomsg
}
-
-//
-//func goMsgExtToC(gomsg *MessageExt) *C.struct_CMessageExt {
-// var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-//
-// // int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-// C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-//
-// // int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-// C.SetMessageBody(cmsg, C.CString(gomsg.Body))
-// return cmsg
-//}
diff --git a/core/message_test.go b/core/message_test.go
index 1d1f2a8..1df6ade 100644
--- a/core/message_test.go
+++ b/core/message_test.go
@@ -17,12 +17,44 @@
package rocketmq
import (
+ "github.com/stretchr/testify/assert"
"testing"
)
-func TestGetMessageTopic(test *testing.T) {
- //fmt.Println("-----TestGetMessageTopic Start----")
- //msg := rocketmq.CreateMessage("testTopic")
- //rocketmq.DestroyMessage(msg)
- //fmt.Println("-----TestGetMessageTopic Finish----")
+func TestMessage_String(t *testing.T) {
+ msg := Message{
+ Topic: "testTopic",
+ Tags: "TagA, TagB",
+ Keys: "Key1, Key2",
+ Body: "Body1234567890",
+ DelayTimeLevel: 8}
+ expect := "[Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, Body: Body1234567890, DelayTimeLevel: 8," +
+ " Property: map[]]"
+ assert.Equal(t, expect, msg.String())
+}
+
+func TestMessageExt_String(t *testing.T) {
+ msg := Message{
+ Topic: "testTopic",
+ Tags: "TagA, TagB",
+ Keys: "Key1, Key2",
+ Body: "Body1234567890",
+ DelayTimeLevel: 8}
+ msgExt := MessageExt{
+ Message: msg,
+ MessageID: "messageId",
+ QueueId: 2,
+ ReconsumeTimes: 13,
+ StoreSize: 1 << 10,
+ BornTimestamp: int64(1234567890897),
+ StoreTimestamp: int64(1234567890),
+ QueueOffset: int64(1234567890),
+ CommitLogOffset: int64(1234567890),
+ PreparedTransactionOffset: int64(1234567890),
+ }
+ expect := "[MessageId: messageId, [Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, " +
+ "Body: Body1234567890, DelayTimeLevel: 8, Property: map[]], QueueId: 2, ReconsumeTimes: " +
+ "13, StoreSize: 1024, BornTimestamp: 1234567890897, StoreTimestamp: 1234567890, QueueOffset: 1234567890," +
+ " CommitLogOffset: 1234567890, PreparedTransactionOffset: 1234567890]"
+ assert.Equal(t, expect, msgExt.String())
}
diff --git a/core/producer.go b/core/producer.go
index 3cccf21..77ed63b 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -20,6 +20,7 @@
#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
#include <stdio.h>
+#include <stdlib.h>
#include "rocketmq/CMessage.h"
#include "rocketmq/CProducer.h"
#include "rocketmq/CSendResult.h"
@@ -31,7 +32,9 @@
*/
import "C"
import (
+ "errors"
"fmt"
+ log "github.com/sirupsen/logrus"
"unsafe"
)
@@ -59,21 +62,115 @@
}
}
-func newDefaultProducer(config *ProducerConfig) *defaultProducer {
- producer := &defaultProducer{config: config}
- producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
- code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
- if config.Credentials != nil {
- ret := C.SetProducerSessionCredentials(producer.cproduer,
- C.CString(config.Credentials.AccessKey),
- C.CString(config.Credentials.SecretKey),
- C.CString(config.Credentials.Channel))
- code = int(ret)
+// newDefaultProducer validates config, creates the native producer and applies
+// every option that is set on config to it.
+//
+// It returns an error when config is nil, when GroupID is empty, when neither
+// NameServer nor NameServerDomain is given, when the native producer cannot be
+// created, or when a native setter reports a non-zero code. In the last case
+// the native producer is destroyed before returning so the handle is not leaked.
+func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
}
-switch code {
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty")
}
- return producer
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty")
+	}
+
+	producer := &defaultProducer{config: config}
+	cs := C.CString(config.GroupID)
+	cproduer := C.CreateProducer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	if cproduer == nil {
+		return nil, errors.New("create Producer failed, please check cpp logs for details")
+	}
+
+	// fail destroys the native producer created above and builds the error for
+	// a configuration step that returned a non-zero code.
+	fail := func(format string, args ...interface{}) (*defaultProducer, error) {
+		C.DestroyProducer(cproduer)
+		return nil, fmt.Errorf(format, args...)
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetProducerNameServerAddress(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set NameServerAddress error, code is: %d, "+
+				"please check cpp logs for details", code)
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetProducerNameServerDomain(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set NameServerDomain error, code is: %d, "+
+				"please check cpp logs for details", code)
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetProducerInstanceName(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set InstanceName error, code is: %d, "+
+				"please check cpp logs for details", code)
+		}
+	}
+
+	if config.Credentials != nil {
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return fail("Producer Set Credentials error, code is: %d", code)
+		}
+	}
+
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetProducerLogPath(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("Producer Set LogPath error, code is: %d", code)
+		}
+
+		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return fail("Producer Set FileNumAndSize error, code is: %d", code)
+		}
+
+		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return fail("Producer Set LogLevel error, code is: %d", code)
+		}
+	}
+
+	// The numeric options below are applied only when set to a positive value.
+	if config.SendMsgTimeout > 0 {
+		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		if code != 0 {
+			return fail("Producer Set SendMsgTimeout error, code is: %d", code)
+		}
+	}
+
+	if config.CompressLevel > 0 {
+		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		if code != 0 {
+			return fail("Producer Set CompressLevel error, code is: %d", code)
+		}
+	}
+
+	if config.MaxMessageSize > 0 {
+		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		if code != 0 {
+			return fail("Producer Set MaxMessageSize error, code is: %d", code)
+		}
+	}
+
+	producer.cproduer = cproduer
+	return producer, nil
}
type defaultProducer struct {
@@ -87,19 +184,26 @@
// Start the producer.
func (p *defaultProducer) Start() error {
- err := int(C.StartProducer(p.cproduer))
- // TODO How to process err code.
- fmt.Printf("producer start result: %v \n", err)
+	// A non-zero code from the native StartProducer call is reported as an error.
+	if code := int(C.StartProducer(p.cproduer)); code != 0 {
+		return fmt.Errorf("start producer error, error code is: %d", code)
+	}
return nil
}
// Shutdown the producer.
func (p *defaultProducer) Shutdown() error {
- defer C.DestroyProducer(p.cproduer)
- err := C.ShutdownProducer(p.cproduer)
+	// The native producer is shut down and then destroyed. Failures of either
+	// step are logged as warnings and the method still returns nil. Once the
+	// handle has been destroyed it is cleared, so calling Shutdown again is a
+	// no-op instead of handing a freed pointer to the native library.
+	if p.cproduer == nil {
+		return nil
+	}
+
+	code := int(C.ShutdownProducer(p.cproduer))
- // TODO How to process err code.
- fmt.Printf("shutdown result: %v \n", err)
+	if code != 0 {
+		log.Warnf("shutdown producer error, error code is: %d", code)
+	}
+
+	code = int(C.DestroyProducer(p.cproduer))
+	if code != 0 {
+		log.Warnf("destroy producer error, error code is: %d", code)
+	}
+	p.cproduer = nil
+
return nil
}
@@ -108,7 +212,11 @@
defer C.DestroyMessage(cmsg)
var sr C.struct__SendResult_
- C.SendMessageSync(p.cproduer, cmsg, &sr)
+ code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+
+ if code != 0 {
+ log.Warnf("send message error, error code is: %d", code)
+ }
result := SendResult{}
result.Status = SendStatus(sr.sendStatus)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 19a033c..6dcac43 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,10 +65,7 @@
// PullConsumerConfig the configuration for the pull consumer
type PullConsumerConfig struct {
- GroupID string
- NameServer string
- Credentials *SessionCredentials
- Log *LogConfig
+ clientConfig
}
// DefaultPullConsumer default consumer pulling the message
@@ -89,6 +86,10 @@
// NewPullConsumer creates one pull consumer
func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+ if conf == nil {
+ return nil, errors.New("config is nil")
+ }
+
cs := C.CString(conf.GroupID)
cconsumer := C.CreatePullConsumer(cs)
C.free(unsafe.Pointer(cs))
@@ -97,7 +98,7 @@
C.SetPullConsumerNameServerAddress(cconsumer, cs)
C.free(unsafe.Pointer(cs))
- log := conf.Log
+ log := conf.LogC
if log != nil {
cs = C.CString(log.Path)
if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
@@ -131,21 +132,21 @@
func (c *DefaultPullConsumer) Start() error {
r := C.StartPullConsumer(c.cconsumer)
if r != 0 {
- return fmt.Errorf("start failed, code:%d", r)
+ return fmt.Errorf("start failed, code:%d", int(r))
}
return nil
}
-// Shutdown shutdown the pulling conumser
+// Shutdown shuts down the pulling consumer and then destroys the native
+// handle. It returns an error for the first of the two steps that reports a
+// non-zero code.
func (c *DefaultPullConsumer) Shutdown() error {
r := C.ShutdownPullConsumer(c.cconsumer)
if r != 0 {
- return fmt.Errorf("shutdown failed, code:%d", r)
+		return fmt.Errorf("shutdown failed, code:%d", int(r))
}
r = C.DestroyPullConsumer(c.cconsumer)
if r != 0 {
- return fmt.Errorf("destory failed, code:%d", r)
+		return fmt.Errorf("destroy failed, code:%d", int(r))
}
return nil
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index e431d95..5f63b16 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -18,9 +18,9 @@
/*
#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include <stdlib.h>
#include "rocketmq/CMessageExt.h"
#include "rocketmq/CPushConsumer.h"
-#include "stdio.h"
extern int consumeMessageCallback(CPushConsumer *consumer, CMessageExt *msg);
@@ -32,10 +32,14 @@
import (
"fmt"
+ "github.com/pkg/errors"
+ "github.com/prometheus/common/log"
"sync"
"unsafe"
)
+type MessageModel C.CMessageModel
+
type ConsumeStatus int
const (
@@ -55,7 +59,7 @@
}
type defaultPushConsumer struct {
- config *ConsumerConfig
+ config *PushConsumerConfig
cconsumer *C.struct_CPushConsumer
funcsMap sync.Map
}
@@ -69,18 +73,112 @@
return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
}
-func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+// newPushConsumer validates config, creates the native push consumer, applies
+// every option that is set on config and registers the message callback.
+//
+// It returns an error when config is nil, when GroupID is empty, when neither
+// NameServer nor NameServerDomain is given, when the native consumer cannot be
+// created, or when a native call reports a non-zero code. In the last case the
+// native consumer is destroyed before returning so the handle is not leaked.
+func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
+	}
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty.")
+	}
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty.")
+	}
+
consumer := &defaultPushConsumer{config: config}
- cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
- C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
- C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
- C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.ConsumerThreadCount))
- C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+	cs := C.CString(config.GroupID)
+	cconsumer := C.CreatePushConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	if cconsumer == nil {
+		return nil, errors.New("Create PushConsumer failed")
+	}
+
+	// fail destroys the native consumer created above and builds the error for
+	// a configuration step that returned a non-zero code.
+	fail := func(format string, args ...interface{}) (PushConsumer, error) {
+		C.DestroyPushConsumer(cconsumer)
+		return nil, errors.New(fmt.Sprintf(format, args...))
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("PushConsumer Set NameServerAddress error, code is: %d", code)
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("PushConsumer Set NameServerDomain error, code is: %d", code)
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("PushConsumer Set InstanceName error, code is: %d, "+
+				"please check cpp logs for details", code)
+		}
+	}
+
if config.Credentials != nil {
- C.SetPushConsumerSessionCredentials(cconsumer,
- C.CString(config.Credentials.AccessKey),
- C.CString(config.Credentials.SecretKey),
- C.CString(config.Credentials.Channel))
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return fail("PushConsumer Set Credentials error, code is: %d", code)
+		}
+	}
+
+	// Log settings go through the push-consumer setters: cconsumer is a
+	// CPushConsumer handle, not a CProducer.
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetPushConsumerLogPath(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return fail("PushConsumer Set LogPath error, code is: %d", code)
+		}
+
+		code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return fail("PushConsumer Set FileNumAndSize error, code is: %d", code)
+		}
+
+		code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return fail("PushConsumer Set LogLevel error, code is: %d", code)
+		}
+	}
+
+	// The numeric options below are applied only when set to a positive value.
+	if config.ThreadCount > 0 {
+		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+		if code != 0 {
+			return fail("PushConsumer Set ThreadCount error, code is: %d", code)
+		}
+	}
+
+	if config.MessageBatchMaxSize > 0 {
+		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+		if code != 0 {
+			return fail("PushConsumer Set MessageBatchMaxSize error, code is: %d", code)
+		}
+	}
+
+	code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+
+	if code != 0 {
+		return fail("PushConsumer Set ConsumerMessageModel error, code is: %d", code)
+	}
+
+	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+
+	if code != 0 {
+		return fail("PushConsumer RegisterMessageCallback error, code is: %d", code)
}
consumer.cconsumer = cconsumer
@@ -89,13 +187,24 @@
}
func (c *defaultPushConsumer) Start() error {
- C.StartPushConsumer(c.cconsumer)
+ code := C.StartPushConsumer(c.cconsumer)
+ if code != 0 {
+ return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
+ }
return nil
}
func (c *defaultPushConsumer) Shutdown() error {
- C.ShutdownPushConsumer(c.cconsumer)
+	// The native consumer is shut down and then destroyed. Failures of either
+	// step are logged as warnings and the method still returns nil.
+	code := C.ShutdownPushConsumer(c.cconsumer)
+
+	if code != 0 {
+		log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
+
- C.DestroyPushConsumer(c.cconsumer)
+	// Check the result of the destroy call itself, not the earlier shutdown code.
+	code = C.DestroyPushConsumer(c.cconsumer)
+	if code != 0 {
+		log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
return nil
}
@@ -104,9 +213,11 @@
if exist {
return nil
}
- err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
- fmt.Println("err:", err)
+ code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
+ if code != 0 {
+ return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+ }
c.funcsMap.Store(topic, consumeFunc)
- fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+ log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
return nil
}
diff --git a/core/version.go b/core/version.go
index 5700714..4ac86d5 100644
--- a/core/version.go
+++ b/core/version.go
@@ -16,8 +16,8 @@
*/
package rocketmq
-const GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
+const GoClientVersion = "Go Client V1.0.0, BuildTime:2018.10.30"
func GetVersion() (version string) {
- return GO_CLIENT_VERSION
+ return GoClientVersion
}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 11fb864..5559d87 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -83,11 +83,16 @@
return
}
- producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{
- GroupID: groupID,
- NameServer: namesrvAddrs,
- })
-
+ cfg := &rocketmq.ProducerConfig{}
+ cfg.GroupID = groupID
+ cfg.NameServer = namesrvAddrs
+
+ producer, err := rocketmq.NewProducer(cfg)
+ if err != nil {
+ fmt.Println("create Producer failed, error:", err)
+ return
+ }
+
producer.Start()
defer producer.Shutdown()
diff --git a/examples/producer.go b/examples/producer.go
index 98a45b0..56ecee2 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -24,20 +24,23 @@
)
func main() {
- SendMessage()
-}
+ cfg := &rocketmq.ProducerConfig{}
+ cfg.GroupID = "testGroup"
+ cfg.NameServer = "47.101.55.250:9876"
+ producer, err := rocketmq.NewProducer(cfg)
+ if err != nil {
+ fmt.Println("create Producer failed, error:", err)
+ return
+ }
-func SendMessage() {
- producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
producer.Start()
defer producer.Shutdown()
fmt.Printf("Producer: %s started... \n", producer)
for i := 0; i < 100; i++ {
msg := fmt.Sprintf("Hello RocketMQ-%d", i)
- result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+ result := producer.SendMessageSync(&rocketmq.Message{Topic: "wwf1", Body: msg})
fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
}
time.Sleep(10 * time.Second)
- producer.Shutdown()
}
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
index 3643a22..d63729e 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pullconsumer/consumer.go
@@ -59,13 +59,14 @@
return
}
- consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
- GroupID: groupID,
- NameServer: namesrvAddrs,
- Log: &rocketmq.LogConfig{
- Path: "example",
- },
- })
+ cfg := &rocketmq.PullConsumerConfig{}
+ cfg.GroupID = groupID
+ cfg.NameServer = namesrvAddrs
+ cfg.LogC = &rocketmq.LogConfig{
+ Path: "example",
+ }
+
+ consumer, err := rocketmq.NewPullConsumer(cfg)
if err != nil {
fmt.Printf("new pull consumer error:%s\n", err)
return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index e65613b..d17559c 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -24,13 +24,18 @@
)
func main() {
- PushConsumeMessage()
-}
-
-func PushConsumeMessage() {
fmt.Println("Start Receiving Messages...")
- consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
- ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+ cfg := &rocketmq.PushConsumerConfig{
+ ThreadCount: 2,
+ MessageBatchMaxSize: 16,
+ }
+ cfg.GroupID = "testGroupId"
+ cfg.NameServer = "localhost:9876"
+ consumer, err := rocketmq.NewPushConsumer(cfg)
+ if err != nil {
+ fmt.Println("create Consumer failed, error:", err)
+ return
+ }
// MUST subscribe topic before consumer started.
consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {