blob: 02c8c15d8a5dcafe2e7f75076a01495a1dff61e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.RevisionResolver;
import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.MetadataServiceDelegation;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.service.Destroyable;
import com.alibaba.fastjson.JSONObject;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_NOTIFY_EVENT;
public class ReflectionBasedServiceDiscovery extends AbstractServiceDiscovery {
private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());
/**
* Echo check if consumer is still work
* echo task may take a lot of time when consumer offline, create a new ScheduledThreadPool
*/
private final ScheduledExecutorService echoCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-Registry-EchoCheck-Consumer"));
// =================================== Provider side =================================== //
/**
* Local {@link ServiceInstance} Metadata's revision
*/
private String lastMetadataRevision;
// =================================== Consumer side =================================== //
/**
* Local Cache of {@link ServiceInstance} Metadata
* <p>
* Key - {@link ServiceInstance} ID ( usually ip + port )
* Value - Json processed metadata string
*/
private final ConcurrentHashMap<String, String> metadataMap = new ConcurrentHashMap<>();
/**
* Local Cache of {@link ServiceInstance}
* <p>
* Key - Service Name
* Value - List {@link ServiceInstance}
*/
private final ConcurrentHashMap<String, List<ServiceInstance>> cachedServiceInstances = new ConcurrentHashMap<>();
private final MetadataServiceDelegation metadataService;
public ConcurrentMap<String, MetadataService> metadataServiceProxies = new ConcurrentHashMap<>();
/**
* Local Cache of Service's {@link ServiceInstance} list revision,
* used to check if {@link ServiceInstance} list has been updated
* <p>
* Key - ServiceName
* Value - a revision calculate from {@link List} of {@link ServiceInstance}
*/
private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap = new ConcurrentHashMap<>();
public ReflectionBasedServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
super(applicationModel, registryURL);
long echoPollingCycle = registryURL.getParameter(Constants.ECHO_POLLING_CYCLE_KEY, Constants.DEFAULT_ECHO_POLLING_CYCLE);
this.metadataService = applicationModel.getBeanFactory().getOrRegisterBean(MetadataServiceDelegation.class);
// Echo check: test if consumer is offline, remove MetadataChangeListener,
// reduce the probability of failure when metadata update
echoCheckExecutor.scheduleAtFixedRate(() -> {
Map<String, InstanceMetadataChangedListener> listenerMap = metadataService.getInstanceMetadataChangedListenerMap();
Iterator<Map.Entry<String, InstanceMetadataChangedListener>> iterator = listenerMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, InstanceMetadataChangedListener> entry = iterator.next();
try {
entry.getValue().echo(CommonConstants.DUBBO);
} catch (RpcException e) {
if (logger.isInfoEnabled()) {
logger.info("Send echo message to consumer error. Possible cause: consumer is offline.");
}
iterator.remove();
}
}
}, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
}
public void doInitialize(URL registryURL) {
}
@Override
public void doDestroy() throws Exception {
metadataMap.clear();
serviceInstanceRevisionMap.clear();
echoCheckExecutor.shutdown();
}
private void updateInstanceMetadata(ServiceInstance serviceInstance) {
String metadataString = JSONObject.toJSONString(serviceInstance.getMetadata());
String metadataRevision = RevisionResolver.calRevision(metadataString);
// check if metadata updated
if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
if (logger.isDebugEnabled()) {
logger.debug("Update Service Instance Metadata of DNS registry. Newer metadata: " + metadataString);
}
lastMetadataRevision = metadataRevision;
// save the newest metadata to local
metadataService.exportInstanceMetadata(metadataString);
// notify to consumer
Map<String, InstanceMetadataChangedListener> listenerMap = metadataService.getInstanceMetadataChangedListenerMap();
Iterator<Map.Entry<String, InstanceMetadataChangedListener>> iterator = listenerMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, InstanceMetadataChangedListener> entry = iterator.next();
try {
entry.getValue().onEvent(metadataString);
} catch (RpcException e) {
// 1-7 - Failed to notify registry event.
// The updating of metadata to consumer is a type of registry event.
logger.warn(REGISTRY_FAILED_NOTIFY_EVENT, "consumer is offline", "",
"Notify to consumer error, removing listener.");
// remove listener if consumer is offline
iterator.remove();
}
}
}
}
@Override
public void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
updateInstanceMetadata(serviceInstance);
}
@Override
public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
updateInstanceMetadata(serviceInstance);
}
@Override
public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
// notify empty message to consumer
metadataService.exportInstanceMetadata("");
metadataService.getInstanceMetadataChangedListenerMap().forEach((consumerId, listener) -> listener.onEvent(""));
metadataService.getInstanceMetadataChangedListenerMap().clear();
}
@SuppressWarnings("unchecked")
public final void fillServiceInstance(DefaultServiceInstance serviceInstance) {
String hostId = serviceInstance.getAddress();
if (metadataMap.containsKey(hostId)) {
// Use cached metadata.
// Metadata will be updated by provider callback
String metadataString = metadataMap.get(hostId);
serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
} else {
// refer from MetadataUtils, this proxy is different from the one used to refer exportedURL
MetadataService metadataService = getMetadataServiceProxy(serviceInstance);
String consumerId = ScopeModelUtil.getApplicationModel(registryURL.getScopeModel()).getApplicationName() + NetUtils.getLocalHost();
String metadata = metadataService.getAndListenInstanceMetadata(
consumerId, metadataString -> {
if (logger.isDebugEnabled()) {
logger.debug("Receive callback: " + metadataString + serviceInstance);
}
if (StringUtils.isEmpty(metadataString)) {
// provider is shutdown
metadataMap.remove(hostId);
} else {
metadataMap.put(hostId, metadataString);
}
});
metadataMap.put(hostId, metadata);
serviceInstance.setMetadata(JSONObject.parseObject(metadata, Map.class));
}
}
public final void notifyListener(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> instances) {
String serviceInstanceRevision = RevisionResolver.calRevision(JSONObject.toJSONString(instances));
boolean changed = !serviceInstanceRevision.equalsIgnoreCase(
serviceInstanceRevisionMap.put(serviceName, serviceInstanceRevision));
if (logger.isDebugEnabled()) {
logger.debug("Service changed event received (possibly because of DNS polling). " +
"Service Instance changed: " + changed + " Service Name: " + serviceName);
}
if (changed) {
List<ServiceInstance> oldServiceInstances = cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
// remove expired invoker
Set<ServiceInstance> allServiceInstances = new HashSet<>(oldServiceInstances.size() + instances.size());
allServiceInstances.addAll(oldServiceInstances);
allServiceInstances.addAll(instances);
oldServiceInstances.forEach(allServiceInstances::remove);
allServiceInstances.forEach(this::destroyMetadataServiceProxy);
cachedServiceInstances.put(serviceName, instances);
listener.onEvent(new ServiceInstancesChangedEvent(serviceName, instances));
}
}
@Override
public Set<String> getServices() {
return Collections.emptySet();
}
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
return Collections.emptyList();
}
private String computeKey(ServiceInstance serviceInstance) {
return serviceInstance.getServiceName() + "##" + serviceInstance.getAddress() + "##" +
ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance);
}
private synchronized MetadataService getMetadataServiceProxy(ServiceInstance instance) {
return metadataServiceProxies.computeIfAbsent(computeKey(instance), k -> MetadataUtils.referProxy(instance).getProxy());
}
private synchronized void destroyMetadataServiceProxy(ServiceInstance instance) {
String key = computeKey(instance);
if (metadataServiceProxies.containsKey(key)) {
Object metadataServiceProxy = metadataServiceProxies.remove(key);
if (metadataServiceProxy instanceof Destroyable) {
((Destroyable) metadataServiceProxy).$destroy();
}
}
}
/**
* UT used only
*/
@Deprecated
public final ConcurrentHashMap<String, List<ServiceInstance>> getCachedServiceInstances() {
return cachedServiceInstances;
}
}