Add more configurable field and add log framework
diff --git a/cmd/producer.go b/cmd/producer.go
new file mode 100644
index 0000000..df8712a
--- /dev/null
+++ b/cmd/producer.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/core"
+)
+
+var (
+ namesrvAddrs string
+ topic string
+ body string
+ groupID string
+ keys string
+)
+
+func init() {
+ flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
+ flag.StringVar(&topic, "t", "", "topic name")
+ flag.StringVar(&groupID, "group", "", "producer group")
+ flag.StringVar(&body, "body", "", "message body")
+ flag.StringVar(&keys, "keys", "", "message keys")
+
+}
+
+func main() {
+ flag.Parse()
+ if namesrvAddrs == "" {
+ println("empty nameServer address")
+ return
+ }
+
+ if topic == "" {
+ println("empty topic")
+ return
+ }
+
+ if body == "" {
+ println("empty body")
+ return
+ }
+
+ if groupID == "" {
+ println("empty groupID")
+ return
+ }
+
+ producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: groupID, NameServer: namesrvAddrs})
+ producer.Start()
+ defer producer.Shutdown()
+
+ result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
+ println(fmt.Sprintf("send message result: %s", result))
+}
\ No newline at end of file
diff --git a/core/api.go b/core/api.go
index bf38a78..69f3567 100644
--- a/core/api.go
+++ b/core/api.go
@@ -23,20 +23,31 @@
}
// NewProducer create a new producer with config
-func NewProducer(config *ProducerConfig) Producer {
+func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
}
// ProducerConfig define a producer
type ProducerConfig struct {
- GroupID string
- NameServer string
- Credentials *SessionCredentials
+ GroupID string
+ NameServer string
+ NameServerDomain string
+ GroupName string
+ InstanceName string
+ Credentials *SessionCredentials
+
+ // TODO log config
+ SendMsgTimeout int
+ CompressLevel int
+ MaxMessageSize int
}
func (config *ProducerConfig) String() string {
// For security, don't print Credentials default.
- return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.NameServer, config.GroupID)
+ return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, NameServer: %s, "+
+ "SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.NameServer, config.GroupID,
+ config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
+ config.MaxMessageSize)
}
type Producer interface {
@@ -52,23 +63,28 @@
}
// NewPushConsumer create a new consumer with config.
-func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
return newPushConsumer(config)
}
-// ConsumerConfig define a new consumer.
-type ConsumerConfig struct {
- GroupID string
- NameServer string
- ConsumerThreadCount int
+// ConsumerConfig define a new conusmer.
+type PushConsumerConfig struct {
+ GroupID string
+ NameServer string
+ NameServerDomain string
+ InstanceName string
+ Credentials *SessionCredentials
+
+ // TODO log config
+ ThreadCount int
MessageBatchMaxSize int
- //ConsumerInstanceName int
- Credentials *SessionCredentials
+ Model MessageModel
}
-func (config *ConsumerConfig) String() string {
- return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]",
- config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+func (config *PushConsumerConfig) String() string {
+ return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
+ "ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]", config.NameServer, config.GroupID,
+ config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
}
type PushConsumer interface {
@@ -105,7 +121,6 @@
func (result SendResult) String() string {
return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
-
}
type baseAPI interface {
diff --git a/core/producer.go b/core/producer.go
index 3cccf21..bd9ed19 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -31,7 +31,8 @@
*/
import "C"
import (
- "fmt"
+ "errors"
+ log "github.com/sirupsen/logrus"
"unsafe"
)
@@ -59,21 +60,85 @@
}
}
-func newDefaultProducer(config *ProducerConfig) *defaultProducer {
+func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
+ if config.GroupID == "" {
+ return nil, errors.New("GroupId is empty.")
+ }
+
+ if config.NameServer == "" && config.NameServerDomain == "" {
+ return nil, errors.New("NameServer and NameServerDomain is empty.")
+ }
+
+
producer := &defaultProducer{config: config}
- producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
- code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
+ cproduer := C.CreateProducer(C.CString(config.GroupID))
+
+ if cproduer == nil {
+ log.Fatal("Create Producer failed, please check cpp logs for details.")
+ }
+
+ var code int
+ if config.NameServer != "" {
+ code = int(C.SetProducerNameServerAddress(cproduer, C.CString(config.NameServer)))
+ if code != 0 {
+ log.Fatalf("Producer Set NameServerAddress error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.NameServerDomain != "" {
+ code = int(C.SetProducerNameServerDomain(cproduer, C.CString(config.NameServerDomain)))
+ if code != 0 {
+ log.Fatalf("Producer Set NameServerDomain error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.InstanceName != "" {
+ code = int(C.SetProducerInstanceName(cproduer, C.CString(config.InstanceName)))
+ if code != 0 {
+ log.Fatalf("Producer Set InstanceName error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
if config.Credentials != nil {
- ret := C.SetProducerSessionCredentials(producer.cproduer,
+ code = int(C.SetProducerSessionCredentials(cproduer,
C.CString(config.Credentials.AccessKey),
C.CString(config.Credentials.SecretKey),
- C.CString(config.Credentials.Channel))
- code = int(ret)
+ C.CString(config.Credentials.Channel)))
+ if code != 0 {
+ log.Fatalf("Producer Set Credentials error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
}
- switch code {
+ if config.SendMsgTimeout > 0 {
+ code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+ if code != 0 {
+ log.Fatalf("Producer Set SendMsgTimeout error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
}
- return producer
+
+ if config.CompressLevel > 0 {
+ code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+ if code != 0 {
+ log.Fatalf("Producer Set CompressLevel error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.MaxMessageSize > 0 {
+ code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+ if code != 0 {
+ log.Fatalf("Producer Set MaxMessageSize error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ producer.cproduer = cproduer
+ return producer, nil
}
type defaultProducer struct {
@@ -87,19 +152,26 @@
// Start the producer.
func (p *defaultProducer) Start() error {
- err := int(C.StartProducer(p.cproduer))
- // TODO How to process err code.
- fmt.Printf("producer start result: %v \n", err)
+ code := int(C.StartProducer(p.cproduer))
+ if code != 0 {
+ log.Fatalf("start producer error, error code is: %d", code)
+ }
return nil
}
// Shutdown the producer.
func (p *defaultProducer) Shutdown() error {
- defer C.DestroyProducer(p.cproduer)
- err := C.ShutdownProducer(p.cproduer)
+ code := int(C.ShutdownProducer(p.cproduer))
- // TODO How to process err code.
- fmt.Printf("shutdown result: %v \n", err)
+ if code != 0 {
+ log.Warnf("shutdown producer error, error code is: %d", code)
+ }
+
+ code = int(int(C.DestroyProducer(p.cproduer)))
+ if code != 0 {
+ log.Warnf("destroy producer error, error code is: %d", code)
+ }
+
return nil
}
@@ -108,7 +180,11 @@
defer C.DestroyMessage(cmsg)
var sr C.struct__SendResult_
- C.SendMessageSync(p.cproduer, cmsg, &sr)
+ code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+
+ if code != 0 {
+ log.Warnf("send message error, error code is: %d", code)
+ }
result := SendResult{}
result.Status = SendStatus(sr.sendStatus)
diff --git a/core/push_consumer.go b/core/push_consumer.go
index e431d95..a0236e7 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -32,10 +32,14 @@
import (
"fmt"
+ "github.com/pkg/errors"
+ "github.com/prometheus/common/log"
"sync"
"unsafe"
)
+type MessageModel C.CMessageModel
+
type ConsumeStatus int
const (
@@ -55,7 +59,7 @@
}
type defaultPushConsumer struct {
- config *ConsumerConfig
+ config *PushConsumerConfig
cconsumer *C.struct_CPushConsumer
funcsMap sync.Map
}
@@ -69,18 +73,90 @@
return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
}
-func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
+ if config.GroupID == "" {
+ return nil, errors.New("GroupId is empty.")
+ }
+
+ if config.NameServer == "" && config.NameServerDomain == "" {
+ return nil, errors.New("NameServer and NameServerDomain is empty.")
+ }
+
+ //if config.Model == nil {
+ // return nil, errors.New("MessageModel is nil")
+ //}
+
consumer := &defaultPushConsumer{config: config}
cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
- C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
- C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
- C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.ConsumerThreadCount))
- C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+
+ if cconsumer == nil {
+ log.Fatal("Create PushConsumer failed, please check cpp logs for details.")
+ }
+
+ var code int
+ if config.NameServer != "" {
+ code = int(C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set NameServerAddress error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.NameServerDomain != "" {
+ code = int(C.SetPushConsumerNameServerDomain(cconsumer, C.CString(config.NameServerDomain)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set NameServerDomain error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.InstanceName != "" {
+ code = int(C.SetPushConsumerInstanceName(cconsumer, C.CString(config.InstanceName)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set InstanceName error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
if config.Credentials != nil {
- C.SetPushConsumerSessionCredentials(cconsumer,
+ code = int(C.SetPushConsumerSessionCredentials(cconsumer,
C.CString(config.Credentials.AccessKey),
C.CString(config.Credentials.SecretKey),
- C.CString(config.Credentials.Channel))
+ C.CString(config.Credentials.Channel)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set Credentials error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.ThreadCount > 0 {
+ code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set ThreadCount error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ if config.MessageBatchMaxSize > 0 {
+ code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+ if code != 0 {
+ log.Fatalf("PushConsumer Set MessageBatchMaxSize error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+ }
+
+ code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+
+ if code != 0 {
+ log.Fatalf("PushConsumer Set ConsumerMessageModel error, code is: %d, " +
+ "please check cpp logs for details", code)
+ }
+
+ code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+
+ if code != 0 {
+ log.Fatalf("PushConsumer RegisterMessageCallback error, code is: %d, " +
+ "please check cpp logs for details", code)
}
consumer.cconsumer = cconsumer
@@ -89,13 +165,24 @@
}
func (c *defaultPushConsumer) Start() error {
- C.StartPushConsumer(c.cconsumer)
+ code := C.StartPushConsumer(c.cconsumer)
+ if code != 0{
+ log.Fatalf("start PushConsumer error, code is: %d, please check cpp logs for details", code)
+ }
return nil
}
func (c *defaultPushConsumer) Shutdown() error {
- C.ShutdownPushConsumer(c.cconsumer)
+ code := C.ShutdownPushConsumer(c.cconsumer)
+
+ if code != 0 {
+ log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+ }
+
C.DestroyPushConsumer(c.cconsumer)
+ if code != 0 {
+ log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+ }
return nil
}
@@ -104,9 +191,11 @@
if exist {
return nil
}
- err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
- fmt.Println("err:", err)
+ code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
+ if code != 0 {
+ return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+ }
c.funcsMap.Store(topic, consumeFunc)
- fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+ log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
return nil
}
diff --git a/examples/producer.go b/examples/producer.go
index 98a45b0..62ddfa0 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -24,10 +24,6 @@
)
func main() {
- SendMessage()
-}
-
-func SendMessage() {
producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
producer.Start()
defer producer.Shutdown()
@@ -35,9 +31,9 @@
fmt.Printf("Producer: %s started... \n", producer)
for i := 0; i < 100; i++ {
msg := fmt.Sprintf("Hello RocketMQ-%d", i)
- result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+ result := producer.SendMessageSync(&rocketmq.Message{Topic: "wwf1", Body: msg})
fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
}
time.Sleep(10 * time.Second)
- producer.Shutdown()
}
+
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index e65613b..590e5a5 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -24,13 +24,9 @@
)
func main() {
- PushConsumeMessage()
-}
-
-func PushConsumeMessage() {
fmt.Println("Start Receiving Messages...")
- consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
- ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+ consumer, _ := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
+ ThreadCount: 2, MessageBatchMaxSize: 16})
// MUST subscribe topic before consumer started.
consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {