blob: fd414edd1357d27b975f91387e2ba4f5855d578b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.registry.integration;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConfigUtils;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.common.utils.UrlUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.Registry;
import com.alibaba.dubbo.registry.RegistryFactory;
import com.alibaba.dubbo.registry.RegistryService;
import com.alibaba.dubbo.registry.support.ProviderConsumerRegTable;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Cluster;
import com.alibaba.dubbo.rpc.cluster.Configurator;
import com.alibaba.dubbo.rpc.protocol.InvokerWrapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.alibaba.dubbo.common.Constants.ACCEPT_FOREIGN_IP;
import static com.alibaba.dubbo.common.Constants.QOS_ENABLE;
import static com.alibaba.dubbo.common.Constants.QOS_PORT;
import static com.alibaba.dubbo.common.Constants.VALIDATION_KEY;
/**
* RegistryProtocol
*
*/
public class RegistryProtocol implements Protocol {
private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
private static RegistryProtocol INSTANCE;
private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<URL, NotifyListener>();
//To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
//providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
private Cluster cluster;
private Protocol protocol;
private RegistryFactory registryFactory;
private ProxyFactory proxyFactory;
public RegistryProtocol() {
INSTANCE = this;
}
public static RegistryProtocol getRegistryProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load
}
return INSTANCE;
}
//Filter the parameters that do not need to be output in url(Starting with .)
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
if (params != null && !params.isEmpty()) {
List<String> filteredKeys = new ArrayList<String>();
for (Map.Entry<String, String> entry : params.entrySet()) {
if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
filteredKeys.add(entry.getKey());
}
}
return filteredKeys.toArray(new String[filteredKeys.size()]);
} else {
return new String[]{};
}
}
public void setCluster(Cluster cluster) {
this.cluster = cluster;
}
public void setProtocol(Protocol protocol) {
this.protocol = protocol;
}
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
public void setProxyFactory(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
@Override
public int getDefaultPort() {
return 9090;
}
public Map<URL, NotifyListener> getOverrideListeners() {
return overrideListeners;
}
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
/**
* Reexport the invoker of the modified url
*
* @param originInvoker
* @param newInvokerUrl
*/
@SuppressWarnings("unchecked")
private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
String key = getCacheKey(originInvoker);
final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
} else {
final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
exporter.setExporter(protocol.export(invokerDelegete));
}
}
/**
* Get an instance of registry based on the address of invoker
*
* @param originInvoker
* @return
*/
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryUrl;
}
/**
* Return the url that is registered to the registry and filter the url parameter once
*
* @param originInvoker
* @return
*/
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
//The address you see at the registry
final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl))
.removeParameter(Constants.MONITOR_KEY)
.removeParameter(Constants.BIND_IP_KEY)
.removeParameter(Constants.BIND_PORT_KEY)
.removeParameter(QOS_ENABLE)
.removeParameter(QOS_PORT)
.removeParameter(ACCEPT_FOREIGN_IP)
.removeParameter(VALIDATION_KEY);
return registedProviderUrl;
}
private URL getSubscribedOverrideUrl(URL registedProviderUrl) {
return registedProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
.addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false));
}
/**
* Get the address of the providerUrl through the url of the invoker
*
* @param origininvoker
* @return
*/
private URL getProviderUrl(final Invoker<?> origininvoker) {
String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
}
URL providerUrl = URL.valueOf(export);
return providerUrl;
}
/**
* Get the key cached in bounds by invoker
*
* @param originInvoker
* @return
*/
private String getCacheKey(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
return key;
}
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private Cluster getMergeableCluster() {
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
@Override
public void destroy() {
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
for (Exporter<?> exporter : exporters) {
exporter.unexport();
}
bounds.clear();
}
public static class InvokerDelegete<T> extends InvokerWrapper<T> {
private final Invoker<T> invoker;
/**
* @param invoker
* @param url invoker.getUrl return this value
*/
public InvokerDelegete(Invoker<T> invoker, URL url) {
super(invoker, url);
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
if (invoker instanceof InvokerDelegete) {
return ((InvokerDelegete<T>) invoker).getInvoker();
} else {
return invoker;
}
}
}
/**
* Reexport: the exporter destroy problem in protocol
* 1.Ensure that the exporter returned by registryprotocol can be normal destroyed
* 2.No need to re-register to the registry after notify
* 3.The invoker passed by the export method , would better to be the invoker of exporter
*/
private class OverrideListener implements NotifyListener {
private final URL subscribeUrl;
private final Invoker originInvoker;
public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
this.subscribeUrl = subscribeUrl;
this.originInvoker = originalInvoker;
}
/**
* @param urls The list of registered information , is always not empty, The meaning is the same as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}.
*/
@Override
public synchronized void notify(List<URL> urls) {
logger.debug("original override urls: " + urls);
List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
// No matching results
if (matchedUrls.isEmpty()) {
return;
}
List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegete) {
invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
//The origin invoker
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}
//The current, may have been merged many times
URL currentUrl = exporter.getInvoker().getUrl();
//Merged with this configuration
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
if (!currentUrl.equals(newUrl)) {
RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
}
}
private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
List<URL> result = new ArrayList<URL>();
for (URL url : configuratorUrls) {
URL overrideUrl = url;
// Compatible with the old version
if (url.getParameter(Constants.CATEGORY_KEY) == null
&& Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
}
// Check whether url is to be applied to the current service
if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
result.add(url);
}
}
return result;
}
//Merge the urls of configurators
private URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
for (Configurator configurator : configurators) {
url = configurator.configure(url);
}
return url;
}
}
/**
* exporter proxy, establish the corresponding relationship between the returned exporter and the exporter exported by the protocol, and can modify the relationship at the time of override.
*
* @param <T>
*/
private class ExporterChangeableWrapper<T> implements Exporter<T> {
private final Invoker<T> originInvoker;
private Exporter<T> exporter;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}
public Invoker<T> getOriginInvoker() {
return originInvoker;
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
@Override
public void unexport() {
String key = getCacheKey(this.originInvoker);
bounds.remove(key);
exporter.unexport();
}
}
static private class DestroyableExporter<T> implements Exporter<T> {
public static final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));
private Exporter<T> exporter;
private Invoker<T> originInvoker;
private URL subscribeUrl;
private URL registerUrl;
public DestroyableExporter(Exporter<T> exporter, Invoker<T> originInvoker, URL subscribeUrl, URL registerUrl) {
this.exporter = exporter;
this.originInvoker = originInvoker;
this.subscribeUrl = subscribeUrl;
this.registerUrl = registerUrl;
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
@Override
public void unexport() {
Registry registry = RegistryProtocol.INSTANCE.getRegistry(originInvoker);
try {
registry.unregister(registerUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
NotifyListener listener = RegistryProtocol.INSTANCE.overrideListeners.remove(subscribeUrl);
registry.unsubscribe(subscribeUrl, listener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
int timeout = ConfigUtils.getServerShutdownTimeout();
if (timeout > 0) {
logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. Usually, this is called when you use dubbo API");
Thread.sleep(timeout);
}
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
});
}
}
}