Merge branch 'apache:3.0' into 3.0
diff --git a/registry/polaris/core.go b/registry/polaris/core.go
index e87fac7..7e9f304 100644
--- a/registry/polaris/core.go
+++ b/registry/polaris/core.go
@@ -32,13 +32,13 @@
 	"dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
-type subscriber func(remoting.EventType, []model.Instance)
+type item func(remoting.EventType, []model.Instance)
 
 type PolarisServiceWatcher struct {
 	consumer       api.ConsumerAPI
 	subscribeParam *api.WatchServiceRequest
 	lock           *sync.RWMutex
-	subscribers    []subscriber
+	subscribers    []item
 	execOnce       *sync.Once
 }
 
@@ -48,7 +48,7 @@
 		subscribeParam: param,
 		consumer:       consumer,
 		lock:           &sync.RWMutex{},
-		subscribers:    make([]subscriber, 0),
+		subscribers:    make([]item, 0),
 		execOnce:       &sync.Once{},
 	}
 	return watcher, nil
diff --git a/registry/polaris/core_test.go b/registry/polaris/core_test.go
new file mode 100644
index 0000000..26a573c
--- /dev/null
+++ b/registry/polaris/core_test.go
@@ -0,0 +1,51 @@
+package polaris
+
+import (
+	"sync"
+	"testing"
+)
+
+import (
+	"github.com/polarismesh/polaris-go/api"
+	"github.com/polarismesh/polaris-go/pkg/model"
+
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+func TestPolarisServiceWatcher_AddSubscriber(t *testing.T) {
+	var (
+		newParam    api.WatchServiceRequest
+		newConsumer api.ConsumerAPI
+	)
+	type fields struct {
+		consumer       api.ConsumerAPI
+		subscribeParam *api.WatchServiceRequest
+		lock           *sync.RWMutex
+		subscribers    []item
+		execOnce       *sync.Once
+	}
+	type args struct {
+		subscriber func(remoting.EventType, []model.Instance)
+	}
+	var tests []struct {
+		name   string
+		fields fields
+		args   args
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			watcher := &PolarisServiceWatcher{
+				subscribeParam: &newParam,
+				consumer:       newConsumer,
+				lock:           &sync.RWMutex{},
+				subscribers:    make([]item, 0),
+				execOnce:       &sync.Once{},
+			}
+			assert.Empty(t, watcher)
+		})
+	}
+}
diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go
index c792760..c949f29 100644
--- a/registry/polaris/listener.go
+++ b/registry/polaris/listener.go
@@ -51,6 +51,7 @@
 		events:    gxchan.NewUnboundedChan(32),
 		closeCh:   make(chan struct{}),
 	}
+
 	return listener, nil
 }
 
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index 0ead232..255acdb 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -150,6 +150,11 @@
 
 // Subscribe returns nil if subscribing registry successfully. If not returns an error.
 func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
+	var (
+		newParam    api.WatchServiceRequest
+		newConsumer api.ConsumerAPI
+	)
+
 	role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
 	if role != common.CONSUMER {
 		return nil
@@ -163,8 +168,16 @@
 			continue
 		}
 
+		watcher, err := newPolarisWatcher(&newParam, newConsumer)
+		if err != nil {
+			logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
+			<-time.After(time.Duration(RegistryConnDelay) * time.Second)
+			continue
+		}
 		for {
+
 			serviceEvent, err := listener.Next()
+
 			if err != nil {
 				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
 				listener.Close()
@@ -172,6 +185,7 @@
 			}
 			logger.Infof("update begin, service event: %v", serviceEvent.String())
 			notifyListener.Notify(serviceEvent)
+			watcher.startWatch()
 		}
 	}
 }
diff --git a/registry/polaris/registry_test.go b/registry/polaris/registry_test.go
new file mode 100644
index 0000000..45068b0
--- /dev/null
+++ b/registry/polaris/registry_test.go
@@ -0,0 +1,64 @@
+package polaris
+
+import (
+	"reflect"
+	"sync"
+	"testing"
+)
+
+import (
+	"github.com/polarismesh/polaris-go/api"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func Test_createDeregisterParam(t *testing.T) {
+	type args struct {
+		url         *common.URL
+		serviceName string
+	}
+	tests := []struct {
+		name string
+		args args
+		want *api.InstanceDeRegisterRequest
+	}{
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := createDeregisterParam(tt.args.url, tt.args.serviceName); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("createDeregisterParam() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_polarisRegistry_Destroy(t *testing.T) {
+	type fields struct {
+		url          *common.URL
+		provider     api.ProviderAPI
+		lock         *sync.RWMutex
+		registryUrls map[string]*PolarisHeartbeat
+		listenerLock *sync.RWMutex
+	}
+	tests := []struct {
+		name   string
+		fields fields
+	}{
+		{
+			name: "Test_polarisRegistry_Destroy",
+			fields: fields{
+				url:          nil,
+				provider:     nil,
+				registryUrls: nil,
+			},
+		},
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+		})
+	}
+}