blob: 20aee2640761524d1d79072ed94d3d42751b3962 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.tag;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.Holder;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.cluster.router.tag.model.TagRouterRule;
import org.apache.dubbo.rpc.cluster.router.tag.model.TagRuleParser;
import java.util.List;
import java.util.function.Predicate;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.TAG_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_TAG_ROUTE_EMPTY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_TAG_ROUTE_INVALID;
import static org.apache.dubbo.rpc.Constants.FORCE_USE_TAG;
/**
 * State router that narrows the candidate invokers by tag.
 * <p>
 * The dynamic rule is read from the rule repository under the key
 * {@code <providerApplication> + ".tag-router"} and is kept up to date through
 * {@link #process(ConfigChangedEvent)}. While no valid, enabled dynamic rule is present, routing
 * falls back to the static {@code TAG_KEY} parameter carried by each provider URL.
 * <p>
 * The current rule is published through a {@code volatile} field: it is replaced under this
 * object's monitor by {@link #process(ConfigChangedEvent)} and read without locking by the
 * routing path, which always works on a local snapshot of the reference.
 */
public class TagStateRouter<T> extends AbstractStateRouter<T> implements ConfigurationListener {
    public static final String NAME = "TAG_ROUTER";
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(TagStateRouter.class);
    private static final String RULE_SUFFIX = ".tag-router";

    /**
     * Current dynamic rule, or {@code null} when none is in effect. Declared volatile because it is
     * written while holding the monitor but read lock-free in the routing methods.
     */
    private volatile TagRouterRule tagRouterRule;

    /** Application whose tag rule key this router is currently listening to. */
    private String application;

    public TagStateRouter(URL url) {
        super(url);
    }

