blob: e043e2d3d741a4ae82bdad510360f297ca618314 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.xds;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.Holder;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.xds.util.PilotExchanger;
import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.cluster.router.xds.rule.ClusterWeight;
import org.apache.dubbo.rpc.cluster.router.xds.rule.DestinationSubset;
import org.apache.dubbo.rpc.cluster.router.xds.rule.HTTPRouteDestination;
import org.apache.dubbo.rpc.cluster.router.xds.rule.HeaderMatcher;
import org.apache.dubbo.rpc.cluster.router.xds.rule.HttpRequestMatch;
import org.apache.dubbo.rpc.cluster.router.xds.rule.PathMatcher;
import org.apache.dubbo.rpc.cluster.router.xds.rule.XdsRouteRule;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
 * State router that narrows the invoker list according to xDS route rules.
 *
 * <p>It registers itself as listener with an {@link RdsRouteRuleManager} (route rules per
 * application) and an {@link EdsEndpointManager} (endpoints per cluster), and keeps the resulting
 * rules and per-cluster invoker subsets in the maps below.
 *
 * @param <T> service type of the routed invokers
 */
public class XdsRouter<T> extends AbstractStateRouter<T> implements XdsRouteRuleListener, EdsEndpointListener {
// Application names this router is currently subscribed to for route rules;
// replaced wholesale by notify(...) when the set of remote applications changes.
private Set<String> subscribeApplications;
// Application name -> route rules most recently delivered through onRuleChange(...).
private final ConcurrentHashMap<String, List<XdsRouteRule>> xdsRouteRuleMap;
// Cluster name -> subset holding that cluster's endpoints and the invokers matching them.
private final ConcurrentHashMap<String, DestinationSubset<T>> destinationSubsetMap;
private final RdsRouteRuleManager rdsRouteRuleManager;
private final EdsEndpointManager edsEndpointManager;
// Copy of the invoker list passed to the latest notify(...) call.
private volatile BitList<Invoker<T>> currentInvokeList;
// Header matchers whose name ends with this suffix are not supported; a rule containing one never matches.
private static final String BINARY_HEADER_SUFFIX = "-bin";
// When false, doRoute(...) returns its input unchanged.
private final boolean isEnable;
/**
 * Creates a router whose RDS and EDS managers are obtained from the bean factory of the
 * application model associated with {@code url}. Routing is enabled according to
 * {@link PilotExchanger#isEnabled()}.
 *
 * @param url the URL passed on to the parent state router
 */
public XdsRouter(URL url) {
    super(url);
    this.isEnable = PilotExchanger.isEnabled();
    this.rdsRouteRuleManager =
            url.getOrDefaultApplicationModel().getBeanFactory().getBean(RdsRouteRuleManager.class);
    this.edsEndpointManager =
            url.getOrDefaultApplicationModel().getBeanFactory().getBean(EdsEndpointManager.class);
    // All bookkeeping starts out empty; it is filled by notify(...) and the listener callbacks.
    this.currentInvokeList = new BitList<>(new ArrayList<>());
    this.xdsRouteRuleMap = new ConcurrentHashMap<>();
    this.destinationSubsetMap = new ConcurrentHashMap<>();
    this.subscribeApplications = new ConcurrentHashSet<>();
}
/**
 * Creates a router with explicitly supplied collaborators instead of looking them up.
 *
 * @param url                 the URL passed on to the parent state router
 * @param rdsRouteRuleManager manager used to subscribe/unsubscribe route rules
 * @param edsEndpointManager  manager used to subscribe/unsubscribe cluster endpoints
 * @param isEnable            whether routing is active; when {@code false} routing is a pass-through
 * @deprecated only for unit tests
 */
protected XdsRouter(
        URL url, RdsRouteRuleManager rdsRouteRuleManager, EdsEndpointManager edsEndpointManager, boolean isEnable) {
    super(url);
    this.isEnable = isEnable;
    this.edsEndpointManager = edsEndpointManager;
    this.rdsRouteRuleManager = rdsRouteRuleManager;
    // Same empty starting state as the public constructor.
    this.currentInvokeList = new BitList<>(new ArrayList<>());
    this.xdsRouteRuleMap = new ConcurrentHashMap<>();
    this.destinationSubsetMap = new ConcurrentHashMap<>();
    this.subscribeApplications = new ConcurrentHashSet<>();
}
/**
 * Narrows {@code invokers} to the subset belonging to the cluster selected by the first
 * matching xDS route rule.
 *
 * <p>The input list is returned unchanged when routing is disabled, when the input is empty,
 * when no rules are known, or when no rule matches. An empty list is returned when a rule
 * matches but its cluster has no subset or the subset has no invokers yet.
 *
 * @param invokers           candidates left by the previous router
 * @param url                not used by this implementation
 * @param invocation         the call being routed; evaluated against the route rules
 * @param needToPrintMessage whether a diagnostic message must be written to {@code messageHolder}
 * @param nodeHolder         not used by this implementation
 * @param messageHolder      receives the diagnostic message when {@code needToPrintMessage} is set
 * @return the routed invoker list
 * @throws RpcException declared by the parent contract
 */
@Override
protected BitList<Invoker<T>> doRoute(
        BitList<Invoker<T>> invokers,
        URL url,
        Invocation invocation,
        boolean needToPrintMessage,
        Holder<RouterSnapshotNode<T>> nodeHolder,
        Holder<String> messageHolder)
        throws RpcException {
    // Pass-through cases, checked in the same order as the individual guards they replace.
    String passThroughReason = null;
    if (!isEnable) {
        passThroughReason =
                "Directly Return. Reason: Pilot exchanger has not been initialized, may not in mesh mode.";
    } else if (CollectionUtils.isEmpty(invokers)) {
        passThroughReason = "Directly Return. Reason: Invokers from previous router is empty.";
    } else if (CollectionUtils.isEmptyMap(xdsRouteRuleMap)) {
        passThroughReason = "Directly Return. Reason: xds route rule is empty.";
    }
    if (passThroughReason != null) {
        if (needToPrintMessage) {
            messageHolder.set(passThroughReason);
        }
        return invokers;
    }

    // Find the first rule, across all subscribed applications, that yields a cluster.
    String matchedApp = null;
    String matchCluster = null;
    Set<String> appSnapshot = subscribeApplications;
    search:
    for (String app : appSnapshot) {
        List<XdsRouteRule> appRules = xdsRouteRuleMap.get(app);
        if (CollectionUtils.isEmpty(appRules)) {
            continue;
        }
        for (XdsRouteRule candidateRule : appRules) {
            String candidate = computeMatchCluster(invocation, candidateRule);
            if (candidate != null) {
                matchedApp = app;
                matchCluster = candidate;
                break search;
            }
        }
    }

    // No rule applies to this request: leave the list untouched.
    if (matchCluster == null) {
        if (needToPrintMessage) {
            messageHolder.set("Directly Return. Reason: xds rule not match.");
        }
        return invokers;
    }

    String matchMessage = null;
    if (needToPrintMessage) {
        matchMessage = "Match App: " + matchedApp + " Cluster: " + matchCluster + ' ';
    }

    DestinationSubset<T> destinationSubset = destinationSubsetMap.get(matchCluster);
    if (destinationSubset == null) {
        // The rule points at a cluster for which no subset is registered.
        if (needToPrintMessage) {
            messageHolder.set(matchMessage + "no target subset");
        }
        return BitList.emptyList();
    }

    if (needToPrintMessage) {
        messageHolder.set(matchMessage);
    }
    if (destinationSubset.getInvokers() != null) {
        // Keep only invokers that are both in the subset and in the incoming list.
        return destinationSubset.getInvokers().and(invokers);
    }
    return BitList.emptyList();
}
/**
 * Evaluates one route rule against the invocation and returns the target cluster name.
 *
 * <p>A rule matches when its path matcher (if any) accepts {@code "/<path>/<method>"} and every
 * header matcher accepts the attachment with the same name. Rules with neither a path matcher
 * nor header matchers never match, and neither do rules with a header matcher on a binary
 * ({@code -bin}) header.
 *
 * @param invocation the call being routed
 * @param rule       the rule to evaluate
 * @return the rule's cluster, a weighted pick among its weighted clusters, or {@code null} when
 *         the rule does not match or no cluster could be picked
 */
private String computeMatchCluster(Invocation invocation, XdsRouteRule rule) {
    HttpRequestMatch requestMatch = rule.getMatch();
    PathMatcher pathMatcher = requestMatch.getPathMatcher();
    List<HeaderMatcher> headerMatchers = requestMatch.getHeaderMatcherList();
    boolean hasHeaderMatchers = CollectionUtils.isNotEmpty(headerMatchers);

    // A rule without any matcher is treated as "no match".
    if (pathMatcher == null && !hasHeaderMatchers) {
        return null;
    }
    if (pathMatcher != null) {
        String path = "/" + invocation.getInvoker().getUrl().getPath() + "/" + RpcUtils.getMethodName(invocation);
        if (!pathMatcher.isMatch(path)) {
            return null;
        }
    }
    // The header list may be null for a path-only rule; iterate only when it has entries.
    if (hasHeaderMatchers) {
        for (HeaderMatcher headerMatcher : headerMatchers) {
            String headerName = headerMatcher.getName();
            // not support byte
            if (headerName.endsWith(BINARY_HEADER_SUFFIX)) {
                return null;
            }
            String headValue = invocation.getAttachment(headerName);
            if (!headerMatcher.match(headValue)) {
                return null;
            }
        }
    }
    HTTPRouteDestination route = rule.getRoute();
    if (route.getCluster() != null) {
        return route.getCluster();
    }
    return computeWeightCluster(route.getWeightedClusters());
}
/**
 * Picks one cluster at random, with probability proportional to its weight.
 *
 * @param weightedClusters candidate clusters with their weights; may be {@code null} or empty
 * @return the name of the chosen cluster, or {@code null} when there are no candidates or all
 *         weights are zero
 */
private String computeWeightCluster(List<ClusterWeight> weightedClusters) {
    // A route without weighted clusters has nothing to pick from.
    if (CollectionUtils.isEmpty(weightedClusters)) {
        return null;
    }
    int totalWeight = Math.max(
            weightedClusters.stream().mapToInt(ClusterWeight::getWeight).sum(), 1);
    // target must greater than 0
    // if weight is 0, the destination will not receive any traffic.
    int target = ThreadLocalRandom.current().nextInt(1, totalWeight + 1);
    for (ClusterWeight weightedCluster : weightedClusters) {
        int weight = weightedCluster.getWeight();
        target -= weight;
        if (target <= 0) {
            return weightedCluster.getName();
        }
    }
    return null;
}
/**
 * Records the latest invoker list, reconciles route-rule subscriptions with the remote
 * application names found in it, and recomputes the invokers of every known subset.
 *
 * @param invokers the current invokers; {@code null} is treated as an empty list
 */
public void notify(BitList<Invoker<T>> invokers) {
    final BitList<Invoker<T>> latest;
    if (invokers != null) {
        latest = invokers;
    } else {
        latest = BitList.emptyList();
    }
    currentInvokeList = latest.clone();

    // Remote application names present in the new list decide what to (un)subscribe.
    Set<String> remoteApps = latest.stream()
            .map(invoker -> invoker.getUrl().getRemoteApplication())
            .filter(appName -> StringUtils.isNotEmpty(appName))
            .collect(Collectors.toCollection(HashSet::new));

    if (!subscribeApplications.equals(remoteApps)) {
        synchronized (this) {
            // Subscribe newly seen applications first, then drop the ones that disappeared.
            for (String candidate : remoteApps) {
                boolean alreadySubscribed = subscribeApplications.contains(candidate);
                if (!alreadySubscribed) {
                    rdsRouteRuleManager.subscribeRds(candidate, this);
                }
            }
            for (String previous : subscribeApplications) {
                boolean stillPresent = remoteApps.contains(previous);
                if (!stillPresent) {
                    rdsRouteRuleManager.unSubscribeRds(previous, this);
                }
            }
            subscribeApplications = remoteApps;
        }
    }

    // Re-filter every subset against the new invoker list.
    synchronized (this) {
        BitList<Invoker<T>> snapshot = currentInvokeList.clone();
        destinationSubsetMap.values().forEach(subset -> computeSubset(subset, snapshot));
    }
}
/**
 * Replaces the subset's invokers with those from {@code invokers} whose URL host and port
 * equal the address and port of at least one of the subset's endpoints.
 *
 * @param subset   the subset to update
 * @param invokers the invokers to filter
 */
private void computeSubset(DestinationSubset<T> subset, BitList<Invoker<T>> invokers) {
    Set<Endpoint> endpoints = subset.getEndpoints();
    List<Invoker<T>> matched = new ArrayList<>();
    for (Invoker<T> invoker : invokers) {
        String host = invoker.getUrl().getHost();
        int port = invoker.getUrl().getPort();
        boolean backedByEndpoint = endpoints.stream()
                .anyMatch(endpoint -> host.equals(endpoint.getAddress()) && port == endpoint.getPortValue());
        if (backedByEndpoint) {
            matched.add(invoker);
        }
    }
    subset.setInvokers(new BitList<>(matched));
}
/**
 * Stores the new rules of an application and adjusts cluster subscriptions to the change.
 * An empty or {@code null} rule list is handled as a removal of the application's rules.
 *
 * @param appName       application the rules belong to
 * @param xdsRouteRules the application's current rules
 */
@Override
public synchronized void onRuleChange(String appName, List<XdsRouteRule> xdsRouteRules) {
    if (!CollectionUtils.isEmpty(xdsRouteRules)) {
        // Compare the referenced clusters before and after the update.
        Set<String> clustersBefore = getAllCluster();
        xdsRouteRuleMap.put(appName, xdsRouteRules);
        Set<String> clustersAfter = getAllCluster();
        changeClusterSubscribe(clustersBefore, clustersAfter);
    } else {
        clearRule(appName);
    }
}
/**
 * Collects the names of all clusters referenced by the stored route rules: the rule's single
 * cluster when set, otherwise the names of its weighted clusters.
 *
 * @return a new mutable set of cluster names; empty when no rules are stored
 */
private Set<String> getAllCluster() {
    Set<String> clusterNames = new HashSet<>();
    for (List<XdsRouteRule> appRules : xdsRouteRuleMap.values()) {
        for (XdsRouteRule appRule : appRules) {
            HTTPRouteDestination destination = appRule.getRoute();
            if (destination.getCluster() != null) {
                clusterNames.add(destination.getCluster());
                continue;
            }
            if (CollectionUtils.isEmpty(destination.getWeightedClusters())) {
                continue;
            }
            for (ClusterWeight weighted : destination.getWeightedClusters()) {
                clusterNames.add(weighted.getName());
            }
        }
    }
    return clusterNames;
}
/**
 * Applies the difference between two cluster sets: clusters only in {@code oldCluster} are
 * unsubscribed and their subset removed; clusters only in {@code newCluster} get a fresh
 * subset and are subscribed.
 *
 * @param oldCluster cluster names referenced before the change
 * @param newCluster cluster names referenced after the change
 */
private void changeClusterSubscribe(Set<String> oldCluster, Set<String> newCluster) {
    // Clusters no longer referenced by any rule.
    Set<String> obsolete = new HashSet<>(oldCluster);
    obsolete.removeAll(newCluster);
    // Clusters referenced for the first time.
    Set<String> introduced = new HashSet<>(newCluster);
    introduced.removeAll(oldCluster);

    obsolete.forEach(name -> {
        edsEndpointManager.unSubscribeEds(name, this);
        destinationSubsetMap.remove(name);
    });
    introduced.forEach(name -> {
        // Register the subset before subscribing so an endpoint callback can find it.
        destinationSubsetMap.put(name, new DestinationSubset<>(name));
        edsEndpointManager.subscribeEds(name, this);
    });
}
/**
 * Removes the stored rules of an application and, if there were any, adjusts cluster
 * subscriptions to the clusters that are still referenced.
 *
 * @param appName application whose rules are dropped
 */
@Override
public synchronized void clearRule(String appName) {
    Set<String> clustersBefore = getAllCluster();
    List<XdsRouteRule> removed = xdsRouteRuleMap.remove(appName);
    if (!CollectionUtils.isEmpty(removed)) {
        changeClusterSubscribe(clustersBefore, getAllCluster());
    }
}
/**
 * Updates the endpoints of a known cluster subset and recomputes its invokers from a copy of
 * the current invoker list. Unknown clusters are ignored.
 *
 * @param cluster   name of the cluster whose endpoints changed
 * @param endpoints the cluster's current endpoints
 */
@Override
public synchronized void onEndPointChange(String cluster, Set<Endpoint> endpoints) {
    DestinationSubset<T> target = destinationSubsetMap.get(cluster);
    if (target != null) {
        target.setEndpoints(endpoints);
        computeSubset(target, currentInvokeList.clone());
    }
}
/**
 * Unsubscribes this router from route rules of every subscribed application and from
 * endpoints of every cluster referenced by the stored rules.
 */
@Override
public void stop() {
    subscribeApplications.forEach(app -> rdsRouteRuleManager.unSubscribeRds(app, this));
    getAllCluster().forEach(cluster -> edsEndpointManager.unSubscribeEds(cluster, this));
}
/**
 * Returns the set of application names this router is currently subscribed to.
 * The internal set itself is returned, not a copy.
 * NOTE(review): presumably for unit tests only, like the sibling accessors below - confirm.
 */
@Deprecated
Set<String> getSubscribeApplications() {
return subscribeApplications;
}
/**
 * Returns the invoker list recorded by the latest {@code notify} call.
 * For unit tests only.
 */
@Deprecated
BitList<Invoker<T>> getInvokerList() {
return currentInvokeList;
}
/**
 * Returns the internal map from application name to its stored route rules (not a copy).
 * For unit tests only.
 */
@Deprecated
ConcurrentHashMap<String, List<XdsRouteRule>> getXdsRouteRuleMap() {
return xdsRouteRuleMap;
}
/**
 * Returns the internal map from cluster name to its destination subset (not a copy).
 * For unit tests only.
 */
@Deprecated
ConcurrentHashMap<String, DestinationSubset<T>> getDestinationSubsetMap() {
return destinationSubsetMap;
}
}