fix(client):fix subscribe method is not based on the last subscription and remove useless client config GroupName (#251)
diff --git a/core/api.go b/core/api.go
index 9bb69a5..b0273e0 100644
--- a/core/api.go
+++ b/core/api.go
@@ -29,7 +29,6 @@
GroupID string
NameServer string
NameServerDomain string
- GroupName string
InstanceName string
Credentials *SessionCredentials
LogC *LogConfig
@@ -41,7 +40,6 @@
str = strJoin(str, "GroupId", config.GroupID)
str = strJoin(str, "NameServer", config.NameServer)
str = strJoin(str, "NameServerDomain", config.NameServerDomain)
- str = strJoin(str, "GroupName", config.GroupName)
str = strJoin(str, "InstanceName", config.InstanceName)
if config.LogC != nil {
diff --git a/core/api_test.go b/core/api_test.go
index 0c09dae..d7e9bbb 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -26,7 +26,6 @@
pConfig.GroupID = "testGroup"
pConfig.NameServer = "localhost:9876"
pConfig.NameServerDomain = "domain1"
- pConfig.GroupName = "producerGroupName"
pConfig.InstanceName = "testProducer"
pConfig.LogC = &LogConfig{
Path: "/rocketmq/log",
@@ -38,10 +37,9 @@
pConfig.MaxMessageSize = 1024
pConfig.ProducerModel = CommonProducer
- expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
- "GroupName: producerGroupName, InstanceName: testProducer, " +
- "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
- "endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ProducerModel: CommonProducer, ]"
+ expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, InstanceName: testProducer, " +
+ "LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, " +
+ "SendMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ProducerModel: CommonProducer, ]"
assert.Equal(t, expect, pConfig.String())
}
@@ -49,7 +47,6 @@
pcConfig := PushConsumerConfig{}
pcConfig.GroupID = "testGroup"
pcConfig.NameServer = "localhost:9876"
- pcConfig.GroupName = "consumerGroupName"
pcConfig.InstanceName = "testPushConsumer"
pcConfig.LogC = &LogConfig{
Path: "/rocketmq/log",
@@ -58,14 +55,13 @@
Level: LogLevelDebug}
pcConfig.ThreadCount = 4
pcConfig.MessageBatchMaxSize = 1024
- expect := "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, GroupName: consumerGroupName, " +
+ expect := "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, " +
"InstanceName: testPushConsumer, LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, " +
"ThreadCount: 4, MessageBatchMaxSize: 1024, ]"
assert.Equal(t, expect, pcConfig.String())
pcConfig.NameServerDomain = "domain1"
- expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
- "GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, InstanceName: testPushConsumer, " +
"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4, MessageBatchMaxSize: 1024, ]"
assert.Equal(t, expect, pcConfig.String())
@@ -74,8 +70,7 @@
pcConfig.ConsumerModel = CoCurrently
pcConfig.MaxCacheMessageSize = 1024
pcConfig.MaxCacheMessageSizeInMB = 2048
- expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
- "GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+ expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, InstanceName: testPushConsumer, " +
"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4," +
" MessageBatchMaxSize: 32, MessageModel: Clustering, ConsumerModel: CoCurrently," +
" MaxCacheMessageSize: 1024, MaxCacheMessageSizeInMB: 2048, ]"
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 32e47f3..bd7b68a 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -94,7 +94,7 @@
C.free(unsafe.Pointer(cs))
if cconsumer == nil {
- return nil, errors.New("Create PushConsumer failed")
+ return nil, errors.New("create PushConsumer failed")
}
var err rmqError
@@ -242,9 +242,8 @@
}
func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error {
- _, exist := c.funcsMap.Load(topic)
- if exist {
- return nil
+ if consumeFunc == nil {
+ return errors.New("consumeFunc is nil")
}
err := rmqError(C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression)))
if err != NIL {