Optimizing Codebase Style (#19)
* Make PullConsumer be consistent with others files
* Optimizing struct of examples
* Porcessing error gracefully
diff --git a/cmd/producer.go b/cmd/producer.go
deleted file mode 100644
index b67f50d..0000000
--- a/cmd/producer.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
-)
-
-var (
- namesrvAddrs string
- topic string
- body string
- groupID string
- keys string
-)
-
-func init() {
- flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
- flag.StringVar(&topic, "t", "", "topic name")
- flag.StringVar(&groupID, "group", "", "producer group")
- flag.StringVar(&body, "body", "", "message body")
- flag.StringVar(&keys, "keys", "", "message keys")
-
-}
-
-func main() {
- flag.Parse()
- if namesrvAddrs == "" {
- println("empty nameServer address")
- return
- }
-
- if topic == "" {
- println("empty topic")
- return
- }
-
- if body == "" {
- println("empty body")
- return
- }
-
- if groupID == "" {
- println("empty groupID")
- return
- }
-
- cfg := &rocketmq.ProducerConfig{}
- cfg.GroupID = groupID
- cfg.NameServer = namesrvAddrs
-
- producer, _ := rocketmq.NewProducer(cfg)
- producer.Start()
- defer producer.Shutdown()
-
- result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
- println(fmt.Sprintf("send message result: %s", result))
-}
diff --git a/core/api.go b/core/api.go
index 1ed6c82..8a6fdd3 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,7 +32,7 @@
LogC *LogConfig
}
-func (config *ClientConfig) string() string {
+func (config *ClientConfig) String() string {
// For security, don't print Credentials.
str := ""
str = strJoin(str, "GroupId", config.GroupID)
@@ -62,7 +62,7 @@
}
func (config *ProducerConfig) String() string {
- str := "ProducerConfig=[" + config.ClientConfig.string()
+ str := "ProducerConfig=[" + config.ClientConfig.String()
if config.SendMsgTimeout > 0 {
str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -82,13 +82,17 @@
type Producer interface {
baseAPI
// SendMessageSync send a message with sync
- SendMessageSync(msg *Message) SendResult
+ SendMessageSync(msg *Message) (*SendResult, error)
// SendMessageOrderly send the message orderly
- SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+ SendMessageOrderly(
+ msg *Message,
+ selector MessageQueueSelector,
+ arg interface{},
+ autoRetryTimes int) (*SendResult, error)
// SendMessageOneway send a message with oneway
- SendMessageOneway(msg *Message)
+ SendMessageOneway(msg *Message) error
}
// NewPushConsumer create a new consumer with config.
@@ -124,7 +128,7 @@
func (config *PushConsumerConfig) String() string {
// For security, don't print Credentials.
- str := "PushConsumerConfig=[" + config.ClientConfig.string()
+ str := "PushConsumerConfig=[" + config.ClientConfig.String()
if config.ThreadCount > 0 {
str = strJoin(str, "ThreadCount", config.ThreadCount)
@@ -148,6 +152,15 @@
Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
}
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+ ClientConfig
+}
+
+func (config *PullConsumerConfig) String() string {
+ return "PushConsumerConfig=[" + config.ClientConfig.String() + "]"
+}
+
// PullConsumer consumer pulling the message
type PullConsumer interface {
baseAPI
@@ -176,7 +189,7 @@
Offset int64
}
-func (result SendResult) String() string {
+func (result *SendResult) String() string {
return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
}
diff --git a/core/api_test.go b/core/api_test.go
index fc507f0..01f360b 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -37,8 +37,8 @@
pConfig.CompressLevel = 4
pConfig.MaxMessageSize = 1024
- expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: NameServerDomain, " +
- "GroupId: testGroup, InstanceName: testProducer, " +
+ expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+ "GroupName: producerGroupName, InstanceName: testProducer, " +
"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
"endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
assert.Equal(t, expect, pConfig.String())
diff --git a/core/error.go b/core/error.go
new file mode 100644
index 0000000..b50b4b2
--- /dev/null
+++ b/core/error.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+import "fmt"
+
+type rmqError int
+
+const (
+ NIL = rmqError(C.OK)
+ ErrNullPoint = rmqError(C.NULL_POINTER)
+ ErrMallocFailed = rmqError(C.MALLOC_FAILED)
+ ErrProducerStartFailed = rmqError(C.PRODUCER_START_FAILED)
+ ErrSendSyncFailed = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
+ ErrSendOnewayFailed = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
+ ErrSendOrderlyFailed = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
+ ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
+ ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
+ ErrFetchMQFailed = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
+ ErrFetchMessageFailed = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
+)
+
+func (e rmqError) Error() string {
+ switch e {
+ case ErrNullPoint:
+ return "null point"
+ case ErrMallocFailed:
+ return "malloc memory failed"
+ case ErrProducerStartFailed:
+ return "start producer failed"
+ case ErrSendSyncFailed:
+ return "send message with sync failed"
+ case ErrSendOrderlyFailed:
+ return "send message with orderly failed"
+ case ErrSendOnewayFailed:
+ return "send message with oneway failed"
+ case ErrPushConsumerStartFailed:
+ return "start push-consumer failed"
+ case ErrPullConsumerStartFailed:
+ return "start pull-consumer failed"
+ case ErrFetchMQFailed:
+ return "fetch MessageQueue failed"
+ case ErrFetchMessageFailed:
+ return "fetch Message failed"
+ default:
+ return fmt.Sprintf("unknow error: %v", int(e))
+ }
+}
diff --git a/core/producer.go b/core/producer.go
index f402589..8ad3ea4 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -33,7 +33,6 @@
import "C"
import (
"errors"
- "fmt"
log "github.com/sirupsen/logrus"
"unsafe"
)
@@ -81,34 +80,34 @@
C.free(unsafe.Pointer(cs))
if cproduer == nil {
- return nil, errors.New("create Producer failed, please check cpp logs for details")
+ return nil, errors.New("create Producer failed")
}
- var code int
+ var err rmqError
if config.NameServer != "" {
cs = C.CString(config.NameServer)
- code = int(C.SetProducerNameServerAddress(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("producer Set NameServerAddress error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
if config.NameServerDomain != "" {
cs = C.CString(config.NameServerDomain)
- code = int(C.SetProducerNameServerDomain(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("producer Set NameServerDomain error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
if config.InstanceName != "" {
cs = C.CString(config.InstanceName)
- code = int(C.SetProducerInstanceName(cproduer, cs))
+ err = rmqError(C.SetProducerInstanceName(cproduer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("producer Set InstanceName error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
@@ -116,53 +115,53 @@
ak := C.CString(config.Credentials.AccessKey)
sk := C.CString(config.Credentials.SecretKey)
ch := C.CString(config.Credentials.Channel)
- code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+ err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
- if code != 0 {
- return nil, fmt.Errorf("producer Set Credentials error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- code = int(C.SetProducerLogPath(cproduer, cs))
+ err = rmqError(C.SetProducerLogPath(cproduer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("producer Set LogPath error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
- code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
- if code != 0 {
- return nil, fmt.Errorf("producer Set FileNumAndSize error, code is: %d", code)
+ err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ if err != NIL {
+ return nil, err
}
- code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
- if code != 0 {
- return nil, fmt.Errorf("producer Set LogLevel error, code is: %d", code)
+ err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+ if err != NIL {
+ return nil, err
}
}
if config.SendMsgTimeout > 0 {
- code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
- if code != 0 {
- return nil, fmt.Errorf("producer Set SendMsgTimeout error, code is: %d", code)
+ err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+ if err != NIL {
+ return nil, err
}
}
if config.CompressLevel > 0 {
- code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
- if code != 0 {
- return nil, fmt.Errorf("producer Set CompressLevel error, code is: %d", code)
+ err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+ if err != NIL {
+ return nil, err
}
}
if config.MaxMessageSize > 0 {
- code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
- if code != 0 {
- return nil, fmt.Errorf("producer Set MaxMessageSize error, code is: %d", code)
+ err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+ if err != NIL {
+ return nil, err
}
}
@@ -181,77 +180,84 @@
// Start the producer.
func (p *defaultProducer) Start() error {
- code := int(C.StartProducer(p.cproduer))
- if code != 0 {
- return fmt.Errorf("start producer error, error code is: %d", code)
+ err := rmqError(C.StartProducer(p.cproduer))
+ if err != NIL {
+ return err
}
return nil
}
// Shutdown the producer.
func (p *defaultProducer) Shutdown() error {
- code := int(C.ShutdownProducer(p.cproduer))
+ err := rmqError(C.ShutdownProducer(p.cproduer))
- if code != 0 {
- log.Warnf("shutdown producer error, error code is: %d", code)
+ if err != NIL {
+ return err
}
- code = int(int(C.DestroyProducer(p.cproduer)))
- if code != 0 {
- log.Warnf("destroy producer error, error code is: %d", code)
+ err = rmqError(int(C.DestroyProducer(p.cproduer)))
+ if err != NIL {
+ return err
}
- return nil
+ return err
}
-func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
+func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
cmsg := goMsgToC(msg)
defer C.DestroyMessage(cmsg)
var sr C.struct__SendResult_
- code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+ err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
- if code != 0 {
- log.Warnf("send message error, error code is: %d", code)
+ if err != NIL {
+ log.Warnf("send message error, error is: %s", err.Error())
+ return nil, err
}
- result := SendResult{}
+ result := &SendResult{}
result.Status = SendStatus(sr.sendStatus)
result.MsgId = C.GoString(&sr.msgId[0])
result.Offset = int64(sr.offset)
- return result
+ return result, nil
}
-func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) (*SendResult, error) {
cmsg := goMsgToC(msg)
+ defer C.DestroyMessage(cmsg)
key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
var sr C.struct__SendResult_
- C.SendMessageOrderly(
+ err := rmqError(C.SendMessageOrderly(
p.cproduer,
cmsg,
(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
unsafe.Pointer(&key),
C.int(autoRetryTimes),
- &sr,
- )
- C.DestroyMessage(cmsg)
-
- return SendResult{
+ &sr))
+
+ if err != NIL {
+ log.Warnf("send message orderly error, error is: %s", err.Error())
+ return nil, err
+ }
+
+ return &SendResult{
Status: SendStatus(sr.sendStatus),
MsgId: C.GoString(&sr.msgId[0]),
Offset: int64(sr.offset),
- }
+ }, nil
}
-func (p *defaultProducer) SendMessageOneway(msg *Message) {
+func (p *defaultProducer) SendMessageOneway(msg *Message) error {
cmsg := goMsgToC(msg)
defer C.DestroyMessage(cmsg)
- code := int(C.SendMessageOneway(p.cproduer, cmsg))
- if code != 0 {
- log.Warnf("send message with oneway error, error code is: %d", code)
- } else {
- log.Debugf("Send Message: %s with oneway success.", msg.String())
+ err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
+ if err != NIL {
+ log.Warnf("send message with oneway error, error is: %s", err.Error())
+ return err
}
+
+ log.Debugf("Send Message: %s with oneway success.", msg.String())
+ return nil
}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index fdf8d76..74e4a0b 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -63,19 +63,14 @@
}
}
-// PullConsumerConfig the configuration for the pull consumer
-type PullConsumerConfig struct {
- ClientConfig
-}
-
-// DefaultPullConsumer default consumer pulling the message
-type DefaultPullConsumer struct {
+// defaultPullConsumer default consumer pulling the message
+type defaultPullConsumer struct {
PullConsumerConfig
cconsumer *C.struct_CPullConsumer
funcsMap sync.Map
}
-func (c *DefaultPullConsumer) String() string {
+func (c *defaultPullConsumer) String() string {
topics := ""
c.funcsMap.Range(func(key, value interface{}) bool {
topics += key.(string) + ", "
@@ -84,75 +79,105 @@
return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
}
-// NewPullConsumer creates one pull consumer
-func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
- if conf == nil {
+// NewPullConsumer creates a pull consumer
+func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error) {
+ if config == nil {
return nil, errors.New("config is nil")
}
+ if config.GroupID == "" {
+ return nil, errors.New("GroupId is empty")
+ }
- cs := C.CString(conf.GroupID)
+ if config.NameServer == "" && config.NameServerDomain == "" {
+ return nil, errors.New("NameServer and NameServerDomain is empty")
+ }
+
+ cs := C.CString(config.GroupID)
cconsumer := C.CreatePullConsumer(cs)
C.free(unsafe.Pointer(cs))
+ if cconsumer == nil {
+ return nil, errors.New("create PullConsumer failed")
+ }
- cs = C.CString(conf.NameServer)
- C.SetPullConsumerNameServerAddress(cconsumer, cs)
- C.free(unsafe.Pointer(cs))
-
- log := conf.LogC
- if log != nil {
- cs = C.CString(log.Path)
- if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
- return nil, errors.New("new pull consumer error:set log path failed")
- }
+ var err rmqError
+ if config.NameServer != "" {
+ cs = C.CString(config.NameServer)
+ err = rmqError(C.SetPullConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
-
- if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
- return nil, errors.New("new pull consumer error:set log file num and size failed")
- }
- if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
- return nil, errors.New("new pull consumer error:set log level failed")
+ if err != NIL {
+ return nil, err
}
}
- if conf.Credentials != nil {
- ak := C.CString(conf.Credentials.AccessKey)
- sk := C.CString(conf.Credentials.SecretKey)
- ch := C.CString(conf.Credentials.Channel)
- C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+ if config.NameServerDomain != "" {
+ cs = C.CString(config.NameServerDomain)
+ err = rmqError(C.SetPullConsumerNameServerDomain(cconsumer, cs))
+ C.free(unsafe.Pointer(cs))
+ if err != NIL {
+ return nil, err
+ }
+ }
+ if config.Credentials != nil {
+ ak := C.CString(config.Credentials.AccessKey)
+ sk := C.CString(config.Credentials.SecretKey)
+ ch := C.CString(config.Credentials.Channel)
+ err = rmqError(C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
+ if err != NIL {
+ return nil, err
+ }
}
- return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
+ if config.LogC != nil {
+ cs = C.CString(config.LogC.Path)
+ err = rmqError(C.SetPullConsumerLogPath(cconsumer, cs))
+ C.free(unsafe.Pointer(cs))
+ if err != NIL {
+ return nil, err
+ }
+
+ err = rmqError(C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ if err != NIL {
+ return nil, err
+ }
+
+ err = rmqError(C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+ if err != NIL {
+ return nil, err
+ }
+ }
+
+ return &defaultPullConsumer{PullConsumerConfig: *config, cconsumer: cconsumer}, nil
}
-// Start starts the pulling conumser
-func (c *DefaultPullConsumer) Start() error {
- r := C.StartPullConsumer(c.cconsumer)
- if r != 0 {
- return fmt.Errorf("start failed, code:%d", int(r))
+// Start starts the pulling consumer
+func (c *defaultPullConsumer) Start() error {
+ err := rmqError(C.StartPullConsumer(c.cconsumer))
+ if err != NIL {
+ return err
}
return nil
}
// Shutdown shutdown the pulling consumer
-func (c *DefaultPullConsumer) Shutdown() error {
- r := C.ShutdownPullConsumer(c.cconsumer)
- if r != 0 {
- return fmt.Errorf("shutdown failed, code:%d", int(r))
+func (c *defaultPullConsumer) Shutdown() error {
+ err := rmqError(C.ShutdownPullConsumer(c.cconsumer))
+ if err != NIL {
+ return err
}
- r = C.DestroyPullConsumer(c.cconsumer)
- if r != 0 {
- return fmt.Errorf("destory failed, code:%d", int(r))
+ err = rmqError(C.DestroyPullConsumer(c.cconsumer))
+ if err != NIL {
+ return err
}
return nil
}
// FetchSubscriptionMessageQueues returns the topic's consume queue
-func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+func (c *defaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
var (
q *C.struct__CMessageQueue_
size C.int
@@ -191,7 +216,7 @@
}
// Pull pulling the message from the specified message queue
-func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+func (c *defaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
cmq := C.struct__CMessageQueue_{
queueId: C.int(mq.ID),
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index c39dc09..e7ee1d7 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -92,32 +92,31 @@
return nil, errors.New("Create PushConsumer failed")
}
- var code int
+ var err rmqError
if config.NameServer != "" {
cs = C.CString(config.NameServer)
- code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+ err = rmqError(C.SetPushConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set NameServerAddress error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
if config.NameServerDomain != "" {
cs = C.CString(config.NameServerDomain)
- code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+ err = rmqError(C.SetPushConsumerNameServerDomain(cconsumer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set NameServerDomain error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
}
if config.InstanceName != "" {
cs = C.CString(config.InstanceName)
- code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+ err = rmqError(C.SetPushConsumerInstanceName(cconsumer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set InstanceName error, code is: %d, "+
- "please check cpp logs for details", code)
+ if err != NIL {
+ return nil, err
}
}
@@ -125,45 +124,45 @@
ak := C.CString(config.Credentials.AccessKey)
sk := C.CString(config.Credentials.SecretKey)
ch := C.CString(config.Credentials.Channel)
- code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+ err = rmqError(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set Credentials error, code is: %d", int(code))
+ if err != NIL {
+ return nil, err
}
}
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- code = int(C.SetPushConsumerLogPath(cconsumer, cs))
+ err = rmqError(C.SetPushConsumerLogPath(cconsumer, cs))
C.free(unsafe.Pointer(cs))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set LogPath error, code is: %d", code)
+ if err != NIL {
+ return nil, err
}
- code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set FileNumAndSize error, code is: %d", code)
+ err = rmqError(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ if err != NIL {
+ return nil, err
}
- code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set LogLevel error, code is: %d", code)
+ err = rmqError(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+ if err != NIL {
+ return nil, err
}
}
if config.ThreadCount > 0 {
- code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set ThreadCount error, code is: %d", int(code))
+ err = rmqError(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+ if err != NIL {
+ return nil, err
}
}
if config.MessageBatchMaxSize > 0 {
- code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code))
+ err = rmqError(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+ if err != NIL {
+ return nil, err
}
}
@@ -175,18 +174,18 @@
case Clustering:
mode = C.CLUSTERING
}
- code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
+ err = rmqError(C.SetPushConsumerMessageModel(cconsumer, mode))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code))
+ if err != NIL {
+ return nil, err
}
}
- code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+ err = rmqError(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
- if code != 0 {
- return nil, fmt.Errorf("PushConsumer RegisterMessageCallback error, code is: %d", int(code))
+ if err != NIL {
+ return nil, err
}
consumer.cconsumer = cconsumer
@@ -195,23 +194,23 @@
}
func (c *defaultPushConsumer) Start() error {
- code := C.StartPushConsumer(c.cconsumer)
- if code != 0 {
- return fmt.Errorf("start PushConsumer error, code is: %d", int(code))
+ err := rmqError(C.StartPushConsumer(c.cconsumer))
+ if err != NIL {
+ return err
}
return nil
}
func (c *defaultPushConsumer) Shutdown() error {
- code := C.ShutdownPushConsumer(c.cconsumer)
+ err := rmqError(C.ShutdownPushConsumer(c.cconsumer))
- if code != 0 {
- log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+ if err != NIL {
+ return err
}
- C.DestroyPushConsumer(c.cconsumer)
- if code != 0 {
- log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+ err = rmqError(C.DestroyPushConsumer(c.cconsumer))
+ if err != NIL {
+ return err
}
return nil
}
@@ -221,9 +220,9 @@
if exist {
return nil
}
- code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
- if code != 0 {
- return fmt.Errorf("subscribe topic: %s failed, error code is: %d", topic, int(code))
+ err := rmqError(C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression)))
+ if err != NIL {
+ return err
}
c.funcsMap.Store(topic, consumeFunc)
log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
diff --git a/examples/main.go b/examples/main.go
new file mode 100644
index 0000000..72a2a68
--- /dev/null
+++ b/examples/main.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "github.com/apache/rocketmq-client-go/core"
+ "gopkg.in/alecthomas/kingpin.v2"
+ "os"
+)
+
+var (
+ rmq = kingpin.New("rocketmq", "RocketMQ cmd tools")
+ namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
+ topic = rmq.Flag("topic", "topic name.").Short('t').Required().String()
+ gid = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
+ amount = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
+
+ produce = rmq.Command("produce", "send messages to RocketMQ")
+ body = produce.Flag("body", "message body").Short('b').Required().String()
+ workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
+ orderly = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
+
+ consume = rmq.Command("consume", "consumes message from RocketMQ")
+)
+
+func main() {
+ switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
+ case produce.FullCommand():
+ pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
+ GroupID: *gid,
+ NameServer: *namesrv,
+ LogC: &rocketmq.LogConfig{
+ Path: "example",
+ FileSize: 64 * 1 << 10,
+ FileNum: 1,
+ Level: rocketmq.LogLevelDebug,
+ },
+ }}
+ if *orderly {
+ sendMessageOrderly(pConfig)
+ } else {
+ sendMessage(pConfig)
+ }
+ case consume.FullCommand():
+ cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
+ GroupID: *gid,
+ NameServer: *namesrv,
+ LogC: &rocketmq.LogConfig{
+ Path: "example",
+ FileSize: 64 * 1 << 10,
+ FileNum: 1,
+ Level: rocketmq.LogLevelInfo,
+ },
+ }}
+
+ ConsumeWithPush(cConfig)
+ }
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
deleted file mode 100644
index f3d70c7..0000000
--- a/examples/orderproducer/producer.go
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package main
-
-import (
- "flag"
- "fmt"
- "sync"
- "sync/atomic"
-
- rocketmq "github.com/apache/rocketmq-client-go/core"
-)
-
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
- return arg.(int) % size
-}
-
-var (
- namesrvAddrs string
- topic string
- body string
- groupID string
- msgCount int64
- workerCount int
-)
-
-func init() {
- flag.StringVar(&namesrvAddrs, "n", "", "name server address")
- flag.StringVar(&topic, "t", "", "topic")
- flag.StringVar(&groupID, "g", "", "group")
- flag.StringVar(&body, "d", "", "body")
- flag.Int64Var(&msgCount, "m", 0, "message count")
- flag.IntVar(&workerCount, "w", 0, "worker count")
-}
-
-type worker struct {
- p rocketmq.Producer
- leftMsgCount *int64
-}
-
-func (w *worker) run() {
- selector := queueSelectorByOrderID{}
- for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
- r := w.p.SendMessageOrderly(
- &rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
- )
- fmt.Printf("send result:%+v\n", r)
- }
-}
-
-// example:
-// ./producer -n "localhost:9876" -t local_test -g local_test -d data -m 100 -w 10
-func main() {
- flag.Parse()
-
- if namesrvAddrs == "" {
- println("empty namesrv address")
- return
- }
-
- if topic == "" {
- println("empty topic")
- return
- }
-
- if body == "" {
- println("empty body")
- return
- }
-
- if groupID == "" {
- println("empty groupID")
- return
- }
-
- if msgCount == 0 {
- println("zero message count")
- return
- }
-
- if workerCount == 0 {
- println("zero worker count")
- return
- }
-
- cfg := &rocketmq.ProducerConfig{}
- cfg.GroupID = groupID
- cfg.NameServer = namesrvAddrs
-
- producer, err := rocketmq.NewProducer(cfg)
- if err != nil {
- fmt.Println("create Producer failed, error:", err)
- return
- }
-
- producer.Start()
- defer producer.Shutdown()
-
- wg := sync.WaitGroup{}
- wg.Add(workerCount)
-
- workers := make([]worker, workerCount)
- for i := range workers {
- workers[i].p = producer
- workers[i].leftMsgCount = &msgCount
- }
-
- for i := range workers {
- go func(w *worker) { w.run(); wg.Done() }(&workers[i])
- }
-
- wg.Wait()
-}
diff --git a/examples/producer.go b/examples/producer.go
index 33eab07..01e8105 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -22,24 +22,29 @@
"github.com/apache/rocketmq-client-go/core"
)
-func main() {
- cfg := &rocketmq.ProducerConfig{}
- cfg.GroupID = "testGroup"
- cfg.NameServer = "localhost:9876"
- producer, err := rocketmq.NewProducer(cfg)
+func sendMessage(config *rocketmq.ProducerConfig) {
+ producer, err := rocketmq.NewProducer(config)
+
if err != nil {
fmt.Println("create Producer failed, error:", err)
return
}
- producer.Start()
+ err = producer.Start()
+ if err != nil {
+ fmt.Println("start producer error", err)
+ return
+ }
defer producer.Shutdown()
fmt.Printf("Producer: %s started... \n", producer)
- for i := 0; i < 100; i++ {
- msg := fmt.Sprintf("Hello RocketMQ-%d", i)
- result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
- fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
+ for i := 0; i < *amount; i++ {
+ msg := fmt.Sprintf("%s-%d", *body, i)
+ result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+ if err != nil {
+ fmt.Println("Error:", err)
+ }
+ fmt.Printf("send message: %s result: %s\n", msg, result)
}
fmt.Println("shutdown producer.")
}
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
new file mode 100644
index 0000000..9943f5b
--- /dev/null
+++ b/examples/producer_orderly.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "github.com/apache/rocketmq-client-go/core"
+)
+
+type queueSelectorByOrderID struct{}
+
+func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
+ return arg.(int) % size
+}
+
+type worker struct {
+ p rocketmq.Producer
+ leftMsgCount int64
+}
+
+func (w *worker) run() {
+ selector := queueSelectorByOrderID{}
+ for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
+ r, err := w.p.SendMessageOrderly(
+ &rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
+ )
+ if err != nil {
+ println("Send Orderly Error:", err)
+ }
+ fmt.Printf("send orderly result:%+v\n", r)
+ }
+}
+
+func sendMessageOrderly(config *rocketmq.ProducerConfig) {
+ producer, err := rocketmq.NewProducer(config)
+ if err != nil {
+ fmt.Println("create Producer failed, error:", err)
+ return
+ }
+
+ producer.Start()
+ defer producer.Shutdown()
+
+ wg := sync.WaitGroup{}
+ wg.Add(*workerCount)
+
+ workers := make([]worker, *workerCount)
+ for i := range workers {
+ workers[i].p = producer
+ workers[i].leftMsgCount = (int64)(*amount)
+ }
+
+ for i := range workers {
+ go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+ }
+
+ wg.Wait()
+}
diff --git a/examples/pullconsumer/consumer.go b/examples/pull_consumer.go
similarity index 69%
rename from examples/pullconsumer/consumer.go
rename to examples/pull_consumer.go
index d63729e..1b209c0 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pull_consumer.go
@@ -18,55 +18,15 @@
package main
import (
- "flag"
"fmt"
"time"
- rocketmq "github.com/apache/rocketmq-client-go/core"
+ "github.com/apache/rocketmq-client-go/core"
)
-var (
- namesrvAddrs string
- topic string
- body string
- groupID string
- msgCount int64
- workerCount int
-)
+func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-func init() {
- flag.StringVar(&namesrvAddrs, "n", "", "name server address")
- flag.StringVar(&topic, "t", "", "topic")
- flag.StringVar(&groupID, "g", "", "group")
-}
-
-// ./consumer -n "localhost:9876" -t test -g local_test
-func main() {
- flag.Parse()
-
- if namesrvAddrs == "" {
- fmt.Println("empty namesrv")
- return
- }
-
- if topic == "" {
- fmt.Println("empty topic")
- return
- }
-
- if groupID == "" {
- fmt.Println("empty groupID")
- return
- }
-
- cfg := &rocketmq.PullConsumerConfig{}
- cfg.GroupID = groupID
- cfg.NameServer = namesrvAddrs
- cfg.LogC = &rocketmq.LogConfig{
- Path: "example",
- }
-
- consumer, err := rocketmq.NewPullConsumer(cfg)
+ consumer, err := rocketmq.NewPullConsumer(config)
if err != nil {
fmt.Printf("new pull consumer error:%s\n", err)
return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index d17559c..723e32f 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -20,31 +20,40 @@
import (
"fmt"
"github.com/apache/rocketmq-client-go/core"
- "time"
+ "sync/atomic"
)
-func main() {
- fmt.Println("Start Receiving Messages...")
- cfg := &rocketmq.PushConsumerConfig{
- ThreadCount: 2,
- MessageBatchMaxSize: 16,
- }
- cfg.GroupID = "testGroupId"
- cfg.NameServer = "localhost:9876"
- consumer, err := rocketmq.NewPushConsumer(cfg)
+func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
+
+ consumer, err := rocketmq.NewPushConsumer(config)
if err != nil {
- fmt.Println("create Consumer failed, error:", err)
+ println("create Consumer failed, error:", err)
return
}
+ ch := make(chan interface{})
+ var count = (int64)(*amount)
// MUST subscribe topic before consumer started.
consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
fmt.Printf("A message received: \"%s\" \n", msg.Body)
+ if atomic.AddInt64(&count, -1) <= 0 {
+ ch <- "quit"
+ }
return rocketmq.ConsumeSuccess
})
- consumer.Start()
- defer consumer.Shutdown()
+ err = consumer.Start()
+ if err != nil {
+ println("consumer start failed,", err)
+ return
+ }
+
fmt.Printf("consumer: %s started...\n", consumer)
- time.Sleep(10 * time.Minute)
+ <-ch
+ err = consumer.Shutdown()
+ if err != nil {
+ println("consumer shutdown failed")
+ return
+ }
+ println("consumer has shutdown.")
}