/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.integration; | |
import org.apache.dubbo.common.URL; | |
import org.apache.dubbo.common.URLBuilder; | |
import org.apache.dubbo.common.config.ConfigurationUtils; | |
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; | |
import org.apache.dubbo.common.extension.ExtensionLoader; | |
import org.apache.dubbo.common.logger.Logger; | |
import org.apache.dubbo.common.logger.LoggerFactory; | |
import org.apache.dubbo.common.timer.HashedWheelTimer; | |
import org.apache.dubbo.common.utils.CollectionUtils; | |
import org.apache.dubbo.common.utils.NamedThreadFactory; | |
import org.apache.dubbo.common.utils.StringUtils; | |
import org.apache.dubbo.common.utils.UrlUtils; | |
import org.apache.dubbo.registry.NotifyListener; | |
import org.apache.dubbo.registry.Registry; | |
import org.apache.dubbo.registry.RegistryFactory; | |
import org.apache.dubbo.registry.RegistryService; | |
import org.apache.dubbo.registry.retry.ReExportTask; | |
import org.apache.dubbo.registry.support.SkipFailbackWrapperException; | |
import org.apache.dubbo.rpc.Exporter; | |
import org.apache.dubbo.rpc.Invoker; | |
import org.apache.dubbo.rpc.Protocol; | |
import org.apache.dubbo.rpc.ProtocolServer; | |
import org.apache.dubbo.rpc.ProxyFactory; | |
import org.apache.dubbo.rpc.RpcException; | |
import org.apache.dubbo.rpc.cluster.Cluster; | |
import org.apache.dubbo.rpc.cluster.Configurator; | |
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository; | |
import org.apache.dubbo.rpc.model.ApplicationModel; | |
import org.apache.dubbo.rpc.model.ProviderModel; | |
import org.apache.dubbo.rpc.protocol.InvokerWrapper; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import static java.util.concurrent.Executors.newSingleThreadExecutor; | |
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; | |
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.HIDE_KEY_PREFIX; | |
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL; | |
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL; | |
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY; | |
import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls; | |
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX; | |
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL; | |
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY; | |
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD; | |
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL; | |
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY; | |
import static org.apache.dubbo.registry.Constants.REGISTER_KEY; | |
import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY; | |
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY; | |
import static org.apache.dubbo.remoting.Constants.CHECK_KEY; | |
import static org.apache.dubbo.remoting.Constants.CODEC_KEY; | |
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY; | |
import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY; | |
import static org.apache.dubbo.remoting.Constants.PAYLOAD_KEY; | |
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY; | |
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY; | |
import static org.apache.dubbo.rpc.Constants.MOCK_KEY; | |
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY; | |
import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY; | |
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY; | |
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY; | |
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY; | |
/** | |
* RegistryProtocol | |
*/ | |
public class RegistryProtocol implements Protocol { | |
public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = { | |
APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY, | |
GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY, | |
WEIGHT_KEY, DUBBO_VERSION_KEY, RELEASE_KEY, PAYLOAD_KEY | |
}; | |
public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = { | |
APPLICATION_KEY, VERSION_KEY, GROUP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY | |
}; | |
private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class); | |
private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<>(); | |
private final Map<String, ServiceConfigurationListener> serviceConfigurationListeners = new ConcurrentHashMap<>(); | |
private final ProviderConfigurationListener providerConfigurationListener = new ProviderConfigurationListener(); | |
//To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. | |
//providerurl <--> exporter | |
private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>(); | |
private Cluster cluster; | |
private Protocol protocol; | |
private RegistryFactory registryFactory; | |
private ProxyFactory proxyFactory; | |
private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>(); | |
private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128); | |
//Filter the parameters that do not need to be output in url(Starting with .) | |
private static String[] getFilteredKeys(URL url) { | |
Map<String, String> params = url.getParameters(); | |
if (CollectionUtils.isNotEmptyMap(params)) { | |
return params.keySet().stream() | |
.filter(k -> k.startsWith(HIDE_KEY_PREFIX)) | |
.toArray(String[]::new); | |
} else { | |
return new String[0]; | |
} | |
} | |
public void setCluster(Cluster cluster) { | |
this.cluster = cluster; | |
} | |
public void setProtocol(Protocol protocol) { | |
this.protocol = protocol; | |
} | |
public void setRegistryFactory(RegistryFactory registryFactory) { | |
this.registryFactory = registryFactory; | |
} | |
public void setProxyFactory(ProxyFactory proxyFactory) { | |
this.proxyFactory = proxyFactory; | |
} | |
@Override | |
public int getDefaultPort() { | |
return 9090; | |
} | |
public Map<URL, NotifyListener> getOverrideListeners() { | |
return overrideListeners; | |
} | |
private void register(URL registryUrl, URL registeredProviderUrl) { | |
Registry registry = registryFactory.getRegistry(registryUrl); | |
registry.register(registeredProviderUrl); | |
} | |
private void registerStatedUrl(URL registryUrl, URL registeredProviderUrl, boolean registered) { | |
ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey()); | |
model.addStatedUrl(new ProviderModel.RegisterStatedURL( | |
registeredProviderUrl, | |
registryUrl, | |
registered)); | |
} | |
@Override | |
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { | |
URL registryUrl = getRegistryUrl(originInvoker); | |
// url to export locally | |
URL providerUrl = getProviderUrl(originInvoker); | |
// Subscribe the override data | |
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call | |
// the same service. Because the subscribed is cached key with the name of the service, it causes the | |
// subscription information to cover. | |
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); | |
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); | |
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); | |
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); | |
providerUrl = UrlUtils.unmodifiableUrl(providerUrl); | |
//export invoker | |
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); | |
// url to registry | |
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); | |
// decide if we need to delay publish | |
boolean register = providerUrl.getParameter(REGISTER_KEY, true); | |
if (register) { | |
register(registryUrl, registeredProviderUrl); | |
} | |
// register stated url on provider model | |
registerStatedUrl(registryUrl, registeredProviderUrl, register); | |
// Deprecated! Subscribe to override rules in 2.6.x or before. | |
final Registry registry = registryFactory.getRegistry(registryUrl); | |
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); | |
exporter.setRegisterUrl(registeredProviderUrl); | |
exporter.setSubscribeUrl(overrideSubscribeUrl); | |
notifyExport(exporter); | |
//Ensure that a new exporter instance is returned every time export | |
return new DestroyableExporter<>(exporter); | |
} | |
private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) { | |
List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class) | |
.getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener"); | |
if (CollectionUtils.isNotEmpty(listeners)) { | |
for (RegistryProtocolListener listener : listeners) { | |
listener.onExport(this, exporter); | |
} | |
} | |
} | |
private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) { | |
providerUrl = providerConfigurationListener.overrideUrl(providerUrl); | |
ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener); | |
serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener); | |
return serviceConfigurationListener.overrideUrl(providerUrl); | |
} | |
@SuppressWarnings("unchecked") | |
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { | |
String key = getCacheKey(originInvoker); | |
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { | |
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); | |
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); | |
}); | |
} | |
public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) { | |
if (exporter instanceof ExporterChangeableWrapper) { | |
ExporterChangeableWrapper<T> exporterWrapper = (ExporterChangeableWrapper<T>) exporter; | |
Invoker<T> originInvoker = exporterWrapper.getOriginInvoker(); | |
reExport(originInvoker, newInvokerUrl); | |
} | |
} | |
/** | |
* Reexport the invoker of the modified url | |
* | |
* @param originInvoker | |
* @param newInvokerUrl | |
* @param <T> | |
*/ | |
@SuppressWarnings("unchecked") | |
public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) { | |
String key = getCacheKey(originInvoker); | |
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); | |
URL registeredUrl = exporter.getRegisterUrl(); | |
URL registryUrl = getRegistryUrl(originInvoker); | |
URL newProviderUrl = getUrlToRegistry(newInvokerUrl, registryUrl); | |
// update local exporter | |
Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl); | |
exporter.setExporter(protocol.export(invokerDelegate)); | |
// update registry | |
if (!newProviderUrl.equals(registeredUrl)) { | |
try { | |
doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl); | |
} catch (Exception e) { | |
ReExportTask oldTask = reExportFailedTasks.get(registeredUrl); | |
if (oldTask != null) { | |
return; | |
} | |
ReExportTask task = new ReExportTask( | |
() -> doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl), | |
registeredUrl, | |
null | |
); | |
oldTask = reExportFailedTasks.putIfAbsent(registeredUrl, task); | |
if (oldTask == null) { | |
// never has a retry task. then start a new task for retry. | |
retryTimer.newTimeout(task, registryUrl.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD), TimeUnit.MILLISECONDS); | |
} | |
} | |
} | |
} | |
private <T> void doReExport(final Invoker<T> originInvoker, ExporterChangeableWrapper<T> exporter, | |
URL registryUrl, URL oldProviderUrl, URL newProviderUrl) { | |
if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) { | |
Registry registry = null; | |
try { | |
registry = getRegistry(originInvoker); | |
} catch (Exception e) { | |
throw new SkipFailbackWrapperException(e); | |
} | |
logger.info("Try to unregister old url: " + oldProviderUrl); | |
registry.reExportUnregister(oldProviderUrl); | |
logger.info("Try to register new url: " + newProviderUrl); | |
registry.reExportRegister(newProviderUrl); | |
} | |
try { | |
ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl); | |
statedUrl.setProviderUrl(newProviderUrl); | |
exporter.setRegisterUrl(newProviderUrl); | |
} catch (Exception e) { | |
throw new SkipFailbackWrapperException(e); | |
} | |
} | |
private ProviderModel.RegisterStatedURL getStatedUrl(URL registryUrl, URL providerUrl) { | |
ProviderModel providerModel = ApplicationModel.getServiceRepository() | |
.lookupExportedService(providerUrl.getServiceKey()); | |
List<ProviderModel.RegisterStatedURL> statedUrls = providerModel.getStatedUrl(); | |
return statedUrls.stream() | |
.filter(u -> u.getRegistryUrl().equals(registryUrl) | |
&& u.getProviderUrl().getProtocol().equals(providerUrl.getProtocol())) | |
.findFirst().orElseThrow(() -> new IllegalStateException("There should have at least one registered url.")); | |
} | |
/** | |
* Get an instance of registry based on the address of invoker | |
* | |
* @param originInvoker | |
* @return | |
*/ | |
protected Registry getRegistry(final Invoker<?> originInvoker) { | |
URL registryUrl = getRegistryUrl(originInvoker); | |
return registryFactory.getRegistry(registryUrl); | |
} | |
protected URL getRegistryUrl(Invoker<?> originInvoker) { | |
URLBuilder builder = URLBuilder.from(originInvoker.getUrl()); | |
if (REGISTRY_PROTOCOL.equals(builder.getProtocol())) { | |
String protocol = builder.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY); | |
builder.setProtocol(protocol).removeParameter(REGISTRY_KEY); | |
} | |
return builder.build(); | |
} | |
protected URL getRegistryUrl(URL url) { | |
return URLBuilder.from(url) | |
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) | |
.removeParameter(REGISTRY_KEY) | |
.build(); | |
} | |
/** | |
* Return the url that is registered to the registry and filter the url parameter once | |
* | |
* @param providerUrl | |
* @return url to registry. | |
*/ | |
private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) { | |
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) { | |
return providerUrl; | |
} | |
String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, ""); | |
// if path is not the same as interface name then we should keep INTERFACE_KEY, | |
// otherwise, the registry structure of zookeeper would be '/dubbo/path/providers', | |
// but what we expect is '/dubbo/interface/providers' | |
if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) { | |
if (StringUtils.isNotEmpty(extraKeys)) { | |
extraKeys += ","; | |
} | |
extraKeys += INTERFACE_KEY; | |
} | |
String[] extraKeyArrays = COMMA_SPLIT_PATTERN.split(extraKeys); | |
String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS, extraKeyArrays); | |
// TODO, avoid creation of URL | |
//The address you see at the registry | |
return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null)); | |
} | |
private URL getSubscribedOverrideUrl(URL registeredProviderUrl) { | |
return URLBuilder.from(registeredProviderUrl) | |
.setProtocol(PROVIDER_PROTOCOL) | |
.addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false)) | |
.build(); | |
} | |
/** | |
* Get the address of the providerUrl through the url of the invoker | |
* | |
* @param originInvoker | |
* @return | |
*/ | |
private URL getProviderUrl(final Invoker<?> originInvoker) { | |
String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY); | |
if (export == null || export.length() == 0) { | |
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl()); | |
} | |
return URLBuilder.valueOf(export); | |
} | |
/** | |
* Get the key cached in bounds by invoker | |
* | |
* @param originInvoker | |
* @return | |
*/ | |
private String getCacheKey(final Invoker<?> originInvoker) { | |
return originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY); | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { | |
url = getRegistryUrl(url); | |
Registry registry = registryFactory.getRegistry(url); | |
if (RegistryService.class.equals(type)) { | |
return proxyFactory.getInvoker((T) registry, type, url); | |
} | |
// group="a,b" or group="*" | |
Map<String, String> qs = Collections.unmodifiableMap(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY))); | |
String group = qs.get(GROUP_KEY); | |
if (group != null && group.length() > 0) { | |
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { | |
return doRefer(getMergeableCluster(), registry, type, url, qs); | |
} | |
} | |
return doRefer(cluster, registry, type, url, qs); | |
} | |
private Cluster getMergeableCluster() { | |
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable"); | |
} | |
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) { | |
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url, parameters); | |
directory.setRegistry(registry); | |
directory.setProtocol(protocol); | |
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.get(REGISTER_IP_KEY), 0, type.getName(), parameters); | |
subscribeUrl = toSubscribeUrl(subscribeUrl); | |
if (directory.isShouldRegister()) { | |
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); | |
registry.register(directory.getRegisteredConsumerUrl()); | |
} | |
directory.buildRouterChain(subscribeUrl); | |
directory.subscribe(subscribeUrl); | |
Invoker<T> invoker = cluster.join(directory); | |
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); | |
if (CollectionUtils.isEmpty(listeners)) { | |
return invoker; | |
} | |
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); | |
for (RegistryProtocolListener listener : listeners) { | |
listener.onRefer(this, registryInvokerWrapper); | |
} | |
return registryInvokerWrapper; | |
} | |
public <T> void reRefer(Invoker<T> invoker, URL newSubscribeUrl) { | |
if (!(invoker instanceof RegistryInvokerWrapper)) { | |
return; | |
} | |
RegistryInvokerWrapper<T> invokerWrapper = (RegistryInvokerWrapper<T>) invoker; | |
URL oldSubscribeUrl = invokerWrapper.getUrl(); | |
RegistryDirectory<T> directory = invokerWrapper.getDirectory(); | |
Registry registry = directory.getRegistry(); | |
registry.unregister(directory.getRegisteredConsumerUrl()); | |
directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl)); | |
directory.setRegisteredConsumerUrl(newSubscribeUrl); | |
registry.register(directory.getRegisteredConsumerUrl()); | |
directory.buildRouterChain(newSubscribeUrl); | |
directory.subscribe(toSubscribeUrl(newSubscribeUrl)); | |
invokerWrapper.setInvoker(invokerWrapper.getCluster().join(directory)); | |
invokerWrapper.setUrl(newSubscribeUrl); | |
} | |
private static URL toSubscribeUrl(URL url) { | |
return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY); | |
} | |
public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) { | |
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) { | |
return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY, | |
CHECK_KEY, String.valueOf(false)); | |
} else { | |
return URL.valueOf(consumerUrl, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters( | |
CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false)); | |
} | |
} | |
private List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) { | |
return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class) | |
.getActivateExtension(url, "registry.protocol.listener"); | |
} | |
// available to test | |
public String[] getParamsToRegistry(String[] defaultKeys, String[] additionalParameterKeys) { | |
int additionalLen = additionalParameterKeys.length; | |
String[] registryParams = new String[defaultKeys.length + additionalLen]; | |
System.arraycopy(defaultKeys, 0, registryParams, 0, defaultKeys.length); | |
System.arraycopy(additionalParameterKeys, 0, registryParams, defaultKeys.length, additionalLen); | |
return registryParams; | |
} | |
@Override | |
public void destroy() { | |
List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class) | |
.getLoadedExtensionInstances(); | |
if (CollectionUtils.isNotEmpty(listeners)) { | |
for (RegistryProtocolListener listener : listeners) { | |
listener.onDestroy(); | |
} | |
} | |
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values()); | |
for (Exporter<?> exporter : exporters) { | |
exporter.unexport(); | |
} | |
bounds.clear(); | |
ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension() | |
.removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener); | |
} | |
@Override | |
public List<ProtocolServer> getServers() { | |
return protocol.getServers(); | |
} | |
//Merge the urls of configurators | |
private static URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) { | |
if (configurators != null && configurators.size() > 0) { | |
for (Configurator configurator : configurators) { | |
url = configurator.configure(url); | |
} | |
} | |
return url; | |
} | |
public static class InvokerDelegate<T> extends InvokerWrapper<T> { | |
private final Invoker<T> invoker; | |
/** | |
* @param invoker | |
* @param url invoker.getUrl return this value | |
*/ | |
public InvokerDelegate(Invoker<T> invoker, URL url) { | |
super(invoker, url); | |
this.invoker = invoker; | |
} | |
public Invoker<T> getInvoker() { | |
if (invoker instanceof InvokerDelegate) { | |
return ((InvokerDelegate<T>) invoker).getInvoker(); | |
} else { | |
return invoker; | |
} | |
} | |
} | |
private static class DestroyableExporter<T> implements Exporter<T> { | |
private Exporter<T> exporter; | |
public DestroyableExporter(Exporter<T> exporter) { | |
this.exporter = exporter; | |
} | |
@Override | |
public Invoker<T> getInvoker() { | |
return exporter.getInvoker(); | |
} | |
@Override | |
public void unexport() { | |
exporter.unexport(); | |
} | |
} | |
/** | |
* Reexport: the exporter destroy problem in protocol | |
* 1.Ensure that the exporter returned by registryprotocol can be normal destroyed | |
* 2.No need to re-register to the registry after notify | |
* 3.The invoker passed by the export method , would better to be the invoker of exporter | |
*/ | |
private class OverrideListener implements NotifyListener { | |
private final URL subscribeUrl; | |
private final Invoker originInvoker; | |
private List<Configurator> configurators; | |
public OverrideListener(URL subscribeUrl, Invoker originalInvoker) { | |
this.subscribeUrl = subscribeUrl; | |
this.originInvoker = originalInvoker; | |
} | |
/** | |
* @param urls The list of registered information, is always not empty, The meaning is the same as the | |
* return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}. | |
*/ | |
@Override | |
public synchronized void notify(List<URL> urls) { | |
logger.debug("original override urls: " + urls); | |
List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl); | |
logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls); | |
// No matching results | |
if (matchedUrls.isEmpty()) { | |
return; | |
} | |
this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator)) | |
.orElse(configurators); | |
doOverrideIfNecessary(); | |
} | |
public synchronized void doOverrideIfNecessary() { | |
final Invoker<?> invoker; | |
if (originInvoker instanceof InvokerDelegate) { | |
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker(); | |
} else { | |
invoker = originInvoker; | |
} | |
//The origin invoker | |
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); | |
String key = getCacheKey(originInvoker); | |
ExporterChangeableWrapper<?> exporter = bounds.get(key); | |
if (exporter == null) { | |
logger.warn(new IllegalStateException("error state, exporter should not be null")); | |
return; | |
} | |
//The current, may have been merged many times | |
URL currentUrl = exporter.getInvoker().getUrl(); | |
//Merged with this configuration | |
URL newUrl = getConfigedInvokerUrl(configurators, currentUrl); | |
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl); | |
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()) | |
.getConfigurators(), newUrl); | |
if (!currentUrl.equals(newUrl)) { | |
RegistryProtocol.this.reExport(originInvoker, newUrl); | |
logger.info("exported provider url changed, origin url: " + originUrl + | |
", old export url: " + currentUrl + ", new export url: " + newUrl); | |
} | |
} | |
private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) { | |
List<URL> result = new ArrayList<URL>(); | |
for (URL url : configuratorUrls) { | |
URL overrideUrl = url; | |
// Compatible with the old version | |
if (url.getParameter(CATEGORY_KEY) == null && OVERRIDE_PROTOCOL.equals(url.getProtocol())) { | |
overrideUrl = url.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY); | |
} | |
// Check whether url is to be applied to the current service | |
if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) { | |
result.add(url); | |
} | |
} | |
return result; | |
} | |
} | |
private class ServiceConfigurationListener extends AbstractConfiguratorListener { | |
private URL providerUrl; | |
private OverrideListener notifyListener; | |
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) { | |
this.providerUrl = providerUrl; | |
this.notifyListener = notifyListener; | |
this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX); | |
} | |
private <T> URL overrideUrl(URL providerUrl) { | |
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); | |
} | |
@Override | |
protected void notifyOverrides() { | |
notifyListener.doOverrideIfNecessary(); | |
} | |
} | |
private class ProviderConfigurationListener extends AbstractConfiguratorListener { | |
public ProviderConfigurationListener() { | |
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); | |
} | |
/** | |
* Get existing configuration rule and override provider url before exporting. | |
* | |
* @param providerUrl | |
* @param <T> | |
* @return | |
*/ | |
private <T> URL overrideUrl(URL providerUrl) { | |
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); | |
} | |
@Override | |
protected void notifyOverrides() { | |
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary()); | |
} | |
} | |
/** | |
* exporter proxy, establish the corresponding relationship between the returned exporter and the exporter | |
* exported by the protocol, and can modify the relationship at the time of override. | |
* | |
* @param <T> | |
*/ | |
private class ExporterChangeableWrapper<T> implements Exporter<T> { | |
private final ExecutorService executor = newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true)); | |
private final Invoker<T> originInvoker; | |
private Exporter<T> exporter; | |
private URL subscribeUrl; | |
private URL registerUrl; | |
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) { | |
this.exporter = exporter; | |
this.originInvoker = originInvoker; | |
} | |
public Invoker<T> getOriginInvoker() { | |
return originInvoker; | |
} | |
@Override | |
public Invoker<T> getInvoker() { | |
return exporter.getInvoker(); | |
} | |
public void setExporter(Exporter<T> exporter) { | |
this.exporter = exporter; | |
} | |
@Override | |
public void unexport() { | |
String key = getCacheKey(this.originInvoker); | |
bounds.remove(key); | |
Registry registry = RegistryProtocol.this.getRegistry(originInvoker); | |
try { | |
registry.unregister(registerUrl); | |
} catch (Throwable t) { | |
logger.warn(t.getMessage(), t); | |
} | |
try { | |
NotifyListener listener = RegistryProtocol.this.overrideListeners.remove(subscribeUrl); | |
registry.unsubscribe(subscribeUrl, listener); | |
ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension() | |
.removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX, | |
serviceConfigurationListeners.get(subscribeUrl.getServiceKey())); | |
} catch (Throwable t) { | |
logger.warn(t.getMessage(), t); | |
} | |
executor.submit(() -> { | |
try { | |
int timeout = ConfigurationUtils.getServerShutdownTimeout(); | |
if (timeout > 0) { | |
logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. " + | |
"Usually, this is called when you use dubbo API"); | |
Thread.sleep(timeout); | |
} | |
exporter.unexport(); | |
} catch (Throwable t) { | |
logger.warn(t.getMessage(), t); | |
} | |
}); | |
} | |
public void setSubscribeUrl(URL subscribeUrl) { | |
this.subscribeUrl = subscribeUrl; | |
} | |
public void setRegisterUrl(URL registerUrl) { | |
this.registerUrl = registerUrl; | |
} | |
public URL getRegisterUrl() { | |
return registerUrl; | |
} | |
} | |
// for unit test: the most recently constructed instance
private static RegistryProtocol INSTANCE;

/**
 * For unit test: remembers the instance being constructed in {@code INSTANCE}.
 */
public RegistryProtocol() {
    RegistryProtocol.INSTANCE = this;
}

/**
 * For unit test: returns the remembered instance. When none has been constructed yet, the
 * {@code REGISTRY_PROTOCOL} extension is requested from the extension loader first.
 *
 * @return the value of {@code INSTANCE} after that attempt
 */
public static RegistryProtocol getRegistryProtocol() {
    if (INSTANCE != null) {
        return INSTANCE;
    }
    ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(REGISTRY_PROTOCOL); // load
    return INSTANCE;
}
} |