| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.client; |
| |
| import org.apache.dubbo.common.ProtocolServiceKey; |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; |
| import org.apache.dubbo.common.constants.CommonConstants; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.Assert; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.NetUtils; |
| import org.apache.dubbo.metadata.MetadataInfo; |
| import org.apache.dubbo.registry.AddressListener; |
| import org.apache.dubbo.registry.Constants; |
| import org.apache.dubbo.registry.ProviderFirstParams; |
| import org.apache.dubbo.registry.integration.AbstractConfiguratorListener; |
| import org.apache.dubbo.registry.integration.DynamicDirectory; |
| import org.apache.dubbo.rpc.Invocation; |
| import org.apache.dubbo.rpc.Invoker; |
| import org.apache.dubbo.rpc.Protocol; |
| import org.apache.dubbo.rpc.Result; |
| import org.apache.dubbo.rpc.RpcContext; |
| import org.apache.dubbo.rpc.RpcException; |
| import org.apache.dubbo.rpc.RpcInvocation; |
| import org.apache.dubbo.rpc.RpcServiceContext; |
| import org.apache.dubbo.rpc.cluster.Configurator; |
| import org.apache.dubbo.rpc.cluster.RouterChain; |
| import org.apache.dubbo.rpc.cluster.directory.StaticDirectory; |
| import org.apache.dubbo.rpc.cluster.router.state.BitList; |
| import org.apache.dubbo.rpc.model.ModuleModel; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_HASHMAP_LOAD_FACTOR; |
| import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; |
| import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE; |
| import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX; |
| import static org.apache.dubbo.rpc.model.ScopeModelUtil.getModuleModel; |
| |
| public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> { |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ServiceDiscoveryRegistryDirectory.class); |
| |
| /** |
| * instance address to invoker mapping. |
| * The initial value is null and the midway may be assigned to null, please use the local variable reference |
| */ |
| private volatile Map<ProtocolServiceKeyWithAddress, Invoker<T>> urlInvokerMap; |
| private volatile ReferenceConfigurationListener referenceConfigurationListener; |
| private volatile boolean enableConfigurationListen = true; |
| private volatile List<URL> originalUrls = null; |
| private volatile Map<String, String> overrideQueryMap; |
| private final Set<String> providerFirstParams; |
| private final ModuleModel moduleModel; |
| private final ProtocolServiceKey consumerProtocolServiceKey; |
| private final Map<ProtocolServiceKey, URL> customizedConsumerUrlMap = new ConcurrentHashMap<>(); |
| |
| public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) { |
| super(serviceType, url); |
| moduleModel = getModuleModel(url.getScopeModel()); |
| |
| Set<ProviderFirstParams> providerFirstParams = url.getOrDefaultApplicationModel().getExtensionLoader(ProviderFirstParams.class).getSupportedExtensionInstances(); |
| if (CollectionUtils.isEmpty(providerFirstParams)) { |
| this.providerFirstParams = null; |
| } else { |
| if (providerFirstParams.size() == 1) { |
| this.providerFirstParams = Collections.unmodifiableSet(providerFirstParams.iterator().next().params()); |
| } else { |
| Set<String> params = new HashSet<>(); |
| for (ProviderFirstParams paramsFilter : providerFirstParams) { |
| if (paramsFilter.params() == null) { |
| break; |
| } |
| params.addAll(paramsFilter.params()); |
| } |
| this.providerFirstParams = Collections.unmodifiableSet(params); |
| } |
| } |
| |
| String protocol = consumerUrl.getParameter(PROTOCOL_KEY, consumerUrl.getProtocol()); |
| consumerProtocolServiceKey = new ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), |
| !CommonConstants.CONSUMER.equals(protocol) ? protocol : null); |
| } |
| |
| @Override |
| public void subscribe(URL url) { |
| if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) { |
| enableConfigurationListen = true; |
| getConsumerConfigurationListener(moduleModel).addNotifyListener(this); |
| referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url); |
| } else { |
| enableConfigurationListen = false; |
| } |
| super.subscribe(url); |
| } |
| |
| private ConsumerConfigurationListener getConsumerConfigurationListener(ModuleModel moduleModel) { |
| return moduleModel.getBeanFactory().getOrRegisterBean(ConsumerConfigurationListener.class, |
| type -> new ConsumerConfigurationListener(moduleModel)); |
| } |
| |
| @Override |
| public void unSubscribe(URL url) { |
| super.unSubscribe(url); |
| this.originalUrls = null; |
| if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) { |
| getConsumerConfigurationListener(moduleModel).removeNotifyListener(this); |
| referenceConfigurationListener.stop(); |
| } |
| } |
| |
| @Override |
| public void destroy() { |
| super.destroy(); |
| if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) { |
| getConsumerConfigurationListener(moduleModel).removeNotifyListener(this); |
| referenceConfigurationListener.stop(); |
| } |
| } |
| |
| @Override |
| public void buildRouterChain(URL url) { |
| this.setRouterChain(RouterChain.buildChain(getInterface(), url.addParameter(REGISTRY_TYPE_KEY, SERVICE_REGISTRY_TYPE))); |
| } |
| |
| @Override |
| public synchronized void notify(List<URL> instanceUrls) { |
| if (isDestroyed()) { |
| return; |
| } |
| // Set the context of the address notification thread. |
| RpcServiceContext.getServiceContext().setConsumerUrl(getConsumerUrl()); |
| |
| // 3.x added for extend URL address |
| ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class); |
| List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); |
| if (supportedListeners != null && !supportedListeners.isEmpty()) { |
| for (AddressListener addressListener : supportedListeners) { |
| instanceUrls = addressListener.notify(instanceUrls, getConsumerUrl(), this); |
| } |
| } |
| |
| refreshOverrideAndInvoker(instanceUrls); |
| } |
| |
| // RefreshOverrideAndInvoker will be executed by registryCenter and configCenter, so it should be synchronized. |
| private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) { |
| // mock zookeeper://xxx?mock=return null |
| refreshInvoker(instanceUrls); |
| } |
| |
| private InstanceAddressURL overrideWithConfigurator(InstanceAddressURL providerUrl) { |
| // override url with configurator from "app-name.configurators" |
| providerUrl = overrideWithConfigurators(getConsumerConfigurationListener(moduleModel).getConfigurators(), providerUrl); |
| |
| // override url with configurator from configurators from "service-name.configurators" |
| if (referenceConfigurationListener != null) { |
| providerUrl = overrideWithConfigurators(referenceConfigurationListener.getConfigurators(), providerUrl); |
| } |
| |
| return providerUrl; |
| } |
| |
| private InstanceAddressURL overrideWithConfigurators(List<Configurator> configurators, InstanceAddressURL url) { |
| if (CollectionUtils.isNotEmpty(configurators)) { |
| // wrap url |
| OverrideInstanceAddressURL overrideInstanceAddressURL = new OverrideInstanceAddressURL(url); |
| if (overrideQueryMap != null) { |
| // override app-level configs |
| overrideInstanceAddressURL = (OverrideInstanceAddressURL) overrideInstanceAddressURL.addParameters(overrideQueryMap); |
| } |
| for (Configurator configurator : configurators) { |
| overrideInstanceAddressURL = (OverrideInstanceAddressURL) configurator.configure(overrideInstanceAddressURL); |
| } |
| return overrideInstanceAddressURL; |
| } |
| return url; |
| } |
| |
| @Override |
| public boolean isServiceDiscovery() { |
| return true; |
| } |
| |
| /** |
| * This implementation makes sure all application names related to serviceListener received address notification. |
| * <p> |
| * FIXME, make sure deprecated "interface-application" mapping item be cleared in time. |
| */ |
| @Override |
| public boolean isNotificationReceived() { |
| return serviceListener == null || serviceListener.isDestroyed() |
| || serviceListener.getAllInstances().size() == serviceListener.getServiceNames().size(); |
| } |
| |
| private void refreshInvoker(List<URL> invokerUrls) { |
| Assert.notNull(invokerUrls, "invokerUrls should not be null, use EMPTY url to clear current addresses."); |
| this.originalUrls = invokerUrls; |
| |
| if (invokerUrls.size() == 1 && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { |
| logger.warn("Received url with EMPTY protocol, will clear all available addresses."); |
| this.forbidden = true; // Forbid to access |
| routerChain.setInvokers(BitList.emptyList()); |
| destroyAllInvokers(); // Close all invokers |
| } else { |
| this.forbidden = false; // Allow accessing |
| if (CollectionUtils.isEmpty(invokerUrls)) { |
| logger.warn("Received empty url list, will ignore for protection purpose."); |
| return; |
| } |
| |
| // use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers(). |
| Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; |
| // can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers(). |
| Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap = null; |
| if (localUrlInvokerMap != null) { |
| // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing. |
| oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR)); |
| localUrlInvokerMap.forEach(oldUrlInvokerMap::put); |
| } |
| Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map |
| logger.info("Refreshed invoker size " + newUrlInvokerMap.size()); |
| |
| if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { |
| logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")")); |
| return; |
| } |
| List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); |
| this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers)); |
| // pre-route and build cache |
| routerChain.setInvokers(this.getInvokers()); |
| this.urlInvokerMap = newUrlInvokerMap; |
| |
| if (oldUrlInvokerMap != null) { |
| try { |
| destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker |
| } catch (Exception e) { |
| logger.warn("destroyUnusedInvokers error. ", e); |
| } |
| } |
| } |
| |
| // notify invokers refreshed |
| this.invokersChanged(); |
| } |
| |
| /** |
| * Turn urls into invokers, and if url has been refer, will not re-reference. |
| * the items that will be put into newUrlInvokeMap will be removed from oldUrlInvokerMap. |
| * |
| * @param oldUrlInvokerMap it might be modified during the process. |
| * @param urls |
| * @return invokers |
| */ |
| private Map<ProtocolServiceKeyWithAddress, Invoker<T>> toInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, List<URL> urls) { |
| Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1)); |
| if (urls == null || urls.isEmpty()) { |
| return newUrlInvokerMap; |
| } |
| |
| for (URL url : urls) { |
| InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url; |
| if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) { |
| continue; |
| } |
| if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) { |
| logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() + |
| " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() + |
| " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + |
| getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions())); |
| continue; |
| } |
| |
| instanceAddressURL.setProviderFirstParams(providerFirstParams); |
| |
| // Override provider urls if needed |
| if (enableConfigurationListen) { |
| instanceAddressURL = overrideWithConfigurator(instanceAddressURL); |
| } |
| |
| // filter all the service available (version wildcard, group wildcard, protocol wildcard) |
| int port = instanceAddressURL.getPort(); |
| List<ProtocolServiceKey> matchedProtocolServiceKeys = instanceAddressURL.getMetadataInfo() |
| .getMatchedServiceInfos(consumerProtocolServiceKey) |
| .stream() |
| .filter(serviceInfo -> serviceInfo.getPort() <= 0 || serviceInfo.getPort() == port) |
| .map(MetadataInfo.ServiceInfo::getProtocolServiceKey) |
| .collect(Collectors.toList()); |
| |
| // see org.apache.dubbo.common.ProtocolServiceKey.isSameWith |
| // check if needed to override the consumer url |
| boolean shouldWrap = matchedProtocolServiceKeys.size() != 1 || !consumerProtocolServiceKey.isSameWith(matchedProtocolServiceKeys.get(0)); |
| |
| for (ProtocolServiceKey matchedProtocolServiceKey : matchedProtocolServiceKeys) { |
| ProtocolServiceKeyWithAddress protocolServiceKeyWithAddress = new ProtocolServiceKeyWithAddress(matchedProtocolServiceKey, instanceAddressURL.getAddress()); |
| Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.get(protocolServiceKeyWithAddress); |
| if (invoker == null || urlChanged(invoker, instanceAddressURL, matchedProtocolServiceKey)) { // Not in the cache, refer again |
| try { |
| boolean enabled; |
| if (instanceAddressURL.hasParameter(DISABLED_KEY)) { |
| enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false); |
| } else { |
| enabled = instanceAddressURL.getParameter(ENABLED_KEY, true); |
| } |
| if (enabled) { |
| if (shouldWrap) { |
| URL newConsumerUrl = customizedConsumerUrlMap.computeIfAbsent(matchedProtocolServiceKey, |
| k -> consumerUrl.setProtocol(k.getProtocol()) |
| .addParameter(CommonConstants.GROUP_KEY, k.getGroup()) |
| .addParameter(CommonConstants.VERSION_KEY, k.getVersion())); |
| RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl); |
| invoker = new InstanceWrappedInvoker<>(protocol.refer(serviceType, instanceAddressURL), newConsumerUrl, matchedProtocolServiceKey); |
| } else { |
| invoker = protocol.refer(serviceType, instanceAddressURL); |
| } |
| } |
| } catch (Throwable t) { |
| logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t); |
| } |
| if (invoker != null) { // Put new invoker in cache |
| newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker); |
| } |
| } else { |
| newUrlInvokerMap.put(protocolServiceKeyWithAddress, invoker); |
| oldUrlInvokerMap.remove(protocolServiceKeyWithAddress, invoker); |
| } |
| } |
| } |
| return newUrlInvokerMap; |
| } |
| |
| private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL, ProtocolServiceKey protocolServiceKey) { |
| InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl(); |
| |
| if (!newURL.getInstance().equals(oldURL.getInstance())) { |
| return true; |
| } |
| |
| if (oldURL instanceof OverrideInstanceAddressURL || newURL instanceof OverrideInstanceAddressURL) { |
| if (!(oldURL instanceof OverrideInstanceAddressURL && newURL instanceof OverrideInstanceAddressURL)) { |
| // sub-class changed |
| return true; |
| } else { |
| if (!((OverrideInstanceAddressURL) oldURL).getOverrideParams().equals(((OverrideInstanceAddressURL) newURL).getOverrideParams())) { |
| return true; |
| } |
| } |
| } |
| |
| MetadataInfo.ServiceInfo oldServiceInfo = oldURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString()); |
| if (null == oldServiceInfo) { |
| return false; |
| } |
| |
| return !oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString())); |
| } |
| |
| private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) { |
| List<Invoker<T>> mergedInvokers = new ArrayList<>(); |
| Map<String, List<Invoker<T>>> groupMap = new HashMap<>(); |
| for (Invoker<T> invoker : invokers) { |
| String group = invoker.getUrl().getGroup(""); |
| groupMap.computeIfAbsent(group, k -> new ArrayList<>()); |
| groupMap.get(group).add(invoker); |
| } |
| |
| if (groupMap.size() == 1) { |
| mergedInvokers.addAll(groupMap.values().iterator().next()); |
| } else if (groupMap.size() > 1) { |
| for (List<Invoker<T>> groupList : groupMap.values()) { |
| StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList); |
| staticDirectory.buildRouterChain(); |
| mergedInvokers.add(cluster.join(staticDirectory, false)); |
| } |
| } else { |
| mergedInvokers = invokers; |
| } |
| return mergedInvokers; |
| } |
| |
| /** |
| * Close all invokers |
| */ |
| @Override |
| protected void destroyAllInvokers() { |
| Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference |
| if (localUrlInvokerMap != null) { |
| for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) { |
| try { |
| invoker.destroyAll(); |
| } catch (Throwable t) { |
| logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t); |
| } |
| } |
| localUrlInvokerMap.clear(); |
| } |
| |
| this.urlInvokerMap = null; |
| this.destroyInvokers(); |
| } |
| |
| /** |
| * Check whether the invoker in the cache needs to be destroyed |
| * If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak |
| * |
| * @param oldUrlInvokerMap |
| * @param newUrlInvokerMap |
| */ |
| private void destroyUnusedInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap, Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap) { |
| if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { |
| destroyAllInvokers(); |
| return; |
| } |
| |
| if (oldUrlInvokerMap == null || oldUrlInvokerMap.size() == 0) { |
| return; |
| } |
| |
| for (Map.Entry<ProtocolServiceKeyWithAddress, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { |
| Invoker<T> invoker = entry.getValue(); |
| if (invoker != null) { |
| try { |
| invoker.destroyAll(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("destroy invoker[" + invoker.getUrl() + "] success. "); |
| } |
| } catch (Exception e) { |
| logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e); |
| } |
| } |
| } |
| logger.info(oldUrlInvokerMap.size() + " deprecated invokers deleted."); |
| } |
| |
| private class ReferenceConfigurationListener extends AbstractConfiguratorListener { |
| private final ServiceDiscoveryRegistryDirectory<?> directory; |
| private final URL url; |
| |
| ReferenceConfigurationListener(ModuleModel moduleModel, ServiceDiscoveryRegistryDirectory<?> directory, URL url) { |
| super(moduleModel); |
| this.directory = directory; |
| this.url = url; |
| this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); |
| } |
| |
| void stop() { |
| this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX); |
| } |
| |
| @Override |
| protected void notifyOverrides() { |
| // to notify configurator/router changes |
| if (directory.originalUrls != null) { |
| URL backup = RpcContext.getServiceContext().getConsumerUrl(); |
| RpcContext.getServiceContext().setConsumerUrl(directory.getConsumerUrl()); |
| directory.refreshOverrideAndInvoker(directory.originalUrls); |
| RpcContext.getServiceContext().setConsumerUrl(backup); |
| } |
| } |
| } |
| |
| private static class ConsumerConfigurationListener extends AbstractConfiguratorListener { |
| private final List<ServiceDiscoveryRegistryDirectory<?>> listeners = new ArrayList<>(); |
| |
| ConsumerConfigurationListener(ModuleModel moduleModel) { |
| super(moduleModel); |
| } |
| |
| void addNotifyListener(ServiceDiscoveryRegistryDirectory<?> listener) { |
| if (listeners.size() == 0) { |
| this.initWith(moduleModel.getApplicationModel().getApplicationName() + CONFIGURATORS_SUFFIX); |
| } |
| this.listeners.add(listener); |
| } |
| |
| void removeNotifyListener(ServiceDiscoveryRegistryDirectory<?> listener) { |
| this.listeners.remove(listener); |
| if (listeners.size() == 0) { |
| this.stopListen(moduleModel.getApplicationModel().getApplicationName() + CONFIGURATORS_SUFFIX); |
| } |
| } |
| |
| @Override |
| protected void notifyOverrides() { |
| listeners.forEach(listener -> { |
| if (listener.originalUrls != null) { |
| URL backup = RpcContext.getServiceContext().getConsumerUrl(); |
| RpcContext.getServiceContext().setConsumerUrl(listener.getConsumerUrl()); |
| listener.refreshOverrideAndInvoker(listener.originalUrls); |
| RpcContext.getServiceContext().setConsumerUrl(backup); |
| } |
| }); |
| } |
| } |
| |
| public static final class ProtocolServiceKeyWithAddress extends ProtocolServiceKey { |
| private final String address; |
| |
| public ProtocolServiceKeyWithAddress(ProtocolServiceKey protocolServiceKey, String address) { |
| super(protocolServiceKey.getInterfaceName(), protocolServiceKey.getVersion(), protocolServiceKey.getGroup(), protocolServiceKey.getProtocol()); |
| this.address = address; |
| } |
| |
| public String getAddress() { |
| return address; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| if (!super.equals(o)) { |
| return false; |
| } |
| ProtocolServiceKeyWithAddress that = (ProtocolServiceKeyWithAddress) o; |
| return Objects.equals(address, that.address); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(super.hashCode(), address); |
| } |
| } |
| |
| public static final class InstanceWrappedInvoker<T> implements Invoker<T> { |
| private final Invoker<T> originInvoker; |
| private final URL newConsumerUrl; |
| private final ProtocolServiceKey protocolServiceKey; |
| |
| public InstanceWrappedInvoker(Invoker<T> originInvoker, URL newConsumerUrl, ProtocolServiceKey protocolServiceKey) { |
| this.originInvoker = originInvoker; |
| this.newConsumerUrl = newConsumerUrl; |
| this.protocolServiceKey = protocolServiceKey; |
| } |
| |
| @Override |
| public Class<T> getInterface() { |
| return originInvoker.getInterface(); |
| } |
| |
| @Override |
| public Result invoke(Invocation invocation) throws RpcException { |
| // override consumer url with real protocol service key |
| RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl); |
| // recreate invocation due to the protocol service key changed |
| RpcInvocation copiedInvocation = new RpcInvocation(invocation.getTargetServiceUniqueName(), |
| invocation.getServiceModel(), invocation.getMethodName(), invocation.getServiceName(), protocolServiceKey.toString(), |
| invocation.getParameterTypes(), invocation.getArguments(), invocation.getObjectAttachments(), |
| invocation.getInvoker(), invocation.getAttributes(), |
| invocation instanceof RpcInvocation ? ((RpcInvocation) invocation).getInvokeMode() : null); |
| copiedInvocation.setObjectAttachment(CommonConstants.GROUP_KEY, protocolServiceKey.getGroup()); |
| copiedInvocation.setObjectAttachment(CommonConstants.VERSION_KEY, protocolServiceKey.getVersion()); |
| return originInvoker.invoke(copiedInvocation); |
| } |
| |
| @Override |
| public URL getUrl() { |
| RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl); |
| return originInvoker.getUrl(); |
| } |
| |
| @Override |
| public boolean isAvailable() { |
| RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl); |
| return originInvoker.isAvailable(); |
| } |
| |
| @Override |
| public void destroy() { |
| RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl); |
| originInvoker.destroy(); |
| } |
| } |
| |
| } |