Multiple registries (#10323)
diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
index 0b1e5d9..7a24a40 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
@@ -19,7 +19,10 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
@@ -35,15 +38,18 @@
import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
/**
* MultipleRegistry
*/
public class MultipleRegistry extends AbstractRegistry {
+ public static final Logger LOGGER = LoggerFactory.getLogger(MultipleRegistry.class);
public static final String REGISTRY_FOR_SERVICE = "service-registry";
public static final String REGISTRY_FOR_REFERENCE = "reference-registry";
+ public static final String REGISTRY_SEPARATOR = "separator";
private final Map<String, Registry> serviceRegistries = new ConcurrentHashMap<>(4);
private final Map<String, Registry> referenceRegistries = new ConcurrentHashMap<>(4);
private final Map<NotifyListener, MultipleNotifyListenerWrapper> multipleNotifyListenerMap = new ConcurrentHashMap<>(32);
@@ -86,7 +92,9 @@
}
protected void initServiceRegistry(URL url, Map<String, Registry> registryMap) {
- origServiceRegistryURLs = url.getParameter(REGISTRY_FOR_SERVICE, new ArrayList<>());
+ String serviceRegistryString = url.getParameter(REGISTRY_FOR_SERVICE);
+ char separator = url.getParameter(REGISTRY_SEPARATOR, COMMA_SEPARATOR).charAt(0);
+ origServiceRegistryURLs = StringUtils.splitToList(serviceRegistryString, separator);
effectServiceRegistryURLs = this.filterServiceRegistry(origServiceRegistryURLs);
for (String tmpUrl : effectServiceRegistryURLs) {
if (registryMap.get(tmpUrl) != null) {
@@ -101,7 +109,9 @@
}
protected void initReferenceRegistry(URL url, Map<String, Registry> registryMap) {
- origReferenceRegistryURLs = url.getParameter(REGISTRY_FOR_REFERENCE, new ArrayList<>());
+ String serviceRegistryString = url.getParameter(REGISTRY_FOR_REFERENCE);
+ char separator = url.getParameter(REGISTRY_SEPARATOR, COMMA_SEPARATOR).charAt(0);
+ origReferenceRegistryURLs = StringUtils.splitToList(serviceRegistryString, separator);
effectReferenceRegistryURLs = this.filterReferenceRegistry(origReferenceRegistryURLs);
for (String tmpUrl : effectReferenceRegistryURLs) {
if (registryMap.get(tmpUrl) != null) {
@@ -292,15 +302,56 @@
}
continue;
}
- notifyURLs.addAll(tmpUrls);
+ URL registryURL = singleNotifyListener.getRegistry().getUrl();
+ aggregateRegistryUrls(notifyURLs, tmpUrls, registryURL);
}
// if no notify URL, add empty protocol URL
if (emptyURL != null && notifyURLs.isEmpty()) {
notifyURLs.add(emptyURL);
+ LOGGER.info("No provider after aggregation, notify url with EMPTY protocol.");
+ } else {
+ LOGGER.info("Aggregated provider url size " + notifyURLs.size());
}
+
this.notify(notifyURLs);
}
+ /**
+ * Aggregate urls from different registries into one unified list while appending registry specific 'attachments' into each url.
+ *
+ * These 'attachments' can be very useful for traffic management among registries.
+ *
+ * @param notifyURLs unified url list
+ * @param singleURLs single registry url list
+ * @param registryURL single registry configuration url
+ */
+ public static void aggregateRegistryUrls(List<URL> notifyURLs, List<URL> singleURLs, URL registryURL) {
+ String registryAttachments = registryURL.getParameter("attachments");
+ if (StringUtils.isNotBlank(registryAttachments)) {
+ LOGGER.info("Registry attachments " + registryAttachments + " found, will append to provider urls, urls size " + singleURLs.size());
+ String[] pairs = registryAttachments.split(COMMA_SEPARATOR);
+ Map<String, String> attachments = new HashMap<>(pairs.length);
+ for (String rawPair : pairs) {
+ String[] keyValuePair = rawPair.split("=");
+ if (keyValuePair.length == 2) {
+ String key = keyValuePair[0];
+ String value = keyValuePair[1];
+ attachments.put(key, value);
+ }
+ }
+
+ for (URL tmpUrl : singleURLs) {
+ for (Map.Entry<String, String> entry : attachments.entrySet()) {
+ tmpUrl = tmpUrl.addParameterIfAbsent(entry.getKey(), entry.getValue());
+ }
+ notifyURLs.add(tmpUrl);
+ }
+ } else {
+ LOGGER.info("Single registry " + registryURL + " has url size " + singleURLs.size());
+ notifyURLs.addAll(singleURLs);
+ }
+ }
+
@Override
public void notify(List<URL> urls) {
sourceNotifyListener.notify(urls);
@@ -338,5 +389,11 @@
public List<URL> getUrlList() {
return urlList;
}
+
+ public Registry getRegistry() {
+ return registry;
+ }
+
+
}
}
diff --git a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
index 441956a..dfa04ec 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
@@ -175,4 +175,23 @@
Assertions.assertEquals("empty", list.get(0).getProtocol());
}
+ @Test
+ public void testAggregation() {
+ List<URL> result = new ArrayList<URL>();
+ List<URL> listToAggregate = new ArrayList<URL>();
+ URL url1= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+ URL url2= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+ listToAggregate.add(url1);
+ listToAggregate.add(url2);
+
+ URL registryURL = URL.valueOf("mock://127.0.0.1/RegistryService?attachments=zone=hangzhou,tag=middleware&enable-empty-protection=false");
+
+ MultipleRegistry.MultipleNotifyListenerWrapper.aggregateRegistryUrls(result, listToAggregate, registryURL);
+
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertEquals(2, result.get(0).getParameters().size());
+ Assertions.assertEquals("hangzhou", result.get(0).getParameter("zone"));
+ Assertions.assertEquals("middleware", result.get(1).getParameter("tag"));
+ }
+
}