[SCB-2878]operation based load balancer (#4332)
diff --git a/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java b/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
index d1f97d7..e7f903e 100644
--- a/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
+++ b/core/src/main/java/org/apache/servicecomb/core/NonSwaggerInvocation.java
@@ -29,12 +29,12 @@
@Override
public String getSchemaId() {
- throw new UnsupportedOperationException();
+ return "third-schema";
}
@Override
public String getOperationName() {
- throw new UnsupportedOperationException();
+ return "third-operation";
}
@Override
diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
index d040cf4..c7b70f6 100644
--- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
+++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/discovery/DiscoveryTree.java
@@ -92,10 +92,14 @@
}
private void log() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("DiscoveryFilters(name, enabled, order, group):");
for (DiscoveryFilter filter : filters) {
- LOGGER.info("DiscoveryFilter {}, enabled {}, order {}.",
- filter.getClass().getName(), filter.enabled(), filter.getOrder());
+ sb.append("(").append(filter.getClass().getName()).append(",")
+ .append(filter.enabled()).append(",").append(filter.getOrder()).append(",")
+ .append(filter.isGroupingFilter()).append(")");
}
+ LOGGER.info(sb.toString());
}
boolean isMatch(VersionedCache existing, VersionedCache inputCache) {
@@ -181,7 +185,7 @@
}
// no rerun support, go on even result is empty
- // because maybe some filter use other mechanism to create a instance(eg:domain name)
+ // because maybe some filter use other mechanism to create an instance(eg:domain name)
}
parent = child;
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
index 61f7a2e..e7dabe3 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
@@ -20,6 +20,7 @@
import java.util.Map;
import org.apache.servicecomb.config.ConfigUtil;
+import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.foundation.common.LegacyPropertyFactory;
/**
@@ -30,55 +31,62 @@
//// 2.1 configuration items
public static final String ROOT = "servicecomb.loadbalance.";
- public static final String SERVER_EXPIRED_IN_SECONDS = "servicecomb.loadbalance.stats.serverExpiredInSeconds";
-
- public static final String TIMER_INTERVAL_IN_MILLIS = "servicecomb.loadbalance.stats.timerIntervalInMillis";
+ public static final String RULE_STRATEGY_GLOBAL = "servicecomb.loadbalance.strategy.name";
public static final String RULE_STRATEGY_NAME = "strategy.name";
// 2.0 configuration items
public static final String ROOT_20 = "ribbon.";
- // retry configurations
- public static final String RETRY_HANDLER = "retryHandler";
-
// SessionStickinessRule configruation
public static final String SESSION_TIMEOUT_IN_SECONDS = "SessionStickinessRule.sessionTimeoutInSeconds";
public static final String SUCCESSIVE_FAILED_TIMES = "SessionStickinessRule.successiveFailedTimes";
- private static final double PERCENT = 100;
-
public static final String FILTER_ISOLATION = "isolation.";
- public static final String FILTER_OPEN = "enabled";
-
- public static final String FILTER_ERROR_PERCENTAGE = "errorThresholdPercentage";
-
- public static final String FILTER_ENABLE_REQUEST = "enableRequestThreshold";
-
- public static final String FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS = "recoverImmediatelyWhenSuccess";
-
- public static final String FILTER_SINGLE_TEST = "singleTestTime";
-
public static final String FILTER_MAX_SINGLE_TEST_WINDOW = "maxSingleTestWindow";
- public static final String FILTER_MIN_ISOLATION_TIME = "minIsolationTime";
-
- public static final String FILTER_CONTINUOUS_FAILURE_THRESHOLD = "continuousFailureThreshold";
-
public static final String TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN =
"servicecomb.loadbalance.%s.transactionControl.options";
public static final Configuration INSTANCE = new Configuration();
+ public record RuleType(int type, String value) {
+ public static final int TYPE_SCHEMA = 1;
+
+ public static final int TYPE_OPERATION = 2;
+
+ public String getValue() {
+ return value;
+ }
+
+ public int getType() {
+ return type;
+ }
+ }
+
private Configuration() {
}
- public String getRuleStrategyName(String microservice) {
- return getStringProperty(null,
- ROOT + microservice + "." + RULE_STRATEGY_NAME,
- ROOT + RULE_STRATEGY_NAME);
+ public RuleType getRuleStrategyName(Invocation invocation) {
+ String value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+ invocation.getSchemaId() + "." + invocation.getOperationName() + "." + RULE_STRATEGY_NAME);
+ if (value != null) {
+ return new RuleType(RuleType.TYPE_OPERATION, value);
+ }
+ value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+ invocation.getSchemaId() + "." + RULE_STRATEGY_NAME);
+ if (value != null) {
+ return new RuleType(RuleType.TYPE_SCHEMA, value);
+ }
+ value = getStringProperty(null, ROOT + invocation.getMicroserviceName() + "." +
+ RULE_STRATEGY_NAME);
+ if (value != null) {
+ return new RuleType(RuleType.TYPE_SCHEMA, value);
+ }
+ return new RuleType(RuleType.TYPE_SCHEMA,
+ getStringProperty("RoundRobin", RULE_STRATEGY_GLOBAL));
}
public int getSessionTimeoutInSeconds(String microservice) {
@@ -105,49 +113,6 @@
}
}
- public boolean isIsolationFilterOpen(String microservice) {
- String p = getStringProperty("true",
- ROOT + microservice + "." + FILTER_ISOLATION + FILTER_OPEN,
- ROOT + FILTER_ISOLATION + FILTER_OPEN);
- return Boolean.parseBoolean(p);
- }
-
- public int getErrorThresholdPercentage(String microservice) {
- final int defaultValue = 0;
- String p = getStringProperty("0",
- ROOT + microservice + "." + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE,
- ROOT + FILTER_ISOLATION + FILTER_ERROR_PERCENTAGE);
- try {
- int result = Integer.parseInt(p);
- if (result <= PERCENT && result > 0) {
- return result;
- }
- return defaultValue;
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
- public int getEnableRequestThreshold(String microservice) {
- return getThreshold(microservice, FILTER_ENABLE_REQUEST);
- }
-
- public int getSingleTestTime(String microservice) {
- final int defaultValue = 60000;
- String p = getStringProperty("60000",
- ROOT + microservice + "." + FILTER_ISOLATION + FILTER_SINGLE_TEST,
- ROOT + FILTER_ISOLATION + FILTER_SINGLE_TEST);
- try {
- int result = Integer.parseInt(p);
- if (result >= 0) {
- return result;
- }
- return defaultValue;
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
public int getMaxSingleTestWindow() {
final int defaultValue = 60000;
String p = getStringProperty(Integer.toString(defaultValue),
@@ -163,29 +128,6 @@
}
}
- public int getMinIsolationTime(String microservice) {
- final int defaultValue = 3000; // 3 seconds
- String p = getStringProperty("3000",
- ROOT + microservice + "." + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME,
- ROOT + FILTER_ISOLATION + FILTER_MIN_ISOLATION_TIME);
- try {
- int result = Integer.parseInt(p);
- if (result >= 0) {
- return result;
- }
- return defaultValue;
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
- public boolean isRecoverImmediatelyWhenSuccess(String microservice) {
- String p = getStringProperty("true",
- ROOT + microservice + "." + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS,
- ROOT + FILTER_ISOLATION + FILTER_RECOVER_IMMEDIATELY_WHEN_SUCCESS);
- return Boolean.parseBoolean(p);
- }
-
public Map<String, String> getFlowsplitFilterOptions(String microservice) {
String keyPrefix = String.format(TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN, microservice);
return ConfigUtil.stringPropertiesWithPrefix(LegacyPropertyFactory.getEnvironment(), keyPrefix);
@@ -201,20 +143,4 @@
}
return defaultValue;
}
-
- public int getContinuousFailureThreshold(String microservice) {
- return getThreshold(microservice, FILTER_CONTINUOUS_FAILURE_THRESHOLD);
- }
-
- private int getThreshold(String microservice, String threshold) {
- final int defaultValue = 5;
- String p = getStringProperty("5",
- ROOT + microservice + "." + FILTER_ISOLATION + threshold,
- ROOT + FILTER_ISOLATION + threshold);
- try {
- return Integer.parseInt(p);
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
index 342bb8c..dc4cd6b 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
@@ -25,8 +25,4 @@
default RuleExt createLoadBalancerRule(String ruleName) {
return null;
}
-
- default ServerListFilterExt createServerListFilter(String key, String value, Object... args) {
- return null;
- }
}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
index 0d0245e..2187f59 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
@@ -18,26 +18,19 @@
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class ExtensionsManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionsManager.class);
-
private final List<ExtensionsFactory> extensionsFactories;
public ExtensionsManager(List<ExtensionsFactory> extensionsFactories) {
this.extensionsFactories = extensionsFactories;
}
- public RuleExt createLoadBalancerRule(String microservice) {
+ public RuleExt createLoadBalancerRule(String ruleStrategyName) {
RuleExt rule = null;
for (ExtensionsFactory factory : extensionsFactories) {
- if (factory.isSupport(Configuration.RULE_STRATEGY_NAME,
- Configuration.INSTANCE.getRuleStrategyName(microservice))) {
- rule = factory.createLoadBalancerRule(
- Configuration.INSTANCE.getRuleStrategyName(microservice));
+ if (factory.isSupport(Configuration.RULE_STRATEGY_NAME, ruleStrategyName)) {
+ rule = factory.createLoadBalancerRule(ruleStrategyName);
break;
}
}
@@ -46,7 +39,6 @@
rule = new RoundRobinRuleExt();
}
- LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), microservice);
return rule;
}
}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
index 637e370..6f4f1a0 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadBalanceFilter.java
@@ -18,11 +18,12 @@
import java.net.URI;
import java.util.Map;
-import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.config.ConfigurationChangedEvent;
import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
@@ -35,6 +36,8 @@
import org.apache.servicecomb.core.governance.RetryContext;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.loadbalance.Configuration.RuleType;
import org.apache.servicecomb.registry.discovery.DiscoveryContext;
import org.apache.servicecomb.registry.discovery.DiscoveryTree;
import org.apache.servicecomb.swagger.invocation.Response;
@@ -45,6 +48,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.Subscribe;
import io.github.resilience4j.core.metrics.Metrics.Outcome;
import jakarta.ws.rs.core.Response.Status;
@@ -70,8 +74,6 @@
private final ExtensionsManager extensionsManager;
- private String strategy = null;
-
private final SCBEngine scbEngine;
// set endpoint in invocation.localContext
@@ -88,6 +90,22 @@
boolean.class, false);
this.extensionsManager = extensionsManager;
this.discoveryTree = discoveryTree;
+ EventManager.register(this);
+ }
+
+ @Subscribe
+ @SuppressWarnings("unused")
+ public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
+ Set<String> changedKeys = event.getChanged();
+ for (String key : changedKeys) {
+ if (key.startsWith(Configuration.ROOT)) {
+ synchronized (lock) {
+ clearLoadBalancer();
+ }
+ LOGGER.info("clear load balance rule for configuration changed, {}", key);
+ break;
+ }
+ }
}
private void preCheck(SCBEngine scbEngine) {
@@ -132,15 +150,6 @@
invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, true);
- String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName());
- if (!Objects.equals(strategy, this.strategy)) {
- //配置变化,需要重新生成所有的lb实例
- synchronized (lock) {
- clearLoadBalancer();
- }
- }
- this.strategy = strategy;
-
LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);
return send(invocation, nextNode, loadBalancer);
@@ -268,12 +277,22 @@
invocation.getMicroserviceName());
invocation.addLocalContext(CONTEXT_KEY_SERVER_LIST, serversVersionedCache.data());
- return loadBalancerMap
- .computeIfAbsent(serversVersionedCache.name(), name -> createLoadBalancer(invocation.getMicroserviceName()));
+ RuleType ruleType = Configuration.INSTANCE.getRuleStrategyName(invocation);
+ String cacheKey;
+ if (ruleType.getType() == RuleType.TYPE_SCHEMA) {
+ cacheKey = invocation.getAppId() + "-" +
+ invocation.getMicroserviceName() + "-" + invocation.getSchemaId();
+ } else {
+ cacheKey = invocation.getAppId() + "-" +
+ invocation.getMicroserviceName() + "-" + invocation.getSchemaId() + "-" + invocation.getOperationName();
+ }
+ return loadBalancerMap.computeIfAbsent(cacheKey,
+ key -> createLoadBalancer(ruleType, key, invocation.getMicroserviceName()));
}
- private LoadBalancer createLoadBalancer(String microserviceName) {
- RuleExt rule = extensionsManager.createLoadBalancerRule(microserviceName);
+ private LoadBalancer createLoadBalancer(RuleType ruleType, String cacheKey, String microserviceName) {
+ RuleExt rule = extensionsManager.createLoadBalancerRule(ruleType.getValue());
+ LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), cacheKey);
return new LoadBalancer(rule, microserviceName);
}
}
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
index 7ad8057..9a87ee3 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
@@ -41,22 +41,15 @@
@Test
public void testRuleName() {
- Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest1.strategy.name"))
- .thenReturn("RoundRobin");
- Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest2.strategy.name"))
- .thenReturn("Random");
- Mockito.when(environment.getProperty("servicecomb.loadbalance.mytest3.strategy.name"))
- .thenReturn("WeightedResponse");
-
List<ExtensionsFactory> extensionsFactories = new ArrayList<>();
extensionsFactories.add(new RuleNameExtentionsFactory());
ExtensionsManager extensionsManager = new ExtensionsManager(extensionsFactories);
Assertions.assertEquals(RoundRobinRuleExt.class.getName(),
- extensionsManager.createLoadBalancerRule("mytest1").getClass().getName());
+ extensionsManager.createLoadBalancerRule("RoundRobin").getClass().getName());
Assertions.assertEquals(RandomRuleExt.class.getName(),
- extensionsManager.createLoadBalancerRule("mytest2").getClass().getName());
+ extensionsManager.createLoadBalancerRule("Random").getClass().getName());
Assertions.assertEquals(WeightedResponseTimeRuleExt.class.getName(),
- extensionsManager.createLoadBalancerRule("mytest3").getClass().getName());
+ extensionsManager.createLoadBalancerRule("WeightedResponse").getClass().getName());
}
}