[SCB-2689]fix rate limiting configuration changed cause NPE problem (#3374)

diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
index f04ace8..8885af4 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.qps;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
@@ -28,6 +29,12 @@
  * Support 3 levels of microservice/schema/operation.
  */
 public class ConsumerQpsFlowControlHandler implements Handler {
+
+  @VisibleForTesting
+  public QpsControllerManager getQpsControllerMgr() {
+    return qpsControllerMgr;
+  }
+
   private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false);
 
   @Override
@@ -40,7 +47,8 @@
     QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation);
     if (qpsStrategy.isLimitNewRequest()) {
       // return http status 429
-      CommonExceptionData errorData = new CommonExceptionData("consumer request rejected by qps flowcontrol");
+      CommonExceptionData errorData = new CommonExceptionData(
+          "consumer request rejected by qps flowcontrol");
       asyncResp.consumerFail(
           new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
       return;
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 009f81c..21016db 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -24,7 +24,10 @@
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ProviderQpsFlowControlHandler implements Handler {
+
   private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true);
 
   @Override
@@ -44,16 +47,19 @@
 
     String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
     QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation);
-    isLimitNewRequest(qpsStrategy, asyncResp);
+    checkRequestRateLimited(qpsStrategy, asyncResp);
   }
 
-  private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
+  private void checkRequestRateLimited(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
     if (qpsStrategy.isLimitNewRequest()) {
-      CommonExceptionData errorData = new CommonExceptionData("provider request rejected by qps flowcontrol");
+      CommonExceptionData errorData = new CommonExceptionData(
+          "provider request rejected by qps flowcontrol");
       asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
-      return true;
-    } else {
-      return false;
     }
   }
+
+  @VisibleForTesting
+  public QpsControllerManager getQpsControllerMgr() {
+    return qpsControllerMgr;
+  }
 }
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
index 1e954a6..436d6db 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
@@ -32,6 +32,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.netflix.config.DynamicProperty;
 
 public class QpsControllerManager {
@@ -75,6 +76,11 @@
     initGlobalQpsController();
   }
 
+  @VisibleForTesting
+  public Map<String, AbstractQpsStrategy> getQualifiedNameControllerMap() {
+    return qualifiedNameControllerMap;
+  }
+
   public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) {
     final String name = validatedName(microserviceName);
     return qualifiedNameControllerMap
@@ -95,7 +101,8 @@
    * Create relevant qpsLimit dynamicProperty and watch the configuration change.
    * Search and return a valid qpsController.
    */
-  private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
+  @VisibleForTesting
+  AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
       Invocation invocation) {
     createForService(qualifiedNameKey, microserviceName, invocation);
     String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length());
@@ -155,11 +162,6 @@
     return null;
   }
 
-  private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) {
-    return controllerEntry.getKey().equals(configKey)
-        || controllerEntry.getKey().startsWith(configKey + SEPARATOR);
-  }
-
   private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) {
     return null != qpsStrategy && null != qpsStrategy.getQpsLimit();
   }
@@ -182,37 +184,37 @@
       configQpsControllerMap.put(configKey, innerQpsStrategy);
       LOGGER.info("Global flow control strategy update, value = [{}]",
           strategyProperty.getString());
-      updateObjMap(configKey);
+      updateObjMap();
     });
     limitProperty.addCallback(() -> {
       qpsStrategy.setQpsLimit(limitProperty.getLong());
       LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey,
           limitProperty.getString());
-      updateObjMap(configKey);
+      updateObjMap();
     });
     bucketProperty.addCallback(() -> {
       qpsStrategy.setBucketLimit(bucketProperty.getLong());
       LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey,
           bucketProperty.getString());
-      updateObjMap(configKey);
+      updateObjMap();
     });
 
     configQpsControllerMap.put(configKey, qpsStrategy);
   }
 
-  protected void updateObjMap(String configKey) {
+  protected void updateObjMap() {
     Iterator<Entry<String, AbstractQpsStrategy>> it = qualifiedNameControllerMap.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<String, AbstractQpsStrategy> entry = it.next();
-      if (keyMatch(configKey, entry)) {
-        AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
-        if (qpsStrategy != null) {
-          entry.setValue(qpsStrategy);
-          LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
-              entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
-        } else {
-          it.remove();
-        }
+      AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
+      if (qpsStrategy == null) {
+        it.remove();
+        continue;
+      }
+      if (qpsStrategy != entry.getValue()) {
+        entry.setValue(qpsStrategy);
+        LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
+            entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
       }
     }
   }
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
index 4fd884e..65204c3 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
@@ -29,7 +29,7 @@
   private volatile long msCycleBegin;
 
   // Request count between Interval begin and now in one interval
-  private AtomicLong requestCount = new AtomicLong();
+  private final AtomicLong requestCount = new AtomicLong();
 
   // request count  before an interval
   private volatile long lastRequestCount = 1;
@@ -55,9 +55,8 @@
     // Configuration update and use is at the situation of multi-threaded concurrency
     // It is possible that operation level updated to null,but schema level or microservice level does not updated
     boolean isLimitRequest = newCount - lastRequestCount >= this.getQpsLimit();
-    if (isLimitRequest) {
-      LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(),
-          newCount - lastRequestCount + 1);
+    if (isLimitRequest){
+      LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(), newCount - lastRequestCount + 1);
     }
     return isLimitRequest;
   }
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
index 180be4d..b8bf65c 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
@@ -33,7 +33,7 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(LeakyBucketStrategy.class);
 
   // Request count between Interval begin and now in one interval
-  private volatile AtomicLong requestCount = new AtomicLong();
+  private final AtomicLong requestCount = new AtomicLong();
 
   private volatile long lastTime;
 
@@ -47,7 +47,7 @@
       throw new IllegalStateException("should not happen");
     }
     if (this.getBucketLimit() == null) {
-      this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE));
+      this.setBucketLimit(Math.min(2 * this.getQpsLimit(), Integer.MAX_VALUE));
     }
     long nowTime = System.currentTimeMillis();
     //get the num of te period time