Merge pull request #560 from beiwei30/subscribe-after-start
Allow further subscriptions even after the client has started, to align with the Java SDK's behavior.
diff --git a/consumer/option.go b/consumer/option.go
index 07b4246..330f6c4 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -254,8 +254,8 @@
}
}
-// WithNsResovler set nameserver resolver to fetch nameserver addr
-func WithNsResovler(resolver primitive.NsResolver) Option {
+// WithNsResolver set nameserver resolver to fetch nameserver addr
+func WithNsResolver(resolver primitive.NsResolver) Option {
return func(options *consumerOptions) {
options.Resolver = resolver
}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 7cb5fca..4789e9b 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -36,7 +36,7 @@
Convey("test Start method", t, func() {
c, _ := NewPushConsumer(
WithGroupName("testGroup"),
- WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithConsumerModel(BroadCasting),
)
diff --git a/docs/Introduction.md b/docs/Introduction.md
index 2132b91..a4e954e 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -30,7 +30,7 @@
```
p, err := rocketmq.NewProducer(
producer.WithNameServer(endPoint),
- //producer.WithNsResovler(primitive.NewPassthroughResolver(endPoint)),
+ //producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
producer.WithRetry(2),
producer.WithGroupName("GID_xxxxxx"),
)
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
index a6535fd..96d0dd0 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/acl/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
diff --git a/examples/consumer/broadcast/main.go b/examples/consumer/broadcast/main.go
index 29b0b12..219ab3e 100644
--- a/examples/consumer/broadcast/main.go
+++ b/examples/consumer/broadcast/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
diff --git a/examples/consumer/delay/main.go b/examples/consumer/delay/main.go
index 8cc5c04..1bbe6ed 100644
--- a/examples/consumer/delay/main.go
+++ b/examples/consumer/delay/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
index 1036c6a..83f43b0 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
diff --git a/examples/consumer/namespace/main.go b/examples/consumer/namespace/main.go
index e1b9dea..d46f210 100644
--- a/examples/consumer/namespace/main.go
+++ b/examples/consumer/namespace/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
diff --git a/examples/consumer/orderly/main.go b/examples/consumer/orderly/main.go
index 9e7a810..0f33e5f 100644
--- a/examples/consumer/orderly/main.go
+++ b/examples/consumer/orderly/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index d740b15..3d49e7e 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
diff --git a/examples/consumer/retry/concurrent/main.go b/examples/consumer/retry/concurrent/main.go
index 5fcf489..49f16d1 100644
--- a/examples/consumer/retry/concurrent/main.go
+++ b/examples/consumer/retry/concurrent/main.go
@@ -37,7 +37,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
)
diff --git a/examples/consumer/retry/order/main.go b/examples/consumer/retry/order/main.go
index 4ec05e7..e2d13bf 100644
--- a/examples/consumer/retry/order/main.go
+++ b/examples/consumer/retry/order/main.go
@@ -36,7 +36,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 7d1a0b7..70f35b3 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/strategy/main.go b/examples/consumer/strategy/main.go
index 502524c..269ce11 100644
--- a/examples/consumer/strategy/main.go
+++ b/examples/consumer/strategy/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithStrategy(consumer.AllocateByAveragely),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/consumer/tag/main.go b/examples/consumer/tag/main.go
index 0532971..ec16f51 100644
--- a/examples/consumer/tag/main.go
+++ b/examples/consumer/tag/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
selector := consumer.MessageSelector{
Type: consumer.TAG,
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
index 4daaa9f..97f22ed 100644
--- a/examples/consumer/trace/main.go
+++ b/examples/consumer/trace/main.go
@@ -37,7 +37,7 @@
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithTrace(traceCfg),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
index 38d61dc..96881d3 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/acl/main.go
@@ -30,7 +30,7 @@
func main() {
p, err := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
index aa73881..2a5182c 100644
--- a/examples/producer/async/main.go
+++ b/examples/producer/async/main.go
@@ -31,7 +31,7 @@
// Package main implements a async producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithQueueSelector(producer.NewManualQueueSelector()))
diff --git a/examples/producer/batch/main.go b/examples/producer/batch/main.go
index d807daf..dc591a1 100644
--- a/examples/producer/batch/main.go
+++ b/examples/producer/batch/main.go
@@ -30,7 +30,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
index 465e97c..aadbb70 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/delay/main.go
@@ -29,7 +29,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index 47ea1fb..c40f9a0 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -30,7 +30,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
)
diff --git a/examples/producer/namespace/main.go b/examples/producer/namespace/main.go
index 524bd32..9124284 100644
--- a/examples/producer/namespace/main.go
+++ b/examples/producer/namespace/main.go
@@ -30,7 +30,7 @@
func main() {
p, err := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 8ac4421..6fd3364 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -31,7 +31,7 @@
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/tag/main.go b/examples/producer/tag/main.go
index 2bcd51b..7ce8559 100644
--- a/examples/producer/tag/main.go
+++ b/examples/producer/tag/main.go
@@ -29,7 +29,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
index e6a25a9..b741704 100644
--- a/examples/producer/trace/main.go
+++ b/examples/producer/trace/main.go
@@ -36,7 +36,7 @@
}
p, _ := rocketmq.NewProducer(
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithTrace(traceCfg))
err := p.Start()
diff --git a/examples/producer/transaction/main.go b/examples/producer/transaction/main.go
index dde39a9..05b6c52 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -79,7 +79,7 @@
func main() {
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
- producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(1),
)
err := p.Start()
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 7b5b1ea..dae364e 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -194,7 +194,7 @@
responseFuture.ResponseCommand = cmd
responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
- responseFuture.Done <- true
+ close(responseFuture.Done)
}
})
}
diff --git a/producer/option.go b/producer/option.go
index 76e9a31..bacef7b 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -122,8 +122,8 @@
}
}
-// WithNsResovler set nameserver resolver to fetch nameserver addr
-func WithNsResovler(resolver primitive.NsResolver) Option {
+// WithNsResolver set nameserver resolver to fetch nameserver addr
+func WithNsResolver(resolver primitive.NsResolver) Option {
return func(options *producerOptions) {
options.Resolver = resolver
}
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 508acf8..d1b88c3 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -35,7 +35,7 @@
func TestShutdown(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -98,7 +98,7 @@
func TestSync(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -149,7 +149,7 @@
func TestASync(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -211,7 +211,7 @@
func TestOneway(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)