Merge branch 'apache:3.0' into 3.0
diff --git a/registry/polaris/core.go b/registry/polaris/core.go
index e87fac7..7e9f304 100644
--- a/registry/polaris/core.go
+++ b/registry/polaris/core.go
@@ -32,13 +32,13 @@
"dubbo.apache.org/dubbo-go/v3/remoting"
)
-type subscriber func(remoting.EventType, []model.Instance)
+type item func(remoting.EventType, []model.Instance)
type PolarisServiceWatcher struct {
consumer api.ConsumerAPI
subscribeParam *api.WatchServiceRequest
lock *sync.RWMutex
- subscribers []subscriber
+ subscribers []item
execOnce *sync.Once
}
@@ -48,7 +48,7 @@
subscribeParam: param,
consumer: consumer,
lock: &sync.RWMutex{},
- subscribers: make([]subscriber, 0),
+ subscribers: make([]item, 0),
execOnce: &sync.Once{},
}
return watcher, nil
diff --git a/registry/polaris/core_test.go b/registry/polaris/core_test.go
new file mode 100644
index 0000000..26a573c
--- /dev/null
+++ b/registry/polaris/core_test.go
@@ -0,0 +1,51 @@
+package polaris
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/model"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+func TestPolarisServiceWatcher_AddSubscriber(t *testing.T) {
+ var (
+ newParam api.WatchServiceRequest
+ newConsumer api.ConsumerAPI
+ )
+ type fields struct {
+ consumer api.ConsumerAPI
+ subscribeParam *api.WatchServiceRequest
+ lock *sync.RWMutex
+ subscribers []item
+ execOnce *sync.Once
+ }
+ type args struct {
+ subscriber func(remoting.EventType, []model.Instance)
+ }
+ var tests []struct {
+ name string
+ fields fields
+ args args
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ watcher := &PolarisServiceWatcher{
+ subscribeParam: &newParam,
+ consumer: newConsumer,
+ lock: &sync.RWMutex{},
+ subscribers: make([]item, 0),
+ execOnce: &sync.Once{},
+ }
+ assert.Empty(t, watcher)
+ })
+ }
+}
diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go
index c792760..c949f29 100644
--- a/registry/polaris/listener.go
+++ b/registry/polaris/listener.go
@@ -51,6 +51,7 @@
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}
+
return listener, nil
}
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index 0ead232..255acdb 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -150,6 +150,11 @@
// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
+ var (
+ newParam api.WatchServiceRequest
+ newConsumer api.ConsumerAPI
+ )
+
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
@@ -163,8 +168,16 @@
continue
}
+ watcher, err := newPolarisWatcher(&newParam, newConsumer)
+ if err != nil {
+ logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
+ <-time.After(time.Duration(RegistryConnDelay) * time.Second)
+ continue
+ }
for {
+
serviceEvent, err := listener.Next()
+
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
@@ -172,6 +185,7 @@
}
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
+ watcher.startWatch()
}
}
}
diff --git a/registry/polaris/registry_test.go b/registry/polaris/registry_test.go
new file mode 100644
index 0000000..45068b0
--- /dev/null
+++ b/registry/polaris/registry_test.go
@@ -0,0 +1,64 @@
+package polaris
+
+import (
+ "reflect"
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/polarismesh/polaris-go/api"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func Test_createDeregisterParam(t *testing.T) {
+ type args struct {
+ url *common.URL
+ serviceName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *api.InstanceDeRegisterRequest
+ }{
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := createDeregisterParam(tt.args.url, tt.args.serviceName); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("createDeregisterParam() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_polarisRegistry_Destroy(t *testing.T) {
+ type fields struct {
+ url *common.URL
+ provider api.ProviderAPI
+ lock *sync.RWMutex
+ registryUrls map[string]*PolarisHeartbeat
+ listenerLock *sync.RWMutex
+ }
+ tests := []struct {
+ name string
+ fields fields
+ }{
+ {
+ name: "Test_polarisRegistry_Destroy",
+ fields: fields{
+ url: nil,
+ provider: nil,
+ registryUrls: nil,
+ },
+ },
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ })
+ }
+}