| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.client; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.extension.SPI; |
| import org.apache.dubbo.common.logger.Logger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.StringUtils; |
| import org.apache.dubbo.metadata.MappingChangedEvent; |
| import org.apache.dubbo.metadata.MappingListener; |
| import org.apache.dubbo.metadata.ServiceNameMapping; |
| import org.apache.dubbo.metadata.WritableMetadataService; |
| import org.apache.dubbo.registry.NotifyListener; |
| import org.apache.dubbo.registry.Registry; |
| import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; |
| import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; |
| import org.apache.dubbo.registry.client.metadata.SubscribedURLsSynthesizer; |
| import org.apache.dubbo.registry.support.AbstractRegistryFactory; |
| import org.apache.dubbo.registry.support.FailbackRegistry; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.stream.Collectors; |
| |
| import static java.lang.String.format; |
| import static java.util.Collections.emptySet; |
| import static java.util.Collections.unmodifiableSet; |
| import static java.util.stream.Collectors.toSet; |
| import static java.util.stream.Stream.of; |
| import static org.apache.dubbo.common.constants.CommonConstants.DUBBO; |
| import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR; |
| import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE; |
| import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE; |
| import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY; |
| import static org.apache.dubbo.common.function.ThrowableAction.execute; |
| import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty; |
| import static org.apache.dubbo.common.utils.StringUtils.isBlank; |
| import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension; |
| import static org.apache.dubbo.rpc.Constants.ID_KEY; |
| |
| /** |
| * Being different to the traditional registry, {@link ServiceDiscoveryRegistry} that is a new service-oriented |
| * {@link Registry} based on {@link ServiceDiscovery}, it will not interact in the external registry directly, |
| * but store the {@link URL urls} that Dubbo services exported and referenced into {@link WritableMetadataService} |
| * when {@link #register(URL)} and {@link #subscribe(URL, NotifyListener)} methods are executed. After that the exported |
| * {@link URL urls} can be get from {@link WritableMetadataService#getExportedURLs()} and its variant methods. In contrast, |
| * {@link WritableMetadataService#getSubscribedURLs()} method offers the subscribed {@link URL URLs}. |
| * <p> |
| * Every {@link ServiceDiscoveryRegistry} object has its own {@link ServiceDiscovery} instance that was initialized |
| * under {@link #ServiceDiscoveryRegistry(URL) the construction}. As the primary argument of constructor , the |
| * {@link URL} of connection the registry decides what the kind of ServiceDiscovery is. Generally, each |
| * protocol associates with a kind of {@link ServiceDiscovery}'s implementation if present, or the |
| * {@link FileSystemServiceDiscovery} will be the default one. Obviously, it's also allowed to extend |
| * {@link ServiceDiscovery} using {@link SPI the Dubbo SPI}. |
| * In contrast, current {@link ServiceInstance service instance} will not be registered to the registry whether any |
| * Dubbo service is exported or not. |
| * <p> |
| * |
| * @see ServiceDiscovery |
| * @see FailbackRegistry |
| * @see WritableMetadataService |
| * @since 2.7.5 |
| */ |
| public class ServiceDiscoveryRegistry implements Registry { |
| |
| protected final Logger logger = LoggerFactory.getLogger(getClass()); |
| |
| private final ServiceDiscovery serviceDiscovery; |
| |
| private final Set<String> subscribedServices; |
| |
| private final ServiceNameMapping serviceNameMapping; |
| |
| private final WritableMetadataService writableMetadataService; |
| |
| private final Set<String> registeredListeners = new LinkedHashSet<>(); |
| |
| /* apps - listener */ |
| private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>(); |
| |
| private URL registryURL; |
| |
| /** |
| * A cache for all URLs of services that the subscribed services exported |
| * The key is the service name |
| * The value is a nested {@link Map} whose key is the revision and value is all URLs of services |
| */ |
| private final Map<String, Map<String, List<URL>>> serviceRevisionExportedURLsCache = new LinkedHashMap<>(); |
| |
| public ServiceDiscoveryRegistry(URL registryURL) { |
| this.registryURL = registryURL; |
| this.serviceDiscovery = createServiceDiscovery(registryURL); |
| this.subscribedServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY)); |
| this.serviceNameMapping = ServiceNameMapping.getExtension(registryURL.getParameter(MAPPING_KEY)); |
| this.writableMetadataService = WritableMetadataService.getDefaultExtension(); |
| } |
| |
| public ServiceDiscovery getServiceDiscovery() { |
| return serviceDiscovery; |
| } |
| |
| /** |
| * Create the {@link ServiceDiscovery} from the registry {@link URL} |
| * |
| * @param registryURL the {@link URL} to connect the registry |
| * @return non-null |
| */ |
| protected ServiceDiscovery createServiceDiscovery(URL registryURL) { |
| ServiceDiscovery originalServiceDiscovery = getServiceDiscovery(registryURL); |
| ServiceDiscovery serviceDiscovery = enhanceEventPublishing(originalServiceDiscovery); |
| execute(() -> { |
| serviceDiscovery.initialize(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName()) |
| .removeParameter(REGISTRY_TYPE_KEY)); |
| }); |
| return serviceDiscovery; |
| } |
| |
| private List<SubscribedURLsSynthesizer> initSubscribedURLsSynthesizers() { |
| ExtensionLoader<SubscribedURLsSynthesizer> loader = ExtensionLoader.getExtensionLoader(SubscribedURLsSynthesizer.class); |
| return Collections.unmodifiableList(new ArrayList<>(loader.getSupportedExtensionInstances())); |
| } |
| |
| /** |
| * Get the instance {@link ServiceDiscovery} from the registry {@link URL} using |
| * {@link ServiceDiscoveryFactory} SPI |
| * |
| * @param registryURL the {@link URL} to connect the registry |
| * @return |
| */ |
| private ServiceDiscovery getServiceDiscovery(URL registryURL) { |
| ServiceDiscoveryFactory factory = getExtension(registryURL); |
| return factory.getServiceDiscovery(registryURL); |
| } |
| |
| /** |
| * Enhance the original {@link ServiceDiscovery} with event publishing feature |
| * |
| * @param original the original {@link ServiceDiscovery} |
| * @return {@link EventPublishingServiceDiscovery} instance |
| */ |
| private ServiceDiscovery enhanceEventPublishing(ServiceDiscovery original) { |
| return new EventPublishingServiceDiscovery(original); |
| } |
| |
| protected boolean shouldRegister(URL providerURL) { |
| |
| String side = providerURL.getParameter(SIDE_KEY); |
| |
| boolean should = PROVIDER_SIDE.equals(side); // Only register the Provider. |
| |
| if (!should) { |
| if (logger.isDebugEnabled()) { |
| logger.debug(String.format("The URL[%s] should not be registered.", providerURL.toString())); |
| } |
| } |
| |
| return should; |
| } |
| |
| protected boolean shouldSubscribe(URL subscribedURL) { |
| return !shouldRegister(subscribedURL); |
| } |
| |
| @Override |
| public final void register(URL url) { |
| if (!shouldRegister(url)) { // Should Not Register |
| return; |
| } |
| doRegister(url); |
| } |
| |
| public void doRegister(URL url) { |
| String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); |
| if (registryCluster != null && url.getParameter(REGISTRY_CLUSTER_KEY) == null) { |
| url = url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster); |
| } |
| if (writableMetadataService.exportURL(url)) { |
| if (logger.isInfoEnabled()) { |
| logger.info(format("The URL[%s] registered successfully.", url.toString())); |
| } |
| } else { |
| if (logger.isWarnEnabled()) { |
| logger.info(format("The URL[%s] has been registered.", url.toString())); |
| } |
| } |
| } |
| |
| @Override |
| public final void unregister(URL url) { |
| if (!shouldRegister(url)) { |
| return; |
| } |
| doUnregister(url); |
| } |
| |
| public void doUnregister(URL url) { |
| String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); |
| if (registryCluster != null && url.getParameter(REGISTRY_CLUSTER_KEY) == null) { |
| url = url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster); |
| } |
| if (writableMetadataService.unexportURL(url)) { |
| if (logger.isInfoEnabled()) { |
| logger.info(format("The URL[%s] deregistered successfully.", url.toString())); |
| } |
| } else { |
| if (logger.isWarnEnabled()) { |
| logger.info(format("The URL[%s] has been deregistered.", url.toString())); |
| } |
| } |
| } |
| |
| @Override |
| public final void subscribe(URL url, NotifyListener listener) { |
| if (!shouldSubscribe(url)) { // Should Not Subscribe |
| return; |
| } |
| String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); |
| if (registryCluster != null && url.getParameter(REGISTRY_CLUSTER_KEY) == null) { |
| url = url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster); |
| } |
| doSubscribe(url, listener); |
| } |
| |
| public void doSubscribe(URL url, NotifyListener listener) { |
| writableMetadataService.subscribeURL(url); |
| |
| Set<String> serviceNames = getServices(url, listener); |
| if (CollectionUtils.isEmpty(serviceNames)) { |
| throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url); |
| } |
| |
| subscribeURLs(url, listener, serviceNames); |
| } |
| |
| @Override |
| public final void unsubscribe(URL url, NotifyListener listener) { |
| if (!shouldSubscribe(url)) { // Should Not Subscribe |
| return; |
| } |
| String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); |
| if (registryCluster != null && url.getParameter(REGISTRY_CLUSTER_KEY) == null) { |
| url = url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster); |
| } |
| doUnsubscribe(url, listener); |
| } |
| |
| public void doUnsubscribe(URL url, NotifyListener listener) { |
| writableMetadataService.unsubscribeURL(url); |
| } |
| |
| @Override |
| public List<URL> lookup(URL url) { |
| throw new UnsupportedOperationException(""); |
| } |
| |
| @Override |
| public URL getUrl() { |
| return registryURL; |
| } |
| |
| @Override |
| public boolean isAvailable() { |
| return !serviceDiscovery.getServices().isEmpty(); |
| } |
| |
| @Override |
| public void destroy() { |
| AbstractRegistryFactory.removeDestroyedRegistry(this); |
| execute(() -> { |
| // stop ServiceDiscovery |
| serviceDiscovery.destroy(); |
| }); |
| } |
| |
| protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) { |
| String serviceNamesKey = serviceNames.toString(); |
| // register ServiceInstancesChangedListener |
| ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey, |
| k -> new ServiceInstancesChangedListener(serviceNames, serviceDiscovery)); |
| serviceListener.setUrl(url); |
| listener.addServiceListener(serviceListener); |
| |
| String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO); |
| serviceListener.addListener(protocolServiceKey, listener); |
| registerServiceInstancesChangedListener(url, serviceListener); |
| |
| serviceNames.forEach(serviceName -> { |
| List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName); |
| serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances)); |
| }); |
| |
| listener.notify(serviceListener.getUrls(protocolServiceKey)); |
| } |
| |
| /** |
| * Register the {@link ServiceInstancesChangedListener} If absent |
| * |
| * @param url {@link URL} |
| * @param listener the {@link ServiceInstancesChangedListener} |
| */ |
| private void registerServiceInstancesChangedListener(URL url, ServiceInstancesChangedListener listener) { |
| String listenerId = createListenerId(url, listener); |
| if (registeredListeners.add(listenerId)) { |
| serviceDiscovery.addServiceInstancesChangedListener(listener); |
| } |
| } |
| |
| private String createListenerId(URL url, ServiceInstancesChangedListener listener) { |
| return listener.getServiceNames() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY); |
| } |
| |
| /** |
| * 1.developer explicitly specifies the application name this interface belongs to |
| * 2.check Interface-App mapping |
| * 3.use the services specified in registry url. |
| * |
| * @param subscribedURL |
| * @return |
| */ |
| protected Set<String> getServices(URL subscribedURL, final NotifyListener listener) { |
| Set<String> subscribedServices = new TreeSet<>(); |
| |
| String serviceNames = subscribedURL.getParameter(PROVIDED_BY); |
| if (StringUtils.isNotEmpty(serviceNames)) { |
| subscribedServices.addAll(parseServices(serviceNames)); |
| } |
| |
| serviceNames = subscribedURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY); |
| if (StringUtils.isNotEmpty(serviceNames)) { |
| subscribedServices.addAll(parseServices(serviceNames)); |
| } |
| |
| if (isEmpty(subscribedServices)) { |
| subscribedServices.addAll(findMappedServices(subscribedURL, new DefaultMappingListener(subscribedURL, subscribedServices, listener))); |
| if (isEmpty(subscribedServices)) { |
| subscribedServices.addAll(getSubscribedServices()); |
| } |
| } |
| return subscribedServices; |
| } |
| |
| public static Set<String> parseServices(String literalServices) { |
| return isBlank(literalServices) ? emptySet() : |
| unmodifiableSet(of(literalServices.split(",")) |
| .map(String::trim) |
| .filter(StringUtils::isNotEmpty) |
| .collect(toSet())); |
| } |
| |
| /** |
| * Get the subscribed service names |
| * |
| * @return non-null |
| */ |
| public Set<String> getSubscribedServices() { |
| return subscribedServices; |
| } |
| |
| /** |
| * Get the mapped services name by the specified {@link URL} |
| * |
| * @param subscribedURL |
| * @return |
| */ |
| protected Set<String> findMappedServices(URL subscribedURL, MappingListener listener) { |
| return serviceNameMapping.getAndListen(subscribedURL, listener); |
| } |
| |
| /** |
| * Create an instance of {@link ServiceDiscoveryRegistry} if supported |
| * |
| * @param registryURL the {@link URL url} of registry |
| * @return <code>null</code> if not supported |
| */ |
| public static ServiceDiscoveryRegistry create(URL registryURL) { |
| return supports(registryURL) ? new ServiceDiscoveryRegistry(registryURL) : null; |
| } |
| |
| /** |
| * Supports or not ? |
| * |
| * @param registryURL the {@link URL url} of registry |
| * @return if supported, return <code>true</code>, or <code>false</code> |
| */ |
| public static boolean supports(URL registryURL) { |
| return SERVICE_REGISTRY_TYPE.equalsIgnoreCase(registryURL.getParameter(REGISTRY_TYPE_KEY)); |
| } |
| |
| private static List<URL> filterSubscribedURLs(URL subscribedURL, List<URL> exportedURLs) { |
| return exportedURLs.stream() |
| .filter(url -> isSameServiceInterface(subscribedURL, url)) |
| .filter(url -> isSameParameter(subscribedURL, url, VERSION_KEY)) |
| .filter(url -> isSameParameter(subscribedURL, url, GROUP_KEY)) |
| .filter(url -> isCompatibleProtocol(subscribedURL, url)) |
| .collect(Collectors.toList()); |
| } |
| |
| private static boolean isSameServiceInterface(URL one, URL another) { |
| return Objects.equals(one.getServiceInterface(), another.getServiceInterface()); |
| } |
| |
| private static boolean isSameParameter(URL one, URL another, String key) { |
| return Objects.equals(one.getParameter(key), another.getParameter(key)); |
| } |
| |
| private static boolean isCompatibleProtocol(URL one, URL another) { |
| String protocol = one.getParameter(PROTOCOL_KEY); |
| return isCompatibleProtocol(protocol, another); |
| } |
| |
| private static boolean isCompatibleProtocol(String protocol, URL targetURL) { |
| return protocol == null || Objects.equals(protocol, targetURL.getParameter(PROTOCOL_KEY)) |
| || Objects.equals(protocol, targetURL.getProtocol()); |
| } |
| |
| private class DefaultMappingListener implements MappingListener { |
| private URL url; |
| private Set<String> oldApps; |
| private NotifyListener listener; |
| |
| public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) { |
| this.url = subscribedURL; |
| this.oldApps = serviceNames; |
| this.listener = listener; |
| } |
| |
| @Override |
| public void onEvent(MappingChangedEvent event) { |
| Set<String> newApps = event.getApps(); |
| Set<String> tempOldApps = oldApps; |
| oldApps = newApps; |
| |
| if (CollectionUtils.isEmpty(newApps)) { |
| return; |
| } |
| |
| if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) { |
| subscribeURLs(url, listener, newApps); |
| return; |
| } |
| |
| for (String newAppName : newApps) { |
| if (!tempOldApps.contains(newAppName)) { |
| subscribeURLs(url, listener, newApps); |
| return; |
| } |
| } |
| } |
| } |
| } |