[SCB-2689]fix rate limiting configuration changed cause NPE problem (#3374)
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
index f04ace8..8885af4 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.qps;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
@@ -28,6 +29,12 @@
* Support 3 levels of microservice/schema/operation.
*/
public class ConsumerQpsFlowControlHandler implements Handler {
+
+ @VisibleForTesting
+ public QpsControllerManager getQpsControllerMgr() {
+ return qpsControllerMgr;
+ }
+
private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false);
@Override
@@ -40,7 +47,8 @@
QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation);
if (qpsStrategy.isLimitNewRequest()) {
// return http status 429
- CommonExceptionData errorData = new CommonExceptionData("consumer request rejected by qps flowcontrol");
+ CommonExceptionData errorData = new CommonExceptionData(
+ "consumer request rejected by qps flowcontrol");
asyncResp.consumerFail(
new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
return;
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 009f81c..21016db 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -24,7 +24,10 @@
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import com.google.common.annotations.VisibleForTesting;
+
public class ProviderQpsFlowControlHandler implements Handler {
+
private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true);
@Override
@@ -44,16 +47,19 @@
String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation);
- isLimitNewRequest(qpsStrategy, asyncResp);
+ checkRequestRateLimited(qpsStrategy, asyncResp);
}
- private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
+ private void checkRequestRateLimited(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
if (qpsStrategy.isLimitNewRequest()) {
- CommonExceptionData errorData = new CommonExceptionData("provider request rejected by qps flowcontrol");
+ CommonExceptionData errorData = new CommonExceptionData(
+ "provider request rejected by qps flowcontrol");
asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
- return true;
- } else {
- return false;
}
}
+
+ @VisibleForTesting
+ public QpsControllerManager getQpsControllerMgr() {
+ return qpsControllerMgr;
+ }
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
index 1e954a6..436d6db 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
@@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.netflix.config.DynamicProperty;
public class QpsControllerManager {
@@ -75,6 +76,11 @@
initGlobalQpsController();
}
+ @VisibleForTesting
+ public Map<String, AbstractQpsStrategy> getQualifiedNameControllerMap() {
+ return qualifiedNameControllerMap;
+ }
+
public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) {
final String name = validatedName(microserviceName);
return qualifiedNameControllerMap
@@ -95,7 +101,8 @@
* Create relevant qpsLimit dynamicProperty and watch the configuration change.
* Search and return a valid qpsController.
*/
- private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
+ @VisibleForTesting
+ AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
Invocation invocation) {
createForService(qualifiedNameKey, microserviceName, invocation);
String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length());
@@ -155,11 +162,6 @@
return null;
}
- private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) {
- return controllerEntry.getKey().equals(configKey)
- || controllerEntry.getKey().startsWith(configKey + SEPARATOR);
- }
-
private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) {
return null != qpsStrategy && null != qpsStrategy.getQpsLimit();
}
@@ -182,37 +184,37 @@
configQpsControllerMap.put(configKey, innerQpsStrategy);
LOGGER.info("Global flow control strategy update, value = [{}]",
strategyProperty.getString());
- updateObjMap(configKey);
+ updateObjMap();
});
limitProperty.addCallback(() -> {
qpsStrategy.setQpsLimit(limitProperty.getLong());
LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey,
limitProperty.getString());
- updateObjMap(configKey);
+ updateObjMap();
});
bucketProperty.addCallback(() -> {
qpsStrategy.setBucketLimit(bucketProperty.getLong());
LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey,
bucketProperty.getString());
- updateObjMap(configKey);
+ updateObjMap();
});
configQpsControllerMap.put(configKey, qpsStrategy);
}
- protected void updateObjMap(String configKey) {
+ protected void updateObjMap() {
Iterator<Entry<String, AbstractQpsStrategy>> it = qualifiedNameControllerMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, AbstractQpsStrategy> entry = it.next();
- if (keyMatch(configKey, entry)) {
- AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
- if (qpsStrategy != null) {
- entry.setValue(qpsStrategy);
- LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
- entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
- } else {
- it.remove();
- }
+ AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
+ if (qpsStrategy == null) {
+ it.remove();
+ continue;
+ }
+ if (qpsStrategy != entry.getValue()) {
+ entry.setValue(qpsStrategy);
+ LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
+ entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
}
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
index 4fd884e..65204c3 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
@@ -29,7 +29,7 @@
private volatile long msCycleBegin;
// Request count between Interval begin and now in one interval
- private AtomicLong requestCount = new AtomicLong();
+ private final AtomicLong requestCount = new AtomicLong();
// request count before an interval
private volatile long lastRequestCount = 1;
@@ -55,9 +55,8 @@
// Configuration update and use is at the situation of multi-threaded concurrency
// It is possible that operation level updated to null,but schema level or microservice level does not updated
boolean isLimitRequest = newCount - lastRequestCount >= this.getQpsLimit();
- if (isLimitRequest) {
- LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(),
- newCount - lastRequestCount + 1);
+ if (isLimitRequest){
+ LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(), newCount - lastRequestCount + 1);
}
return isLimitRequest;
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
index 180be4d..b8bf65c 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
@@ -33,7 +33,7 @@
private static final Logger LOGGER = LoggerFactory.getLogger(LeakyBucketStrategy.class);
// Request count between Interval begin and now in one interval
- private volatile AtomicLong requestCount = new AtomicLong();
+ private final AtomicLong requestCount = new AtomicLong();
private volatile long lastTime;
@@ -47,7 +47,7 @@
throw new IllegalStateException("should not happen");
}
if (this.getBucketLimit() == null) {
- this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE));
+ this.setBucketLimit(Math.min(2 * this.getQpsLimit(), Integer.MAX_VALUE));
}
long nowTime = System.currentTimeMillis();
//get the num of te period time