Reduce memory allocation during address change notification. (#5613)
1. Merge registry notification events happened in a certain short interval.
2. cache URL instances to reduce string -> URL recreation
3. reduce allocation during export and refer at start up.
4. add frozen status for URL
5. use URLBuilder for unfrozen URLs
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
index 3e5f1d2..2634461 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
@@ -17,29 +17,22 @@
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.remoting.Constants;
-import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.ALIVE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
-import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INVOKER_LISTENER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TAG_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmptyMap;
+import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
+import static org.apache.dubbo.remoting.Constants.TRANSPORTER_KEY;
/**
* ClusterUtils
@@ -50,71 +43,40 @@
}
public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
- Map<String, String> map = new HashMap<String, String>();
Map<String, String> remoteMap = remoteUrl.getParameters();
- if (remoteMap != null && remoteMap.size() > 0) {
- map.putAll(remoteMap);
-
- // Remove configurations from provider, some items should be affected by provider.
- map.remove(THREAD_NAME_KEY);
- map.remove(DEFAULT_KEY_PREFIX + THREAD_NAME_KEY);
-
- map.remove(THREADPOOL_KEY);
- map.remove(DEFAULT_KEY_PREFIX + THREADPOOL_KEY);
-
- map.remove(CORE_THREADS_KEY);
- map.remove(DEFAULT_KEY_PREFIX + CORE_THREADS_KEY);
-
- map.remove(THREADS_KEY);
- map.remove(DEFAULT_KEY_PREFIX + THREADS_KEY);
-
- map.remove(QUEUES_KEY);
- map.remove(DEFAULT_KEY_PREFIX + QUEUES_KEY);
-
- map.remove(ALIVE_KEY);
- map.remove(DEFAULT_KEY_PREFIX + ALIVE_KEY);
-
- map.remove(Constants.TRANSPORTER_KEY);
- map.remove(DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);
+ if (remoteMap == null || remoteMap.size() <= 0) {
+ return remoteUrl.addParameters(localMap);
}
- if (localMap != null && localMap.size() > 0) {
- Map<String, String> copyOfLocalMap = new HashMap<>(localMap);
+ // Remove configurations from provider, some items should not being affected by provider.
+ remoteMap.remove(THREAD_NAME_KEY);
+ remoteMap.remove(THREADPOOL_KEY);
+ remoteMap.remove(CORE_THREADS_KEY);
+ remoteMap.remove(THREADS_KEY);
+ remoteMap.remove(QUEUES_KEY);
+ remoteMap.remove(ALIVE_KEY);
+ remoteMap.remove(TRANSPORTER_KEY);
- if(map.containsKey(GROUP_KEY)){
- copyOfLocalMap.remove(GROUP_KEY);
- }
- if(map.containsKey(VERSION_KEY)){
- copyOfLocalMap.remove(VERSION_KEY);
- }
+ remoteMap.put(REMOTE_APPLICATION_KEY, remoteMap.get(APPLICATION_KEY));
- copyOfLocalMap.remove(RELEASE_KEY);
- copyOfLocalMap.remove(DUBBO_VERSION_KEY);
- copyOfLocalMap.remove(METHODS_KEY);
- copyOfLocalMap.remove(TIMESTAMP_KEY);
- copyOfLocalMap.remove(TAG_KEY);
-
- map.putAll(copyOfLocalMap);
-
- map.put(REMOTE_APPLICATION_KEY, remoteMap.get(APPLICATION_KEY));
+ if (isNotEmptyMap(localMap)) {
+ remoteMap.putAll(localMap);
// Combine filters and listeners on Provider and Consumer
String remoteFilter = remoteMap.get(REFERENCE_FILTER_KEY);
- String localFilter = copyOfLocalMap.get(REFERENCE_FILTER_KEY);
- if (remoteFilter != null && remoteFilter.length() > 0
- && localFilter != null && localFilter.length() > 0) {
- map.put(REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);
+ String localFilter = localMap.get(REFERENCE_FILTER_KEY);
+ if (isNotEmpty(remoteFilter) && isNotEmpty(localFilter)) {
+ remoteMap.put(REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);
}
String remoteListener = remoteMap.get(INVOKER_LISTENER_KEY);
- String localListener = copyOfLocalMap.get(INVOKER_LISTENER_KEY);
- if (remoteListener != null && remoteListener.length() > 0
- && localListener != null && localListener.length() > 0) {
- map.put(INVOKER_LISTENER_KEY, remoteListener + "," + localListener);
+ String localListener = localMap.get(INVOKER_LISTENER_KEY);
+ if (isNotEmpty(remoteListener) && isNotEmpty(localListener)) {
+ remoteMap.put(INVOKER_LISTENER_KEY, remoteListener + "," + localListener);
}
}
- return remoteUrl.clearParameters().addParameters(map);
+ return remoteUrl;
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index e605643..22b5b66 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,7 +42,6 @@
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.HOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
@@ -93,26 +91,26 @@
private static final long serialVersionUID = -1985165475234910535L;
- private final String protocol;
+ protected String protocol;
- private final String username;
+ protected String username;
- private final String password;
+ protected String password;
// by default, host to registry
- private final String host;
+ protected String host;
// by default, port to registry
- private final int port;
+ protected int port;
- private final String path;
+ protected String path;
- private final Map<String, String> parameters;
-
- private final Map<String, Map<String, String>> methodParameters;
+ protected Map<String, String> parameters;
// ==== cache ====
+ private volatile transient Map<String, Map<String, String>> methodParameters;
+
private volatile transient Map<String, Number> numbers;
private volatile transient Map<String, Map<String, Number>> methodNumbers;
@@ -142,7 +140,6 @@
this.address = null;
this.path = null;
this.parameters = null;
- this.methodParameters = null;
}
public URL(String protocol, String host, int port) {
@@ -184,17 +181,6 @@
int port,
String path,
Map<String, String> parameters) {
- this(protocol, username, password, host, port, path, parameters, toMethodParameters(parameters));
- }
-
- public URL(String protocol,
- String username,
- String password,
- String host,
- int port,
- String path,
- Map<String, String> parameters,
- Map<String, Map<String, String>> methodParameters) {
if (StringUtils.isEmpty(username)
&& StringUtils.isNotEmpty(password)) {
throw new IllegalArgumentException("Invalid url, password without username!");
@@ -204,7 +190,6 @@
this.password = password;
this.host = host;
this.port = Math.max(port, 0);
- this.address = getAddress(this.host, this.port);
// trim the beginning "/"
while (path != null && path.startsWith("/")) {
@@ -212,12 +197,10 @@
}
this.path = path;
if (parameters == null) {
- parameters = new HashMap<>();
+ this.parameters = new HashMap<>();
} else {
- parameters = new HashMap<>(parameters);
+ this.parameters = new HashMap<>(parameters);
}
- this.parameters = Collections.unmodifiableMap(parameters);
- this.methodParameters = Collections.unmodifiableMap(methodParameters);
}
private static String getAddress(String host, int port) {
@@ -225,135 +208,29 @@
}
/**
- * NOTICE: This method allocate too much objects, we can use {@link URLStrParser#parseDecodedStr(String)} instead.
- * <p>
- * Parse url string
+ * parse decoded url string, formatted dubbo://host:port/path?param=value, into strutted URL.
*
- * @param url URL string
- * @return URL instance
- * @see URL
+ * @param url, decoded url string
+ * @return
*/
public static URL valueOf(String url) {
- if (url == null || (url = url.trim()).length() == 0) {
- throw new IllegalArgumentException("url == null");
- }
- String protocol = null;
- String username = null;
- String password = null;
- String host = null;
- int port = 0;
- String path = null;
- Map<String, String> parameters = null;
- int i = url.indexOf('?'); // separator between body and parameters
- if (i >= 0) {
- String[] parts = url.substring(i + 1).split("&");
- parameters = new HashMap<>();
- for (String part : parts) {
- part = part.trim();
- if (part.length() > 0) {
- int j = part.indexOf('=');
- if (j >= 0) {
- String key = part.substring(0, j);
- String value = part.substring(j + 1);
- parameters.put(key, value);
- // compatible with lower versions registering "default." keys
- if (key.startsWith(DEFAULT_KEY_PREFIX)) {
- parameters.putIfAbsent(key.substring(DEFAULT_KEY_PREFIX.length()), value);
- }
- } else {
- parameters.put(part, part);
- }
- }
- }
- url = url.substring(0, i);
- }
- i = url.indexOf("://");
- if (i >= 0) {
- if (i == 0) {
- throw new IllegalStateException("url missing protocol: \"" + url + "\"");
- }
- protocol = url.substring(0, i);
- url = url.substring(i + 3);
- } else {
- // case: file:/path/to/file.txt
- i = url.indexOf(":/");
- if (i >= 0) {
- if (i == 0) {
- throw new IllegalStateException("url missing protocol: \"" + url + "\"");
- }
- protocol = url.substring(0, i);
- url = url.substring(i + 1);
- }
- }
-
- i = url.indexOf('/');
- if (i >= 0) {
- path = url.substring(i + 1);
- url = url.substring(0, i);
- }
- i = url.lastIndexOf('@');
- if (i >= 0) {
- username = url.substring(0, i);
- int j = username.indexOf(':');
- if (j >= 0) {
- password = username.substring(j + 1);
- username = username.substring(0, j);
- }
- url = url.substring(i + 1);
- }
- i = url.lastIndexOf(':');
- if (i >= 0 && i < url.length() - 1) {
- if (url.lastIndexOf('%') > i) {
- // ipv6 address with scope id
- // e.g. fe80:0:0:0:894:aeec:f37d:23e1%en0
- // see https://howdoesinternetwork.com/2013/ipv6-zone-id
- // ignore
- } else {
- port = Integer.parseInt(url.substring(i + 1));
- url = url.substring(0, i);
- }
- }
- if (url.length() > 0) {
- host = url;
- }
-
- return new URL(protocol, username, password, host, port, path, parameters);
+ return valueOf(url, false);
}
- public static Map<String, Map<String, String>> toMethodParameters(Map<String, String> parameters) {
- Map<String, Map<String, String>> methodParameters = new HashMap<>();
- if (parameters == null) {
- return methodParameters;
+ /**
+ * parse normal or encoded url string into strutted URL:
+ * - dubbo://host:port/path?param=value
+ * - URL.encode("dubbo://host:port/path?param=value")
+ *
+ * @param url, url string
+ * @param encoded, encoded or decoded
+ * @return
+ */
+ public static URL valueOf(String url, boolean encoded) {
+ if (encoded) {
+ return URLStrParser.parseEncodedStr(url, false);
}
-
- String methodsString = parameters.get(METHODS_KEY);
- if (StringUtils.isNotEmpty(methodsString)) {
- List<String> methods = StringUtils.splitToList(methodsString, ',');
- for (Map.Entry<String, String> entry : parameters.entrySet()) {
- String key = entry.getKey();
- for (int i = 0; i < methods.size(); i++) {
- String method = methods.get(i);
- int methodLen = method.length();
- if (key.length() > methodLen
- && key.startsWith(method)
- && key.charAt(methodLen) == '.') {//equals to: key.startsWith(method + '.')
- String realKey = key.substring(methodLen + 1);
- URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
- }
- }
- }
- } else {
- for (Map.Entry<String, String> entry : parameters.entrySet()) {
- String key = entry.getKey();
- int methodSeparator = key.indexOf('.');
- if (methodSeparator > 0) {
- String method = key.substring(0, methodSeparator);
- String realKey = key.substring(methodSeparator + 1);
- URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
- }
- }
- }
- return methodParameters;
+ return URLStrParser.parseDecodedStr(url, false);
}
public static URL valueOf(String url, String... reserveParams) {
@@ -567,6 +444,46 @@
}
public Map<String, Map<String, String>> getMethodParameters() {
+ if (methodParameters == null) {
+ methodParameters = initMethodParameters(this.parameters);
+ }
+ return methodParameters;
+ }
+
+ private void resetMethodParameters() {
+ this.methodParameters = null;
+ }
+
+ private Map<String, Map<String, String>> initMethodParameters(Map<String, String> parameters) {
+ Map<String, Map<String, String>> methodParameters = new HashMap<>();
+ if (parameters == null) {
+ return methodParameters;
+ }
+
+ String methodsString = parameters.get(METHODS_KEY);
+ if (StringUtils.isNotEmpty(methodsString)) {
+ String[] methods = methodsString.split(",");
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ String key = entry.getKey();
+ for (String method : methods) {
+ String methodPrefix = method + '.';
+ if (key.startsWith(methodPrefix)) {
+ String realKey = key.substring(methodPrefix.length());
+ URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
+ }
+ }
+ }
+ } else {
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ String key = entry.getKey();
+ int methodSeparator = key.indexOf('.');
+ if (methodSeparator > 0) {
+ String method = key.substring(0, methodSeparator);
+ String realKey = key.substring(methodSeparator + 1);
+ URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
+ }
+ }
+ }
return methodParameters;
}
@@ -794,7 +711,7 @@
}
public String getMethodParameter(String method, String key) {
- Map<String, String> keyMap = methodParameters.get(method);
+ Map<String, String> keyMap = getMethodParameters().get(method);
String value = null;
if (keyMap != null) {
value = keyMap.get(key);
@@ -1092,40 +1009,6 @@
return new URL(protocol, username, password, host, port, path, map);
}
-
- public URL addMethodParameter(String method, String key, String value) {
- if (StringUtils.isEmpty(method)
- || StringUtils.isEmpty(key)
- || StringUtils.isEmpty(value)) {
- return this;
- }
-
- Map<String, String> map = new HashMap<>(getParameters());
- map.put(method + "." + key, value);
- Map<String, Map<String, String>> methodMap = toMethodParameters(map);
- URL.putMethodParameter(method, key, value, methodMap);
-
- return new URL(protocol, username, password, host, port, path, map, methodMap);
- }
-
- public URL addMethodParameterIfAbsent(String method, String key, String value) {
- if (StringUtils.isEmpty(method)
- || StringUtils.isEmpty(key)
- || StringUtils.isEmpty(value)) {
- return this;
- }
- if (hasMethodParameter(method, key)) {
- return this;
- }
-
- Map<String, String> map = new HashMap<>(getParameters());
- map.put(method + "." + key, value);
- Map<String, Map<String, String>> methodMap = toMethodParameters(map);
- URL.putMethodParameter(method, key, value, methodMap);
-
- return new URL(protocol, username, password, host, port, path, map, methodMap);
- }
-
/**
* Add parameters to a new url.
*
@@ -1157,9 +1040,11 @@
return this;
}
- Map<String, String> map = new HashMap<>(getParameters());
- map.putAll(parameters);
- return new URL(protocol, username, password, host, port, path, map);
+ Map<String, String> srcParams = getParameters();
+ Map<String, String> newMap = new HashMap<>((int) ((srcParams.size() + parameters.size()) / 0.75 + 1));
+ newMap.putAll(srcParams);
+ newMap.putAll(parameters);
+ return new URL(protocol, username, password, host, port, path, newMap);
}
public URL addParametersIfAbsent(Map<String, String> parameters) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
index 20c6c60..aad6e1c 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
@@ -24,34 +24,10 @@
import java.util.Map;
import java.util.Objects;
-public final class URLBuilder {
- private String protocol;
-
- private String username;
-
- private String password;
-
- // by default, host to registry
- private String host;
-
- // by default, port to registry
- private int port;
-
- private String path;
-
- private Map<String, String> parameters;
-
- private Map<String, Map<String, String>> methodParameters;
+public final class URLBuilder extends URL {
public URLBuilder() {
- protocol = null;
- username = null;
- password = null;
- host = null;
- port = 0;
- path = null;
- parameters = new HashMap<>();
- methodParameters = new HashMap<>();
+ super();
}
public URLBuilder(String protocol, String host, int port) {
@@ -84,24 +60,7 @@
String host,
int port,
String path, Map<String, String> parameters) {
- this(protocol, username, password, host, port, path, parameters, URL.toMethodParameters(parameters));
- }
-
- public URLBuilder(String protocol,
- String username,
- String password,
- String host,
- int port,
- String path, Map<String, String> parameters,
- Map<String, Map<String, String>> methodParameters) {
- this.protocol = protocol;
- this.username = username;
- this.password = password;
- this.host = host;
- this.port = port;
- this.path = path;
- this.parameters = parameters != null ? parameters : new HashMap<>();
- this.methodParameters = (methodParameters != null ? methodParameters : new HashMap<>());
+ super(protocol, username, password, host, port, path, parameters);
}
public static URLBuilder from(URL url) {
@@ -112,7 +71,6 @@
int port = url.getPort();
String path = url.getPath();
Map<String, String> parameters = new HashMap<>(url.getParameters());
- Map<String, Map<String, String>> methodParameters = new HashMap<>(url.getMethodParameters());
return new URLBuilder(
protocol,
username,
@@ -120,8 +78,7 @@
host,
port,
path,
- parameters,
- methodParameters);
+ parameters);
}
public URL build() {
@@ -141,13 +98,12 @@
path = path.substring(firstNonSlash);
}
}
- if (CollectionUtils.isEmptyMap(methodParameters)) {
- return new URL(protocol, username, password, host, port, path, parameters);
- } else {
- return new URL(protocol, username, password, host, port, path, parameters, methodParameters);
- }
- }
+ URL url = new URL(protocol, username, password, host, port, path);
+ url.parameters = this.parameters;
+
+ return url;
+ }
public URLBuilder setProtocol(String protocol) {
this.protocol = protocol;
@@ -263,14 +219,6 @@
return this;
}
- public URLBuilder addMethodParameter(String method, String key, String value) {
- if (StringUtils.isEmpty(method) || StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
- return this;
- }
- URL.putMethodParameter(method, key, value, methodParameters);
- return this;
- }
-
public URLBuilder addParameterIfAbsent(String key, String value) {
if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
return this;
@@ -282,17 +230,6 @@
return this;
}
- public URLBuilder addMethodParameterIfAbsent(String method, String key, String value) {
- if (StringUtils.isEmpty(method) || StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
- return this;
- }
- if (hasMethodParameter(method, key)) {
- return this;
- }
- URL.putMethodParameter(method, key, value, methodParameters);
- return this;
- }
-
public URLBuilder addParameters(Map<String, String> parameters) {
if (CollectionUtils.isEmptyMap(parameters)) {
return this;
@@ -316,15 +253,6 @@
return this;
}
- public URLBuilder addMethodParameters(Map<String, Map<String, String>> methodParameters) {
- if (CollectionUtils.isEmptyMap(methodParameters)) {
- return this;
- }
-
- this.methodParameters.putAll(methodParameters);
- return this;
- }
-
public URLBuilder addParametersIfAbsent(Map<String, String> parameters) {
if (CollectionUtils.isEmptyMap(parameters)) {
return this;
@@ -389,39 +317,19 @@
return StringUtils.isNotEmpty(value);
}
- public boolean hasMethodParameter(String method, String key) {
- if (method == null) {
- String suffix = "." + key;
- for (String fullKey : parameters.keySet()) {
- if (fullKey.endsWith(suffix)) {
- return true;
- }
- }
- return false;
- }
- if (key == null) {
- String prefix = method + ".";
- for (String fullKey : parameters.keySet()) {
- if (fullKey.startsWith(prefix)) {
- return true;
- }
- }
- return false;
- }
- String value = getMethodParameter(method, key);
- return value != null && value.length() > 0;
- }
-
public String getParameter(String key) {
return parameters.get(key);
}
- public String getMethodParameter(String method, String key) {
- Map<String, String> keyMap = methodParameters.get(method);
- String value = null;
- if (keyMap != null) {
- value = keyMap.get(key);
- }
- return value;
+ /**
+ * Parse url string
+ *
+ * @param url URL string
+ * @return URL instance
+ * @see URL
+ */
+ public static URLBuilder valueOf(String url) {
+ return (URLBuilder) URLStrParser.parseDecodedStr(url, true);
}
+
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
index 37afb78..ccf3edf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
import static org.apache.dubbo.common.utils.StringUtils.EMPTY_STRING;
import static org.apache.dubbo.common.utils.StringUtils.decodeHexByte;
import static org.apache.dubbo.common.utils.Utf8Utils.decodeUtf8;
@@ -34,12 +35,16 @@
//empty
}
+ public static URL parseDecodedStr(String decodedURLStr) {
+ return parseDecodedStr(decodedURLStr, false);
+ }
+
/**
* @param decodedURLStr : after {@link URL#decode} string
* decodedURLStr format: protocol://username:password@host:port/path?k1=v1&k2=v2
* [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
*/
- public static URL parseDecodedStr(String decodedURLStr) {
+ public static URL parseDecodedStr(String decodedURLStr, boolean modifiable) {
Map<String, String> parameters = null;
int pathEndIdx = decodedURLStr.indexOf('?');
if (pathEndIdx >= 0) {
@@ -49,7 +54,7 @@
}
String decodedBody = decodedURLStr.substring(0, pathEndIdx);
- return parseURLBody(decodedURLStr, decodedBody, parameters);
+ return parseURLBody(decodedURLStr, decodedBody, parameters, modifiable);
}
private static Map<String, String> parseDecodedParams(String str, int from) {
@@ -92,7 +97,7 @@
* @param parameters :
* @return URL
*/
- private static URL parseURLBody(String fullURLStr, String decodedBody, Map<String, String> parameters) {
+ private static URL parseURLBody(String fullURLStr, String decodedBody, Map<String, String> parameters, boolean modifiable) {
int starIdx = 0, endIdx = decodedBody.length();
String protocol = null;
int protoEndIdx = decodedBody.indexOf("://");
@@ -149,15 +154,25 @@
if (endIdx > starIdx) {
host = decodedBody.substring(starIdx, endIdx);
}
- return new URL(protocol, username, password, host, port, path, parameters);
+
+ if (modifiable) {
+ return new URLBuilder(protocol, username, password, host, port, path, parameters);
+ } else {
+ return new URL(protocol, username, password, host, port, path, parameters);
+ }
}
+ public static URL parseEncodedStr(String encodedURLStr) {
+ return parseEncodedStr(encodedURLStr, false);
+ }
+
+
/**
* @param encodedURLStr : after {@link URL#encode(String)} string
* encodedURLStr after decode format: protocol://username:password@host:port/path?k1=v1&k2=v2
* [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
*/
- public static URL parseEncodedStr(String encodedURLStr) {
+ public static URL parseEncodedStr(String encodedURLStr, boolean modifiable) {
Map<String, String> parameters = null;
int pathEndIdx = encodedURLStr.indexOf("%3F");// '?'
if (pathEndIdx >= 0) {
@@ -168,7 +183,7 @@
//decodedBody format: [protocol://][username:password@][host:port]/[path]
String decodedBody = decodeComponent(encodedURLStr, 0, pathEndIdx, false, DECODE_TEMP_BUF.get());
- return parseURLBody(encodedURLStr, decodedBody, parameters);
+ return parseURLBody(encodedURLStr, decodedBody, parameters, modifiable);
}
private static Map<String, String> parseEncodedParams(String str, int from) {
@@ -225,12 +240,30 @@
if (isEncoded) {
String name = decodeComponent(str, nameStart, valueStart - 3, false, tempBuf);
- String value = decodeComponent(str, valueStart, valueEnd, false, tempBuf);
+ String value;
+ if (valueStart == valueEnd) {
+ value = name;
+ } else {
+ value = decodeComponent(str, valueStart, valueEnd, false, tempBuf);
+ }
params.put(name, value);
+ // compatible with lower versions registering "default." keys
+ if (name.startsWith(DEFAULT_KEY_PREFIX)) {
+ params.putIfAbsent(name.substring(DEFAULT_KEY_PREFIX.length()), value);
+ }
} else {
- String name = str.substring(nameStart, valueStart -1);
- String value = str.substring(valueStart, valueEnd);
+ String name = str.substring(nameStart, valueStart - 1);
+ String value;
+ if (valueStart == valueEnd) {
+ value = name;
+ } else {
+ value = str.substring(valueStart, valueEnd);
+ }
params.put(name, value);
+ // compatible with lower versions registering "default." keys
+ if (name.startsWith(DEFAULT_KEY_PREFIX)) {
+ params.putIfAbsent(name.substring(DEFAULT_KEY_PREFIX.length()), value);
+ }
}
return true;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 27c5744..defa559 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -310,4 +310,8 @@
String SSL_ENABLED_KEY = "ssl-enabled";
+ String REGISTRY_SNAPSHOT_KEY = "snapshot";
+
+ String REGISTRY_DELAY_NOTIFICATION_KEY = "delay-notification";
+
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
index cf34202..bfea952 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
@@ -336,15 +336,27 @@
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
- if ((k.equals(key) || k.endsWith("." + key))
- && ((keyValue != null && keyValue.equals(v)) || (keyValue == null && ConfigUtils.isNotEmpty(v)))) {
+ if (isMatch(key, keyValue, k, v)) {
return true;
}
}
+
+ for (Map.Entry<String, Map<String, String>> entry : url.getMethodParameters().entrySet()) {
+ Map<String, String> methodKeyValues = entry.getValue();
+ for (Map.Entry<String, String> methodEntry : methodKeyValues.entrySet()) {
+ String k = methodEntry.getKey();
+ String v = methodEntry.getValue();
+ return isMatch(key, keyValue, k, v);
+ }
+ }
}
return false;
}
+ private boolean isMatch(String key, String value, String k, String v) {
+ return k.equals(key) && ((value != null && value.equals(v)) || (value == null && ConfigUtils.isNotEmpty(v)));
+ }
+
/**
* Get extension's instance. Return <code>null</code> if extension is not found or is not initialized. Pls. note
* that this method will not trigger extension load.
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index dd37bff..3df98d5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -50,6 +50,8 @@
private ScheduledExecutorService serviceExporterExecutor;
+ public ScheduledExecutorService registryNotificationExecutor;
+
private ScheduledExecutorService reconnectScheduledExecutor;
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
@@ -62,6 +64,7 @@
//
// reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
+ registryNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-registry-notification"));
}
/**
@@ -155,6 +158,11 @@
}
@Override
+ public ScheduledExecutorService getRegistryNotificationExecutor() {
+ return registryNotificationExecutor;
+ }
+
+ @Override
public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index af3b110..dd91442 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -58,6 +58,13 @@
ScheduledExecutorService getServiceExporterExecutor();
/**
+ * Scheduled executor handle registry notification.
+ *
+ * @return
+ */
+ ScheduledExecutorService getRegistryNotificationExecutor();
+
+ /**
* Get the default shared threadpool.
*
* @return
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
index d45ada1..797f09b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.common.utils;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.RemotingConstants;
import java.util.ArrayList;
@@ -553,4 +554,15 @@
arr[1] = serviceKey;
return arr;
}
+
+ public static URLBuilder newModifiableUrl(URL url) {
+ return URLBuilder.from(url);
+ }
+
+ public static URL unmodifiableUrl(URL url) {
+ if (url instanceof URLBuilder) {
+ return ((URLBuilder) url).build();
+ }
+ return url;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
index 3b868fe..7411205 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
@@ -24,6 +24,7 @@
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_DELAY_NOTIFICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SHUTDOWN_WAIT_KEY;
import static org.apache.dubbo.config.Constants.REGISTRIES_SUFFIX;
@@ -180,6 +181,8 @@
*/
private Integer weight;
+ private Integer lazyNotification;
+
public RegistryConfig() {
}
@@ -497,6 +500,15 @@
this.weight = weight;
}
+ @Parameter(key = REGISTRY_DELAY_NOTIFICATION_KEY)
+ public Integer getLazyNotification() {
+ return lazyNotification;
+ }
+
+ public void setLazyNotification(Integer lazyNotification) {
+ this.lazyNotification = lazyNotification;
+ }
+
@Override
public void refresh() {
super.refresh();
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
index 3ca62ce..db3c657 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
@@ -21,15 +21,12 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-/**
- * Created by LinShunkang on 2020/03/12
- */
public class URLStrParserTest {
@Test
public void test() {
String str = "dubbo%3A%2F%2Fadmin%3Aadmin123%40192.168.1.41%3A28113%2Forg.test.api.DemoService%24Iface%3Fanyhost%3Dtrue%26application%3Ddemo-service%26dubbo%3D2.6.1%26generic%3Dfalse%26interface%3Dorg.test.api.DemoService%24Iface%26methods%3DorbCompare%2CcheckText%2CcheckPicture%26pid%3D65557%26revision%3D1.4.17%26service.filter%3DbootMetrics%26side%3Dprovider%26status%3Dserver%26threads%3D200%26timestamp%3D1583136298859%26version%3D1.0.0";
- System.out.println(URLStrParser.parseEncodedStr(str));
+ System.out.println(URLStrParser.parseEncodedStr(str, false));
String decodeStr = URL.decode(str);
URL originalUrl = URL.valueOf(decodeStr);
@@ -37,4 +34,12 @@
assertThat(URLStrParser.parseDecodedStr(decodeStr), equalTo(originalUrl));
}
+ @Test
+ public void testNoValue() {
+ String str = URL.encode("http://1.2.3.4:8080/path?k0=&k1=v1");
+
+ System.out.println(URLStrParser.parseEncodedStr(str, false));
+
+ }
+
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
index db5f57b..52ee650 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
@@ -274,7 +274,7 @@
assertEquals("1.0.0", url.getParameter("version"));
assertEquals("morgan", url.getParameter("application"));
- url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?version=1.0.0&application=morgan&noValue");
+ url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?version=1.0.0&application=morgan&noValue=");
assertURLStrDecoder(url);
assertEquals("dubbo", url.getProtocol());
assertEquals("admin", url.getUsername());
@@ -300,7 +300,7 @@
@Test
public void test_noValueKey() throws Exception {
- URL url = URL.valueOf("http://1.2.3.4:8080/path?k0&k1=v1");
+ URL url = URL.valueOf("http://1.2.3.4:8080/path?k0=&k1=v1");
assertURLStrDecoder(url);
assertTrue(url.hasParameter("k0"));
@@ -874,4 +874,20 @@
url = URL.valueOf("dubbo://10.20.130.230:20880/path");
assertURLStrDecoder(url);
}
+
+ @Test
+ public void testEquals() {
+ URL url1 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
+ URL url2 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
+ Assertions.assertEquals(url1, url2);
+
+ URL url3 = URL.valueOf("10.20.130.230:20881/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
+ Assertions.assertNotEquals(url1, url3);
+
+ URL url4 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&weight=10&group=group&version=1.0.0");
+ Assertions.assertNotEquals(url1, url4);
+
+ URL url5 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&weight=10&group=group&version=1.0.0");
+ Assertions.assertEquals(url4, url5);
+ }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
index ed737ef..ea54e9f 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
@@ -17,7 +17,7 @@
package org.apache.dubbo.common.threadpool.support;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
+
import org.junit.jupiter.api.Test;
import java.util.concurrent.Executors;
@@ -27,7 +27,7 @@
public class AbortPolicyWithReportTest {
@Test
public void jStackDumpTest() throws InterruptedException {
- URL url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?dump.directory=/tmp&version=1.0.0&application=morgan&noValue");
+ URL url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?dump.directory=/tmp&version=1.0.0&application=morgan&noValue=");
AbortPolicyWithReport abortPolicyWithReport = new AbortPolicyWithReport("Test", url);
try {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 0792d12..a147874 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -32,7 +32,6 @@
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.event.ServiceConfigExportedEvent;
import org.apache.dubbo.config.event.ServiceConfigUnexportedEvent;
-import org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker;
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
import org.apache.dubbo.event.Event;
@@ -487,9 +486,7 @@
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
- DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
-
- Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
+ Exporter<?> exporter = PROTOCOL.export(invoker);
exporters.add(exporter);
}
} else {
@@ -497,9 +494,7 @@
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
- DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
-
- Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
+ Exporter<?> exporter = PROTOCOL.export(invoker);
exporters.add(exporter);
}
/**
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index 1805b2a..aa1fe74 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -623,6 +623,11 @@
<xsd:documentation><![CDATA[ Is this registry the preferred one. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="delay-notification" type="xsd:int">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ The maximum time to wait before notification. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="weight" type="xsd:integer">
<xsd:annotation>
<xsd:documentation><![CDATA[ weight of registry. ]]></xsd:documentation>
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index bfaaf3f..3248e9c 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -617,6 +617,11 @@
<xsd:documentation><![CDATA[ Is this registry the preferred one. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="delay-notification" type="xsd:int">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ The maximum time to wait before notification. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="weight" type="xsd:integer">
<xsd:annotation>
<xsd:documentation><![CDATA[ weight of registry. ]]></xsd:documentation>
diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
index 3ca881f..b39199e 100644
--- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
+++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
@@ -50,6 +50,12 @@
@Override
protected Monitor createMonitor(URL url) {
+ Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, buildMonitorURL(url));
+ MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
+ return new DubboMonitor(monitorInvoker, monitorService);
+ }
+
+ private URL buildMonitorURL(URL url) {
URLBuilder urlBuilder = URLBuilder.from(url);
urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
if (StringUtils.isEmpty(url.getPath())) {
@@ -63,9 +69,7 @@
}
urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
REFERENCE_FILTER_KEY, filter + "-monitor");
- Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
- MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
- return new DubboMonitor(monitorInvoker, monitorService);
+ return urlBuilder.build();
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
index d5b3dbc..084deff 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
@@ -19,6 +19,8 @@
import org.apache.dubbo.common.Node;
import org.apache.dubbo.common.URL;
+import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_DELAY_NOTIFICATION_KEY;
+
/**
* Registry. (SPI, Prototype, ThreadSafe)
*
@@ -26,6 +28,10 @@
* @see org.apache.dubbo.registry.support.AbstractRegistry
*/
public interface Registry extends Node, RegistryService {
+ default int getDelay() {
+ return getUrl().getParameter(REGISTRY_DELAY_NOTIFICATION_KEY, -1);
+ }
+
default void reExportRegister(URL url) {
register(url);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java
new file mode 100644
index 0000000..afb640a
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public abstract class RegistryNotifier {
+
+ private volatile long lastExecuteTime;
+ private volatile long lastEventTime;
+
+ private Object rawAddresses;
+ private Registry registry;
+
+ private ScheduledExecutorService scheduler = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
+ .getDefaultExtension().getRegistryNotificationExecutor();
+
+ public Registry getRegistry() {
+ return registry;
+ }
+
+ public RegistryNotifier(Registry registry) {
+ this.registry = registry;
+ }
+
+ public void notify(Object rawAddresses) {
+ this.rawAddresses = rawAddresses;
+ long notifyTime = System.currentTimeMillis();
+ this.lastEventTime = notifyTime;
+
+ int delayTime = getRegistry().getDelay();
+ long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime;
+ if (delta >= 0) {
+ scheduler.submit(new NotificationTask(this, notifyTime));
+ } else {
+ scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ protected abstract void doNotify(Object rawAddresses);
+
+ public class NotificationTask implements Runnable {
+ private RegistryNotifier listener;
+ private long time;
+
+ public NotificationTask(RegistryNotifier listener, long time) {
+ this.listener = listener;
+ this.time = time;
+ }
+
+ @Override
+ public void run() {
+ if (this.time == listener.lastEventTime) {
+ listener.doNotify(listener.rawAddresses);
+ listener.lastExecuteTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 736108f..14e26ea 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -23,6 +23,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
@@ -63,11 +64,18 @@
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PREFERRED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TAG_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY;
@@ -77,6 +85,7 @@
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
@@ -84,7 +93,6 @@
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY;
@@ -103,6 +111,7 @@
private final String serviceKey; // Initialization at construction time, assertion not null
private final Class<T> serviceType; // Initialization at construction time, assertion not null
private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
+ private final Map<String, String> mergeMap;
private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private final boolean multiGroup;
private Protocol protocol; // Initialization at the time of injection, the assertion is not null
@@ -124,17 +133,17 @@
private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
// Map<url, Invoker> cache service url to invoker mapping.
- private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+ private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private volatile List<Invoker<T>> invokers;
// Set<invokerUrls> cache invokeUrls to invokers mapping.
- private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+ private volatile List<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
- public RegistryDirectory(Class<T> serviceType, URL url) {
+ public RegistryDirectory(Class<T> serviceType, URL url, Map<String, String> parameters) {
super(url);
if (serviceType == null) {
throw new IllegalArgumentException("service type is null.");
@@ -147,19 +156,37 @@
}
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
- this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
+ this.queryMap = parameters;
+ this.mergeMap = genMergeMap(parameters);
this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(GROUP_KEY, "");
this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
}
+ private Map<String, String> genMergeMap(Map<String, String> parameters) {
+ Map<String, String> copyOfParameters = new HashMap<>(parameters);
+ copyOfParameters.remove(GROUP_KEY);
+ copyOfParameters.remove(VERSION_KEY);
+ copyOfParameters.remove(RELEASE_KEY);
+ copyOfParameters.remove(DUBBO_VERSION_KEY);
+ copyOfParameters.remove(METHODS_KEY);
+ copyOfParameters.remove(TIMESTAMP_KEY);
+ copyOfParameters.remove(TAG_KEY);
+ return copyOfParameters;
+ }
+
private URL turnRegistryUrlToConsumerUrl(URL url) {
- return URLBuilder.from(url)
+ // save any parameter in registry that will be useful to the new url.
+ URLBuilder builder = URLBuilder.from(url)
.setPath(url.getServiceInterface())
.clearParameters()
.addParameters(queryMap)
- .removeParameter(MONITOR_KEY)
- .build();
+ .removeParameter(MONITOR_KEY);
+ String isDefault = url.getParameter(PREFERRED_KEY);
+ if (StringUtils.isNotEmpty(isDefault)) {
+ builder.addParameter(REGISTRY_KEY + "." + PREFERRED_KEY, isDefault);
+ }
+ return builder.build();
}
public void setProtocol(Protocol protocol) {
@@ -283,7 +310,6 @@
*
* @param invokerUrls this parameter can't be null
*/
- // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
@@ -296,20 +322,21 @@
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
- Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
+
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
- invokerUrls.addAll(this.cachedInvokerUrls);
+ invokerUrls = this.cachedInvokerUrls;
} else {
- this.cachedInvokerUrls = new HashSet<>();
- this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
+ this.cachedInvokerUrls = invokerUrls;//Cached invoker urls, convenient for comparison
}
+
if (invokerUrls.isEmpty()) {
return;
}
- Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+ Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
@@ -401,18 +428,21 @@
* @param urls
* @return invokers
*/
- private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
+ Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
- Set<String> keys = new HashSet<>();
+ Set<URL> keys = new HashSet<>();
+ String[] acceptProtocols = null;
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
+ if (queryProtocols != null && queryProtocols.length() > 0) {
+ acceptProtocols = queryProtocols.split(",");
+ }
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
- if (queryProtocols != null && queryProtocols.length() > 0) {
+ if (ArrayUtils.isNotEmpty(acceptProtocols)) {
boolean accept = false;
- String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
@@ -433,16 +463,16 @@
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
- URL url = mergeUrl(providerUrl);
+ URL url = UrlUtils.unmodifiableUrl(mergeUrl(providerUrl));
- String key = url.toFullString(); // The parameter urls are sorted
- if (keys.contains(key)) { // Repeated url
+ if (keys.contains(url)) { // Repeated url
continue;
}
- keys.add(key);
- // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
- Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
+ keys.add(url);
+ // Cache key is url that does not merge with consumer side parameters,
+ // regardless of how the consumer combines parameters, if the server url changes, then refer again
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
@@ -458,10 +488,10 @@
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(key, invoker);
+ newUrlInvokerMap.put(url, invoker);
}
} else {
- newUrlInvokerMap.put(key, invoker);
+ newUrlInvokerMap.put(url, invoker);
}
}
keys.clear();
@@ -475,7 +505,7 @@
* @return
*/
private URL mergeUrl(URL providerUrl) {
- providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
+ providerUrl = ClusterUtils.mergeUrl(providerUrl, mergeMap); // Merge the consumer side parameters
providerUrl = overrideWithConfigurator(providerUrl);
@@ -531,7 +561,7 @@
* Close all invokers
*/
private void destroyAllInvokers() {
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -552,16 +582,16 @@
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
- List<String> deleted = null;
+ List<URL> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
- for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
@@ -572,7 +602,7 @@
}
if (deleted != null) {
- for (String url : deleted) {
+ for (URL url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
@@ -649,7 +679,7 @@
if (isDestroyed()) {
return false;
}
- Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
+ Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
@@ -667,7 +697,7 @@
/**
* Haomin: added for test purpose
*/
- public Map<String, Invoker<T>> getUrlInvokerMap() {
+ public Map<URL, Invoker<T>> getUrlInvokerMap() {
return urlInvokerMap;
}
@@ -694,7 +724,7 @@
private void overrideDirectoryUrl() {
// merge override parameters
- this.overrideDirectoryUrl = directoryUrl;
+ this.overrideDirectoryUrl = UrlUtils.newModifiableUrl(directoryUrl);
List<Configurator> localConfigurators = this.configurators; // local reference
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 76da303..c706899 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -48,7 +48,7 @@
import org.apache.dubbo.rpc.protocol.InvokerWrapper;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,19 +67,13 @@
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
-import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
@@ -95,15 +89,13 @@
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
-import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
-import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
+import static org.apache.dubbo.remoting.Constants.PAYLOAD_KEY;
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
-import static org.apache.dubbo.rpc.Constants.INTERFACES;
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
@@ -118,7 +110,7 @@
public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = {
APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
- WEIGHT_KEY, TIMESTAMP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
+ WEIGHT_KEY, DUBBO_VERSION_KEY, RELEASE_KEY, PAYLOAD_KEY
};
public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = {
@@ -205,11 +197,12 @@
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
+ providerUrl = UrlUtils.unmodifiableUrl(providerUrl);
+
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
- final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
@@ -222,6 +215,7 @@
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
+ final Registry registry = registryFactory.getRegistry(registryUrl);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
@@ -252,7 +246,6 @@
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
-
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
@@ -358,12 +351,12 @@
}
protected URL getRegistryUrl(Invoker<?> originInvoker) {
- URL registryUrl = originInvoker.getUrl();
- if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
- String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
- registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
+ URLBuilder builder = URLBuilder.from(originInvoker.getUrl());
+ if (REGISTRY_PROTOCOL.equals(builder.getProtocol())) {
+ String protocol = builder.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
+ builder.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
- return registryUrl;
+ return builder.build();
}
protected URL getRegistryUrl(URL url) {
@@ -381,32 +374,31 @@
* @return url to registry.
*/
private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) {
- //The address you see at the registry
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
- return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
- MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
- INTERFACES);
- } else {
- String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
- // if path is not the same as interface name then we should keep INTERFACE_KEY,
- // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
- // but what we expect is '/dubbo/interface/providers'
- if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
- if (StringUtils.isNotEmpty(extraKeys)) {
- extraKeys += ",";
- }
- extraKeys += INTERFACE_KEY;
- }
- String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
- , COMMA_SPLIT_PATTERN.split(extraKeys));
- return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
+ return providerUrl;
}
-
+ String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
+ // if path is not the same as interface name then we should keep INTERFACE_KEY,
+ // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
+ // but what we expect is '/dubbo/interface/providers'
+ if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
+ if (StringUtils.isNotEmpty(extraKeys)) {
+ extraKeys += ",";
+ }
+ extraKeys += INTERFACE_KEY;
+ }
+ String[] extraKeyArrays = COMMA_SPLIT_PATTERN.split(extraKeys);
+ String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS, extraKeyArrays);
+ // TODO, avoid creation of URL
+ //The address you see at the registry
+ return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
}
private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
- return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
- .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
+ return URLBuilder.from(registeredProviderUrl)
+ .setProtocol(PROVIDER_PROTOCOL)
+ .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false))
+ .build();
}
/**
@@ -420,7 +412,7 @@
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
}
- return URL.valueOf(export);
+ return URLBuilder.valueOf(export);
}
/**
@@ -430,9 +422,7 @@
* @return
*/
private String getCacheKey(final Invoker<?> originInvoker) {
- URL providerUrl = getProviderUrl(originInvoker);
- String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
- return key;
+ return originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
}
@Override
@@ -445,33 +435,34 @@
}
// group="a,b" or group="*"
- Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
+ Map<String, String> qs = Collections.unmodifiableMap(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
- return doRefer(getMergeableCluster(), registry, type, url);
+ return doRefer(getMergeableCluster(), registry, type, url, qs);
}
}
- return doRefer(cluster, registry, type, url);
+ return doRefer(cluster, registry, type, url, qs);
}
private Cluster getMergeableCluster() {
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
- private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
- RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
+ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
+ RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url, parameters);
directory.setRegistry(registry);
directory.setProtocol(protocol);
- // all attributes of REFER_KEY
- Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
- URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
+
+ URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.get(REGISTER_IP_KEY), 0, type.getName(), parameters);
+ subscribeUrl = toSubscribeUrl(subscribeUrl);
+
if (directory.isShouldRegister()) {
- directory.setRegisteredConsumerUrl(subscribeUrl);
+ directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
- directory.subscribe(toSubscribeUrl(subscribeUrl));
+ directory.subscribe(subscribeUrl);
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
@@ -511,6 +502,16 @@
return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
}
+ public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
+ if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
+ return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
+ CHECK_KEY, String.valueOf(false));
+ } else {
+ return URL.valueOf(consumerUrl, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters(
+ CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false));
+ }
+ }
+
private List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
.getActivateExtension(url, "registry.protocol.listener");
@@ -626,8 +627,7 @@
public synchronized void notify(List<URL> urls) {
logger.debug("original override urls: " + urls);
- List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
- CONFIGURATORS_CATEGORY));
+ List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
// No matching results
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index efcd7e9..a322388 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -57,6 +57,7 @@
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_SNAPSHOT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.ACCEPTS_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
@@ -404,7 +405,7 @@
return;
}
if (logger.isInfoEnabled()) {
- logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
+ logger.info("Notify urls for subscribing service " + url.getServiceKey() + ", provider url size: " + urls.size());
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
@@ -426,7 +427,9 @@
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
- saveProperties(url);
+ if (registryUrl.getParameter(REGISTRY_SNAPSHOT_KEY, false)) {
+ saveProperties(url);
+ }
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
new file mode 100644
index 0000000..c2b6929
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+
+/**
+ * Useful for registries who's sdk returns raw string as provider instance, for example, zookeeper and etcd.
+ */
+public abstract class CacheableFailbackRegistry extends FailbackRegistry {
+
+ protected final ConcurrentMap<URL, ConcurrentMap<String, URL>> stringUrls = new ConcurrentHashMap<>();
+
+ public CacheableFailbackRegistry(URL url) {
+ super(url);
+ }
+
+ protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+ if (CollectionUtils.isNotEmpty(providers)) {
+ Map<String, URL> consumerStringUrls = stringUrls.computeIfAbsent(consumer, (k) -> new ConcurrentHashMap<>());
+ Map<String, URL> copyOfStringUrls = new HashMap<>(consumerStringUrls);
+ for (String rawProvider : providers) {
+ URL cachedUrl = copyOfStringUrls.remove(rawProvider);
+ if (cachedUrl == null) {
+ // parse encoded (URLEncoder.encode) url directly.
+ URL url = URL.valueOf(rawProvider, true);
+ if (isMatch(consumer, url)) {
+ consumerStringUrls.put(rawProvider, url);
+ }
+ }
+ }
+ copyOfStringUrls.keySet().forEach(consumerStringUrls::remove);
+
+ List<URL> urls = new ArrayList<>(consumerStringUrls.size());
+ consumerStringUrls.values().forEach(u -> urls.add(UrlUtils.newModifiableUrl(u)));
+ return urls;
+ }
+
+ stringUrls.remove(consumer);
+ return new ArrayList<>(1);
+ }
+
+ protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+ List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+ if (urls.isEmpty()) {
+ int i = path.lastIndexOf(PATH_SEPARATOR);
+ String category = i < 0 ? path : path.substring(i + 1);
+ URL empty = URLBuilder.from(consumer)
+ .setProtocol(EMPTY_PROTOCOL)
+ .addParameter(CATEGORY_KEY, category)
+ .build();
+ urls.add(empty);
+ }
+ return urls;
+ }
+
+ protected abstract boolean isMatch(URL subscribeUrl, URL providerUrl);
+
+}
diff --git a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
index bc669c6..93226e8 100644
--- a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
+++ b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
@@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.CALLBACK_INSTANCES_LIMIT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
@@ -104,7 +105,10 @@
urls.add(url.setAddress(address));
}
}
- RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, url.addParameter(INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(REFER_KEY, url.toParameterString()));
+
+ Map<String, String> referParams = url.getParameters();
+ URL registryUrl = url.addParameter(INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(REFER_KEY, url.toParameterString());
+ RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, registryUrl, referParams);
Invoker<RegistryService> registryInvoker = cluster.join(directory);
RegistryService registryService = proxyFactory.getProxy(registryInvoker);
DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
index 9ce163c..3d7225c 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
@@ -440,7 +440,7 @@
registryDirectory.destroy();
List<Invoker<RegistryDirectoryTest>> cachedInvokers = registryDirectory.getInvokers();
- Map<String, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
+ Map<URL, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
Assertions.assertNull(cachedInvokers);
Assertions.assertEquals(0, urlInvokerMap.size());
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
index a15d7d5..ad11041 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -22,9 +22,9 @@
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.registry.RegistryNotifier;
+import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import org.apache.dubbo.remoting.etcd.ChildListener;
-import org.apache.dubbo.remoting.etcd.Constants;
import org.apache.dubbo.remoting.etcd.EtcdClient;
import org.apache.dubbo.remoting.etcd.EtcdTransporter;
import org.apache.dubbo.remoting.etcd.StateListener;
@@ -48,7 +48,6 @@
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
@@ -57,7 +56,7 @@
/**
* Support for ectd3 registry.
*/
-public class EtcdRegistry extends FailbackRegistry {
+public class EtcdRegistry extends CacheableFailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);
@@ -220,8 +219,7 @@
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener watchListener, prev;
- prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
- toUrlsWithEmpty(url, parentPath, currentChildren)));
+ prev = listeners.putIfAbsent(listener, watchListener = new RegistryChildListenerImpl(url, path, listener));
return prev != null ? prev : watchListener;
});
@@ -330,30 +328,26 @@
return categories;
}
- protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
- List<URL> urls = new ArrayList<>();
- if (providers != null && providers.size() > 0) {
- for (String provider : providers) {
- provider = URL.decode(provider);
- if (provider.contains(Constants.HTTP_SUBFIX_KEY)) {
- URL url = URL.valueOf(provider);
- if (UrlUtils.isMatch(consumer, url)) {
- urls.add(url);
- }
- }
- }
- }
- return urls;
+ @Override
+ protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
+ return UrlUtils.isMatch(subscribeUrl, providerUrl);
}
- protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
- List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
- if (urls == null || urls.isEmpty()) {
- int i = path.lastIndexOf('/');
- String category = i < 0 ? path : path.substring(i + 1);
- URL empty = consumer.setProtocol(EMPTY_PROTOCOL).addParameter(CATEGORY_KEY, category);
- urls.add(empty);
+ private class RegistryChildListenerImpl implements ChildListener {
+ private RegistryNotifier notifier;
+
+ public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener) {
+ notifier = new RegistryNotifier(EtcdRegistry.this) {
+ @Override
+ protected void doNotify(Object rawAddresses) {
+ EtcdRegistry.this.notify(consumerUrl, listener, EtcdRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
+ }
+ };
}
- return urls;
+
+ @Override
+ public void childChanged(String path, List<String> children) {
+ notifier.notify(children);
+ }
}
}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index 187da71..6a62387 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -27,15 +27,18 @@
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.RegistryNotifier;
import org.apache.dubbo.registry.nacos.util.NacosInstanceManageUtil;
import org.apache.dubbo.registry.support.FailbackRegistry;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -489,24 +492,7 @@
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
- EventListener eventListener = event -> {
- if (event instanceof NamingEvent) {
- NamingEvent e = (NamingEvent) event;
- List<Instance> instances = e.getInstances();
-
-
- if (isServiceNamesWithCompatibleMode(url)) {
- /**
- * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
- * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
- */
- NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
- instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
- }
-
- notifySubscriber(url, listener, instances);
- }
- };
+ EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener);
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
eventListener);
@@ -626,4 +612,35 @@
void callback(NamingService namingService) throws NacosException;
}
+
+ private class RegistryChildListenerImpl implements EventListener {
+ private RegistryNotifier notifier;
+
+ public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {
+ notifier = new RegistryNotifier(NacosRegistry.this) {
+ @Override
+ protected void doNotify(Object rawAddresses) {
+ List<Instance> instances = (List<Instance>) rawAddresses;
+ if (isServiceNamesWithCompatibleMode(consumerUrl)) {
+ /**
+ * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
+ * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
+ */
+ NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
+ instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
+ }
+ NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances);
+ }
+ };
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (event instanceof NamingEvent) {
+ NamingEvent e = (NamingEvent) event;
+ notifier.notify(e.getInstances());
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index 0938a39..c0072a7 100644
--- a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++ b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.registry.redis;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.RemotingConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -28,7 +27,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import org.apache.dubbo.rpc.RpcException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -57,7 +56,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -66,7 +64,6 @@
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD;
import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
import static org.apache.dubbo.registry.Constants.REGISTER;
@@ -77,7 +74,7 @@
/**
* RedisRegistry
*/
-public class RedisRegistry extends FailbackRegistry {
+public class RedisRegistry extends CacheableFailbackRegistry {
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);
@@ -437,27 +434,17 @@
if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
continue;
}
- List<URL> urls = new ArrayList<>();
+
Map<String, String> values = jedis.hgetAll(key);
+ List<String> rawUrls = new ArrayList<>(values.size());
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
- URL u = URL.valueOf(entry.getKey());
- if (!u.getParameter(DYNAMIC_KEY, true)
- || Long.parseLong(entry.getValue()) >= now) {
- if (UrlUtils.isMatch(url, u)) {
- urls.add(u);
- }
+ if (Long.parseLong(entry.getValue()) >= now) {
+ rawUrls.add(entry.getKey());
}
}
}
- if (urls.isEmpty()) {
- urls.add(URLBuilder.from(url)
- .setProtocol(EMPTY_PROTOCOL)
- .setAddress(ANYHOST_VALUE)
- .setPath(toServiceName(key))
- .addParameter(CATEGORY_KEY, category)
- .build());
- }
+ List<URL> urls = toUrlsWithEmpty(url, category, rawUrls);
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
@@ -471,6 +458,11 @@
}
}
+ @Override
+ protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
+ return !providerUrl.getParameter(DYNAMIC_KEY, true) && UrlUtils.isMatch(subscribeUrl, providerUrl);
+ }
+
private String toServiceName(String categoryPath) {
String servicePath = toServicePath(categoryPath);
return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
diff --git a/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java b/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
index 4c784b3..22b804e 100644
--- a/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
+++ b/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
@@ -21,12 +21,15 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.registry.RegistryNotifier;
+import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import com.alipay.sofa.registry.client.api.RegistryClient;
import com.alipay.sofa.registry.client.api.RegistryClientConfig;
import com.alipay.sofa.registry.client.api.Subscriber;
+import com.alipay.sofa.registry.client.api.SubscriberDataObserver;
import com.alipay.sofa.registry.client.api.model.RegistryType;
import com.alipay.sofa.registry.client.api.model.UserData;
import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
@@ -42,10 +45,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
@@ -60,7 +63,7 @@
*
* @since 2.7.2
*/
-public class SofaRegistry extends FailbackRegistry {
+public class SofaRegistry extends CacheableFailbackRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistry.class);
@@ -165,18 +168,14 @@
LOGGER.warn("Service name [" + serviceName + "] have bean registered in SOFARegistry.");
CountDownLatch countDownLatch = new CountDownLatch(1);
- handleRegistryData(listSubscriber.peekData(), listener, countDownLatch);
+ handleRegistryData(url, listSubscriber.peekData(), listener, countDownLatch);
waitAddress(serviceName, countDownLatch);
return;
}
final CountDownLatch latch = new CountDownLatch(1);
SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
- (dataId, data) -> {
- //record change
- printAddressData(dataId, data);
- handleRegistryData(data, listener, latch);
- });
+ new RegistryChildListenerImpl(url, serviceName, listener, latch));
addAttributesForSub(subscriberRegistration);
listSubscriber = registryClient.register(subscriberRegistration);
@@ -207,28 +206,21 @@
registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.SUBSCRIBER);
}
- private void handleRegistryData(UserData data, NotifyListener notifyListener,
+ private void handleRegistryData(URL subscribeUrl, UserData data, NotifyListener notifyListener,
CountDownLatch latch) {
try {
- List<URL> urls = new ArrayList<>();
- if (null != data) {
-
- List<String> datas = flatUserData(data);
- for (String serviceUrl : datas) {
- URL url = URL.valueOf(serviceUrl);
- String serverApplication = url.getParameter(APPLICATION_KEY);
- if (StringUtils.isNotEmpty(serverApplication)) {
- url = url.addParameter("dstApp", serverApplication);
- }
- urls.add(url);
- }
- }
+ List<URL> urls = toUrlsWithEmpty(subscribeUrl, PROVIDERS_CATEGORY, flatUserData(data));
notifyListener.notify(urls);
} finally {
latch.countDown();
}
}
+ @Override
+ protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
+ return UrlUtils.isMatch(subscribeUrl, providerUrl);
+ }
+
private String buildServiceName(URL url) {
// return url.getServiceKey();
StringBuilder buf = new StringBuilder();
@@ -289,6 +281,9 @@
*/
protected List<String> flatUserData(UserData userData) {
List<String> result = new ArrayList<>();
+ if (userData == null) {
+ return result;
+ }
Map<String, List<String>> zoneData = userData.getZoneData();
for (Map.Entry<String, List<String>> entry : zoneData.entrySet()) {
@@ -297,4 +292,24 @@
return result;
}
+
+ private class RegistryChildListenerImpl implements SubscriberDataObserver {
+ private RegistryNotifier notifier;
+
+ public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
+ notifier = new RegistryNotifier(SofaRegistry.this) {
+ @Override
+ protected void doNotify(Object rawAddresses) {
+ //record change
+ printAddressData(path, (UserData) rawAddresses);
+ handleRegistryData(consumerUrl, (UserData) rawAddresses, listener, latch);
+ }
+ };
+ }
+
+ @Override
+ public void handleData(String dataId, UserData data) {
+ notifier.notify(data);
+ }
+ }
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 51fa4aa..f51a6e6 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -17,15 +17,14 @@
package org.apache.dubbo.registry.zookeeper;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
-import org.apache.dubbo.common.URLStrParser;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.registry.RegistryNotifier;
+import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.StateListener;
@@ -40,26 +39,24 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_SEPARATOR_ENCODED;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
/**
* ZookeeperRegistry
- *
*/
-public class ZookeeperRegistry extends FailbackRegistry {
+public class ZookeeperRegistry extends CacheableFailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
@@ -168,10 +165,11 @@
}
}
} else {
+ CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
- ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
+ ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
@@ -179,6 +177,8 @@
}
}
notify(url, listener, urls);
+ // tells the listener to run only after the sync notification of main thread finishes.
+ latch.countDown();
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
@@ -187,9 +187,9 @@
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
+ ConcurrentMap<NotifyListener, org.apache.dubbo.remoting.zookeeper.ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
- ChildListener zkListener = listeners.get(listener);
+ org.apache.dubbo.remoting.zookeeper.ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
@@ -263,35 +263,6 @@
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
- private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
- List<URL> urls = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(providers)) {
- for (String provider : providers) {
- if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
- URL url = URLStrParser.parseEncodedStr(provider);
- if (UrlUtils.isMatch(consumer, url)) {
- urls.add(url);
- }
- }
- }
- }
- return urls;
- }
-
- private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
- List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
- if (urls == null || urls.isEmpty()) {
- int i = path.lastIndexOf(PATH_SEPARATOR);
- String category = i < 0 ? path : path.substring(i + 1);
- URL empty = URLBuilder.from(consumer)
- .setProtocol(EMPTY_PROTOCOL)
- .addParameter(CATEGORY_KEY, category)
- .build();
- urls.add(empty);
- }
- return urls;
- }
-
/**
* When zookeeper connection recovered from a connection loss, it need to fetch the latest provider list.
* re-register watcher is only a side effect and is not mandate.
@@ -312,4 +283,53 @@
}
}
+ @Override
+ protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
+ return UrlUtils.isMatch(subscribeUrl, providerUrl);
+ }
+
+ private class RegistryChildListenerImpl implements ChildListener {
+ private RegistryNotifier notifier;
+ private long lastExecuteTime;
+ private CountDownLatch latch;
+
+ public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
+ this.latch = latch;
+ notifier = new RegistryNotifier(ZookeeperRegistry.this) {
+ @Override
+ public void notify(Object rawAddresses) {
+ int delayTime = getRegistry().getDelay();
+ if (delayTime <= 0) {
+ this.doNotify(rawAddresses);
+ } else {
+ long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
+ if (interval > 0) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ lastExecuteTime = System.currentTimeMillis();
+ this.doNotify(rawAddresses);
+ }
+ }
+
+ @Override
+ protected void doNotify(Object rawAddresses) {
+ ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
+ }
+ };
+ }
+
+ @Override
+ public void childChanged(String path, List<String> children) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
+ }
+ notifier.notify(children);
+ }
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index 0da56fa..ed7718c 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -264,7 +264,7 @@
}
}
if (modified > 0) {
- notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
+ listener.childChanged(path, new ArrayList<>(urls));
}
}