    /**
     * Applies a rule change: a {@code DELETED} event clears the rule, any other event replaces it
     * with the result of parsing the event content.
     * <p>
     * If anything in that step throws, the error is logged together with the raw rule and the
     * previously stored rule is left untouched.
     *
     * @param event the configuration change notification
     */
    @Override
    public synchronized void process(ConfigChangedEvent event) {
        if (logger.isDebugEnabled()) {
            logger.debug("Notification of tag rule, change type is: " + event.getChangeType() + ", raw rule is:\n " +
                event.getContent());
        }

        try {
            if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
                this.tagRouterRule = null;
            } else {
                this.tagRouterRule = TagRuleParser.parse(event.getContent());
            }
        } catch (Exception e) {
            // The message announces the raw rule, so actually append it. The null guard keeps this
            // handler from throwing when the caught exception was itself caused by a null event.
            String rawRule = event == null ? null : event.getContent();
            logger.error(CLUSTER_TAG_ROUTE_INVALID, "Failed to parse the raw tag router rule", "",
                "Failed to parse the raw tag router rule and it will not take effect, please check if the " +
                    "rule matches with the template, the raw rule is:\n " + rawRule, e);
        }
    }

    /**
     * Routes the given invokers according to the requested tag and the current rule.
     * <ul>
     * <li>Empty input is returned as is.</li>
     * <li>Without a valid and enabled dynamic rule, only static tags in provider URLs are used.</li>
     * <li>Otherwise the request is handled as tagged or untagged depending on whether a tag is
     * found in the invocation attachment or, failing that, in the {@code url} parameter.</li>
     * </ul>
     *
     * @param invokers           candidates left by the previous router
     * @param url                URL consulted for the tag when the invocation carries none
     * @param invocation         the current invocation
     * @param needToPrintMessage whether to record the routing decision in {@code messageHolder}
     * @param nodeHolder         not used by this router
     * @param messageHolder      receives a description of the decision when requested
     * @return the filtered invokers; the same instance as {@code invokers} when nothing was removed
     */
    @Override
    public BitList<Invoker<T>> doRoute(BitList<Invoker<T>> invokers, URL url, Invocation invocation,
                                       boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder,
                                       Holder<String> messageHolder) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            if (needToPrintMessage) {
                messageHolder.set("Directly Return. Reason: Invokers from previous router is empty.");
            }
            return invokers;
        }

        // since the rule can be changed by config center, we should copy one to use.
        final TagRouterRule tagRouterRuleCopy = tagRouterRule;
        if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) {
            if (needToPrintMessage) {
                messageHolder.set("Disable Tag Router. Reason: tagRouterRule is invalid or disabled");
            }
            return filterUsingStaticTag(invokers, url, invocation);
        }

        String tag = resolveRequestTag(url, invocation);
        if (StringUtils.isNotEmpty(tag)) {
            return routeTaggedRequest(invokers, invocation, tagRouterRuleCopy, tag, needToPrintMessage, messageHolder);
        }
        return routeUntaggedRequest(invokers, tagRouterRuleCopy, needToPrintMessage, messageHolder);
    }

    /**
     * Returns the tag of the request: the invocation attachment if it is non-empty, otherwise the
     * {@code TAG_KEY} parameter of the given URL.
     */
    private String resolveRequestTag(URL url, Invocation invocation) {
        String attachmentTag = invocation.getAttachment(TAG_KEY);
        return StringUtils.isEmpty(attachmentTag) ? url.getParameter(TAG_KEY) : attachmentTag;
    }

    /** Routes a request that asks for a specific tag while a dynamic rule is in effect. */
    private BitList<Invoker<T>> routeTaggedRequest(BitList<Invoker<T>> invokers, Invocation invocation,
                                                   TagRouterRule rule, String tag,
                                                   boolean needToPrintMessage, Holder<String> messageHolder) {
        BitList<Invoker<T>> result;
        List<String> addresses = rule.getTagnameToAddresses().get(tag);
        if (CollectionUtils.isNotEmpty(addresses)) {
            // The rule lists addresses for this tag: match providers by address.
            result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
            // A non-empty match, or an empty one with force=true on the rule, is final.
            if (CollectionUtils.isNotEmpty(result) || rule.isForce()) {
                if (needToPrintMessage) {
                    messageHolder.set("Use tag " + tag + " to route. Reason: result is not null OR it's null but force=true");
                }
                return result;
            }
            // Empty match with force=false: fall through with the empty result. The static tag is
            // NOT consulted on this path; only the force-use-tag check and the failover below apply.
        } else {
            // The rule has no addresses for this tag: match providers by their static tag.
            result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
        }

        // An empty result is still returned when the force-use-tag flag is set, which
        // disallows falling back to untagged providers.
        if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
            if (needToPrintMessage) {
                messageHolder.set("Use tag " + tag + " to route. Reason: result is not empty or ForceUseTag key is true in invocation");
            }
            return result;
        }

        // FAILOVER: providers that are neither listed in the rule nor carry a static tag.
        List<String> ruleAddresses = rule.getAddresses();
        BitList<Invoker<T>> notInRule =
            filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), ruleAddresses));
        if (needToPrintMessage) {
            messageHolder.set("FAILOVER: return all Providers without any tags");
        }
        return filterInvoker(notInRule, this::hasNoStaticTag);
    }

    /** Routes a request that carries no tag while a dynamic rule is in effect. */
    private BitList<Invoker<T>> routeUntaggedRequest(BitList<Invoker<T>> invokers, TagRouterRule rule,
                                                     boolean needToPrintMessage, Holder<String> messageHolder) {
        BitList<Invoker<T>> result = invokers;
        List<String> addresses = rule.getAddresses();
        if (CollectionUtils.isNotEmpty(addresses)) {
            // Drop every provider whose address is listed in the rule.
            result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
            // 1. all addresses are in dynamic tag group, return empty list.
            if (CollectionUtils.isEmpty(result)) {
                if (needToPrintMessage) {
                    messageHolder.set("all addresses are in dynamic tag group, return empty list");
                }
                return result;
            }
            // 2. some addresses are not in any dynamic tag group: continue with the static tag.
        }
        if (needToPrintMessage) {
            messageHolder.set("filter using the static tag group");
        }
        // Keep providers without a static tag, or whose static tag is not one of the rule's tag names.
        return filterInvoker(result, invoker -> {
            String localTag = invoker.getUrl().getParameter(TAG_KEY);
            return StringUtils.isEmpty(localTag) || !rule.getTagNames().contains(localTag);
        });
    }

    /**
     * If there's no dynamic tag rule being set, use static tag in URL.
     * <p>
     * A typical scenario is a Consumer using version 2.7.x calls Providers using version 2.6.x or lower,
     * the Consumer should always respect the tag in provider URL regardless of whether a dynamic tag rule
     * has been set to it or not.
     * <p>
     * TODO, to guarantee consistent behavior of interoperability between 2.6- and 2.7+, this method should
     * have the same logic as the TagRouter in 2.6.x.
     *
     * @param invokers   candidates to filter
     * @param url        URL consulted for the tag when the invocation carries none
     * @param invocation the current invocation
     * @return for a tagged request, the providers with the same static tag, or the untagged providers
     * when there is no such provider and force-use-tag is off; for an untagged request, the untagged
     * providers
     */
    private BitList<Invoker<T>> filterUsingStaticTag(BitList<Invoker<T>> invokers, URL url, Invocation invocation) {
        String tag = resolveRequestTag(url, invocation);
        if (StringUtils.isEmpty(tag)) {
            return filterInvoker(invokers, this::hasNoStaticTag);
        }

        BitList<Invoker<T>> result =
            filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
        if (CollectionUtils.isEmpty(result) && !isForceUseTag(invocation)) {
            result = filterInvoker(invokers, this::hasNoStaticTag);
        }
        return result;
    }

    /** Tells whether the provider URL of the invoker has an empty {@code TAG_KEY} parameter. */
    private boolean hasNoStaticTag(Invoker<T> invoker) {
        return StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY));
    }

    @Override
    public boolean isRuntime() {
        // Read the volatile field once so a concurrent delete cannot slip in between the null
        // check and the dereference.
        final TagRouterRule rule = tagRouterRule;
        return rule != null && rule.isRuntime();
    }

    @Override
    public boolean isForce() {
        // FIXME
        // Single read of the volatile field, for the same reason as in isRuntime().
        final TagRouterRule rule = tagRouterRule;
        return rule != null && rule.isForce();
    }

    /**
     * Reads the force-use-tag flag from the invocation attachment, defaulting to the same key on
     * this router's URL and finally to {@code "false"}.
     */
    private boolean isForceUseTag(Invocation invocation) {
        return Boolean.parseBoolean(invocation.getAttachment(FORCE_USE_TAG, this.getUrl().getParameter(FORCE_USE_TAG, "false")));
    }

    /**
     * Keeps the invokers accepted by the predicate. When every invoker is accepted the input
     * instance itself is returned; otherwise a clone is filtered and the input is left unmodified.
     */
    private BitList<Invoker<T>> filterInvoker(BitList<Invoker<T>> invokers, Predicate<Invoker<T>> predicate) {
        if (invokers.stream().allMatch(predicate)) {
            return invokers;
        }

        BitList<Invoker<T>> newInvokers = invokers.clone();
        newInvokers.removeIf(invoker -> !predicate.test(invoker));
        return newInvokers;
    }

    /** Returns {@code true} when the list is non-null and one of its entries matches the URL's host and port. */
    private boolean addressMatches(URL url, List<String> addresses) {
        return addresses != null && checkAddressMatch(addresses, url.getHost(), url.getPort());
    }

    /** Returns {@code true} when the list is null or none of its entries matches the URL's host and port. */
    private boolean addressNotMatches(URL url, List<String> addresses) {
        return addresses == null || !checkAddressMatch(addresses, url.getHost(), url.getPort());
    }

    /**
     * Checks whether any address expression matches the given host and port, either through
     * {@link NetUtils#matchIpExpression} or by being exactly the any-host value followed by
     * {@code ":" + port}. An expression whose evaluation throws is logged and skipped.
     */
    private boolean checkAddressMatch(List<String> addresses, String host, int port) {
        // Does not depend on the loop variable, so build it once.
        final String anyHostWithPort = ANYHOST_VALUE + ":" + port;
        for (String address : addresses) {
            try {
                if (NetUtils.matchIpExpression(address, host, port)) {
                    return true;
                }
                if (anyHostWithPort.equals(address)) {
                    return true;
                }
            } catch (Exception e) {
                logger.error(CLUSTER_TAG_ROUTE_INVALID, "tag route address is invalid", "",
                    "The format of ip address is invalid in tag route. Address :" + address, e);
            }
        }
        return false;
    }

    public void setApplication(String app) {
        this.application = app;
    }

    /**
     * Binds this router to the application of the first invoker in the list.
     * <p>
     * When that application differs from the one currently tracked, the listener registered for the
     * old key (if any) is removed, a listener for the new key is added, and the rule currently
     * stored under the new key is applied if it is non-empty. An empty invoker list is ignored, and
     * an invoker without a remote application name is reported as an error.
     *
     * @param invokers the latest invoker list
     */
    @Override
    public void notify(BitList<Invoker<T>> invokers) {
        if (CollectionUtils.isEmpty(invokers)) {
            return;
        }

        Invoker<T> invoker = invokers.get(0);
        URL url = invoker.getUrl();
        String providerApplication = url.getRemoteApplication();

        if (StringUtils.isEmpty(providerApplication)) {
            logger.error(CLUSTER_TAG_ROUTE_EMPTY, "tag router get providerApplication is empty", "",
                "TagRouter must getConfig from or subscribe to a specific application, but the application " +
                    "in this TagRouter is not specified.");
            return;
        }

        synchronized (this) {
            if (!providerApplication.equals(application)) {
                if (StringUtils.isNotEmpty(application)) {
                    this.getRuleRepository().removeListener(application + RULE_SUFFIX, this);
                }
                String key = providerApplication + RULE_SUFFIX;
                this.getRuleRepository().addListener(key, this);
                application = providerApplication;
                String rawRule = this.getRuleRepository().getRule(key, DynamicConfiguration.DEFAULT_GROUP);
                // NOTE(review): when the new application has no rule, a rule loaded for the previous
                // application stays in effect - confirm whether it should be cleared here.
                if (StringUtils.isNotEmpty(rawRule)) {
                    this.process(new ConfigChangedEvent(key, DynamicConfiguration.DEFAULT_GROUP, rawRule));
                }
            }
        }
    }

    /** Removes the listener registered for the currently tracked application, if there is one. */
    @Override
    public void stop() {
        if (StringUtils.isNotEmpty(application)) {
            this.getRuleRepository().removeListener(application + RULE_SUFFIX, this);
        }
    }
}