feat: support nameserver resolver (#457)
use nameserver resolver instead of nameserver address, we can pass
passThrough for direct address, httpResolver for a domain, an env resolver
for env param.
diff --git a/.travis.yml b/.travis.yml
index e65ed79..d4ed1ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,9 +3,8 @@
go:
- "1.11.x"
- "1.12.x"
-
+ - "1.13.x"
go_import_path: github.com/apache/rocketmq-client-go/v2
-
env:
global:
- NAME_SERVER_ADDRESS=127.0.0.1:9876
@@ -27,7 +26,10 @@
- nohup sh bin/mqbroker -n localhost:9876 &
script:
+ - cd ${TRAVIS_HOME}
+ - ls -al
- cd ${GOPATH}/src/github.com/apache/rocketmq-client-go/v2
+ - ls -al
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
diff --git a/api.go b/api.go
index 0f9a0bd..0e149e9 100644
--- a/api.go
+++ b/api.go
@@ -19,6 +19,7 @@
import (
"context"
+
"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/consumer"
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2a1f9af..3129b8d 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -27,7 +27,8 @@
"sync/atomic"
"time"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
+
"github.com/pkg/errors"
"github.com/tidwall/gjson"
@@ -273,9 +274,6 @@
}
func (dc *defaultConsumer) start() error {
- if len(dc.option.NameServerAddrs) == 0 {
- dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
- }
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index a78a698..bac1cdb 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -22,9 +22,10 @@
package consumer
import (
+ reflect "reflect"
+
primitive "github.com/apache/rocketmq-client-go/v2/primitive"
gomock "github.com/golang/mock/gomock"
- reflect "reflect"
)
// MockOffsetStore is a mock of OffsetStore interface
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index e5f3bab..44a0597 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -26,7 +26,7 @@
"sync"
"time"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
@@ -255,12 +255,6 @@
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
- } else {
- rlog.Info("update offset to broker success", map[string]interface{}{
- rlog.LogKeyConsumerGroup: r.group,
- rlog.LogKeyMessageQueue: mq.String(),
- "offset": off,
- })
}
}
}
diff --git a/consumer/option.go b/consumer/option.go
index 2537554..07b4246 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -104,6 +104,8 @@
//
AutoCommit bool
RebalanceLockInterval time.Duration
+
+ Resolver primitive.NsResolver
}
func defaultPushConsumerOptions() consumerOptions {
@@ -115,6 +117,7 @@
MaxReconsumeTimes: -1,
ConsumerModel: Clustering,
AutoCommit: true,
+ Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -125,6 +128,7 @@
func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
+ Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -179,20 +183,6 @@
}
}
-// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
-func WithNameServer(nameServers primitive.NamesrvAddr) Option {
- return func(opts *consumerOptions) {
- opts.NameServerAddrs = nameServers
- }
-}
-
-// WithNameServerDomain set NameServer domain
-func WithNameServerDomain(nameServerUrl string) Option {
- return func(opts *consumerOptions) {
- opts.NameServerDomain = nameServerUrl
- }
-}
-
// WithNamespace set the namespace of consumer
func WithNamespace(namespace string) Option {
return func(opts *consumerOptions) {
@@ -263,3 +253,24 @@
options.PullInterval = interval
}
}
+
+// WithNsResovler set nameserver resolver to fetch nameserver addr
+func WithNsResovler(resolver primitive.NsResolver) Option {
+ return func(options *consumerOptions) {
+ options.Resolver = resolver
+ }
+}
+
+// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
+func WithNameServer(nameServers primitive.NamesrvAddr) Option {
+ return func(options *consumerOptions) {
+ options.Resolver = primitive.NewPassthroughResolver(nameServers)
+ }
+}
+
+// WithNameServerDomain set NameServer domain
+func WithNameServerDomain(nameServerUrl string) Option {
+ return func(opts *consumerOptions) {
+ opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+ }
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 83369d1..0523f08 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -76,7 +76,7 @@
apply(&defaultOpts)
}
- srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+ srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d14f0a5..d2aee5b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -76,7 +76,7 @@
for _, apply := range opts {
apply(&defaultOpts)
}
- srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+ srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 3115807..7cb5fca 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -20,11 +20,12 @@
import (
"context"
"fmt"
+ "testing"
+
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"
- "testing"
)
func mockB4Start(c *pushConsumer) {
@@ -35,7 +36,7 @@
Convey("test Start method", t, func() {
c, _ := NewPushConsumer(
WithGroupName("testGroup"),
- WithNameServer([]string{"127.0.0.1:9876"}),
+ WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithConsumerModel(BroadCasting),
)
diff --git a/consumer/strategy_test.go b/consumer/strategy_test.go
index 3cd79cc..e66b15c 100644
--- a/consumer/strategy_test.go
+++ b/consumer/strategy_test.go
@@ -19,9 +19,10 @@
import (
"fmt"
+ "testing"
+
"github.com/apache/rocketmq-client-go/v2/primitive"
. "github.com/smartystreets/goconvey/convey"
- "testing"
)
func TestAllocateByAveragely(t *testing.T) {
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
index 583f8f0..a6535fd 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/acl/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
diff --git a/examples/consumer/broadcast/main.go b/examples/consumer/broadcast/main.go
index 114aa87..29b0b12 100644
--- a/examples/consumer/broadcast/main.go
+++ b/examples/consumer/broadcast/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
diff --git a/examples/consumer/delay/main.go b/examples/consumer/delay/main.go
index d14c620..8cc5c04 100644
--- a/examples/consumer/delay/main.go
+++ b/examples/consumer/delay/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
index 37a5979..1036c6a 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
diff --git a/examples/consumer/namespace/main.go b/examples/consumer/namespace/main.go
index 815e152..e1b9dea 100644
--- a/examples/consumer/namespace/main.go
+++ b/examples/consumer/namespace/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
diff --git a/examples/consumer/orderly/main.go b/examples/consumer/orderly/main.go
index 3e51896..9e7a810 100644
--- a/examples/consumer/orderly/main.go
+++ b/examples/consumer/orderly/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 3c75272..d740b15 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -31,7 +31,7 @@
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
diff --git a/examples/consumer/retry/concurrent/main.go b/examples/consumer/retry/concurrent/main.go
index acfe749..5fcf489 100644
--- a/examples/consumer/retry/concurrent/main.go
+++ b/examples/consumer/retry/concurrent/main.go
@@ -37,7 +37,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
)
diff --git a/examples/consumer/retry/order/main.go b/examples/consumer/retry/order/main.go
index ee726eb..4ec05e7 100644
--- a/examples/consumer/retry/order/main.go
+++ b/examples/consumer/retry/order/main.go
@@ -36,7 +36,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 41091c4..7d1a0b7 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/strategy/main.go b/examples/consumer/strategy/main.go
index 5ff639e..502524c 100644
--- a/examples/consumer/strategy/main.go
+++ b/examples/consumer/strategy/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithStrategy(consumer.AllocateByAveragely),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/consumer/tag/main.go b/examples/consumer/tag/main.go
index a2e811f..0532971 100644
--- a/examples/consumer/tag/main.go
+++ b/examples/consumer/tag/main.go
@@ -31,7 +31,7 @@
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
selector := consumer.MessageSelector{
Type: consumer.TAG,
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
index d25a635..35c4926 100644
--- a/examples/consumer/trace/main.go
+++ b/examples/consumer/trace/main.go
@@ -37,7 +37,7 @@
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
- consumer.WithNameServer(namesrvs),
+ consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithTrace(traceCfg),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
index 73d4fcb..38d61dc 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/acl/main.go
@@ -30,7 +30,7 @@
func main() {
p, err := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
index 699a70e..aa73881 100644
--- a/examples/producer/async/main.go
+++ b/examples/producer/async/main.go
@@ -31,7 +31,7 @@
// Package main implements a async producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithQueueSelector(producer.NewManualQueueSelector()))
diff --git a/examples/producer/batch/main.go b/examples/producer/batch/main.go
index 6260536..d807daf 100644
--- a/examples/producer/batch/main.go
+++ b/examples/producer/batch/main.go
@@ -30,7 +30,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
index 6d5cb5d..465e97c 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/delay/main.go
@@ -29,7 +29,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index 8dafe3d..47ea1fb 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -30,7 +30,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
)
diff --git a/examples/producer/namespace/main.go b/examples/producer/namespace/main.go
index 5a82b72..524bd32 100644
--- a/examples/producer/namespace/main.go
+++ b/examples/producer/namespace/main.go
@@ -30,7 +30,7 @@
func main() {
p, err := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 08b7682..8ac4421 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -31,7 +31,7 @@
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/tag/main.go b/examples/producer/tag/main.go
index 02c402e..2bcd51b 100644
--- a/examples/producer/tag/main.go
+++ b/examples/producer/tag/main.go
@@ -29,7 +29,7 @@
func main() {
p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
index 33d86d5..5f2c9b3 100644
--- a/examples/producer/trace/main.go
+++ b/examples/producer/trace/main.go
@@ -36,7 +36,7 @@
}
p, _ := rocketmq.NewProducer(
- producer.WithNameServer(namesrvs),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithTrace(traceCfg))
err := p.Start()
diff --git a/examples/producer/transaction/main.go b/examples/producer/transaction/main.go
index 8536501..dde39a9 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -79,7 +79,7 @@
func main() {
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(1),
)
err := p.Start()
diff --git a/internal/client.go b/internal/client.go
index 954cc57..37d7ea8 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -101,7 +101,6 @@
type ClientOptions struct {
GroupName string
NameServerAddrs primitive.NamesrvAddr
- NameServerDomain string
Namesrv *namesrvs
ClientIP string
InstanceName string
@@ -112,6 +111,7 @@
Interceptors []primitive.Interceptor
Credentials primitive.Credentials
Namespace string
+ Resolver primitive.NsResolver
}
func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -263,31 +263,27 @@
if !c.option.Credentials.IsEmpty() {
c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
}
- // fetchNameServerAddr
- if len(c.option.NameServerAddrs) == 0 {
- c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
- go primitive.WithRecover(func() {
- op := func() {
- c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
- }
- time.Sleep(10 * time.Second)
- op()
+ go primitive.WithRecover(func() {
+ op := func() {
+ c.namesrvs.UpdateNameServerAddress()
+ }
+ time.Sleep(10 * time.Second)
+ op()
- ticker := time.NewTicker(2 * time.Minute)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- op()
- case <-c.done:
- rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
- "clientID": c.ClientID(),
- })
- return
- }
+ ticker := time.NewTicker(2 * time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ op()
+ case <-c.done:
+ rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
+ "clientID": c.ClientID(),
+ })
+ return
}
- })
- }
+ }
+ })
// schedule update route info
go primitive.WithRecover(func() {
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 7f9bfd6..f87d174 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -51,15 +51,15 @@
}
// UpdateNameServerAddress mocks base method
-func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+func (m *MockNamesrvs) UpdateNameServerAddress() {
m.ctrl.T.Helper()
- m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
+ m.ctrl.Call(m, "UpdateNameServerAddress")
}
// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
-func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress))
}
// AddBroker mocks base method
diff --git a/internal/model.go b/internal/model.go
index 0c8259f..0ee9ccc 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -21,13 +21,13 @@
"bytes"
"encoding/json"
"fmt"
- "github.com/json-iterator/go"
"sort"
"strings"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
+ jsoniter "github.com/json-iterator/go"
)
type FindBrokerResult struct {
diff --git a/internal/namesrv.go b/internal/namesrv.go
index a06a09e..6d8ff2b 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -19,19 +19,12 @@
import (
"errors"
- "fmt"
- "github.com/apache/rocketmq-client-go/v2/internal/remote"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/rlog"
- "io/ioutil"
- "net/http"
- "os"
- "os/user"
- "path"
"regexp"
"strings"
"sync"
- "time"
+
+ "github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
)
const (
@@ -48,7 +41,7 @@
//go:generate mockgen -source namesrv.go -destination mock_namesrv.go -self_package github.com/apache/rocketmq-client-go/v2/internal --package internal Namesrvs
type Namesrvs interface {
- UpdateNameServerAddress(nameServerDomain, instanceName string)
+ UpdateNameServerAddress()
AddBroker(routeData *TopicRouteData)
@@ -94,13 +87,21 @@
lockNamesrv sync.Mutex
nameSrvClient remote.RemotingClient
+
+ resolver primitive.NsResolver
}
-var _ Namesrvs = &namesrvs{}
+var _ Namesrvs = (*namesrvs)(nil)
// NewNamesrv init Namesrv from namesrv addr string.
-func NewNamesrv(addr primitive.NamesrvAddr) (*namesrvs, error) {
- if err := addr.Check(); err != nil {
+// addr primitive.NamesrvAddr
+func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
+ addr := resolver.Resolve()
+ if len(addr) == 0 {
+ return nil, errors.New("no name server addr found with resolver: " + resolver.Description())
+ }
+
+ if err := primitive.NamesrvAddr(addr).Check(); err != nil {
return nil, err
}
nameSrvClient := remote.NewRemotingClient()
@@ -110,6 +111,7 @@
nameSrvClient: nameSrvClient,
brokerVersionMap: make(map[string]map[string]int32, 0),
brokerLock: new(sync.RWMutex),
+ resolver: resolver,
}, nil
}
@@ -143,99 +145,21 @@
return s.srvs
}
-func getSnapshotFilePath(instanceName string) string {
- homeDir := ""
- if usr, err := user.Current(); err == nil {
- homeDir = usr.HomeDir
- } else {
- rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
- "err": err,
- })
- }
- storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
- if _, err := os.Stat(storePath); os.IsNotExist(err) {
- if err = os.MkdirAll(storePath, 0755); err != nil {
- rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
- "path": storePath,
- "err": err,
- })
- }
- }
- filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
- return filePath
-}
-
// UpdateNameServerAddress will update srvs.
// docs: https://rocketmq.apache.org/docs/best-practice-namesvr/
-func (s *namesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+func (s *namesrvs) UpdateNameServerAddress() {
s.lock.Lock()
defer s.lock.Unlock()
- if nameServerDomain == "" {
- // try to get from environment variable
- if v := os.Getenv("NAMESRV_ADDR"); v != "" {
- s.srvs = strings.Split(v, ";")
- return
- }
- // use default domain
- nameServerDomain = DEFAULT_NAMESRV_ADDR
+ srvs := s.resolver.Resolve()
+ if len(srvs) == 0 {
+ return
}
- client := http.Client{Timeout: 10 * time.Second}
- resp, err := client.Get(nameServerDomain)
- if err == nil {
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err == nil {
- oldBodyStr := strings.Join(s.srvs, ";")
- bodyStr := string(body)
- if bodyStr != "" && oldBodyStr != bodyStr {
- s.srvs = strings.Split(string(body), ";")
-
- rlog.Info("name server address changed", map[string]interface{}{
- "old": oldBodyStr,
- "new": bodyStr,
- })
- // save to local snapshot
- filePath := getSnapshotFilePath(instanceName)
- if err := ioutil.WriteFile(filePath, body, 0644); err == nil {
- rlog.Info("name server snapshot save successfully", map[string]interface{}{
- "filePath": filePath,
- })
- } else {
- rlog.Error("name server snapshot save failed", map[string]interface{}{
- "filePath": filePath,
- "err": err,
- })
- }
- }
- rlog.Info("name server http fetch successfully", map[string]interface{}{
- "addrs": bodyStr,
- })
- return
- } else {
- rlog.Error("name server http fetch failed", map[string]interface{}{
- "NameServerDomain": nameServerDomain,
- "err": err,
- })
- }
+ updated := primitive.Diff(s.srvs, srvs)
+ if !updated {
+ return
}
- // load local snapshot if need when name server domain request failed
- if len(s.srvs) == 0 {
- filePath := getSnapshotFilePath(instanceName)
- if _, err := os.Stat(filePath); !os.IsNotExist(err) {
- if bs, err := ioutil.ReadFile(filePath); err == nil {
- rlog.Info("load the name server snapshot local file", map[string]interface{}{
- "filePath": filePath,
- })
- s.srvs = strings.Split(string(bs), ";")
- return
- }
- } else {
- rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
- "filePath": filePath,
- })
- }
- }
+ s.srvs = srvs
}
diff --git a/internal/namesrv_test.go b/internal/namesrv_test.go
index ede14fc..e58dc29 100644
--- a/internal/namesrv_test.go
+++ b/internal/namesrv_test.go
@@ -19,7 +19,6 @@
import (
"fmt"
- "io/ioutil"
"net"
"net/http"
"os"
@@ -27,6 +26,8 @@
"sync"
"testing"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
)
@@ -34,7 +35,7 @@
// TestSelector test roundrobin selector in namesrv
func TestSelector(t *testing.T) {
srvs := []string{"127.0.0.1:9876", "127.0.0.1:9879", "12.24.123.243:10911", "12.24.123.243:10915"}
- namesrv, err := NewNamesrv(srvs)
+ namesrv, err := NewNamesrv(primitive.NewPassthroughResolver(srvs))
assert.Nil(t, err)
assert.Equal(t, srvs[0], namesrv.getNameServerAddress())
@@ -92,11 +93,14 @@
nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs", port)
fmt.Println("temporary name server domain: ", nameServerDommain)
+ resolver := primitive.NewHttpResolver("DEFAULT", nameServerDommain)
ns := &namesrvs{
- srvs: []string{},
- lock: new(sync.Mutex),
+ srvs: []string{},
+ lock: new(sync.Mutex),
+ resolver: resolver,
}
- ns.UpdateNameServerAddress(nameServerDommain, "DEFAULT")
+
+ ns.UpdateNameServerAddress()
index1 := ns.index
IP1 := ns.getNameServerAddress()
@@ -110,39 +114,6 @@
})
}
-func TestUpdateNameServerAddressSaveLocalSnapshot(t *testing.T) {
- Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
- srvs := []string{
- "192.168.100.1",
- "192.168.100.2",
- "192.168.100.3",
- "192.168.100.4",
- "192.168.100.5",
- }
- http.HandleFunc("/nameserver/addrs2", func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprintf(w, strings.Join(srvs, ";"))
- })
- server := &http.Server{Addr: ":0", Handler: nil}
- listener, _ := net.Listen("tcp", ":0")
- go server.Serve(listener)
-
- port := listener.Addr().(*net.TCPAddr).Port
- nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port)
- fmt.Println("temporary name server domain: ", nameServerDommain)
-
- ns := &namesrvs{
- srvs: []string{},
- lock: new(sync.Mutex),
- }
- ns.UpdateNameServerAddress(nameServerDommain, "DEFAULT")
- // check snapshot saved
- filePath := getSnapshotFilePath("DEFAULT")
- body := strings.Join(srvs, ";")
- bs, _ := ioutil.ReadFile(filePath)
- So(string(bs), ShouldEqual, body)
- })
-}
-
func TestUpdateNameServerAddressUseEnv(t *testing.T) {
Convey("Test UpdateNameServerAddress Use Env", t, func() {
srvs := []string{
@@ -153,86 +124,14 @@
"192.168.100.5",
}
+ resolver := primitive.NewEnvResolver()
ns := &namesrvs{
- srvs: []string{},
- lock: new(sync.Mutex),
+ srvs: []string{},
+ lock: new(sync.Mutex),
+ resolver: resolver,
}
os.Setenv("NAMESRV_ADDR", strings.Join(srvs, ";"))
- ns.UpdateNameServerAddress("", "DEFAULT")
-
- index1 := ns.index
- IP1 := ns.getNameServerAddress()
-
- index2 := ns.index
- IP2 := ns.getNameServerAddress()
-
- So(index1+1, ShouldEqual, index2)
- So(IP1, ShouldEqual, srvs[index1])
- So(IP2, ShouldEqual, srvs[index2])
- })
-}
-
-func TestUpdateNameServerAddressUseSnapshotFile(t *testing.T) {
- Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
- srvs := []string{
- "192.168.100.1",
- "192.168.100.2",
- "192.168.100.3",
- "192.168.100.4",
- "192.168.100.5",
- }
-
- ns := &namesrvs{
- srvs: []string{},
- lock: new(sync.Mutex),
- }
-
- os.Setenv("NAMESRV_ADDR", "") // clear env
- // setup local snapshot file
- filePath := getSnapshotFilePath("DEFAULT")
- body := strings.Join(srvs, ";")
- _ = ioutil.WriteFile(filePath, []byte(body), 0644)
-
- ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
-
- index1 := ns.index
- IP1 := ns.getNameServerAddress()
-
- index2 := ns.index
- IP2 := ns.getNameServerAddress()
-
- So(index1+1, ShouldEqual, index2)
- So(IP1, ShouldEqual, srvs[index1])
- So(IP2, ShouldEqual, srvs[index2])
- })
-}
-
-func TestUpdateNameServerAddressLoadSnapshotFileOnce(t *testing.T) {
- Convey("Test UpdateNameServerAddress Load Local Snapshot Once", t, func() {
- srvs := []string{
- "192.168.100.1",
- "192.168.100.2",
- "192.168.100.3",
- "192.168.100.4",
- "192.168.100.5",
- }
-
- ns := &namesrvs{
- srvs: []string{},
- lock: new(sync.Mutex),
- }
-
- os.Setenv("NAMESRV_ADDR", "") // clear env
- // setup local snapshot file
- filePath := getSnapshotFilePath("DEFAULT")
- body := strings.Join(srvs, ";")
- _ = ioutil.WriteFile(filePath, []byte(body), 0644)
- // load local snapshot file first time
- ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
-
- // change the local snapshot file to check load once
- _ = ioutil.WriteFile(filePath, []byte("127.0.0.1;127.0.0.2"), 0644)
- ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
+ ns.UpdateNameServerAddress()
index1 := ns.index
IP1 := ns.getNameServerAddress()
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 3ac13e9..f756c11 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -22,7 +22,7 @@
"fmt"
"sync/atomic"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
var opaque int32
diff --git a/internal/remote/codec_test.go b/internal/remote/codec_test.go
index ed854bd..8fb8a60 100644
--- a/internal/remote/codec_test.go
+++ b/internal/remote/codec_test.go
@@ -24,7 +24,8 @@
"testing"
"unsafe"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
+
"github.com/stretchr/testify/assert"
)
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index bc1a0da..7d7b41c 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -22,9 +22,10 @@
import (
context "context"
+ reflect "reflect"
+
primitive "github.com/apache/rocketmq-client-go/v2/primitive"
gomock "github.com/golang/mock/gomock"
- reflect "reflect"
)
// MockRemotingClient is a mock of RemotingClient interface
diff --git a/internal/route_test.go b/internal/route_test.go
index 068f285..c9b65f0 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -42,7 +42,7 @@
addr, err := primitive.NewNamesrvAddr("1.1.1.1:8880", "1.1.1.2:8880", "1.1.1.3:8880")
assert.Nil(t, err)
- namesrv, err := NewNamesrv(addr)
+ namesrv, err := NewNamesrv(primitive.NewPassthroughResolver(addr))
assert.Nil(t, err)
namesrv.nameSrvClient = remotingCli
diff --git a/internal/trace.go b/internal/trace.go
index 48b257f..f0c643c 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -241,7 +241,7 @@
t = TraceTopicPrefix + traceCfg.TraceTopic
}
- srvs, err := NewNamesrv(traceCfg.NamesrvAddrs)
+ srvs, err := NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs))
if err != nil {
panic(errors.Wrap(err, "new Namesrv failed."))
}
diff --git a/primitive/base.go b/primitive/base.go
index efc48ef..35b6268 100644
--- a/primitive/base.go
+++ b/primitive/base.go
@@ -95,3 +95,31 @@
fn()
}
+
+func Diff(origin, latest []string) bool {
+ if len(origin) != len(latest) {
+ return true
+ }
+
+ // check added
+ originFilter := make(map[string]struct{}, len(origin))
+ for _, srv := range origin {
+ originFilter[srv] = struct{}{}
+ }
+
+ latestFilter := make(map[string]struct{}, len(latest))
+ for _, srv := range latest {
+ if _, ok := originFilter[srv]; !ok {
+ return true // added
+ }
+ latestFilter[srv] = struct{}{}
+ }
+
+ // check delete
+ for _, srv := range origin {
+ if _, ok := latestFilter[srv]; !ok {
+ return true // deleted
+ }
+ }
+ return false
+}
diff --git a/primitive/base_test.go b/primitive/base_test.go
index 09d55c3..03a978d 100644
--- a/primitive/base_test.go
+++ b/primitive/base_test.go
@@ -44,3 +44,21 @@
err = verifyIP(IPs)
assert.Equal(t, "multiple IP addr does not support", err.Error())
}
+
+func TestBase(t *testing.T) {
+ a := []string{}
+ b := []string{}
+ assert.False(t, Diff(a, b))
+
+ a = []string{"a"}
+ b = []string{"a", "b"}
+ assert.True(t, Diff(a, b))
+
+ a = []string{"a", "b", "c"}
+ b = []string{"c", "a", "b"}
+ assert.False(t, Diff(a, b))
+
+ a = []string{"b", "a"}
+ b = []string{"a", "c"}
+ assert.True(t, Diff(a, b))
+}
diff --git a/primitive/nsresolver.go b/primitive/nsresolver.go
new file mode 100644
index 0000000..d844373
--- /dev/null
+++ b/primitive/nsresolver.go
@@ -0,0 +1,219 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package primitive
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "os/user"
+ "path"
+ "strings"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+// resolver for nameserver, monitor change of nameserver and notify client
+// consul or domain is common
+type NsResolver interface {
+ Resolve() []string
+ Description() string
+}
+
+type StaticResolver struct {
+}
+
+var _ NsResolver = (*EnvResolver)(nil)
+
+func NewEnvResolver() *EnvResolver {
+ return &EnvResolver{}
+}
+
+type EnvResolver struct {
+}
+
+func (e *EnvResolver) Resolve() []string {
+ if v := os.Getenv("NAMESRV_ADDR"); v != "" {
+ return strings.Split(v, ";")
+ }
+ return nil
+}
+
+func (e *EnvResolver) Description() string {
+ return "env resolver of var NAMESRV_ADDR"
+}
+
+type passthroughResolver struct {
+ addr []string
+ failback NsResolver
+}
+
+func NewPassthroughResolver(addr []string) *passthroughResolver {
+ return &passthroughResolver{
+ addr: addr,
+ failback: NewEnvResolver(),
+ }
+}
+
+func (p *passthroughResolver) Resolve() []string {
+ if p.addr != nil {
+ return p.addr
+ }
+ return p.failback.Resolve()
+}
+
+func (p *passthroughResolver) Description() string {
+ return fmt.Sprintf("passthrough resolver of %v", p.addr)
+}
+
+const (
+ DEFAULT_NAMESRV_ADDR = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr"
+)
+
+var _ NsResolver = (*HttpResolver)(nil)
+
+type HttpResolver struct {
+ domain string
+ instance string
+ cli http.Client
+ failback NsResolver
+}
+
+func NewHttpResolver(instance string, domain ...string) *HttpResolver {
+ d := DEFAULT_NAMESRV_ADDR
+ if len(domain) > 0 {
+ d = domain[0]
+ }
+ client := http.Client{Timeout: 10 * time.Second}
+
+ h := &HttpResolver{
+ domain: d,
+ instance: instance,
+ cli: client,
+ failback: NewEnvResolver(),
+ }
+ return h
+}
+
+func (h *HttpResolver) Resolve() []string {
+ addrs := h.get()
+ if len(addrs) > 0 {
+ return addrs
+ }
+
+ addrs = h.loadSnapshot()
+ if len(addrs) > 0 {
+ return addrs
+ }
+ return h.failback.Resolve()
+}
+
+func (h *HttpResolver) Description() string {
+ return fmt.Sprintf("passthrough resolver of domain:%v instance:%v", h.domain, h.instance)
+}
+
+func (h *HttpResolver) get() []string {
+ resp, err := h.cli.Get(h.domain)
+ if err != nil {
+ rlog.Error("name server http fetch failed", map[string]interface{}{
+ "NameServerDomain": h.domain,
+ "err": err,
+ })
+ return nil
+ }
+
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ rlog.Error("name server read http response failed", map[string]interface{}{
+ "NameServerDomain": h.domain,
+ "err": err,
+ })
+ return nil
+ }
+
+ bodyStr := string(body)
+ if bodyStr == "" {
+ return nil
+ }
+
+ h.saveSnapshot(body)
+
+ return strings.Split(string(body), ";")
+}
+
+func (h *HttpResolver) saveSnapshot(body []byte) error {
+ filePath := h.getSnapshotFilePath(h.instance)
+ err := ioutil.WriteFile(filePath, body, 0644)
+ if err != nil {
+ rlog.Error("name server snapshot save failed", map[string]interface{}{
+ "filePath": filePath,
+ "err": err,
+ })
+ return err
+ }
+
+ rlog.Info("name server snapshot save successfully", map[string]interface{}{
+ "filePath": filePath,
+ })
+ return nil
+}
+
+func (h *HttpResolver) loadSnapshot() []string {
+ filePath := h.getSnapshotFilePath(h.instance)
+ _, err := os.Stat(filePath)
+ if os.IsNotExist(err) {
+ rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
+ "filePath": filePath,
+ })
+ return nil
+ }
+
+ bs, err := ioutil.ReadFile(filePath)
+ if err != nil {
+ return nil
+ }
+
+ rlog.Info("load the name server snapshot local file", map[string]interface{}{
+ "filePath": filePath,
+ })
+ return strings.Split(string(bs), ";")
+}
+
+func (h *HttpResolver) getSnapshotFilePath(instanceName string) string {
+ homeDir := ""
+ if usr, err := user.Current(); err == nil {
+ homeDir = usr.HomeDir
+ } else {
+ rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
+ "err": err,
+ })
+ }
+ storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
+ if _, err := os.Stat(storePath); os.IsNotExist(err) {
+ if err = os.MkdirAll(storePath, 0755); err != nil {
+ rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
+ "path": storePath,
+ "err": err,
+ })
+ }
+ }
+ filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
+ return filePath
+}
diff --git a/primitive/nsresolver_test.go b/primitive/nsresolver_test.go
new file mode 100644
index 0000000..98d839a
--- /dev/null
+++ b/primitive/nsresolver_test.go
@@ -0,0 +1,133 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package primitive
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "os"
+ "strings"
+ "testing"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+func TestEnvResolver(t *testing.T) {
+ Convey("Test UpdateNameServerAddress Use Env", t, func() {
+ srvs := []string{
+ "192.168.100.1",
+ "192.168.100.2",
+ "192.168.100.3",
+ "192.168.100.4",
+ "192.168.100.5",
+ }
+
+ resolver := NewEnvResolver()
+ os.Setenv("NAMESRV_ADDR", strings.Join(srvs, ";"))
+
+ addrs := resolver.Resolve()
+
+ So(Diff(srvs, addrs), ShouldBeFalse)
+ })
+}
+
+func TestHttpResolverWithGet(t *testing.T) {
+ Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
+ srvs := []string{
+ "192.168.100.1",
+ "192.168.100.2",
+ "192.168.100.3",
+ "192.168.100.4",
+ "192.168.100.5",
+ }
+ http.HandleFunc("/nameserver/addrs2", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, strings.Join(srvs, ";"))
+ })
+ server := &http.Server{Addr: ":0", Handler: nil}
+ listener, _ := net.Listen("tcp", ":0")
+ go server.Serve(listener)
+
+ port := listener.Addr().(*net.TCPAddr).Port
+ nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port)
+ fmt.Println("temporary name server domain: ", nameServerDommain)
+
+ resolver := NewHttpResolver("DEFAULT", nameServerDommain)
+ resolver.Resolve()
+
+ // check snapshot saved
+ filePath := resolver.getSnapshotFilePath("DEFAULT")
+ body := strings.Join(srvs, ";")
+ bs, _ := ioutil.ReadFile(filePath)
+ So(string(bs), ShouldEqual, body)
+ })
+}
+
+func TestHttpResolverWithSnapshotFile(t *testing.T) {
+ Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
+ srvs := []string{
+ "192.168.100.1",
+ "192.168.100.2",
+ "192.168.100.3",
+ "192.168.100.4",
+ "192.168.100.5",
+ }
+
+ resolver := NewHttpResolver("DEFAULT", "http://127.0.0.1:80/error/nsaddrs")
+
+ os.Setenv("NAMESRV_ADDR", "") // clear env
+ // setup local snapshot file
+ filePath := resolver.getSnapshotFilePath("DEFAULT")
+ body := strings.Join(srvs, ";")
+ _ = ioutil.WriteFile(filePath, []byte(body), 0644)
+
+ addrs := resolver.Resolve()
+
+ So(Diff(addrs, srvs), ShouldBeFalse)
+ })
+}
+
+func TesHttpReslverWithSnapshotFileOnce(t *testing.T) {
+ Convey("Test UpdateNameServerAddress Load Local Snapshot Once", t, func() {
+ srvs := []string{
+ "192.168.100.1",
+ "192.168.100.2",
+ "192.168.100.3",
+ "192.168.100.4",
+ "192.168.100.5",
+ }
+
+ resolver := NewHttpResolver("DEFAULT", "http://127.0.0.1:80/error/nsaddrs")
+
+ os.Setenv("NAMESRV_ADDR", "") // clear env
+ // setup local snapshot file
+ filePath := resolver.getSnapshotFilePath("DEFAULT")
+ body := strings.Join(srvs, ";")
+ _ = ioutil.WriteFile(filePath, []byte(body), 0644)
+ // load local snapshot file first time
+ addrs1 := resolver.Resolve()
+
+ // change the local snapshot file to check load once
+ _ = ioutil.WriteFile(filePath, []byte("127.0.0.1;127.0.0.2"), 0644)
+
+ addrs2 := resolver.Resolve()
+
+ So(Diff(addrs1, addrs2), ShouldBeFalse)
+ So(Diff(addrs1, srvs), ShouldBeFalse)
+ })
+}
diff --git a/producer/option.go b/producer/option.go
index 97a480e..76e9a31 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -31,6 +31,7 @@
SendMsgTimeout: 3 * time.Second,
DefaultTopicQueueNums: 4,
CreateTopicKey: "TBW102",
+ Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -43,6 +44,7 @@
DefaultTopicQueueNums int
CreateTopicKey string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
+ Resolver primitive.NsResolver
}
type Option func(*producerOptions)
@@ -63,20 +65,6 @@
}
}
-// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
-func WithNameServer(nameServers primitive.NamesrvAddr) Option {
- return func(opts *producerOptions) {
- opts.NameServerAddrs = nameServers
- }
-}
-
-// WithNameServerDomain set NameServer domain
-func WithNameServerDomain(nameServerUrl string) Option {
- return func(opts *producerOptions) {
- opts.NameServerDomain = nameServerUrl
- }
-}
-
// WithNamespace set the namespace of producer
func WithNamespace(namespace string) Option {
return func(opts *producerOptions) {
@@ -133,3 +121,24 @@
options.CreateTopicKey = topic
}
}
+
+// WithNsResovler set nameserver resolver to fetch nameserver addr
+func WithNsResovler(resolver primitive.NsResolver) Option {
+ return func(options *producerOptions) {
+ options.Resolver = resolver
+ }
+}
+
+// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
+func WithNameServer(nameServers primitive.NamesrvAddr) Option {
+ return func(options *producerOptions) {
+ options.Resolver = primitive.NewPassthroughResolver(nameServers)
+ }
+}
+
+// WithNameServerDomain set NameServer domain
+func WithNameServerDomain(nameServerUrl string) Option {
+ return func(opts *producerOptions) {
+ opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+ }
+}
diff --git a/producer/producer.go b/producer/producer.go
index e22cee9..7c5e0eb 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -57,7 +57,7 @@
for _, apply := range opts {
apply(&defaultOpts)
}
- srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+ srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
@@ -80,9 +80,6 @@
func (p *defaultProducer) Start() error {
atomic.StoreInt32(&p.state, int32(internal.StateRunning))
- if len(p.options.NameServerAddrs) == 0 {
- p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)
- }
p.client.RegisterProducer(p.group, p)
p.client.Start()
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 41bf80a..508acf8 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -35,7 +35,7 @@
func TestShutdown(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNameServer([]string{"127.0.0.1:9876"}),
+ WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -98,7 +98,7 @@
func TestSync(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNameServer([]string{"127.0.0.1:9876"}),
+ WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -149,7 +149,7 @@
func TestASync(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNameServer([]string{"127.0.0.1:9876"}),
+ WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)
@@ -211,7 +211,7 @@
func TestOneway(t *testing.T) {
p, _ := NewDefaultProducer(
- WithNameServer([]string{"127.0.0.1:9876"}),
+ WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
)