// blob: 167cdd2105e90b3f634654fb634c23144c62fd64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.integration;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.migration.InvokersChangedListener;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.Constants;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.RouterFactory;
import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.List;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_DESTROY_SERVICE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_DESTROY_UNREGISTER_URL;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_SITE_SELECTION;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
/**
 * Base class for directories whose invoker list is maintained from registry notifications.
 *
 * <p>It holds the consumer-side URLs (directory, subscribe and registered URL), the injected
 * {@link Protocol} and {@link Registry}, and implements the common lifecycle: subscribing,
 * listing invokers through the router chain, and tearing everything down in {@link #destroy()}.
 * Subclasses provide {@link #destroyAllInvokers()} and the {@link NotifyListener} callback.
 *
 * @param <T> the service interface type
 */
public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DynamicDirectory.class);

    protected final Cluster cluster;

    protected final RouterFactory routerFactory;

    /**
     * Initialized at construction time from the consumer url; asserted not null.
     */
    protected final String serviceKey;

    /**
     * Initialized at construction time; asserted not null.
     */
    protected final Class<T> serviceType;

    /**
     * Initialized at construction time from the consumer url held by the super class.
     */
    protected final URL directoryUrl;

    /**
     * {@code true} when the group of {@link #directoryUrl} is the wildcard or a comma separated list.
     */
    protected final boolean multiGroup;

    /**
     * Initialized at injection time via {@link #setProtocol(Protocol)}; asserted not null.
     */
    protected Protocol protocol;

    /**
     * Initialized at injection time via {@link #setRegistry(Registry)}; asserted not null.
     */
    protected Registry registry;

    /**
     * When {@code true} the directory reports itself unavailable and, if fail-fast is enabled,
     * {@link #doList(BitList, Invocation)} rejects invocations. Not written in this class.
     */
    protected volatile boolean forbidden = false;

    protected boolean shouldRegister;

    protected boolean shouldSimplified;

    /**
     * The url used to subscribe from the registry. Assigned by {@link #subscribe(URL)} and reset to
     * {@code null} by {@link #unSubscribe(URL)}, so readers must tolerate {@code null}.
     */
    protected volatile URL subscribeUrl;

    protected volatile URL registeredConsumerUrl;

    /**
     * The initial value is null and the midway may be assigned to null, please use the local variable reference
     * override rules
     * Priority: override>-D>consumer>provider
     * Rule one: for a certain provider <ip:port,timeout=100>
     * Rule two: for all providers <* ,timeout=5000>
     */
    protected volatile List<Configurator> configurators;

    protected ServiceInstancesChangedListener serviceListener;

    /**
     * Whether {@link #doList(BitList, Invocation)} throws immediately when the directory is
     * {@link #forbidden} instead of continuing to route. Read once at construction time from the
     * {@code Constants.SHOULD_FAIL_FAST_KEY} property, defaulting to {@code true}.
     */
    private final boolean shouldFailFast;

    private volatile InvokersChangedListener invokersChangedListener;

    /**
     * Set by {@link #invokersChanged()}; cleared again once a registered listener has been notified there.
     */
    private volatile boolean invokersChanged;

    /**
     * Creates the directory for the given service type and consumer-side url.
     *
     * @param serviceType the service interface, must not be null
     * @param url         the url this directory is built from; its service key must not be empty
     * @throws IllegalArgumentException if {@code serviceType} is null or the url has no service key
     */
    public DynamicDirectory(Class<T> serviceType, URL url) {
        super(url, true);

        ModuleModel moduleModel = url.getOrDefaultModuleModel();

        this.cluster = moduleModel.getExtensionLoader(Cluster.class).getAdaptiveExtension();
        this.routerFactory = moduleModel.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

        if (serviceType == null) {
            throw new IllegalArgumentException("service type is null.");
        }

        if (StringUtils.isEmpty(url.getServiceKey())) {
            throw new IllegalArgumentException("registry serviceKey is null.");
        }

        // Wildcard interfaces are never registered; otherwise honour the "register" parameter.
        this.shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
        this.shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);

        this.serviceType = serviceType;
        this.serviceKey = super.getConsumerUrl().getServiceKey();

        this.directoryUrl = consumerUrl;
        String group = directoryUrl.getGroup("");
        this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));

        this.shouldFailFast = Boolean.parseBoolean(
            ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));
    }

    /**
     * Stores the service-instance listener associated with this directory, replacing any previous one.
     */
    @Override
    public void addServiceListener(ServiceInstancesChangedListener instanceListener) {
        this.serviceListener = instanceListener;
    }

    /**
     * @return the listener stored by {@link #addServiceListener}, or {@code null} if none (also after destroy)
     */
    @Override
    public ServiceInstancesChangedListener getServiceListener() {
        return this.serviceListener;
    }

    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }

    public void setRegistry(Registry registry) {
        this.registry = registry;
    }

    public Registry getRegistry() {
        return registry;
    }

    public boolean isShouldRegister() {
        return shouldRegister;
    }

    /**
     * Records {@code url} as the subscribe url and subscribes this directory to the registry with it.
     *
     * @param url the url to subscribe with
     */
    public void subscribe(URL url) {
        setSubscribeUrl(url);
        registry.subscribe(url, this);
    }

    /**
     * Clears the recorded subscribe url and unsubscribes this directory from the registry.
     *
     * @param url the url that was used to subscribe
     */
    public void unSubscribe(URL url) {
        setSubscribeUrl(null);
        registry.unsubscribe(url, this);
    }

    /**
     * Lists the invokers eligible for the given invocation.
     *
     * @param invokers   the candidate invokers handed in by the caller
     * @param invocation the invocation being routed
     * @return all invokers for multi-group directories, otherwise the router chain result;
     * an empty list when routing returns {@code null} or fails
     * @throws RpcException with {@link RpcException#FORBIDDEN_EXCEPTION} when the directory is forbidden
     *                      and fail-fast is enabled
     */
    @Override
    public List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) {
        if (forbidden && shouldFailFast) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                ", please check status of providers(disabled, not registered or in blacklist).");
        }

        if (multiGroup) {
            // Multi-group directories bypass the router chain entirely.
            return this.getInvokers();
        }

        try {
            // Get invokers from cache, only runtime routers will be executed.
            List<Invoker<T>> result = routerChain.route(getConsumerUrl(), invokers, invocation);
            return result == null ? BitList.emptyList() : result;
        } catch (Throwable t) {
            // 2-1 - Failed to execute routing.
            logger.error(CLUSTER_FAILED_SITE_SELECTION, "", "",
                "Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            return BitList.emptyList();
        }
    }

    @Override
    public Class<T> getInterface() {
        return serviceType;
    }

    @Override
    public List<Invoker<T>> getAllInvokers() {
        return this.getInvokers();
    }

    /**
     * The currently effective consumer url
     *
     * @return URL
     */
    @Override
    public URL getConsumerUrl() {
        return this.consumerUrl;
    }

    /**
     * The original consumer url. In this class it is the same value as {@link #getConsumerUrl()}.
     *
     * @return URL
     */
    public URL getOriginalConsumerUrl() {
        return this.consumerUrl;
    }

    /**
     * The url registered to registry or metadata center
     *
     * @return URL, or {@code null} if {@link #setRegisteredConsumerUrl(URL)} has not been called
     */
    public URL getRegisteredConsumerUrl() {
        return registeredConsumerUrl;
    }

    /**
     * The url used to subscribe from registry
     *
     * @return URL, or {@code null} when not subscribed
     */
    public URL getSubscribeUrl() {
        return subscribeUrl;
    }

    public void setSubscribeUrl(URL subscribeUrl) {
        this.subscribeUrl = subscribeUrl;
    }

    /**
     * Derives and stores the url that is registered for this consumer.
     *
     * <p>In simplified mode only the {@code DEFAULT_REGISTER_CONSUMER_KEYS} parameters of {@code url}
     * are kept. In both modes the consumers category and {@code check=false} are added.
     *
     * @param url the consumer url to derive the registered url from
     */
    public void setRegisteredConsumerUrl(URL url) {
        URL base = shouldSimplified ? URL.valueOf(url, DEFAULT_REGISTER_CONSUMER_KEYS, null) : url;
        this.registeredConsumerUrl = base.addParameters(
            CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false));
    }

    /**
     * Builds a router chain for this directory's interface from {@code url} and installs it.
     *
     * @param url the url the router chain is built from
     */
    public void buildRouterChain(URL url) {
        this.setRouterChain(RouterChain.buildChain(getInterface(), url));
    }

    /**
     * @return {@code true} if the directory is neither destroyed nor forbidden and at least one
     * valid invoker reports itself available
     */
    @Override
    public boolean isAvailable() {
        if (isDestroyed() || this.forbidden) {
            return false;
        }
        // Take one snapshot so the emptiness check and the scan look at the same list.
        List<Invoker<T>> validInvokers = getValidInvokers();
        return CollectionUtils.isNotEmpty(validInvokers)
            && validInvokers.stream().anyMatch(Invoker::isAvailable);
    }

    /**
     * Tears the directory down. Does nothing if it is already destroyed.
     *
     * <p>Order: unregister the consumer url, unsubscribe, notify activated {@link AddressListener}
     * extensions, then (under this object's monitor) destroy all invokers, destroy the router chain,
     * drop the listeners and finally call {@code super.destroy()}. Failures while unregistering,
     * unsubscribing or destroying invokers are logged and do not abort the remaining steps.
     */
    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }

        // unregister.
        try {
            if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unregister(getRegisteredConsumerUrl());
            }
        } catch (Throwable t) {
            // 1-8: Failed to unregister / unsubscribe url on destroy.
            logger.warn(REGISTRY_FAILED_DESTROY_UNREGISTER_URL, "", "",
                "unexpected error when unregister service " + serviceKey + " from registry: " + registry.getUrl(), t);
        }

        // unsubscribe.
        try {
            if (getSubscribeUrl() != null && registry != null && registry.isAvailable()) {
                registry.unsubscribe(getSubscribeUrl(), this);
            }
        } catch (Throwable t) {
            // 1-8: Failed to unregister / unsubscribe url on destroy.
            logger.warn(REGISTRY_FAILED_DESTROY_UNREGISTER_URL, "", "",
                "unexpected error when unsubscribe service " + serviceKey + " from registry: " + registry.getUrl(), t);
        }

        ExtensionLoader<AddressListener> addressListenerExtensionLoader =
            getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners =
            addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (CollectionUtils.isNotEmpty(supportedListeners)) {
            for (AddressListener addressListener : supportedListeners) {
                addressListener.destroy(getConsumerUrl(), this);
            }
        }

        synchronized (this) {
            try {
                destroyAllInvokers();
            } catch (Throwable t) {
                // 1-15 - Failed to destroy service.
                logger.warn(REGISTRY_FAILED_DESTROY_SERVICE, "", "",
                    "Failed to destroy service " + serviceKey, t);
            }
            // The chain is only installed by buildRouterChain(); if destroy() runs before that,
            // an unguarded call would throw and skip the cleanup and super.destroy() below.
            if (routerChain != null) {
                routerChain.destroy();
            }
            invokersChangedListener = null;
            serviceListener = null;

            super.destroy(); // must be executed after unsubscribing
        }
    }

    /**
     * Destroys all invokers currently held by this directory; failures are logged, not propagated.
     */
    @Override
    public void discordAddresses() {
        try {
            destroyAllInvokers();
        } catch (Throwable t) {
            // 1-15 - Failed to destroy service.
            logger.warn(REGISTRY_FAILED_DESTROY_SERVICE, "", "",
                "Failed to destroy service " + serviceKey, t);
        }
    }

    /**
     * Installs the listener notified when invokers change. If a change is already pending,
     * the new listener is notified immediately.
     *
     * @param listener the listener to install, may be {@code null} to remove the current one
     */
    public synchronized void setInvokersChangedListener(InvokersChangedListener listener) {
        this.invokersChangedListener = listener;
        if (listener != null && invokersChanged) {
            listener.onChange();
        }
    }

    /**
     * Refreshes the invokers and signals the change: the pending flag is set, and if a listener
     * is installed it is notified and the flag is cleared again.
     */
    protected synchronized void invokersChanged() {
        refreshInvoker();
        invokersChanged = true;
        if (invokersChangedListener != null) {
            invokersChangedListener.onChange();
            invokersChanged = false;
        }
    }

    /**
     * @return the pending-change flag; note it is reset once {@link #invokersChanged()} has
     * delivered the change to an installed listener
     */
    @Override
    public boolean isNotificationReceived() {
        return invokersChanged;
    }

    /**
     * Destroys every invoker managed by the concrete directory.
     */
    protected abstract void destroyAllInvokers();
}