[SCB-2878]operation based load balancer (#4332)

diff --git a/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java b/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
index d1f97d7..e7f903e 100644
--- a/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
+++ b/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
@@ -29,12 +29,12 @@
 
   @Override
   public String getSchemaId() {
-    throw new UnsupportedOperationException();
+    return "third-schema";
   }
 
   @Override
   public String getOperationName() {
-    throw new UnsupportedOperationException();
+    return "third-operation";
   }
 
   @Override
diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
index d040cf4..c7b70f6 100644
--- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
+++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
@@ -92,10 +92,14 @@
   }
 
   private void log() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("DiscoveryFilters(name, enabled, order, group):");
     for (DiscoveryFilter filter : filters) {
-      LOGGER.info("DiscoveryFilter {}, enabled {}, order {}.",
-          filter.getClass().getName(), filter.enabled(), filter.getOrder());
+      sb.append("(").append(filter.getClass().getName()).append(",")
+          .append(filter.enabled()).append(",").append(filter.getOrder()).append(",")
+          .append(filter.isGroupingFilter()).append(")");
     }
+    LOGGER.info(sb.toString());
   }
 
   boolean isMatch(VersionedCache existing, VersionedCache inputCache) {
@@ -181,7 +185,7 @@
         }
 
         // no rerun support, go on even result is empty
-        // because maybe some filter use other mechanism to create a instance(eg:domain name)
+        // because maybe some filter use other mechanism to create an instance(eg:domain name)
       }
 
       parent = child;
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
index 61f7a2e..e7dabe3 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
@@ -20,6 +20,7 @@
 import java.util.Map;
 
 import org.apache.servicecomb.config.ConfigUtil;
