Multiple registries (#10323)

diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
index 0b1e5d9..7a24a40 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
@@ -19,7 +19,10 @@
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.RegistryFactory;
@@ -35,15 +38,18 @@
 import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
 import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
 
 /**
  * MultipleRegistry
  */
 public class MultipleRegistry extends AbstractRegistry {
+    public static final Logger LOGGER = LoggerFactory.getLogger(MultipleRegistry.class);
 
     public static final String REGISTRY_FOR_SERVICE = "service-registry";
     public static final String REGISTRY_FOR_REFERENCE = "reference-registry";
+    public static final String REGISTRY_SEPARATOR = "separator";
     private final Map<String, Registry> serviceRegistries = new ConcurrentHashMap<>(4);
     private final Map<String, Registry> referenceRegistries = new ConcurrentHashMap<>(4);
     private final Map<NotifyListener, MultipleNotifyListenerWrapper> multipleNotifyListenerMap = new ConcurrentHashMap<>(32);
@@ -86,7 +92,9 @@
     }
 
     protected void initServiceRegistry(URL url, Map<String, Registry> registryMap) {
-        origServiceRegistryURLs = url.getParameter(REGISTRY_FOR_SERVICE, new ArrayList<>());
+        String serviceRegistryString = url.getParameter(REGISTRY_FOR_SERVICE);
+        char separator = url.getParameter(REGISTRY_SEPARATOR, COMMA_SEPARATOR).charAt(0);
+        origServiceRegistryURLs = StringUtils.splitToList(serviceRegistryString, separator);
         effectServiceRegistryURLs = this.filterServiceRegistry(origServiceRegistryURLs);
         for (String tmpUrl : effectServiceRegistryURLs) {
             if (registryMap.get(tmpUrl) != null) {
@@ -101,7 +109,9 @@
     }
 
     protected void initReferenceRegistry(URL url, Map<String, Registry> registryMap) {
-        origReferenceRegistryURLs = url.getParameter(REGISTRY_FOR_REFERENCE, new ArrayList<>());
+        String serviceRegistryString = url.getParameter(REGISTRY_FOR_REFERENCE);
+        char separator = url.getParameter(REGISTRY_SEPARATOR, COMMA_SEPARATOR).charAt(0);
+        origReferenceRegistryURLs = StringUtils.splitToList(serviceRegistryString, separator);
         effectReferenceRegistryURLs = this.filterReferenceRegistry(origReferenceRegistryURLs);
         for (String tmpUrl : effectReferenceRegistryURLs) {
             if (registryMap.get(tmpUrl) != null) {
@@ -292,15 +302,56 @@
                     }
                     continue;
                 }
-                notifyURLs.addAll(tmpUrls);
+                URL registryURL = singleNotifyListener.getRegistry().getUrl();
+                aggregateRegistryUrls(notifyURLs, tmpUrls, registryURL);
             }
             // if no notify URL, add empty protocol URL
             if (emptyURL != null && notifyURLs.isEmpty()) {
                 notifyURLs.add(emptyURL);
+                LOGGER.info("No provider after aggregation, notify url with EMPTY protocol.");
+            } else {
+                LOGGER.info("Aggregated provider url size " + notifyURLs.size());
             }
+
             this.notify(notifyURLs);
         }
 
+        /**
+         * Aggregate urls from different registries into one unified list while appending registry specific 'attachments' into each url.
+         *
+         * These 'attachments' can be very useful for traffic management among registries.
+         *
+         * @param notifyURLs unified url list
+         * @param singleURLs single registry url list
+         * @param registryURL single registry configuration url
+         */
+        public static void aggregateRegistryUrls(List<URL> notifyURLs, List<URL> singleURLs, URL registryURL) {
+            String registryAttachments = registryURL.getParameter("attachments");
+            if (StringUtils.isNotBlank(registryAttachments)) {
+                LOGGER.info("Registry attachments " + registryAttachments + " found, will append to provider urls, urls size " + singleURLs.size());
+                String[] pairs = registryAttachments.split(COMMA_SEPARATOR);
+                Map<String, String> attachments = new HashMap<>(pairs.length);
+                for (String rawPair : pairs) {
+                   String[] keyValuePair = rawPair.split("=");
+                   if (keyValuePair.length == 2) {
+                       String key = keyValuePair[0];
+                       String value = keyValuePair[1];
+                       attachments.put(key, value);
+                   }
+                }
+
+                for (URL tmpUrl : singleURLs) {
+                    for (Map.Entry<String, String> entry : attachments.entrySet()) {
+                        tmpUrl = tmpUrl.addParameterIfAbsent(entry.getKey(), entry.getValue());
+                    }
+                    notifyURLs.add(tmpUrl);
+                }
+            } else {
+                LOGGER.info("Single registry " + registryURL + " has url size " + singleURLs.size());
+                notifyURLs.addAll(singleURLs);
+            }
+        }
+
         @Override
         public void notify(List<URL> urls) {
             sourceNotifyListener.notify(urls);
@@ -338,5 +389,11 @@
         public List<URL> getUrlList() {
             return urlList;
         }
+
+        public Registry getRegistry() {
+            return registry;
+        }
+
+
     }
 }
diff --git a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
index 441956a..dfa04ec 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
@@ -175,4 +175,23 @@
         Assertions.assertEquals("empty", list.get(0).getProtocol());
     }
 
+    @Test
+    public void testAggregation() {
+        List<URL> result = new ArrayList<URL>();
+        List<URL> listToAggregate = new ArrayList<URL>();
+        URL url1= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+        URL url2= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+        listToAggregate.add(url1);
+        listToAggregate.add(url2);
+
+        URL registryURL = URL.valueOf("mock://127.0.0.1/RegistryService?attachments=zone=hangzhou,tag=middleware&enable-empty-protection=false");
+
+        MultipleRegistry.MultipleNotifyListenerWrapper.aggregateRegistryUrls(result, listToAggregate, registryURL);
+
+        Assertions.assertEquals(2, result.size());
+        Assertions.assertEquals(2, result.get(0).getParameters().size());
+        Assertions.assertEquals("hangzhou", result.get(0).getParameter("zone"));
+        Assertions.assertEquals("middleware", result.get(1).getParameter("tag"));
+    }
+
 }