/* | |
* Copyright 1999-2011 Alibaba Group. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.alibaba.dubbo.registry.support; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import com.alibaba.dubbo.common.Constants; | |
import com.alibaba.dubbo.common.URL; | |
import com.alibaba.dubbo.common.extension.ExtensionLoader; | |
import com.alibaba.dubbo.common.logger.Logger; | |
import com.alibaba.dubbo.common.logger.LoggerFactory; | |
import com.alibaba.dubbo.common.utils.NetUtils; | |
import com.alibaba.dubbo.common.utils.StringUtils; | |
import com.alibaba.dubbo.registry.NotifyListener; | |
import com.alibaba.dubbo.registry.Registry; | |
import com.alibaba.dubbo.registry.RegistryFactory; | |
import com.alibaba.dubbo.rpc.Exporter; | |
import com.alibaba.dubbo.rpc.Invoker; | |
import com.alibaba.dubbo.rpc.Protocol; | |
import com.alibaba.dubbo.rpc.RpcException; | |
import com.alibaba.dubbo.rpc.cluster.Cluster; | |
import com.alibaba.dubbo.rpc.protocol.InvokerWrapper; | |
/** | |
* RegistryProtocol | |
* | |
* @author william.liangf | |
* @author chao.liuc | |
*/ | |
public class RegistryProtocol implements Protocol { | |
private Cluster cluster; | |
public void setCluster(Cluster cluster) { | |
this.cluster = cluster; | |
} | |
private Protocol protocol; | |
public void setProtocol(Protocol protocol) { | |
this.protocol = protocol; | |
} | |
private RegistryFactory registryFactory; | |
public void setRegistryFactory(RegistryFactory registryFactory) { | |
this.registryFactory = registryFactory; | |
} | |
public int getDefaultPort() { | |
return 9090; | |
} | |
private static RegistryProtocol INSTANCE; | |
public RegistryProtocol() { | |
INSTANCE = this; | |
} | |
public static RegistryProtocol getRegistryProtocol() { | |
if (INSTANCE == null) { | |
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load | |
} | |
return INSTANCE; | |
} | |
//用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露 | |
//providerurl <--> exporter | |
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>(); | |
private final NotifyListener listener = new OverrideListener(); | |
private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class); | |
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { | |
//export invoker | |
final ExporterChangeableWrapper<T> exporter = doLocolExport(originInvoker); | |
//registry provider | |
Registry registry = doRegister(originInvoker); | |
//设置exporter与registry的关系 (for unexport) | |
exporter.setRegistry(registry); | |
//保证每次export都返回一个新的exporter实例 | |
return new Exporter<T>() { | |
public Invoker<T> getInvoker() { | |
return exporter.getInvoker(); | |
} | |
public void unexport() { | |
exporter.unexport(); | |
} | |
}; | |
} | |
@SuppressWarnings("unchecked") | |
private <T> ExporterChangeableWrapper<T> doLocolExport(final Invoker<T> originInvoker){ | |
String key = getCacheKey(originInvoker); | |
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); | |
if (exporter == null) { | |
synchronized (bounds) { | |
exporter = (ExporterChangeableWrapper<T>) bounds.get(key); | |
if (exporter == null) { | |
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); | |
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); | |
bounds.put(key, exporter); | |
} | |
} | |
} | |
return (ExporterChangeableWrapper<T>) exporter; | |
} | |
/** | |
* 对修改了url的invoker重新export | |
* @param originInvoker | |
* @param newInvokerUrl | |
*/ | |
@SuppressWarnings("unchecked") | |
private <T> void doChangeLocolExport(final Invoker<T> originInvoker, URL newInvokerUrl){ | |
String key = getCacheKey(originInvoker); | |
final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); | |
if (exporter == null){ | |
logger.warn(new IllegalStateException("error state, exporter should not be null")); | |
return ;//不存在是异常场景 直接返回 | |
} else { | |
final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl); | |
exporter.setExporter(protocol.export(invokerDelegete)); | |
} | |
} | |
/** | |
* 注册 provider | |
* @param originInvoker | |
* @return | |
*/ | |
private Registry doRegister(final Invoker<?> originInvoker){ | |
final Registry registry = getRegistry(originInvoker); | |
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); | |
registry.register(registedProviderUrl, listener); | |
return registry; | |
} | |
/** | |
* 根据invoker的地址获取registry实例 | |
* @param originInvoker | |
* @return | |
*/ | |
private Registry getRegistry(final Invoker<?> originInvoker){ | |
URL registryUrl = originInvoker.getUrl(); | |
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { | |
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); | |
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); | |
} | |
return registryFactory.getRegistry(registryUrl); | |
} | |
/** | |
* 返回注册到注册中心的URL,对URL参数进行一次过滤 | |
* @param originInvoker | |
* @return | |
*/ | |
private URL getRegistedProviderUrl(final Invoker<?> originInvoker){ | |
URL providerUrl = getProviderUrl(originInvoker); | |
//注册中心看到的地址 | |
final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY); | |
return registedProviderUrl; | |
} | |
/** | |
* 通过invoker的url 获取 providerUrl的地址 | |
* @param origininvoker | |
* @return | |
*/ | |
private URL getProviderUrl(final Invoker<?> origininvoker){ | |
String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); | |
if (export == null || export.length() == 0) { | |
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl()); | |
} | |
URL providerUrl = URL.valueOf(export); | |
return providerUrl; | |
} | |
/** | |
* 获取invoker在bounds中缓存的key | |
* @param originInvoker | |
* @return | |
*/ | |
private String getCacheKey(final Invoker<?> originInvoker){ | |
URL providerUrl = getProviderUrl(originInvoker); | |
String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); | |
return key; | |
} | |
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { | |
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); | |
Registry registry = registryFactory.getRegistry(url); | |
// group="a,b" or group="*" | |
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); | |
String group = qs.get(Constants.GROUP_KEY); | |
if (group != null && group.length() > 0 ) { | |
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1 | |
|| "*".equals( group ) ) { | |
return doRefer( getMergeableCluster(), registry, type, url ); | |
} | |
} | |
return doRefer(cluster, registry, type, url); | |
} | |
private Cluster getMergeableCluster() { | |
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable"); | |
} | |
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { | |
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); | |
directory.setRegistry(registry); | |
directory.setProtocol(protocol); | |
registry.subscribe(new URL(Constants.SUBSCRIBE_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()), directory); | |
return cluster.join(directory); | |
} | |
//过滤URL中不需要输出的参数(以点号开头的) | |
private static String[] getFilteredKeys(URL url) { | |
Map<String, String> params = url.getParameters(); | |
if (params != null && !params.isEmpty()) { | |
List<String> filteredKeys = new ArrayList<String>(); | |
for (Map.Entry<String, String> entry : params.entrySet()) { | |
if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) { | |
filteredKeys.add(entry.getKey()); | |
} | |
} | |
return filteredKeys.toArray(new String[filteredKeys.size()]); | |
} else { | |
return new String[] {}; | |
} | |
} | |
public void destroy() { | |
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values()); | |
for(Exporter<?> exporter :exporters){ | |
exporter.unexport(); | |
} | |
bounds.clear(); | |
} | |
/*重新export 1.protocol中的exporter destory问题 | |
*1.要求registryprotocol返回的exporter可以正常destroy | |
*2.notify后不需要重新向注册中心注册 | |
*3.export 方法传入的invoker最好能一直作为exporter的invoker. | |
*/ | |
private class OverrideListener implements NotifyListener { | |
/* | |
* provider 端可识别的override url只有这两种. | |
* override://0.0.0.0/serviceName?timeout=10 | |
* override://0.0.0.0/?timeout=10 | |
*/ | |
public void notify(List<URL> urls) { | |
List<ExporterChangeableWrapper<?>> exporters = new ArrayList<ExporterChangeableWrapper<?>>(bounds.values()); | |
for (ExporterChangeableWrapper<?> exporter : exporters){ | |
Invoker<?> invoker = exporter.getOriginInvoker(); | |
final Invoker<?> originInvoker ; | |
if (invoker instanceof InvokerDelegete){ | |
originInvoker = ((InvokerDelegete<?>)invoker).getInvoker(); | |
}else { | |
originInvoker = invoker; | |
} | |
URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker); | |
URL newUrl = getNewInvokerUrl(originUrl, urls); | |
if (! originUrl.toFullString().equals(newUrl.toFullString())){ | |
RegistryProtocol.this.doChangeLocolExport(originInvoker, newUrl); | |
} | |
} | |
} | |
private URL getNewInvokerUrl(final URL originUrl, final List<URL> urls){ | |
URL newUrl = originUrl; | |
//override://0.0.0.0/?timeout=10 ip:port无意义 | |
for (URL overrideUrl : urls){ | |
if (overrideUrl.getServiceInterface() == null){ | |
newUrl = newUrl.addParameters(overrideUrl.getParameters()); | |
} | |
} | |
//override://0.0.0.0/serviceName?timeout=10 | |
for (URL overrideUrl : urls){ | |
if (originUrl.getServiceKey().equals(overrideUrl.getServiceKey())){ | |
newUrl = newUrl.addParameters(overrideUrl.getParameters()); | |
} | |
} | |
return newUrl; | |
} | |
} | |
public static class InvokerDelegete<T> extends InvokerWrapper<T>{ | |
private final Invoker<T> invoker; | |
/** | |
* @param invoker | |
* @param url invoker.getUrl返回此值 | |
*/ | |
public InvokerDelegete(Invoker<T> invoker, URL url){ | |
super(invoker, url); | |
this.invoker = invoker; | |
} | |
public Invoker<T> getInvoker(){ | |
if (invoker instanceof InvokerDelegete){ | |
return ((InvokerDelegete<T>)invoker).getInvoker(); | |
} else { | |
return invoker; | |
} | |
} | |
} | |
/** | |
* exporter代理,建立返回的exporter与protocol export出的exporter的对应关系,在override时可以进行关系修改. | |
* | |
* @author chao.liuc | |
* | |
* @param <T> | |
*/ | |
private class ExporterChangeableWrapper<T> implements Exporter<T>{ | |
private Exporter<T> exporter; | |
private final Invoker<T> originInvoker; | |
private Registry registry; | |
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker){ | |
this.exporter = exporter; | |
this.originInvoker = originInvoker; | |
} | |
public Invoker<T> getOriginInvoker() { | |
return originInvoker; | |
} | |
public Invoker<T> getInvoker() { | |
return exporter.getInvoker(); | |
} | |
public void setExporter(Exporter<T> exporter){ | |
this.exporter = exporter; | |
} | |
public void setRegistry(final Registry registry) { | |
if (this.registry != null){ | |
logger.warn(new IllegalStateException("registry can not be changed!")); | |
} | |
this.registry = registry; | |
} | |
public void unexport() { | |
String key = getCacheKey(this.originInvoker); | |
bounds.remove(key); | |
try { | |
if (registry != null && registry.isAvailable()) { | |
registry.unregister(getRegistedProviderUrl(originInvoker), listener); | |
} | |
} finally { | |
exporter.unexport(); | |
} | |
} | |
} | |
} |