/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.dubbo.registry.integration; | |
import org.apache.dubbo.common.URL; | |
import org.apache.dubbo.common.Version; | |
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; | |
import org.apache.dubbo.common.extension.ExtensionLoader; | |
import org.apache.dubbo.common.logger.Logger; | |
import org.apache.dubbo.common.logger.LoggerFactory; | |
import org.apache.dubbo.common.utils.Assert; | |
import org.apache.dubbo.common.utils.CollectionUtils; | |
import org.apache.dubbo.common.utils.NetUtils; | |
import org.apache.dubbo.common.utils.StringUtils; | |
import org.apache.dubbo.common.utils.UrlUtils; | |
import org.apache.dubbo.registry.AddressListener; | |
import org.apache.dubbo.registry.NotifyListener; | |
import org.apache.dubbo.remoting.Constants; | |
import org.apache.dubbo.rpc.Invocation; | |
import org.apache.dubbo.rpc.Invoker; | |
import org.apache.dubbo.rpc.Protocol; | |
import org.apache.dubbo.rpc.RpcException; | |
import org.apache.dubbo.rpc.cluster.Configurator; | |
import org.apache.dubbo.rpc.cluster.Router; | |
import org.apache.dubbo.rpc.cluster.RouterChain; | |
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory; | |
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository; | |
import org.apache.dubbo.rpc.cluster.support.ClusterUtils; | |
import org.apache.dubbo.rpc.model.ApplicationModel; | |
import org.apache.dubbo.rpc.protocol.InvokerWrapper; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.Optional; | |
import java.util.Set; | |
import java.util.stream.Collectors; | |
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL; | |
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; | |
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_CONFIGURATORS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; | |
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY; | |
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL; | |
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX; | |
import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS; | |
import static org.apache.dubbo.remoting.Constants.CHECK_KEY; | |
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY; | |
/** | |
* RegistryDirectory | |
*/ | |
public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyListener { | |
private static final Logger logger = LoggerFactory.getLogger(RegistryDirectory.class); | |
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener(); | |
private ReferenceConfigurationListener referenceConfigurationListener; | |
/**
 * Creates a directory for the given service interface by delegating to the parent constructor.
 *
 * @param serviceType the service interface type served by this directory
 * @param url         the URL passed through to {@link DynamicDirectory}
 */
public RegistryDirectory(Class<T> serviceType, URL url) {
    super(serviceType, url);
}
@Override | |
public void subscribe(URL url) { | |
setConsumerUrl(url); | |
// overrideConsumerUrl(); | |
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); | |
referenceConfigurationListener = new ReferenceConfigurationListener(this, url); | |
registry.subscribe(url, this); | |
} | |
@Override | |
public void unSubscribe(URL url) { | |
setConsumerUrl(null); | |
CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this); | |
referenceConfigurationListener.stop(); | |
registry.unsubscribe(url, this); | |
} | |
@Override | |
public void destroy() { | |
if (isDestroyed()) { | |
return; | |
} | |
// unregister. | |
try { | |
if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) { | |
registry.unregister(getRegisteredConsumerUrl()); | |
} | |
} catch (Throwable t) { | |
logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t); | |
} | |
// unsubscribe. | |
try { | |
if (getConsumerUrl() != null && registry != null && registry.isAvailable()) { | |
registry.unsubscribe(getConsumerUrl(), this); | |
} | |
ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension() | |
.removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER); | |
} catch (Throwable t) { | |
logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t); | |
} | |
super.destroy(); // must be executed after unsubscribing | |
try { | |
destroyAllInvokers(); | |
} catch (Throwable t) { | |
logger.warn("Failed to destroy service " + serviceKey, t); | |
} | |
} | |
@Override | |
public synchronized void notify(List<URL> urls) { | |
Map<String, List<URL>> categoryUrls = urls.stream() | |
.filter(Objects::nonNull) | |
.filter(this::isValidCategory) | |
.filter(this::isNotCompatibleFor26x) | |
.collect(Collectors.groupingBy(this::judgeCategory)); | |
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); | |
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); | |
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); | |
toRouters(routerURLs).ifPresent(this::addRouters); | |
// providers | |
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); | |
/** | |
* 3.x added for extend URL address | |
*/ | |
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class); | |
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); | |
if (supportedListeners != null && !supportedListeners.isEmpty()) { | |
for (AddressListener addressListener : supportedListeners) { | |
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); | |
} | |
} | |
refreshOverrideAndInvoker(providerURLs); | |
} | |
private String judgeCategory(URL url) { | |
if (UrlUtils.isConfigurator(url)) { | |
return CONFIGURATORS_CATEGORY; | |
} else if (UrlUtils.isRoute(url)) { | |
return ROUTERS_CATEGORY; | |
} else if (UrlUtils.isProvider(url)) { | |
return PROVIDERS_CATEGORY; | |
} | |
return ""; | |
} | |
private void refreshOverrideAndInvoker(List<URL> urls) { | |
// mock zookeeper://xxx?mock=return null | |
overrideConsumerUrl(); | |
refreshInvoker(urls); | |
} | |
/** | |
* Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows: | |
* <ol> | |
* <li> If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, | |
* and notice that any parameter changes in the URL will be re-referenced.</li> | |
* <li>If the incoming invoker list is not empty, it means that it is the latest invoker list.</li> | |
* <li>If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route | |
* rule, which needs to be re-contrasted to decide whether to re-reference.</li> | |
* </ol> | |
* | |
* @param invokerUrls this parameter can't be null | |
*/ | |
private void refreshInvoker(List<URL> invokerUrls) { | |
Assert.notNull(invokerUrls, "invokerUrls should not be null"); | |
if (invokerUrls.size() == 1 | |
&& invokerUrls.get(0) != null | |
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { | |
this.forbidden = true; // Forbid to access | |
this.invokers = Collections.emptyList(); | |
routerChain.setInvokers(this.invokers); | |
destroyAllInvokers(); // Close all invokers | |
} else { | |
this.forbidden = false; // Allow to access | |
Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference | |
if (invokerUrls == Collections.<URL>emptyList()) { | |
invokerUrls = new ArrayList<>(); | |
} | |
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { | |
invokerUrls.addAll(this.cachedInvokerUrls); | |
} else { | |
this.cachedInvokerUrls = new HashSet<>(); | |
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison | |
} | |
if (invokerUrls.isEmpty()) { | |
return; | |
} | |
Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map | |
/** | |
* If the calculation is wrong, it is not processed. | |
* | |
* 1. The protocol configured by the client is inconsistent with the protocol of the server. | |
* eg: consumer protocol = dubbo, provider only has other protocol services(rest). | |
* 2. The registration center is not robust and pushes illegal specification data. | |
* | |
*/ | |
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { | |
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls | |
.toString())); | |
return; | |
} | |
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); | |
// pre-route and build cache, notice that route cache should build on original Invoker list. | |
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. | |
routerChain.setInvokers(newInvokers); | |
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; | |
this.urlInvokerMap = newUrlInvokerMap; | |
try { | |
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker | |
} catch (Exception e) { | |
logger.warn("destroyUnusedInvokers error. ", e); | |
} | |
} | |
} | |
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) { | |
List<Invoker<T>> mergedInvokers = new ArrayList<>(); | |
Map<String, List<Invoker<T>>> groupMap = new HashMap<>(); | |
for (Invoker<T> invoker : invokers) { | |
String group = invoker.getUrl().getParameter(GROUP_KEY, ""); | |
groupMap.computeIfAbsent(group, k -> new ArrayList<>()); | |
groupMap.get(group).add(invoker); | |
} | |
if (groupMap.size() == 1) { | |
mergedInvokers.addAll(groupMap.values().iterator().next()); | |
} else if (groupMap.size() > 1) { | |
for (List<Invoker<T>> groupList : groupMap.values()) { | |
StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList); | |
staticDirectory.buildRouterChain(); | |
mergedInvokers.add(CLUSTER.join(staticDirectory)); | |
} | |
} else { | |
mergedInvokers = invokers; | |
} | |
return mergedInvokers; | |
} | |
/** | |
* @param urls | |
* @return null : no routers ,do nothing | |
* else :routers list | |
*/ | |
private Optional<List<Router>> toRouters(List<URL> urls) { | |
if (urls == null || urls.isEmpty()) { | |
return Optional.empty(); | |
} | |
List<Router> routers = new ArrayList<>(); | |
for (URL url : urls) { | |
if (EMPTY_PROTOCOL.equals(url.getProtocol())) { | |
continue; | |
} | |
String routerType = url.getParameter(ROUTER_KEY); | |
if (routerType != null && routerType.length() > 0) { | |
url = url.setProtocol(routerType); | |
} | |
try { | |
Router router = ROUTER_FACTORY.getRouter(url); | |
if (!routers.contains(router)) { | |
routers.add(router); | |
} | |
} catch (Throwable t) { | |
logger.error("convert router url to router error, url: " + url, t); | |
} | |
} | |
return Optional.of(routers); | |
} | |
/** | |
* Turn urls into invokers, and if url has been refer, will not re-reference. | |
* | |
* @param urls | |
* @return invokers | |
*/ | |
private Map<URL, Invoker<T>> toInvokers(List<URL> urls) { | |
Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>(); | |
if (urls == null || urls.isEmpty()) { | |
return newUrlInvokerMap; | |
} | |
Set<URL> keys = new HashSet<>(); | |
String queryProtocols = this.queryMap.get(PROTOCOL_KEY); | |
for (URL providerUrl : urls) { | |
// If protocol is configured at the reference side, only the matching protocol is selected | |
if (queryProtocols != null && queryProtocols.length() > 0) { | |
boolean accept = false; | |
String[] acceptProtocols = queryProtocols.split(","); | |
for (String acceptProtocol : acceptProtocols) { | |
if (providerUrl.getProtocol().equals(acceptProtocol)) { | |
accept = true; | |
break; | |
} | |
} | |
if (!accept) { | |
continue; | |
} | |
} | |
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { | |
continue; | |
} | |
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { | |
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + | |
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + | |
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + | |
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); | |
continue; | |
} | |
URL url = mergeUrl(providerUrl); | |
if (keys.contains(url)) { // Repeated url | |
continue; | |
} | |
keys.add(url); | |
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again | |
Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference | |
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url); | |
if (invoker == null) { // Not in the cache, refer again | |
try { | |
boolean enabled = true; | |
if (url.hasParameter(DISABLED_KEY)) { | |
enabled = !url.getParameter(DISABLED_KEY, false); | |
} else { | |
enabled = url.getParameter(ENABLED_KEY, true); | |
} | |
if (enabled) { | |
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); | |
} | |
} catch (Throwable t) { | |
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); | |
} | |
if (invoker != null) { // Put new invoker in cache | |
newUrlInvokerMap.put(url, invoker); | |
} | |
} else { | |
newUrlInvokerMap.put(url, invoker); | |
} | |
} | |
keys.clear(); | |
return newUrlInvokerMap; | |
} | |
/** | |
* Merge url parameters. the order is: override > -D >Consumer > Provider | |
* | |
* @param providerUrl | |
* @return | |
*/ | |
private URL mergeUrl(URL providerUrl) { | |
providerUrl = ClusterUtils.mergeProviderUrl(providerUrl, queryMap); // Merge the consumer side parameters | |
providerUrl = overrideWithConfigurator(providerUrl); | |
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker! | |
if ((providerUrl.getPath() == null || providerUrl.getPath() | |
.length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0 | |
//fix by tony.chenl DUBBO-44 | |
String path = getConsumerUrl().getParameter(INTERFACE_KEY); | |
if (path != null) { | |
int i = path.indexOf('/'); | |
if (i >= 0) { | |
path = path.substring(i + 1); | |
} | |
i = path.lastIndexOf(':'); | |
if (i >= 0) { | |
path = path.substring(0, i); | |
} | |
providerUrl = providerUrl.setPath(path); | |
} | |
} | |
return providerUrl; | |
} | |
private URL overrideWithConfigurator(URL providerUrl) { | |
// override url with configurator from "override://" URL for dubbo 2.6 and before | |
providerUrl = overrideWithConfigurators(this.configurators, providerUrl); | |
// override url with configurator from configurator from "app-name.configurators" | |
providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl); | |
// override url with configurator from configurators from "service-name.configurators" | |
if (referenceConfigurationListener != null) { | |
providerUrl = overrideWithConfigurators(referenceConfigurationListener.getConfigurators(), providerUrl); | |
} | |
return providerUrl; | |
} | |
private URL overrideWithConfigurators(List<Configurator> configurators, URL url) { | |
if (CollectionUtils.isNotEmpty(configurators)) { | |
for (Configurator configurator : configurators) { | |
url = configurator.configure(url); | |
} | |
} | |
return url; | |
} | |
/** | |
* Close all invokers | |
*/ | |
private void destroyAllInvokers() { | |
Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference | |
if (localUrlInvokerMap != null) { | |
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) { | |
try { | |
invoker.destroy(); | |
} catch (Throwable t) { | |
logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t); | |
} | |
} | |
localUrlInvokerMap.clear(); | |
} | |
invokers = null; | |
} | |
/** | |
* Check whether the invoker in the cache needs to be destroyed | |
* If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak | |
* | |
* @param oldUrlInvokerMap | |
* @param newUrlInvokerMap | |
*/ | |
private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) { | |
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { | |
destroyAllInvokers(); | |
return; | |
} | |
// check deleted invoker | |
List<URL> deleted = null; | |
if (oldUrlInvokerMap != null) { | |
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values(); | |
for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { | |
if (!newInvokers.contains(entry.getValue())) { | |
if (deleted == null) { | |
deleted = new ArrayList<>(); | |
} | |
deleted.add(entry.getKey()); | |
} | |
} | |
} | |
if (deleted != null) { | |
for (URL url : deleted) { | |
if (url != null) { | |
Invoker<T> invoker = oldUrlInvokerMap.remove(url); | |
if (invoker != null) { | |
try { | |
invoker.destroy(); | |
if (logger.isDebugEnabled()) { | |
logger.debug("destroy invoker[" + invoker.getUrl() + "] success. "); | |
} | |
} catch (Exception e) { | |
logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e); | |
} | |
} | |
} | |
} | |
} | |
} | |
@Override | |
public List<Invoker<T>> doList(Invocation invocation) { | |
if (forbidden) { | |
// 1. No service provider 2. Service providers are disabled | |
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + | |
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + | |
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + | |
", please check status of providers(disabled, not registered or in blacklist)."); | |
} | |
if (multiGroup) { | |
return this.invokers == null ? Collections.emptyList() : this.invokers; | |
} | |
List<Invoker<T>> invokers = null; | |
try { | |
// Get invokers from cache, only runtime routers will be executed. | |
invokers = routerChain.route(getConsumerUrl(), invocation); | |
} catch (Throwable t) { | |
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); | |
} | |
return invokers == null ? Collections.emptyList() : invokers; | |
} | |
@Override | |
public Class<T> getInterface() { | |
return serviceType; | |
} | |
@Override | |
public List<Invoker<T>> getAllInvokers() { | |
return invokers; | |
} | |
public URL getRegisteredConsumerUrl() { | |
return registeredConsumerUrl; | |
} | |
public void setRegisteredConsumerUrl(URL url) { | |
if (!shouldSimplified) { | |
this.registeredConsumerUrl = url.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, | |
String.valueOf(false)); | |
} else { | |
this.registeredConsumerUrl = URL.valueOf(url, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters( | |
CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false)); | |
} | |
} | |
@Override | |
public boolean isAvailable() { | |
if (isDestroyed()) { | |
return false; | |
} | |
Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; | |
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) { | |
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) { | |
if (invoker.isAvailable()) { | |
return true; | |
} | |
} | |
} | |
return false; | |
} | |
public void buildRouterChain(URL url) { | |
this.setRouterChain(RouterChain.buildChain(url)); | |
} | |
/** | |
* Haomin: added for test purpose | |
*/ | |
public Map<URL, Invoker<T>> getUrlInvokerMap() { | |
return urlInvokerMap; | |
} | |
public List<Invoker<T>> getInvokers() { | |
return invokers; | |
} | |
private boolean isValidCategory(URL url) { | |
String category = url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); | |
if ((ROUTERS_CATEGORY.equals(category) || ROUTE_PROTOCOL.equals(url.getProtocol())) || | |
PROVIDERS_CATEGORY.equals(category) || | |
CONFIGURATORS_CATEGORY.equals(category) || DYNAMIC_CONFIGURATORS_CATEGORY.equals(category) || | |
APP_DYNAMIC_CONFIGURATORS_CATEGORY.equals(category)) { | |
return true; | |
} | |
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + | |
getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); | |
return false; | |
} | |
private boolean isNotCompatibleFor26x(URL url) { | |
return StringUtils.isEmpty(url.getParameter(COMPATIBLE_CONFIG_KEY)); | |
} | |
private void overrideConsumerUrl() { | |
// merge override parameters | |
this.overrideConsumerUrl = getConsumerUrl(); | |
if (overrideConsumerUrl != null) { | |
List<Configurator> localConfigurators = this.configurators; // local reference | |
doOverrideUrl(localConfigurators); | |
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference | |
doOverrideUrl(localAppDynamicConfigurators); | |
if (referenceConfigurationListener != null) { | |
List<Configurator> localDynamicConfigurators = referenceConfigurationListener.getConfigurators(); // local reference | |
doOverrideUrl(localDynamicConfigurators); | |
} | |
} | |
} | |
private void doOverrideUrl(List<Configurator> configurators) { | |
if (CollectionUtils.isNotEmpty(configurators)) { | |
for (Configurator configurator : configurators) { | |
this.overrideConsumerUrl = configurator.configure(overrideConsumerUrl); | |
} | |
} | |
} | |
/** | |
* The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer. | |
* | |
* @param <T> | |
*/ | |
private static class InvokerDelegate<T> extends InvokerWrapper<T> { | |
private URL providerUrl; | |
public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) { | |
super(invoker, url); | |
this.providerUrl = providerUrl; | |
} | |
public URL getProviderUrl() { | |
return providerUrl; | |
} | |
} | |
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener { | |
private RegistryDirectory directory; | |
private URL url; | |
ReferenceConfigurationListener(RegistryDirectory directory, URL url) { | |
this.directory = directory; | |
this.url = url; | |
this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); | |
} | |
void stop() { | |
this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); | |
} | |
@Override | |
protected void notifyOverrides() { | |
// to notify configurator/router changes | |
directory.refreshOverrideAndInvoker(Collections.emptyList()); | |
} | |
} | |
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener { | |
List<RegistryDirectory> listeners = new ArrayList<>(); | |
ConsumerConfigurationListener() { | |
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); | |
} | |
void addNotifyListener(RegistryDirectory listener) { | |
this.listeners.add(listener); | |
} | |
void removeNotifyListener(RegistryDirectory listener) { | |
this.listeners.remove(listener); | |
} | |
@Override | |
protected void notifyOverrides() { | |
listeners.forEach(listener -> listener.refreshOverrideAndInvoker(Collections.emptyList())); | |
} | |
} | |
} |