blob: df573d1c73638f1d4bef6733ce7ab92a38403f8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> implements NotifyListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class);
// instance address to invoker mapping.
private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private ServiceInstancesChangedListener listener;
public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
super(serviceType, url);
}
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
return true;
}
}
}
return false;
}
@Override
public synchronized void notify(List<URL> instanceUrls) {
// Set the context of the address notification thread.
RpcContext.setRpcContext(getConsumerUrl());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
instanceUrls = addressListener.notify(instanceUrls, getConsumerUrl(), this);
}
}
refreshInvoker(instanceUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address.");
if (invokerUrls.size() == 0) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
return;
}
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (CollectionUtils.isEmpty(invokerUrls)) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
if (oldUrlInvokerMap != null) {
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
// notify invokers refreshed
this.invokersChanged();
}
/**
* Turn urls into invokers, and if url has been refer, will not re-reference.
*
* @param urls
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
for (URL url : urls) {
InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
" in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// FIXME, some keys may need to be removed.
instanceAddressURL.addConsumerParams(getConsumerUrl().getProtocolServiceKey(), queryMap);
Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress());
if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
try {
boolean enabled = true;
if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
} else {
enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = protocol.refer(serviceType, instanceAddressURL);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
}
} else {
newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
}
}
return newUrlInvokerMap;
}
private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
if (!newURL.getInstance().equals(oldURL.getInstance())) {
return true;
}
return !oldURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey())
.equals(newURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey()));
}
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
return invokers;
}
/**
* Close all invokers
*/
@Override
protected void destroyAllInvokers() {
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
invoker.destroy();
} catch (Throwable t) {
logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t);
}
}
localUrlInvokerMap.clear();
}
invokers = null;
}
/**
* Check whether the invoker in the cache needs to be destroyed
* If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak
*
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
}
deleted.add(entry.getKey());
}
}
}
if (deleted != null) {
for (String addressKey : deleted) {
if (addressKey != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(addressKey);
if (invoker != null) {
try {
invoker.destroy();
if (logger.isDebugEnabled()) {
logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
}
} catch (Exception e) {
logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
}
}
}
}
}
}
}