+import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.foundation.common.LegacyPropertyFactory;
 
 /**
@@ -30,55 +31,62 @@
   //// 2.1 configuration items
   public static final String ROOT = "servicecomb.loadbalance.";
 
-  public static final String SERVER_EXPIRED_IN_SECONDS = "servicecomb.loadbalance.stats.serverExpiredInSeconds";
-
-  public static final String TIMER_INTERVAL_IN_MILLIS = "servicecomb.loadbalance.stats.timerIntervalInMillis";
+  public static final String RULE_STRATEGY_GLOBAL = "servicecomb.loadbalance.strategy.name";
 
   public static final String RULE_STRATEGY_NAME = "strategy.name";
 
   // 2.0 configuration items
   public static final String ROOT_20 = "ribbon.";
 
-  // retry configurations
-  public static final String RETRY_HANDLER = "retryHandler";
-
   // SessionStickinessRule configruation
   public static final String SESSION_TIMEOUT_IN_SECONDS = "SessionStickinessRule.sessionTimeoutInSeconds";
 
   public static final String SUCCESSIVE_FAILED_TIMES = "SessionStickinessRule.successiveFailedTimes";
 
-  private static final double PERCENT = 100;
-
   public static final String FILTER_ISOLATION = "isolation.";
 
-  public static final String FILTER_OPEN = "enabled";
-
-  public static final String FILTER_ERROR_PERCENTAGE = "errorThresholdPercentage";
-
-  public static final String FILTER_ENABLE_REQUEST = "enableRequestThreshold";
-
-  public static final String FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS = "recoverImmediatelyWhenSuccess";
-
-  public static final String FILTER_SINGLE_TEST = "singleTestTime";
-
   public static final String FILTER_MAX_SINGLE_TEST_WINDOW = "maxSingleTestWindow";
 
-  public static final String FILTER_MIN_ISOLATION_TIME = "minIsolationTime";
-
-  public static final String FILTER_CONTINUOUS_FAILURE_THRESHOLD = "continuousFailureThreshold";
-
   public static final String TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN =
       "servicecomb.loadbalance.%s.transactionControl.options";
 
   public static final Configuration INSTANCE = new Configuration();
 
+  public record RuleType(int type, String value) {
+    public static final int TYPE_SCHEMA = 1;
+
+    public static final int TYPE_OPERATION = 2;
+
+    public String getValue() {
+      return value;
+    }
+
+    public int getType() {
+      return type;
+    }
+  }
+
   private Configuration() {
   }
 
-  public String getRuleStrategyName(String microservice) {
-    return getStringProperty(null,
-        ROOT + microservice + "." + RULE_STRATEGY_NAME,
-        ROOT + RULE_STRATEGY_NAME);
+  public RuleType getRuleStrategyName(Invocation invocation) {
+    String value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+        invocation.getSchemaId() + "." + invocation.getOperationName() + "." + RULE_STRATEGY_NAME);
+    if (value != null) {
+      return new RuleType(RuleType.TYPE_OPERATION, value);
+    }
+    value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+        invocation.getSchemaId() + "." + RULE_STRATEGY_NAME);
+    if (value != null) {
+      return new RuleType(RuleType.TYPE_SCHEMA, value);
+    }
+    value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+        RULE_STRATEGY_NAME);
+    if (value != null) {
+      return new RuleType(RuleType.TYPE_SCHEMA, value);
+    }
+    return new RuleType(RuleType.TYPE_SCHEMA,
+        getStringProperty("RoundRobin", RULE_STRATEGY_GLOBAL));
   }
 
   public int getSessionTimeoutInSeconds(String microservice) {
@@ -105,49 +113,6 @@
     }
   }
 
-  public boolean isIsolationFilterOpen(String microservice) {
-    String p = getStringProperty("true",
-        ROOT + microservice + "." + FILTER_ISOLATION + FILTER_OPEN,
-        ROOT + FILTER_ISOLATION + FILTER_OPEN);
-    return Boolean.parseBoolean(p);
-  }
-
-  public int getErrorThresholdPercentage(String microservice) {
-    final int defaultValue = 0;
-    String p = getStringProperty("0",
-        ROOT + microservice + "." + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE,
-        ROOT + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE);
-    try {
-      int result = Integer.parseInt(p);
-      if (result <= PERCENT && result > 0) {
-        return result;
-      }
-      return defaultValue;
-    } catch (NumberFormatException e) {
-      return defaultValue;
-    }
-  }
-
-  public int getEnableRequestThreshold(String microservice) {
-    return getThreshold(microservice, FILTER_ENABLE_REQUEST);
-  }
-
-  public int getSingleTestTime(String microservice) {
-    final int defaultValue = 60000;
-    String p = getStringProperty("60000",
-        ROOT + microservice + "." + FILTER_ISOLATION + FILTER_SINGLE_TEST,
-        ROOT + FILTER_ISOLATION + FILTER_SINGLE_TEST);
-    try {
-      int result = Integer.parseInt(p);
-      if (result >= 0) {
-        return result;
-      }
-      return defaultValue;
-    } catch (NumberFormatException e) {
-      return defaultValue;
-    }
-  }
-
   public int getMaxSingleTestWindow() {
     final int defaultValue = 60000;
     String p = getStringProperty(Integer.toString(defaultValue),
@@ -163,29 +128,6 @@
     }
   }
 
-  public int getMinIsolationTime(String microservice) {
-    final int defaultValue = 3000; // 3 seconds
-    String p = getStringProperty("3000",
-        ROOT + microservice + "." + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME,
-        ROOT + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME);
-    try {
-      int result = Integer.parseInt(p);
-      if (result >= 0) {
-        return result;
-      }
-      return defaultValue;
-    } catch (NumberFormatException e) {
-      return defaultValue;
-    }
-  }
-
-  public boolean isRecoverImmediatelyWhenSuccess(String microservice) {
-    String p = getStringProperty("true",
-        ROOT + microservice + "." + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS,
-        ROOT + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS);
-    return Boolean.parseBoolean(p);
-  }
-
   public Map<String, String> getFlowsplitFilterOptions(String microservice) {
     String keyPrefix = String.format(TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN, microservice);
     return ConfigUtil.stringPropertiesWithPrefix(LegacyPropertyFactory.getEnvironment(), keyPrefix);
@@ -201,20 +143,4 @@
     }
     return defaultValue;
   }
-
-  public int getContinuousFailureThreshold(String microservice) {
-    return getThreshold(microservice, FILTER_CONTINUOUS_FAILURE_THRESHOLD);
-  }
-
-  private int getThreshold(String microservice, String threshold) {
-    final int defaultValue = 5;
-    String p = getStringProperty("5",
-        ROOT + microservice + "." + FILTER_ISOLATION + threshold,
-        ROOT + FILTER_ISOLATION + threshold);
-    try {
-      return Integer.parseInt(p);
-    } catch (NumberFormatException e) {
-      return defaultValue;
-    }
-  }
 }
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
index 342bb8c..dc4cd6b 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
@@ -25,8 +25,4 @@
   default RuleExt createLoadBalancerRule(String ruleName) {
     return null;
   }
-
-  default ServerListFilterExt createServerListFilter(String key, String value, Object... args) {
-    return null;
-  }
 }
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
index 0d0245e..2187f59 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
@@ -18,26 +18,19 @@
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class ExtensionsManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionsManager.class);
-
   private final List<ExtensionsFactory> extensionsFactories;
 
   public ExtensionsManager(List<ExtensionsFactory> extensionsFactories) {
     this.extensionsFactories = extensionsFactories;
   }
 
-  public RuleExt createLoadBalancerRule(String microservice) {
+  public RuleExt createLoadBalancerRule(String ruleStrategyName) {
     RuleExt rule = null;
 
     for (ExtensionsFactory factory : extensionsFactories) {
-      if (factory.isSupport(Configuration.RULE_STRATEGY_NAME,
-          Configuration.INSTANCE.getRuleStrategyName(microservice))) {
-        rule = factory.createLoadBalancerRule(
-            Configuration.INSTANCE.getRuleStrategyName(microservice));
+      if (factory.isSupport(Configuration.RULE_STRATEGY_NAME, ruleStrategyName)) {
+        rule = factory.createLoadBalancerRule(ruleStrategyName);
         break;
       }
     }
@@ -46,7 +39,6 @@
       rule = new RoundRobinRuleExt();
     }
 
-    LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), microservice);
     return rule;
   }
 }
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
index 637e370..6f4f1a0 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
@@ -18,11 +18,12 @@
 
 import java.net.URI;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.config.ConfigurationChangedEvent;
 import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.SCBEngine;
@@ -35,6 +36,8 @@
 import org.apache.servicecomb.core.governance.RetryContext;
 import org.apache.servicecomb.foundation.common.cache.VersionedCache;
 import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.loadbalance.Configuration.RuleType;
 import org.apache.servicecomb.registry.discovery.DiscoveryContext;
 import org.apache.servicecomb.registry.discovery.DiscoveryTree;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -45,6 +48,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.Subscribe;
 
 import io.github.resilience4j.core.metrics.Metrics.Outcome;
 import jakarta.ws.rs.core.Response.Status;
@@ -70,8 +74,6 @@
 
   private final ExtensionsManager extensionsManager;
 
-  private String strategy = null;
-
   private final SCBEngine scbEngine;
 
   // set endpoint in invocation.localContext
@@ -88,6 +90,22 @@
             boolean.class, false);
     this.extensionsManager = extensionsManager;
     this.discoveryTree = discoveryTree;
+    EventManager.register(this);
+  }
+
+  @Subscribe
+  @SuppressWarnings("unused")
+  public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
+    Set<String> changedKeys = event.getChanged();
+    for (String key : changedKeys) {
+      if (key.startsWith(Configuration.ROOT)) {
+        synchronized (lock) {
+          clearLoadBalancer();
+        }
+        LOGGER.info("clear load balance rule for configuration changed, {}", key);
+        break;
+      }
+    }
   }
 
   private void preCheck(SCBEngine scbEngine) {
@@ -132,15 +150,6 @@
 
     invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, true);
 
-    String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName());
-    if (!Objects.equals(strategy, this.strategy)) {
-      //配置变化,需要重新生成所有的lb实例
-      synchronized (lock) {
-        clearLoadBalancer();
-      }
-    }
-    this.strategy = strategy;
-
     LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);
 
     return send(invocation, nextNode, loadBalancer);
@@ -268,12 +277,22 @@
         invocation.getMicroserviceName());
     invocation.addLocalContext(CONTEXT_KEY_SERVER_LIST, serversVersionedCache.data());
 
-    return loadBalancerMap
-        .computeIfAbsent(serversVersionedCache.name(), name -> createLoadBalancer(invocation.getMicroserviceName()));
+    RuleType ruleType = Configuration.INSTANCE.getRuleStrategyName(invocation);
+    String cacheKey;
+    if (ruleType.getType() == RuleType.TYPE_SCHEMA) {
+      cacheKey = invocation.getAppId() + "-" +
+          invocation.getMicroserviceName() + "-" + invocation.getSchemaId();
+    } else {
+      cacheKey = invocation.getAppId() + "-" +
+          invocation.getMicroserviceName() + "-" + invocation.getSchemaId() + "-" + invocation.getOperationName();
+    }
+    return loadBalancerMap.computeIfAbsent(cacheKey,
+        key -> createLoadBalancer(ruleType, key, invocation.getMicroserviceName()));
   }
 
-  private LoadBalancer createLoadBalancer(String microserviceName) {
-    RuleExt rule = extensionsManager.createLoadBalancerRule(microserviceName);
+  private LoadBalancer createLoadBalancer(RuleType ruleType, String cacheKey, String microserviceName) {
+    RuleExt rule = extensionsManager.createLoadBalancerRule(ruleType.getValue());
+    LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), cacheKey);
     return new LoadBalancer(rule, microserviceName);
   }
 }
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
index 7ad8057..9a87ee3 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
@@ -41,22 +41,15 @@
 
   @Test
   public void testRuleName() {
-    Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest1.strategy.name"))
-        .thenReturn("RoundRobin");
-    Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest2.strategy.name"))
-        .thenReturn("Random");
-    Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest3.strategy.name"))
-        .thenReturn("WeightedResponse");
-
     List<ExtensionsFactory> extensionsFactories = new ArrayList<>();
     extensionsFactories.add(new RuleNameExtentionsFactory());
     ExtensionsManager extensionsManager = new ExtensionsManager(extensionsFactories);
 
     Assertions.assertEquals(RoundRobinRuleExt.class.getName(),
-        extensionsManager.createLoadBalancerRule("mytest1").getClass().getName());
+        extensionsManager.createLoadBalancerRule("RoundRobin").getClass().getName());
     Assertions.assertEquals(RandomRuleExt.class.getName(),
-        extensionsManager.createLoadBalancerRule("mytest2").getClass().getName());
+        extensionsManager.createLoadBalancerRule("Random").getClass().getName());
     Assertions.assertEquals(WeightedResponseTimeRuleExt.class.getName(),
-        extensionsManager.createLoadBalancerRule("mytest3").getClass().getName());
+        extensionsManager.createLoadBalancerRule("WeightedResponse").getClass().getName());
   }
 }