Extract common fields to clientConfig
diff --git a/cmd/producer.go b/cmd/producer.go
index df8712a..b67f50d 100644
--- a/cmd/producer.go
+++ b/cmd/producer.go
@@ -11,7 +11,7 @@
topic string
body string
groupID string
- keys string
+ keys string
)
func init() {
@@ -23,7 +23,7 @@
}
-func main() {
+func main() {
flag.Parse()
if namesrvAddrs == "" {
println("empty nameServer address")
@@ -45,10 +45,14 @@
return
}
- producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: groupID, NameServer: namesrvAddrs})
+ cfg := &rocketmq.ProducerConfig{}
+ cfg.GroupID = groupID
+ cfg.NameServer = namesrvAddrs
+
+ producer, _ := rocketmq.NewProducer(cfg)
producer.Start()
defer producer.Shutdown()
result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
println(fmt.Sprintf("send message result: %s", result))
-}
\ No newline at end of file
+}
diff --git a/core/api.go b/core/api.go
index a159c87..8ad6af0 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,6 +22,16 @@
return GetVersion()
}
+type clientConfig struct {
+ GroupID string
+ NameServer string
+ NameServerDomain string
+ GroupName string
+ InstanceName string
+ Credentials *SessionCredentials
+ LogC *LogConfig
+}
+
// NewProducer create a new producer with config
func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
@@ -29,14 +39,7 @@
// ProducerConfig define a producer
type ProducerConfig struct {
- GroupID string
- NameServer string
- NameServerDomain string
- GroupName string
- InstanceName string
- Credentials *SessionCredentials
-
- logC *LogConfig
+ clientConfig
SendMsgTimeout int
CompressLevel int
MaxMessageSize int
@@ -67,15 +70,9 @@
return newPushConsumer(config)
}
-// ConsumerConfig define a new conusmer.
+// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
- GroupID string
- NameServer string
- NameServerDomain string
- InstanceName string
- Credentials *SessionCredentials
-
- logC *LogConfig
+ clientConfig
ThreadCount int
MessageBatchMaxSize int
Model MessageModel
@@ -97,6 +94,7 @@
// PullConsumer consumer pulling the message
type PullConsumer interface {
baseAPI
+
// Pull returns the messages from the consume queue by specify the offset and the max number
Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
diff --git a/core/log_test.go b/core/log_test.go
index f06d1c7..8c4a449 100644
--- a/core/log_test.go
+++ b/core/log_test.go
@@ -6,7 +6,7 @@
)
func TestLogConfig_String(t *testing.T) {
- logc := LogConfig{Path: "/log/path1", FileNum: 3, FileSize: 1 << 20, Level:LogLevelDebug}
+ logc := LogConfig{Path: "/log/path1", FileNum: 3, FileSize: 1 << 20, Level: LogLevelDebug}
assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Debug}", logc.String())
logc.Level = LogLevelFatal
assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Fatal}", logc.String())
diff --git a/core/message.go b/core/message.go
index 76ec80c..8e75d18 100644
--- a/core/message.go
+++ b/core/message.go
@@ -47,7 +47,7 @@
var cmsg = C.CreateMessage(cs)
C.free(unsafe.Pointer(cs))
- cs =C.CString(gomsg.Tags)
+ cs = C.CString(gomsg.Tags)
C.SetMessageTags(cmsg, cs)
C.free(unsafe.Pointer(cs))
@@ -88,7 +88,7 @@
}
func (msgExt *MessageExt) String() string {
- return fmt.Sprintf("[MessageId: %s, %s, QueueId: %d, ReconsumeTimes: %d, StoreSize: %d, BornTimestamp: %d, " +
+ return fmt.Sprintf("[MessageId: %s, %s, QueueId: %d, ReconsumeTimes: %d, StoreSize: %d, BornTimestamp: %d, "+
"StoreTimestamp: %d, QueueOffset: %d, CommitLogOffset: %d, PreparedTransactionOffset: %d]", msgExt.MessageID,
msgExt.Message.String(), msgExt.QueueId, msgExt.ReconsumeTimes, msgExt.StoreSize, msgExt.BornTimestamp,
msgExt.StoreTimestamp, msgExt.QueueOffset, msgExt.CommitLogOffset, msgExt.PreparedTransactionOffset)
diff --git a/core/message_test.go b/core/message_test.go
index a86b81b..1df6ade 100644
--- a/core/message_test.go
+++ b/core/message_test.go
@@ -23,11 +23,11 @@
func TestMessage_String(t *testing.T) {
msg := Message{
- Topic: "testTopic",
- Tags:"TagA, TagB",
- Keys:"Key1, Key2",
- Body:"Body1234567890",
- DelayTimeLevel: 8,}
+ Topic: "testTopic",
+ Tags: "TagA, TagB",
+ Keys: "Key1, Key2",
+ Body: "Body1234567890",
+ DelayTimeLevel: 8}
expect := "[Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, Body: Body1234567890, DelayTimeLevel: 8," +
" Property: map[]]"
assert.Equal(t, expect, msg.String())
@@ -35,21 +35,21 @@
func TestMessageExt_String(t *testing.T) {
msg := Message{
- Topic: "testTopic",
- Tags:"TagA, TagB",
- Keys:"Key1, Key2",
- Body:"Body1234567890",
- DelayTimeLevel: 8,}
+ Topic: "testTopic",
+ Tags: "TagA, TagB",
+ Keys: "Key1, Key2",
+ Body: "Body1234567890",
+ DelayTimeLevel: 8}
msgExt := MessageExt{
- Message: msg,
- MessageID: "messageId",
- QueueId: 2,
- ReconsumeTimes: 13,
- StoreSize: 1 << 10,
- BornTimestamp: int64(1234567890897),
- StoreTimestamp: int64(1234567890),
- QueueOffset: int64(1234567890),
- CommitLogOffset: int64(1234567890),
+ Message: msg,
+ MessageID: "messageId",
+ QueueId: 2,
+ ReconsumeTimes: 13,
+ StoreSize: 1 << 10,
+ BornTimestamp: int64(1234567890897),
+ StoreTimestamp: int64(1234567890),
+ QueueOffset: int64(1234567890),
+ CommitLogOffset: int64(1234567890),
PreparedTransactionOffset: int64(1234567890),
}
expect := "[MessageId: messageId, [Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, " +
diff --git a/core/producer.go b/core/producer.go
index bd326a5..77ed63b 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -63,6 +63,10 @@
}
func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
+ if config == nil {
+ return nil, errors.New("config is nil")
+ }
+
if config.GroupID == "" {
return nil, errors.New("GroupId is empty")
}
@@ -71,12 +75,11 @@
return nil, errors.New("NameServer and NameServerDomain is empty")
}
-
producer := &defaultProducer{config: config}
cs := C.CString(config.GroupID)
cproduer := C.CreateProducer(cs)
C.free(unsafe.Pointer(cs))
-
+
if cproduer == nil {
return nil, errors.New("create Producer failed, please check cpp logs for details")
}
@@ -87,7 +90,7 @@
code = int(C.SetProducerNameServerAddress(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set NameServerAddress error, code is: %d" +
+ return nil, errors.New(fmt.Sprintf("Producer Set NameServerAddress error, code is: %d"+
"please check cpp logs for details", code))
}
}
@@ -97,7 +100,7 @@
code = int(C.SetProducerNameServerDomain(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set NameServerDomain error, code is: %d" +
+ return nil, errors.New(fmt.Sprintf("Producer Set NameServerDomain error, code is: %d"+
"please check cpp logs for details", code))
}
}
@@ -107,7 +110,7 @@
code = int(C.SetProducerInstanceName(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("Producer Set InstanceName error, code is: %d" +
+ return nil, errors.New(fmt.Sprintf("Producer Set InstanceName error, code is: %d"+
"please check cpp logs for details", code))
}
}
@@ -126,20 +129,20 @@
}
}
- if config.logC != nil {
- cs = C.CString(config.logC.Path)
+ if config.LogC != nil {
+ cs = C.CString(config.LogC.Path)
code = int(C.SetProducerLogPath(cproduer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
}
- code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.logC.FileNum), C.long(config.logC.FileSize)))
+ code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
}
- code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.logC.Level)))
+ code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
}
@@ -183,7 +186,7 @@
func (p *defaultProducer) Start() error {
code := int(C.StartProducer(p.cproduer))
if code != 0 {
- return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
+ return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
}
return nil
}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 5796dc0..6dcac43 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,10 +65,7 @@
// PullConsumerConfig the configuration for the pull consumer
type PullConsumerConfig struct {
- GroupID string
- NameServer string
- Credentials *SessionCredentials
- Log *LogConfig
+ clientConfig
}
// DefaultPullConsumer default consumer pulling the message
@@ -89,6 +86,10 @@
// NewPullConsumer creates one pull consumer
func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+ if conf == nil {
+ return nil, errors.New("config is nil")
+ }
+
cs := C.CString(conf.GroupID)
cconsumer := C.CreatePullConsumer(cs)
C.free(unsafe.Pointer(cs))
@@ -97,7 +98,7 @@
C.SetPullConsumerNameServerAddress(cconsumer, cs)
C.free(unsafe.Pointer(cs))
- log := conf.Log
+ log := conf.LogC
if log != nil {
cs = C.CString(log.Path)
if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
diff --git a/core/push_consumer.go b/core/push_consumer.go
index f332e3f..5f63b16 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -74,6 +74,9 @@
}
func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
+ if config == nil {
+ return nil, errors.New("config is nil")
+ }
if config.GroupID == "" {
return nil, errors.New("GroupId is empty.")
}
@@ -97,7 +100,7 @@
code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code)))
+ return nil, errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code)))
}
}
@@ -106,7 +109,7 @@
code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
+ return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
}
}
@@ -115,7 +118,7 @@
code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
- return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, " +
+ return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, "+
"please check cpp logs for details", code))
}
}
@@ -133,20 +136,20 @@
}
}
- if config.logC != nil {
- cs = C.CString(config.logC.Path)
+ if config.LogC != nil {
+ cs = C.CString(config.LogC.Path)
code = int(C.SetProducerLogPath(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
}
- code = int(C.SetProducerLogFileNumAndSize(cconsumer, C.int(config.logC.FileNum), C.long(config.logC.FileSize)))
+ code = int(C.SetProducerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
}
- code = int(C.SetProducerLogLevel(cconsumer, C.CLogLevel(config.logC.Level)))
+ code = int(C.SetProducerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
if code != 0 {
return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
}
@@ -185,7 +188,7 @@
func (c *defaultPushConsumer) Start() error {
code := C.StartPushConsumer(c.cconsumer)
- if code != 0{
+ if code != 0 {
return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
}
return nil
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 11fb864..5559d87 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -83,11 +83,16 @@
return
}
- producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{
- GroupID: groupID,
- NameServer: namesrvAddrs,
- })
-
+ cfg := &rocketmq.ProducerConfig{}
+ cfg.GroupID = groupID
+ cfg.NameServer = namesrvAddrs
+
+ producer, err := rocketmq.NewProducer(cfg)
+ if err != nil {
+ fmt.Println("create Producer failed, error:", err)
+ return
+ }
+
producer.Start()
defer producer.Shutdown()
diff --git a/examples/producer.go b/examples/producer.go
index 62ddfa0..56ecee2 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -24,7 +24,15 @@
)
func main() {
- producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
+ cfg := &rocketmq.ProducerConfig{}
+ cfg.GroupID = "testGroup"
+ cfg.NameServer = "47.101.55.250:9876"
+ producer, err := rocketmq.NewProducer(cfg)
+ if err != nil {
+ fmt.Println("create Producer failed, error:", err)
+ return
+ }
+
producer.Start()
defer producer.Shutdown()
@@ -36,4 +44,3 @@
}
time.Sleep(10 * time.Second)
}
-
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
index 3643a22..d63729e 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pullconsumer/consumer.go
@@ -59,13 +59,14 @@
return
}
- consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
- GroupID: groupID,
- NameServer: namesrvAddrs,
- Log: &rocketmq.LogConfig{
- Path: "example",
- },
- })
+ cfg := &rocketmq.PullConsumerConfig{}
+ cfg.GroupID = groupID
+ cfg.NameServer = namesrvAddrs
+ cfg.LogC = &rocketmq.LogConfig{
+ Path: "example",
+ }
+
+ consumer, err := rocketmq.NewPullConsumer(cfg)
if err != nil {
fmt.Printf("new pull consumer error:%s\n", err)
return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index 590e5a5..d17559c 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -25,8 +25,17 @@
func main() {
fmt.Println("Start Receiving Messages...")
- consumer, _ := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
- ThreadCount: 2, MessageBatchMaxSize: 16})
+ cfg := &rocketmq.PushConsumerConfig{
+ ThreadCount: 2,
+ MessageBatchMaxSize: 16,
+ }
+ cfg.GroupID = "testGroupId"
+ cfg.NameServer = "localhost:9876"
+ consumer, err := rocketmq.NewPushConsumer(cfg)
+ if err != nil {
+ fmt.Println("create Consumer failed, error:", err)
+ return
+ }
// MUST subscribe topic before consumer started.
consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {