Add Affinity Router (#14787)
* add affinity router
* fix complie error
* remove comments
* format code
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
index eab16e6..ac8bfcf 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
@@ -46,6 +46,8 @@
String CONDITIONS_KEY = "conditions";
+ String AFFINITY_KEY = "affinityAware";
+
String TAGS_KEY = "tags";
/**
@@ -141,5 +143,10 @@
String RULE_VERSION_V31 = "v3.1";
+ public static final String TRAFFIC_DISABLE_KEY = "trafficDisable";
+ public static final String RATIO_KEY = "ratio";
+ public static final int DefaultRouteRatio = 0;
public static final int DefaultRouteConditionSubSetWeight = 100;
+ public static final int DefaultRoutePriority = 0;
+ public static final double DefaultAffinityRatio = 0;
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java
new file mode 100644
index 0000000..51b5dbb
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
+import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcher;
+import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcherFactory;
+import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_EXEC_CONDITION_ROUTER;
+import static org.apache.dubbo.rpc.cluster.Constants.AFFINITY_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.DefaultAffinityRatio;
+import static org.apache.dubbo.rpc.cluster.Constants.RATIO_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RULE_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RUNTIME_KEY;
+
+/**
+ * # dubbo/config/group/{$name}.affinity-router
+ * configVersion: v3.1
+ * scope: service # Or application
+ * key: service.apache.com
+ * enabled: true
+ * runtime: true
+ * affinityAware:
+ * key: region
+ * ratio: 20
+ */
+public class AffinityStateRouter<T> extends AbstractStateRouter<T> {
+ public static final String NAME = "affinity";
+
+ private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractStateRouter.class);
+
+ protected String affinityKey;
+ protected Double ratio;
+ protected ConditionMatcher matchMatcher;
+ protected List<ConditionMatcherFactory> matcherFactories;
+
+ private final boolean enabled;
+
+ public AffinityStateRouter(URL url) {
+ super(url);
+ this.enabled = url.getParameter(ENABLED_KEY, true);
+ this.affinityKey = url.getParameter(AFFINITY_KEY, "");
+ this.ratio = url.getParameter(RATIO_KEY, DefaultAffinityRatio);
+ this.matcherFactories =
+ moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
+ if (this.enabled) {
+ this.init(affinityKey);
+ }
+ }
+
+ public AffinityStateRouter(URL url, String affinityKey, Double ratio, boolean enabled) {
+ super(url);
+ this.enabled = enabled;
+ this.affinityKey = affinityKey;
+ this.ratio = ratio;
+ matcherFactories =
+ moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
+ if (this.enabled) {
+ this.init(affinityKey);
+ }
+ }
+
+ public void init(String rule) {
+ try {
+ if (rule == null || rule.trim().isEmpty()) {
+ throw new IllegalArgumentException("Illegal affinity rule!");
+ }
+ this.matchMatcher = parseRule(affinityKey);
+ } catch (ParseException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private ConditionMatcher parseRule(String rule) throws ParseException {
+ ConditionMatcher matcher = getMatcher(rule);
+ // Multiple values
+ Set<String> values = matcher.getMatches();
+ values.add(getUrl().getParameter(rule));
+ return matcher;
+ }
+
+ @Override
+ protected BitList<Invoker<T>> doRoute(
+ BitList<Invoker<T>> invokers,
+ URL url,
+ Invocation invocation,
+ boolean needToPrintMessage,
+ Holder<RouterSnapshotNode<T>> nodeHolder,
+ Holder<String> messageHolder)
+ throws RpcException {
+ if (!enabled) {
+ if (needToPrintMessage) {
+ messageHolder.set("Directly return. Reason: AffinityRouter disabled.");
+ }
+ return invokers;
+ }
+
+ if (CollectionUtils.isEmpty(invokers)) {
+ if (needToPrintMessage) {
+ messageHolder.set("Directly return. Reason: Invokers from previous router is empty.");
+ }
+ return invokers;
+ }
+ try {
+ BitList<Invoker<T>> result = invokers.clone();
+ result.removeIf(invoker -> !matchInvoker(invoker.getUrl(), url));
+
+ if (result.size() / (double) invokers.size() >= ratio / (double) 100) {
+ if (needToPrintMessage) {
+ messageHolder.set("Match return.");
+ }
+ return result;
+ } else {
+ logger.warn(
+ CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY,
+ "execute affinity state router result is less than defined" + this.ratio,
+ "",
+ "The affinity result is ignored. consumer: " + NetUtils.getLocalHost()
+ + ", service: " + url.getServiceKey() + ", router: "
+ + url.getParameterAndDecoded(RULE_KEY));
+ if (needToPrintMessage) {
+ messageHolder.set("Directly return. Reason: Affinity state router result is less than defined.");
+ }
+ return invokers;
+ }
+ } catch (Throwable t) {
+ logger.error(
+ CLUSTER_FAILED_EXEC_CONDITION_ROUTER,
+ "execute affinity state router exception",
+ "",
+ "Failed to execute affinity router rule: " + getUrl() + ", invokers: " + invokers + ", cause: "
+ + t.getMessage(),
+ t);
+ }
+ if (needToPrintMessage) {
+ messageHolder.set("Directly return. Reason: Error occurred ( or result is empty ).");
+ }
+ return invokers;
+ }
+
+ @Override
+ public boolean isRuntime() {
+ // We always return true for previously defined Router, that is, old Router doesn't support cache anymore.
+ // return true;
+ return this.getUrl().getParameter(RUNTIME_KEY, false);
+ }
+
+ private ConditionMatcher getMatcher(String key) {
+ return moduleModel
+ .getExtensionLoader(ConditionMatcherFactory.class)
+ .getExtension("param")
+ .createMatcher(key, moduleModel);
+ }
+
+ private boolean matchInvoker(URL url, URL param) {
+ return doMatch(url, param, null, matchMatcher);
+ }
+
+ private boolean doMatch(URL url, URL param, Invocation invocation, ConditionMatcher matcher) {
+ Map<String, String> sample = url.toOriginalMap();
+ if (!matcher.isMatch(sample, param, invocation, false)) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java
new file mode 100644
index 0000000..db96b4f
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * affinity router factory
+ */
+public class AffinityStateRouterFactory extends CacheableStateRouterFactory {
+
+ public static final String NAME = "affinity";
+
+ @Override
+ protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+ return new AffinityStateRouter<T>(url);
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java
new file mode 100644
index 0000000..f5b543d
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule;
+import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
+import org.apache.dubbo.rpc.cluster.router.affinity.AffinityStateRouter;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.model.AffinityRouterRule;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.model.AffinityRuleParser;
+import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+import org.apache.dubbo.rpc.cluster.router.state.TailStateRouter;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_RULE_PARSING;
+
+/**
+ * Abstract router which listens to dynamic configuration
+ */
+public abstract class AffinityListenableStateRouter<T> extends AbstractStateRouter<T> implements ConfigurationListener {
+ public static final String NAME = "Affinity_LISTENABLE_ROUTER";
+ public static final String RULE_SUFFIX = ".affinity-router";
+
+ private static final ErrorTypeAwareLogger logger =
+ LoggerFactory.getErrorTypeAwareLogger(AffinityListenableStateRouter.class);
+ private volatile AffinityRouterRule affinityRouterRule;
+ private volatile AffinityStateRouter<T> affinityRouter;
+ private final String ruleKey;
+
+ public AffinityListenableStateRouter(URL url, String ruleKey) {
+ super(url);
+ this.setForce(false);
+ this.init(ruleKey);
+ this.ruleKey = ruleKey;
+ }
+
+ @Override
+ public synchronized void process(ConfigChangedEvent event) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Notification of affinity rule, change type is: " + event.getChangeType() + ", raw rule is:\n "
+ + event.getContent());
+ }
+
+ if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
+ affinityRouterRule = null;
+ affinityRouter = null;
+ } else {
+ try {
+ affinityRouterRule = AffinityRuleParser.parse(event.getContent());
+ generateConditions(affinityRouterRule);
+ } catch (Exception e) {
+ logger.error(
+ CLUSTER_FAILED_RULE_PARSING,
+ "Failed to parse the raw affinity rule",
+ "",
+ "Failed to parse the raw affinity rule and it will not take effect, please check "
+ + "if the affinity rule matches with the template, the raw rule is:\n "
+ + event.getContent(),
+ e);
+ }
+ }
+ }
+
+ @Override
+ public BitList<Invoker<T>> doRoute(
+ BitList<Invoker<T>> invokers,
+ URL url,
+ Invocation invocation,
+ boolean needToPrintMessage,
+ Holder<RouterSnapshotNode<T>> nodeHolder,
+ Holder<String> messageHolder)
+ throws RpcException {
+ if (CollectionUtils.isEmpty(invokers) || affinityRouter == null) {
+ if (needToPrintMessage) {
+ messageHolder.set(
+ "Directly return. Reason: Invokers from previous router is empty or affinityRouter is null.");
+ }
+ return invokers;
+ }
+
+ // We will check enabled status inside each router.
+ StringBuilder resultMessage = null;
+ if (needToPrintMessage) {
+ resultMessage = new StringBuilder();
+ }
+ invokers = affinityRouter.route(invokers, url, invocation, needToPrintMessage, nodeHolder);
+ if (needToPrintMessage) {
+ resultMessage.append(messageHolder.get());
+ }
+
+ if (needToPrintMessage) {
+ messageHolder.set(resultMessage.toString());
+ }
+
+ return invokers;
+ }
+
+ @Override
+ public boolean isForce() {
+ return (affinityRouterRule != null && affinityRouterRule.isForce());
+ }
+
+ private boolean isRuleRuntime() {
+ return affinityRouterRule != null && affinityRouterRule.isValid() && affinityRouterRule.isRuntime();
+ }
+
+ private void generateConditions(AbstractRouterRule rule) {
+ if (rule == null || !rule.isValid()) {
+ return;
+ }
+ AffinityRouterRule affinityRule = (AffinityRouterRule) rule;
+ affinityRouter = new AffinityStateRouter<>(
+ getUrl(), affinityRule.getAffinityKey(), affinityRule.getRatio(), affinityRule.isEnabled());
+ affinityRouter.setNextRouter(TailStateRouter.getInstance());
+ }
+
+ private synchronized void init(String ruleKey) {
+ if (StringUtils.isEmpty(ruleKey)) {
+ return;
+ }
+ String routerKey = ruleKey + RULE_SUFFIX;
+ this.getRuleRepository().addListener(routerKey, this);
+ String rule = this.getRuleRepository().getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
+ if (StringUtils.isNotEmpty(rule)) {
+ this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule));
+ }
+ }
+
+ public AffinityStateRouter<T> getAffinityRouter() {
+ return affinityRouter;
+ }
+
+ @Override
+ public void stop() {
+ this.getRuleRepository().removeListener(ruleKey + RULE_SUFFIX, this);
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java
new file mode 100644
index 0000000..8690ab8
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.router.condition.config.ListenableStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_TAG_ROUTE_EMPTY;
+import static org.apache.dubbo.common.utils.StringUtils.isEmpty;
+
+/**
+ * Application level affinity router, "application.affinity-router"
+ */
+public class AffinityProviderAppStateRouter<T> extends ListenableStateRouter<T> {
+ private static final ErrorTypeAwareLogger logger =
+ LoggerFactory.getErrorTypeAwareLogger(ListenableStateRouter.class);
+ public static final String NAME = "AFFINITY_PROVIDER_APP_ROUTER";
+ private String application;
+ private final String currentApplication;
+
+ public AffinityProviderAppStateRouter(URL url) {
+ super(url, url.getApplication());
+ this.currentApplication = url.getApplication();
+ }
+
+ @Override
+ public void notify(BitList<Invoker<T>> invokers) {
+ if (CollectionUtils.isEmpty(invokers)) {
+ return;
+ }
+
+ Invoker<T> invoker = invokers.get(0);
+ URL url = invoker.getUrl();
+ String providerApplication = url.getRemoteApplication();
+
+ // provider application is empty or equals with the current application
+ if (isEmpty(providerApplication)) {
+ logger.warn(
+ CLUSTER_TAG_ROUTE_EMPTY,
+ "affinity router get providerApplication is empty, will not subscribe to provider app rules.",
+ "",
+ "");
+ return;
+ }
+ if (providerApplication.equals(currentApplication)) {
+ return;
+ }
+
+ synchronized (this) {
+ if (!providerApplication.equals(application)) {
+ if (StringUtils.isNotEmpty(application)) {
+ this.getRuleRepository().removeListener(application + RULE_SUFFIX, this);
+ }
+ String key = providerApplication + RULE_SUFFIX;
+ this.getRuleRepository().addListener(key, this);
+ application = providerApplication;
+ String rawRule = this.getRuleRepository().getRule(key, DynamicConfiguration.DEFAULT_GROUP);
+ if (StringUtils.isNotEmpty(rawRule)) {
+ this.process(new ConfigChangedEvent(key, DynamicConfiguration.DEFAULT_GROUP, rawRule));
+ }
+ }
+ }
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java
new file mode 100644
index 0000000..a4a0e82
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * AffinityProvider router factory
+ */
+@Activate(order = 135)
+public class AffinityProviderAppStateRouterFactory extends CacheableStateRouterFactory {
+
+ public static final String NAME = "affinity-provider-app";
+
+ @Override
+ protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+ return new AffinityProviderAppStateRouter<>(url);
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java
new file mode 100644
index 0000000..d15551e
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+
+/**
+ * Service level router, "server-unique-name.affinity-router"
+ */
+public class AffinityServiceStateRouter<T> extends AffinityListenableStateRouter<T> {
+ public static final String NAME = "AFFINITY_SERVICE_ROUTER";
+
+ public AffinityServiceStateRouter(URL url) {
+ super(url, DynamicConfiguration.getRuleKey(url));
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java
new file mode 100644
index 0000000..e36a417
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * Service level affinity router factory
+ */
+@Activate(order = 130)
+public class AffinityServiceStateRouterFactory extends CacheableStateRouterFactory {
+
+ public static final String NAME = "affinity_service";
+
+ @Override
+ protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+ return new AffinityServiceStateRouter<T>(url);
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java
new file mode 100644
index 0000000..a095692
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config.model;
+
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule;
+
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_RULE_PARSING;
+import static org.apache.dubbo.rpc.cluster.Constants.AFFINITY_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.DefaultAffinityRatio;
+
+public class AffinityRouterRule extends AbstractRouterRule {
+
+ private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AffinityRouterRule.class);
+ private String affinityKey;
+ private Double ratio;
+
+ @SuppressWarnings("unchecked")
+ public static AffinityRouterRule parseFromMap(Map<String, Object> map) {
+ AffinityRouterRule affinityRouterRule = new AffinityRouterRule();
+ affinityRouterRule.parseFromMap0(map);
+ Object conditions = map.get(AFFINITY_KEY);
+
+ Map<String, String> conditionMap = (Map<String, String>) conditions;
+ affinityRouterRule.setAffinityKey(conditionMap.get("key"));
+ Object ratio = conditionMap.getOrDefault("ratio", String.valueOf(DefaultAffinityRatio));
+ affinityRouterRule.setRatio(Double.valueOf(String.valueOf(ratio)));
+
+ if (affinityRouterRule.getRatio() > 100 || affinityRouterRule.getRatio() < 0) {
+ logger.error(
+ CLUSTER_FAILED_RULE_PARSING,
+ "Invalid affinity router config.",
+ "",
+ "The ratio value must range from 0 to 100");
+ affinityRouterRule.setValid(false);
+ }
+ return affinityRouterRule;
+ }
+
+ public AffinityRouterRule() {}
+
+ public String getAffinityKey() {
+ return affinityKey;
+ }
+
+ public void setAffinityKey(String affinityKey) {
+ this.affinityKey = affinityKey;
+ }
+
+ public Double getRatio() {
+ return ratio;
+ }
+
+ public void setRatio(Double ratio) {
+ this.ratio = ratio;
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java
new file mode 100644
index 0000000..23d86fb
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config.model;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+
+import org.yaml.snakeyaml.LoaderOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+import static org.apache.dubbo.rpc.cluster.Constants.CONFIG_VERSION_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RULE_VERSION_V31;
+
+/**
+ * # dubbo/config/group/{$name}.affinity-router
+ * configVersion: v3.1
+ * scope: service # Or application
+ * key: service.apache.com
+ * enabled: true
+ * runtime: true
+ * affinityAware:
+ * key: region
+ * ratio: 20
+ */
+public class AffinityRuleParser {
+
+ public static AffinityRouterRule parse(String rawRule) {
+ AffinityRouterRule rule;
+ Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions()));
+ Map<String, Object> map = yaml.load(rawRule);
+ String confVersion = (String) map.get(CONFIG_VERSION_KEY);
+
+ rule = AffinityRouterRule.parseFromMap(map);
+ if (StringUtils.isEmpty(rule.getAffinityKey()) || !confVersion.startsWith(RULE_VERSION_V31)) {
+ rule.setValid(false);
+ }
+ rule.setRawRule(rawRule);
+
+ return rule;
+ }
+}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java
new file mode 100644
index 0000000..c2a3607
--- /dev/null
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.cluster.router.MockInvoker;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.AffinityServiceStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class AffinityRouteTest {
+
+ private static BitList<Invoker<String>> invokers;
+
+ private static List<String> providerUrls;
+
+ @BeforeAll
+ public static void setUp() {
+
+ providerUrls = Arrays.asList(
+ "dubbo://127.0.0.1/com.foo.BarService",
+ "dubbo://127.0.0.1/com.foo.BarService",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService",
+ "dubbo://dubbo.apache.org/com.foo.BarService",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal");
+
+ List<Invoker<String>> invokerList = providerUrls.stream()
+ .map(url -> new MockInvoker<String>(URL.valueOf(url)))
+ .collect(Collectors.toList());
+
+ invokers = new BitList<>(invokerList);
+ }
+
+ public List<String> filtrate(List<String> invokers, String key) {
+
+ return invokers.stream().filter(invoker -> invoker.contains(key)).collect(Collectors.toList());
+ }
+
+ @Test
+ void testMetAffinityRoute() {
+ String config = "configVersion: v3.1\n"
+ + "scope: service\n"
+ + "key: service.apache.com\n"
+ + "enabled: true\n"
+ + "runtime: true\n"
+ + "affinityAware:\n"
+ + " key: region\n"
+ + " ratio: 20\n";
+
+ AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"));
+
+ affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+ RpcInvocation invocation = new RpcInvocation();
+ invocation.setMethodName("getComment");
+
+ BitList<Invoker<String>> res = affinityRoute.route(
+ invokers.clone(),
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"),
+ invocation,
+ false,
+ new Holder<>());
+ List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+ assertEquals(filtered.size(), res.size());
+ System.out.println("The affinity routing condition is met and the result is routed");
+ }
+
+ @Test
+ void testUnMetAffinityRoute() {
+ String config = "configVersion: v3.1\n"
+ + "scope: service\n"
+ + "key: service.apache.com\n"
+ + "enabled: true\n"
+ + "runtime: true\n"
+ + "affinityAware:\n"
+ + " key: region\n"
+ + " ratio: 80\n";
+
+ AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"));
+
+ affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+ RpcInvocation invocation = new RpcInvocation();
+ invocation.setMethodName("getComment");
+
+ BitList<Invoker<String>> res = affinityRoute.route(
+ invokers.clone(),
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"),
+ invocation,
+ false,
+ new Holder<>());
+ List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+ assertEquals(invokers.size(), res.size());
+ System.out.println("The affinity routing condition was not met and the result was not routed");
+ }
+
+ @Test
+ void testRatioEqualsAffinityRoute() {
+ String config = "configVersion: v3.1\n"
+ + "scope: service\n"
+ + "key: service.apache.com\n"
+ + "enabled: true\n"
+ + "runtime: true\n"
+ + "affinityAware:\n"
+ + " key: region\n"
+ + " ratio: 40\n";
+
+ AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"));
+
+ affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+ RpcInvocation invocation = new RpcInvocation();
+ invocation.setMethodName("getComment");
+
+ BitList<Invoker<String>> res = affinityRoute.route(
+ invokers.clone(),
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"),
+ invocation,
+ false,
+ new Holder<>());
+ List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+ assertEquals(filtered.size(), res.size());
+ System.out.println("The affinity routing condition is met and the result is routed");
+ }
+
+ @Test
+ void testRatioNotEqualsAffinityRoute() {
+ String config = "configVersion: v3.1\n"
+ + "scope: service\n"
+ + "key: service.apache.com\n"
+ + "enabled: true\n"
+ + "runtime: true\n"
+ + "affinityAware:\n"
+ + " key: region\n"
+ + " ratio: 40.1\n";
+
+ AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"));
+
+ affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+ RpcInvocation invocation = new RpcInvocation();
+ invocation.setMethodName("getComment");
+
+ BitList<Invoker<String>> res = affinityRoute.route(
+ invokers.clone(),
+ URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray®ion=beijing"),
+ invocation,
+ false,
+ new Holder<>());
+ List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+ assertEquals(invokers.size(), res.size());
+ System.out.println("The affinity routing condition was not met and the result was not routed");
+ }
+}