Add  Affinity Router (#14787)

* add affinity router

* fix complie error

* remove comments

* format code
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
index eab16e6..ac8bfcf 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
@@ -46,6 +46,8 @@
 
     String CONDITIONS_KEY = "conditions";
 
+    String AFFINITY_KEY = "affinityAware";
+
     String TAGS_KEY = "tags";
 
     /**
@@ -141,5 +143,10 @@
 
     String RULE_VERSION_V31 = "v3.1";
 
+    public static final String TRAFFIC_DISABLE_KEY = "trafficDisable";
+    public static final String RATIO_KEY = "ratio";
+    public static final int DefaultRouteRatio = 0;
     public static final int DefaultRouteConditionSubSetWeight = 100;
+    public static final int DefaultRoutePriority = 0;
+    public static final double DefaultAffinityRatio = 0;
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java
new file mode 100644
index 0000000..51b5dbb
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
+import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcher;
+import org.apache.dubbo.rpc.cluster.router.condition.matcher.ConditionMatcherFactory;
+import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_EXEC_CONDITION_ROUTER;
+import static org.apache.dubbo.rpc.cluster.Constants.AFFINITY_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.DefaultAffinityRatio;
+import static org.apache.dubbo.rpc.cluster.Constants.RATIO_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RULE_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RUNTIME_KEY;
+
+/**
+ * # dubbo/config/group/{$name}.affinity-router
+ * configVersion: v3.1
+ * scope: service # Or application
+ * key: service.apache.com
+ * enabled: true
+ * runtime: true
+ * affinityAware:
+ *   key: region
+ *   ratio: 20
+ */
+public class AffinityStateRouter<T> extends AbstractStateRouter<T> {
+    public static final String NAME = "affinity";
+
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractStateRouter.class);
+
+    protected String affinityKey;
+    protected Double ratio;
+    protected ConditionMatcher matchMatcher;
+    protected List<ConditionMatcherFactory> matcherFactories;
+
+    private final boolean enabled;
+
+    public AffinityStateRouter(URL url) {
+        super(url);
+        this.enabled = url.getParameter(ENABLED_KEY, true);
+        this.affinityKey = url.getParameter(AFFINITY_KEY, "");
+        this.ratio = url.getParameter(RATIO_KEY, DefaultAffinityRatio);
+        this.matcherFactories =
+                moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
+        if (this.enabled) {
+            this.init(affinityKey);
+        }
+    }
+
+    public AffinityStateRouter(URL url, String affinityKey, Double ratio, boolean enabled) {
+        super(url);
+        this.enabled = enabled;
+        this.affinityKey = affinityKey;
+        this.ratio = ratio;
+        matcherFactories =
+                moduleModel.getExtensionLoader(ConditionMatcherFactory.class).getActivateExtensions();
+        if (this.enabled) {
+            this.init(affinityKey);
+        }
+    }
+
+    public void init(String rule) {
+        try {
+            if (rule == null || rule.trim().isEmpty()) {
+                throw new IllegalArgumentException("Illegal affinity rule!");
+            }
+            this.matchMatcher = parseRule(affinityKey);
+        } catch (ParseException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private ConditionMatcher parseRule(String rule) throws ParseException {
+        ConditionMatcher matcher = getMatcher(rule);
+        // Multiple values
+        Set<String> values = matcher.getMatches();
+        values.add(getUrl().getParameter(rule));
+        return matcher;
+    }
+
+    @Override
+    protected BitList<Invoker<T>> doRoute(
+            BitList<Invoker<T>> invokers,
+            URL url,
+            Invocation invocation,
+            boolean needToPrintMessage,
+            Holder<RouterSnapshotNode<T>> nodeHolder,
+            Holder<String> messageHolder)
+            throws RpcException {
+        if (!enabled) {
+            if (needToPrintMessage) {
+                messageHolder.set("Directly return. Reason: AffinityRouter disabled.");
+            }
+            return invokers;
+        }
+
+        if (CollectionUtils.isEmpty(invokers)) {
+            if (needToPrintMessage) {
+                messageHolder.set("Directly return. Reason: Invokers from previous router is empty.");
+            }
+            return invokers;
+        }
+        try {
+            BitList<Invoker<T>> result = invokers.clone();
+            result.removeIf(invoker -> !matchInvoker(invoker.getUrl(), url));
+
+            if (result.size() / (double) invokers.size() >= ratio / (double) 100) {
+                if (needToPrintMessage) {
+                    messageHolder.set("Match return.");
+                }
+                return result;
+            } else {
+                logger.warn(
+                        CLUSTER_CONDITIONAL_ROUTE_LIST_EMPTY,
+                        "execute affinity state router result is less than defined" + this.ratio,
+                        "",
+                        "The affinity result is ignored. consumer: " + NetUtils.getLocalHost()
+                                + ", service: " + url.getServiceKey() + ", router: "
+                                + url.getParameterAndDecoded(RULE_KEY));
+                if (needToPrintMessage) {
+                    messageHolder.set("Directly return. Reason: Affinity state router result is less than defined.");
+                }
+                return invokers;
+            }
+        } catch (Throwable t) {
+            logger.error(
+                    CLUSTER_FAILED_EXEC_CONDITION_ROUTER,
+                    "execute affinity state router exception",
+                    "",
+                    "Failed to execute affinity router rule: " + getUrl() + ", invokers: " + invokers + ", cause: "
+                            + t.getMessage(),
+                    t);
+        }
+        if (needToPrintMessage) {
+            messageHolder.set("Directly return. Reason: Error occurred ( or result is empty ).");
+        }
+        return invokers;
+    }
+
+    @Override
+    public boolean isRuntime() {
+        // We always return true for previously defined Router, that is, old Router doesn't support cache anymore.
+        //        return true;
+        return this.getUrl().getParameter(RUNTIME_KEY, false);
+    }
+
+    private ConditionMatcher getMatcher(String key) {
+        return moduleModel
+                .getExtensionLoader(ConditionMatcherFactory.class)
+                .getExtension("param")
+                .createMatcher(key, moduleModel);
+    }
+
+    private boolean matchInvoker(URL url, URL param) {
+        return doMatch(url, param, null, matchMatcher);
+    }
+
+    private boolean doMatch(URL url, URL param, Invocation invocation, ConditionMatcher matcher) {
+        Map<String, String> sample = url.toOriginalMap();
+        if (!matcher.isMatch(sample, param, invocation, false)) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java
new file mode 100644
index 0000000..db96b4f
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityStateRouterFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * affinity router factory
+ */
+public class AffinityStateRouterFactory extends CacheableStateRouterFactory {
+
+    public static final String NAME = "affinity";
+
+    @Override
+    protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+        return new AffinityStateRouter<T>(url);
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java
new file mode 100644
index 0000000..f5b543d
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityListenableStateRouter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule;
+import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
+import org.apache.dubbo.rpc.cluster.router.affinity.AffinityStateRouter;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.model.AffinityRouterRule;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.model.AffinityRuleParser;
+import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+import org.apache.dubbo.rpc.cluster.router.state.TailStateRouter;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_RULE_PARSING;
+
+/**
+ * Abstract router which listens to dynamic configuration
+ */
+public abstract class AffinityListenableStateRouter<T> extends AbstractStateRouter<T> implements ConfigurationListener {
+    public static final String NAME = "Affinity_LISTENABLE_ROUTER";
+    public static final String RULE_SUFFIX = ".affinity-router";
+
+    private static final ErrorTypeAwareLogger logger =
+            LoggerFactory.getErrorTypeAwareLogger(AffinityListenableStateRouter.class);
+    private volatile AffinityRouterRule affinityRouterRule;
+    private volatile AffinityStateRouter<T> affinityRouter;
+    private final String ruleKey;
+
+    public AffinityListenableStateRouter(URL url, String ruleKey) {
+        super(url);
+        this.setForce(false);
+        this.init(ruleKey);
+        this.ruleKey = ruleKey;
+    }
+
+    @Override
+    public synchronized void process(ConfigChangedEvent event) {
+        if (logger.isInfoEnabled()) {
+            logger.info("Notification of affinity rule, change type is: " + event.getChangeType() + ", raw rule is:\n "
+                    + event.getContent());
+        }
+
+        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
+            affinityRouterRule = null;
+            affinityRouter = null;
+        } else {
+            try {
+                affinityRouterRule = AffinityRuleParser.parse(event.getContent());
+                generateConditions(affinityRouterRule);
+            } catch (Exception e) {
+                logger.error(
+                        CLUSTER_FAILED_RULE_PARSING,
+                        "Failed to parse the raw affinity rule",
+                        "",
+                        "Failed to parse the raw affinity rule and it will not take effect, please check "
+                                + "if the affinity rule matches with the template, the raw rule is:\n "
+                                + event.getContent(),
+                        e);
+            }
+        }
+    }
+
+    @Override
+    public BitList<Invoker<T>> doRoute(
+            BitList<Invoker<T>> invokers,
+            URL url,
+            Invocation invocation,
+            boolean needToPrintMessage,
+            Holder<RouterSnapshotNode<T>> nodeHolder,
+            Holder<String> messageHolder)
+            throws RpcException {
+        if (CollectionUtils.isEmpty(invokers) || affinityRouter == null) {
+            if (needToPrintMessage) {
+                messageHolder.set(
+                        "Directly return. Reason: Invokers from previous router is empty or affinityRouter is null.");
+            }
+            return invokers;
+        }
+
+        // We will check enabled status inside each router.
+        StringBuilder resultMessage = null;
+        if (needToPrintMessage) {
+            resultMessage = new StringBuilder();
+        }
+        invokers = affinityRouter.route(invokers, url, invocation, needToPrintMessage, nodeHolder);
+        if (needToPrintMessage) {
+            resultMessage.append(messageHolder.get());
+        }
+
+        if (needToPrintMessage) {
+            messageHolder.set(resultMessage.toString());
+        }
+
+        return invokers;
+    }
+
+    @Override
+    public boolean isForce() {
+        return (affinityRouterRule != null && affinityRouterRule.isForce());
+    }
+
+    private boolean isRuleRuntime() {
+        return affinityRouterRule != null && affinityRouterRule.isValid() && affinityRouterRule.isRuntime();
+    }
+
+    private void generateConditions(AbstractRouterRule rule) {
+        if (rule == null || !rule.isValid()) {
+            return;
+        }
+        AffinityRouterRule affinityRule = (AffinityRouterRule) rule;
+        affinityRouter = new AffinityStateRouter<>(
+                getUrl(), affinityRule.getAffinityKey(), affinityRule.getRatio(), affinityRule.isEnabled());
+        affinityRouter.setNextRouter(TailStateRouter.getInstance());
+    }
+
+    private synchronized void init(String ruleKey) {
+        if (StringUtils.isEmpty(ruleKey)) {
+            return;
+        }
+        String routerKey = ruleKey + RULE_SUFFIX;
+        this.getRuleRepository().addListener(routerKey, this);
+        String rule = this.getRuleRepository().getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
+        if (StringUtils.isNotEmpty(rule)) {
+            this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule));
+        }
+    }
+
+    public AffinityStateRouter<T> getAffinityRouter() {
+        return affinityRouter;
+    }
+
+    @Override
+    public void stop() {
+        this.getRuleRepository().removeListener(ruleKey + RULE_SUFFIX, this);
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java
new file mode 100644
index 0000000..8690ab8
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.router.condition.config.ListenableStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_TAG_ROUTE_EMPTY;
+import static org.apache.dubbo.common.utils.StringUtils.isEmpty;
+
+/**
+ * Application level affinity router, "application.affinity-router"
+ */
+public class AffinityProviderAppStateRouter<T> extends ListenableStateRouter<T> {
+    private static final ErrorTypeAwareLogger logger =
+            LoggerFactory.getErrorTypeAwareLogger(ListenableStateRouter.class);
+    public static final String NAME = "AFFINITY_PROVIDER_APP_ROUTER";
+    private String application;
+    private final String currentApplication;
+
+    public AffinityProviderAppStateRouter(URL url) {
+        super(url, url.getApplication());
+        this.currentApplication = url.getApplication();
+    }
+
+    @Override
+    public void notify(BitList<Invoker<T>> invokers) {
+        if (CollectionUtils.isEmpty(invokers)) {
+            return;
+        }
+
+        Invoker<T> invoker = invokers.get(0);
+        URL url = invoker.getUrl();
+        String providerApplication = url.getRemoteApplication();
+
+        // provider application is empty or equals with the current application
+        if (isEmpty(providerApplication)) {
+            logger.warn(
+                    CLUSTER_TAG_ROUTE_EMPTY,
+                    "affinity router get providerApplication is empty, will not subscribe to provider app rules.",
+                    "",
+                    "");
+            return;
+        }
+        if (providerApplication.equals(currentApplication)) {
+            return;
+        }
+
+        synchronized (this) {
+            if (!providerApplication.equals(application)) {
+                if (StringUtils.isNotEmpty(application)) {
+                    this.getRuleRepository().removeListener(application + RULE_SUFFIX, this);
+                }
+                String key = providerApplication + RULE_SUFFIX;
+                this.getRuleRepository().addListener(key, this);
+                application = providerApplication;
+                String rawRule = this.getRuleRepository().getRule(key, DynamicConfiguration.DEFAULT_GROUP);
+                if (StringUtils.isNotEmpty(rawRule)) {
+                    this.process(new ConfigChangedEvent(key, DynamicConfiguration.DEFAULT_GROUP, rawRule));
+                }
+            }
+        }
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java
new file mode 100644
index 0000000..a4a0e82
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityProviderAppStateRouterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * AffinityProvider router factory
+ */
+@Activate(order = 135)
+public class AffinityProviderAppStateRouterFactory extends CacheableStateRouterFactory {
+
+    public static final String NAME = "affinity-provider-app";
+
+    @Override
+    protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+        return new AffinityProviderAppStateRouter<>(url);
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java
new file mode 100644
index 0000000..d15551e
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+
+/**
+ * Service level router, "server-unique-name.affinity-router"
+ */
+public class AffinityServiceStateRouter<T> extends AffinityListenableStateRouter<T> {
+    public static final String NAME = "AFFINITY_SERVICE_ROUTER";
+
+    public AffinityServiceStateRouter(URL url) {
+        super(url, DynamicConfiguration.getRuleKey(url));
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java
new file mode 100644
index 0000000..e36a417
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/AffinityServiceStateRouterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.router.state.CacheableStateRouterFactory;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+
+/**
+ * Service level affinity router factory
+ */
+@Activate(order = 130)
+public class AffinityServiceStateRouterFactory extends CacheableStateRouterFactory {
+
+    public static final String NAME = "affinity_service";
+
+    @Override
+    protected <T> StateRouter<T> createRouter(Class<T> interfaceClass, URL url) {
+        return new AffinityServiceStateRouter<T>(url);
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java
new file mode 100644
index 0000000..a095692
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRouterRule.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config.model;
+
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule;
+
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_RULE_PARSING;
+import static org.apache.dubbo.rpc.cluster.Constants.AFFINITY_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.DefaultAffinityRatio;
+
+public class AffinityRouterRule extends AbstractRouterRule {
+
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AffinityRouterRule.class);
+    private String affinityKey;
+    private Double ratio;
+
+    @SuppressWarnings("unchecked")
+    public static AffinityRouterRule parseFromMap(Map<String, Object> map) {
+        AffinityRouterRule affinityRouterRule = new AffinityRouterRule();
+        affinityRouterRule.parseFromMap0(map);
+        Object conditions = map.get(AFFINITY_KEY);
+
+        Map<String, String> conditionMap = (Map<String, String>) conditions;
+        affinityRouterRule.setAffinityKey(conditionMap.get("key"));
+        Object ratio = conditionMap.getOrDefault("ratio", String.valueOf(DefaultAffinityRatio));
+        affinityRouterRule.setRatio(Double.valueOf(String.valueOf(ratio)));
+
+        if (affinityRouterRule.getRatio() > 100 || affinityRouterRule.getRatio() < 0) {
+            logger.error(
+                    CLUSTER_FAILED_RULE_PARSING,
+                    "Invalid affinity router config.",
+                    "",
+                    "The ratio value must range from 0 to 100");
+            affinityRouterRule.setValid(false);
+        }
+        return affinityRouterRule;
+    }
+
+    public AffinityRouterRule() {}
+
+    public String getAffinityKey() {
+        return affinityKey;
+    }
+
+    public void setAffinityKey(String affinityKey) {
+        this.affinityKey = affinityKey;
+    }
+
+    public Double getRatio() {
+        return ratio;
+    }
+
+    public void setRatio(Double ratio) {
+        this.ratio = ratio;
+    }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java
new file mode 100644
index 0000000..23d86fb
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/affinity/config/model/AffinityRuleParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity.config.model;
+
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Map;
+
+import org.yaml.snakeyaml.LoaderOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+import static org.apache.dubbo.rpc.cluster.Constants.CONFIG_VERSION_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.RULE_VERSION_V31;
+
+/**
+ * # dubbo/config/group/{$name}.affinity-router
+ * configVersion: v3.1
+ * scope: service # Or application
+ * key: service.apache.com
+ * enabled: true
+ * runtime: true
+ * affinityAware:
+ *   key: region
+ *   ratio: 20
+ */
+public class AffinityRuleParser {
+
+    public static AffinityRouterRule parse(String rawRule) {
+        AffinityRouterRule rule;
+        Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions()));
+        Map<String, Object> map = yaml.load(rawRule);
+        String confVersion = (String) map.get(CONFIG_VERSION_KEY);
+
+        rule = AffinityRouterRule.parseFromMap(map);
+        if (StringUtils.isEmpty(rule.getAffinityKey()) || !confVersion.startsWith(RULE_VERSION_V31)) {
+            rule.setValid(false);
+        }
+        rule.setRawRule(rawRule);
+
+        return rule;
+    }
+}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java
new file mode 100644
index 0000000..c2a3607
--- /dev/null
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/affinity/AffinityRouteTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.affinity;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.cluster.router.MockInvoker;
+import org.apache.dubbo.rpc.cluster.router.affinity.config.AffinityServiceStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class AffinityRouteTest {
+
+    private static BitList<Invoker<String>> invokers;
+
+    private static List<String> providerUrls;
+
+    @BeforeAll
+    public static void setUp() {
+
+        providerUrls = Arrays.asList(
+                "dubbo://127.0.0.1/com.foo.BarService",
+                "dubbo://127.0.0.1/com.foo.BarService",
+                "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+                "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService",
+                "dubbo://dubbo.apache.org/com.foo.BarService",
+                "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+                "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal");
+
+        List<Invoker<String>> invokerList = providerUrls.stream()
+                .map(url -> new MockInvoker<String>(URL.valueOf(url)))
+                .collect(Collectors.toList());
+
+        invokers = new BitList<>(invokerList);
+    }
+
+    public List<String> filtrate(List<String> invokers, String key) {
+
+        return invokers.stream().filter(invoker -> invoker.contains(key)).collect(Collectors.toList());
+    }
+
+    @Test
+    void testMetAffinityRoute() {
+        String config = "configVersion: v3.1\n"
+                + "scope: service\n"
+                + "key: service.apache.com\n"
+                + "enabled: true\n"
+                + "runtime: true\n"
+                + "affinityAware:\n"
+                + "  key: region\n"
+                + "  ratio: 20\n";
+
+        AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"));
+
+        affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("getComment");
+
+        BitList<Invoker<String>> res = affinityRoute.route(
+                invokers.clone(),
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+                invocation,
+                false,
+                new Holder<>());
+        List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+        assertEquals(filtered.size(), res.size());
+        System.out.println("The affinity routing condition is met and the result is routed");
+    }
+
+    @Test
+    void testUnMetAffinityRoute() {
+        String config = "configVersion: v3.1\n"
+                + "scope: service\n"
+                + "key: service.apache.com\n"
+                + "enabled: true\n"
+                + "runtime: true\n"
+                + "affinityAware:\n"
+                + "  key: region\n"
+                + "  ratio: 80\n";
+
+        AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"));
+
+        affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("getComment");
+
+        BitList<Invoker<String>> res = affinityRoute.route(
+                invokers.clone(),
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+                invocation,
+                false,
+                new Holder<>());
+        List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+        assertEquals(invokers.size(), res.size());
+        System.out.println("The affinity routing condition was not met and the result was not routed");
+    }
+
+    @Test
+    void testRatioEqualsAffinityRoute() {
+        String config = "configVersion: v3.1\n"
+                + "scope: service\n"
+                + "key: service.apache.com\n"
+                + "enabled: true\n"
+                + "runtime: true\n"
+                + "affinityAware:\n"
+                + "  key: region\n"
+                + "  ratio: 40\n";
+
+        AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"));
+
+        affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("getComment");
+
+        BitList<Invoker<String>> res = affinityRoute.route(
+                invokers.clone(),
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+                invocation,
+                false,
+                new Holder<>());
+        List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+        assertEquals(filtered.size(), res.size());
+        System.out.println("The affinity routing condition is met and the result is routed");
+    }
+
+    @Test
+    void testRatioNotEqualsAffinityRoute() {
+        String config = "configVersion: v3.1\n"
+                + "scope: service\n"
+                + "key: service.apache.com\n"
+                + "enabled: true\n"
+                + "runtime: true\n"
+                + "affinityAware:\n"
+                + "  key: region\n"
+                + "  ratio: 40.1\n";
+
+        AffinityServiceStateRouter<String> affinityRoute = new AffinityServiceStateRouter<>(
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"));
+
+        affinityRoute.process(new ConfigChangedEvent("com.foo.BarService", "", config, ConfigChangeType.ADDED));
+
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("getComment");
+
+        BitList<Invoker<String>> res = affinityRoute.route(
+                invokers.clone(),
+                URL.valueOf("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+                invocation,
+                false,
+                new Holder<>());
+        List<String> filtered = filtrate(new ArrayList<String>(providerUrls), "region=beijing");
+
+        assertEquals(invokers.size(), res.size());
+        System.out.println("The affinity routing condition was not met and the result was not routed");
+    }
+}