support load subscribe instance (#2139)

diff --git a/registry/base_registry.go b/registry/base_registry.go
index 7fb38e4..0fa643c 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -407,6 +407,11 @@
 	return nil
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (r *BaseRegistry) LoadSubscribeInstances(url *common.URL, notify NotifyListener) error {
+	return r.facadeBasedRegistry.LoadSubscribeInstances(url, notify)
+}
+
 // closeRegisters close and remove registry client and reset services map
 func (r *BaseRegistry) closeRegisters() {
 	logger.Infof("begin to close provider client")
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 5f1f12d..b975462 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -91,6 +91,12 @@
 	}
 
 	dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
+	dir.consumerConfigurationListener.addNotifyListener(dir)
+	dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
+
+	if err := dir.registry.LoadSubscribeInstances(url.SubURL, dir); err != nil {
+		return nil, err
+	}
 
 	go dir.subscribe(url.SubURL)
 	return dir, nil
@@ -99,8 +105,6 @@
 // subscribe from registry
 func (dir *RegistryDirectory) subscribe(url *common.URL) {
 	logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
-	dir.consumerConfigurationListener.addNotifyListener(dir)
-	dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
 	if err := dir.registry.Subscribe(url, dir); err != nil {
 		logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
 	}
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 9a86bef..42f7e4b 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -168,6 +168,11 @@
 	return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry")
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (r *etcdV3Registry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+	return nil
+}
+
 func (r *etcdV3Registry) handleClientRestart() {
 	r.WaitGroup().Add(1)
 	go etcdv3.HandleClientRestart(r)
diff --git a/registry/mock_registry.go b/registry/mock_registry.go
index 8f4505c..5407dc6 100644
--- a/registry/mock_registry.go
+++ b/registry/mock_registry.go
@@ -131,6 +131,11 @@
 	return nil
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (r *MockRegistry) LoadSubscribeInstances(_ *common.URL, _ NotifyListener) error {
+	return nil
+}
+
 type listener struct {
 	count      int64
 	registry   *MockRegistry
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 46da4bc..edf44c1 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -19,6 +19,7 @@
 
 import (
 	"bytes"
+	"fmt"
 	"strconv"
 	"strings"
 	"time"
@@ -38,6 +39,7 @@
 	"dubbo.apache.org/dubbo-go/v3/common/constant"
 	"dubbo.apache.org/dubbo-go/v3/common/extension"
 	"dubbo.apache.org/dubbo-go/v3/registry"
+	"dubbo.apache.org/dubbo-go/v3/remoting"
 	"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
 )
 
@@ -207,6 +209,27 @@
 	return nil
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (nr *nacosRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
+	serviceName := getSubscribeName(url)
+	groupName := nr.GetURL().GetParam(constant.RegistryGroupKey, defaultGroup)
+	instances, err := nr.namingClient.Client().SelectAllInstances(vo.SelectAllInstancesParam{
+		ServiceName: serviceName,
+		GroupName:   groupName,
+	})
+	if err != nil {
+		return perrors.New(fmt.Sprintf("could not query the instances for serviceName=%s,groupName=%s,error=%v",
+			serviceName, groupName, err))
+	}
+
+	for i := range instances {
+		if newUrl := generateUrl(instances[i]); newUrl != nil {
+			notify.Notify(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: newUrl})
+		}
+	}
+	return nil
+}
+
 func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam {
 	serviceName := getSubscribeName(url)
 	groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
index 02f1567..0533f77 100644
--- a/registry/polaris/registry.go
+++ b/registry/polaris/registry.go
@@ -18,6 +18,7 @@
 package polaris
 
 import (
+	"fmt"
 	"strconv"
 	"sync"
 	"time"
@@ -37,6 +38,7 @@
 	"dubbo.apache.org/dubbo-go/v3/common/constant"
 	"dubbo.apache.org/dubbo-go/v3/common/extension"
 	"dubbo.apache.org/dubbo-go/v3/registry"
+	"dubbo.apache.org/dubbo-go/v3/remoting"
 	"dubbo.apache.org/dubbo-go/v3/remoting/polaris"
 )
 
@@ -170,6 +172,27 @@
 	return perrors.New("UnSubscribe not support in polarisRegistry")
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (pr *polarisRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
+	serviceName := getSubscribeName(url)
+	resp, err := pr.consumer.GetAllInstances(&api.GetAllInstancesRequest{
+		GetAllInstancesRequest: model.GetAllInstancesRequest{
+			Service:   serviceName,
+			Namespace: pr.namespace,
+		},
+	})
+	if err != nil {
+		return perrors.New(fmt.Sprintf("could not query the instances for serviceName=%s,namespace=%s,error=%v",
+			serviceName, pr.namespace, err))
+	}
+	for i := range resp.Instances {
+		if newUrl := generateUrl(resp.Instances[i]); newUrl != nil {
+			notify.Notify(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: newUrl})
+		}
+	}
+	return nil
+}
+
 // GetURL returns polaris registry's url.
 func (pr *polarisRegistry) GetURL() *common.URL {
 	return pr.url
diff --git a/registry/registry.go b/registry/registry.go
index f95f8d7..d084ac7 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -58,6 +58,12 @@
 	// consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
 	// listener A listener of the change event, not allowed to be empty
 	UnSubscribe(*common.URL, NotifyListener) error
+
+	// LoadSubscribeInstances Because the subscription is asynchronous,
+	// it may cause the consumer to fail to obtain the provider.
+	// so sync load the instance of the preparing to subscribe service before
+	// formally subscribing.
+	LoadSubscribeInstances(*common.URL, NotifyListener) error
 }
 
 // nolint
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index ffc0744..a967118 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -229,6 +229,11 @@
 	return nil
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+	return nil
+}
+
 func getUrlKey(url *common.URL) string {
 	var bf bytes.Buffer
 	if len(url.Protocol) != 0 {
diff --git a/registry/xds/registry.go b/registry/xds/registry.go
index f8d51cc..b988a3c 100644
--- a/registry/xds/registry.go
+++ b/registry/xds/registry.go
@@ -127,6 +127,11 @@
 	return nil
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (nr *xdsRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+	return nil
+}
+
 // GetURL gets its registration URL
 func (nr *xdsRegistry) GetURL() *common.URL {
 	return nr.registryURL
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index c3cb613..b56c394 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -174,6 +174,11 @@
 	return r.getCloseListener(conf)
 }
 
+// LoadSubscribeInstances load subscribe instance
+func (r *zkRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
+	return nil
+}
+
 // CloseAndNilClient closes listeners and clear client
 func (r *zkRegistry) CloseAndNilClient() {
 	r.listener.Close()
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index dfc3a0d..faafacf 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -180,7 +180,7 @@
 	}
 	iss := make([]registry.ServiceInstance, 0, len(criss))
 	for _, cris := range criss {
-		iss = append(iss, zksd.toZookeeperInstance(cris))
+		iss = append(iss, toZookeeperInstance(cris))
 	}
 	return iss
 }
@@ -304,7 +304,7 @@
 }
 
 // toZookeeperInstance convert to registry's service instance
-func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
+func toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
 	pl, ok := cris.Payload.(map[string]interface{})
 	if !ok {
 		logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map[string]interface{}", cris.ID)