| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.client.event.listener; |
| |
| import org.apache.dubbo.common.ProtocolServiceKey; |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.URLBuilder; |
| import org.apache.dubbo.common.constants.CommonConstants; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.ConcurrentHashSet; |
| import org.apache.dubbo.metadata.MetadataInfo; |
| import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo; |
| import org.apache.dubbo.registry.NotifyListener; |
| import org.apache.dubbo.registry.client.DefaultServiceInstance; |
| import org.apache.dubbo.registry.client.ServiceDiscovery; |
| import org.apache.dubbo.registry.client.ServiceInstance; |
| import org.apache.dubbo.registry.client.event.RetryServiceInstancesChangedEvent; |
| import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; |
| import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; |
| import org.apache.dubbo.registry.client.metadata.ServiceInstanceNotificationCustomizer; |
| import org.apache.dubbo.rpc.model.ApplicationModel; |
| import org.apache.dubbo.rpc.model.ScopeModelUtil; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_REFRESH_ADDRESS; |
| import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL; |
| import static org.apache.dubbo.common.constants.RegistryConstants.ENABLE_EMPTY_PROTECTION_KEY; |
| import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION; |
| import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision; |
| |
| /** |
| * TODO, refactor to move revision-metadata mapping to ServiceDiscovery. Instances should have already been mapped with metadata when reached here. |
| * <p> |
| * The operations of ServiceInstancesChangedListener should be synchronized. |
| */ |
| public class ServiceInstancesChangedListener { |
| |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(ServiceInstancesChangedListener.class); |
| |
| protected final Set<String> serviceNames; |
| protected final ServiceDiscovery serviceDiscovery; |
| protected URL url; |
| protected Map<String, Set<NotifyListenerWithKey>> listeners; |
| |
| protected AtomicBoolean destroyed = new AtomicBoolean(false); |
| |
| protected Map<String, List<ServiceInstance>> allInstances; |
| protected Map<String, List<ProtocolServiceKeyWithUrls>> serviceUrls; |
| |
| private volatile long lastRefreshTime; |
| private final Semaphore retryPermission; |
| private volatile ScheduledFuture<?> retryFuture; |
| private final ScheduledExecutorService scheduler; |
| private volatile boolean hasEmptyMetadata; |
| private final Set<ServiceInstanceNotificationCustomizer> serviceInstanceNotificationCustomizers; |
| |
| |
| public ServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) { |
| this.serviceNames = serviceNames; |
| this.serviceDiscovery = serviceDiscovery; |
| this.listeners = new ConcurrentHashMap<>(); |
| this.allInstances = new HashMap<>(); |
| this.serviceUrls = new HashMap<>(); |
| retryPermission = new Semaphore(1); |
| ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(serviceDiscovery == null || serviceDiscovery.getUrl() == null ? null : serviceDiscovery.getUrl().getScopeModel()); |
| this.scheduler = applicationModel.getBeanFactory().getBean(FrameworkExecutorRepository.class).getMetadataRetryExecutor(); |
| this.serviceInstanceNotificationCustomizers = applicationModel.getExtensionLoader(ServiceInstanceNotificationCustomizer.class).getSupportedExtensionInstances(); |
| } |
| |
| /** |
| * On {@link ServiceInstancesChangedEvent the service instances change event} |
| * |
| * @param event {@link ServiceInstancesChangedEvent} |
| */ |
| public void onEvent(ServiceInstancesChangedEvent event) { |
| if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) { |
| return; |
| } |
| doOnEvent(event); |
| } |
| |
| /** |
| * @param event |
| */ |
| private synchronized void doOnEvent(ServiceInstancesChangedEvent event) { |
| if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) { |
| return; |
| } |
| |
| refreshInstance(event); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(event.getServiceInstances().toString()); |
| } |
| |
| Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>(); |
| Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>(); |
| |
| // grouping all instances of this app(service name) by revision |
| for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) { |
| List<ServiceInstance> instances = entry.getValue(); |
| for (ServiceInstance instance : instances) { |
| String revision = getExportedServicesRevision(instance); |
| if (revision == null || EMPTY_REVISION.equals(revision)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Find instance without valid service metadata: " + instance.getAddress()); |
| } |
| continue; |
| } |
| List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>()); |
| subInstances.add(instance); |
| } |
| } |
| |
| // get MetadataInfo with revision |
| for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) { |
| String revision = entry.getKey(); |
| List<ServiceInstance> subInstances = entry.getValue(); |
| |
| MetadataInfo metadata = subInstances.stream() |
| .map(ServiceInstance::getServiceMetadata) |
| .filter(Objects::nonNull) |
| .filter(m -> revision.equals(m.getRevision())) |
| .findFirst() |
| .orElseGet(() -> serviceDiscovery.getRemoteMetadata(revision, subInstances)); |
| |
| parseMetadata(revision, metadata, localServiceToRevisions); |
| // update metadata into each instance, in case new instance created. |
| for (ServiceInstance tmpInstance : subInstances) { |
| MetadataInfo originMetadata = tmpInstance.getServiceMetadata(); |
| if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) { |
| tmpInstance.setServiceMetadata(metadata); |
| } |
| } |
| } |
| |
| int emptyNum = hasEmptyMetadata(revisionToInstances); |
| if (emptyNum != 0) {// retry every 10 seconds |
| hasEmptyMetadata = true; |
| if (retryPermission.tryAcquire()) { |
| if (retryFuture != null && !retryFuture.isDone()) { |
| // cancel last retryFuture because only one retryFuture will be canceled at destroy(). |
| retryFuture.cancel(true); |
| } |
| try { |
| retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS); |
| } catch (Exception e) { |
| logger.error("Error submitting async retry task."); |
| } |
| logger.warn("Address refresh try task submitted"); |
| } |
| |
| // return if all metadata is empty, this notification will not take effect. |
| if (emptyNum == revisionToInstances.size()) { |
| // 1-17 - Address refresh failed. |
| logger.error(REGISTRY_FAILED_REFRESH_ADDRESS, "metadata Server failure", "", |
| "Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event."); |
| |
| return; |
| } |
| } |
| hasEmptyMetadata = false; |
| |
| Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>(); |
| Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>(); |
| for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) { |
| ServiceInfo serviceInfo = entry.getKey(); |
| Set<String> revisions = entry.getValue(); |
| |
| Map<Integer, Map<Set<String>, Object>> portToRevisions = protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>()); |
| Map<Set<String>, Object> revisionsToUrls = portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>()); |
| Object urls = revisionsToUrls.get(revisions); |
| if (urls == null) { |
| urls = getServiceUrlsCache(revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort()); |
| revisionsToUrls.put(revisions, urls); |
| } |
| |
| List<ProtocolServiceKeyWithUrls> list = newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>()); |
| list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls)); |
| } |
| |
| this.serviceUrls = newServiceUrls; |
| this.notifyAddressChanged(); |
| } |
| |
| public synchronized void addListenerAndNotify(URL url, NotifyListener listener) { |
| if (destroyed.get()) { |
| return; |
| } |
| |
| Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(url.getServiceKey(), _k -> new ConcurrentHashSet<>()); |
| String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY, url.getProtocol()); |
| ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(url.getServiceInterface(), url.getVersion(), url.getGroup(), |
| !CommonConstants.CONSUMER.equals(protocol) ? protocol : null); |
| NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, listener); |
| notifyListeners.add(listenerWithKey); |
| |
| // Aggregate address and notify on subscription. |
| List<URL> urls = getAddresses(protocolServiceKey, listener.getConsumerUrl()); |
| |
| if (CollectionUtils.isNotEmpty(urls)) { |
| logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", protocolServiceKey, listener, urls.size())); |
| listener.notify(urls); |
| } |
| } |
| |
| public synchronized void removeListener(String serviceKey, NotifyListener notifyListener) { |
| if (destroyed.get()) { |
| return; |
| } |
| |
| // synchronized method, no need to use DCL |
| Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey); |
| if (notifyListeners != null) { |
| notifyListeners.removeIf(listener -> listener.getNotifyListener().equals(notifyListener)); |
| |
| // ServiceKey has no listener, remove set |
| if (notifyListeners.size() == 0) { |
| this.listeners.remove(serviceKey); |
| } |
| } |
| } |
| |
| public boolean hasListeners() { |
| return CollectionUtils.isNotEmptyMap(listeners); |
| } |
| |
| /** |
| * Get the correlative service name |
| * |
| * @return the correlative service name |
| */ |
| public final Set<String> getServiceNames() { |
| return serviceNames; |
| } |
| |
| public void setUrl(URL url) { |
| this.url = url; |
| } |
| |
| public URL getUrl() { |
| return url; |
| } |
| |
| public Map<String, List<ServiceInstance>> getAllInstances() { |
| return allInstances; |
| } |
| |
| /** |
| * @param event {@link ServiceInstancesChangedEvent event} |
| * @return If service name matches, return <code>true</code>, or <code>false</code> |
| */ |
| private boolean accept(ServiceInstancesChangedEvent event) { |
| return serviceNames.contains(event.getServiceName()); |
| } |
| |
| protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { |
| if (event instanceof RetryServiceInstancesChangedEvent) { |
| RetryServiceInstancesChangedEvent retryEvent = (RetryServiceInstancesChangedEvent) event; |
| logger.warn("Received address refresh retry event, " + retryEvent.getFailureRecordTime()); |
| if (retryEvent.getFailureRecordTime() < lastRefreshTime && !hasEmptyMetadata) { |
| logger.warn("Ignore retry event, event time: " + retryEvent.getFailureRecordTime() + ", last refresh time: " + lastRefreshTime); |
| return true; |
| } |
| logger.warn("Retrying address notification..."); |
| } |
| return false; |
| } |
| |
| private void refreshInstance(ServiceInstancesChangedEvent event) { |
| if (event instanceof RetryServiceInstancesChangedEvent) { |
| return; |
| } |
| String appName = event.getServiceName(); |
| List<ServiceInstance> appInstances = event.getServiceInstances(); |
| logger.info("Received instance notification, serviceName: " + appName + ", instances: " + appInstances.size()); |
| for (ServiceInstanceNotificationCustomizer serviceInstanceNotificationCustomizer : serviceInstanceNotificationCustomizers) { |
| serviceInstanceNotificationCustomizer.customize(appInstances); |
| } |
| allInstances.put(appName, appInstances); |
| lastRefreshTime = System.currentTimeMillis(); |
| } |
| |
| /** |
| * Calculate the number of revisions that failed to find metadata info. |
| * |
| * @param revisionToInstances instance list classified by revisions |
| * @return the number of revisions that failed at fetching MetadataInfo |
| */ |
| protected int hasEmptyMetadata(Map<String, List<ServiceInstance>> revisionToInstances) { |
| if (revisionToInstances == null) { |
| return 0; |
| } |
| |
| StringBuilder builder = new StringBuilder(); |
| int emptyMetadataNum = 0; |
| for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) { |
| DefaultServiceInstance serviceInstance = (DefaultServiceInstance) entry.getValue().get(0); |
| if (serviceInstance == null || serviceInstance.getServiceMetadata() == MetadataInfo.EMPTY) { |
| emptyMetadataNum++; |
| } |
| |
| builder.append(entry.getKey()); |
| builder.append(' '); |
| } |
| |
| if (emptyMetadataNum > 0) { |
| builder.insert(0, emptyMetadataNum + "/" + revisionToInstances.size() + " revisions failed to get metadata from remote: "); |
| logger.error(builder.toString()); |
| } else { |
| builder.insert(0, revisionToInstances.size() + " unique working revisions: "); |
| logger.info(builder.toString()); |
| } |
| return emptyMetadataNum; |
| } |
| |
| protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) { |
| Map<String, ServiceInfo> serviceInfos = metadata.getServices(); |
| for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) { |
| Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), _k -> new TreeSet<>()); |
| set.add(revision); |
| } |
| |
| return localServiceToRevisions; |
| } |
| |
| protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol, int port) { |
| List<URL> urls = new ArrayList<>(); |
| for (String r : revisions) { |
| for (ServiceInstance i : revisionToInstances.get(r)) { |
| if (port > 0) { |
| if (i.getPort() == port) { |
| urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel())); |
| } else { |
| urls.add(((DefaultServiceInstance) i).copyFrom(port).toURL(protocol).setScopeModel(i.getApplicationModel())); |
| } |
| continue; |
| } |
| // different protocols may have ports specified in meta |
| if (ServiceInstanceMetadataUtils.hasEndpoints(i)) { |
| DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol); |
| if (endpoint != null && endpoint.getPort() != i.getPort()) { |
| urls.add(((DefaultServiceInstance) i).copyFrom(endpoint).toURL(endpoint.getProtocol())); |
| continue; |
| } |
| } |
| urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel())); |
| } |
| } |
| return urls; |
| } |
| |
| protected List<URL> getAddresses(ProtocolServiceKey protocolServiceKey, URL consumerURL) { |
| List<ProtocolServiceKeyWithUrls> protocolServiceKeyWithUrlsList = serviceUrls.get(protocolServiceKey.getInterfaceName()); |
| List<URL> urls = new ArrayList<>(); |
| if (protocolServiceKeyWithUrlsList != null) { |
| for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls : protocolServiceKeyWithUrlsList) { |
| if (ProtocolServiceKey.Matcher.isMatch(protocolServiceKey, protocolServiceKeyWithUrls.getProtocolServiceKey())) { |
| urls.addAll(protocolServiceKeyWithUrls.getUrls()); |
| } |
| } |
| } |
| if (serviceUrls.containsKey(CommonConstants.ANY_VALUE)) { |
| for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls : serviceUrls.get(CommonConstants.ANY_VALUE)) { |
| urls.addAll(protocolServiceKeyWithUrls.getUrls()); |
| } |
| } |
| return urls; |
| } |
| |
| /** |
| * race condition is protected by onEvent/doOnEvent |
| */ |
| protected void notifyAddressChanged() { |
| // 1 different services |
| listeners.forEach((serviceKey, listenerSet) -> { |
| // 2 multiple subscription listener of the same service |
| for (NotifyListenerWithKey listenerWithKey : listenerSet) { |
| NotifyListener notifyListener = listenerWithKey.getNotifyListener(); |
| |
| List<URL> urls = toUrlsWithEmpty(getAddresses(listenerWithKey.getProtocolServiceKey(), notifyListener.getConsumerUrl())); |
| logger.info("Notify service " + listenerWithKey.getProtocolServiceKey() + " with urls " + urls.size()); |
| notifyListener.notify(urls); |
| } |
| }); |
| } |
| |
| protected List<URL> toUrlsWithEmpty(List<URL> urls) { |
| boolean emptyProtectionEnabled = serviceDiscovery.getUrl().getParameter(ENABLE_EMPTY_PROTECTION_KEY, true); |
| if (!emptyProtectionEnabled && urls == null) { |
| urls = new ArrayList<>(); |
| } else if (emptyProtectionEnabled && urls == null) { |
| urls = Collections.emptyList(); |
| } |
| |
| if (CollectionUtils.isEmpty(urls) && !emptyProtectionEnabled) { |
| // notice that the service of this.url may not be the same as notify listener. |
| URL empty = URLBuilder.from(this.url).setProtocol(EMPTY_PROTOCOL).build(); |
| urls.add(empty); |
| } |
| return urls; |
| } |
| |
| /** |
| * Since this listener is shared among interfaces, destroy this listener only when all interface listener are unsubscribed |
| */ |
| public void destroy() { |
| if (destroyed.compareAndSet(false, true)) { |
| logger.info("Destroying instance listener of " + this.getServiceNames()); |
| serviceDiscovery.removeServiceInstancesChangedListener(this); |
| synchronized (this) { |
| allInstances.clear(); |
| serviceUrls.clear(); |
| listeners.clear(); |
| if (retryFuture != null && !retryFuture.isDone()) { |
| retryFuture.cancel(true); |
| } |
| } |
| } |
| } |
| |
| public boolean isDestroyed() { |
| return destroyed.get(); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (!(o instanceof ServiceInstancesChangedListener)) { |
| return false; |
| } |
| ServiceInstancesChangedListener that = (ServiceInstancesChangedListener) o; |
| return Objects.equals(getServiceNames(), that.getServiceNames()); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(getClass(), getServiceNames()); |
| } |
| |
| protected class AddressRefreshRetryTask implements Runnable { |
| private final RetryServiceInstancesChangedEvent retryEvent; |
| private final Semaphore retryPermission; |
| |
| public AddressRefreshRetryTask(Semaphore semaphore, String serviceName) { |
| this.retryEvent = new RetryServiceInstancesChangedEvent(serviceName); |
| this.retryPermission = semaphore; |
| } |
| |
| @Override |
| public void run() { |
| retryPermission.release(); |
| ServiceInstancesChangedListener.this.onEvent(retryEvent); |
| } |
| } |
| |
| public static class NotifyListenerWithKey { |
| private final ProtocolServiceKey protocolServiceKey; |
| private final NotifyListener notifyListener; |
| |
| public NotifyListenerWithKey(ProtocolServiceKey protocolServiceKey, NotifyListener notifyListener) { |
| this.protocolServiceKey = protocolServiceKey; |
| this.notifyListener = notifyListener; |
| } |
| |
| public ProtocolServiceKey getProtocolServiceKey() { |
| return protocolServiceKey; |
| } |
| |
| public NotifyListener getNotifyListener() { |
| return notifyListener; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| NotifyListenerWithKey that = (NotifyListenerWithKey) o; |
| return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(protocolServiceKey, notifyListener); |
| } |
| } |
| |
| public static class ProtocolServiceKeyWithUrls { |
| private final ProtocolServiceKey protocolServiceKey; |
| private final List<URL> urls; |
| |
| public ProtocolServiceKeyWithUrls(ProtocolServiceKey protocolServiceKey, List<URL> urls) { |
| this.protocolServiceKey = protocolServiceKey; |
| this.urls = urls; |
| } |
| |
| public ProtocolServiceKey getProtocolServiceKey() { |
| return protocolServiceKey; |
| } |
| |
| public List<URL> getUrls() { |
| return urls; |
| } |
| } |
| } |