blob: e99b40cd3d3ed29a11fc8c48c3f58dea4ac7fa47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client.event.listener;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.event.ConditionalEventListener;
import org.apache.dubbo.event.EventListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.RegistryClusterIdentifier;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.metadata.MetadataInfo.DEFAULT_REVISION;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
/**
* The Service Discovery Changed {@link EventListener Event Listener}
*
* @see ServiceInstancesChangedEvent
* @since 2.7.5
*/
public class ServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class);
private final Set<String> serviceNames;
private final ServiceDiscovery serviceDiscovery;
private URL url;
private Map<String, NotifyListener> listeners;
private Map<String, List<ServiceInstance>> allInstances;
private Map<String, List<URL>> serviceUrls;
private Map<String, MetadataInfo> revisionToMetadata;
public ServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) {
this.serviceNames = serviceNames;
this.serviceDiscovery = serviceDiscovery;
this.listeners = new HashMap<>();
this.allInstances = new HashMap<>();
this.serviceUrls = new HashMap<>();
this.revisionToMetadata = new HashMap<>();
}
/**
* On {@link ServiceInstancesChangedEvent the service instances change event}
*
* @param event {@link ServiceInstancesChangedEvent}
*/
public synchronized void onEvent(ServiceInstancesChangedEvent event) {
logger.info("Received instance notification, serviceName: " + event.getServiceName() + ", instances: " + event.getServiceInstances().size());
String appName = event.getServiceName();
allInstances.put(appName, event.getServiceInstances());
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
String revision = getExportedServicesRevision(instance);
if (DEFAULT_REVISION.equals(revision)) {
logger.info("Find instance without valid service metadata: " + instance.getAddress());
continue;
}
List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
subInstances.add(instance);
MetadataInfo metadata = revisionToMetadata.get(revision);
if (metadata == null) {
metadata = getMetadataInfo(instance);
logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata);
if (metadata != null) {
revisionToMetadata.put(revision, getMetadataInfo(instance));
} else {
}
}
if (metadata != null) {
parseMetadata(revision, metadata, localServiceToRevisions);
((DefaultServiceInstance) instance).setServiceMetadata(metadata);
}
// else {
// logger.error("Failed to load service metadata for instance " + instance);
// Set<String> set = localServiceToRevisions.computeIfAbsent(url.getServiceKey(), k -> new TreeSet<>());
// set.add(revision);
// }
localServiceToRevisions.forEach((serviceKey, revisions) -> {
List<URL> urls = revisionsToUrls.get(revisions);
if (urls != null) {
serviceUrls.put(serviceKey, urls);
} else {
urls = new ArrayList<>();
for (String r : revisions) {
for (ServiceInstance i : revisionToInstances.get(r)) {
urls.add(i.toURL());
}
}
revisionsToUrls.put(revisions, urls);
serviceUrls.put(serviceKey, urls);
}
});
}
}
this.notifyAddressChanged();
}
private Map<String, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getKey(), k -> new TreeSet<>());
set.add(revision);
}
return localServiceToRevisions;
}
private MetadataInfo getMetadataInfo(ServiceInstance instance) {
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
// FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry implementation.
instance.getExtendParams().putIfAbsent(REGISTRY_CLUSTER_KEY, RegistryClusterIdentifier.getExtension(url).consumerKey(url));
MetadataInfo metadataInfo;
try {
if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
metadataInfo = remoteMetadataService.getMetadata(instance);
} else {
MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
metadataInfo = metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
}
} catch (Exception e) {
logger.error("Failed to load service metadata, metadta type is " + metadataType, e);
metadataInfo = null;
// TODO, load metadata backup. Stop getting metadata after x times of failure for one revision?
}
return metadataInfo;
}
private void notifyAddressChanged() {
listeners.forEach((key, notifyListener) -> {
//FIXME, group wildcard match
notifyListener.notify(toUrlsWithEmpty(serviceUrls.get(key)));
});
}
private List<URL> toUrlsWithEmpty(List<URL> urls) {
if (urls == null) {
urls = Collections.emptyList();
}
return urls;
}
public void addListener(String serviceKey, NotifyListener listener) {
this.listeners.put(serviceKey, listener);
}
public List<URL> getUrls(String serviceKey) {
return toUrlsWithEmpty(serviceUrls.get(serviceKey));
}
/**
* Get the correlative service name
*
* @return the correlative service name
*/
public final Set<String> getServiceNames() {
return serviceNames;
}
public void setUrl(URL url) {
this.url = url;
}
public URL getUrl() {
return url;
}
/**
* @param event {@link ServiceInstancesChangedEvent event}
* @return If service name matches, return <code>true</code>, or <code>false</code>
*/
public final boolean accept(ServiceInstancesChangedEvent event) {
return serviceNames.contains(event.getServiceName());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ServiceInstancesChangedListener)) return false;
ServiceInstancesChangedListener that = (ServiceInstancesChangedListener) o;
return Objects.equals(getServiceNames(), that.getServiceNames());
}
@Override
public int hashCode() {
return Objects.hash(getClass(), getServiceNames());
}
}