blob: 098f6453a56e027b72c307e9ff8c3c2131667251 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metadata.AbstractServiceNameMapping;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.metadata.ServiceNameMapping.toStringKeys;
import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;
/**
* TODO, this bridge implementation is not necessary now, protocol can interact with service discovery directly.
* <p>
* ServiceDiscoveryRegistry is a very special Registry implementation, which is used to bridge the old interface-level service discovery model
* with the new service discovery model introduced in 3.0 in a compatible manner.
* <p>
* It fully complies with the extension specification of the Registry SPI, but is different from the specific implementation of zookeeper and Nacos,
* because it does not interact with any real third-party registry, but only with the relevant components of ServiceDiscovery in the process.
* In short, it bridges the old interface model and the new service discovery model:
* <p>
* - register() aggregates interface level data into MetadataInfo by mainly interacting with MetadataService.
* - subscribe() triggers the whole subscribe process of the application level service discovery model.
* - Maps interface to applications depending on ServiceNameMapping.
* - Starts the new service discovery listener (InstanceListener) and makes NotifierListeners part of the InstanceListener.
*/
public class ServiceDiscoveryRegistry extends FailbackRegistry {
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());
private final ServiceDiscovery serviceDiscovery;
private final AbstractServiceNameMapping serviceNameMapping;
/* apps - listener */
private final Map<String, ServiceInstancesChangedListener> serviceListeners = new ConcurrentHashMap<>();
private final Map<String, MappingListener> mappingListeners = new ConcurrentHashMap<>();
/* This lock has the same scope and lifecycle as its corresponding instance listener.
It's used to make sure that only one interface mapping to the same app list can do subscribe or unsubscribe at the same moment.
And the lock should be destroyed when listener destroying its corresponding instance listener.
* */
private final ConcurrentMap<String, Lock> appSubscriptionLocks = new ConcurrentHashMap<>();
public ServiceDiscoveryRegistry(URL registryURL, ApplicationModel applicationModel) {
super(registryURL);
this.serviceDiscovery = createServiceDiscovery(registryURL);
this.serviceNameMapping = (AbstractServiceNameMapping) ServiceNameMapping.getDefaultExtension(registryURL.getScopeModel());
super.applicationModel = applicationModel;
}
// Currently, for test purpose
protected ServiceDiscoveryRegistry(URL registryURL, ServiceDiscovery serviceDiscovery, ServiceNameMapping serviceNameMapping) {
super(registryURL);
this.serviceDiscovery = serviceDiscovery;
this.serviceNameMapping = (AbstractServiceNameMapping) serviceNameMapping;
}
public ServiceDiscovery getServiceDiscovery() {
return serviceDiscovery;
}
/**
* Create the {@link ServiceDiscovery} from the registry {@link URL}
*
* @param registryURL the {@link URL} to connect the registry
* @return non-null
*/
protected ServiceDiscovery createServiceDiscovery(URL registryURL) {
return getServiceDiscovery(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName())
.removeParameter(REGISTRY_TYPE_KEY));
}
/**
* Get the instance {@link ServiceDiscovery} from the registry {@link URL} using
* {@link ServiceDiscoveryFactory} SPI
*
* @param registryURL the {@link URL} to connect the registry
* @return
*/
private ServiceDiscovery getServiceDiscovery(URL registryURL) {
ServiceDiscoveryFactory factory = getExtension(registryURL);
return factory.getServiceDiscovery(registryURL);
}
protected boolean shouldRegister(URL providerURL) {
String side = providerURL.getSide();
boolean should = PROVIDER_SIDE.equals(side); // Only register the Provider.
if (!should) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("The URL[%s] should not be registered.", providerURL));
}
}
if (!acceptable(providerURL)) {
logger.info("URL " + providerURL + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
return false;
}
return should;
}
protected boolean shouldSubscribe(URL subscribedURL) {
return !shouldRegister(subscribedURL);
}
@Override
public final void register(URL url) {
if (!shouldRegister(url)) { // Should Not Register
return;
}
doRegister(url);
}
@Override
public void doRegister(URL url) {
// fixme, add registry-cluster is not necessary anymore
url = addRegistryClusterKey(url);
serviceDiscovery.register(url);
}
@Override
public final void unregister(URL url) {
if (!shouldRegister(url)) {
return;
}
doUnregister(url);
}
@Override
public void doUnregister(URL url) {
// fixme, add registry-cluster is not necessary anymore
url = addRegistryClusterKey(url);
serviceDiscovery.unregister(url);
}
@Override
public final void subscribe(URL url, NotifyListener listener) {
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
doSubscribe(url, listener);
}
@Override
public void doSubscribe(URL url, NotifyListener listener) {
url = addRegistryClusterKey(url);
serviceDiscovery.subscribe(url, listener);
boolean check = url.getParameter(CHECK_KEY, false);
String key = ServiceNameMapping.buildMappingKey(url);
Lock mappingLock = serviceNameMapping.getMappingLock(key);
try {
mappingLock.lock();
Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
try {
MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
} catch (Exception e) {
logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
}
if (CollectionUtils.isEmpty(subscribedServices)) {
logger.info("No interface-apps mapping found in local cache, stop subscribing, will automatically wait for mapping listener callback: " + url);
// if (check) {
// throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
// }
return;
}
subscribeURLs(url, listener, subscribedServices);
} finally {
mappingLock.unlock();
}
}
@Override
public final void unsubscribe(URL url, NotifyListener listener) {
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
url = addRegistryClusterKey(url);
doUnsubscribe(url, listener);
}
private URL addRegistryClusterKey(URL url) {
String registryCluster = serviceDiscovery.getUrl().getParameter(REGISTRY_CLUSTER_KEY);
if (registryCluster != null && url.getParameter(REGISTRY_CLUSTER_KEY) == null) {
url = url.addParameter(REGISTRY_CLUSTER_KEY, registryCluster);
}
return url;
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
// TODO: remove service name mapping listener
serviceDiscovery.unsubscribe(url, listener);
String protocolServiceKey = url.getProtocolServiceKey();
Set<String> serviceNames = serviceNameMapping.getCachedMapping(url);
serviceNameMapping.stopListen(url, mappingListeners.remove(protocolServiceKey));
if (CollectionUtils.isNotEmpty(serviceNames)) {
String serviceNamesKey = toStringKeys(serviceNames);
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
appSubscriptionLock.lock();
ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
if (instancesChangedListener != null) {
instancesChangedListener.removeListener(protocolServiceKey, listener);
if (!instancesChangedListener.hasListeners()) {
instancesChangedListener.destroy();
serviceListeners.remove(serviceNamesKey);
removeAppSubscriptionLock(serviceNamesKey);
}
}
} finally {
appSubscriptionLock.unlock();
}
}
}
@Override
public List<URL> lookup(URL url) {
throw new UnsupportedOperationException("");
}
@Override
public boolean isAvailable() {
if (serviceDiscovery instanceof NopServiceDiscovery) {
// NopServiceDiscovery is designed for compatibility, check availability is meaningless, just return true
return true;
}
return !serviceDiscovery.isDestroy() && !serviceDiscovery.getServices().isEmpty();
}
@Override
public void destroy() {
registryManager.removeDestroyedRegistry(this);
// stop ServiceDiscovery
execute(serviceDiscovery::destroy);
// destroy all event listener
for (ServiceInstancesChangedListener listener : serviceListeners.values()) {
listener.destroy();
}
appSubscriptionLocks.clear();
serviceListeners.clear();
mappingListeners.clear();
}
@Override
public boolean isServiceDiscovery() {
return true;
}
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
String serviceKey = url.getServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
appSubscriptionLock.lock();
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
for (String serviceName : serviceNames) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(url, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
/**
* Supports or not ?
*
* @param registryURL the {@link URL url} of registry
* @return if supported, return <code>true</code>, or <code>false</code>
*/
public static boolean supports(URL registryURL) {
return SERVICE_REGISTRY_TYPE.equalsIgnoreCase(registryURL.getParameter(REGISTRY_TYPE_KEY));
}
public Map<String, ServiceInstancesChangedListener> getServiceListeners() {
return serviceListeners;
}
private class DefaultMappingListener implements MappingListener {
private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DefaultMappingListener.class);
private final URL url;
private Set<String> oldApps;
private NotifyListener listener;
private volatile boolean stopped;
public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) {
this.url = subscribedURL;
this.oldApps = serviceNames;
this.listener = listener;
}
@Override
public synchronized void onEvent(MappingChangedEvent event) {
logger.info("Received mapping notification from meta server, " + event);
if (stopped) {
logger.warn("Listener has been stopped, ignore mapping notification, check why listener is not removed.");
return;
}
Set<String> newApps = event.getApps();
Set<String> tempOldApps = oldApps;
if (CollectionUtils.isEmpty(newApps) || CollectionUtils.equals(newApps, tempOldApps)) {
return;
}
logger.info("Mapping of service " + event.getServiceKey() + "changed from " + tempOldApps + " to " + newApps);
Lock mappingLock = serviceNameMapping.getMappingLock(event.getServiceKey());
try {
mappingLock.lock();
if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) {
serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
subscribeURLs(url, listener, newApps);
oldApps = newApps;
return;
}
for (String newAppName : newApps) {
if (!tempOldApps.contains(newAppName)) {
serviceNameMapping.removeCachedMapping(ServiceNameMapping.buildMappingKey(url));
serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
// old instance listener related to old app list that needs to be destroyed after subscribe refresh.
ServiceInstancesChangedListener oldListener = listener.getServiceListener();
if (oldListener != null) {
String appKey = toStringKeys(toTreeSet(tempOldApps));
Lock appSubscriptionLock = getAppSubscription(appKey);
try {
appSubscriptionLock.lock();
oldListener.removeListener(url.getServiceKey(), listener);
if (!oldListener.hasListeners()) {
oldListener.destroy();
removeAppSubscriptionLock(appKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
subscribeURLs(url, listener, newApps);
oldApps = newApps;
return;
}
}
} finally {
mappingLock.unlock();
}
}
@Override
public void stop() {
stopped = true;
}
}
public Lock getAppSubscription(String key) {
return appSubscriptionLocks.computeIfAbsent(key, _k -> new ReentrantLock());
}
public void removeAppSubscriptionLock(String key) {
Lock lock = appSubscriptionLocks.get(key);
if (lock != null) {
try {
lock.lock();
appSubscriptionLocks.remove(key);
} finally {
lock.unlock();
}
}
}
}