support load subscribe instance (#2139)
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 7fb38e4..0fa643c 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -407,6 +407,11 @@
return nil
}
+// LoadSubscribeInstances load subscribe instance
+func (r *BaseRegistry) LoadSubscribeInstances(url *common.URL, notify NotifyListener) error {
+ return r.facadeBasedRegistry.LoadSubscribeInstances(url, notify)
+}
+
// closeRegisters close and remove registry client and reset services map
func (r *BaseRegistry) closeRegisters() {
logger.Infof("begin to close provider client")
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 5f1f12d..b975462 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -91,6 +91,12 @@
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
+ dir.consumerConfigurationListener.addNotifyListener(dir)
+ dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
+
+ if err := dir.registry.LoadSubscribeInstances(url.SubURL, dir); err != nil {
+ return nil, err
+ }
go dir.subscribe(url.SubURL)
return dir, nil
@@ -99,8 +105,6 @@
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
- dir.consumerConfigurationListener.addNotifyListener(dir)
- dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
if err := dir.registry.Subscribe(url, dir); err != nil {
logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
}
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 9a86bef..42f7e4b 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -168,6 +168,11 @@
return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry")
}
+// LoadSubscribeInstances load subscribe instance
+func (r *etcdV3Registry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+ return nil
+}
+
func (r *etcdV3Registry) handleClientRestart() {
r.WaitGroup().Add(1)
go etcdv3.HandleClientRestart(r)
diff --git a/registry/mock_registry.go b/registry/mock_registry.go
index 8f4505c..5407dc6 100644
--- a/registry/mock_registry.go
+++ b/registry/mock_registry.go
@@ -131,6 +131,11 @@
return nil
}
+// LoadSubscribeInstances load subscribe instance
+func (r *MockRegistry) LoadSubscribeInstances(_ *common.URL, _ NotifyListener) error {
+ return nil
+}
+
type listener struct {
count int64
registry *MockRegistry
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 46da4bc..edf44c1 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -19,6 +19,7 @@
import (
"bytes"
+ "fmt"
"strconv"
"strings"
"time"
@@ -38,6 +39,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
@@ -207,6 +209,27 @@
return nil
}
+// LoadSubscribeInstances load subscribe instance
+func (nr *nacosRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
+ serviceName := getSubscribeName(url)
+ groupName := nr.GetURL().GetParam(constant.RegistryGroupKey, defaultGroup)
+ instances, err := nr.namingClient.Client().SelectAllInstances(vo.SelectAllInstancesParam{
+ ServiceName: serviceName,
+ GroupName: groupName,
+ })
+ if err != nil {
+ return perrors.New(fmt.Sprintf("could not query the instances for serviceName=%s,groupName=%s,error=%v",
+ serviceName, groupName, err))
+ }
+
+ for i := range instances {
+ if newUrl := generateUrl(instances[i]); newUrl != nil {
+ notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: newUrl})
+ }
+ }
+ return nil
+}
+
func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam {
serviceName := getSubscribeName(url)
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index 02f1567..0533f77 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -18,6 +18,7 @@
package polaris
import (
+ "fmt"
"strconv"
"sync"
"time"
@@ -37,6 +38,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/polaris"
)
@@ -170,6 +172,27 @@
return perrors.New("UnSubscribe not support in polarisRegistry")
}
+// LoadSubscribeInstances load subscribe instance
+func (pr *polarisRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
+ serviceName := getSubscribeName(url)
+ resp, err := pr.consumer.GetAllInstances(&api.GetAllInstancesRequest{
+ GetAllInstancesRequest: model.GetAllInstancesRequest{
+ Service: serviceName,
+ Namespace: pr.namespace,
+ },
+ })
+ if err != nil {
+ return perrors.New(fmt.Sprintf("could not query the instances for serviceName=%s,namespace=%s,error=%v",
+ serviceName, pr.namespace, err))
+ }
+ for i := range resp.Instances {
+ if newUrl := generateUrl(resp.Instances[i]); newUrl != nil {
+ notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: newUrl})
+ }
+ }
+ return nil
+}
+
// GetURL returns polaris registry's url.
func (pr *polarisRegistry) GetURL() *common.URL {
return pr.url
diff --git a/registry/registry.go b/registry/registry.go
index f95f8d7..d084ac7 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -58,6 +58,12 @@
// consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
// listener A listener of the change event, not allowed to be empty
UnSubscribe(*common.URL, NotifyListener) error
+
+ // LoadSubscribeInstances Because the subscription is asynchronous,
+ // it may cause the consumer to fail to obtain the provider.
+ // so sync load the instance of the preparing to subscribe service before
+ // formally subscribing.
+ LoadSubscribeInstances(*common.URL, NotifyListener) error
}
// nolint
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index ffc0744..a967118 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -229,6 +229,11 @@
return nil
}
+// LoadSubscribeInstances load subscribe instance
+func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+ return nil
+}
+
func getUrlKey(url *common.URL) string {
var bf bytes.Buffer
if len(url.Protocol) != 0 {
diff --git a/registry/xds/registry.go b/registry/xds/registry.go
index f8d51cc..b988a3c 100644
--- a/registry/xds/registry.go
+++ b/registry/xds/registry.go
@@ -127,6 +127,11 @@
return nil
}
+// LoadSubscribeInstances load subscribe instance
+func (nr *xdsRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+ return nil
+}
+
// GetURL gets its registration URL
func (nr *xdsRegistry) GetURL() *common.URL {
return nr.registryURL
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index c3cb613..b56c394 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -174,6 +174,11 @@
return r.getCloseListener(conf)
}
+// LoadSubscribeInstances load subscribe instance
+func (r *zkRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+ return nil
+}
+
// CloseAndNilClient closes listeners and clear client
func (r *zkRegistry) CloseAndNilClient() {
r.listener.Close()
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index dfc3a0d..faafacf 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -180,7 +180,7 @@
}
iss := make([]registry.ServiceInstance, 0, len(criss))
for _, cris := range criss {
- iss = append(iss, zksd.toZookeeperInstance(cris))
+ iss = append(iss, toZookeeperInstance(cris))
}
return iss
}
@@ -304,7 +304,7 @@
}
// toZookeeperInstance convert to registry's service instance
-func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
+func toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
pl, ok := cris.Payload.(map[string]interface{})
if !ok {
logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map[string]interface{}", cris.ID)