| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.integration; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.Version; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.logger.Logger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.NetUtils; |
| import org.apache.dubbo.registry.NotifyListener; |
| import org.apache.dubbo.registry.Registry; |
| import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; |
| import org.apache.dubbo.rpc.Invocation; |
| import org.apache.dubbo.rpc.Invoker; |
| import org.apache.dubbo.rpc.Protocol; |
| import org.apache.dubbo.rpc.RpcException; |
| import org.apache.dubbo.rpc.cluster.Cluster; |
| import org.apache.dubbo.rpc.cluster.Configurator; |
| import org.apache.dubbo.rpc.cluster.RouterChain; |
| import org.apache.dubbo.rpc.cluster.RouterFactory; |
| import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; |
| import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; |
| import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY; |
| import static org.apache.dubbo.registry.Constants.REGISTER_KEY; |
| import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY; |
| import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS; |
| import static org.apache.dubbo.remoting.Constants.CHECK_KEY; |
| |
| |
| /** |
| * RegistryDirectory |
| */ |
| public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implements NotifyListener { |
| |
| private static final Logger logger = LoggerFactory.getLogger(DynamicDirectory.class); |
| |
| protected static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); |
| |
| protected static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class) |
| .getAdaptiveExtension(); |
| |
| protected final String serviceKey; // Initialization at construction time, assertion not null |
| protected final Class<T> serviceType; // Initialization at construction time, assertion not null |
| protected final boolean multiGroup; |
| protected Protocol protocol; // Initialization at the time of injection, the assertion is not null |
| protected Registry registry; // Initialization at the time of injection, the assertion is not null |
| protected volatile boolean forbidden = false; |
| protected boolean shouldRegister; |
| protected boolean shouldSimplified; |
| |
| protected volatile URL overrideConsumerUrl; // Initialization at construction time, assertion not null, and always assign non null value |
| |
| protected volatile URL registeredConsumerUrl; |
| |
| /** |
| * override rules |
| * Priority: override>-D>consumer>provider |
| * Rule one: for a certain provider <ip:port,timeout=100> |
| * Rule two: for all providers <* ,timeout=5000> |
| */ |
| protected volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference |
| |
| // Map<url, Invoker> cache service url to invoker mapping. |
| protected volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference |
| protected volatile List<Invoker<T>> invokers; |
| |
| // Set<invokerUrls> cache invokeUrls to invokers mapping. |
| protected volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference |
| |
| protected ServiceInstancesChangedListener serviceListener; |
| |
| public DynamicDirectory(Class<T> serviceType, URL url) { |
| super(url); |
| if (serviceType == null) { |
| throw new IllegalArgumentException("service type is null."); |
| } |
| |
| shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true); |
| shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false); |
| if (url.getServiceKey() == null || url.getServiceKey().length() == 0) { |
| throw new IllegalArgumentException("registry serviceKey is null."); |
| } |
| this.serviceType = serviceType; |
| this.serviceKey = super.getConsumerUrl().getServiceKey(); |
| |
| String group = queryMap.get(GROUP_KEY) != null ? queryMap.get(GROUP_KEY) : ""; |
| this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(",")); |
| } |
| |
| @Override |
| public void addServiceListener(ServiceInstancesChangedListener instanceListener) { |
| this.serviceListener = instanceListener; |
| } |
| |
| public void setProtocol(Protocol protocol) { |
| this.protocol = protocol; |
| } |
| |
| public void setRegistry(Registry registry) { |
| this.registry = registry; |
| } |
| |
| public Registry getRegistry() { |
| return registry; |
| } |
| |
| public boolean isShouldRegister() { |
| return shouldRegister; |
| } |
| |
| public void subscribe(URL url) { |
| setConsumerUrl(url); |
| registry.subscribe(url, this); |
| } |
| |
| public void unSubscribe(URL url) { |
| setConsumerUrl(null); |
| registry.unsubscribe(url, this); |
| } |
| |
| @Override |
| public List<Invoker<T>> doList(Invocation invocation) { |
| if (forbidden) { |
| // 1. No service provider 2. Service providers are disabled |
| throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + |
| getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + |
| NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + |
| ", please check status of providers(disabled, not registered or in blacklist)."); |
| } |
| |
| if (multiGroup) { |
| return this.invokers == null ? Collections.emptyList() : this.invokers; |
| } |
| |
| List<Invoker<T>> invokers = null; |
| try { |
| // Get invokers from cache, only runtime routers will be executed. |
| invokers = routerChain.route(getConsumerUrl(), invocation); |
| } catch (Throwable t) { |
| logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); |
| } |
| |
| return invokers == null ? Collections.emptyList() : invokers; |
| } |
| |
| @Override |
| public Class<T> getInterface() { |
| return serviceType; |
| } |
| |
| @Override |
| public List<Invoker<T>> getAllInvokers() { |
| return invokers; |
| } |
| |
| public URL getRegisteredConsumerUrl() { |
| return registeredConsumerUrl; |
| } |
| |
| public void setRegisteredConsumerUrl(URL url) { |
| if (!shouldSimplified) { |
| this.registeredConsumerUrl = url.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, |
| String.valueOf(false)); |
| } else { |
| this.registeredConsumerUrl = URL.valueOf(url, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters( |
| CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false)); |
| } |
| } |
| |
| @Override |
| public boolean isAvailable() { |
| if (isDestroyed()) { |
| return false; |
| } |
| Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; |
| if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) { |
| for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) { |
| if (invoker.isAvailable()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| public void buildRouterChain(URL url) { |
| this.setRouterChain(RouterChain.buildChain(url)); |
| } |
| |
| /** |
| * Haomin: added for test purpose |
| */ |
| public Map<URL, Invoker<T>> getUrlInvokerMap() { |
| return urlInvokerMap; |
| } |
| |
| public List<Invoker<T>> getInvokers() { |
| return invokers; |
| } |
| |
| @Override |
| public void setConsumerUrl(URL consumerUrl) { |
| this.consumerUrl = consumerUrl; |
| this.overrideConsumerUrl = consumerUrl; |
| } |
| |
| } |